1pub mod optimization;
2pub mod payload_ops;
3pub mod point_ops;
4#[cfg(feature = "staging")]
5pub mod staging;
6pub mod vector_ops;
7
8use crate::segment::json_path::JsonPath;
9use crate::segment::types::{PayloadFieldSchema, PointIdType};
10use serde::{Deserialize, Serialize};
11use strum::{EnumDiscriminants, EnumIter};
12
13use crate::shard::PeerId;
14use crate::shard::operations::point_ops::PointOperations;
15
16#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, EnumDiscriminants, Hash)]
17#[strum_discriminants(derive(EnumIter))]
18#[serde(untagged, rename_all = "snake_case")]
19pub enum CollectionUpdateOperations {
20 PointOperation(point_ops::PointOperations),
21 VectorOperation(vector_ops::VectorOperations),
22 PayloadOperation(payload_ops::PayloadOps),
23 FieldIndexOperation(FieldIndexOperations),
24 #[cfg(feature = "staging")]
26 StagingOperation(staging::StagingOperations),
27}
28
29impl CollectionUpdateOperations {
30 pub fn is_upsert_points(&self) -> bool {
31 matches!(
32 self,
33 Self::PointOperation(point_ops::PointOperations::UpsertPoints(_))
34 )
35 }
36
37 pub fn is_delete_points(&self) -> bool {
38 matches!(
39 self,
40 Self::PointOperation(point_ops::PointOperations::DeletePoints { .. })
41 )
42 }
43
44 pub fn point_ids(&self) -> Option<Vec<PointIdType>> {
45 match self {
46 Self::PointOperation(op) => op.point_ids(),
47 Self::VectorOperation(op) => op.point_ids(),
48 Self::PayloadOperation(op) => op.point_ids(),
49 Self::FieldIndexOperation(_) => None,
50 #[cfg(feature = "staging")]
51 Self::StagingOperation(_) => None,
52 }
53 }
54
55 pub fn upsert_point_ids(&self) -> Option<Vec<PointIdType>> {
58 match self {
59 Self::PointOperation(op) => match op {
60 PointOperations::UpsertPoints(op) => Some(op.point_ids()),
61 PointOperations::UpsertPointsConditional(op) => Some(op.points_op.point_ids()),
62 PointOperations::DeletePoints { .. } => None,
63 PointOperations::DeletePointsByFilter(_) => None,
64 PointOperations::SyncPoints(op) => {
65 Some(op.points.iter().map(|point| point.id).collect())
66 }
67 },
68 Self::VectorOperation(_) => None,
69 Self::PayloadOperation(_) => None,
70 Self::FieldIndexOperation(_) => None,
71 #[cfg(feature = "staging")]
72 Self::StagingOperation(_) => None,
73 }
74 }
75
76 pub fn retain_point_ids<F>(&mut self, filter: F)
77 where
78 F: Fn(&PointIdType) -> bool,
79 {
80 match self {
81 Self::PointOperation(op) => op.retain_point_ids(filter),
82 Self::VectorOperation(op) => op.retain_point_ids(filter),
83 Self::PayloadOperation(op) => op.retain_point_ids(filter),
84 Self::FieldIndexOperation(_) => (),
85 #[cfg(feature = "staging")]
86 Self::StagingOperation(_) => (),
87 }
88 }
89}
90
91#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, EnumDiscriminants, Hash)]
92#[strum_discriminants(derive(EnumIter))]
93#[serde(rename_all = "snake_case")]
94pub enum FieldIndexOperations {
95 CreateIndex(CreateIndex),
97 DeleteIndex(JsonPath),
99}
100
101#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, Hash)]
102#[serde(rename_all = "snake_case")]
103pub struct CreateIndex {
104 pub field_name: JsonPath,
105 pub field_schema: Option<PayloadFieldSchema>,
106}
107
108#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
109pub struct OperationWithClockTag {
110 #[serde(flatten)]
111 pub operation: CollectionUpdateOperations,
112
113 #[serde(default, skip_serializing_if = "Option::is_none")]
114 pub clock_tag: Option<ClockTag>,
115}
116
117impl OperationWithClockTag {
118 pub fn new(
119 operation: impl Into<CollectionUpdateOperations>,
120 clock_tag: Option<ClockTag>,
121 ) -> Self {
122 Self {
123 operation: operation.into(),
124 clock_tag,
125 }
126 }
127}
128
129impl From<CollectionUpdateOperations> for OperationWithClockTag {
130 fn from(operation: CollectionUpdateOperations) -> Self {
131 Self::new(operation, None)
132 }
133}
134
135#[derive(Copy, Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
136pub struct ClockTag {
137 pub peer_id: PeerId,
138 pub clock_id: u32,
139 pub clock_tick: u64,
140 pub token: ClockToken,
142 pub force: bool,
143}
144
/// Type of the token stored in [`ClockTag::token`].
pub type ClockToken = u64;
146
147impl ClockTag {
148 pub fn new(peer_id: PeerId, clock_id: u32, clock_tick: u64) -> Self {
149 let random_token = rand::random();
150 Self::new_with_token(peer_id, clock_id, clock_tick, random_token)
151 }
152
153 pub fn new_with_token(
154 peer_id: PeerId,
155 clock_id: u32,
156 clock_tick: u64,
157 token: ClockToken,
158 ) -> Self {
159 Self {
160 peer_id,
161 clock_id,
162 clock_tick,
163 token,
164 force: false,
165 }
166 }
167
168 pub fn force(mut self, force: bool) -> Self {
169 self.force = force;
170 self
171 }
172}
173
/// Field-by-field conversion from the gRPC clock tag.
#[cfg(feature = "api")]
impl From<api::grpc::qdrant::ClockTag> for ClockTag {
    fn from(
        // Exhaustive destructuring: a field added to the gRPC type fails to
        // compile here instead of being silently dropped.
        api::grpc::qdrant::ClockTag {
            peer_id,
            clock_id,
            clock_tick,
            token,
            force,
        }: api::grpc::qdrant::ClockTag,
    ) -> Self {
        Self {
            peer_id,
            clock_id,
            clock_tick,
            token,
            force,
        }
    }
}
193
/// Field-by-field conversion into the gRPC clock tag.
#[cfg(feature = "api")]
impl From<ClockTag> for api::grpc::qdrant::ClockTag {
    fn from(
        // Exhaustive destructuring: a field added to `ClockTag` fails to
        // compile here instead of being silently dropped.
        ClockTag {
            peer_id,
            clock_id,
            clock_tick,
            token,
            force,
        }: ClockTag,
    ) -> Self {
        Self {
            peer_id,
            clock_id,
            clock_tick,
            token,
            force,
        }
    }
}
213
#[cfg(test)]
mod tests {
    use proptest::prelude::*;
    use crate::segment::types::*;

