velesdb-core 1.15.0

High-performance vector database engine written in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
//! Delta buffer for accumulating vectors during HNSW rebuilds.
//!
//! The [`DeltaBuffer`] holds recently inserted vectors that have not yet been
//! indexed into the HNSW graph (e.g., because a rebuild is in progress).
//! The search pipeline brute-force scans this buffer and merges results with
//! HNSW results for immediate searchability via
//! [`super::delta_merge::merge_with_delta`].
//!
//! # State machine
//!
//! The buffer transitions through three states encoded in the internal `state` field:
//!
//! ```text
//! INACTIVE (0) --activate()--> ACTIVE (1) --deactivate_and_drain()--> DRAINING (2) --> INACTIVE (0)
//! ```
//!
//! - `push` / `extend`: only write when `ACTIVE`.
//! - `search`: scan when `ACTIVE` or `DRAINING`, so a search that reaches the
//!   buffer before the drain empties it still sees the buffered vectors.
//!
//! # Future work (STREAM-02)
//!
//! `activate()` is currently an unconditional store, so a double activation
//! goes undetected. A future hardening pass should promote it to
//! `compare_exchange(INACTIVE, ACTIVE)` and return `Err(())` on re-entrance
//! to surface bugs during testing.
//!
//! # Lock ordering
//!
//! `DeltaBuffer` is at position **10** in the collection lock order
//! (after `sparse_indexes` at 9). Code must never hold a delta buffer lock
//! while acquiring a lower-numbered lock.

use crate::distance::DistanceMetric;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU8, Ordering};

/// State value: the buffer is idle. `push`/`extend` are no-ops and `search`
/// returns nothing.
const INACTIVE: u8 = 0;
/// State value: the buffer accepts writes and is scanned by `search`
/// (set by `activate()`, i.e. while an HNSW rebuild is in progress).
const ACTIVE: u8 = 1;
/// State value: the buffer rejects new writes but is still scanned by `search`
/// (set at the start of `deactivate_and_drain()`).
const DRAINING: u8 = 2;

/// Delta buffer for streaming inserts during HNSW rebuilds.
///
/// Accumulates `(point_id, vector)` pairs that are in storage but not yet in
/// the HNSW index. When active, search methods brute-force scan the buffer
/// and merge results with HNSW results via
/// [`super::delta_merge::merge_with_delta`].
///
/// All methods take `&self`: the entries live behind an `RwLock` and the
/// lifecycle state is an atomic, so the buffer uses interior mutability.
pub struct DeltaBuffer {
    /// Buffered `(point_id, vector)` pairs awaiting index insertion.
    ///
    /// Writers (`push`, `extend`, `remove`, `deactivate_and_drain`) take the
    /// write lock; `len`, `stats` and `search` take the read lock.
    points: RwLock<Vec<(u64, Vec<f32>)>>,

    /// State machine: `INACTIVE` | `ACTIVE` | `DRAINING`.
    ///
    /// Read with `Acquire` and written with `Release` throughout this module.
    state: AtomicU8,
}

impl DeltaBuffer {
    /// Builds a buffer that holds no points and starts in the `INACTIVE` state.
    ///
    /// Until [`activate`](Self::activate) is called, [`push`](Self::push) and
    /// [`extend`](Self::extend) ignore their input and
    /// [`search`](Self::search) returns no results.
    #[must_use]
    pub fn new() -> Self {
        let points = RwLock::new(Vec::new());
        let state = AtomicU8::new(INACTIVE);
        Self { points, state }
    }

    /// Reports whether the buffer is currently accepting writes.
    ///
    /// This is `true` only in the `ACTIVE` state (an HNSW rebuild is in
    /// progress); it is `false` while `DRAINING` or `INACTIVE`.
    #[must_use]
    pub fn is_active(&self) -> bool {
        let current = self.state.load(Ordering::Acquire);
        current == ACTIVE
    }

    /// Returns true if the buffer contains data that should be merged into search results.
    ///
    /// This is true in both `ACTIVE` and `DRAINING` states: the buffer holds
    /// vectors not yet present in HNSW, so searches must include them regardless
    /// of whether new writes are still being accepted.
    #[must_use]
    pub fn is_searchable(&self) -> bool {
        let s = self.state.load(Ordering::Acquire);
        s == ACTIVE || s == DRAINING
    }

