nodedb_vector/delta/compaction.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! SPFresh LIRE-style topology-aware local patching.
4//!
5//! Rather than rebuilding the entire HNSW on every update, `LirePatcher`
6//! stitches fresh delta vectors into the main graph one node at a time using
7//! the graph's own `insert` routine (which runs heuristic neighbor selection
8//! and bidirectional edge maintenance internally). Tombstones are forwarded
9//! to the HNSW as soft-deletes so search skips them immediately; physical
10//! removal is deferred to a later background compaction sweep via
11//! `HnswIndex::compact`.
12//!
13//! ## Partition-quality drift estimate
14//!
//! After inserting each fresh node we read its layer-0 neighbor list and
//! compute the fraction of those neighbors that were themselves inserted
//! earlier in the same batch (a node with no neighbors counts as 1.0). A
//! high overlap means the delta nodes clustered together into a dense
//! region — minimal drift. A low overlap means the new node landed in a
//! sparse region that may require broader re-wiring.
21//!
22//! This is an O(patch_size × M) approximation of LIRE's full local-rebuild
23//! signal (SOSP 2023 §4.2). When the average overlap fraction across all
24//! patched nodes falls below `drift_threshold`, we record that subgraph in
25//! `PatchStats::drift_subgraphs` so the caller can schedule a deeper
26//! re-pruning pass at lower priority.
27
28use crate::delta::index::DeltaIndex;
29use crate::error::VectorError;
30use crate::hnsw::HnswIndex;
31
32/// SPFresh LIRE-style topology-aware local patcher.
33///
34/// Holds mutable references to both the main HNSW and the delta buffer so
35/// both can be updated atomically within a single flush.
36pub struct LirePatcher<'a> {
37 /// Main HNSW graph that fresh vectors will be patched into.
38 pub main: &'a mut HnswIndex,
39 /// Delta buffer whose fresh vectors (and tombstones) will be drained.
40 pub delta: &'a mut DeltaIndex,
41 /// Drift threshold in `[0.0, 1.0]`. When the average neighbor-overlap
42 /// fraction for a batch falls below this value the subgraph is flagged
43 /// for deeper re-pruning. Default: `0.3`.
44 pub drift_threshold: f32,
45}
46
/// Statistics returned by a single `LirePatcher::patch` call.
///
/// `PatchStats::default()` is the all-zero value. Stats compare by value, so
/// callers can assert on or diff whole flush results.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PatchStats {
    /// Number of fresh vectors successfully inserted into the main HNSW.
    pub patched: usize,
    /// Number of drained tombstones for which `HnswIndex::delete` reported
    /// success during this flush; tombstones it rejected are not counted.
    pub tombstoned_marked: usize,
    /// Number of subgraphs flagged for deeper re-pruning due to drift.
    /// The current patcher treats each call's batch as a single subgraph,
    /// so this is `0` or `1`.
    pub drift_subgraphs: usize,
}
57
58impl<'a> LirePatcher<'a> {
59 /// Create a patcher with the default drift threshold (`0.3`).
60 pub fn new(main: &'a mut HnswIndex, delta: &'a mut DeltaIndex) -> Self {
61 Self {
62 main,
63 delta,
64 drift_threshold: 0.3,
65 }
66 }
67
68 /// Flush the delta buffer into the main HNSW.
69 ///
70 /// ## Steps
71 ///
72 /// 1. Drain tombstones → call `HnswIndex::delete` on each.
73 /// 2. Drain fresh vectors → call `HnswIndex::insert` on each.
74 /// 3. After each insert, estimate local topology drift for the newly
75 /// assigned node and accumulate the overlap fraction.
76 /// 4. If average overlap fraction < `drift_threshold`, increment
77 /// `drift_subgraphs`.
78 ///
79 /// `k_neighbors` and `ef_construction` are accepted for API completeness
80 /// and forward-compatibility with future Vamana-style patchers; the
81 /// current HNSW implementation derives its own neighbor count from
82 /// `HnswParams` stored on the index, so these values are informational.
83 pub fn patch(
84 &mut self,
85 _k_neighbors: usize,
86 _ef_construction: usize,
87 ) -> Result<PatchStats, VectorError> {
88 let mut stats = PatchStats::default();
89
90 // --- Step 1: Forward tombstones to the main HNSW ---
91 let tombstone_ids = self.delta.drain_tombstones();
92 for id in tombstone_ids {
93 if self.main.delete(id) {
94 stats.tombstoned_marked += 1;
95 }
96 }
97
98 // --- Step 2 + 3: Insert fresh vectors and estimate drift ---
99 let fresh = self.delta.drain_fresh();
100
101 // Collect the node IDs that will be assigned to freshly inserted nodes
102 // so we can measure neighborhood overlap after each insert.
103 // The HNSW appends nodes sequentially, so the new id = len() before insert.
104 // no-governor: structural drift-tracking vec; scales with fresh delta count, compaction governed upstream
105 let mut overlap_fractions: Vec<f32> = Vec::with_capacity(fresh.len());
106 // Track the set of recently-patched node ids for overlap estimation.
107 let mut patched_ids: std::collections::HashSet<u32> =
108 std::collections::HashSet::with_capacity(fresh.len());
109
110 for (user_id, vector) in fresh {
111 // Skip tombstoned fresh inserts — they were deleted before we
112 // could patch them.
113 if self.delta.is_tombstoned(user_id) {
114 continue;
115 }
116
117 // The HNSW uses its own internal monotonic IDs (insertion order).
118 // We record what the next id will be before the insert.
119 let new_internal_id = self.main.len() as u32;
120
121 self.main.insert(vector)?;
122 stats.patched += 1;
123
124 // --- Drift estimation (LIRE approximation) ---
125 // Inspect neighbors assigned to the new node at layer 0.
126 let neighbors_l0 = self.main.hnsw_neighbors_layer0(new_internal_id);
127
128 let overlap_fraction = if neighbors_l0.is_empty() {
129 // First node or isolated — perfect connectivity by definition.
130 1.0f32
131 } else {
132 // Count how many neighbors are themselves in the current
133 // patched-ids set (i.e., recently inserted into this batch).
134 let overlap = neighbors_l0
135 .iter()
136 .filter(|&&nid| patched_ids.contains(&nid))
137 .count();
138 overlap as f32 / neighbors_l0.len() as f32
139 };
140
141 overlap_fractions.push(overlap_fraction);
142 patched_ids.insert(new_internal_id);
143 }
144
145 // --- Step 4: Flag drift subgraphs ---
146 if !overlap_fractions.is_empty() {
147 let avg_overlap =
148 overlap_fractions.iter().sum::<f32>() / overlap_fractions.len() as f32;
149 if avg_overlap < self.drift_threshold {
150 stats.drift_subgraphs += 1;
151 }
152 }
153
154 Ok(stats)
155 }
156}
157
158// ---------------------------------------------------------------------------
159// Additive accessor on HnswIndex required by LirePatcher.
160//
161// `mark_deleted` does not exist on `HnswIndex`; the equivalent is `delete`.
162// We add a thin helper that exposes layer-0 neighbors so the drift estimator
163// can read them without re-exposing internal fields.
164// ---------------------------------------------------------------------------
165impl HnswIndex {
166 /// Return the layer-0 neighbor list of `node_id`, or an empty slice if
167 /// the node does not exist or has no layer-0 neighbors.
168 ///
169 /// Used by `LirePatcher` for local topology-drift estimation.
170 pub fn hnsw_neighbors_layer0(&self, node_id: u32) -> Vec<u32> {
171 self.neighbors_at(node_id, 0).to_vec()
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hnsw::HnswIndex;
    use nodedb_types::hnsw::HnswParams;

    /// Tiny HNSW parameters so each test builds a small graph quickly.
    fn small_params() -> HnswParams {
        HnswParams {
            ef_construction: 20,
            m: 4,
            m0: 8,
            ..HnswParams::default()
        }
    }

    /// Insert the points `[i, 0, 0]` for `i` in `0..count` into `graph`,
    /// panicking with `msg` if any insert fails.
    fn fill_axis(graph: &mut HnswIndex, count: u32, msg: &str) {
        for i in 0..count {
            graph.insert(vec![i as f32, 0.0, 0.0]).expect(msg);
        }
    }

    #[test]
    fn patch_grows_hnsw_and_drains_delta() {
        // Seed the main graph with 10 points so fresh nodes have neighbors.
        let mut graph = HnswIndex::with_seed(3, small_params(), 1);
        fill_axis(&mut graph, 10, "pre-populate insert failed");
        assert_eq!(graph.len(), 10);

        // Five fresh vectors, lifted off the x-axis by y = 1.
        let mut buffer = DeltaIndex::new(3, 32);
        (10u32..15).for_each(|id| {
            buffer.insert(id, vec![id as f32, 1.0, 0.0]);
        });
        assert_eq!(buffer.fresh_len(), 5);

        let stats = LirePatcher::new(&mut graph, &mut buffer)
            .patch(8, 20)
            .expect("patch failed");

        assert_eq!(stats.patched, 5);
        assert_eq!(buffer.fresh_len(), 0);
        assert_eq!(graph.len(), 15);
    }

    #[test]
    fn tombstone_forwarded_to_hnsw() {
        let mut graph = HnswIndex::with_seed(3, small_params(), 2);
        fill_axis(&mut graph, 5, "insert failed");
        assert!(!graph.is_deleted(2));

        let mut buffer = DeltaIndex::new(3, 16);
        buffer.tombstone(2);

        let stats = LirePatcher::new(&mut graph, &mut buffer)
            .patch(4, 20)
            .expect("patch failed");

        assert_eq!(stats.tombstoned_marked, 1);
        assert!(graph.is_deleted(2));
    }

    #[test]
    fn patch_empty_delta_is_noop() {
        let mut graph = HnswIndex::with_seed(3, small_params(), 3);
        fill_axis(&mut graph, 3, "insert failed");
        let len_before = graph.len();

        let mut buffer = DeltaIndex::new(3, 16);
        let stats = LirePatcher::new(&mut graph, &mut buffer)
            .patch(4, 20)
            .expect("patch failed");

        assert_eq!(stats.patched, 0);
        assert_eq!(stats.tombstoned_marked, 0);
        assert_eq!(graph.len(), len_before);
    }
}