    use super::payload_ops::*;
    use super::point_ops::*;
    use super::vector_ops::*;
    use super::*;

    /// Builds a [`Filter`] with every clause unset.
    fn empty_filter() -> Filter {
        Filter {
            should: None,
            min_should: None,
            must: None,
            must_not: None,
        }
    }

    /// Builds a `DeletePointsByFilter` operation whose filter has a single
    /// `must_not` `HasId` condition containing one UUID point ID.
    fn delete_by_filter_with_has_id_uuid() -> CollectionUpdateOperations {
        let uuids: Vec<PointIdType> = vec![ExtendedPointId::Uuid(
            uuid::Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap(),
        )];

        let filter = Filter {
            must_not: Some(vec![Condition::HasId(HasIdCondition::from(
                uuids.into_iter().collect::<ahash::AHashSet<_>>(),
            ))]),
            ..empty_filter()
        };

        CollectionUpdateOperations::PointOperation(PointOperations::DeletePointsByFilter(filter))
    }

    proptest::proptest! {
        #[test]
        fn operation_with_clock_tag_json(operation in any::<OperationWithClockTag>()) {
            // `OperationWithClockTag` survives a JSON round trip.
            let input = serde_json::to_string(&operation).unwrap();
            let output: OperationWithClockTag = serde_json::from_str(&input).unwrap();
            assert_eq!(operation, output);

            // A bare `CollectionUpdateOperations` can be read back as an
            // `OperationWithClockTag` (the clock tag is simply absent).
            let input = serde_json::to_string(&operation.operation).unwrap();
            let output: OperationWithClockTag = serde_json::from_str(&input).unwrap();
            assert_eq!(operation.operation, output.operation);

            // The bare operation serializes into a JSON object with exactly one key.
            match serde_json::to_value(&operation.operation).unwrap() {
                serde_json::Value::Object(map) if map.len() == 1 => (),
                other => panic!(
                    "expected operation to serialize into a single-key JSON object, got: {}",
                    other,
                ),
            };
        }

        #[test]
        fn operation_with_clock_tag_cbor(operation in any::<OperationWithClockTag>()) {
            // `OperationWithClockTag` survives a CBOR round trip.
            let input = serde_cbor::to_vec(&operation).unwrap();
            let output: OperationWithClockTag = serde_cbor::from_slice(&input).unwrap();
            assert_eq!(operation, output);

            // A bare `CollectionUpdateOperations` can be read back as an
            // `OperationWithClockTag` (the clock tag is simply absent).
            let input = serde_cbor::to_vec(&operation.operation).unwrap();
            let output: OperationWithClockTag = serde_cbor::from_slice(&input).unwrap();
            assert_eq!(operation.operation, output.operation);
        }
    }

