Skip to main content

velesdb_core/collection/streaming/
async_index_builder.rs

1//! Async HNSW index builder for deferred bulk indexing.
2//!
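//! Buffers vectors and builds the HNSW index either synchronously (via
//! `flush_sync`) or on a background thread (via `trigger_build_async`);
//! integration into the Collection pipeline is still pending (Task 4).
//! Vectors still in the buffer are searchable via brute-force scan
//! (`search_buffer`); vectors already drained for an in-progress build are
//! not.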
6//!
7//! # Lock ordering
8//!
9//! Position 11 (after `delta_buffer` at 10). The internal `RwLock` on
10//! `buffer` must never be held while acquiring any lock at position ≤ 10.
11
12use crate::distance::DistanceMetric;
13use crate::index::hnsw::HnswIndex;
14use parking_lot::RwLock;
15use serde::{Deserialize, Serialize};
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18
19/// Configuration for the async index builder.
20///
21/// Legacy configurations persisted with a `sync_mode` field are
22/// accepted transparently: serde ignores unknown fields by default,
23/// so the value is dropped silently on load.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct AsyncIndexBuilderConfig {
26    /// Number of buffered vectors that triggers a build.
27    #[serde(default = "default_merge_threshold")]
28    pub merge_threshold: usize,
29
30    /// Number of segments for parallel construction (default: `num_cpus`).
31    #[serde(default)]
32    pub segment_count: Option<usize>,
33}
34
/// Default value for `AsyncIndexBuilderConfig::merge_threshold`.
fn default_merge_threshold() -> usize {
    const DEFAULT_MERGE_THRESHOLD: usize = 10_000;
    DEFAULT_MERGE_THRESHOLD
}
38
39impl Default for AsyncIndexBuilderConfig {
40    fn default() -> Self {
41        Self {
42            merge_threshold: default_merge_threshold(),
43            segment_count: None,
44        }
45    }
46}
47
48/// Async HNSW index builder that buffers vectors and flushes them to the
49/// HNSW index via [`HnswIndex::insert_batch_parallel`].
50///
51/// Only synchronous flush is currently supported; background-thread
52/// integration into the Collection pipeline is tracked under Issue #488
53/// Task 4.
54///
55/// Lock order position: 11 (after `delta_buffer` at 10).
56#[allow(dead_code)] // Pipeline integration tracked under Issue #488 Task 4.
57pub struct AsyncIndexBuilder {
58    /// Buffer of vectors pending indexation.
59    buffer: RwLock<Vec<(u64, Vec<f32>)>>,
60    /// Configuration.
61    config: AsyncIndexBuilderConfig,
62    /// Whether a build is currently in progress (shared with background thread).
63    building: Arc<AtomicBool>,
64}
65
66#[allow(dead_code)] // Wired into Collection pipeline in Task 4
67impl AsyncIndexBuilder {
68    /// Creates a new async index builder with the given configuration.
69    #[must_use]
70    pub fn new(config: AsyncIndexBuilderConfig) -> Self {
71        Self {
72            buffer: RwLock::new(Vec::new()),
73            config,
74            building: Arc::new(AtomicBool::new(false)),
75        }
76    }
77
78    /// Enqueues vectors for deferred indexation.
79    ///
80    /// Returns `true` if the buffer has reached `merge_threshold`,
81    /// signaling the caller to trigger a build.
82    pub fn enqueue(&self, vectors: Vec<(u64, Vec<f32>)>) -> bool {
83        let mut buf = self.buffer.write();
84        buf.extend(vectors);
85        buf.len() >= self.config.merge_threshold
86    }
87
88    /// Returns the number of vectors currently buffered.
89    #[must_use]
90    pub fn buffer_len(&self) -> usize {
91        self.buffer.read().len()
92    }
93
94    /// Drains and returns all buffered vectors.
95    pub fn drain_buffer(&self) -> Vec<(u64, Vec<f32>)> {
96        let mut buf = self.buffer.write();
97        std::mem::take(&mut *buf)
98    }
99
100    /// Brute-force searches the buffer for consistency during construction.
101    ///
102    /// Returns `(external_id, distance)` pairs sorted by the metric ordering,
103    /// truncated to `k`.
104    #[must_use]
105    pub fn search_buffer(
106        &self,
107        query: &[f32],
108        k: usize,
109        metric: DistanceMetric,
110    ) -> Vec<(u64, f32)> {
111        let buf = self.buffer.read();
112        if buf.is_empty() {
113            return Vec::new();
114        }
115
116        let mut results: Vec<(u64, f32)> = buf
117            .iter()
118            .filter(|(_, v)| v.len() == query.len())
119            .map(|(id, v)| {
120                let dist = metric.calculate(query, v);
121                (*id, dist)
122            })
123            .collect();
124
125        metric.sort_results(&mut results);
126        results.truncate(k);
127        results
128    }
129
130    /// Drains the buffer and inserts all buffered vectors into the HNSW
131    /// index via [`HnswIndex::insert_batch_parallel`].
132    ///
133    /// Concurrent calls are serialized: the second caller returns
134    /// `Ok(0)` while the first is in progress.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if HNSW insertion fails.
139    pub fn flush_sync(&self, hnsw_index: &HnswIndex) -> crate::error::Result<usize> {
140        if self.building.swap(true, Ordering::AcqRel) {
141            // Another build is in progress — skip
142            return Ok(0);
143        }
144
145        let vectors = self.drain_buffer();
146        let count = vectors.len();
147
148        if count == 0 {
149            self.building.store(false, Ordering::Release);
150            return Ok(0);
151        }
152
153        let pairs: Vec<(u64, &[f32])> = vectors.iter().map(|(id, v)| (*id, v.as_slice())).collect();
154
155        let inserted = hnsw_index.insert_batch_parallel(pairs);
156
157        self.building.store(false, Ordering::Release);
158
159        tracing::debug!("AsyncIndexBuilder::flush_sync: indexed {inserted}/{count} vectors");
160
161        Ok(inserted)
162    }
163
164    /// Returns `true` if a build is currently in progress.
165    #[must_use]
166    pub fn is_building(&self) -> bool {
167        self.building.load(Ordering::Acquire)
168    }
169
170    /// Triggers a background build if the buffer is non-empty.
171    ///
172    /// Returns immediately — the build runs in a separate thread.
173    /// If a build is already in progress, this is a no-op.
174    /// The background thread calls `insert_batch_parallel` on the
175    /// provided `HnswIndex` and clears the `building` flag on completion.
176    pub fn trigger_build_async(&self, hnsw_index: &Arc<HnswIndex>) {
177        if self.building.swap(true, Ordering::AcqRel) {
178            return; // Already building
179        }
180
181        let vectors = self.drain_buffer();
182        if vectors.is_empty() {
183            self.building.store(false, Ordering::Release);
184            return;
185        }
186
187        let index = Arc::clone(hnsw_index);
188        let flag = Arc::clone(&self.building);
189        let count = vectors.len();
190
191        std::thread::spawn(move || {
192            let pairs: Vec<(u64, &[f32])> =
193                vectors.iter().map(|(id, v)| (*id, v.as_slice())).collect();
194            let _ = index.insert_batch_parallel(pairs);
195            flag.store(false, Ordering::Release);
196            tracing::debug!("AsyncIndexBuilder: background build complete ({count} vectors)");
197        });
198    }
199
200    /// Returns the merge threshold from the configuration.
201    #[must_use]
202    pub fn merge_threshold(&self) -> usize {
203        self.config.merge_threshold
204    }
205}