Skip to main content

qdrant_edge/segment/id_tracker/
mod.rs

1pub mod compressed;
2pub mod id_tracker_base;
3pub mod immutable_id_tracker;
4pub mod in_memory_id_tracker;
5pub mod mutable_id_tracker;
6pub mod point_mappings;
7#[cfg(feature = "rocksdb")]
8pub mod simple_id_tracker;
9
10use crate::common::types::PointOffsetType;
11pub use id_tracker_base::*;
12use itertools::Itertools as _;
13
14use crate::segment::types::{ExtendedPointId, PointIdType};
15
16/// Calling [`for_each_unique_point`] will yield this struct for each unique
17/// point.
18#[derive(Debug, Clone, Copy)]
19pub struct MergedPointId {
20    /// Unique external ID. If the same external ID is present in multiple
21    /// trackers, the item with the highest version takes precedence.
22    pub external_id: ExtendedPointId,
23    /// An index within `id_trackers` iterator that points to the [`IdTracker`]
24    /// that contains this point.
25    pub tracker_index: usize,
26    /// The internal ID of the point within the [`IdTracker`] that contains it.
27    pub internal_id: PointOffsetType,
28    /// The version of the point within the [`IdTracker`] that contains it.
29    pub version: u64,
30}
31
32/// Calls a closure for each unique point from multiple ID trackers.
33///
34/// Discard points that have no version.
35pub fn for_each_unique_point<'a>(
36    id_trackers: impl Iterator<Item = &'a (impl IdTracker + ?Sized + 'a)>,
37    mut f: impl FnMut(MergedPointId),
38) {
39    let mut iter = id_trackers
40        .enumerate()
41        .map(|(segment_index, id_tracker)| {
42            id_tracker.point_mappings().iter_from(None).filter_map(
43                move |(external_id, internal_id)| {
44                    let version = id_tracker.internal_version(internal_id);
45                    // a point without a version had an interrupted flush sequence and should be discarded
46                    version.map(|version| MergedPointId {
47                        external_id,
48                        tracker_index: segment_index,
49                        internal_id,
50                        version,
51                    })
52                },
53            )
54        })
55        .kmerge_by(|a, b| a.external_id < b.external_id);
56
57    let Some(mut best_item) = iter.next() else {
58        return;
59    };
60
61    for item in iter {
62        if best_item.external_id == item.external_id {
63            if best_item.version < item.version {
64                best_item = item;
65            }
66        } else {
67            f(best_item);
68            best_item = item;
69        }
70    }
71    f(best_item);
72}
73
74impl From<&ExtendedPointId> for PointIdType {
75    fn from(point_id: &ExtendedPointId) -> Self {
76        match point_id {
77            ExtendedPointId::NumId(idx) => PointIdType::NumId(*idx),
78            ExtendedPointId::Uuid(uuid) => PointIdType::Uuid(*uuid),
79        }
80    }
81}
82
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::collections::hash_map::Entry;

    use rand::SeedableRng as _;
    use rand::rngs::StdRng;
    use rstest::rstest;

    use super::in_memory_id_tracker::InMemoryIdTracker;
    use super::*;

    /// Compares `for_each_unique_point` against a straightforward
    /// `HashMap`-based reference for zero, one and several random trackers.
    #[rstest]
    fn test_for_each_unique_point(#[values(0, 1, 5)] tracker_count: usize) {
        // Fixed seed keeps the generated trackers reproducible.
        let mut rng = StdRng::seed_from_u64(42);

        let trackers: Vec<_> = (0..tracker_count)
            .map(|_| InMemoryIdTracker::random(&mut rng, 1000, 500, 10))
            .collect();

        // Number of times an external ID was seen again in a later mapping.
        let mut collisions = 0;

        // Reference result: for every external ID keep the entry with the
        // highest version, first-seen winning on ties.
        let mut expected = HashMap::<ExtendedPointId, MergedPointId>::new();
        for (tracker_index, tracker) in trackers.iter().enumerate() {
            for (external_id, internal_id) in tracker.point_mappings().iter_from(None) {
                let version = tracker.internal_version(internal_id).unwrap();
                let candidate = MergedPointId {
                    external_id,
                    tracker_index,
                    internal_id,
                    version,
                };
                match expected.entry(external_id) {
                    Entry::Vacant(slot) => {
                        slot.insert(candidate);
                    }
                    Entry::Occupied(mut slot) => {
                        collisions += 1;
                        if slot.get().version < version {
                            slot.insert(candidate);
                        }
                    }
                }
            }
        }

        if tracker_count <= 1 {
            // No collisions expected for a single or no id_trackers.
            assert_eq!(collisions, 0);
        } else {
            // Ensure generated id_trackers have a lot of common points, so we
            // are testing the merge logic.
            assert!(collisions > 500);
        }
        if tracker_count == 0 {
            assert!(expected.is_empty());
        }

        // Each reported point must match the reference entry and is consumed
        // from it, so duplicates would fail on the `unwrap`.
        for_each_unique_point(trackers.iter(), |actual| {
            let wanted = expected.remove(&actual.external_id).unwrap();
            assert_eq!(actual.tracker_index, wanted.tracker_index);
            assert_eq!(actual.internal_id, wanted.internal_id);
            assert_eq!(actual.version, wanted.version);
        });

        // Nothing left over: every reference point was reported.
        assert!(expected.is_empty());
    }
}
150}