Skip to main content

velesdb_core/collection/streaming/
deferred.rs

1//! Deferred indexer for high-throughput sequential vector inserts.
2//!
3//! The [`DeferredIndexer`] buffers incoming vectors in a single write buffer
4//! and exposes them to search via brute-force scan while they await insertion
5//! into the HNSW graph. This decouples the write path (fast, O(1) per point)
6//! from the index path (slower, O(log n) per point) and enables
7//! threshold-triggered merge.
8//!
9//! # Single-buffer with threshold-triggered merge
10//!
11//! The indexer holds one buffer that accepts writes. When the buffer reaches
12//! `merge_threshold`, [`swap_and_drain`](DeferredIndexer::swap_and_drain)
13//! drains it and returns the vectors for the caller to batch-insert into HNSW.
14//!
15//! # Deleted IDs
16//!
17//! When a point is deleted while buffered, its ID is recorded in a
18//! `deleted_ids` set. Search results are filtered against this set so that
19//! deleted vectors never surface. The set is cleared on drain (the HNSW
20//! tombstone system takes over after merge).
21//!
22//! # Lock ordering
23//!
24//! `DeferredIndexer` is above `DeltaBuffer` (position 10) in the lock order.
25//! The `swap_lock` (position 10.1) must never be held while acquiring any
26//! lower-numbered lock.
27
28use super::delta::DeltaBuffer;
29use crate::distance::DistanceMetric;
30use parking_lot::{Mutex, RwLock};
31use rustc_hash::FxHashSet;
32use serde::{Deserialize, Serialize};
33use std::sync::Arc;
34
35// ── Constants ────────────────────────────────────────────────────────────────
36
/// Buffered-vector count at which a merge is requested when the
/// configuration does not set `merge_threshold` explicitly.
const DEFAULT_MERGE_THRESHOLD: usize = 1_024;

