velesdb_core/collection/streaming/async_index_builder.rs
1//! Async HNSW index builder for deferred bulk indexing.
2//!
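//! Buffers vectors and builds the HNSW index either synchronously (via
//! `flush_sync`) or on a background thread (via `trigger_build_async`;
//! wiring into the Collection pipeline is the future Task 4 integration).
//! Vectors still waiting in the buffer are searchable via brute-force scan
//! (`search_buffer`) for consistency during construction.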
6//!
7//! # Lock ordering
8//!
9//! Position 11 (after `delta_buffer` at 10). The internal `RwLock` on
10//! `buffer` must never be held while acquiring any lock at position ≤ 10.
11
12use crate::distance::DistanceMetric;
13use crate::index::hnsw::HnswIndex;
14use parking_lot::RwLock;
15use serde::{Deserialize, Serialize};
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18
/// Configuration for the async index builder.
///
/// Both fields carry a serde default, so a serialized form that omits
/// them still deserializes.
///
/// Legacy configurations persisted with a `sync_mode` field are
/// accepted transparently: serde ignores unknown fields by default,
/// so the value is dropped silently on load.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AsyncIndexBuilderConfig {
    /// Number of buffered vectors that triggers a build.
    ///
    /// Falls back to `default_merge_threshold()` (10 000) when the field
    /// is absent from the serialized form.
    #[serde(default = "default_merge_threshold")]
    pub merge_threshold: usize,

    /// Number of segments for parallel construction (default: `num_cpus`).
    ///
    /// Deserializes to `None` when absent.
    /// NOTE(review): this field is not read anywhere in this file — confirm
    /// which consumer applies the `num_cpus` fallback.
    #[serde(default)]
    pub segment_count: Option<usize>,
}
34
/// Fallback for `AsyncIndexBuilderConfig::merge_threshold`, used both by
/// serde (when the field is missing) and by the `Default` implementation.
fn default_merge_threshold() -> usize {
    const DEFAULT_MERGE_THRESHOLD: usize = 10_000;
    DEFAULT_MERGE_THRESHOLD
}
38
39impl Default for AsyncIndexBuilderConfig {
40 fn default() -> Self {
41 Self {
42 merge_threshold: default_merge_threshold(),
43 segment_count: None,
44 }
45 }
46}
47
/// Async HNSW index builder that buffers vectors and flushes them to the
/// HNSW index via [`HnswIndex::insert_batch_parallel`].
///
/// A build can run on the calling thread (`flush_sync`) or on a spawned
/// thread (`trigger_build_async`). Integration into the Collection
/// pipeline is tracked under Issue #488 Task 4.
///
/// Lock order position: 11 (after `delta_buffer` at 10).
#[allow(dead_code)] // Pipeline integration tracked under Issue #488 Task 4.
pub struct AsyncIndexBuilder {
    /// Buffer of `(external_id, vector)` pairs pending indexation.
    buffer: RwLock<Vec<(u64, Vec<f32>)>>,
    /// Configuration.
    config: AsyncIndexBuilderConfig,
    /// Whether a build is currently in progress (shared with background thread).
    building: Arc<AtomicBool>,
}
65
66#[allow(dead_code)] // Wired into Collection pipeline in Task 4
67impl AsyncIndexBuilder {
68 /// Creates a new async index builder with the given configuration.
69 #[must_use]
70 pub fn new(config: AsyncIndexBuilderConfig) -> Self {
71 Self {
72 buffer: RwLock::new(Vec::new()),
73 config,
74 building: Arc::new(AtomicBool::new(false)),
75 }
76 }
77
78 /// Enqueues vectors for deferred indexation.
79 ///
80 /// Returns `true` if the buffer has reached `merge_threshold`,
81 /// signaling the caller to trigger a build.
82 pub fn enqueue(&self, vectors: Vec<(u64, Vec<f32>)>) -> bool {
83 let mut buf = self.buffer.write();
84 buf.extend(vectors);
85 buf.len() >= self.config.merge_threshold
86 }
87
88 /// Returns the number of vectors currently buffered.
89 #[must_use]
90 pub fn buffer_len(&self) -> usize {
91 self.buffer.read().len()
92 }
93
94 /// Drains and returns all buffered vectors.
95 pub fn drain_buffer(&self) -> Vec<(u64, Vec<f32>)> {
96 let mut buf = self.buffer.write();
97 std::mem::take(&mut *buf)
98 }
99
100 /// Brute-force searches the buffer for consistency during construction.
101 ///
102 /// Returns `(external_id, distance)` pairs sorted by the metric ordering,
103 /// truncated to `k`.
104 #[must_use]
105 pub fn search_buffer(
106 &self,
107 query: &[f32],
108 k: usize,
109 metric: DistanceMetric,
110 ) -> Vec<(u64, f32)> {
111 let buf = self.buffer.read();
112 if buf.is_empty() {
113 return Vec::new();
114 }
115
116 let mut results: Vec<(u64, f32)> = buf
117 .iter()
118 .filter(|(_, v)| v.len() == query.len())
119 .map(|(id, v)| {
120 let dist = metric.calculate(query, v);
121 (*id, dist)
122 })
123 .collect();
124
125 metric.sort_results(&mut results);
126 results.truncate(k);
127 results
128 }
129
130 /// Drains the buffer and inserts all buffered vectors into the HNSW
131 /// index via [`HnswIndex::insert_batch_parallel`].
132 ///
133 /// Concurrent calls are serialized: the second caller returns
134 /// `Ok(0)` while the first is in progress.
135 ///
136 /// # Errors
137 ///
138 /// Returns an error if HNSW insertion fails.
139 pub fn flush_sync(&self, hnsw_index: &HnswIndex) -> crate::error::Result<usize> {
140 if self.building.swap(true, Ordering::AcqRel) {
141 // Another build is in progress — skip
142 return Ok(0);
143 }
144
145 let vectors = self.drain_buffer();
146 let count = vectors.len();
147
148 if count == 0 {
149 self.building.store(false, Ordering::Release);
150 return Ok(0);
151 }
152
153 let pairs: Vec<(u64, &[f32])> = vectors.iter().map(|(id, v)| (*id, v.as_slice())).collect();
154
155 let inserted = hnsw_index.insert_batch_parallel(pairs);
156
157 self.building.store(false, Ordering::Release);
158
159 tracing::debug!("AsyncIndexBuilder::flush_sync: indexed {inserted}/{count} vectors");
160
161 Ok(inserted)
162 }
163
164 /// Returns `true` if a build is currently in progress.
165 #[must_use]
166 pub fn is_building(&self) -> bool {
167 self.building.load(Ordering::Acquire)
168 }
169
170 /// Triggers a background build if the buffer is non-empty.
171 ///
172 /// Returns immediately — the build runs in a separate thread.
173 /// If a build is already in progress, this is a no-op.
174 /// The background thread calls `insert_batch_parallel` on the
175 /// provided `HnswIndex` and clears the `building` flag on completion.
176 pub fn trigger_build_async(&self, hnsw_index: &Arc<HnswIndex>) {
177 if self.building.swap(true, Ordering::AcqRel) {
178 return; // Already building
179 }
180
181 let vectors = self.drain_buffer();
182 if vectors.is_empty() {
183 self.building.store(false, Ordering::Release);
184 return;
185 }
186
187 let index = Arc::clone(hnsw_index);
188 let flag = Arc::clone(&self.building);
189 let count = vectors.len();
190
191 std::thread::spawn(move || {
192 let pairs: Vec<(u64, &[f32])> =
193 vectors.iter().map(|(id, v)| (*id, v.as_slice())).collect();
194 let _ = index.insert_batch_parallel(pairs);
195 flag.store(false, Ordering::Release);
196 tracing::debug!("AsyncIndexBuilder: background build complete ({count} vectors)");
197 });
198 }
199
200 /// Returns the merge threshold from the configuration.
201 #[must_use]
202 pub fn merge_threshold(&self) -> usize {
203 self.config.merge_threshold
204 }
205}