Skip to main content

nodedb_vector/delta/
index.rs

// SPDX-License-Identifier: Apache-2.0

//! In-memory delta index for SPFresh streaming inserts.
//!
//! Fresh vectors accumulate here until `DeltaIndex::is_full()` signals that
//! compaction should flush them into the main HNSW. Tombstones are tracked
//! here lazily and filtered out at query time.
8
9use std::collections::HashSet;
10
11use crate::distance::distance;
12use nodedb_types::vector_distance::DistanceMetric;
13
/// Secondary in-memory index that absorbs fresh inserts before they are
/// patched into the main HNSW graph.
///
/// State consists of the staged `(id, vector)` pairs, the set of tombstoned
/// ids, and a soft capacity reported through `is_full()`.
#[derive(Debug)]
pub struct DeltaIndex {
    /// Vector dimensionality this index was created with (see `DeltaIndex::new`).
    pub dim: usize,
    /// Newly inserted vectors awaiting merge: `(id, vector)`, in insertion order.
    fresh: Vec<(u32, Vec<f32>)>,
    /// IDs marked tombstoned (lazy physical removal).
    tombstones: HashSet<u32>,
    /// Soft cap; callers should flush when `is_full()` returns `true`.
    max_fresh: usize,
}
25
26impl DeltaIndex {
27    /// Create a new delta index for vectors of `dim` dimensions.
28    ///
29    /// `max_fresh` is the capacity threshold above which `is_full()` returns
30    /// `true`, signalling that a compaction flush should be triggered.
31    pub fn new(dim: usize, max_fresh: usize) -> Self {
32        Self {
33            dim,
34            fresh: Vec::new(),
35            tombstones: HashSet::new(),
36            max_fresh,
37        }
38    }
39
40    /// Stage a fresh insert.  Does not deduplicate — callers must ensure IDs
41    /// are unique across the delta and the main HNSW.
42    pub fn insert(&mut self, id: u32, vector: Vec<f32>) {
43        self.fresh.push((id, vector));
44    }
45
46    /// Mark `id` as tombstoned.  It will be excluded from `search` results
47    /// and from the patch applied to the main HNSW.
48    pub fn tombstone(&mut self, id: u32) {
49        self.tombstones.insert(id);
50    }
51
52    /// Returns `true` when the number of fresh vectors has reached `max_fresh`.
53    pub fn is_full(&self) -> bool {
54        self.fresh.len() >= self.max_fresh
55    }
56
57    /// Number of un-drained fresh vectors.
58    pub fn fresh_len(&self) -> usize {
59        self.fresh.len()
60    }
61
62    /// Brute-force scan over fresh vectors (excluding tombstones), returning
63    /// the top-`k` results sorted ascending by distance.
64    pub fn search(&self, query: &[f32], k: usize, metric: DistanceMetric) -> Vec<(u32, f32)> {
65        if k == 0 {
66            return Vec::new();
67        }
68
69        let mut scored: Vec<(u32, f32)> = self
70            .fresh
71            .iter()
72            .filter(|(id, _)| !self.tombstones.contains(id))
73            .map(|(id, vec)| (*id, distance(query, vec, metric)))
74            .collect();
75
76        // Partial sort: cheapest path to top-k.
77        if k < scored.len() {
78            scored.select_nth_unstable_by(k, |a, b| {
79                a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)
80            });
81            scored.truncate(k);
82        }
83
84        scored.sort_unstable_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
85
86        scored
87    }
88
89    /// Drain all staged fresh vectors for patching into the main HNSW.
90    /// After this call `fresh_len()` returns 0.
91    pub fn drain_fresh(&mut self) -> Vec<(u32, Vec<f32>)> {
92        std::mem::take(&mut self.fresh)
93    }
94
95    /// Drain the current tombstone set.
96    /// After this call no IDs are considered tombstoned in this delta.
97    pub fn drain_tombstones(&mut self) -> Vec<u32> {
98        std::mem::take(&mut self.tombstones).into_iter().collect()
99    }
100
101    /// Returns `true` if `id` is currently tombstoned in this delta.
102    pub fn is_tombstoned(&self, id: u32) -> bool {
103        self.tombstones.contains(&id)
104    }
105}
106
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::vector_distance::DistanceMetric;

    /// Builds a 3-dimensional delta (soft cap 16) holding ids `0..10`, where
    /// id `i` is the vector `[i, 0, 0]`, so it lies farther from the origin as
    /// `i` grows.
    fn make_delta() -> DeltaIndex {
        let mut d = DeltaIndex::new(3, 16);
        for i in 0u32..10 {
            let v = vec![i as f32, 0.0, 0.0];
            d.insert(i, v);
        }
        d
    }

    /// Projects search results down to their ids, preserving result order.
    fn ids(results: &[(u32, f32)]) -> Vec<u32> {
        results.iter().map(|&(id, _)| id).collect()
    }

    #[test]
    fn top_k_returns_nearest() {
        let d = make_delta();
        let query = [0.0f32, 0.0, 0.0];
        let results = d.search(&query, 3, DistanceMetric::L2);
        // Nearest to [0,0,0] are ids 0, 1, 2, in ascending distance order.
        assert_eq!(ids(&results), vec![0, 1, 2]);
    }

    #[test]
    fn zero_k_returns_empty() {
        let d = make_delta();
        let query = [0.0f32, 0.0, 0.0];
        assert!(d.search(&query, 0, DistanceMetric::L2).is_empty());
    }

    #[test]
    fn k_larger_than_len_returns_all_sorted() {
        let d = make_delta();
        let query = [0.0f32, 0.0, 0.0];
        let results = d.search(&query, 100, DistanceMetric::L2);
        // No truncation path: every live vector comes back, fully sorted.
        assert_eq!(ids(&results), (0u32..10).collect::<Vec<_>>());
        assert!(results.windows(2).all(|w| w[0].1 <= w[1].1));
    }

    #[test]
    fn tombstone_excluded_from_search() {
        let mut d = make_delta();
        d.tombstone(0);
        let query = [0.0f32, 0.0, 0.0];
        let results = d.search(&query, 3, DistanceMetric::L2);
        // id 0 is hidden and the next-nearest live ids take its place.
        assert_eq!(ids(&results), vec![1, 2, 3]);
    }

    #[test]
    fn is_full_triggers_at_threshold() {
        let mut d = DeltaIndex::new(3, 3);
        assert!(!d.is_full());
        d.insert(0, vec![0.0, 0.0, 0.0]);
        d.insert(1, vec![1.0, 0.0, 0.0]);
        assert!(!d.is_full());
        d.insert(2, vec![2.0, 0.0, 0.0]);
        assert!(d.is_full());
    }

    #[test]
    fn drain_fresh_empties_buffer() {
        let mut d = make_delta();
        let drained = d.drain_fresh();
        assert_eq!(drained.len(), 10);
        assert_eq!(d.fresh_len(), 0);
    }

    #[test]
    fn drain_tombstones_empties_set() {
        let mut d = make_delta();
        d.tombstone(3);
        d.tombstone(7);
        assert!(d.is_tombstoned(3));
        assert!(d.is_tombstoned(7));

        // Drain order is unspecified, so compare after sorting.
        let mut ts = d.drain_tombstones();
        ts.sort_unstable();
        assert_eq!(ts, vec![3, 7]);
        assert!(!d.is_tombstoned(3));
        assert!(!d.is_tombstoned(7));
    }
}