nodedb_vector/collection/
lifecycle.rs1use std::collections::HashMap;
18
19use nodedb_types::{Surrogate, VectorQuantization};
20
21use crate::flat::FlatIndex;
22use crate::hnsw::{HnswIndex, HnswParams};
23use crate::index_config::{IndexConfig, IndexType};
24
25use super::codec_dispatch::CollectionCodec;
26use super::payload_index::PayloadIndexSet;
27use super::segment::{BuildRequest, BuildingSegment, DEFAULT_SEAL_THRESHOLD, SealedSegment};
28
29pub struct VectorCollection {
33 pub(crate) growing: FlatIndex,
35 pub(crate) growing_base_id: u32,
37 pub(crate) sealed: Vec<SealedSegment>,
39 pub(crate) building: Vec<BuildingSegment>,
41 pub(crate) params: HnswParams,
43 pub(crate) next_id: u32,
45 pub(crate) next_segment_id: u32,
47 pub(crate) dim: usize,
49 pub(crate) data_dir: Option<std::path::PathBuf>,
51 pub(crate) ram_budget_bytes: usize,
53 pub(crate) mmap_fallback_count: u32,
55 pub(crate) mmap_segment_count: u32,
57 pub surrogate_map: HashMap<u32, Surrogate>,
59 pub surrogate_to_local: HashMap<Surrogate, u32>,
61 pub multi_doc_map: HashMap<Surrogate, Vec<u32>>,
64 pub(crate) seal_threshold: usize,
66 pub(crate) index_config: IndexConfig,
68 pub codec_dispatch: Option<CollectionCodec>,
73 pub(crate) quantization: VectorQuantization,
79 pub payload: PayloadIndexSet,
84 pub arena_index: Option<u32>,
90}
91
92impl VectorCollection {
93 pub fn new(dim: usize, params: HnswParams) -> Self {
95 Self::with_seal_threshold(dim, params, DEFAULT_SEAL_THRESHOLD)
96 }
97
98 pub fn with_seal_threshold(dim: usize, params: HnswParams, seal_threshold: usize) -> Self {
100 let index_config = IndexConfig {
101 hnsw: params.clone(),
102 ..IndexConfig::default()
103 };
104 Self::with_seal_threshold_and_config(dim, index_config, seal_threshold)
105 }
106
107 pub fn with_index_config(dim: usize, config: IndexConfig) -> Self {
109 Self::with_seal_threshold_and_config(dim, config, DEFAULT_SEAL_THRESHOLD)
110 }
111
112 pub fn with_seal_threshold_and_config(
114 dim: usize,
115 config: IndexConfig,
116 seal_threshold: usize,
117 ) -> Self {
118 let params = config.hnsw.clone();
119 Self {
120 growing: FlatIndex::new(dim, params.metric),
121 growing_base_id: 0,
122 sealed: Vec::new(),
123 building: Vec::new(),
124 params,
125 next_id: 0,
126 next_segment_id: 0,
127 dim,
128 data_dir: None,
129 ram_budget_bytes: 0,
130 mmap_fallback_count: 0,
131 mmap_segment_count: 0,
132 surrogate_map: HashMap::new(),
133 surrogate_to_local: HashMap::new(),
134 multi_doc_map: HashMap::new(),
135 seal_threshold,
136 index_config: config,
137 codec_dispatch: None,
138 quantization: VectorQuantization::default(),
139 payload: PayloadIndexSet::default(),
140 arena_index: None,
141 }
142 }
143
144 pub fn with_seed(dim: usize, params: HnswParams, _seed: u64) -> Self {
146 Self::with_seal_threshold(dim, params, DEFAULT_SEAL_THRESHOLD)
147 }
148
149 pub fn needs_seal(&self) -> bool {
151 self.growing.len() >= self.seal_threshold
152 }
153
154 pub fn seal(&mut self, key: &str) -> Option<BuildRequest> {
156 if self.growing.is_empty() {
157 return None;
158 }
159
160 let segment_id = self.next_segment_id;
161 self.next_segment_id += 1;
162
163 let count = self.growing.len();
164 let mut vectors = Vec::with_capacity(count);
165 for i in 0..count as u32 {
166 if let Some(v) = self.growing.get_vector(i) {
167 vectors.push(v.to_vec());
168 }
169 }
170
171 let old_growing = std::mem::replace(
172 &mut self.growing,
173 FlatIndex::new(self.dim, self.params.metric),
174 );
175 let old_base = self.growing_base_id;
176 self.growing_base_id = self.next_id;
177
178 self.building.push(BuildingSegment {
179 flat: old_growing,
180 base_id: old_base,
181 segment_id,
182 });
183
184 Some(BuildRequest {
185 key: key.to_string(),
186 segment_id,
187 vectors,
188 dim: self.dim,
189 params: self.params.clone(),
190 })
191 }
192
193 pub fn complete_build(&mut self, segment_id: u32, index: HnswIndex) {
200 if let Some(pos) = self
201 .building
202 .iter()
203 .position(|b| b.segment_id == segment_id)
204 {
205 let building = self.building.remove(pos);
206 let use_codec_dispatch = matches!(
207 self.quantization,
208 VectorQuantization::RaBitQ | VectorQuantization::Bbq
209 );
210 let use_pq = !use_codec_dispatch && self.index_config.index_type == IndexType::HnswPq;
211 let (sq8, pq) = if use_codec_dispatch {
212 (None, None)
213 } else if use_pq {
214 (
215 None,
216 Self::build_pq_for_index(&index, self.index_config.pq_m),
217 )
218 } else {
219 (Self::build_sq8_for_index(&index), None)
220 };
221 let (tier, mmap_vectors) =
222 self.resolve_tier_for_build(segment_id, building.base_id, &index);
223
224 self.sealed.push(SealedSegment {
225 index,
226 base_id: building.base_id,
227 sq8,
228 pq,
229 tier,
230 mmap_vectors,
231 });
232
233 if use_codec_dispatch {
234 let tag = match self.quantization {
235 VectorQuantization::RaBitQ => "rabitq",
236 VectorQuantization::Bbq => "bbq",
237 _ => unreachable!(
238 "invariant: use_codec_dispatch is only true for RaBitQ and Bbq quantization variants"
239 ),
240 };
241 self.build_codec_dispatch(tag);
242 }
243 }
244 }
245
246 pub fn sealed_segments(&self) -> &[SealedSegment] {
248 &self.sealed
249 }
250
251 pub fn sealed_segments_mut(&mut self) -> &mut Vec<SealedSegment> {
253 &mut self.sealed
254 }
255
256 pub fn growing_is_empty(&self) -> bool {
258 self.growing.is_empty()
259 }
260
261 pub fn len(&self) -> usize {
262 let mut total = self.growing.len();
263 for seg in &self.sealed {
264 total += seg.index.len();
265 }
266 for seg in &self.building {
267 total += seg.flat.len();
268 }
269 total
270 }
271
272 pub fn live_count(&self) -> usize {
273 let mut total = self.growing.live_count();
274 for seg in &self.sealed {
275 total += seg.index.live_count();
276 }
277 for seg in &self.building {
278 total += seg.flat.live_count();
279 }
280 total
281 }
282
283 pub fn is_empty(&self) -> bool {
284 self.live_count() == 0
285 }
286
287 pub fn dim(&self) -> usize {
288 self.dim
289 }
290
291 pub fn params(&self) -> &HnswParams {
292 &self.params
293 }
294
295 pub fn set_params(&mut self, params: HnswParams) {
297 self.params = params;
298 }
299
300 pub fn set_quantization(&mut self, q: VectorQuantization) {
302 self.quantization = q;
303 }
304
305 pub fn quantization(&self) -> VectorQuantization {
307 self.quantization
308 }
309
310 pub fn configure_payload_indexes(&mut self, fields: &[String]) {
312 use super::payload_index::PayloadIndexKind;
313 for field in fields {
314 self.payload
315 .add_index(field.as_str(), PayloadIndexKind::Equality);
316 }
317 }
318}