    /// Activates the delta buffer (marks a rebuild as in progress).
    ///
    /// While active, the drain loop will push vectors into this buffer so
    /// that search can find them before they are indexed into HNSW.
    ///
    /// Idempotent: calling `activate()` when already active is a no-op.
    ///
    /// The state is written with an unconditional `Release` store, so this
    /// also overwrites `INACTIVE` and `DRAINING` without reporting it.
    ///
    /// NOTE(review): an `activate()` that lands while
    /// [`deactivate_and_drain`](Self::deactivate_and_drain) is between its
    /// `DRAINING` and `INACTIVE` stores is overwritten by the latter.
    /// Presumably callers serialize the two calls — confirm.
    pub fn activate(&self) {
        self.state.store(ACTIVE, Ordering::Release);
    }

    /// Stops accepting writes, hands back every buffered point, and leaves the
    /// buffer empty and `INACTIVE`.
    ///
    /// Sequence: store `DRAINING`, take the write lock, move the entries out,
    /// store `INACTIVE`, release the lock.
    ///
    /// A concurrent [`search`](Self::search) that read `DRAINING` (or
    /// `ACTIVE`) before blocking on the lock may find the buffer already
    /// emptied and return nothing. That is architecturally acceptable: the
    /// drained vectors become searchable through HNSW once the caller has
    /// merged them, so such searches can only miss them transiently.
    ///
    /// # Returns
    ///
    /// The accumulated `(point_id, vector)` pairs, in buffer order, for
    /// progressive merge into the newly rebuilt HNSW index.
    pub fn deactivate_and_drain(&self) -> Vec<(u64, Vec<f32>)> {
        // From here on `push`/`extend` reject writes, while `search` still
        // treats the buffer as readable.
        self.state.store(DRAINING, Ordering::Release);

        let mut guard = self.points.write();
        let taken = std::mem::take(&mut *guard);

        // Flip to INACTIVE while the write lock is still held: any thread that
        // acquires the lock afterwards and then checks `state` (as `push` and
        // `extend` do) sees INACTIVE together with the emptied buffer, and a
        // later activate -> push sequence starts from a clean buffer.
        self.state.store(INACTIVE, Ordering::Release);
        drop(guard);

        taken
    }

    /// Inserts one `(id, vector)` entry with upsert semantics.
    ///
    /// Any entry already stored under `id` is dropped before the new one is
    /// appended, so re-inserting the same point during an HNSW rebuild never
    /// leaves duplicates behind.
    ///
    /// The removal pass is O(n); that is acceptable because the buffer is
    /// bounded by `merge_threshold` (typically 1024-4096 entries).
    ///
    /// Does nothing unless the buffer is `ACTIVE`. The state is checked
    /// **after** the write lock is taken, which closes the TOCTOU window
    /// between an external `is_active()` check and the actual write.
    pub fn push(&self, id: u64, vector: Vec<f32>) {
        let mut guard = self.points.write();
        if self.state.load(Ordering::Acquire) != ACTIVE {
            return;
        }
        guard.retain(|entry| entry.0 != id);
        guard.push((id, vector));
    }

    /// Extends the delta buffer with multiple entries (upsert semantics).
    ///
    /// The result is the same as calling [`push`](Self::push) for each entry
    /// in order:
    ///
    /// - an existing entry whose ID appears in `entries` is replaced;
    /// - if an ID appears more than once in `entries`, only its **last**
    ///   occurrence is kept, so a batch can never introduce duplicate IDs.
    ///
    /// No-op if the buffer is not in `ACTIVE` state, in which case `entries`
    /// is not iterated. The check is performed **inside** the write lock to
    /// close the TOCTOU window between `is_active()` and the actual write.
    pub fn extend(&self, entries: impl IntoIterator<Item = (u64, Vec<f32>)>) {
        let mut points = self.points.write();
        if self.state.load(Ordering::Acquire) != ACTIVE {
            return;
        }

        let incoming: Vec<(u64, Vec<f32>)> = entries.into_iter().collect();

        // Walk the batch back-to-front so the first time an ID is seen is its
        // last occurrence; `HashSet::insert` returns `false` for the earlier
        // (stale) duplicates, which are dropped.
        let mut batch_ids: HashSet<u64> = HashSet::with_capacity(incoming.len());
        let mut latest: Vec<(u64, Vec<f32>)> = incoming
            .into_iter()
            .rev()
            .filter(|(id, _)| batch_ids.insert(*id))
            .collect();
        // Restore the original relative order of the surviving entries.
        latest.reverse();

        points.retain(|(existing_id, _)| !batch_ids.contains(existing_id));
        points.extend(latest);
    }

