Skip to main content

qdrant_edge/shard/operations/
mod.rs

1pub mod optimization;
2pub mod payload_ops;
3pub mod point_ops;
4#[cfg(feature = "staging")]
5pub mod staging;
6pub mod vector_ops;
7
8use crate::segment::json_path::JsonPath;
9use crate::segment::types::{PayloadFieldSchema, PointIdType};
10use serde::{Deserialize, Serialize};
11use strum::{EnumDiscriminants, EnumIter};
12
13use crate::shard::PeerId;
14use crate::shard::operations::point_ops::PointOperations;
15
16#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, EnumDiscriminants, Hash)]
17#[strum_discriminants(derive(EnumIter))]
18#[serde(untagged, rename_all = "snake_case")]
19pub enum CollectionUpdateOperations {
20    PointOperation(point_ops::PointOperations),
21    VectorOperation(vector_ops::VectorOperations),
22    PayloadOperation(payload_ops::PayloadOps),
23    FieldIndexOperation(FieldIndexOperations),
24    /// Staging-only operations for testing and debugging purposes
25    #[cfg(feature = "staging")]
26    StagingOperation(staging::StagingOperations),
27}
28
29impl CollectionUpdateOperations {
30    pub fn is_upsert_points(&self) -> bool {
31        matches!(
32            self,
33            Self::PointOperation(point_ops::PointOperations::UpsertPoints(_))
34        )
35    }
36
37    pub fn is_delete_points(&self) -> bool {
38        matches!(
39            self,
40            Self::PointOperation(point_ops::PointOperations::DeletePoints { .. })
41        )
42    }
43
44    pub fn point_ids(&self) -> Option<Vec<PointIdType>> {
45        match self {
46            Self::PointOperation(op) => op.point_ids(),
47            Self::VectorOperation(op) => op.point_ids(),
48            Self::PayloadOperation(op) => op.point_ids(),
49            Self::FieldIndexOperation(_) => None,
50            #[cfg(feature = "staging")]
51            Self::StagingOperation(_) => None,
52        }
53    }
54
55    /// List point IDs that can be created during the operation.
56    /// Do not list IDs that are deleted or modified.
57    pub fn upsert_point_ids(&self) -> Option<Vec<PointIdType>> {
58        match self {
59            Self::PointOperation(op) => match op {
60                PointOperations::UpsertPoints(op) => Some(op.point_ids()),
61                PointOperations::UpsertPointsConditional(op) => Some(op.points_op.point_ids()),
62                PointOperations::DeletePoints { .. } => None,
63                PointOperations::DeletePointsByFilter(_) => None,
64                PointOperations::SyncPoints(op) => {
65                    Some(op.points.iter().map(|point| point.id).collect())
66                }
67            },
68            Self::VectorOperation(_) => None,
69            Self::PayloadOperation(_) => None,
70            Self::FieldIndexOperation(_) => None,
71            #[cfg(feature = "staging")]
72            Self::StagingOperation(_) => None,
73        }
74    }
75
76    pub fn retain_point_ids<F>(&mut self, filter: F)
77    where
78        F: Fn(&PointIdType) -> bool,
79    {
80        match self {
81            Self::PointOperation(op) => op.retain_point_ids(filter),
82            Self::VectorOperation(op) => op.retain_point_ids(filter),
83            Self::PayloadOperation(op) => op.retain_point_ids(filter),
84            Self::FieldIndexOperation(_) => (),
85            #[cfg(feature = "staging")]
86            Self::StagingOperation(_) => (),
87        }
88    }
89}
90
91#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, EnumDiscriminants, Hash)]
92#[strum_discriminants(derive(EnumIter))]
93#[serde(rename_all = "snake_case")]
94pub enum FieldIndexOperations {
95    /// Create index for payload field
96    CreateIndex(CreateIndex),
97    /// Delete index for the field
98    DeleteIndex(JsonPath),
99}
100
101#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, Hash)]
102#[serde(rename_all = "snake_case")]
103pub struct CreateIndex {
104    pub field_name: JsonPath,
105    pub field_schema: Option<PayloadFieldSchema>,
106}
107
108#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
109pub struct OperationWithClockTag {
110    #[serde(flatten)]
111    pub operation: CollectionUpdateOperations,
112
113    #[serde(default, skip_serializing_if = "Option::is_none")]
114    pub clock_tag: Option<ClockTag>,
115}
116
117impl OperationWithClockTag {
118    pub fn new(
119        operation: impl Into<CollectionUpdateOperations>,
120        clock_tag: Option<ClockTag>,
121    ) -> Self {
122        Self {
123            operation: operation.into(),
124            clock_tag,
125        }
126    }
127}
128
129impl From<CollectionUpdateOperations> for OperationWithClockTag {
130    fn from(operation: CollectionUpdateOperations) -> Self {
131        Self::new(operation, None)
132    }
133}
134
135#[derive(Copy, Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
136pub struct ClockTag {
137    pub peer_id: PeerId,
138    pub clock_id: u32,
139    pub clock_tick: u64,
140    /// A unique token for each clock tag.
141    pub token: ClockToken,
142    pub force: bool,
143}
144
/// Type of the token stored in [`ClockTag::token`].
pub type ClockToken = u64;
146
147impl ClockTag {
148    pub fn new(peer_id: PeerId, clock_id: u32, clock_tick: u64) -> Self {
149        let random_token = rand::random();
150        Self::new_with_token(peer_id, clock_id, clock_tick, random_token)
151    }
152
153    pub fn new_with_token(
154        peer_id: PeerId,
155        clock_id: u32,
156        clock_tick: u64,
157        token: ClockToken,
158    ) -> Self {
159        Self {
160            peer_id,
161            clock_id,
162            clock_tick,
163            token,
164            force: false,
165        }
166    }
167
168    pub fn force(mut self, force: bool) -> Self {
169        self.force = force;
170        self
171    }
172}
173
#[cfg(feature = "api")]
impl From<api::grpc::qdrant::ClockTag> for ClockTag {
    /// Converts the gRPC clock tag into the internal one, field by field.
    ///
    /// The parameter is destructured exhaustively so that adding a field to
    /// the gRPC type turns into a compile error here.
    fn from(
        api::grpc::qdrant::ClockTag {
            peer_id,
            clock_id,
            clock_tick,
            token,
            force,
        }: api::grpc::qdrant::ClockTag,
    ) -> Self {
        Self {
            peer_id,
            clock_id,
            clock_tick,
            token,
            force,
        }
    }
}
193
#[cfg(feature = "api")]
impl From<ClockTag> for api::grpc::qdrant::ClockTag {
    /// Converts the internal clock tag into the gRPC one, field by field.
    ///
    /// The parameter is destructured exhaustively so that adding a field to
    /// [`ClockTag`] turns into a compile error here.
    fn from(
        ClockTag {
            peer_id,
            clock_id,
            clock_tick,
            token,
            force,
        }: ClockTag,
    ) -> Self {
        Self {
            peer_id,
            clock_id,
            clock_tick,
            token,
            force,
        }
    }
}
213
214#[cfg(test)]
215mod tests {
216    use proptest::prelude::*;
217    use crate::segment::types::*;
218
219    use super::payload_ops::*;
220    use super::point_ops::*;
221    use super::vector_ops::*;
222    use super::*;
223
224    proptest::proptest! {
225        #[test]
226        fn operation_with_clock_tag_json(operation in any::<OperationWithClockTag>()) {
227            // Assert that `OperationWithClockTag` can be serialized
228            let input = serde_json::to_string(&operation).unwrap();
229            let output: OperationWithClockTag = serde_json::from_str(&input).unwrap();
230            assert_eq!(operation, output);
231
232            // Assert that `OperationWithClockTag` can be deserialized from `CollectionUpdateOperation`
233            let input = serde_json::to_string(&operation.operation).unwrap();
234            let output: OperationWithClockTag = serde_json::from_str(&input).unwrap();
235            assert_eq!(operation.operation, output.operation);
236
237            // Assert that `CollectionUpdateOperation` serializes into JSON object with a single key
238            // (e.g., `{ "upsert_points": <upsert points object> }`)
239            match serde_json::to_value(&operation.operation).unwrap() {
240                serde_json::Value::Object(map) if map.len() == 1 => (),
241                _ => panic!("TODO"),
242            };
243        }
244
245        #[test]
246        fn operation_with_clock_tag_cbor(operation in any::<OperationWithClockTag>()) {
247            // Assert that `OperationWithClockTag` can be serialized
248            let input = serde_cbor::to_vec(&operation).unwrap();
249            let output: OperationWithClockTag = serde_cbor::from_slice(&input).unwrap();
250            assert_eq!(operation, output);
251
252            // Assert that `OperationWithClockTag` can be deserialized from `CollectionUpdateOperation`
253            let input = serde_cbor::to_vec(&operation.operation).unwrap();
254            let output: OperationWithClockTag = serde_cbor::from_slice(&input).unwrap();
255            assert_eq!(operation.operation, output.operation);
256        }
257    }
258
259    impl Arbitrary for OperationWithClockTag {
260        type Parameters = ();
261        type Strategy = BoxedStrategy<Self>;
262
263        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
264            any::<(CollectionUpdateOperations, Option<ClockTag>)>()
265                .prop_map(|(operation, clock_tag)| Self::new(operation, clock_tag))
266                .boxed()
267        }
268    }
269
270    impl Arbitrary for ClockTag {
271        type Parameters = ();
272        type Strategy = BoxedStrategy<Self>;
273
274        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
275            any::<(PeerId, u32, u64)>()
276                .prop_map(|(peer_id, clock_id, clock_tick)| {
277                    Self::new(peer_id, clock_id, clock_tick)
278                })
279                .boxed()
280        }
281    }
282
283    impl Arbitrary for CollectionUpdateOperations {
284        type Parameters = ();
285        type Strategy = BoxedStrategy<Self>;
286
287        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
288            prop_oneof![
289                any::<point_ops::PointOperations>().prop_map(Self::PointOperation),
290                any::<vector_ops::VectorOperations>().prop_map(Self::VectorOperation),
291                any::<payload_ops::PayloadOps>().prop_map(Self::PayloadOperation),
292                any::<FieldIndexOperations>().prop_map(Self::FieldIndexOperation),
293            ]
294            .boxed()
295        }
296    }
297
298    impl Arbitrary for point_ops::PointOperations {
299        type Parameters = ();
300        type Strategy = BoxedStrategy<Self>;
301
302        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
303            let upsert = Self::UpsertPoints(PointInsertOperationsInternal::PointsList(Vec::new()));
304            let delete = Self::DeletePoints { ids: Vec::new() };
305
306            let delete_by_filter = Self::DeletePointsByFilter(Filter {
307                should: None,
308                min_should: None,
309                must: None,
310                must_not: None,
311            });
312
313            let sync = Self::SyncPoints(PointSyncOperation {
314                from_id: None,
315                to_id: None,
316                points: Vec::new(),
317            });
318
319            prop_oneof![
320                Just(upsert),
321                Just(delete),
322                Just(delete_by_filter),
323                Just(sync),
324            ]
325            .boxed()
326        }
327    }
328
329    impl Arbitrary for vector_ops::VectorOperations {
330        type Parameters = ();
331        type Strategy = BoxedStrategy<Self>;
332
333        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
334            let update = Self::UpdateVectors(UpdateVectorsOp {
335                points: Vec::new(),
336                update_filter: None,
337            });
338
339            let delete = Self::DeleteVectors(
340                PointIdsList {
341                    points: Vec::new(),
342                    #[cfg(feature = "api")]
343                    shard_key: None,
344                },
345                Vec::new(),
346            );
347
348            let delete_by_filter = Self::DeleteVectorsByFilter(
349                Filter {
350                    should: None,
351                    min_should: None,
352                    must: None,
353                    must_not: None,
354                },
355                Vec::new(),
356            );
357
358            prop_oneof![Just(update), Just(delete), Just(delete_by_filter),].boxed()
359        }
360    }
361
362    impl Arbitrary for payload_ops::PayloadOps {
363        type Parameters = ();
364        type Strategy = BoxedStrategy<Self>;
365
366        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
367            let set = Self::SetPayload(SetPayloadOp {
368                payload: Payload(Default::default()),
369                points: None,
370                filter: None,
371                key: None,
372            });
373
374            let overwrite = Self::OverwritePayload(SetPayloadOp {
375                payload: Payload(Default::default()),
376                points: None,
377                filter: None,
378                key: None,
379            });
380
381            let delete = Self::DeletePayload(DeletePayloadOp {
382                keys: Vec::new(),
383                points: None,
384                filter: None,
385            });
386
387            let clear = Self::ClearPayload { points: Vec::new() };
388
389            let clear_by_filter = Self::ClearPayloadByFilter(Filter {
390                should: None,
391                min_should: None,
392                must: None,
393                must_not: None,
394            });
395
396            prop_oneof![
397                Just(set),
398                Just(overwrite),
399                Just(delete),
400                Just(clear),
401                Just(clear_by_filter),
402            ]
403            .boxed()
404        }
405    }
406
407    impl Arbitrary for FieldIndexOperations {
408        type Parameters = ();
409        type Strategy = BoxedStrategy<Self>;
410
411        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
412            let create = Self::CreateIndex(CreateIndex {
413                field_name: "field_name".parse().unwrap(),
414                field_schema: None,
415            });
416
417            let delete = Self::DeleteIndex("field_name".parse().unwrap());
418
419            prop_oneof![Just(create), Just(delete),].boxed()
420        }
421    }
422
423    #[test]
424    fn test_delete_by_filter_with_has_id_uuids_cbor_roundtrip() {
425        let uuids: Vec<PointIdType> = vec![ExtendedPointId::Uuid(
426            uuid::Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap(),
427        )];
428
429        let filter = Filter {
430            should: None,
431            min_should: None,
432            must: None,
433            must_not: Some(vec![Condition::HasId(HasIdCondition::from(
434                uuids.into_iter().collect::<ahash::AHashSet<_>>(),
435            ))]),
436        };
437
438        let operation = CollectionUpdateOperations::PointOperation(
439            PointOperations::DeletePointsByFilter(filter),
440        );
441
442        let cbor_bytes = serde_cbor::to_vec(&operation).unwrap();
443        let deserialized: CollectionUpdateOperations = serde_cbor::from_slice(&cbor_bytes).unwrap();
444
445        assert_eq!(operation, deserialized);
446    }
447
448    #[test]
449    fn test_wal_roundtrip_delete_by_filter_with_has_id_uuids() {
450        use crate::shard::wal::WalRawRecord;
451
452        let uuids: Vec<PointIdType> = vec![ExtendedPointId::Uuid(
453            uuid::Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap(),
454        )];
455
456        let filter = Filter {
457            should: None,
458            min_should: None,
459            must: None,
460            must_not: Some(vec![Condition::HasId(HasIdCondition::from(
461                uuids.into_iter().collect::<ahash::AHashSet<_>>(),
462            ))]),
463        };
464
465        let operation = CollectionUpdateOperations::PointOperation(
466            PointOperations::DeletePointsByFilter(filter),
467        );
468
469        let raw = WalRawRecord::new(&operation).unwrap();
470        let deserialized: CollectionUpdateOperations = raw.deserialize().unwrap();
471
472        assert_eq!(operation, deserialized);
473    }
474}