Skip to main content

velesdb_core/collection/core/
crud.rs

1//! CRUD operations for Collection (upsert, get, delete).
2//!
3//! Quantization caching helpers and secondary-index update helpers are in `crud_helpers.rs`.
4
5use crate::collection::types::Collection;
6use crate::error::{Error, Result};
7use crate::index::VectorIndex;
8use crate::point::Point;
9use crate::quantization::{BinaryQuantizedVector, PQVector, QuantizedVector, StorageMode};
10use crate::storage::{LogPayloadStorage, PayloadStorage, VectorStorage};
11use crate::validation::validate_dimension_match;
12
13use parking_lot::RwLockWriteGuard;
14use std::collections::{BTreeMap, HashMap};
15
16struct QuantizationGuards<'a> {
17    sq8: Option<RwLockWriteGuard<'a, HashMap<u64, QuantizedVector>>>,
18    binary: Option<RwLockWriteGuard<'a, HashMap<u64, BinaryQuantizedVector>>>,
19    pq: Option<RwLockWriteGuard<'a, HashMap<u64, PQVector>>>,
20}
21
22impl<'a> QuantizationGuards<'a> {
23    fn acquire(collection: &'a Collection, mode: StorageMode) -> Self {
24        Self {
25            sq8: matches!(mode, StorageMode::SQ8).then(|| collection.sq8_cache.write()),
26            binary: matches!(mode, StorageMode::Binary).then(|| collection.binary_cache.write()),
27            pq: matches!(mode, StorageMode::ProductQuantization)
28                .then(|| collection.pq_cache.write()),
29        }
30    }
31}
32
33impl Collection {
34    /// Inserts or updates points in the collection.
35    ///
36    /// Accepts any iterator of points (Vec, slice, array, etc.)
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if any point has a mismatched dimension, or if
41    /// attempting to insert vectors into a metadata-only collection.
42    pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
43        let points: Vec<Point> = points.into_iter().collect();
44        let config = self.config.read();
45        let dimension = config.dimension;
46        let storage_mode = config.storage_mode;
47
48        if config.metadata_only {
49            for point in &points {
50                if !point.vector.is_empty() {
51                    return Err(Error::VectorNotAllowed(config.name.clone()));
52                }
53            }
54            drop(config);
55            return self.upsert_metadata(points);
56        }
57        drop(config);
58
59        for point in &points {
60            validate_dimension_match(dimension, point.dimension())?;
61        }
62
63        let sparse_batch = self.upsert_storage_and_index(&points, storage_mode)?;
64
65        self.apply_sparse_batch_upsert(&sparse_batch)?;
66        self.invalidate_caches_and_bump_generation();
67        Ok(())
68    }
69
70    /// Stores vectors, payloads, and indexes for a batch of points.
71    ///
72    /// Returns buffered sparse vectors for deferred insertion.
73    fn upsert_storage_and_index(
74        &self,
75        points: &[Point],
76        storage_mode: StorageMode,
77    ) -> Result<Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>> {
78        let mut vector_storage = self.vector_storage.write();
79        let mut payload_storage = self.payload_storage.write();
80        let mut quant_guards = QuantizationGuards::acquire(self, storage_mode);
81
82        let mut sparse_batch = Vec::new();
83        for point in points {
84            let old_payload = payload_storage.retrieve(point.id).ok().flatten();
85            vector_storage.store(point.id, &point.vector)?;
86
87            let (sq8, binary, pq) = (
88                quant_guards.sq8.as_deref_mut(),
89                quant_guards.binary.as_deref_mut(),
90                quant_guards.pq.as_deref_mut(),
91            );
92            self.cache_quantized_vector(point, storage_mode, sq8, binary, pq);
93
94            Self::store_or_delete_payload(&mut payload_storage, point)?;
95            self.update_secondary_indexes_on_upsert(
96                point.id,
97                old_payload.as_ref(),
98                point.payload.as_ref(),
99            );
100            self.insert_or_defer(point.id, &point.vector);
101            Self::update_text_index(&self.text_index, point);
102            Self::collect_sparse_vectors(point, &mut sparse_batch);
103        }
104
105        let point_count = vector_storage.len();
106        vector_storage.flush()?;
107        payload_storage.flush()?;
108        drop(vector_storage);
109        drop(payload_storage);
110
111        self.config.write().point_count = point_count;
112        self.maybe_merge_deferred();
113
114        Ok(sparse_batch)
115    }
116
117    fn store_or_delete_payload(
118        payload_storage: &mut LogPayloadStorage,
119        point: &Point,
120    ) -> Result<()> {
121        if let Some(payload) = &point.payload {
122            payload_storage.store(point.id, payload)?;
123        } else {
124            let _ = payload_storage.delete(point.id);
125        }
126        Ok(())
127    }
128
129    fn collect_sparse_vectors(
130        point: &Point,
131        sparse_batch: &mut Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>,
132    ) {
133        if let Some(sv_map) = &point.sparse_vectors {
134            if !sv_map.is_empty() {
135                sparse_batch.push((point.id, sv_map.clone()));
136            }
137        }
138    }
139
140    /// Updates the BM25 text index for a single point.
141    fn update_text_index(text_index: &crate::index::Bm25Index, point: &Point) {
142        if let Some(payload) = &point.payload {
143            let text = Self::extract_text_from_payload(payload);
144            if !text.is_empty() {
145                text_index.add_document(point.id, &text);
146            }
147        } else {
148            text_index.remove_document(point.id);
149        }
150    }
151
152    /// Applies buffered sparse vector upserts with WAL-before-apply semantics.
153    fn apply_sparse_batch_upsert(
154        &self,
155        sparse_batch: &[(u64, BTreeMap<String, crate::index::sparse::SparseVector>)],
156    ) -> Result<()> {
157        if sparse_batch.is_empty() {
158            return Ok(());
159        }
160        #[cfg(feature = "persistence")]
161        {
162            for (point_id, sv_map) in sparse_batch {
163                for (name, sv) in sv_map {
164                    let wal_path =
165                        crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
166                    crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
167                }
168            }
169        }
170        let mut indexes = self.sparse_indexes.write();
171        for (point_id, sv_map) in sparse_batch {
172            for (name, sv) in sv_map {
173                let idx = indexes.entry(name.clone()).or_default();
174                idx.insert(*point_id, sv);
175            }
176        }
177        Ok(())
178    }
179
180    /// Invalidates stats cache and bumps write generation.
181    fn invalidate_caches_and_bump_generation(&self) {
182        *self.cached_stats.lock() = None;
183        self.write_generation
184            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
185    }
186
187    /// Inserts into HNSW directly, or buffers in the deferred indexer.
188    ///
189    /// When a deferred indexer is present, the vector is pushed into the
190    /// deferred buffer instead of the HNSW graph. Otherwise falls through
191    /// to `VectorIndex::insert`.
192    ///
193    /// Invariant: `self.deferred_indexer` is `Some` only when enabled
194    /// (`build_deferred_indexer` filters on `cfg.enabled`), so no
195    /// redundant `is_enabled()` check is needed here.
196    fn insert_or_defer(&self, id: u64, vector: &[f32]) {
197        #[cfg(feature = "persistence")]
198        if let Some(ref di) = self.deferred_indexer {
199            di.push(id, vector.to_vec());
200            return;
201        }
202        self.index.insert(id, vector);
203    }
204
205    /// Triggers a deferred merge if the buffer has reached threshold.
206    ///
207    /// Drains buffered vectors and batch-inserts them into HNSW.
208    /// No-op when deferred indexing is not configured.
209    fn maybe_merge_deferred(&self) {
210        #[cfg(feature = "persistence")]
211        if let Some(ref di) = self.deferred_indexer {
212            if di.should_merge() {
213                self.merge_deferred_batch(di);
214            }
215        }
216    }
217
218    /// Drains the deferred indexer and batch-inserts into HNSW.
219    ///
220    /// Filters out IDs that have been deleted from vector storage since they
221    /// were buffered, preventing ghost vectors from being re-inserted into
222    /// HNSW after a concurrent delete.
223    ///
224    /// Logs a warning if fewer vectors were inserted than expected, which
225    /// indicates a partial failure (e.g., duplicate IDs filtered out,
226    /// ghost-vector filtering, or graph insertion error). The drained
227    /// vectors are not retried.
228    #[cfg(feature = "persistence")]
229    fn merge_deferred_batch(&self, di: &crate::collection::streaming::DeferredIndexer) {
230        let drained = di.swap_and_drain();
231        if drained.is_empty() {
232            return;
233        }
234        // Filter out vectors deleted from storage during the buffer's
235        // lifetime to prevent ghost re-insertion into HNSW.
236        let storage = self.vector_storage.read();
237        let valid: Vec<(u64, &[f32])> = drained
238            .iter()
239            .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
240            .map(|(id, v)| (*id, v.as_slice()))
241            .collect();
242        drop(storage); // Release read lock before batch insert
243        let expected = valid.len();
244        if valid.is_empty() {
245            return;
246        }
247        let inserted = self.index.insert_batch_parallel(valid);
248        if inserted < expected {
249            tracing::warn!("merge_deferred_batch: inserted {inserted}/{expected} vectors");
250        }
251    }
252
253    /// Inserts or updates metadata-only points (no vectors).
254    ///
255    /// This method is for metadata-only collections. Points should have
256    /// empty vectors and only contain payload data.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if storage operations fail.
261    pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
262        let points: Vec<Point> = points.into_iter().collect();
263
264        let mut payload_storage = self.payload_storage.write();
265
266        for point in &points {
267            let old_payload = payload_storage.retrieve(point.id).ok().flatten();
268            Self::store_or_delete_payload(&mut payload_storage, point)?;
269            Self::update_text_index(&self.text_index, point);
270            self.update_secondary_indexes_on_upsert(
271                point.id,
272                old_payload.as_ref(),
273                point.payload.as_ref(),
274            );
275        }
276
277        // LOCK ORDER: flush while payload_storage(3) still held, then drop before acquiring config(1).
278        let point_count = payload_storage.ids().len();
279        payload_storage.flush()?;
280        drop(payload_storage);
281
282        // config(1) only — all higher-numbered locks released above.
283        self.config.write().point_count = point_count;
284        self.invalidate_caches_and_bump_generation();
285        Ok(())
286    }
287
288    /// Bulk insert optimized for high-throughput import.
289    ///
290    /// # Performance
291    ///
292    /// This method is optimized for bulk loading:
293    /// - Uses parallel HNSW insertion (rayon)
294    /// - Single flush at the end (not per-point)
295    /// - No HNSW index save (deferred for performance)
296    /// - ~15x faster than previous sequential approach on large batches (5000+)
297    /// - Benchmark: 25-30 Kvec/s on 768D vectors
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if any point has a mismatched dimension.
302    pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
303        if points.is_empty() {
304            return Ok(0);
305        }
306
307        let dimension = self.config.read().dimension;
308        for point in points {
309            validate_dimension_match(dimension, point.dimension())?;
310        }
311
312        let vector_refs: Vec<(u64, &[f32])> =
313            points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
314        let sparse_batch = Self::collect_sparse_batch(points);
315
316        self.bulk_store_vectors(&vector_refs)?;
317        self.bulk_store_payloads(points)?;
318
319        let inserted = self.bulk_index_or_defer(vector_refs);
320        self.config.write().point_count = self.vector_storage.read().len();
321
322        self.apply_sparse_batch_bulk(&sparse_batch)?;
323        self.invalidate_caches_and_bump_generation();
324
325        Ok(inserted)
326    }
327
328    /// Batch-inserts into HNSW or defers into the deferred indexer.
329    ///
330    /// Returns the number of vectors processed (whether indexed directly
331    /// or deferred for later merge).
332    ///
333    /// Invariant: `self.deferred_indexer` is `Some` only when enabled
334    /// (`build_deferred_indexer` filters on `cfg.enabled`), so no
335    /// redundant `is_enabled()` check is needed here.
336    fn bulk_index_or_defer(&self, vector_refs: Vec<(u64, &[f32])>) -> usize {
337        #[cfg(feature = "persistence")]
338        if let Some(ref di) = self.deferred_indexer {
339            di.extend(vector_refs.iter().map(|(id, v)| (*id, v.to_vec())));
340            if di.should_merge() {
341                self.merge_deferred_batch(di);
342            }
343            return vector_refs.len();
344        }
345        let inserted = self.index.insert_batch_parallel(vector_refs);
346        self.index.set_searching_mode();
347        inserted
348    }
349
350    /// Collects sparse vectors grouped by index name for batch insert.
351    fn collect_sparse_batch(
352        points: &[Point],
353    ) -> BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> {
354        let mut batch: BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> =
355            BTreeMap::new();
356        for point in points {
357            if let Some(sv_map) = &point.sparse_vectors {
358                for (name, sv) in sv_map {
359                    batch
360                        .entry(name.clone())
361                        .or_default()
362                        .push((point.id, sv.clone()));
363                }
364            }
365        }
366        batch
367    }
368
369    /// Stores vectors in bulk via batch WAL + mmap write.
370    fn bulk_store_vectors(&self, vectors: &[(u64, &[f32])]) -> Result<()> {
371        let mut storage = self.vector_storage.write();
372        storage.store_batch(vectors)?;
373        storage.flush()?;
374        Ok(())
375    }
376
377    /// Stores payloads and updates BM25 text index in bulk.
378    ///
379    /// Uses `LogPayloadStorage::store_batch()` for a single WAL sync instead
380    /// of per-point fsync, improving bulk insert throughput by 10-50x.
381    fn bulk_store_payloads(&self, points: &[Point]) -> Result<()> {
382        let entries: Vec<(u64, &serde_json::Value)> = points
383            .iter()
384            .filter_map(|p| p.payload.as_ref().map(|pl| (p.id, pl)))
385            .collect();
386
387        self.payload_storage.write().store_batch(&entries)?;
388
389        for point in points {
390            Self::update_text_index(&self.text_index, point);
391        }
392
393        Ok(())
394    }
395
396    /// Applies sparse batch with WAL-before-apply for bulk insert.
397    fn apply_sparse_batch_bulk(
398        &self,
399        sparse_batch: &BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>>,
400    ) -> Result<()> {
401        if sparse_batch.is_empty() {
402            return Ok(());
403        }
404        #[cfg(feature = "persistence")]
405        {
406            for (name, docs) in sparse_batch {
407                let wal_path =
408                    crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
409                for (point_id, sv) in docs {
410                    crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
411                }
412            }
413        }
414        let mut indexes = self.sparse_indexes.write();
415        for (name, docs) in sparse_batch {
416            let idx = indexes.entry(name.clone()).or_default();
417            idx.insert_batch_chunk(docs);
418        }
419        Ok(())
420    }
421
422    /// Retrieves points by their IDs.
423    #[must_use]
424    pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
425        let config = self.config.read();
426        let is_metadata_only = config.metadata_only;
427        drop(config);
428
429        let payload_storage = self.payload_storage.read();
430
431        if is_metadata_only {
432            // For metadata-only collections, only retrieve payload
433            ids.iter()
434                .map(|&id| {
435                    let payload = payload_storage.retrieve(id).ok().flatten()?;
436                    Some(Point {
437                        id,
438                        vector: Vec::new(),
439                        payload: Some(payload),
440                        sparse_vectors: None,
441                    })
442                })
443                .collect()
444        } else {
445            // For vector collections, retrieve both vector and payload
446            let vector_storage = self.vector_storage.read();
447            ids.iter()
448                .map(|&id| {
449                    let vector = vector_storage.retrieve(id).ok().flatten()?;
450                    let payload = payload_storage.retrieve(id).ok().flatten();
451                    Some(Point {
452                        id,
453                        vector,
454                        payload,
455                        sparse_vectors: None,
456                    })
457                })
458                .collect()
459        }
460    }
461
462    /// Deletes points by their IDs.
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if storage operations fail.
467    pub fn delete(&self, ids: &[u64]) -> Result<()> {
468        if self.config.read().metadata_only {
469            self.delete_metadata_only(ids)?;
470        } else {
471            self.delete_vector_points(ids)?;
472        }
473        self.invalidate_caches_and_bump_generation();
474        Ok(())
475    }
476
477    /// Deletes metadata-only points.
478    fn delete_metadata_only(&self, ids: &[u64]) -> Result<()> {
479        let mut payload_storage = self.payload_storage.write();
480        for &id in ids {
481            let old_payload = payload_storage.retrieve(id).ok().flatten();
482            payload_storage.delete(id)?;
483            self.text_index.remove_document(id);
484            self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
485        }
486        let point_count = payload_storage.ids().len();
487        drop(payload_storage);
488        self.config.write().point_count = point_count;
489        Ok(())
490    }
491
492    /// Deletes vector points from all stores (vector, payload, index, caches, sparse, delta).
493    fn delete_vector_points(&self, ids: &[u64]) -> Result<()> {
494        let mut payload_storage = self.payload_storage.write();
495        let mut vector_storage = self.vector_storage.write();
496        let mut sq8_cache = self.sq8_cache.write();
497        let mut binary_cache = self.binary_cache.write();
498        let mut pq_cache = self.pq_cache.write();
499
500        for &id in ids {
501            let old_payload = payload_storage.retrieve(id).ok().flatten();
502            vector_storage.delete(id)?;
503            payload_storage.delete(id)?;
504            self.index.remove(id);
505            sq8_cache.remove(&id);
506            binary_cache.remove(&id);
507            pq_cache.remove(&id);
508            self.text_index.remove_document(id);
509            self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
510        }
511
512        let point_count = vector_storage.len();
513        drop(vector_storage);
514        drop(payload_storage);
515        drop(sq8_cache);
516        drop(binary_cache);
517        drop(pq_cache);
518        self.config.write().point_count = point_count;
519
520        self.delete_from_sparse_indexes(ids)?;
521
522        // Lock order: delta_buffer(10) acquired after sparse_indexes(9) released.
523        #[cfg(feature = "persistence")]
524        for &id in ids {
525            self.delta_buffer.remove(id);
526        }
527
528        // Lock order: deferred_indexer(11) acquired after delta_buffer(10).
529        #[cfg(feature = "persistence")]
530        if let Some(ref di) = self.deferred_indexer {
531            for &id in ids {
532                di.remove(id);
533            }
534        }
535
536        Ok(())
537    }
538
539    /// Deletes IDs from sparse indexes with WAL-before-apply.
540    fn delete_from_sparse_indexes(&self, ids: &[u64]) -> Result<()> {
541        #[cfg(feature = "persistence")]
542        {
543            let indexes = self.sparse_indexes.read();
544            for (name, _) in indexes.iter() {
545                let wal_path =
546                    crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
547                for &id in ids {
548                    crate::index::sparse::persistence::wal_append_delete(&wal_path, id)?;
549                }
550            }
551        }
552        let indexes = self.sparse_indexes.read();
553        for idx in indexes.values() {
554            for &id in ids {
555                idx.delete(id);
556            }
557        }
558        Ok(())
559    }
560
561    /// Returns the number of points in the collection.
562    /// Perf: Uses cached `point_count` from config instead of acquiring storage lock
563    #[must_use]
564    pub fn len(&self) -> usize {
565        self.config.read().point_count
566    }
567
568    /// Returns true if the collection is empty.
569    /// Perf: Uses cached `point_count` from config instead of acquiring storage lock
570    #[must_use]
571    pub fn is_empty(&self) -> bool {
572        self.config.read().point_count == 0
573    }
574
575    /// Returns all point IDs in the collection.
576    #[must_use]
577    pub fn all_ids(&self) -> Vec<u64> {
578        self.payload_storage.read().ids()
579    }
580}