qdrant_edge/segment/id_tracker/
mod.rs1pub mod compressed;
2pub mod id_tracker_base;
3pub mod immutable_id_tracker;
4pub mod in_memory_id_tracker;
5pub mod mutable_id_tracker;
6pub mod point_mappings;
7#[cfg(feature = "rocksdb")]
8pub mod simple_id_tracker;
9
10use crate::common::types::PointOffsetType;
11pub use id_tracker_base::*;
12use itertools::Itertools as _;
13
14use crate::segment::types::{ExtendedPointId, PointIdType};
15
16#[derive(Debug, Clone, Copy)]
19pub struct MergedPointId {
20 pub external_id: ExtendedPointId,
23 pub tracker_index: usize,
26 pub internal_id: PointOffsetType,
28 pub version: u64,
30}
31
32pub fn for_each_unique_point<'a>(
36 id_trackers: impl Iterator<Item = &'a (impl IdTracker + ?Sized + 'a)>,
37 mut f: impl FnMut(MergedPointId),
38) {
39 let mut iter = id_trackers
40 .enumerate()
41 .map(|(segment_index, id_tracker)| {
42 id_tracker.point_mappings().iter_from(None).filter_map(
43 move |(external_id, internal_id)| {
44 let version = id_tracker.internal_version(internal_id);
45 version.map(|version| MergedPointId {
47 external_id,
48 tracker_index: segment_index,
49 internal_id,
50 version,
51 })
52 },
53 )
54 })
55 .kmerge_by(|a, b| a.external_id < b.external_id);
56
57 let Some(mut best_item) = iter.next() else {
58 return;
59 };
60
61 for item in iter {
62 if best_item.external_id == item.external_id {
63 if best_item.version < item.version {
64 best_item = item;
65 }
66 } else {
67 f(best_item);
68 best_item = item;
69 }
70 }
71 f(best_item);
72}
73
74impl From<&ExtendedPointId> for PointIdType {
75 fn from(point_id: &ExtendedPointId) -> Self {
76 match point_id {
77 ExtendedPointId::NumId(idx) => PointIdType::NumId(*idx),
78 ExtendedPointId::Uuid(uuid) => PointIdType::Uuid(*uuid),
79 }
80 }
81}
82
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, hash_map};

    use in_memory_id_tracker::InMemoryIdTracker;
    use rand::SeedableRng as _;
    use rand::rngs::StdRng;
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_for_each_unique_point(#[values(0, 1, 5)] tracker_count: usize) {
        let mut rand = StdRng::seed_from_u64(42);

        let id_trackers = (0..tracker_count)
            .map(|_| InMemoryIdTracker::random(&mut rand, 1000, 500, 10))
            .collect_vec();

        let mut collisions = 0;

        // Reference model: for every external ID keep the entry with the highest version.
        // On equal versions the first tracker (lowest index) wins.
        let mut expected = HashMap::<ExtendedPointId, MergedPointId>::new();
        for (tracker_index, id_tracker) in id_trackers.iter().enumerate() {
            for (external_id, internal_id) in id_tracker.point_mappings().iter_from(None) {
                let version = id_tracker.internal_version(internal_id).unwrap();
                let merged_point_id = MergedPointId {
                    external_id,
                    tracker_index,
                    internal_id,
                    version,
                };
                match expected.entry(external_id) {
                    hash_map::Entry::Occupied(mut entry) => {
                        collisions += 1;
                        if entry.get().version < version {
                            entry.insert(merged_point_id);
                        }
                    }
                    hash_map::Entry::Vacant(entry) => {
                        entry.insert(merged_point_id);
                    }
                }
            }
        }

        if tracker_count > 1 {
            assert!(collisions > 500);
        } else {
            assert_eq!(collisions, 0);
        }
        if tracker_count == 0 {
            assert!(expected.is_empty());
        }

        // External IDs must be reported once each and in strictly ascending order.
        let mut previous_external_id: Option<ExtendedPointId> = None;
        for_each_unique_point(id_trackers.iter(), |merged_point_id| {
            if let Some(previous) = previous_external_id {
                assert!(
                    previous < merged_point_id.external_id,
                    "external ids out of order: {previous:?} then {:?}",
                    merged_point_id.external_id,
                );
            }
            previous_external_id = Some(merged_point_id.external_id);

            let v = expected.remove(&merged_point_id.external_id).unwrap();
            assert_eq!(merged_point_id.tracker_index, v.tracker_index);
            assert_eq!(merged_point_id.internal_id, v.internal_id);
            assert_eq!(merged_point_id.version, v.version);
        });

        assert!(expected.is_empty());
    }
}