    /// Purges every entry stored under `id`.
    ///
    /// The lifecycle state is deliberately not consulted: a delete must clear
    /// stale data whether the buffer is `ACTIVE`, `DRAINING`, or `INACTIVE`.
    /// Otherwise the brute-force delta scan could keep returning a vector
    /// that has already been deleted (a ghost result).
    ///
    /// Removing an ID that is not present leaves the buffer unchanged.
    pub fn remove(&self, id: u64) {
        let mut guard = self.points.write();
        guard.retain(|entry| entry.0 != id);
    }

    /// Counts the entries currently held in the buffer.
    ///
    /// Acquires the read lock once. When both the length and the emptiness
    /// flag are needed, call [`stats`](Self::stats) instead of pairing this
    /// with [`is_empty`](Self::is_empty), which would lock twice.
    #[must_use]
    pub fn len(&self) -> usize {
        let guard = self.points.read();
        guard.len()
    }

    /// Reports whether the buffer holds no entries.
    ///
    /// Acquires the read lock once.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.points.read().is_empty()
    }

    /// Produces `(len, is_empty)` from a single read-lock acquisition.
    ///
    /// Use this instead of separate `len()` and `is_empty()` calls when both
    /// values are needed: the pair is taken from one consistent view of the
    /// buffer and the lock is only acquired once.
    #[must_use]
    pub fn stats(&self) -> (usize, bool) {
        let guard = self.points.read();
        let count = guard.len();
        (count, count == 0)
    }

    /// Brute-force searches the delta buffer for the k nearest neighbors.
    ///
    /// Returns an empty `Vec` if the buffer is neither `ACTIVE` nor `DRAINING`.
    /// Takes a brief read lock to snapshot the points, releases it, then
    /// computes distances on the snapshot to avoid holding the lock during
    /// potentially expensive distance calculations.
    #[must_use]
    pub fn search(&self, query: &[f32], k: usize, metric: DistanceMetric) -> Vec<(u64, f32)> {
        let current_state = self.state.load(Ordering::Acquire);
        if current_state != ACTIVE && current_state != DRAINING {
            return Vec::new();
        }

        // Snapshot under a brief read lock, then release before computing distances.
        let snapshot: Vec<(u64, Vec<f32>)> = self.points.read().clone();
        if snapshot.is_empty() {
            return Vec::new();
        }

        let mut results: Vec<(u64, f32)> = snapshot
            .iter()
            .map(|(id, vec)| (*id, metric.calculate(query, vec)))
            .collect();

        metric.sort_results(&mut results);
        results.truncate(k);
        results
    }
}

impl Default for DeltaBuffer {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::super::delta_merge::merge_with_delta;
    use super::*;

    #[test]
    fn test_stream_delta_buffer_compiles_and_defaults_inactive() {
        let buf = DeltaBuffer::new();
        assert!(
            !buf.is_active(),
            "new DeltaBuffer should be inactive by default"
        );
    }

    #[test]
    fn test_stream_delta_buffer_default_trait() {
        let buf = DeltaBuffer::default();
        assert!(!buf.is_active());
    }

    #[test]
    fn test_stream_delta_push_and_search() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.push(1, vec![1.0, 0.0, 0.0]);
        buf.push(2, vec![0.0, 1.0, 0.0]);
        buf.push(3, vec![0.5, 0.5, 0.0]);

