velesdb_core/collection/streaming/deferred.rs
1//! Deferred indexer for high-throughput sequential vector inserts.
2//!
3//! The [`DeferredIndexer`] buffers incoming vectors in a single write buffer
4//! and exposes them to search via brute-force scan while they await insertion
5//! into the HNSW graph. This decouples the write path (fast, O(1) per point)
6//! from the index path (slower, O(log n) per point) and enables
7//! threshold-triggered merge.
8//!
9//! # Single-buffer with threshold-triggered merge
10//!
11//! The indexer holds one buffer that accepts writes. When the buffer reaches
12//! `merge_threshold`, [`swap_and_drain`](DeferredIndexer::swap_and_drain)
13//! drains it and returns the vectors for the caller to batch-insert into HNSW.
14//!
15//! # Deleted IDs
16//!
17//! When a point is deleted while buffered, its ID is recorded in a
18//! `deleted_ids` set. Search results are filtered against this set so that
19//! deleted vectors never surface. The set is cleared on drain (the HNSW
20//! tombstone system takes over after merge).
21//!
22//! # Lock ordering
23//!
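//! `DeferredIndexer` is above `DeltaBuffer` (position 10) in the lock order.
//! The drain paths (`swap_and_drain`, `drain_all`) acquire `swap_lock`
//! (position 10.1) first. They keep it held while draining the `DeltaBuffer`
//! and clearing `deleted_ids`. `swap_lock` must therefore never be acquired
//! while a `DeltaBuffer` lock or the `deleted_ids` lock is already held.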
27
28use super::delta::DeltaBuffer;
29use crate::distance::DistanceMetric;
30use parking_lot::{Mutex, RwLock};
31use rustc_hash::FxHashSet;
32use serde::{Deserialize, Serialize};
33use std::sync::Arc;
34
35// ── Constants ────────────────────────────────────────────────────────────────
36
/// Buffered-vector count at which a merge is signalled when the
/// configuration does not override it (1024).
const DEFAULT_MERGE_THRESHOLD: usize = 1 << 10;

/// Fallback upper bound, in milliseconds, on how long data may sit in the
/// buffer before a time-based merge (5 seconds).
const DEFAULT_MAX_BUFFER_AGE_MS: u64 = 5_000;
42
43// ── Configuration ────────────────────────────────────────────────────────────
44
45/// Configuration for the [`DeferredIndexer`].
46///
47/// Controls whether deferred indexing is enabled, how many vectors to
48/// buffer before triggering a merge, and the maximum age of buffered data.
49///
50/// # Examples
51///
52/// ```
53/// use velesdb_core::collection::streaming::DeferredIndexerConfig;
54///
55/// let config = DeferredIndexerConfig::default();
56/// assert!(!config.enabled);
57/// assert_eq!(config.merge_threshold, 1024);
58/// ```
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct DeferredIndexerConfig {
61 /// Whether deferred indexing is enabled (default: `false`).
62 #[serde(default)]
63 pub enabled: bool,
64
65 /// Number of buffered vectors that triggers a merge into HNSW.
66 #[serde(default = "default_merge_threshold")]
67 pub merge_threshold: usize,
68
69 /// Maximum age (milliseconds) of the oldest buffered vector before a
70 /// time-based merge is triggered.
71 #[serde(default = "default_max_buffer_age_ms")]
72 pub max_buffer_age_ms: u64,
73}
74
75fn default_merge_threshold() -> usize {
76 DEFAULT_MERGE_THRESHOLD
77}
78
79fn default_max_buffer_age_ms() -> u64 {
80 DEFAULT_MAX_BUFFER_AGE_MS
81}
82
83impl Default for DeferredIndexerConfig {
84 fn default() -> Self {
85 Self {
86 enabled: false,
87 merge_threshold: DEFAULT_MERGE_THRESHOLD,
88 max_buffer_age_ms: DEFAULT_MAX_BUFFER_AGE_MS,
89 }
90 }
91}
92
93// ── DeferredIndexer ──────────────────────────────────────────────────────────
94
95/// Buffers vectors for deferred HNSW insertion with brute-force searchability.
96///
97/// See the [module-level docs](self) for design details.
98pub struct DeferredIndexer {
99 /// Write buffer — accepts pushes and is drained on merge.
100 buffer: Arc<DeltaBuffer>,
101
102 /// Serializes swap-and-drain operations so only one drain runs at a time.
103 swap_lock: Mutex<()>,
104
105 /// IDs deleted while in the buffer. Filtered out of search results.
106 /// Uses `FxHashSet` for faster integer hashing on the hot search path.
107 deleted_ids: RwLock<FxHashSet<u64>>,
108
109 /// Configuration (immutable after construction).
110 config: DeferredIndexerConfig,
111}
112
113impl DeferredIndexer {
114 /// Creates a new `DeferredIndexer` with the given configuration.
115 ///
116 /// The buffer starts inactive. If `config.enabled` is `false`, all
117 /// write operations are no-ops.
118 #[must_use]
119 pub fn new(config: DeferredIndexerConfig) -> Self {
120 Self {
121 buffer: Arc::new(DeltaBuffer::new()),
122 swap_lock: Mutex::new(()),
123 deleted_ids: RwLock::new(FxHashSet::default()),
124 config,
125 }
126 }
127
128 /// Whether deferred indexing is enabled.
129 #[must_use]
130 pub fn is_enabled(&self) -> bool {
131 self.config.enabled
132 }
133
134 /// Pushes a vector into the write buffer.
135 ///
136 /// Activates the buffer lazily on first write. Returns `true` if
137 /// the buffer has reached `merge_threshold`, signaling the caller
138 /// to trigger a merge.
139 ///
140 /// No-op if deferred indexing is disabled.
141 ///
142 /// # TOCTOU note
143 ///
144 /// The `enabled` check and `len() >= threshold` read are not atomic with
145 /// the push. This is benign: a concurrent drain may reset the count
146 /// between push and the threshold check, causing a missed merge signal.
147 /// The next push will re-trigger.
148 ///
149 /// A previous TOCTOU window existed where `swap_and_drain` could
150 /// deactivate the buffer between `ensure_buffer_active` and the
151 /// underlying `buffer.push`, causing the vector to be silently dropped.
152 /// This is fixed: `swap_and_drain` now re-activates the buffer after
153 /// draining so pushes between drain and the next merge succeed.
154 pub fn push(&self, id: u64, vector: Vec<f32>) -> bool {
155 if !self.config.enabled {
156 return false;
157 }
158 self.ensure_buffer_active();
159 self.buffer.push(id, vector);
160 self.buffer.len() >= self.config.merge_threshold
161 }
162
163 /// Batch-pushes vectors into the write buffer.
164 ///
165 /// Returns `true` if the buffer has reached `merge_threshold`.
166 /// No-op if deferred indexing is disabled.
167 pub fn extend(&self, entries: impl IntoIterator<Item = (u64, Vec<f32>)>) -> bool {
168 if !self.config.enabled {
169 return false;
170 }
171 self.ensure_buffer_active();
172 self.buffer.extend(entries);
173 self.buffer.len() >= self.config.merge_threshold
174 }
175
176 /// Marks `id` as deleted, removing it from the buffer.
177 ///
178 /// The ID is added to `deleted_ids` so that search results are filtered
179 /// even if the vector was already snapshot for a concurrent search.
180 pub fn remove(&self, id: u64) {
181 self.buffer.remove(id);
182 self.deleted_ids.write().insert(id);
183 }
184
185 /// Brute-force searches the buffer, filtering deleted IDs.
186 ///
187 /// Results are sorted by the metric ordering and truncated to `k`.
188 ///
189 /// To compensate for post-filter attrition, the buffer is queried with
190 /// `k + deleted_ids.len()` candidates. This is bounded: `deleted_ids`
191 /// never exceeds `merge_threshold` entries (cleared on every drain).
192 ///
193 /// # TOCTOU note
194 ///
195 /// The `deleted_ids` snapshot is read under a separate lock from the
196 /// buffer search. A concurrent delete between the buffer snapshot and the
197 /// `deleted_ids` read is benign: the ID will be filtered on the next
198 /// search after the delete completes.
199 #[must_use]
200 pub fn search(&self, query: &[f32], k: usize, metric: DistanceMetric) -> Vec<(u64, f32)> {
201 let deleted = self.deleted_ids.read();
202 let overfetch = k.saturating_add(deleted.len());
203 let buffer_results = self.buffer.search(query, overfetch, metric);
204 let mut filtered = filter_deleted(buffer_results, &deleted);
205 drop(deleted);
206 metric.sort_results(&mut filtered);
207 filtered.truncate(k);
208 filtered
209 }
210
211 /// Merges HNSW results with deferred buffer results.
212 ///
213 /// Buffer is authoritative on duplicate IDs (more recent data): when a
214 /// point is upserted while deferred indexing is active, the new vector
215 /// goes to the buffer while HNSW still holds the stale vector. On ID
216 /// conflict the buffer score is kept, mirroring `merge_with_delta` in
217 /// `delta.rs`.
218 ///
219 /// Deleted IDs are filtered from buffer results but not from HNSW
220 /// results (HNSW has its own tombstone system).
221 #[must_use]
222 pub fn merge_with_hnsw(
223 &self,
224 hnsw_results: Vec<(u64, f32)>,
225 query: &[f32],
226 k: usize,
227 metric: DistanceMetric,
228 ) -> Vec<(u64, f32)> {
229 let buffer_results = self.search(query, k, metric);
230 if buffer_results.is_empty() {
231 return hnsw_results;
232 }
233 // Buffer holds more-recent data (upserts route through buffer, not HNSW).
234 // On ID conflict, keep the buffer score.
235 let buffer_ids: FxHashSet<u64> = buffer_results.iter().map(|(id, _)| *id).collect();
236 let mut combined: Vec<(u64, f32)> = hnsw_results
237 .into_iter()
238 .filter(|(id, _)| !buffer_ids.contains(id))
239 .collect();
240 combined.extend(buffer_results);
241 metric.sort_results(&mut combined);
242 combined.truncate(k);
243 combined
244 }
245
246 /// Drains the buffer and returns vectors for HNSW insertion.
247 ///
248 /// After this call the buffer is empty but **re-activated** so that
249 /// pushes arriving between drain and the next merge are not silently
250 /// dropped. The `deleted_ids` set is cleared because the caller is
251 /// expected to apply deletions to HNSW after merge.
252 ///
253 /// Serialized by an internal mutex so concurrent calls are safe (the
254 /// second caller gets an empty drain).
255 pub fn swap_and_drain(&self) -> Vec<(u64, Vec<f32>)> {
256 let _guard = self.swap_lock.lock();
257 let drained = self.buffer.deactivate_and_drain();
258 self.deleted_ids.write().clear();
259 // Re-activate the buffer so pushes between drain and next merge
260 // are not silently dropped (fixes TOCTOU race with concurrent push).
261 self.buffer.activate();
262 drained
263 }
264
265 /// Total number of pending (not yet indexed) vectors in the buffer.
266 #[must_use]
267 pub fn pending_count(&self) -> usize {
268 self.buffer.len()
269 }
270
271 /// Returns `true` if the buffer has reached `merge_threshold`.
272 #[must_use]
273 pub fn should_merge(&self) -> bool {
274 self.buffer.len() >= self.config.merge_threshold
275 }
276
277 /// Returns `true` if deferred indexing is enabled and the buffer has
278 /// searchable data.
279 #[must_use]
280 pub fn is_searchable(&self) -> bool {
281 self.config.enabled && self.buffer.is_searchable()
282 }
283
284 /// Drains all vectors from the buffer (for shutdown / flush).
285 ///
286 /// Clears `deleted_ids`. After this call the buffer is empty and
287 /// inactive.
288 pub fn drain_all(&self) -> Vec<(u64, Vec<f32>)> {
289 let _guard = self.swap_lock.lock();
290 let all = self.buffer.deactivate_and_drain();
291 self.deleted_ids.write().clear();
292 all
293 }
294
295 /// Lazily activates the buffer if it is not already active.
296 fn ensure_buffer_active(&self) {
297 if !self.buffer.is_active() {
298 self.buffer.activate();
299 }
300 }
301}
302
303// ── Helpers ──────────────────────────────────────────────────────────────────
304
305/// Filters out deleted IDs from a result set.
306fn filter_deleted(results: Vec<(u64, f32)>, deleted: &FxHashSet<u64>) -> Vec<(u64, f32)> {
307 if deleted.is_empty() {
308 return results;
309 }
310 results
311 .into_iter()
312 .filter(|(id, _)| !deleted.contains(id))
313 .collect()
314}
315
316// ── Tests ────────────────────────────────────────────────────────────────────
317
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Helper: builds an enabled config with a custom threshold.
    fn enabled_config(threshold: usize) -> DeferredIndexerConfig {
        DeferredIndexerConfig {
            enabled: true,
            merge_threshold: threshold,
            ..DeferredIndexerConfig::default()
        }
    }

    // ── Push tests ───────────────────────────────────────────────────────

    #[test]
    fn test_deferred_push_when_enabled() {
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(1, vec![1.0, 0.0, 0.0]);
        idx.push(2, vec![0.0, 1.0, 0.0]);
        assert_eq!(idx.pending_count(), 2);
    }

    #[test]
    fn test_deferred_push_returns_true_at_threshold() {
        let idx = DeferredIndexer::new(enabled_config(3));
        assert!(!idx.push(1, vec![1.0]));
        assert!(!idx.push(2, vec![2.0]));
        assert!(idx.push(3, vec![3.0]), "third push should hit threshold");
    }

    #[test]
    fn test_deferred_push_noop_when_disabled() {
        let config = DeferredIndexerConfig::default(); // enabled=false
        let idx = DeferredIndexer::new(config);
        let triggered = idx.push(1, vec![1.0, 2.0]);
        assert!(!triggered);
        assert_eq!(idx.pending_count(), 0);
    }

    #[test]
    fn test_deferred_extend_returns_true_at_threshold() {
        let idx = DeferredIndexer::new(enabled_config(3));
        let entries = vec![(1, vec![1.0]), (2, vec![2.0]), (3, vec![3.0])];
        assert!(idx.extend(entries), "batch should hit threshold");
    }

    // ── Search tests ─────────────────────────────────────────────────────

    #[test]
    fn test_deferred_search_finds_buffered_vectors() {
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(1, vec![1.0, 0.0]);
        idx.push(2, vec![0.0, 1.0]);

        let results = idx.search(&[1.0, 0.0], 2, DistanceMetric::Cosine);
        assert_eq!(results.len(), 2);
        // Cosine: id=1 (identical to query) should be first
        assert_eq!(results[0].0, 1);
    }

    #[test]
    fn test_deferred_search_filters_deleted_ids() {
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(1, vec![1.0, 0.0, 0.0]);
        idx.push(2, vec![0.0, 1.0, 0.0]);
        idx.push(3, vec![0.0, 0.0, 1.0]);
        idx.remove(2);

        let results = idx.search(&[1.0, 0.0, 0.0], 10, DistanceMetric::Euclidean);
        let ids: Vec<u64> = results.iter().map(|(id, _)| *id).collect();
        assert!(!ids.contains(&2), "deleted ID 2 must not appear in results");
        assert_eq!(ids.len(), 2);
    }

    // ── Swap and drain tests ─────────────────────────────────────────────

    #[test]
    fn test_deferred_swap_and_drain() {
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(1, vec![1.0]);
        idx.push(2, vec![2.0]);

        let drained = idx.swap_and_drain();
        assert_eq!(drained.len(), 2);
        assert_eq!(idx.pending_count(), 0, "buffer should be empty after drain");
    }

    #[test]
    fn test_deferred_swap_and_drain_clears_deleted_ids() {
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(1, vec![1.0]);
        idx.remove(1);
        let _drained = idx.swap_and_drain();
        // After drain, deleted_ids should be cleared
        assert!(idx.deleted_ids.read().is_empty());
    }

    #[test]
    fn test_deferred_swap_and_drain_reactivates_buffer() {
        // Regression: swap_and_drain must re-activate the buffer so that
        // pushes between drain and the next merge are not silently dropped.
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(1, vec![1.0]);
        let _ = idx.swap_and_drain();

        // Check activation *before* pushing. `push` lazily activates the
        // buffer by itself, so only this assertion pins the re-activation
        // performed by swap_and_drain.
        assert!(
            idx.buffer.is_active(),
            "swap_and_drain must leave the buffer active"
        );

        // After drain, the buffer should accept pushes.
        idx.push(2, vec![2.0]);
        assert_eq!(idx.pending_count(), 1, "push after drain must succeed");
        assert!(
            idx.is_searchable(),
            "buffer should be searchable after push"
        );
    }

    #[test]
    fn test_deferred_drain_all_leaves_buffer_inactive() {
        // drain_all is for shutdown. The buffer is left inactive (not
        // re-activated like swap_and_drain). A subsequent push *will*
        // re-activate via ensure_buffer_active, but there is a window
        // where the buffer is inactive immediately after drain_all.
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(1, vec![1.0]);
        let _ = idx.drain_all();

        // Immediately after drain_all the buffer is inactive.
        assert!(
            !idx.is_searchable(),
            "buffer must not be searchable immediately after drain_all"
        );
        assert_eq!(
            idx.pending_count(),
            0,
            "buffer should be empty after drain_all"
        );
    }

    // ── Merge with HNSW tests ────────────────────────────────────────────

    #[test]
    fn test_deferred_merge_with_hnsw() {
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(10, vec![0.9, 0.1]);
        idx.push(30, vec![0.5, 0.5]);

        // HNSW results: id=10 (also in buffer) and id=20 (only in HNSW)
        let hnsw = vec![(10, 0.95_f32), (20, 0.80_f32)];
        let merged = idx.merge_with_hnsw(hnsw, &[1.0, 0.0], 3, DistanceMetric::Cosine);

        // No duplicate IDs
        let ids: Vec<u64> = merged.iter().map(|(id, _)| *id).collect();
        let unique: HashSet<u64> = ids.iter().copied().collect();
        assert_eq!(ids.len(), unique.len(), "no duplicate IDs");

        // All three IDs should be present (10 from buffer, 20 from HNSW, 30 from buffer)
        assert_eq!(merged.len(), 3);
        assert!(ids.contains(&10));
        assert!(ids.contains(&20));
        assert!(ids.contains(&30));

        // The score for id=10 must be the buffer's own score, not the HNSW
        // score of 0.95, because the buffer holds more-recent data (upserts
        // route there).
        let buffer_score = idx
            .search(&[1.0, 0.0], 3, DistanceMetric::Cosine)
            .into_iter()
            .find(|(id, _)| *id == 10)
            .map(|(_, s)| s)
            .expect("id=10 is buffered");
        let merged_score = merged
            .iter()
            .find(|(id, _)| *id == 10)
            .map(|(_, s)| *s)
            .expect("id=10 is in the merged results");
        assert!(
            (merged_score - buffer_score).abs() <= f32::EPSILON,
            "buffer score should be authoritative for id=10"
        );
        assert!(
            (merged_score - 0.95).abs() > f32::EPSILON,
            "HNSW score must not win for id=10"
        );
    }

    #[test]
    fn test_deferred_merge_with_hnsw_empty_buffer() {
        let idx = DeferredIndexer::new(enabled_config(1024));
        // Buffer is empty, so merge should return HNSW results unchanged
        let hnsw = vec![(1, 0.9_f32), (2, 0.8_f32)];
        let merged = idx.merge_with_hnsw(hnsw.clone(), &[1.0, 0.0], 5, DistanceMetric::Cosine);
        assert_eq!(merged, hnsw);
    }

    // ── Drain-all test ───────────────────────────────────────────────────

    #[test]
    fn test_deferred_drain_all() {
        let idx = DeferredIndexer::new(enabled_config(1024));
        idx.push(1, vec![1.0]);
        idx.push(2, vec![2.0]);

        let all = idx.drain_all();
        assert_eq!(all.len(), 2);
        assert_eq!(idx.pending_count(), 0);
        assert!(!idx.is_searchable(), "not searchable after drain_all");
    }

    // ── Config serde test ────────────────────────────────────────────────

    #[test]
    fn test_deferred_config_serde() {
        let config = DeferredIndexerConfig {
            enabled: true,
            merge_threshold: 512,
            max_buffer_age_ms: 3000,
        };
        let json = serde_json::to_string(&config).expect("serialize");
        let restored: DeferredIndexerConfig = serde_json::from_str(&json).expect("deserialize");
        assert!(restored.enabled);
        assert_eq!(restored.merge_threshold, 512);
        assert_eq!(restored.max_buffer_age_ms, 3000);
    }

    #[test]
    fn test_deferred_config_serde_defaults() {
        let json = "{}";
        let config: DeferredIndexerConfig = serde_json::from_str(json).expect("deserialize empty");
        assert!(!config.enabled);
        assert_eq!(config.merge_threshold, DEFAULT_MERGE_THRESHOLD);
        assert_eq!(config.max_buffer_age_ms, DEFAULT_MAX_BUFFER_AGE_MS);
    }

    // ── Edge cases ───────────────────────────────────────────────────────

    #[test]
    fn test_deferred_should_merge_reflects_threshold() {
        let idx = DeferredIndexer::new(enabled_config(2));
        assert!(!idx.should_merge());
        idx.push(1, vec![1.0]);
        assert!(!idx.should_merge());
        idx.push(2, vec![2.0]);
        assert!(idx.should_merge());
    }

    #[test]
    fn test_deferred_is_enabled_reflects_config() {
        let enabled = DeferredIndexer::new(enabled_config(1024));
        assert!(enabled.is_enabled());
        let disabled = DeferredIndexer::new(DeferredIndexerConfig::default());
        assert!(!disabled.is_enabled());
    }
}