/// Buffer age limit, in milliseconds, used when the configuration does not
/// set `max_buffer_age_ms` explicitly.
const DEFAULT_MAX_BUFFER_AGE_MS: u64 = 5_000;
42
43// ── Configuration ────────────────────────────────────────────────────────────
44
45/// Configuration for the [`DeferredIndexer`].
46///
47/// Controls whether deferred indexing is enabled, how many vectors to
48/// buffer before triggering a merge, and the maximum age of buffered data.
49///
50/// # Examples
51///
52/// ```
53/// use velesdb_core::collection::streaming::DeferredIndexerConfig;
54///
55/// let config = DeferredIndexerConfig::default();
56/// assert!(!config.enabled);
57/// assert_eq!(config.merge_threshold, 1024);
58/// ```
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct DeferredIndexerConfig {
61    /// Whether deferred indexing is enabled (default: `false`).
62    #[serde(default)]
63    pub enabled: bool,
64
65    /// Number of buffered vectors that triggers a merge into HNSW.
66    #[serde(default = "default_merge_threshold")]
67    pub merge_threshold: usize,
68
69    /// Maximum age (milliseconds) of the oldest buffered vector before a
70    /// time-based merge is triggered.
71    #[serde(default = "default_max_buffer_age_ms")]
72    pub max_buffer_age_ms: u64,
73}
74
75fn default_merge_threshold() -> usize {
76    DEFAULT_MERGE_THRESHOLD
77}
78
79fn default_max_buffer_age_ms() -> u64 {
80    DEFAULT_MAX_BUFFER_AGE_MS
81}
82
83impl Default for DeferredIndexerConfig {
84    fn default() -> Self {
85        Self {
86            enabled: false,
87            merge_threshold: DEFAULT_MERGE_THRESHOLD,
88            max_buffer_age_ms: DEFAULT_MAX_BUFFER_AGE_MS,
89        }
90    }
91}
92
93// ── DeferredIndexer ──────────────────────────────────────────────────────────
94
95/// Buffers vectors for deferred HNSW insertion with brute-force searchability.
96///
97/// See the [module-level docs](self) for design details.
98pub struct DeferredIndexer {
99    /// Write buffer — accepts pushes and is drained on merge.
100    buffer: Arc<DeltaBuffer>,
101
102    /// Serializes swap-and-drain operations so only one drain runs at a time.
103    swap_lock: Mutex<()>,
104
105    /// IDs deleted while in the buffer. Filtered out of search results.
106    /// Uses `FxHashSet` for faster integer hashing on the hot search path.
107    deleted_ids: RwLock<FxHashSet<u64>>,
108
109    /// Configuration (immutable after construction).
110    config: DeferredIndexerConfig,
111}
112
113impl DeferredIndexer {
114    /// Creates a new `DeferredIndexer` with the given configuration.
115    ///
116    /// The buffer starts inactive. If `config.enabled` is `false`, all
117    /// write operations are no-ops.
118    #[must_use]
119    pub fn new(config: DeferredIndexerConfig) -> Self {
120        Self {
121            buffer: Arc::new(DeltaBuffer::new()),
122            swap_lock: Mutex::new(()),
123            deleted_ids: RwLock::new(FxHashSet::default()),
124            config,
125        }
126    }
127
128    /// Whether deferred indexing is enabled.
129    #[must_use]
130    pub fn is_enabled(&self) -> bool {
131        self.config.enabled
132    }
133
134    /// Pushes a vector into the write buffer.
135    ///
136    /// Activates the buffer lazily on first write. Returns `true` if
137    /// the buffer has reached `merge_threshold`, signaling the caller
138    /// to trigger a merge.
139    ///
140    /// No-op if deferred indexing is disabled.
141    ///
142    /// # TOCTOU note
143    ///
144    /// The `enabled` check and `len() >= threshold` read are not atomic with
145    /// the push. This is benign: a concurrent drain may reset the count
146    /// between push and the threshold check, causing a missed merge signal.
147    /// The next push will re-trigger.
148    ///
149    /// A previous TOCTOU window existed where `swap_and_drain` could
150    /// deactivate the buffer between `ensure_buffer_active` and the
151    /// underlying `buffer.push`, causing the vector to be silently dropped.
152    /// This is fixed: `swap_and_drain` now re-activates the buffer after
153    /// draining so pushes between drain and the next merge succeed.
154    pub fn push(&self, id: u64, vector: Vec<f32>) -> bool {
155        if !self.config.enabled {
156            return false;
157        }
158        self.ensure_buffer_active();
159        self.buffer.push(id, vector);
160        self.buffer.len() >= self.config.merge_threshold
161    }
162
163    /// Batch-pushes vectors into the write buffer.
164    ///
165    /// Returns `true` if the buffer has reached `merge_threshold`.
166    /// No-op if deferred indexing is disabled.
167    pub fn extend(&self, entries: impl IntoIterator<Item = (u64, Vec<f32>)>) -> bool {
168        if !self.config.enabled {
169            return false;
170        }
171        self.ensure_buffer_active();
172        self.buffer.extend(entries);
173        self.buffer.len() >= self.config.merge_threshold
174    }
175
176    /// Marks `id` as deleted, removing it from the buffer.
177    ///
178    /// The ID is added to `deleted_ids` so that search results are filtered
179    /// even if the vector was already snapshot for a concurrent search.
180    pub fn remove(&self, id: u64) {
181        self.buffer.remove(id);
182        self.deleted_ids.write().insert(id);
183    }
184
185    /// Brute-force searches the buffer, filtering deleted IDs.
186    ///
187    /// Results are sorted by the metric ordering and truncated to `k`.
188    ///
189    /// To compensate for post-filter attrition, the buffer is queried with
190    /// `k + deleted_ids.len()` candidates. This is bounded: `deleted_ids`
191    /// never exceeds `merge_threshold` entries (cleared on every drain).
192    ///
193    /// # TOCTOU note
194    ///
195    /// The `deleted_ids` snapshot is read under a separate lock from the
196    /// buffer search. A concurrent delete between the buffer snapshot and the
197    /// `deleted_ids` read is benign: the ID will be filtered on the next
198    /// search after the delete completes.
199    #[must_use]
200    pub fn search(&self, query: &[f32], k: usize, metric: DistanceMetric) -> Vec<(u64, f32)> {
201        let deleted = self.deleted_ids.read();
202        let overfetch = k.saturating_add(deleted.len());
203        let buffer_results = self.buffer.search(query, overfetch, metric);
204        let mut filtered = filter_deleted(buffer_results, &deleted);
205        drop(deleted);
206        metric.sort_results(&mut filtered);
207        filtered.truncate(k);
208        filtered
209    }
210
211    /// Merges HNSW results with deferred buffer results.
212    ///
213    /// Buffer is authoritative on duplicate IDs (more recent data): when a
214    /// point is upserted while deferred indexing is active, the new vector
215    /// goes to the buffer while HNSW still holds the stale vector. On ID
216    /// conflict the buffer score is kept, mirroring `merge_with_delta` in
217    /// `delta.rs`.
218    ///
219    /// Deleted IDs are filtered from buffer results but not from HNSW
220    /// results (HNSW has its own tombstone system).
221    #[must_use]
222    pub fn merge_with_hnsw(
223        &self,
224        hnsw_results: Vec<(u64, f32)>,
225        query: &[f32],
226        k: usize,
227        metric: DistanceMetric,
228    ) -> Vec<(u64, f32)> {
229        let buffer_results = self.search(query, k, metric);
230        if buffer_results.is_empty() {
231            return hnsw_results;
232        }
233        // Buffer holds more-recent data (upserts route through buffer, not HNSW).
234        // On ID conflict, keep the buffer score.
235        let buffer_ids: FxHashSet<u64> = buffer_results.iter().map(|(id, _)| *id).collect();
236        let mut combined: Vec<(u64, f32)> = hnsw_results
237            .into_iter()
238            .filter(|(id, _)| !buffer_ids.contains(id))
239            .collect();
240        combined.extend(buffer_results);
241        metric.sort_results(&mut combined);
242        combined.truncate(k);
243        combined
244    }
245
246    /// Drains the buffer and returns vectors for HNSW insertion.
247    ///
248    /// After this call the buffer is empty but **re-activated** so that
249    /// pushes arriving between drain and the next merge are not silently
250    /// dropped. The `deleted_ids` set is cleared because the caller is
251    /// expected to apply deletions to HNSW after merge.
252    ///
253    /// Serialized by an internal mutex so concurrent calls are safe (the
254    /// second caller gets an empty drain).
255    pub fn swap_and_drain(&self) -> Vec<(u64, Vec<f32>)> {
256        let _guard = self.swap_lock.lock();
257        let drained = self.buffer.deactivate_and_drain();
258        self.deleted_ids.write().clear();
259        // Re-activate the buffer so pushes between drain and next merge
260        // are not silently dropped (fixes TOCTOU race with concurrent push).
261        self.buffer.activate();
262        drained
263    }
264
265    /// Total number of pending (not yet indexed) vectors in the buffer.
266    #[must_use]
267    pub fn pending_count(&self) -> usize {
268        self.buffer.len()
269    }
270
271    /// Returns `true` if the buffer has reached `merge_threshold`.
272    #[must_use]
273    pub fn should_merge(&self) -> bool {
274        self.buffer.len() >= self.config.merge_threshold
275    }
276
277    /// Returns `true` if deferred indexing is enabled and the buffer has
278    /// searchable data.
279    #[must_use]
280    pub fn is_searchable(&self) -> bool {
281        self.config.enabled && self.buffer.is_searchable()
282    }
283
284    /// Drains all vectors from the buffer (for shutdown / flush).
285    ///
286    /// Clears `deleted_ids`. After this call the buffer is empty and
287    /// inactive.
288    pub fn drain_all(&self) -> Vec<(u64, Vec<f32>)> {
289        let _guard = self.swap_lock.lock();
290        let all = self.buffer.deactivate_and_drain();
291        self.deleted_ids.write().clear();
292        all
293    }
294
295    /// Lazily activates the buffer if it is not already active.
296    fn ensure_buffer_active(&self) {
297        if !self.buffer.is_active() {
298            self.buffer.activate();
299        }
300    }
301}
302
303// ── Helpers ──────────────────────────────────────────────────────────────────
304
305/// Filters out deleted IDs from a result set.
306fn filter_deleted(results: Vec<(u64, f32)>, deleted: &FxHashSet<u64>) -> Vec<(u64, f32)> {
307    if deleted.is_empty() {
308        return results;
309    }
310    results
311        .into_iter()
312        .filter(|(id, _)| !deleted.contains(id))
313        .collect()
314}
315
316// ── Tests ────────────────────────────────────────────────────────────────────
317
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Returns a config with deferred indexing switched on and the given
    /// merge threshold; the age limit keeps its default value.
    fn enabled_config(threshold: usize) -> DeferredIndexerConfig {
        DeferredIndexerConfig {
            enabled: true,
            merge_threshold: threshold,
            max_buffer_age_ms: DEFAULT_MAX_BUFFER_AGE_MS,
        }
    }

    // ── Push tests ───────────────────────────────────────────────────────

    #[test]
    fn test_deferred_push_when_enabled() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        let points = [(1, vec![1.0, 0.0, 0.0]), (2, vec![0.0, 1.0, 0.0])];
        for (id, vector) in points {
            indexer.push(id, vector);
        }
        assert_eq!(indexer.pending_count(), 2);
    }

    #[test]
    fn test_deferred_push_returns_true_at_threshold() {
        let indexer = DeferredIndexer::new(enabled_config(3));
        let first = indexer.push(1, vec![1.0]);
        assert!(!first);
        let second = indexer.push(2, vec![2.0]);
        assert!(!second);
        let third = indexer.push(3, vec![3.0]);
        assert!(third, "third push should hit threshold");
    }

    #[test]
    fn test_deferred_push_noop_when_disabled() {
        // The default configuration has `enabled == false`.
        let indexer = DeferredIndexer::new(DeferredIndexerConfig::default());
        let signaled = indexer.push(1, vec![1.0, 2.0]);
        assert!(!signaled);
        assert_eq!(indexer.pending_count(), 0);
    }

    #[test]
    fn test_deferred_extend_returns_true_at_threshold() {
        let indexer = DeferredIndexer::new(enabled_config(3));
        let batch = vec![(1, vec![1.0]), (2, vec![2.0]), (3, vec![3.0])];
        let signaled = indexer.extend(batch);
        assert!(signaled, "batch should hit threshold");
    }

    // ── Search tests ─────────────────────────────────────────────────────

    #[test]
    fn test_deferred_search_finds_buffered_vectors() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0, 0.0]);
        indexer.push(2, vec![0.0, 1.0]);

        let query = [1.0, 0.0];
        let hits = indexer.search(&query, 2, DistanceMetric::Cosine);
        assert_eq!(hits.len(), 2);
        // Under cosine, id=1 is identical to the query and must rank first.
        assert_eq!(hits.first().map(|&(id, _)| id), Some(1));
    }

    #[test]
    fn test_deferred_search_filters_deleted_ids() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0, 0.0, 0.0]);
        indexer.push(2, vec![0.0, 1.0, 0.0]);
        indexer.push(3, vec![0.0, 0.0, 1.0]);
        indexer.remove(2);

        let hits = indexer.search(&[1.0, 0.0, 0.0], 10, DistanceMetric::Euclidean);
        let surviving: Vec<u64> = hits.into_iter().map(|(id, _)| id).collect();
        assert!(
            !surviving.contains(&2),
            "deleted ID 2 must not appear in results"
        );
        assert_eq!(surviving.len(), 2);
    }

    // ── Swap and drain tests ─────────────────────────────────────────────

    #[test]
    fn test_deferred_swap_and_drain() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        indexer.push(2, vec![2.0]);

        let batch = indexer.swap_and_drain();
        assert_eq!(batch.len(), 2);
        assert_eq!(
            indexer.pending_count(),
            0,
            "buffer should be empty after drain"
        );
    }

    #[test]
    fn test_deferred_swap_and_drain_clears_deleted_ids() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        indexer.remove(1);
        drop(indexer.swap_and_drain());
        // A drain must leave no tombstones behind.
        let tombstones = indexer.deleted_ids.read();
        assert!(tombstones.is_empty());
    }

    #[test]
    fn test_deferred_swap_and_drain_reactivates_buffer() {
        // Regression: swap_and_drain must switch the buffer back on, or
        // writes arriving before the next merge would be silently lost.
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        drop(indexer.swap_and_drain());

        // The drained buffer has to accept this write.
        indexer.push(2, vec![2.0]);
        assert_eq!(indexer.pending_count(), 1, "push after drain must succeed");
        assert!(
            indexer.is_searchable(),
            "buffer should be searchable after push"
        );
    }

    #[test]
    fn test_deferred_drain_all_leaves_buffer_inactive() {
        // drain_all targets shutdown: unlike swap_and_drain it does not
        // re-activate the buffer. A later push would re-activate it via
        // ensure_buffer_active, but right after drain_all it is inactive.
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        drop(indexer.drain_all());

        assert!(
            !indexer.is_searchable(),
            "buffer must not be searchable immediately after drain_all"
        );
        assert_eq!(
            indexer.pending_count(),
            0,
            "buffer should be empty after drain_all"
        );
    }

    // ── Merge with HNSW tests ────────────────────────────────────────────

    #[test]
    fn test_deferred_merge_with_hnsw() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(10, vec![0.9, 0.1]);
        indexer.push(30, vec![0.5, 0.5]);

        // HNSW side: id=10 is also buffered, id=20 exists only in HNSW.
        let from_hnsw = vec![(10, 0.95_f32), (20, 0.80_f32)];
        let merged = indexer.merge_with_hnsw(from_hnsw, &[1.0, 0.0], 3, DistanceMetric::Cosine);

        // Each ID may appear only once.
        let merged_ids: Vec<u64> = merged.iter().map(|&(id, _)| id).collect();
        let distinct: HashSet<u64> = merged_ids.iter().copied().collect();
        assert_eq!(merged_ids.len(), distinct.len(), "no duplicate IDs");

        // Expect 10 and 30 from the buffer plus 20 from HNSW.
        assert_eq!(merged.len(), 3);
        for expected in [10_u64, 20, 30] {
            assert!(merged_ids.contains(&expected));
        }

        // For id=10 the buffer's score must win over the HNSW score of
        // 0.95, because upserts are routed to the buffer.
        let score_for_10 = merged
            .iter()
            .copied()
            .find(|(id, _)| *id == 10)
            .map(|(_, score)| score);
        assert!(
            (score_for_10.unwrap_or(0.0) - 0.95).abs() > f32::EPSILON,
            "buffer score should be authoritative for id=10, not HNSW"
        );
    }

    #[test]
    fn test_deferred_merge_with_hnsw_empty_buffer() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        // With nothing buffered, the HNSW results must come back unchanged.
        let from_hnsw = vec![(1, 0.9_f32), (2, 0.8_f32)];
        let expected = from_hnsw.clone();
        let merged = indexer.merge_with_hnsw(from_hnsw, &[1.0, 0.0], 5, DistanceMetric::Cosine);
        assert_eq!(merged, expected);
    }

    // ── Drain-all test ───────────────────────────────────────────────────

    #[test]
    fn test_deferred_drain_all() {
        let indexer = DeferredIndexer::new(enabled_config(1024));
        indexer.push(1, vec![1.0]);
        indexer.push(2, vec![2.0]);

        let flushed = indexer.drain_all();
        assert_eq!(flushed.len(), 2);
        assert_eq!(indexer.pending_count(), 0);
        assert!(!indexer.is_searchable(), "not searchable after drain_all");
    }

    // ── Config serde test ────────────────────────────────────────────────

    #[test]
    fn test_deferred_config_serde() {
        let original = DeferredIndexerConfig {
            enabled: true,
            merge_threshold: 512,
            max_buffer_age_ms: 3000,
        };
        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: DeferredIndexerConfig = serde_json::from_str(&encoded).expect("deserialize");
        assert!(decoded.enabled);
        assert_eq!(decoded.merge_threshold, 512);
        assert_eq!(decoded.max_buffer_age_ms, 3000);
    }

    #[test]
    fn test_deferred_config_serde_defaults() {
        let decoded: DeferredIndexerConfig = serde_json::from_str("{}").expect("deserialize empty");
        assert!(!decoded.enabled);
        assert_eq!(decoded.merge_threshold, DEFAULT_MERGE_THRESHOLD);
        assert_eq!(decoded.max_buffer_age_ms, DEFAULT_MAX_BUFFER_AGE_MS);
    }

    // ── Edge cases ───────────────────────────────────────────────────────

    #[test]
    fn test_deferred_should_merge_reflects_threshold() {
        let indexer = DeferredIndexer::new(enabled_config(2));
        assert!(!indexer.should_merge());
        indexer.push(1, vec![1.0]);
        assert!(!indexer.should_merge());
        indexer.push(2, vec![2.0]);
        assert!(indexer.should_merge());
    }

    #[test]
    fn test_deferred_is_enabled_reflects_config() {
        let switched_on = DeferredIndexer::new(enabled_config(1024));
        let switched_off = DeferredIndexer::new(DeferredIndexerConfig::default());
        assert!(switched_on.is_enabled());
        assert!(!switched_off.is_enabled());
    }
}