Skip to main content

vector_index/
concurrent.rs

1//! Concurrent access wrapper over [`HnswIndex`].
2//!
3//! Wraps the index in `Arc<RwLock<_>>` (using `parking_lot` for performance)
4//! to support the architecture's "high-velocity concurrent reads +
5//! append-writes" pattern. Cloning a [`ConcurrentHnsw`] is cheap — both
6//! clones share the same underlying index.
7//!
8//! # Write contention
9//!
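//! `parking_lot::RwLock` uses a task-fair locking policy: once an insert is
//! waiting for the lock, new searches queue behind it, so a steady stream of
//! searches cannot starve an insert (the insert still waits for searches that
//! are already in flight to finish). For workloads with very high write
//! frequency, consider batching inserts under a single write lock.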
13//!
14//! # Sharded variant
15//!
16//! For workloads that genuinely need parallel inserts, a sharded index
17//! (multiple sub-indices keyed by hash(id)) is on the roadmap. The
18//! single-lock version covers the OmniPulse usage pattern (1 inserter,
19//! many searchers) cleanly.
20
21use crate::error::IndexResult;
22use crate::hnsw::{HnswConfig, HnswIndex, Neighbor};
23use crate::metric::Metric;
24use crate::PointId;
25use parking_lot::RwLock;
26use std::sync::Arc;
27
28/// Thread-safe handle to a shared HNSW index.
29pub struct ConcurrentHnsw<P, M>
30where
31    M: Metric<Point = P>,
32{
33    inner: Arc<RwLock<HnswIndex<P, M>>>,
34}
35
36impl<P, M> Clone for ConcurrentHnsw<P, M>
37where
38    M: Metric<Point = P>,
39{
40    fn clone(&self) -> Self {
41        Self {
42            inner: Arc::clone(&self.inner),
43        }
44    }
45}
46
47impl<P, M> ConcurrentHnsw<P, M>
48where
49    M: Metric<Point = P>,
50{
51    /// Construct a new shared index.
52    pub fn new(config: HnswConfig, metric: M) -> IndexResult<Self> {
53        Ok(Self {
54            inner: Arc::new(RwLock::new(HnswIndex::new(config, metric)?)),
55        })
56    }
57
58    /// Insert a point. Acquires the write lock.
59    pub fn insert(&self, id: PointId, point: P) -> IndexResult<()> {
60        self.inner.write().insert(id, point)
61    }
62
63    /// Search for k nearest neighbors. Acquires the read lock.
64    pub fn search(&self, query: &P, k: usize) -> Vec<Neighbor> {
65        self.inner.read().search(query, k)
66    }
67
68    /// Number of points currently in the index.
69    pub fn len(&self) -> usize {
70        self.inner.read().len()
71    }
72
73    /// Whether the index is empty.
74    pub fn is_empty(&self) -> bool {
75        self.inner.read().is_empty()
76    }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metric::L2;
    use std::thread;

    /// One writer inserts while four readers search concurrently.
    ///
    /// Each reader checks three things on every iteration:
    /// - its search results are non-empty;
    /// - the length it observes never decreases;
    /// - that length stays within the seeded/final bounds.
    #[test]
    fn concurrent_reads_during_inserts() {
        let idx: ConcurrentHnsw<Vec<f32>, L2> =
            ConcurrentHnsw::new(HnswConfig::default(), L2).unwrap();

        // Seed with a few points so every search below has something to return.
        for i in 0..50 {
            idx.insert(i, vec![i as f32, 0.0]).unwrap();
        }
        assert_eq!(idx.len(), 50);

        let writer = {
            let idx = idx.clone();
            thread::spawn(move || {
                for i in 50..150 {
                    idx.insert(i, vec![i as f32, 0.0]).unwrap();
                }
            })
        };

        let readers: Vec<_> = (0..4)
            .map(|_| {
                let idx = idx.clone();
                thread::spawn(move || {
                    let query = vec![25.0, 0.0];
                    let mut last_len = 0;
                    for _ in 0..200 {
                        let res = idx.search(&query, 5);
                        assert!(!res.is_empty());

                        // Inserts only add points, and every len() call is
                        // ordered by the lock, so one reader must never see
                        // the count shrink.
                        let len = idx.len();
                        assert!(
                            len >= last_len,
                            "len went backwards: {} -> {}",
                            last_len,
                            len
                        );
                        assert!((50..=150).contains(&len), "unexpected len {}", len);
                        last_len = len;
                    }
                })
            })
            .collect();

        writer.join().unwrap();
        for r in readers {
            r.join().unwrap();
        }

        assert_eq!(idx.len(), 150);
    }
}