1use std::sync::mpsc;
9use std::thread::JoinHandle;
10
11use tracing::{debug, info, warn};
12
13use crate::collection::{BuildComplete, BuildRequest};
14use crate::hnsw::HnswIndex;
15
/// Sending half of the request channel: used to submit [`BuildRequest`]s to a builder thread.
pub type BuildSender = mpsc::Sender<BuildRequest>;
18
/// Receiving half of the completion channel: yields a [`BuildComplete`] for each finished build.
pub type CompleteReceiver = mpsc::Receiver<BuildComplete>;
21
22pub fn spawn_builder(core_id: usize) -> (BuildSender, CompleteReceiver, JoinHandle<()>) {
24 let (request_tx, request_rx) = mpsc::channel::<BuildRequest>();
25 let (complete_tx, complete_rx) = mpsc::channel::<BuildComplete>();
26
27 let handle = std::thread::Builder::new()
28 .name(format!("hnsw-builder-{core_id}"))
29 .spawn(move || {
30 info!(core_id, "HNSW builder thread started");
31 builder_loop(core_id, request_rx, complete_tx);
32 info!(core_id, "HNSW builder thread stopped");
33 })
34 .expect("failed to spawn HNSW builder thread");
35
36 (request_tx, complete_rx, handle)
37}
38
39fn builder_loop(core_id: usize, rx: mpsc::Receiver<BuildRequest>, tx: mpsc::Sender<BuildComplete>) {
40 while let Ok(req) = rx.recv() {
41 debug!(
42 core_id,
43 key = %req.key,
44 segment_id = req.segment_id,
45 vectors = req.vectors.len(),
46 dim = req.dim,
47 "building HNSW index"
48 );
49
50 let start = std::time::Instant::now();
51 let mut index = HnswIndex::with_seed(
52 req.dim,
53 req.params,
54 (core_id as u64 + 1) * 1000 + req.segment_id as u64,
55 );
56
57 for vector in req.vectors {
58 index
59 .insert(vector)
60 .unwrap_or_else(|e| tracing::error!(error = %e, "HNSW insert failed"));
61 }
62
63 let elapsed = start.elapsed();
64 info!(
65 core_id,
66 key = %req.key,
67 segment_id = req.segment_id,
68 vectors = index.len(),
69 elapsed_ms = elapsed.as_millis() as u64,
70 "HNSW index built"
71 );
72
73 if tx
74 .send(BuildComplete {
75 key: req.key,
76 segment_id: req.segment_id,
77 index,
78 })
79 .is_err()
80 {
81 warn!(core_id, "builder: core channel closed, stopping");
82 break;
83 }
84 }
85}