velesdb-core 1.13.7

High-performance vector database engine written in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
//! CSR (Compressed Sparse Row) graph representation for GPU traversal.
//!
//! Converts the HNSW Layer's per-node `RwLock<Vec<NodeId>>` adjacency lists
//! into a flat, GPU-friendly format suitable for coalesced memory access
//! in compute shaders.
//!
//! ## Layout
//!
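//! ```text
//! offsets:   [0, 3, 5, 9, 11]           ← cumulative neighbor count (N+1 entries)
//! neighbors: [2,5,7, 0,3, 0,4,6,8, 1,5] ← all neighbors, concatenated
//! ```
//!
//! The neighbors of `node` are `neighbors[offsets[node]..offsets[node + 1]]`.
//! The example shows only the rows of the first four nodes of a larger graph,
//! which is why neighbor IDs greater than 3 appear.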
//!
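//! ## Cache Invalidation
//!
//! The [`CsrCache`] wraps a [`CsrGraph`] with a generation counter that is
//! bumped whenever the underlying Layer is mutated (insert/delete). The
//! cached CSR is considered stale while the counter differs from the
//! generation it was built at, and is rebuilt lazily on the next GPU search
//! request.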

use std::sync::atomic::{AtomicU64, Ordering};

use parking_lot::RwLock;

use crate::index::hnsw::native::layer::Layer;

/// GPU-friendly CSR representation of a single HNSW layer's adjacency graph.
///
/// All data is stored in flat `Vec<u32>` arrays that can be uploaded to GPU
/// storage buffers via `wgpu::util::DeviceExt::create_buffer_init`.
///
/// The neighbors of node `i` are the slice
/// `neighbors[offsets[i] as usize..offsets[i + 1] as usize]`.
///
/// All fields are public, so a value can be constructed or modified by hand;
/// nothing in the type itself enforces that the fields agree with each other.
#[derive(Debug, Clone)]
pub struct CsrGraph {
    /// Cumulative neighbor offsets: `offsets[node]..offsets[node+1]` gives
    /// the range of neighbors for `node` in the `neighbors` array.
    /// Length: `num_nodes + 1`.
    pub offsets: Vec<u32>,
    /// Concatenated neighbor IDs for all nodes, in node-ID order.
    /// Length: total number of edges.
    pub neighbors: Vec<u32>,
    /// Total number of nodes in the graph.
    pub num_nodes: u32,
    /// Maximum degree (number of neighbors) across all nodes.
    pub max_degree: u32,
    /// Total number of edges (sum of all neighbor counts). Every entry of a
    /// node's neighbor list counts as one edge.
    pub total_edges: u32,
}

impl CsrGraph {
    /// Builds a CSR graph from a single HNSW [`Layer`].
    ///
    /// Acquires a read lock on each node's neighbor list sequentially.
    /// For 1M nodes with average degree 16, this takes ~50ms.
    ///
    /// # Arguments
    ///
    /// * `layer` — The HNSW layer to convert.
    /// * `num_nodes` — Number of active nodes (may be less than `layer.neighbors.len()`
    ///   if the layer was pre-allocated with extra capacity).
    #[must_use]
    pub fn from_layer(layer: &Layer, num_nodes: usize) -> Self {
        let n = num_nodes.min(layer.neighbors.len());
        let mut offsets = Vec::with_capacity(n + 1);
        // Pre-estimate: assume average degree of 16 for initial allocation
        let mut neighbors = Vec::with_capacity(n * 16);
        let mut max_degree: u32 = 0;

        offsets.push(0u32);
        for node_id in 0..n {
            let nbrs = layer.get_neighbors(node_id);
            #[allow(clippy::cast_possible_truncation)]
            let degree = nbrs.len() as u32;
            max_degree = max_degree.max(degree);
            for &nbr in &nbrs {
                // Reason: NodeId (usize) values are bounded by collection size,
                // which is validated to fit in u32 by the GPU dispatch threshold.
                #[allow(clippy::cast_possible_truncation)]
                let nbr_u32 = nbr as u32;
                neighbors.push(nbr_u32);
            }
            // Reason: neighbors.len() is bounded by n * max_degree, where both
            // n and max_degree are << u32::MAX for any practical HNSW index.
            #[allow(clippy::cast_possible_truncation)]
            let offset = neighbors.len() as u32;
            offsets.push(offset);
        }

        #[allow(clippy::cast_possible_truncation)]
        let total_edges = neighbors.len() as u32;
        #[allow(clippy::cast_possible_truncation)]
        let num_nodes_u32 = n as u32;

        CsrGraph {
            offsets,
            neighbors,
            num_nodes: num_nodes_u32,
            max_degree,
            total_edges,
        }
    }

    /// Returns true if this CSR graph has no nodes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.num_nodes == 0
    }

    /// Returns the byte size of the offsets buffer for GPU upload.
    #[must_use]
    pub fn offsets_byte_size(&self) -> usize {
        self.offsets.len() * std::mem::size_of::<u32>()
    }

    /// Returns the byte size of the neighbors buffer for GPU upload.
    #[must_use]
    pub fn neighbors_byte_size(&self) -> usize {
        self.neighbors.len() * std::mem::size_of::<u32>()
    }

    /// Returns the total GPU memory (VRAM) needed for CSR buffers alone.
    #[must_use]
    pub fn total_gpu_bytes(&self) -> usize {
        self.offsets_byte_size() + self.neighbors_byte_size()
    }

    /// Returns the graph density as a ratio of actual edges to maximum possible.
    ///
    /// For an undirected graph: `density = edges / (nodes * (nodes - 1))`.
    /// Values near 0 indicate sparse graphs (typical for HNSW), values near
    /// 1 indicate dense graphs.
    ///
    /// Used for GPU dispatch tuning: very sparse graphs have poor GPU
    /// occupancy due to uneven thread loads.
    #[must_use]
    pub fn density(&self) -> f64 {
        if self.num_nodes <= 1 {
            return 0.0;
        }
        let n = f64::from(self.num_nodes);
        let max_edges = n * (n - 1.0);
        if max_edges == 0.0 {
            return 0.0;
        }
        f64::from(self.total_edges) / max_edges
    }

    /// Returns the average degree of nodes in the graph.
    #[must_use]
    pub fn avg_degree(&self) -> f64 {
        if self.num_nodes == 0 {
            return 0.0;
        }
        f64::from(self.total_edges) / f64::from(self.num_nodes)
    }

    /// Validates CSR invariants in debug mode.
    ///
    /// Checks:
    /// 1. `offsets.len() == num_nodes + 1`
    /// 2. Offsets are monotonically non-decreasing
    /// 3. Last offset equals `total_edges`
    /// 4. All neighbor IDs are `< num_nodes`
    ///
    /// Returns `Ok(())` if all invariants hold, or `Err` with a description
    /// of the first violated invariant.
    ///
    /// # Errors
    ///
    /// Returns `Err(String)` describing the first violated invariant
    /// when `offsets.len()` is wrong, offsets are non-monotonic, the last
    /// offset disagrees with `total_edges`, or any neighbor ID is out of
    /// range. The error string is intended for diagnostics, not matching.
    pub fn validate(&self) -> Result<(), String> {
        // Check 1: offsets length
        let expected_len = self.num_nodes as usize + 1;
        if self.offsets.len() != expected_len {
            return Err(format!(
                "offsets.len()={} != num_nodes+1={}",
                self.offsets.len(),
                expected_len,
            ));
        }

        // Check 2: monotonicity
        for i in 1..self.offsets.len() {
            if self.offsets[i] < self.offsets[i - 1] {
                return Err(format!(
                    "offsets not monotonic at {}: {} < {}",
                    i,
                    self.offsets[i],
                    self.offsets[i - 1],
                ));
            }
        }

        // Check 3: last offset == total_edges
        if let Some(&last) = self.offsets.last() {
            if last != self.total_edges {
                return Err(format!(
                    "last offset {} != total_edges {}",
                    last, self.total_edges,
                ));
            }
        }

        // Check 4: neighbor bounds
        for (idx, &nbr) in self.neighbors.iter().enumerate() {
            if nbr >= self.num_nodes {
                return Err(format!(
                    "neighbor[{}]={} >= num_nodes={}",
                    idx, nbr, self.num_nodes,
                ));
            }
        }

        Ok(())
    }
}

impl std::fmt::Display for CsrGraph {
    /// Writes a one-line summary: node and edge counts, maximum and average
    /// degree, density, and the CSR buffer size in KiB (labelled `KB`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            num_nodes,
            total_edges,
            max_degree,
            ..
        } = self;
        let avg_deg = self.avg_degree();
        let density = self.density();
        // Precision loss only matters above 2^53 bytes, far beyond any
        // buffer this summary would describe.
        #[allow(clippy::cast_precision_loss)]
        let vram_kb = self.total_gpu_bytes() as f64 / 1024.0;

        f.write_fmt(format_args!(
            "CsrGraph(nodes={}, edges={}, max_deg={}, avg_deg={:.1}, density={:.6}, vram={:.1}KB)",
            num_nodes, total_edges, max_degree, avg_deg, density, vram_kb,
        ))
    }
}

/// Cached CSR graph with generation-based invalidation.
///
/// Thread-safe: the CSR data is behind a [`RwLock`] and invalidation uses
/// a monotonic generation counter to avoid ABA problems that a simple
/// dirty boolean would have.
///
/// ## Why not `AtomicBool`?
///
/// A boolean dirty flag has an ABA problem: if thread A reads `dirty=true`,
/// starts rebuilding, and thread B calls `invalidate()` during the rebuild,
/// thread B's `store(true)` is invisible (already true). Thread A's
/// `compare_exchange(true, false)` succeeds, clearing the flag — but the
/// cached CSR was built from a pre-mutation snapshot. The generation counter
/// detects this: A snapshots `gen=5`, B increments to `gen=6`, and when A
/// re-reads the generation under the `csr` write lock it sees `6 != 5`, so
/// it discards its rebuild and leaves `built_generation` untouched — the
/// cache stays stale. (The commit is a load / re-check sequence, not a
/// hardware compare-and-swap.)
pub struct CsrCache {
    /// The cached CSR graph. `None` if not yet built.
    csr: RwLock<Option<CsrGraph>>,
    /// Monotonically increasing generation counter.
    /// Incremented by `invalidate()` on every Layer mutation.
    generation: AtomicU64,
    /// Generation at which the cached CSR was built.
    /// If `built_generation != generation`, the cache is stale.
    built_generation: AtomicU64,
    /// Public version counter, incremented each time a rebuilt CSR is
    /// committed to the cache.
    version: AtomicU64,
}

impl CsrCache {
    /// Creates a new, empty CSR cache.
    ///
    /// The cache starts stale (`generation = 1`, `built_generation = 0`)
    /// with no CSR stored and `version() == 0`, so the first
    /// [`get_or_rebuild`](Self::get_or_rebuild) call always builds.
    #[must_use]
    pub fn new() -> Self {
        Self {
            csr: RwLock::new(None),
            // Start at gen=1, built_gen=0 → cache starts stale
            generation: AtomicU64::new(1),
            built_generation: AtomicU64::new(0),
            version: AtomicU64::new(0),
        }
    }

    /// Returns true if the cache is stale (needs rebuild).
    ///
    /// Compares two independent atomic loads; it is a point-in-time hint,
    /// not a snapshot taken under the `csr` lock.
    #[inline]
    fn is_stale(&self) -> bool {
        self.generation.load(Ordering::Acquire) != self.built_generation.load(Ordering::Acquire)
    }

    /// Marks the cache as dirty. Called after any Layer mutation (insert/delete).
    ///
    /// Each call increments the generation counter, ensuring that concurrent
    /// rebuilds can detect the mutation even if it occurs during the rebuild.
    /// The stored CSR itself is left in place and the `csr` lock is not taken.
    pub fn invalidate(&self) {
        self.generation.fetch_add(1, Ordering::Release);
    }

    /// Returns the current version counter.
    ///
    /// The counter starts at 0 and grows by one each time
    /// [`get_or_rebuild`](Self::get_or_rebuild) commits a rebuilt CSR.
    #[must_use]
    pub fn version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// Returns a clone of the cached CSR graph, rebuilding if stale.
    ///
    /// This method is designed for the GPU dispatch hot path:
    /// - If the cache is fresh, returns the existing CSR (fast path).
    /// - If stale, rebuilds from the `layer` argument (slow path, ~50 ms
    ///   for 1 M nodes).
    ///
    /// On the slow path the freshly built CSR is returned to the caller
    /// whether or not it was committed to the cache.
    ///
    /// # Caller contract — must hold `layers` read lock
    ///
    /// **The rebuild from `layer` is only race-free while the caller holds
    /// the `layers` read lock** (rank 20 in `locking::LockRank`) for the
    /// entire duration of this call. Layer mutations require the write
    /// lock, so as long as the read lock is held, every concurrent
    /// rebuilder sees the same layer topology and produces an identical
    /// CSR. The `gen_before`/`gen_after` check under `self.csr.write()`
    /// then discards any rebuild that raced with an `invalidate()`.
    ///
    /// If a future caller ever invokes `get_or_rebuild` **without** the
    /// layers read lock held, two rebuilders could observe different
    /// layer states, and one might commit a stale CSR while the
    /// generation counter indicates fresh — a silent correctness bug.
    /// Debug builds assert the precondition via
    /// `hnsw_holds_lock(HnswLockRank::Layers)`.
    ///
    /// # Generation protocol (NOT a CAS)
    ///
    /// The rebuild is **not** a compare-and-swap. It is a
    /// load-before-rebuild (`gen_before`) / re-check-under-write-lock
    /// (`gen_after`) sequence: we store the new CSR and bump
    /// `built_generation` only when the generation has not moved during
    /// the rebuild. Correctness depends on the caller contract above —
    /// without the shared layer snapshot, the write would still be
    /// atomic but the data committed could be stale.
    pub fn get_or_rebuild(&self, layer: &Layer, num_nodes: usize) -> CsrGraph {
        // Caller contract: the layers read lock must be held so concurrent
        // rebuilders observe the same layer topology. In release builds
        // `hnsw_holds_lock` returns `true` unconditionally (thread-local
        // stack is not maintained), so the assert is a debug-only tripwire.
        debug_assert!(
            crate::index::hnsw::native::hnsw_holds_lock(
                crate::index::hnsw::native::HnswLockRank::Layers
            ),
            "CsrCache::get_or_rebuild must be called while holding the layers read lock"
        );

        // Fast path: check if cache is fresh
        if !self.is_stale() {
            let guard = self.csr.read();
            if let Some(ref csr) = *guard {
                return csr.clone();
            }
            // Fresh by the counters but nothing stored: fall through to
            // the slow path below.
        }

        // Snapshot generation before rebuild
        let gen_before = self.generation.load(Ordering::Acquire);

        // Slow path: rebuild outside of any lock on `self.csr`. Safe
        // against concurrent rebuilders because they all observe the
        // same layer state (see caller contract above).
        let new_csr = CsrGraph::from_layer(layer, num_nodes);

        // Commit is atomic under the write lock: we only store our CSR
        // and bump built_generation if no concurrent invalidation has
        // happened since `gen_before`.
        //
        // Without this single critical section, a slow rebuilder (gen=N)
        // could overwrite a fast rebuilder's fresh CSR (gen=N+1) AFTER
        // it has written: the slow rebuilder's post-write gen check
        // would fail, so `built_generation` stays N+1 while the cached
        // CSR is now stale → `is_stale()` returns false, and the stale
        // CSR is served as if it were fresh.
        {
            let mut guard = self.csr.write();
            let gen_after = self.generation.load(Ordering::Acquire);
            if gen_after == gen_before {
                // The cache keeps a clone so the original can be handed
                // back to the caller by value.
                *guard = Some(new_csr.clone());
                self.built_generation.store(gen_before, Ordering::Release);
                self.version.fetch_add(1, Ordering::AcqRel);
            }
            // else: someone invalidated during our rebuild. Discard
            // our (now-stale) CSR — the next query will trigger another
            // rebuild from the fresh layer state.
        }

        new_csr
    }

    /// Returns a clone of the cached CSR if it is available and fresh.
    ///
    /// Returns `None` if the cache is stale or has not been built yet. Used
    /// by non-critical paths that do not want to pay the rebuild cost.
    /// This method never rebuilds and never modifies the cache.
    ///
    /// Aligns with the existing `Snapshot` terminology used throughout
    /// `velesdb-core` (`Bm25Snapshot`, `HnswSnapshot`, `to_snapshot`,
    /// `from_snapshot`).
    #[must_use]
    pub fn clean_snapshot(&self) -> Option<CsrGraph> {
        if self.is_stale() {
            return None;
        }
        // Clones the whole `Option`, so an empty slot yields `None`.
        self.csr.read().clone()
    }
}

impl Default for CsrCache {
    fn default() -> Self {
        Self::new()
    }
}

// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use crate::index::hnsw::native::{
        hnsw_holds_lock, hnsw_record_lock_acquire, hnsw_record_lock_release, HnswLockRank, NodeId,
    };

    /// Runs `f` with the layers rank recorded as held — the caller contract
    /// of [`CsrCache::get_or_rebuild`]. Abstracts away the record/release
    /// dance so tests read linearly and do not accidentally break the
    /// contract by forgetting the release.
    ///
    /// The release is performed by a drop guard, so the rank is also
    /// released when `f` panics (e.g. a failing assertion inside the
    /// closure) instead of staying recorded on the current thread.
    fn with_layers_rank<R>(f: impl FnOnce() -> R) -> R {
        /// Releases the layers rank when dropped, including during unwinding.
        struct LayersRankGuard;

        impl Drop for LayersRankGuard {
            fn drop(&mut self) {
                hnsw_record_lock_release(HnswLockRank::Layers);
            }
        }

        hnsw_record_lock_acquire(HnswLockRank::Layers);
        let _guard = LayersRankGuard;
        debug_assert!(hnsw_holds_lock(HnswLockRank::Layers));
        f()
    }

    /// Builds the four-node layer shared by several tests:
    /// 0 → {1, 2}, 1 → {0, 3}, 2 → {0, 1, 3}, 3 → {1, 2}.
    fn simple_layer() -> Layer {
        let layer = Layer::new(4);
        layer.set_neighbors(0, vec![1, 2]);
        layer.set_neighbors(1, vec![0, 3]);
        layer.set_neighbors(2, vec![0, 1, 3]);
        layer.set_neighbors(3, vec![1, 2]);
        layer
    }

    #[test]
    fn test_csr_from_empty_layer() {
        let layer = Layer::new(0);
        let csr = CsrGraph::from_layer(&layer, 0);
        assert!(csr.is_empty());
        assert_eq!(csr.offsets, vec![0]);
        assert!(csr.neighbors.is_empty());
        assert_eq!(csr.max_degree, 0);
        assert_eq!(csr.total_edges, 0);
    }

    #[test]
    fn test_csr_from_simple_layer() {
        let layer = simple_layer();

        let csr = CsrGraph::from_layer(&layer, 4);
        assert_eq!(csr.num_nodes, 4);
        assert_eq!(csr.offsets, vec![0, 2, 4, 7, 9]);
        assert_eq!(csr.neighbors, vec![1, 2, 0, 3, 0, 1, 3, 1, 2]);
        assert_eq!(csr.max_degree, 3);
        assert_eq!(csr.total_edges, 9);
    }

    #[test]
    fn test_csr_neighbor_lookup() {
        let layer = Layer::new(3);
        layer.set_neighbors(0, vec![1, 2]);
        layer.set_neighbors(1, vec![]);
        layer.set_neighbors(2, vec![0]);

        let csr = CsrGraph::from_layer(&layer, 3);

        // Node 0: neighbors at offsets[0]..offsets[1] = 0..2
        assert_eq!(
            &csr.neighbors[csr.offsets[0] as usize..csr.offsets[1] as usize],
            &[1, 2]
        );
        // Node 1: neighbors at offsets[1]..offsets[2] = 2..2 (empty)
        assert_eq!(
            &csr.neighbors[csr.offsets[1] as usize..csr.offsets[2] as usize],
            &[] as &[u32]
        );
        // Node 2: neighbors at offsets[2]..offsets[3] = 2..3
        assert_eq!(
            &csr.neighbors[csr.offsets[2] as usize..csr.offsets[3] as usize],
            &[0]
        );
    }

    #[test]
    fn test_csr_cache_dirty_flag() {
        let cache = CsrCache::new();
        assert_eq!(cache.version(), 0);

        let layer = Layer::new(2);
        layer.set_neighbors(0, vec![1]);
        layer.set_neighbors(1, vec![0]);

        // First build — tests model the production caller contract
        // (`with_layers_read → get_or_rebuild`) via `with_layers_rank`.
        let csr = with_layers_rank(|| cache.get_or_rebuild(&layer, 2));
        assert_eq!(csr.num_nodes, 2);
        assert_eq!(cache.version(), 1);

        // Should return cached (not rebuild)
        let csr2 = with_layers_rank(|| cache.get_or_rebuild(&layer, 2));
        assert_eq!(csr2.num_nodes, 2);
        assert_eq!(cache.version(), 1); // Same version

        // Invalidate and rebuild
        cache.invalidate();
        let csr3 = with_layers_rank(|| cache.get_or_rebuild(&layer, 2));
        assert_eq!(csr3.num_nodes, 2);
        assert_eq!(cache.version(), 2); // Incremented
    }

    #[test]
    fn test_csr_byte_sizes() {
        let layer = Layer::new(100);
        for i in 0..100 {
            let neighbors: Vec<NodeId> = (0..16).map(|j| (i + j + 1) % 100).collect();
            layer.set_neighbors(i, neighbors);
        }

        let csr = CsrGraph::from_layer(&layer, 100);
        assert_eq!(csr.offsets_byte_size(), 101 * 4); // (N+1) * sizeof(u32)
        assert_eq!(csr.neighbors_byte_size(), 1600 * 4); // 100 * 16 * sizeof(u32)
        assert_eq!(csr.total_gpu_bytes(), 101 * 4 + 1600 * 4);
    }

    #[test]
    fn test_csr_partial_capacity() {
        // Layer pre-allocated for 100 but only 5 nodes are active
        let layer = Layer::new(100);
        layer.set_neighbors(0, vec![1, 2]);
        layer.set_neighbors(1, vec![0]);

        let csr = CsrGraph::from_layer(&layer, 5);
        assert_eq!(csr.num_nodes, 5);
        // Nodes 2..4 should have zero neighbors
        assert_eq!(csr.offsets[2], csr.offsets[3]);
        assert_eq!(csr.offsets[3], csr.offsets[4]);
        assert_eq!(csr.offsets[4], csr.offsets[5]);
    }

    #[test]
    fn test_csr_validate_accepts_built_graph() {
        let csr = CsrGraph::from_layer(&simple_layer(), 4);
        assert!(csr.validate().is_ok());

        let empty = CsrGraph::from_layer(&Layer::new(0), 0);
        assert!(empty.validate().is_ok());
    }

    #[test]
    fn test_csr_validate_rejects_out_of_range_neighbor() {
        let mut csr = CsrGraph::from_layer(&simple_layer(), 4);
        // Point the first edge at a node that does not exist.
        csr.neighbors[0] = 99;
        assert!(csr.validate().is_err());
    }

    #[test]
    fn test_csr_density_and_avg_degree() {
        let csr = CsrGraph::from_layer(&simple_layer(), 4);
        // 9 directed edges over 4 * 3 possible ones.
        assert!((csr.density() - 0.75).abs() < f64::EPSILON);
        assert!((csr.avg_degree() - 2.25).abs() < f64::EPSILON);

        let empty = CsrGraph::from_layer(&Layer::new(0), 0);
        assert!(empty.density().abs() < f64::EPSILON);
        assert!(empty.avg_degree().abs() < f64::EPSILON);
    }

    #[test]
    fn test_clean_snapshot_returns_none_when_dirty() {
        let cache = CsrCache::new();
        assert!(cache.clean_snapshot().is_none()); // Starts dirty

        let layer = Layer::new(1);
        with_layers_rank(|| cache.get_or_rebuild(&layer, 1)); // Build it
        assert!(cache.clean_snapshot().is_some()); // Now clean

        cache.invalidate();
        assert!(cache.clean_snapshot().is_none()); // Dirty again
    }
}