Skip to main content

nodedb_vector/delta/
compaction.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! SPFresh LIRE-style topology-aware local patching.
4//!
5//! Rather than rebuilding the entire HNSW on every update, `LirePatcher`
6//! stitches fresh delta vectors into the main graph one node at a time using
7//! the graph's own `insert` routine (which runs heuristic neighbor selection
8//! and bidirectional edge maintenance internally).  Tombstones are forwarded
9//! to the HNSW as soft-deletes so search skips them immediately; physical
10//! removal is deferred to a later background compaction sweep via
11//! `HnswIndex::compact`.
12//!
13//! ## Partition-quality drift estimate
14//!
//! After inserting each fresh node we inspect its layer-0 neighbors and
//! compute the fraction of them that are nodes patched earlier in the same
//! batch.  A high overlap is read as the delta nodes clustering together in
//! one region — minimal drift.  A low overlap is read as the new node landing
//! in a region the batch has not otherwise touched, which may require broader
//! re-wiring.
//!
//! This is an O(patch_size × M) approximation of LIRE's full local-rebuild
//! signal (SOSP 2023 §4.2).  When the average overlap fraction across all
//! patched nodes falls below `drift_threshold`, `PatchStats::drift_subgraphs`
//! is incremented (at most once per flush) so the caller can schedule a
//! deeper re-pruning pass at lower priority.
27
28use crate::delta::index::DeltaIndex;
29use crate::error::VectorError;
30use crate::hnsw::HnswIndex;
31
32/// SPFresh LIRE-style topology-aware local patcher.
33///
34/// Holds mutable references to both the main HNSW and the delta buffer so
35/// both can be updated atomically within a single flush.
36pub struct LirePatcher<'a> {
37    /// Main HNSW graph that fresh vectors will be patched into.
38    pub main: &'a mut HnswIndex,
39    /// Delta buffer whose fresh vectors (and tombstones) will be drained.
40    pub delta: &'a mut DeltaIndex,
41    /// Drift threshold in `[0.0, 1.0]`.  When the average neighbor-overlap
42    /// fraction for a batch falls below this value the subgraph is flagged
43    /// for deeper re-pruning.  Default: `0.3`.
44    pub drift_threshold: f32,
45}
46
47/// Statistics returned by a single `LirePatcher::patch` call.
48#[derive(Debug, Default, Clone)]
49pub struct PatchStats {
50    /// Number of fresh vectors successfully patched into the main HNSW.
51    pub patched: usize,
52    /// Number of vectors tombstoned in the main HNSW during this flush.
53    pub tombstoned_marked: usize,
54    /// Number of subgraphs flagged for deeper re-pruning due to drift.
55    pub drift_subgraphs: usize,
56}
57
58impl<'a> LirePatcher<'a> {
59    /// Create a patcher with the default drift threshold (`0.3`).
60    pub fn new(main: &'a mut HnswIndex, delta: &'a mut DeltaIndex) -> Self {
61        Self {
62            main,
63            delta,
64            drift_threshold: 0.3,
65        }
66    }
67
68    /// Flush the delta buffer into the main HNSW.
69    ///
70    /// ## Steps
71    ///
72    /// 1. Drain tombstones → call `HnswIndex::delete` on each.
73    /// 2. Drain fresh vectors → call `HnswIndex::insert` on each.
74    /// 3. After each insert, estimate local topology drift for the newly
75    ///    assigned node and accumulate the overlap fraction.
76    /// 4. If average overlap fraction < `drift_threshold`, increment
77    ///    `drift_subgraphs`.
78    ///
79    /// `k_neighbors` and `ef_construction` are accepted for API completeness
80    /// and forward-compatibility with future Vamana-style patchers; the
81    /// current HNSW implementation derives its own neighbor count from
82    /// `HnswParams` stored on the index, so these values are informational.
83    pub fn patch(
84        &mut self,
85        _k_neighbors: usize,
86        _ef_construction: usize,
87    ) -> Result<PatchStats, VectorError> {
88        let mut stats = PatchStats::default();
89
90        // --- Step 1: Forward tombstones to the main HNSW ---
91        let tombstone_ids = self.delta.drain_tombstones();
92        for id in tombstone_ids {
93            if self.main.delete(id) {
94                stats.tombstoned_marked += 1;
95            }
96        }
97
98        // --- Step 2 + 3: Insert fresh vectors and estimate drift ---
99        let fresh = self.delta.drain_fresh();
100
101        // Collect the node IDs that will be assigned to freshly inserted nodes
102        // so we can measure neighborhood overlap after each insert.
103        // The HNSW appends nodes sequentially, so the new id = len() before insert.
104        // no-governor: structural drift-tracking vec; scales with fresh delta count, compaction governed upstream
105        let mut overlap_fractions: Vec<f32> = Vec::with_capacity(fresh.len());
106        // Track the set of recently-patched node ids for overlap estimation.
107        let mut patched_ids: std::collections::HashSet<u32> =
108            std::collections::HashSet::with_capacity(fresh.len());
109
110        for (user_id, vector) in fresh {
111            // Skip tombstoned fresh inserts — they were deleted before we
112            // could patch them.
113            if self.delta.is_tombstoned(user_id) {
114                continue;
115            }
116
117            // The HNSW uses its own internal monotonic IDs (insertion order).
118            // We record what the next id will be before the insert.
119            let new_internal_id = self.main.len() as u32;
120
121            self.main.insert(vector)?;
122            stats.patched += 1;
123
124            // --- Drift estimation (LIRE approximation) ---
125            // Inspect neighbors assigned to the new node at layer 0.
126            let neighbors_l0 = self.main.hnsw_neighbors_layer0(new_internal_id);
127
128            let overlap_fraction = if neighbors_l0.is_empty() {
129                // First node or isolated — perfect connectivity by definition.
130                1.0f32
131            } else {
132                // Count how many neighbors are themselves in the current
133                // patched-ids set (i.e., recently inserted into this batch).
134                let overlap = neighbors_l0
135                    .iter()
136                    .filter(|&&nid| patched_ids.contains(&nid))
137                    .count();
138                overlap as f32 / neighbors_l0.len() as f32
139            };
140
141            overlap_fractions.push(overlap_fraction);
142            patched_ids.insert(new_internal_id);
143        }
144
145        // --- Step 4: Flag drift subgraphs ---
146        if !overlap_fractions.is_empty() {
147            let avg_overlap =
148                overlap_fractions.iter().sum::<f32>() / overlap_fractions.len() as f32;
149            if avg_overlap < self.drift_threshold {
150                stats.drift_subgraphs += 1;
151            }
152        }
153
154        Ok(stats)
155    }
156}
157
158// ---------------------------------------------------------------------------
159// Additive accessor on HnswIndex required by LirePatcher.
160//
161// `mark_deleted` does not exist on `HnswIndex`; the equivalent is `delete`.
162// We add a thin helper that exposes layer-0 neighbors so the drift estimator
163// can read them without re-exposing internal fields.
164// ---------------------------------------------------------------------------
165impl HnswIndex {
166    /// Return the layer-0 neighbor list of `node_id`, or an empty slice if
167    /// the node does not exist or has no layer-0 neighbors.
168    ///
169    /// Used by `LirePatcher` for local topology-drift estimation.
170    pub fn hnsw_neighbors_layer0(&self, node_id: u32) -> Vec<u32> {
171        self.neighbors_at(node_id, 0).to_vec()
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    use super::*;
178    use crate::hnsw::HnswIndex;
179    use nodedb_types::hnsw::HnswParams;
180
181    fn small_params() -> HnswParams {
182        HnswParams {
183            m: 4,
184            m0: 8,
185            ef_construction: 20,
186            ..HnswParams::default()
187        }
188    }
189
190    #[test]
191    fn patch_grows_hnsw_and_drains_delta() {
192        let mut main = HnswIndex::with_seed(3, small_params(), 1);
193        // Pre-populate main with 10 vectors so fresh nodes have neighbors.
194        for i in 0u32..10 {
195            let v = vec![i as f32, 0.0, 0.0];
196            main.insert(v).expect("pre-populate insert failed");
197        }
198        assert_eq!(main.len(), 10);
199
200        let mut delta = DeltaIndex::new(3, 32);
201        for i in 10u32..15 {
202            let v = vec![i as f32, 1.0, 0.0];
203            delta.insert(i, v);
204        }
205        assert_eq!(delta.fresh_len(), 5);
206
207        let mut patcher = LirePatcher::new(&mut main, &mut delta);
208        let stats = patcher.patch(8, 20).expect("patch failed");
209
210        assert_eq!(stats.patched, 5);
211        assert_eq!(delta.fresh_len(), 0);
212        assert_eq!(main.len(), 15);
213    }
214
215    #[test]
216    fn tombstone_forwarded_to_hnsw() {
217        let mut main = HnswIndex::with_seed(3, small_params(), 2);
218        for i in 0u32..5 {
219            let v = vec![i as f32, 0.0, 0.0];
220            main.insert(v).expect("insert failed");
221        }
222        assert!(!main.is_deleted(2));
223
224        let mut delta = DeltaIndex::new(3, 16);
225        delta.tombstone(2);
226
227        let mut patcher = LirePatcher::new(&mut main, &mut delta);
228        let stats = patcher.patch(4, 20).expect("patch failed");
229
230        assert_eq!(stats.tombstoned_marked, 1);
231        assert!(main.is_deleted(2));
232    }
233
234    #[test]
235    fn patch_empty_delta_is_noop() {
236        let mut main = HnswIndex::with_seed(3, small_params(), 3);
237        for i in 0u32..3 {
238            main.insert(vec![i as f32, 0.0, 0.0])
239                .expect("insert failed");
240        }
241        let initial_len = main.len();
242
243        let mut delta = DeltaIndex::new(3, 16);
244        let mut patcher = LirePatcher::new(&mut main, &mut delta);
245        let stats = patcher.patch(4, 20).expect("patch failed");
246
247        assert_eq!(stats.patched, 0);
248        assert_eq!(stats.tombstoned_marked, 0);
249        assert_eq!(main.len(), initial_len);
250    }
251}