nodedb_vector/collection/
lifecycle.rs1use std::collections::HashMap;
18
19use nodedb_types::{Surrogate, VectorQuantization};
20
21use crate::flat::FlatIndex;
22use crate::hnsw::{HnswIndex, HnswParams};
23use crate::index_config::{IndexConfig, IndexType};
24
25use super::codec_dispatch::CollectionCodec;
26use super::payload_index::PayloadIndexSet;
27use super::segment::{BuildRequest, BuildingSegment, DEFAULT_SEAL_THRESHOLD, SealedSegment};
28
29pub struct VectorCollection {
33 pub(crate) growing: FlatIndex,
35 pub(crate) growing_base_id: u32,
37 pub(crate) sealed: Vec<SealedSegment>,
39 pub(crate) building: Vec<BuildingSegment>,
41 pub(crate) params: HnswParams,
43 pub(crate) next_id: u32,
45 pub(crate) next_segment_id: u32,
47 pub(crate) dim: usize,
49 pub(crate) data_dir: Option<std::path::PathBuf>,
51 pub(crate) ram_budget_bytes: usize,
53 pub(crate) mmap_fallback_count: u32,
55 pub(crate) mmap_segment_count: u32,
57 pub surrogate_map: HashMap<u32, Surrogate>,
59 pub surrogate_to_local: HashMap<Surrogate, u32>,
61 pub multi_doc_map: HashMap<Surrogate, Vec<u32>>,
64 pub(crate) seal_threshold: usize,
66 pub(crate) index_config: IndexConfig,
68 pub codec_dispatch: Option<CollectionCodec>,
73 pub(crate) quantization: VectorQuantization,
79 pub payload: PayloadIndexSet,
84 pub arena_index: Option<u32>,
90}
91
92impl VectorCollection {
93 pub fn new(dim: usize, params: HnswParams) -> Self {
95 Self::with_seal_threshold(dim, params, DEFAULT_SEAL_THRESHOLD)
96 }
97
98 pub fn with_seal_threshold(dim: usize, params: HnswParams, seal_threshold: usize) -> Self {
100 let index_config = IndexConfig {
101 hnsw: params.clone(),
102 ..IndexConfig::default()
103 };
104 Self::with_seal_threshold_and_config(dim, index_config, seal_threshold)
105 }
106
107 pub fn with_index_config(dim: usize, config: IndexConfig) -> Self {
109 Self::with_seal_threshold_and_config(dim, config, DEFAULT_SEAL_THRESHOLD)
110 }
111
112 pub fn with_seal_threshold_and_config(
114 dim: usize,
115 config: IndexConfig,
116 seal_threshold: usize,
117 ) -> Self {
118 let params = config.hnsw.clone();
119 Self {
120 growing: FlatIndex::new(dim, params.metric),
121 growing_base_id: 0,
122 sealed: Vec::new(),
123 building: Vec::new(),
124 params,
125 next_id: 0,
126 next_segment_id: 0,
127 dim,
128 data_dir: None,
129 ram_budget_bytes: 0,
130 mmap_fallback_count: 0,
131 mmap_segment_count: 0,
132 surrogate_map: HashMap::new(),
133 surrogate_to_local: HashMap::new(),
134 multi_doc_map: HashMap::new(),
135 seal_threshold,
136 index_config: config,
137 codec_dispatch: None,
138 quantization: VectorQuantization::default(),
139 payload: PayloadIndexSet::default(),
140 arena_index: None,
141 }
142 }
143
144 pub fn with_seed(dim: usize, params: HnswParams, _seed: u64) -> Self {
146 Self::with_seal_threshold(dim, params, DEFAULT_SEAL_THRESHOLD)
147 }
148
149 pub fn needs_seal(&self) -> bool {
151 self.growing.len() >= self.seal_threshold
152 }
153
154 pub fn seal(&mut self, key: &str) -> Option<BuildRequest> {
156 if self.growing.is_empty() {
157 return None;
158 }
159
160 let segment_id = self.next_segment_id;
161 self.next_segment_id += 1;
162
163 let count = self.growing.len();
164 let mut vectors = Vec::with_capacity(count);
166 for i in 0..count as u32 {
167 if let Some(v) = self.growing.get_vector(i) {
168 vectors.push(v.to_vec());
169 }
170 }
171
172 let old_growing = std::mem::replace(
173 &mut self.growing,
174 FlatIndex::new(self.dim, self.params.metric),
175 );
176 let old_base = self.growing_base_id;
177 self.growing_base_id = self.next_id;
178
179 self.building.push(BuildingSegment {
180 flat: old_growing,
181 base_id: old_base,
182 segment_id,
183 });
184
185 Some(BuildRequest {
186 key: key.to_string(),
187 segment_id,
188 vectors,
189 dim: self.dim,
190 params: self.params.clone(),
191 })
192 }
193
194 pub fn complete_build(&mut self, segment_id: u32, index: HnswIndex) {
201 if let Some(pos) = self
202 .building
203 .iter()
204 .position(|b| b.segment_id == segment_id)
205 {
206 let building = self.building.remove(pos);
207 let use_codec_dispatch = matches!(
208 self.quantization,
209 VectorQuantization::RaBitQ | VectorQuantization::Bbq
210 );
211 let use_pq = !use_codec_dispatch && self.index_config.index_type == IndexType::HnswPq;
212 let (sq8, pq) = if use_codec_dispatch {
213 (None, None)
214 } else if use_pq {
215 (
216 None,
217 Self::build_pq_for_index(&index, self.index_config.pq_m),
218 )
219 } else {
220 (Self::build_sq8_for_index(&index), None)
221 };
222 let (tier, mmap_vectors) =
223 self.resolve_tier_for_build(segment_id, building.base_id, &index);
224
225 self.sealed.push(SealedSegment {
226 index,
227 base_id: building.base_id,
228 sq8,
229 pq,
230 tier,
231 mmap_vectors,
232 });
233
234 if use_codec_dispatch {
235 let tag = match self.quantization {
236 VectorQuantization::RaBitQ => "rabitq",
237 VectorQuantization::Bbq => "bbq",
238 _ => unreachable!(
239 "invariant: use_codec_dispatch is only true for RaBitQ and Bbq quantization variants"
240 ),
241 };
242 self.build_codec_dispatch(tag);
243 }
244 }
245 }
246
247 pub fn sealed_segments(&self) -> &[SealedSegment] {
249 &self.sealed
250 }
251
252 pub fn sealed_segments_mut(&mut self) -> &mut Vec<SealedSegment> {
254 &mut self.sealed
255 }
256
257 pub fn growing_is_empty(&self) -> bool {
259 self.growing.is_empty()
260 }
261
262 pub fn len(&self) -> usize {
263 let mut total = self.growing.len();
264 for seg in &self.sealed {
265 total += seg.index.len();
266 }
267 for seg in &self.building {
268 total += seg.flat.len();
269 }
270 total
271 }
272
273 pub fn live_count(&self) -> usize {
274 let mut total = self.growing.live_count();
275 for seg in &self.sealed {
276 total += seg.index.live_count();
277 }
278 for seg in &self.building {
279 total += seg.flat.live_count();
280 }
281 total
282 }
283
284 pub fn is_empty(&self) -> bool {
285 self.live_count() == 0
286 }
287
288 pub fn dim(&self) -> usize {
289 self.dim
290 }
291
292 pub fn params(&self) -> &HnswParams {
293 &self.params
294 }
295
296 pub fn set_params(&mut self, params: HnswParams) {
298 self.params = params;
299 }
300
301 pub fn set_quantization(&mut self, q: VectorQuantization) {
303 self.quantization = q;
304 }
305
306 pub fn quantization(&self) -> VectorQuantization {
308 self.quantization
309 }
310
311 pub fn configure_payload_indexes(&mut self, fields: &[String]) {
313 use super::payload_index::PayloadIndexKind;
314 for field in fields {
315 self.payload
316 .add_index(field.as_str(), PayloadIndexKind::Equality);
317 }
318 }
319}