Skip to main content

nodedb_vector/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Background HNSW index builder thread.
4//!
5//! Each Data Plane core has one builder thread that processes HNSW
6//! construction requests sequentially (FIFO).
7
8use std::sync::mpsc;
9use std::thread::JoinHandle;
10
11use tracing::{debug, info, warn};
12
13use crate::collection::{BuildComplete, BuildRequest};
14use crate::hnsw::HnswIndex;
15
16/// Sender half: TPC core sends build requests to the builder thread.
17pub type BuildSender = mpsc::Sender<BuildRequest>;
18
19/// Receiver half: TPC core receives completed builds.
20pub type CompleteReceiver = mpsc::Receiver<BuildComplete>;
21
22/// Spawn a background HNSW builder thread for a Data Plane core.
23pub fn spawn_builder(core_id: usize) -> (BuildSender, CompleteReceiver, JoinHandle<()>) {
24    let (request_tx, request_rx) = mpsc::channel::<BuildRequest>();
25    let (complete_tx, complete_rx) = mpsc::channel::<BuildComplete>();
26
27    let handle = std::thread::Builder::new()
28        .name(format!("hnsw-builder-{core_id}"))
29        .spawn(move || {
30            info!(core_id, "HNSW builder thread started");
31            builder_loop(core_id, request_rx, complete_tx);
32            info!(core_id, "HNSW builder thread stopped");
33        })
34        .expect("failed to spawn HNSW builder thread");
35
36    (request_tx, complete_rx, handle)
37}
38
39fn builder_loop(core_id: usize, rx: mpsc::Receiver<BuildRequest>, tx: mpsc::Sender<BuildComplete>) {
40    while let Ok(req) = rx.recv() {
41        debug!(
42            core_id,
43            key = %req.key,
44            segment_id = req.segment_id,
45            vectors = req.vectors.len(),
46            dim = req.dim,
47            "building HNSW index"
48        );
49
50        let start = std::time::Instant::now();
51        let mut index = HnswIndex::with_seed(
52            req.dim,
53            req.params,
54            (core_id as u64 + 1) * 1000 + req.segment_id as u64,
55        );
56
57        for vector in req.vectors {
58            index
59                .insert(vector)
60                .unwrap_or_else(|e| tracing::error!(error = %e, "HNSW insert failed"));
61        }
62
63        let elapsed = start.elapsed();
64        info!(
65            core_id,
66            key = %req.key,
67            segment_id = req.segment_id,
68            vectors = index.len(),
69            elapsed_ms = elapsed.as_millis() as u64,
70            "HNSW index built"
71        );
72
73        if tx
74            .send(BuildComplete {
75                key: req.key,
76                segment_id: req.segment_id,
77                index,
78            })
79            .is_err()
80        {
81            warn!(core_id, "builder: core channel closed, stopping");
82            break;
83        }
84    }
85}