    impl Arbitrary for OperationWithClockTag {
        type Parameters = ();
        type Strategy = BoxedStrategy<Self>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            any::<(CollectionUpdateOperations, Option<ClockTag>)>()
                .prop_map(|(operation, clock_tag)| Self::new(operation, clock_tag))
                .boxed()
        }
    }

    impl Arbitrary for ClockTag {
        type Parameters = ();
        type Strategy = BoxedStrategy<Self>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            // Every field, including the token and the force flag, is drawn
            // from proptest's own generator. `ClockTag::new` would take the
            // token from `rand::random()`, which is outside proptest's control
            // and makes failing cases impossible to replay or shrink.
            any::<(PeerId, u32, u64, ClockToken, bool)>()
                .prop_map(|(peer_id, clock_id, clock_tick, token, force)| {
                    Self::new_with_token(peer_id, clock_id, clock_tick, token).force(force)
                })
                .boxed()
        }
    }

    impl Arbitrary for CollectionUpdateOperations {
        type Parameters = ();
        type Strategy = BoxedStrategy<Self>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            // Staging operations are not generated by this strategy.
            prop_oneof![
                any::<point_ops::PointOperations>().prop_map(Self::PointOperation),
                any::<vector_ops::VectorOperations>().prop_map(Self::VectorOperation),
                any::<payload_ops::PayloadOps>().prop_map(Self::PayloadOperation),
                any::<FieldIndexOperations>().prop_map(Self::FieldIndexOperation),
            ]
            .boxed()
        }
    }

    impl Arbitrary for point_ops::PointOperations {
        type Parameters = ();
        type Strategy = BoxedStrategy<Self>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            // Each generated variant carries a minimal (empty) payload.
            let upsert = Self::UpsertPoints(PointInsertOperationsInternal::PointsList(Vec::new()));
            let delete = Self::DeletePoints { ids: Vec::new() };
            let delete_by_filter = Self::DeletePointsByFilter(empty_filter());

            let sync = Self::SyncPoints(PointSyncOperation {
                from_id: None,
                to_id: None,
                points: Vec::new(),
            });

            prop_oneof![
                Just(upsert),
                Just(delete),
                Just(delete_by_filter),
                Just(sync),
            ]
            .boxed()
        }
    }

    impl Arbitrary for vector_ops::VectorOperations {
        type Parameters = ();
        type Strategy = BoxedStrategy<Self>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            // Each generated variant carries a minimal (empty) payload.
            let update = Self::UpdateVectors(UpdateVectorsOp {
                points: Vec::new(),
                update_filter: None,
            });

            let delete = Self::DeleteVectors(
                PointIdsList {
                    points: Vec::new(),
                    #[cfg(feature = "api")]
                    shard_key: None,
                },
                Vec::new(),
            );

            let delete_by_filter = Self::DeleteVectorsByFilter(empty_filter(), Vec::new());

            prop_oneof![Just(update), Just(delete), Just(delete_by_filter),].boxed()
        }
    }

    impl Arbitrary for payload_ops::PayloadOps {
        type Parameters = ();
        type Strategy = BoxedStrategy<Self>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            // `SetPayload` and `OverwritePayload` share the same empty argument shape.
            let empty_set_op = || SetPayloadOp {
                payload: Payload(Default::default()),
                points: None,
                filter: None,
                key: None,
            };

            let set = Self::SetPayload(empty_set_op());
            let overwrite = Self::OverwritePayload(empty_set_op());

            let delete = Self::DeletePayload(DeletePayloadOp {
                keys: Vec::new(),
                points: None,
                filter: None,
            });

            let clear = Self::ClearPayload { points: Vec::new() };
            let clear_by_filter = Self::ClearPayloadByFilter(empty_filter());

            prop_oneof![
                Just(set),
                Just(overwrite),
                Just(delete),
                Just(clear),
                Just(clear_by_filter),
            ]
            .boxed()
        }
    }

    impl Arbitrary for FieldIndexOperations {
        type Parameters = ();
        type Strategy = BoxedStrategy<Self>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            let create = Self::CreateIndex(CreateIndex {
                field_name: "field_name".parse().unwrap(),
                field_schema: None,
            });

            let delete = Self::DeleteIndex("field_name".parse().unwrap());

            prop_oneof![Just(create), Just(delete),].boxed()
        }
    }

    /// A `DeletePointsByFilter` operation with a UUID `HasId` condition must
    /// survive a direct CBOR round trip.
    #[test]
    fn test_delete_by_filter_with_has_id_uuids_cbor_roundtrip() {
        let operation = delete_by_filter_with_has_id_uuid();

        let cbor_bytes = serde_cbor::to_vec(&operation).unwrap();
        let deserialized: CollectionUpdateOperations = serde_cbor::from_slice(&cbor_bytes).unwrap();

        assert_eq!(operation, deserialized);
    }

    /// Same operation as above, round-tripped through `WalRawRecord`.
    #[test]
    fn test_wal_roundtrip_delete_by_filter_with_has_id_uuids() {
        use crate::shard::wal::WalRawRecord;

        let operation = delete_by_filter_with_has_id_uuid();

        let raw = WalRawRecord::new(&operation).unwrap();
        let deserialized: CollectionUpdateOperations = raw.deserialize().unwrap();

        assert_eq!(operation, deserialized);
    }
}