Skip to main content

nodedb_vector/collection/
lifecycle.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! VectorCollection lifecycle: insert, delete, seal, complete_build, compact.
4//!
5//! Identity model: every vector inserted into the collection is bound to
6//! a global `Surrogate` allocated by the Control Plane before the engine
7//! sees the call. The HNSW segments keep their dense local node-ids
8//! internally for cache-locality and SIMD traversal; this file owns the
9//! `surrogate_map: HashMap<u32, Surrogate>` (global node-id → surrogate)
10//! and reverse `surrogate_to_local: HashMap<Surrogate, u32>` (for
11//! point-delete by surrogate). User-PK strings live in the catalog and
12//! are translated at the Control Plane response boundary.
13//!
14//! Insert/delete ops live in `lifecycle_insert_ops`.
15//! Compact/snapshot ops live in `lifecycle_compact`.
16
17use std::collections::HashMap;
18
19use nodedb_types::{Surrogate, VectorQuantization};
20
21use crate::flat::FlatIndex;
22use crate::hnsw::{HnswIndex, HnswParams};
23use crate::index_config::{IndexConfig, IndexType};
24
25use super::codec_dispatch::CollectionCodec;
26use super::payload_index::PayloadIndexSet;
27use super::segment::{BuildRequest, BuildingSegment, DEFAULT_SEAL_THRESHOLD, SealedSegment};
28
/// Manages all vector segments for a single collection (one index key).
///
/// Vectors move through three stages: they are appended to the `growing`
/// flat segment, `seal` moves that segment into `building` and hands out a
/// build request, and `complete_build` promotes the matching building
/// segment into `sealed` once its HNSW index is ready.
///
/// This type is `!Send` — owned by a single Data Plane core.
pub struct VectorCollection {
    /// Active growing segment (append-only, brute-force search).
    pub(crate) growing: FlatIndex,
    /// Base ID for the growing segment's vectors.
    ///
    /// Reset to `next_id` every time the growing segment is sealed.
    pub(crate) growing_base_id: u32,
    /// Sealed segments with completed HNSW indexes.
    pub(crate) sealed: Vec<SealedSegment>,
    /// Segments being built in background (brute-force searchable).
    pub(crate) building: Vec<BuildingSegment>,
    /// HNSW params for this collection.
    ///
    /// Initialised from `index_config.hnsw`; cloned into every build request.
    pub(crate) params: HnswParams,
    /// Global vector ID counter (monotonic across all segments).
    pub(crate) next_id: u32,
    /// Next segment ID (monotonic).
    pub(crate) next_segment_id: u32,
    /// Dimensionality.
    pub(crate) dim: usize,
    /// Data directory for mmap segment files (L1 NVMe tier).
    ///
    /// `None` on a freshly constructed collection.
    pub(crate) data_dir: Option<std::path::PathBuf>,
    /// Memory budget for this collection's RAM vectors (bytes).
    ///
    /// Starts at `0` on a freshly constructed collection.
    pub(crate) ram_budget_bytes: usize,
    /// Count of segments that fell back to mmap due to budget exhaustion.
    pub(crate) mmap_fallback_count: u32,
    /// Count of segments currently backed by mmap files.
    pub(crate) mmap_segment_count: u32,
    /// Mapping from internal global vector ID → surrogate.
    pub surrogate_map: HashMap<u32, Surrogate>,
    /// Reverse map: surrogate → global vector ID. Used by point delete.
    pub surrogate_to_local: HashMap<Surrogate, u32>,
    /// Reverse mapping for multi-vector documents:
    /// document_surrogate → list of global vector IDs.
    pub multi_doc_map: HashMap<Surrogate, Vec<u32>>,
    /// Number of vectors in the growing segment before sealing.
    ///
    /// `needs_seal` reports `true` once the growing segment's length reaches
    /// this value.
    pub(crate) seal_threshold: usize,
    /// Full index configuration (index type, PQ params, IVF params).
    pub(crate) index_config: IndexConfig,
    /// Optional collection-level codec-dispatch index (RaBitQ or BBQ).
    ///
    /// `None` on a freshly constructed collection. Coexists with sealed
    /// segments — for codec-dispatched collections `complete_build` skips the
    /// per-segment SQ8 and PQ builders and refreshes this index instead.
    pub codec_dispatch: Option<CollectionCodec>,
    /// Quantization mode requested at collection-creation time.
    ///
    /// When this is `RaBitQ` or `Bbq`, each call to `complete_build`
    /// additionally rebuilds `codec_dispatch` so the codec-dispatch path is
    /// refreshed after a segment seals. Constructors start from
    /// `VectorQuantization::default()`.
    pub(crate) quantization: VectorQuantization,
    /// In-memory payload bitmap indexes for vector-primary collections.
    ///
    /// Empty (no indexes) by default; populated at construction time from
    /// `VectorPrimaryConfig::payload_indexes`.
    pub payload: PayloadIndexSet,
    /// Optional dedicated memory arena index for this collection.
    ///
    /// Set by the Data Plane after requesting a per-collection arena from
    /// `nodedb_mem::CollectionArenaRegistry`. Used only for stats reporting;
    /// the actual arena pinning is handled externally.
    pub arena_index: Option<u32>,
}
91
92impl VectorCollection {
93    /// Create an empty collection with the default seal threshold.
94    pub fn new(dim: usize, params: HnswParams) -> Self {
95        Self::with_seal_threshold(dim, params, DEFAULT_SEAL_THRESHOLD)
96    }
97
98    /// Create an empty collection with an explicit seal threshold.
99    pub fn with_seal_threshold(dim: usize, params: HnswParams, seal_threshold: usize) -> Self {
100        let index_config = IndexConfig {
101            hnsw: params.clone(),
102            ..IndexConfig::default()
103        };
104        Self::with_seal_threshold_and_config(dim, index_config, seal_threshold)
105    }
106
107    /// Create an empty collection with a full index configuration.
108    pub fn with_index_config(dim: usize, config: IndexConfig) -> Self {
109        Self::with_seal_threshold_and_config(dim, config, DEFAULT_SEAL_THRESHOLD)
110    }
111
112    /// Create an empty collection with a full index config and custom seal threshold.
113    pub fn with_seal_threshold_and_config(
114        dim: usize,
115        config: IndexConfig,
116        seal_threshold: usize,
117    ) -> Self {
118        let params = config.hnsw.clone();
119        Self {
120            growing: FlatIndex::new(dim, params.metric),
121            growing_base_id: 0,
122            sealed: Vec::new(),
123            building: Vec::new(),
124            params,
125            next_id: 0,
126            next_segment_id: 0,
127            dim,
128            data_dir: None,
129            ram_budget_bytes: 0,
130            mmap_fallback_count: 0,
131            mmap_segment_count: 0,
132            surrogate_map: HashMap::new(),
133            surrogate_to_local: HashMap::new(),
134            multi_doc_map: HashMap::new(),
135            seal_threshold,
136            index_config: config,
137            codec_dispatch: None,
138            quantization: VectorQuantization::default(),
139            payload: PayloadIndexSet::default(),
140            arena_index: None,
141        }
142    }
143
144    /// Create with a specific seed (for deterministic testing).
145    pub fn with_seed(dim: usize, params: HnswParams, _seed: u64) -> Self {
146        Self::with_seal_threshold(dim, params, DEFAULT_SEAL_THRESHOLD)
147    }
148
149    /// Check if the growing segment should be sealed.
150    pub fn needs_seal(&self) -> bool {
151        self.growing.len() >= self.seal_threshold
152    }
153
154    /// Seal the growing segment and return a build request.
155    pub fn seal(&mut self, key: &str) -> Option<BuildRequest> {
156        if self.growing.is_empty() {
157            return None;
158        }
159
160        let segment_id = self.next_segment_id;
161        self.next_segment_id += 1;
162
163        let count = self.growing.len();
164        // no-governor: VectorCollection is !Send and has no governor field; budget is enforced by the Data Plane core's arena (arena_index) before dispatching to this struct
165        let mut vectors = Vec::with_capacity(count);
166        for i in 0..count as u32 {
167            if let Some(v) = self.growing.get_vector(i) {
168                vectors.push(v.to_vec());
169            }
170        }
171
172        let old_growing = std::mem::replace(
173            &mut self.growing,
174            FlatIndex::new(self.dim, self.params.metric),
175        );
176        let old_base = self.growing_base_id;
177        self.growing_base_id = self.next_id;
178
179        self.building.push(BuildingSegment {
180            flat: old_growing,
181            base_id: old_base,
182            segment_id,
183        });
184
185        Some(BuildRequest {
186            key: key.to_string(),
187            segment_id,
188            vectors,
189            dim: self.dim,
190            params: self.params.clone(),
191        })
192    }
193
194    /// Accept a completed HNSW build from the background thread.
195    ///
196    /// After promoting the segment to sealed, rebuilds the collection-level
197    /// codec-dispatch index when `self.quantization` is `RaBitQ` or `Bbq`.
198    /// The rebuild trains over all vectors so the codec index always covers
199    /// every sealed segment.
200    pub fn complete_build(&mut self, segment_id: u32, index: HnswIndex) {
201        if let Some(pos) = self
202            .building
203            .iter()
204            .position(|b| b.segment_id == segment_id)
205        {
206            let building = self.building.remove(pos);
207            let use_codec_dispatch = matches!(
208                self.quantization,
209                VectorQuantization::RaBitQ | VectorQuantization::Bbq
210            );
211            let use_pq = !use_codec_dispatch && self.index_config.index_type == IndexType::HnswPq;
212            let (sq8, pq) = if use_codec_dispatch {
213                (None, None)
214            } else if use_pq {
215                (
216                    None,
217                    Self::build_pq_for_index(&index, self.index_config.pq_m),
218                )
219            } else {
220                (Self::build_sq8_for_index(&index), None)
221            };
222            let (tier, mmap_vectors) =
223                self.resolve_tier_for_build(segment_id, building.base_id, &index);
224
225            self.sealed.push(SealedSegment {
226                index,
227                base_id: building.base_id,
228                sq8,
229                pq,
230                tier,
231                mmap_vectors,
232            });
233
234            if use_codec_dispatch {
235                let tag = match self.quantization {
236                    VectorQuantization::RaBitQ => "rabitq",
237                    VectorQuantization::Bbq => "bbq",
238                    _ => unreachable!(
239                        "invariant: use_codec_dispatch is only true for RaBitQ and Bbq quantization variants"
240                    ),
241                };
242                self.build_codec_dispatch(tag);
243            }
244        }
245    }
246
247    /// Access sealed segments (read-only).
248    pub fn sealed_segments(&self) -> &[SealedSegment] {
249        &self.sealed
250    }
251
252    /// Access sealed segments mutably.
253    pub fn sealed_segments_mut(&mut self) -> &mut Vec<SealedSegment> {
254        &mut self.sealed
255    }
256
257    /// Whether the growing segment has no vectors.
258    pub fn growing_is_empty(&self) -> bool {
259        self.growing.is_empty()
260    }
261
262    pub fn len(&self) -> usize {
263        let mut total = self.growing.len();
264        for seg in &self.sealed {
265            total += seg.index.len();
266        }
267        for seg in &self.building {
268            total += seg.flat.len();
269        }
270        total
271    }
272
273    pub fn live_count(&self) -> usize {
274        let mut total = self.growing.live_count();
275        for seg in &self.sealed {
276            total += seg.index.live_count();
277        }
278        for seg in &self.building {
279            total += seg.flat.live_count();
280        }
281        total
282    }
283
284    pub fn is_empty(&self) -> bool {
285        self.live_count() == 0
286    }
287
288    pub fn dim(&self) -> usize {
289        self.dim
290    }
291
292    pub fn params(&self) -> &HnswParams {
293        &self.params
294    }
295
296    /// Update HNSW parameters for future builds.
297    pub fn set_params(&mut self, params: HnswParams) {
298        self.params = params;
299    }
300
301    /// Set the collection-level quantization.
302    pub fn set_quantization(&mut self, q: VectorQuantization) {
303        self.quantization = q;
304    }
305
306    /// Return the configured quantization mode.
307    pub fn quantization(&self) -> VectorQuantization {
308        self.quantization
309    }
310
311    /// Configure payload bitmap indexes from a list of field names.
312    pub fn configure_payload_indexes(&mut self, fields: &[String]) {
313        use super::payload_index::PayloadIndexKind;
314        for field in fields {
315            self.payload
316                .add_index(field.as_str(), PayloadIndexKind::Equality);
317        }
318    }
319}