        let query = &[1.0, 0.0, 0.0];
        let results = buf.search(query, 2, DistanceMetric::Cosine);
        assert_eq!(results.len(), 2, "should return at most k=2 results");
        // Cosine: higher is better; [1,0,0] is identical to query -> highest score
        assert_eq!(
            results[0].0, 1,
            "closest match should be id=1 (identical vector)"
        );
    }

    #[test]
    fn test_stream_delta_search_returns_empty_when_inactive() {
        let buf = DeltaBuffer::new();
        buf.push(1, vec![1.0, 0.0, 0.0]);
        // buffer is NOT active — push() is a no-op when inactive
        let results = buf.search(&[1.0, 0.0, 0.0], 10, DistanceMetric::Cosine);
        assert!(
            results.is_empty(),
            "inactive delta should return no results"
        );
    }

    #[test]
    fn test_stream_delta_push_noop_when_inactive() {
        let buf = DeltaBuffer::new();
        // push and extend are no-ops when inactive (C-1 guard)
        buf.push(1, vec![1.0, 0.0]);
        buf.extend(vec![(2, vec![0.0, 1.0])]);
        assert_eq!(buf.len(), 0, "push/extend should be no-ops when inactive");
    }

    #[test]
    fn test_stream_delta_search_cosine_ordering() {
        let buf = DeltaBuffer::new();
        buf.activate();
        // Vec pointing along x-axis
        buf.push(10, vec![1.0, 0.0]);
        // Vec pointing along y-axis (orthogonal)
        buf.push(20, vec![0.0, 1.0]);
        // Vec at 45 degrees
        buf.push(30, vec![1.0, 1.0]);

        let query = &[1.0, 0.0];
        let results = buf.search(query, 3, DistanceMetric::Cosine);
        // Cosine: higher is better. id=10 should be first (similarity ~1.0)
        assert_eq!(results[0].0, 10);
        // id=30 at 45 deg should be next (similarity ~0.707)
        assert_eq!(results[1].0, 30);
        // id=20 orthogonal should be last (similarity ~0.0)
        assert_eq!(results[2].0, 20);
    }

    #[test]
    fn test_stream_delta_search_euclidean_ordering() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.push(1, vec![0.0, 0.0]);
        buf.push(2, vec![1.0, 0.0]);
        buf.push(3, vec![3.0, 4.0]);

        let query = &[0.0, 0.0];
        let results = buf.search(query, 3, DistanceMetric::Euclidean);
        // Euclidean: lower is better. id=1 (dist=0) should be first
        assert_eq!(results[0].0, 1);
        assert_eq!(results[1].0, 2);
        assert_eq!(results[2].0, 3);
    }

    #[test]
    fn test_stream_delta_merge_with_delta_inactive() {
        let buf = DeltaBuffer::new();
        // NOT active
        let hnsw = vec![(1, 0.9), (2, 0.8)];
        let merged = merge_with_delta(hnsw.clone(), &buf, &[1.0, 0.0], 5, DistanceMetric::Cosine);
        assert_eq!(merged, hnsw, "inactive delta should return HNSW unchanged");
    }

    #[test]
    fn test_stream_delta_merge_dedup_and_truncate() {
        let buf = DeltaBuffer::new();
        buf.activate();
        // Delta has id=1 with a different score and id=3 (new)
        buf.push(1, vec![0.9, 0.1]);
        buf.push(3, vec![0.8, 0.2]);

        // HNSW results (cosine scores, higher is better)
        let hnsw = vec![(1, 0.95), (2, 0.80)];

        let query = &[1.0, 0.0];
        let merged = merge_with_delta(hnsw, &buf, query, 2, DistanceMetric::Cosine);

        // Should have at most k=2 results
        assert_eq!(merged.len(), 2);

        // Delta wins for id=1 — its score should come from delta's brute-force
        // Check no duplicate ids
        let ids: Vec<u64> = merged.iter().map(|(id, _)| *id).collect();
        let unique: HashSet<u64> = ids.iter().copied().collect();
        assert_eq!(
            ids.len(),
            unique.len(),
            "no duplicate IDs in merged results"
        );
    }

    #[test]
    fn test_stream_delta_merge_empty_delta() {
        let buf = DeltaBuffer::new();
        buf.activate();
        // Delta is active but empty
        let hnsw = vec![(1, 0.9), (2, 0.8)];
        let merged = merge_with_delta(hnsw.clone(), &buf, &[1.0, 0.0], 5, DistanceMetric::Cosine);
        assert_eq!(
            merged, hnsw,
            "empty active delta should return HNSW unchanged"
        );
    }

    #[test]
    fn test_stream_delta_activate_deactivate_drain() {
        let buf = DeltaBuffer::new();
        assert!(!buf.is_active());

        buf.activate();
        assert!(buf.is_active());

        buf.push(1, vec![1.0]);
        buf.push(2, vec![2.0]);
        assert_eq!(buf.len(), 2);

        let drained = buf.deactivate_and_drain();
        assert!(!buf.is_active());
        assert!(buf.is_empty());
        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].0, 1);
        assert_eq!(drained[1].0, 2);
    }

    #[test]
    fn test_stream_delta_extend() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.extend(vec![(1, vec![1.0]), (2, vec![2.0]), (3, vec![3.0])]);
        assert_eq!(buf.len(), 3);
    }

    #[test]
    fn test_stream_delta_stats() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.push(1, vec![1.0]);
        let (len, is_empty) = buf.stats();
        assert_eq!(len, 1);
        assert!(!is_empty);
    }

    // ── Bug B0.1: remove() filters deleted points from search ──────────

    #[test]
    fn test_delta_remove_filters_deleted_point() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.push(1, vec![1.0, 2.0, 3.0]);
        buf.push(2, vec![4.0, 5.0, 6.0]);
        buf.remove(1);
        let results = buf.search(&[1.0, 2.0, 3.0], 10, DistanceMetric::Euclidean);
        assert!(
            results.iter().all(|(id, _)| *id != 1),
            "Deleted point should not appear in search results"
        );
        assert_eq!(results.len(), 1, "Only point 2 should remain");
    }

    #[test]
    fn test_delta_remove_nonexistent_id_is_noop() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.push(1, vec![1.0, 2.0]);
        buf.remove(999);
        assert_eq!(buf.len(), 1, "Removing absent ID should not change length");
    }

    #[test]
    fn test_delta_remove_works_in_draining_state() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.push(1, vec![1.0]);
        buf.push(2, vec![2.0]);
        // The public API only passes through DRAINING transiently inside
        // `deactivate_and_drain`, so hold the buffer there by writing the
        // private state field directly.
        buf.state.store(DRAINING, Ordering::Release);
        assert!(!buf.is_active(), "DRAINING must not count as active");
        assert!(buf.is_searchable(), "DRAINING must stay searchable");

        // remove() works unconditionally (any state) — a delete must always
        // purge stale data regardless of buffer lifecycle.
        buf.remove(1);
        assert_eq!(buf.len(), 1);

        // The deleted point must no longer surface in a DRAINING-state search.
        let results = buf.search(&[2.0], 10, DistanceMetric::Euclidean);
        let ids: Vec<u64> = results.iter().map(|(id, _)| *id).collect();
        assert_eq!(ids, vec![2], "Only point 2 should remain while draining");
    }

    // ── Bug B0.4: push() deduplicates on same ID (upsert semantics) ───

    #[test]
    fn test_delta_push_deduplicates_on_same_id() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.push(1, vec![1.0, 2.0, 3.0]);
        buf.push(1, vec![4.0, 5.0, 6.0]); // Same ID, different vector
        assert_eq!(buf.len(), 1, "Should have deduplicated");
        let results = buf.search(&[4.0, 5.0, 6.0], 1, DistanceMetric::Euclidean);
        assert_eq!(results[0].0, 1);
        // Distance should be ~0 since query matches the updated vector
        assert!(
            results[0].1 < 0.01,
            "Updated vector should match query closely"
        );
    }

    #[test]
    fn test_delta_extend_deduplicates_on_same_id() {
        let buf = DeltaBuffer::new();
        buf.activate();
        buf.push(1, vec![1.0, 0.0]);
        buf.push(2, vec![0.0, 1.0]);
        // Extend with updates for id=1 and a new id=3
        buf.extend(vec![(1, vec![0.5, 0.5]), (3, vec![0.0, 0.0])]);
        assert_eq!(buf.len(), 3, "Should have ids 1, 2, 3");
        let results = buf.search(&[0.5, 0.5], 1, DistanceMetric::Euclidean);
        assert_eq!(
            results[0].0, 1,
            "ID 1 should have updated vector [0.5, 0.5]"
        );
        assert!(results[0].1 < 0.01, "Updated vector should match query");
    }
}