vector_index/
concurrent.rs1use crate::error::IndexResult;
22use crate::hnsw::{HnswConfig, HnswIndex, Neighbor};
23use crate::metric::Metric;
24use crate::PointId;
25use parking_lot::RwLock;
26use std::sync::Arc;
27
28pub struct ConcurrentHnsw<P, M>
30where
31 M: Metric<Point = P>,
32{
33 inner: Arc<RwLock<HnswIndex<P, M>>>,
34}
35
36impl<P, M> Clone for ConcurrentHnsw<P, M>
37where
38 M: Metric<Point = P>,
39{
40 fn clone(&self) -> Self {
41 Self {
42 inner: Arc::clone(&self.inner),
43 }
44 }
45}
46
47impl<P, M> ConcurrentHnsw<P, M>
48where
49 M: Metric<Point = P>,
50{
51 pub fn new(config: HnswConfig, metric: M) -> IndexResult<Self> {
53 Ok(Self {
54 inner: Arc::new(RwLock::new(HnswIndex::new(config, metric)?)),
55 })
56 }
57
58 pub fn insert(&self, id: PointId, point: P) -> IndexResult<()> {
60 self.inner.write().insert(id, point)
61 }
62
63 pub fn search(&self, query: &P, k: usize) -> Vec<Neighbor> {
65 self.inner.read().search(query, k)
66 }
67
68 pub fn len(&self) -> usize {
70 self.inner.read().len()
71 }
72
73 pub fn is_empty(&self) -> bool {
75 self.inner.read().is_empty()
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metric::L2;
    use std::thread;

    /// One writer thread inserts while four reader threads search through
    /// cloned handles; every search must return something and all inserts
    /// must be visible at the end.
    #[test]
    fn concurrent_reads_during_inserts() {
        let index = ConcurrentHnsw::<Vec<f32>, L2>::new(HnswConfig::default(), L2).unwrap();

        // Seed the index before any thread starts so that readers never
        // query an empty index.
        for id in 0..50 {
            index.insert(id, vec![id as f32, 0.0]).unwrap();
        }

        // Writer: adds ids 50..150 through its own handle.
        let writer_handle = index.clone();
        let writer = thread::spawn(move || {
            for id in 50..150 {
                writer_handle.insert(id, vec![id as f32, 0.0]).unwrap();
            }
        });

        // Readers: four threads, 200 searches each, running alongside the writer.
        let mut readers = Vec::with_capacity(4);
        for _ in 0..4 {
            let reader_handle = index.clone();
            readers.push(thread::spawn(move || {
                let query = vec![25.0, 0.0];
                for _ in 0..200 {
                    let hits = reader_handle.search(&query, 5);
                    assert!(!hits.is_empty());
                }
            }));
        }

        writer.join().unwrap();
        for reader in readers {
            reader.join().unwrap();
        }

        // 50 seeded points plus 100 from the writer.
        assert_eq!(index.len(), 150);
    }
}