Skip to main content

velesdb_core/collection/streaming/
deferred.rs

1//! Deferred indexer for high-throughput sequential vector inserts.
2//!
3//! The [`DeferredIndexer`] buffers incoming vectors in a single write buffer
4//! and exposes them to search via brute-force scan while they await insertion
5//! into the HNSW graph. This decouples the write path (fast, O(1) per point)
6//! from the index path (slower, O(log n) per point) and enables
7//! threshold-triggered merge.
8//!
9//! # Single-buffer with threshold-triggered merge
10//!
11//! The indexer holds one buffer that accepts writes. When the buffer reaches
12//! `merge_threshold`, [`swap_and_drain`](DeferredIndexer::swap_and_drain)
13//! drains it and returns the vectors for the caller to batch-insert into HNSW.
14//!
15//! # Deleted IDs
16//!
17//! When a point is deleted while buffered, its ID is recorded in a
18//! `deleted_ids` set. Search results are filtered against this set so that
19//! deleted vectors never surface. The set is cleared on drain (the HNSW
20//! tombstone system takes over after merge).
21//!
22//! # Lock ordering
23//!
24//! `DeferredIndexer` is above `DeltaBuffer` (position 10) in the lock order.
25//! The `swap_lock` (position 10.1) must never be held while acquiring any
26//! lower-numbered lock.
27
28use super::delta::DeltaBuffer;
29use crate::distance::DistanceMetric;
30use parking_lot::{Mutex, RwLock};
31use rustc_hash::FxHashSet;
32use serde::{Deserialize, Serialize};
33use std::sync::Arc;
34
35// ── Constants ────────────────────────────────────────────────────────────────
36
/// Out-of-the-box count of buffered vectors at which a merge is requested.
const DEFAULT_MERGE_THRESHOLD: usize = 1_024;

