Skip to main content

velesdb_core/collection/core/
crud.rs

1//! Upsert operations for Collection.
2//!
3//! Read and delete operations are in `crud_read_delete.rs`.
4//! Bulk-specific methods (`upsert_bulk`, `upsert_bulk_from_raw`) are in `crud_bulk.rs`.
5//! Quantization caching helpers and secondary-index update helpers are in `crud_helpers.rs`.
6
7use crate::collection::types::Collection;
8use crate::error::{Error, Result};
9use crate::point::Point;
10use crate::quantization::{BinaryQuantizedVector, PQVector, QuantizedVector, StorageMode};
11use crate::storage::{LogPayloadStorage, PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension_match;
13
14use parking_lot::RwLockWriteGuard;
15use std::collections::{BTreeMap, HashMap};
16
/// Last-writer-wins index over one upsert batch: maps each `point_id` to the
/// position of its *final* occurrence in the batch slice.
///
/// Built once in `batch_store_all` and shared by the payload and vector
/// write paths so the map is not rebuilt per path (Issue #425).
type DedupMap = std::collections::HashMap<u64, usize>;
22
23struct QuantizationGuards<'a> {
24    sq8: Option<RwLockWriteGuard<'a, HashMap<u64, QuantizedVector>>>,
25    binary: Option<RwLockWriteGuard<'a, HashMap<u64, BinaryQuantizedVector>>>,
26    pq: Option<RwLockWriteGuard<'a, HashMap<u64, PQVector>>>,
27}
28
29impl<'a> QuantizationGuards<'a> {
30    fn acquire(collection: &'a Collection, mode: StorageMode) -> Self {
31        Self {
32            sq8: matches!(mode, StorageMode::SQ8).then(|| collection.sq8_cache.write()),
33            binary: matches!(mode, StorageMode::Binary).then(|| collection.binary_cache.write()),
34            pq: matches!(mode, StorageMode::ProductQuantization)
35                .then(|| collection.pq_cache.write()),
36        }
37    }
38}
39
40impl Collection {
41    /// Inserts or updates points in the collection.
42    ///
43    /// Accepts any iterator of points (Vec, slice, array, etc.)
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if any point has a mismatched dimension, or if
48    /// attempting to insert vectors into a metadata-only collection.
49    pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
50        let points: Vec<Point> = points.into_iter().collect();
51        let config = self.config.read();
52        let dimension = config.dimension;
53        let storage_mode = config.storage_mode;
54
55        if config.metadata_only {
56            for point in &points {
57                if !point.vector.is_empty() {
58                    return Err(Error::VectorNotAllowed(config.name.clone()));
59                }
60            }
61            drop(config);
62            return self.upsert_metadata(points);
63        }
64        drop(config);
65
66        for point in &points {
67            validate_dimension_match(dimension, point.dimension())?;
68        }
69
70        let sparse_batch = self.upsert_storage_and_index(&points, storage_mode)?;
71
72        self.apply_sparse_batch_upsert(&sparse_batch)?;
73        self.invalidate_caches_and_bump_generation();
74        Ok(())
75    }
76
77    /// Stores vectors, payloads, and indexes for a batch of points.
78    ///
79    /// Three-phase pipeline to minimize lock contention and I/O:
80    /// 1. Batch storage: `store_batch()` for vectors + payloads (1 fsync each)
81    /// 2. Per-point updates: secondary indexes, quantization, text, sparse
82    /// 3. Batch HNSW insert via `bulk_index_or_defer()`
83    ///
84    /// # Crash Recovery
85    ///
86    /// A crash between Phase 1 and Phase 3 leaves vectors durably stored but
87    /// absent from the HNSW index. On the next `Collection::open()`, gap
88    /// detection compares storage IDs against HNSW mappings and re-indexes
89    /// any missing vectors. The recovery window is bounded by one batch.
90    ///
91    /// Returns buffered sparse vectors for deferred insertion.
92    fn upsert_storage_and_index(
93        &self,
94        points: &[Point],
95        storage_mode: StorageMode,
96    ) -> Result<Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>> {
97        // Phase 1: Batch storage under write locks (1 fsync per storage)
98        let old_payloads = self.batch_store_all(points)?;
99
100        // Phase 2: Per-point updates (no storage locks held)
101        let sparse_batch = self.per_point_updates(points, &old_payloads, storage_mode);
102
103        // Phase 3: Batch HNSW insert
104        let vector_refs: Vec<(u64, &[f32])> =
105            points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
106        self.bulk_index_or_defer(vector_refs);
107
108        Ok(sparse_batch)
109    }
110
111    /// Phase 1: Batch-stores vectors and payloads with minimal lock scope.
112    ///
113    /// Pre-collects old payloads (needed for secondary index updates),
114    /// then writes all vectors and payloads in single batch calls (1 fsync each).
115    ///
116    /// Deduplicates intra-batch duplicate IDs using last-writer-wins semantics:
117    /// only the final occurrence per ID is written to the WAL, avoiding wasteful
118    /// intermediate entries that would bloat the log and slow replay.
119    ///
120    /// After this method returns, vectors and payloads are durable on disk.
121    /// A crash before Phase 3 (HNSW insertion) is recovered by gap detection
122    /// on the next `Collection::open()`.
123    ///
124    /// # Parallel I/O (Issue #424)
125    ///
126    /// With the `persistence` feature (which enables `rayon`), payload and
127    /// vector writes run concurrently via `rayon::join` after old-payload
128    /// collection completes. This is safe because:
129    ///
130    /// - Payload and vector storage use independent `RwLock`s (positions 3
131    ///   and 2 in the lock order). Neither closure acquires both locks.
132    /// - Crash recovery only requires that both are durable before Phase 3
133    ///   (HNSW insertion). There is no ordering dependency between payload
134    ///   and vector WAL writes — gap detection on `Collection::open()` handles
135    ///   any partial write scenario.
136    /// - `old_payloads` collection is completed and the payload lock is
137    ///   released before the fork, so both closures start from clean state.
138    /// - The TOCTOU gap between old-payload collection and the parallel
139    ///   write is acceptable: `old_payloads` feeds Phase 2 secondary-index
140    ///   updates, and each concurrent batch tracks its own `seen_payloads`.
141    ///
142    /// Returns the old payloads for Phase 2.
143    fn batch_store_all(&self, points: &[Point]) -> Result<Vec<Option<serde_json::Value>>> {
144        // Collect old payloads under the payload write lock, then release.
145        // The write lock prevents concurrent payload mutations during the read.
146        let old_payloads = {
147            let payload_storage = self.payload_storage.write();
148            let result = Self::collect_old_payloads(points, &payload_storage);
149            drop(payload_storage);
150            result
151        };
152
153        // Issue #425: Build the dedup map once and share it across both
154        // write paths, avoiding redundant HashMap construction.
155        let dedup_map = Self::build_dedup_map(points);
156
157        // Issue #424: Parallel I/O — payload and vector writes are independent
158        // after old_payloads collection. Run them concurrently via rayon::join.
159        // rayon is gated on the persistence feature.
160        #[cfg(feature = "persistence")]
161        {
162            let (payload_result, vector_result) = rayon::join(
163                || self.write_and_flush_payloads(points, &dedup_map),
164                || self.write_deduped_vectors(points, &dedup_map),
165            );
166            payload_result?;
167            vector_result?;
168        }
169
170        #[cfg(not(feature = "persistence"))]
171        {
172            self.write_and_flush_payloads(points, &dedup_map)?;
173            self.write_deduped_vectors(points, &dedup_map)?;
174        }
175
176        Ok(old_payloads)
177    }
178
179    /// Writes deduped payloads and flushes the storage.
180    ///
181    /// Issue #424: Extracted so it can be called from `rayon::join` in the
182    /// parallel I/O path. Acquires the `payload_storage` write lock internally.
183    ///
184    /// Issue #425: Accepts a pre-computed `dedup_map` to avoid rebuilding
185    /// the last-writer-wins map redundantly.
186    fn write_and_flush_payloads(&self, points: &[Point], dedup_map: &DedupMap) -> Result<()> {
187        let mut payload_storage = self.payload_storage.write();
188        Self::write_deduped_payloads(points, &mut payload_storage, dedup_map)?;
189        payload_storage.flush()?;
190        Ok(())
191    }
192
193    /// Retrieves pre-batch payloads, querying storage only once per unique ID.
194    ///
195    /// For intra-batch duplicates, only the first occurrence needs the pre-batch
196    /// value; subsequent occurrences are handled by `seen_payloads` in Phase 2.
197    fn collect_old_payloads(
198        points: &[Point],
199        storage: &LogPayloadStorage,
200    ) -> Vec<Option<serde_json::Value>> {
201        let mut seen = std::collections::HashSet::new();
202        points
203            .iter()
204            .map(|p| {
205                if seen.insert(p.id) {
206                    // First occurrence — retrieve pre-batch payload from storage
207                    storage.retrieve(p.id).ok().flatten()
208                } else {
209                    None // Duplicate — Phase 2 uses seen_payloads instead
210                }
211            })
212            .collect()
213    }
214
215    /// Builds a last-writer-wins dedup map: `point_id -> index_of_last_occurrence`.
216    ///
217    /// Issue #425: Computed once in `batch_store_all` and shared by both
218    /// `write_deduped_payloads` and `write_deduped_vectors` to avoid
219    /// redundant `HashMap` construction.
220    fn build_dedup_map(points: &[Point]) -> DedupMap {
221        let mut map = HashMap::with_capacity(points.len());
222        for (i, p) in points.iter().enumerate() {
223            map.insert(p.id, i);
224        }
225        map
226    }
227
228    /// Writes only the last payload per ID to the WAL, then deletes IDs whose
229    /// final occurrence has `payload=None`.
230    ///
231    /// Issue #425: Accepts a pre-computed `dedup_map` instead of building
232    /// its own, consolidating the two redundant maps into one.
233    fn write_deduped_payloads(
234        points: &[Point],
235        storage: &mut LogPayloadStorage,
236        dedup_map: &DedupMap,
237    ) -> Result<()> {
238        // Only write the final payload per ID (skip intermediate duplicates)
239        let deduped: Vec<(u64, &serde_json::Value)> = points
240            .iter()
241            .enumerate()
242            .filter(|&(i, p)| dedup_map.get(&p.id) == Some(&i) && p.payload.is_some())
243            .filter_map(|(_, p)| p.payload.as_ref().map(|pl| (p.id, pl)))
244            .collect();
245        storage.store_batch(&deduped)?;
246
247        // Delete IDs whose final occurrence has payload=None
248        for (i, p) in points.iter().enumerate() {
249            if dedup_map.get(&p.id) == Some(&i) && p.payload.is_none() {
250                let _ = storage.delete(p.id);
251            }
252        }
253        Ok(())
254    }
255
256    /// Writes only the last vector per ID to vector storage.
257    ///
258    /// Issue #425: Accepts a pre-computed `dedup_map` instead of building
259    /// its own, consolidating the two redundant maps into one.
260    fn write_deduped_vectors(&self, points: &[Point], dedup_map: &DedupMap) -> Result<()> {
261        let deduped: Vec<(u64, &[f32])> = points
262            .iter()
263            .enumerate()
264            .filter(|&(i, p)| dedup_map.get(&p.id) == Some(&i))
265            .map(|(_, p)| (p.id, p.vector.as_slice()))
266            .collect();
267
268        let mut vector_storage = self.vector_storage.write();
269        vector_storage.store_batch(&deduped)?;
270        let point_count = vector_storage.len();
271        vector_storage.flush()?;
272        drop(vector_storage);
273
274        self.config.write().point_count = point_count;
275        Ok(())
276    }
277
278    /// Returns `true` when Phase 2 processing can be skipped entirely.
279    ///
280    /// Issue #425: For the common case (`StorageMode::Full`, no secondary
281    /// indexes, empty BM25 index, no sparse vectors in the batch), Phase 2
282    /// does zero useful work. Skipping avoids `QuantizationGuards` acquisition,
283    /// `seen_payloads` HashMap allocation, and the per-point loop.
284    fn can_skip_phase2(&self, points: &[Point], storage_mode: StorageMode) -> bool {
285        // Quantization caching is a no-op only for Full and RaBitQ modes
286        let no_quantization = matches!(storage_mode, StorageMode::Full | StorageMode::RaBitQ);
287        if !no_quantization {
288            return false;
289        }
290
291        // Secondary indexes require per-point old/new payload diffing
292        let no_secondary = self.secondary_indexes.read().is_empty();
293        if !no_secondary {
294            return false;
295        }
296
297        // BM25 text index: skip only when the index is empty AND no point
298        // carries a payload (nothing to add, nothing to remove)
299        let bm25_empty = self.text_index.is_empty();
300        let any_payload = points.iter().any(|p| p.payload.is_some());
301        if !bm25_empty || any_payload {
302            return false;
303        }
304
305        // Sparse vectors require collection into the sparse batch buffer
306        let any_sparse = points.iter().any(Point::has_sparse_vectors);
307        !any_sparse
308    }
309
310    /// Phase 2: Per-point updates that don't need storage write locks.
311    ///
312    /// Tracks the effective "old payload" per ID to handle within-batch
313    /// duplicates correctly: when id=5 appears twice, the second occurrence
314    /// sees the first occurrence's payload as its "old" (not the pre-batch
315    /// original), ensuring secondary indexes stay consistent.
316    ///
317    /// Issue #425: Fast-path skips the entire loop when no secondary
318    /// processing is needed (see `can_skip_phase2`).
319    fn per_point_updates(
320        &self,
321        points: &[Point],
322        old_payloads: &[Option<serde_json::Value>],
323        storage_mode: StorageMode,
324    ) -> Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)> {
325        // Issue #425: Fast-path — skip Phase 2 entirely when no secondary
326        // processing is needed. Avoids lock acquisition, HashMap allocation,
327        // and the per-point loop for the common StorageMode::Full case.
328        if self.can_skip_phase2(points, storage_mode) {
329            return Vec::new();
330        }
331
332        let mut quant_guards = QuantizationGuards::acquire(self, storage_mode);
333        let mut sparse_batch = Vec::new();
334        // Track effective old payload per ID for within-batch duplicate handling.
335        // When id=5 appears twice, the second occurrence uses the first's payload
336        // as "old" — not the pre-batch original — so secondary indexes stay correct.
337        //
338        // Uses `Option<Option<&Value>>`: outer Option = "seen this ID?",
339        // inner Option = "had a payload?". This distinguishes "seen with None"
340        // from "not seen" — `.flatten()` would collapse both to None.
341        let mut seen_payloads: HashMap<u64, Option<&serde_json::Value>> = HashMap::new();
342
343        // Issue #425: BM25 skip — pre-check whether any point carries a payload
344        // or the BM25 index has existing documents. When both are false, the
345        // text index loop body is a no-op (add_document never called, remove_document
346        // on non-existent docs is free). Skip to avoid per-point function call overhead.
347        let skip_bm25 = self.text_index.is_empty() && !points.iter().any(|p| p.payload.is_some());
348
349        for (point, pre_batch_old) in points.iter().zip(old_payloads) {
350            let effective_old: Option<&serde_json::Value> =
351                if let Some(&inner) = seen_payloads.get(&point.id) {
352                    // ID was seen earlier in this batch — use that point's payload as "old"
353                    inner
354                } else {
355                    // First occurrence — use the pre-batch original
356                    pre_batch_old.as_ref()
357                };
358
359            let (sq8, binary, pq) = (
360                quant_guards.sq8.as_deref_mut(),
361                quant_guards.binary.as_deref_mut(),
362                quant_guards.pq.as_deref_mut(),
363            );
364            self.cache_quantized_vector(point, storage_mode, sq8, binary, pq);
365
366            self.update_secondary_indexes_on_upsert(
367                point.id,
368                effective_old,
369                point.payload.as_ref(),
370            );
371            if !skip_bm25 {
372                Self::update_text_index(&self.text_index, point);
373            }
374            Self::collect_sparse_vectors(point, &mut sparse_batch);
375
376            // Record this point's payload ref — zero-cost for the common case (no clone)
377            seen_payloads.insert(point.id, point.payload.as_ref());
378        }
379
380        sparse_batch
381    }
382
383    fn collect_sparse_vectors(
384        point: &Point,
385        sparse_batch: &mut Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>,
386    ) {
387        if let Some(sv_map) = &point.sparse_vectors {
388            if !sv_map.is_empty() {
389                sparse_batch.push((point.id, sv_map.clone()));
390            }
391        }
392    }
393
394    /// Updates the BM25 text index for a single point.
395    pub(super) fn update_text_index(text_index: &crate::index::Bm25Index, point: &Point) {
396        if let Some(payload) = &point.payload {
397            let text = Self::extract_text_from_payload(payload);
398            if !text.is_empty() {
399                text_index.add_document(point.id, &text);
400            }
401        } else {
402            text_index.remove_document(point.id);
403        }
404    }
405
406    /// Applies buffered sparse vector upserts with WAL-before-apply semantics.
407    fn apply_sparse_batch_upsert(
408        &self,
409        sparse_batch: &[(u64, BTreeMap<String, crate::index::sparse::SparseVector>)],
410    ) -> Result<()> {
411        if sparse_batch.is_empty() {
412            return Ok(());
413        }
414        #[cfg(feature = "persistence")]
415        {
416            for (point_id, sv_map) in sparse_batch {
417                for (name, sv) in sv_map {
418                    let wal_path =
419                        crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
420                    crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
421                }
422            }
423        }
424        let mut indexes = self.sparse_indexes.write();
425        for (point_id, sv_map) in sparse_batch {
426            for (name, sv) in sv_map {
427                let idx = indexes.entry(name.clone()).or_default();
428                idx.insert(*point_id, sv);
429            }
430        }
431        Ok(())
432    }
433
434    /// Invalidates stats cache and bumps write generation.
435    pub(super) fn invalidate_caches_and_bump_generation(&self) {
436        *self.cached_stats.lock() = None;
437        self.write_generation
438            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
439    }
440
441    /// Drains the deferred indexer and batch-inserts into HNSW.
442    ///
443    /// Filters out IDs that have been deleted from vector storage since they
444    /// were buffered, preventing ghost vectors from being re-inserted into
445    /// HNSW after a concurrent delete.
446    ///
447    /// Logs a warning if fewer vectors were inserted than expected, which
448    /// indicates a partial failure (e.g., duplicate IDs filtered out,
449    /// ghost-vector filtering, or graph insertion error). The drained
450    /// vectors are not retried.
451    #[cfg(feature = "persistence")]
452    fn merge_deferred_batch(&self, di: &crate::collection::streaming::DeferredIndexer) {
453        let drained = di.swap_and_drain();
454        if drained.is_empty() {
455            return;
456        }
457        // Filter out vectors deleted from storage during the buffer's
458        // lifetime to prevent ghost re-insertion into HNSW.
459        let storage = self.vector_storage.read();
460        let valid: Vec<(u64, &[f32])> = drained
461            .iter()
462            .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
463            .map(|(id, v)| (*id, v.as_slice()))
464            .collect();
465        drop(storage); // Release read lock before batch insert
466        let expected = valid.len();
467        if valid.is_empty() {
468            return;
469        }
470        let inserted = self.index.insert_batch_parallel(valid);
471        if inserted < expected {
472            tracing::warn!("merge_deferred_batch: inserted {inserted}/{expected} vectors");
473        }
474    }
475
476    /// Batch-inserts into HNSW or defers into the deferred indexer.
477    ///
478    /// Returns the number of vectors processed (whether indexed directly
479    /// or deferred for later merge).
480    ///
481    /// Since v1.7.2, both `upsert()` and `upsert_bulk()` route through this
482    /// method. The direct path calls `insert_batch_parallel` (rayon), which
483    /// yields non-deterministic HNSW graph topology across runs. Search
484    /// correctness and recall are unaffected.
485    ///
486    /// Invariant: `self.deferred_indexer` is `Some` only when enabled
487    /// (`build_deferred_indexer` filters on `cfg.enabled`), so no
488    /// redundant `is_enabled()` check is needed here.
489    pub(super) fn bulk_index_or_defer(&self, vector_refs: Vec<(u64, &[f32])>) -> usize {
490        let count = vector_refs.len();
491        #[cfg(feature = "persistence")]
492        if let Some(ref di) = self.deferred_indexer {
493            di.extend(vector_refs.iter().map(|(id, v)| (*id, v.to_vec())));
494            if di.should_merge() {
495                self.merge_deferred_batch(di);
496            }
497            // Issue #423 Component 3: Track inserts for periodic HNSW save.
498            // Reason: count fits in u64 (vector batch size bounded by memory).
499            #[allow(clippy::cast_possible_truncation)]
500            self.inserts_since_last_hnsw_save
501                .fetch_add(count as u64, std::sync::atomic::Ordering::Relaxed);
502            return count;
503        }
504        let inserted = self.index.insert_batch_parallel(vector_refs);
505        self.index.set_searching_mode();
506        // Issue #423 Component 3: Track inserts for periodic HNSW save.
507        // Reason: count fits in u64 (vector batch size bounded by memory).
508        #[allow(clippy::cast_possible_truncation)]
509        self.inserts_since_last_hnsw_save
510            .fetch_add(count as u64, std::sync::atomic::Ordering::Relaxed);
511        inserted
512    }
513
514    /// Inserts or updates metadata-only points (no vectors).
515    ///
516    /// This method is for metadata-only collections. Points should have
517    /// empty vectors and only contain payload data.
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if storage operations fail.
522    pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
523        let points: Vec<Point> = points.into_iter().collect();
524
525        let mut payload_storage = self.payload_storage.write();
526
527        for point in &points {
528            let old_payload = payload_storage.retrieve(point.id).ok().flatten();
529            if let Some(payload) = &point.payload {
530                payload_storage.store(point.id, payload)?;
531            } else {
532                let _ = payload_storage.delete(point.id);
533            }
534            Self::update_text_index(&self.text_index, point);
535            self.update_secondary_indexes_on_upsert(
536                point.id,
537                old_payload.as_ref(),
538                point.payload.as_ref(),
539            );
540        }
541
542        // LOCK ORDER: flush while payload_storage(3) still held, then drop before acquiring config(1).
543        let point_count = payload_storage.ids().len();
544        payload_storage.flush()?;
545        drop(payload_storage);
546
547        // config(1) only — all higher-numbered locks released above.
548        self.config.write().point_count = point_count;
549        self.invalidate_caches_and_bump_generation();
550        Ok(())
551    }
552}