/// Out-of-the-box age limit, in milliseconds, for buffered data before a
/// time-based merge.
const DEFAULT_MAX_BUFFER_AGE_MS: u64 = 5_000;
42
43// ── Configuration ────────────────────────────────────────────────────────────
44
45/// Configuration for the [`DeferredIndexer`].
46///
47/// Controls whether deferred indexing is enabled, how many vectors to
48/// buffer before triggering a merge, and the maximum age of buffered data.
49///
50/// # Examples
51///
52/// ```
53/// use velesdb_core::collection::streaming::DeferredIndexerConfig;
54///
55/// let config = DeferredIndexerConfig::default();
56/// assert!(!config.enabled);
57/// assert_eq!(config.merge_threshold, 1024);
58/// ```
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct DeferredIndexerConfig {
61    /// Whether deferred indexing is enabled (default: `false`).
62    #[serde(default)]
63    pub enabled: bool,
64
65    /// Number of buffered vectors that triggers a merge into HNSW.
66    #[serde(default = "default_merge_threshold")]
67    pub merge_threshold: usize,
68
69    /// Maximum age (milliseconds) of the oldest buffered vector before a
70    /// time-based merge is triggered.
71    #[serde(default = "default_max_buffer_age_ms")]
72    pub max_buffer_age_ms: u64,
73}
74
75fn default_merge_threshold() -> usize {
76    DEFAULT_MERGE_THRESHOLD
77}
78
79fn default_max_buffer_age_ms() -> u64 {
80    DEFAULT_MAX_BUFFER_AGE_MS
81}
82
83impl Default for DeferredIndexerConfig {
84    fn default() -> Self {
85        Self {
86            enabled: false,
87            merge_threshold: DEFAULT_MERGE_THRESHOLD,
88            max_buffer_age_ms: DEFAULT_MAX_BUFFER_AGE_MS,
89        }
90    }
91}
92
93// ── DeferredIndexer ──────────────────────────────────────────────────────────
94
95/// Buffers vectors for deferred HNSW insertion with brute-force searchability.
96///
97/// See the [module-level docs](self) for design details.
98pub struct DeferredIndexer {
99    /// Write buffer — accepts pushes and is drained on merge.
100    buffer: Arc<DeltaBuffer>,
101
102    /// Serializes swap-and-drain operations so only one drain runs at a time.
103    swap_lock: Mutex<()>,
104
105    /// IDs deleted while in the buffer. Filtered out of search results.
106    /// Uses `FxHashSet` for faster integer hashing on the hot search path.
107    deleted_ids: RwLock<FxHashSet<u64>>,
108
109    /// Configuration (immutable after construction).
110    config: DeferredIndexerConfig,
111}
112
113impl DeferredIndexer {
114    /// Creates a new `DeferredIndexer` with the given configuration.
115    ///
116    /// The buffer starts inactive. If `config.enabled` is `false`, all
117    /// write operations are no-ops.
118    #[must_use]
119    pub fn new(config: DeferredIndexerConfig) -> Self {
120        Self {
121            buffer: Arc::new(DeltaBuffer::new()),
122            swap_lock: Mutex::new(()),
123            deleted_ids: RwLock::new(FxHashSet::default()),
124            config,
125        }
126    }
127
128    /// Whether deferred indexing is enabled.
129    #[must_use]
130    pub fn is_enabled(&self) -> bool {
131        self.config.enabled
132    }
133
134    /// Pushes a vector into the write buffer.
135    ///
136    /// Activates the buffer lazily on first write. Returns `true` if
137    /// the buffer has reached `merge_threshold`, signaling the caller
138    /// to trigger a merge.
139    ///
140    /// No-op if deferred indexing is disabled.
141    ///
142    /// # TOCTOU note
143    ///
144    /// The `enabled` check and `len() >= threshold` read are not atomic with
145    /// the push. This is benign: a concurrent drain may reset the count
146    /// between push and the threshold check, causing a missed merge signal.
147    /// The next push will re-trigger.
148    ///
149    /// A previous TOCTOU window existed where `swap_and_drain` could
150    /// deactivate the buffer between `ensure_buffer_active` and the
151    /// underlying `buffer.push`, causing the vector to be silently dropped.
152    /// This is fixed: `swap_and_drain` now re-activates the buffer after
153    /// draining so pushes between drain and the next merge succeed.
154    pub fn push(&self, id: u64, vector: Vec<f32>) -> bool {
155        if !self.config.enabled {
156            return false;
157        }
158        self.ensure_buffer_active();
159        self.buffer.push(id, vector);
160        self.buffer.len() >= self.config.merge_threshold
161    }
162
163    /// Batch-pushes vectors into the write buffer.
164    ///
165    /// Returns `true` if the buffer has reached `merge_threshold`.
166    /// No-op if deferred indexing is disabled.
167    pub fn extend(&self, entries: impl IntoIterator<Item = (u64, Vec<f32>)>) -> bool {
168        if !self.config.enabled {
169            return false;
170        }
171        self.ensure_buffer_active();
172        self.buffer.extend(entries);
173        self.buffer.len() >= self.config.merge_threshold
174    }
175
176    /// Marks `id` as deleted, removing it from the buffer.
177    ///
178    /// The ID is added to `deleted_ids` so that search results are filtered
179    /// even if the vector was already snapshot for a concurrent search.
180    pub fn remove(&self, id: u64) {
181        self.buffer.remove(id);
182        self.deleted_ids.write().insert(id);
183    }
184
185    /// Brute-force searches the buffer, filtering deleted IDs.
186    ///
187    /// Results are sorted by the metric ordering and truncated to `k`.
188    ///
189    /// To compensate for post-filter attrition, the buffer is queried with
190    /// `k + deleted_ids.len()` candidates. This is bounded: `deleted_ids`
191    /// never exceeds `merge_threshold` entries (cleared on every drain).
192    ///
193    /// # TOCTOU note
194    ///
195    /// The `deleted_ids` snapshot is read under a separate lock from the
196    /// buffer search. A concurrent delete between the buffer snapshot and the
197    /// `deleted_ids` read is benign: the ID will be filtered on the next
198    /// search after the delete completes.
199    #[must_use]
200    pub fn search(&self, query: &[f32], k: usize, metric: DistanceMetric) -> Vec<(u64, f32)> {
201        let deleted = self.deleted_ids.read();
202        let overfetch = k.saturating_add(deleted.len());
203        let buffer_results = self.buffer.search(query, overfetch, metric);
204        let mut filtered = filter_deleted(buffer_results, &deleted);
205        drop(deleted);
206        metric.sort_results(&mut filtered);
207        filtered.truncate(k);
208        filtered
209    }
210
211    /// Merges HNSW results with deferred buffer results.
212    ///
213    /// Buffer is authoritative on duplicate IDs (more recent data): when a
214    /// point is upserted while deferred indexing is active, the new vector
215    /// goes to the buffer while HNSW still holds the stale vector. On ID
216    /// conflict the buffer score is kept, mirroring `merge_with_delta` in
217    /// `delta.rs`.
218    ///
219    /// Deleted IDs are filtered from buffer results but not from HNSW
220    /// results (HNSW has its own tombstone system).
221    #[must_use]
222    pub fn merge_with_hnsw(
223        &self,
224        hnsw_results: Vec<(u64, f32)>,
225        query: &[f32],
226        k: usize,
227        metric: DistanceMetric,
228    ) -> Vec<(u64, f32)> {
229        let buffer_results = self.search(query, k, metric);
230        if buffer_results.is_empty() {
231            return hnsw_results;
232        }
233        // Buffer holds more-recent data (upserts route through buffer, not HNSW).
234        // On ID conflict, keep the buffer score.
235        let buffer_ids: FxHashSet<u64> = buffer_results.iter().map(|(id, _)| *id).collect();
236        let mut combined: Vec<(u64, f32)> = hnsw_results
237            .into_iter()
238            .filter(|(id, _)| !buffer_ids.contains(id))
239            .collect();
240        combined.extend(buffer_results);
241        metric.sort_results(&mut combined);
242        combined.truncate(k);
243        combined
244    }
245
246    /// Drains the buffer and returns vectors for HNSW insertion.
247    ///
248    /// After this call the buffer is empty but **re-activated** so that
249    /// pushes arriving between drain and the next merge are not silently
250    /// dropped. The `deleted_ids` set is cleared because the caller is
251    /// expected to apply deletions to HNSW after merge.
252    ///
253    /// Serialized by an internal mutex so concurrent calls are safe (the
254    /// second caller gets an empty drain).
255    pub fn swap_and_drain(&self) -> Vec<(u64, Vec<f32>)> {
256        let _guard = self.swap_lock.lock();
257        let drained = self.buffer.deactivate_and_drain();
258        self.deleted_ids.write().clear();
259        // Re-activate the buffer so pushes between drain and next merge
260        // are not silently dropped (fixes TOCTOU race with concurrent push).
261        self.buffer.activate();
262        drained
263    }
264
265    /// Total number of pending (not yet indexed) vectors in the buffer.
266    #[must_use]
267    pub fn pending_count(&self) -> usize {
268        self.buffer.len()
269    }
270
271    /// Returns `true` if the buffer has reached `merge_threshold`.
272    #[must_use]
273    pub fn should_merge(&self) -> bool {
274        self.buffer.len() >= self.config.merge_threshold
275    }
276
277    /// Returns `true` if deferred indexing is enabled and the buffer has
278    /// searchable data.
279    #[must_use]
280    pub fn is_searchable(&self) -> bool {
281        self.config.enabled && self.buffer.is_searchable()
282    }
283
284    /// Drains all vectors from the buffer (for shutdown / flush).
285    ///
286    /// Clears `deleted_ids`. After this call the buffer is empty and
287    /// inactive.
288    pub fn drain_all(&self) -> Vec<(u64, Vec<f32>)> {
289        let _guard = self.swap_lock.lock();
290        let all = self.buffer.deactivate_and_drain();
291        self.deleted_ids.write().clear();
292        all
293    }
294
295    /// Lazily activates the buffer if it is not already active.
296    ///
297    /// Uses the compare-and-swap [`try_activate`](super::delta::DeltaBuffer::try_activate)
298    /// instead of a `is_active()` + `activate()` pair: the atomic transition
299    /// closes the TOCTOU window between the check and the store. An
300    /// `AlreadyActive` result is the expected idempotent case on the hot push
301    /// path (the buffer stays active across many pushes) and is ignored here.
302    fn ensure_buffer_active(&self) {
303        let _ = self.buffer.try_activate();
304    }
305}
306
307// ── Helpers ──────────────────────────────────────────────────────────────────
308
309/// Filters out deleted IDs from a result set.
310fn filter_deleted(results: Vec<(u64, f32)>, deleted: &FxHashSet<u64>) -> Vec<(u64, f32)> {
311    if deleted.is_empty() {
312        return results;
313    }
314    results
315        .into_iter()
316        .filter(|(id, _)| !deleted.contains(id))
317        .collect()
318}
319
320// ── Tests ────────────────────────────────────────────────────────────────────
321
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Builds an enabled config with the given merge threshold; every other
    /// field comes from `DeferredIndexerConfig::default()`.
    fn enabled_config(threshold: usize) -> DeferredIndexerConfig {
        DeferredIndexerConfig {
            merge_threshold: threshold,
            enabled: true,
            ..DeferredIndexerConfig::default()
        }
    }

    /// Extracts the IDs of a result list, preserving order.
    fn ids_of(results: &[(u64, f32)]) -> Vec<u64> {
        results.iter().map(|&(id, _)| id).collect()
    }

    // ── Push tests ───────────────────────────────────────────────────────

    #[test]
    fn test_deferred_push_when_enabled() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        for (id, vector) in [(1, vec![1.0, 0.0, 0.0]), (2, vec![0.0, 1.0, 0.0])] {
            indexer.push(id, vector);
        }
        assert_eq!(indexer.pending_count(), 2);
    }

    #[test]
    fn test_deferred_push_returns_true_at_threshold() {
        let indexer = DeferredIndexer::new(enabled_config(3));
        assert!(!indexer.push(1, vec![1.0]));
        assert!(!indexer.push(2, vec![2.0]));
        let third = indexer.push(3, vec![3.0]);
        assert!(third, "third push should hit threshold");
    }

    #[test]
    fn test_deferred_push_noop_when_disabled() {
        // Default config has enabled=false.
        let indexer = DeferredIndexer::new(DeferredIndexerConfig::default());
        let signalled = indexer.push(1, vec![1.0, 2.0]);
        assert!(!signalled);
        assert_eq!(indexer.pending_count(), 0);
    }

    #[test]
    fn test_deferred_extend_returns_true_at_threshold() {
        let indexer = DeferredIndexer::new(enabled_config(3));
        let batch = vec![(1, vec![1.0]), (2, vec![2.0]), (3, vec![3.0])];
        let signalled = indexer.extend(batch);
        assert!(signalled, "batch should hit threshold");
    }

    // ── Search tests ─────────────────────────────────────────────────────

    #[test]
    fn test_deferred_search_finds_buffered_vectors() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0, 0.0]);
        indexer.push(2, vec![0.0, 1.0]);

        let hits = indexer.search(&[1.0, 0.0], 2, DistanceMetric::Cosine);
        assert_eq!(hits.len(), 2);
        // Under cosine, the vector identical to the query ranks first.
        assert_eq!(hits[0].0, 1);
    }

    #[test]
    fn test_deferred_search_filters_deleted_ids() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        let axes = [
            vec![1.0, 0.0, 0.0],
            vec![0.0, 1.0, 0.0],
            vec![0.0, 0.0, 1.0],
        ];
        for (id, axis) in (1_u64..).zip(axes) {
            indexer.push(id, axis);
        }
        indexer.remove(2);

        let hits = indexer.search(&[1.0, 0.0, 0.0], 10, DistanceMetric::Euclidean);
        let ids = ids_of(&hits);
        assert!(!ids.contains(&2), "deleted ID 2 must not appear in results");
        assert_eq!(ids.len(), 2);
    }

    // ── Swap and drain tests ─────────────────────────────────────────────

    #[test]
    fn test_deferred_swap_and_drain() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        indexer.push(2, vec![2.0]);

        let batch = indexer.swap_and_drain();
        assert_eq!(batch.len(), 2);
        assert_eq!(indexer.pending_count(), 0, "buffer should be empty after drain");
    }

    #[test]
    fn test_deferred_swap_and_drain_clears_deleted_ids() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        indexer.remove(1);
        let _batch = indexer.swap_and_drain();
        // The deletion set must be empty once the drain has run.
        assert!(indexer.deleted_ids.read().is_empty());
    }

    #[test]
    fn test_deferred_swap_and_drain_reactivates_buffer() {
        // Regression: swap_and_drain must re-activate the buffer so pushes
        // between drain and the next merge are not silently dropped.
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        let _ = indexer.swap_and_drain();

        // A push right after the drain must land in the re-activated buffer.
        indexer.push(2, vec![2.0]);
        assert_eq!(indexer.pending_count(), 1, "push after drain must succeed");
        assert!(
            indexer.is_searchable(),
            "buffer should be searchable after push"
        );
    }

    #[test]
    fn test_deferred_drain_all_leaves_buffer_inactive() {
        // drain_all is the shutdown path: unlike swap_and_drain it does not
        // re-activate the buffer. A later push re-activates it through
        // ensure_buffer_active, but right after drain_all it is inactive.
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        let _ = indexer.drain_all();

        assert!(
            !indexer.is_searchable(),
            "buffer must not be searchable immediately after drain_all"
        );
        assert_eq!(
            indexer.pending_count(),
            0,
            "buffer should be empty after drain_all"
        );
    }

    // ── Merge with HNSW tests ────────────────────────────────────────────

    #[test]
    fn test_deferred_merge_with_hnsw() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(10, vec![0.9, 0.1]);
        indexer.push(30, vec![0.5, 0.5]);

        // HNSW returns id=10 (also buffered) and id=20 (HNSW only).
        let hnsw = vec![(10, 0.95_f32), (20, 0.80_f32)];
        let merged = indexer.merge_with_hnsw(hnsw, &[1.0, 0.0], 3, DistanceMetric::Cosine);

        let ids = ids_of(&merged);
        let distinct: HashSet<u64> = ids.iter().copied().collect();
        assert_eq!(ids.len(), distinct.len(), "no duplicate IDs");

        // 10 and 30 come from the buffer, 20 from HNSW.
        assert_eq!(merged.len(), 3);
        for expected in [10, 20, 30] {
            assert!(ids.contains(&expected));
        }

        // The buffer holds the fresher vector for id=10 (upserts route there),
        // so its score must win over the HNSW score of 0.95.
        let id10_score = merged
            .iter()
            .find(|(id, _)| *id == 10)
            .map(|(_, score)| *score);
        assert!(
            (id10_score.unwrap_or(0.0) - 0.95).abs() > f32::EPSILON,
            "buffer score should be authoritative for id=10, not HNSW"
        );
    }

    #[test]
    fn test_deferred_merge_with_hnsw_empty_buffer() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        // With nothing buffered, HNSW results pass through unchanged.
        let hnsw = vec![(1, 0.9_f32), (2, 0.8_f32)];
        let merged =
            indexer.merge_with_hnsw(hnsw.clone(), &[1.0, 0.0], 5, DistanceMetric::Cosine);
        assert_eq!(merged, hnsw);
    }

    // ── Drain-all test ───────────────────────────────────────────────────

    #[test]
    fn test_deferred_drain_all() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        indexer.push(2, vec![2.0]);

        let everything = indexer.drain_all();
        assert_eq!(everything.len(), 2);
        assert_eq!(indexer.pending_count(), 0);
        assert!(!indexer.is_searchable(), "not searchable after drain_all");
    }

    // ── Config serde test ────────────────────────────────────────────────

    #[test]
    fn test_deferred_config_serde() {
        let original = DeferredIndexerConfig {
            enabled: true,
            merge_threshold: 512,
            max_buffer_age_ms: 3000,
        };
        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: DeferredIndexerConfig =
            serde_json::from_str(&encoded).expect("deserialize");
        assert!(decoded.enabled);
        assert_eq!(decoded.merge_threshold, 512);
        assert_eq!(decoded.max_buffer_age_ms, 3000);
    }

    #[test]
    fn test_deferred_config_serde_defaults() {
        let decoded: DeferredIndexerConfig =
            serde_json::from_str("{}").expect("deserialize empty");
        assert!(!decoded.enabled);
        assert_eq!(decoded.merge_threshold, DEFAULT_MERGE_THRESHOLD);
        assert_eq!(decoded.max_buffer_age_ms, DEFAULT_MAX_BUFFER_AGE_MS);
    }

    // ── Edge cases ───────────────────────────────────────────────────────

    #[test]
    fn test_deferred_should_merge_reflects_threshold() {
        let indexer = DeferredIndexer::new(enabled_config(2));
        assert!(!indexer.should_merge());
        indexer.push(1, vec![1.0]);
        assert!(!indexer.should_merge());
        indexer.push(2, vec![2.0]);
        assert!(indexer.should_merge());
    }

    #[test]
    fn test_deferred_is_enabled_reflects_config() {
        let on = DeferredIndexer::new(enabled_config(1024));
        let off = DeferredIndexer::new(DeferredIndexerConfig::default());
        assert!(on.is_enabled());
        assert!(!off.is_enabled());
    }
}