velesdb_core/collection/core/crud.rs
1//! Upsert operations for Collection.
2//!
3//! Read and delete operations are in `crud_read_delete.rs`.
4//! Bulk-specific methods (`upsert_bulk`, `upsert_bulk_from_raw`) are in `crud_bulk.rs`.
5//! Quantization caching helpers and secondary-index update helpers are in `crud_helpers.rs`.
6
7use crate::collection::types::Collection;
8use crate::error::{Error, Result};
9use crate::point::Point;
10use crate::quantization::{BinaryQuantizedVector, PQVector, QuantizedVector, StorageMode};
11use crate::storage::{LogPayloadStorage, PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension_match;
13
14use parking_lot::RwLockWriteGuard;
15use std::collections::{BTreeMap, HashMap};
16
/// Last-writer-wins index for a batch of points: maps each `point_id` to the
/// position of its final occurrence in the batch slice.
///
/// `batch_store_all` builds it a single time and lends it to both
/// `write_deduped_payloads` and `write_deduped_vectors`, so the map is not
/// constructed twice per batch (Issue #425).
type DedupMap = HashMap<u64, usize>;
22
23struct QuantizationGuards<'a> {
24 sq8: Option<RwLockWriteGuard<'a, HashMap<u64, QuantizedVector>>>,
25 binary: Option<RwLockWriteGuard<'a, HashMap<u64, BinaryQuantizedVector>>>,
26 pq: Option<RwLockWriteGuard<'a, HashMap<u64, PQVector>>>,
27}
28
29impl<'a> QuantizationGuards<'a> {
30 fn acquire(collection: &'a Collection, mode: StorageMode) -> Self {
31 Self {
32 sq8: matches!(mode, StorageMode::SQ8).then(|| collection.sq8_cache.write()),
33 binary: matches!(mode, StorageMode::Binary).then(|| collection.binary_cache.write()),
34 pq: matches!(mode, StorageMode::ProductQuantization)
35 .then(|| collection.pq_cache.write()),
36 }
37 }
38}
39
40impl Collection {
41 /// Inserts or updates points in the collection.
42 ///
43 /// Accepts any iterator of points (Vec, slice, array, etc.)
44 ///
45 /// # Errors
46 ///
47 /// Returns an error if any point has a mismatched dimension, or if
48 /// attempting to insert vectors into a metadata-only collection.
49 pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
50 let points: Vec<Point> = points.into_iter().collect();
51 let config = self.config.read();
52 let dimension = config.dimension;
53 let storage_mode = config.storage_mode;
54
55 if config.metadata_only {
56 for point in &points {
57 if !point.vector.is_empty() {
58 return Err(Error::VectorNotAllowed(config.name.clone()));
59 }
60 }
61 drop(config);
62 return self.upsert_metadata(points);
63 }
64 drop(config);
65
66 for point in &points {
67 validate_dimension_match(dimension, point.dimension())?;
68 }
69
70 let sparse_batch = self.upsert_storage_and_index(&points, storage_mode)?;
71
72 self.apply_sparse_batch_upsert(&sparse_batch)?;
73 self.invalidate_caches_and_bump_generation();
74 Ok(())
75 }
76
77 /// Stores vectors, payloads, and indexes for a batch of points.
78 ///
79 /// Three-phase pipeline to minimize lock contention and I/O:
80 /// 1. Batch storage: `store_batch()` for vectors + payloads (1 fsync each)
81 /// 2. Per-point updates: secondary indexes, quantization, text, sparse
82 /// 3. Batch HNSW insert via `bulk_index_or_defer()`
83 ///
84 /// # Crash Recovery
85 ///
86 /// A crash between Phase 1 and Phase 3 leaves vectors durably stored but
87 /// absent from the HNSW index. On the next `Collection::open()`, gap
88 /// detection compares storage IDs against HNSW mappings and re-indexes
89 /// any missing vectors. The recovery window is bounded by one batch.
90 ///
91 /// Returns buffered sparse vectors for deferred insertion.
92 fn upsert_storage_and_index(
93 &self,
94 points: &[Point],
95 storage_mode: StorageMode,
96 ) -> Result<Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>> {
97 // Phase 1: Batch storage under write locks (1 fsync per storage)
98 let old_payloads = self.batch_store_all(points)?;
99
100 // Phase 2: Per-point updates (no storage locks held)
101 let sparse_batch = self.per_point_updates(points, &old_payloads, storage_mode);
102
103 // Phase 3: Batch HNSW insert
104 let vector_refs: Vec<(u64, &[f32])> =
105 points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
106 self.bulk_index_or_defer(vector_refs);
107
108 Ok(sparse_batch)
109 }
110
111 /// Phase 1: Batch-stores vectors and payloads with minimal lock scope.
112 ///
113 /// Pre-collects old payloads (needed for secondary index updates),
114 /// then writes all vectors and payloads in single batch calls (1 fsync each).
115 ///
116 /// Deduplicates intra-batch duplicate IDs using last-writer-wins semantics:
117 /// only the final occurrence per ID is written to the WAL, avoiding wasteful
118 /// intermediate entries that would bloat the log and slow replay.
119 ///
120 /// After this method returns, vectors and payloads are durable on disk.
121 /// A crash before Phase 3 (HNSW insertion) is recovered by gap detection
122 /// on the next `Collection::open()`.
123 ///
124 /// # Parallel I/O (Issue #424)
125 ///
126 /// With the `persistence` feature (which enables `rayon`), payload and
127 /// vector writes run concurrently via `rayon::join` after old-payload
128 /// collection completes. This is safe because:
129 ///
130 /// - Payload and vector storage use independent `RwLock`s (positions 3
131 /// and 2 in the lock order). Neither closure acquires both locks.
132 /// - Crash recovery only requires that both are durable before Phase 3
133 /// (HNSW insertion). There is no ordering dependency between payload
134 /// and vector WAL writes — gap detection on `Collection::open()` handles
135 /// any partial write scenario.
136 /// - `old_payloads` collection is completed and the payload lock is
137 /// released before the fork, so both closures start from clean state.
138 /// - The TOCTOU gap between old-payload collection and the parallel
139 /// write is acceptable: `old_payloads` feeds Phase 2 secondary-index
140 /// updates, and each concurrent batch tracks its own `seen_payloads`.
141 ///
142 /// Returns the old payloads for Phase 2.
143 fn batch_store_all(&self, points: &[Point]) -> Result<Vec<Option<serde_json::Value>>> {
144 // Collect old payloads under the payload write lock, then release.
145 // The write lock prevents concurrent payload mutations during the read.
146 let old_payloads = {
147 let payload_storage = self.payload_storage.write();
148 let result = Self::collect_old_payloads(points, &payload_storage);
149 drop(payload_storage);
150 result
151 };
152
153 // Issue #425: Build the dedup map once and share it across both
154 // write paths, avoiding redundant HashMap construction.
155 let dedup_map = Self::build_dedup_map(points);
156
157 // Issue #424: Parallel I/O — payload and vector writes are independent
158 // after old_payloads collection. Run them concurrently via rayon::join.
159 // rayon is gated on the persistence feature.
160 #[cfg(feature = "persistence")]
161 {
162 let (payload_result, vector_result) = rayon::join(
163 || self.write_and_flush_payloads(points, &dedup_map),
164 || self.write_deduped_vectors(points, &dedup_map),
165 );
166 payload_result?;
167 vector_result?;
168 }
169
170 #[cfg(not(feature = "persistence"))]
171 {
172 self.write_and_flush_payloads(points, &dedup_map)?;
173 self.write_deduped_vectors(points, &dedup_map)?;
174 }
175
176 Ok(old_payloads)
177 }
178
179 /// Writes deduped payloads and flushes the storage.
180 ///
181 /// Issue #424: Extracted so it can be called from `rayon::join` in the
182 /// parallel I/O path. Acquires the `payload_storage` write lock internally.
183 ///
184 /// Issue #425: Accepts a pre-computed `dedup_map` to avoid rebuilding
185 /// the last-writer-wins map redundantly.
186 fn write_and_flush_payloads(&self, points: &[Point], dedup_map: &DedupMap) -> Result<()> {
187 let mut payload_storage = self.payload_storage.write();
188 Self::write_deduped_payloads(points, &mut payload_storage, dedup_map)?;
189 payload_storage.flush()?;
190 Ok(())
191 }
192
193 /// Retrieves pre-batch payloads, querying storage only once per unique ID.
194 ///
195 /// For intra-batch duplicates, only the first occurrence needs the pre-batch
196 /// value; subsequent occurrences are handled by `seen_payloads` in Phase 2.
197 fn collect_old_payloads(
198 points: &[Point],
199 storage: &LogPayloadStorage,
200 ) -> Vec<Option<serde_json::Value>> {
201 let mut seen = std::collections::HashSet::new();
202 points
203 .iter()
204 .map(|p| {
205 if seen.insert(p.id) {
206 // First occurrence — retrieve pre-batch payload from storage
207 storage.retrieve(p.id).ok().flatten()
208 } else {
209 None // Duplicate — Phase 2 uses seen_payloads instead
210 }
211 })
212 .collect()
213 }
214
215 /// Builds a last-writer-wins dedup map: `point_id -> index_of_last_occurrence`.
216 ///
217 /// Issue #425: Computed once in `batch_store_all` and shared by both
218 /// `write_deduped_payloads` and `write_deduped_vectors` to avoid
219 /// redundant `HashMap` construction.
220 fn build_dedup_map(points: &[Point]) -> DedupMap {
221 let mut map = HashMap::with_capacity(points.len());
222 for (i, p) in points.iter().enumerate() {
223 map.insert(p.id, i);
224 }
225 map
226 }
227
228 /// Writes only the last payload per ID to the WAL, then deletes IDs whose
229 /// final occurrence has `payload=None`.
230 ///
231 /// Issue #425: Accepts a pre-computed `dedup_map` instead of building
232 /// its own, consolidating the two redundant maps into one.
233 fn write_deduped_payloads(
234 points: &[Point],
235 storage: &mut LogPayloadStorage,
236 dedup_map: &DedupMap,
237 ) -> Result<()> {
238 // Only write the final payload per ID (skip intermediate duplicates)
239 let deduped: Vec<(u64, &serde_json::Value)> = points
240 .iter()
241 .enumerate()
242 .filter(|&(i, p)| dedup_map.get(&p.id) == Some(&i) && p.payload.is_some())
243 .filter_map(|(_, p)| p.payload.as_ref().map(|pl| (p.id, pl)))
244 .collect();
245 storage.store_batch(&deduped)?;
246
247 // Delete IDs whose final occurrence has payload=None
248 for (i, p) in points.iter().enumerate() {
249 if dedup_map.get(&p.id) == Some(&i) && p.payload.is_none() {
250 let _ = storage.delete(p.id);
251 }
252 }
253 Ok(())
254 }
255
256 /// Writes only the last vector per ID to vector storage.
257 ///
258 /// Issue #425: Accepts a pre-computed `dedup_map` instead of building
259 /// its own, consolidating the two redundant maps into one.
260 fn write_deduped_vectors(&self, points: &[Point], dedup_map: &DedupMap) -> Result<()> {
261 let deduped: Vec<(u64, &[f32])> = points
262 .iter()
263 .enumerate()
264 .filter(|&(i, p)| dedup_map.get(&p.id) == Some(&i))
265 .map(|(_, p)| (p.id, p.vector.as_slice()))
266 .collect();
267
268 let mut vector_storage = self.vector_storage.write();
269 vector_storage.store_batch(&deduped)?;
270 let point_count = vector_storage.len();
271 vector_storage.flush()?;
272 drop(vector_storage);
273
274 self.config.write().point_count = point_count;
275 Ok(())
276 }
277
278 /// Returns `true` when Phase 2 processing can be skipped entirely.
279 ///
280 /// Issue #425: For the common case (`StorageMode::Full`, no secondary
281 /// indexes, empty BM25 index, no sparse vectors in the batch), Phase 2
282 /// does zero useful work. Skipping avoids `QuantizationGuards` acquisition,
283 /// `seen_payloads` HashMap allocation, and the per-point loop.
284 fn can_skip_phase2(&self, points: &[Point], storage_mode: StorageMode) -> bool {
285 // Quantization caching is a no-op only for Full and RaBitQ modes
286 let no_quantization = matches!(storage_mode, StorageMode::Full | StorageMode::RaBitQ);
287 if !no_quantization {
288 return false;
289 }
290
291 // Secondary indexes require per-point old/new payload diffing
292 let no_secondary = self.secondary_indexes.read().is_empty();
293 if !no_secondary {
294 return false;
295 }
296
297 // BM25 text index: skip only when the index is empty AND no point
298 // carries a payload (nothing to add, nothing to remove)
299 let bm25_empty = self.text_index.is_empty();
300 let any_payload = points.iter().any(|p| p.payload.is_some());
301 if !bm25_empty || any_payload {
302 return false;
303 }
304
305 // Sparse vectors require collection into the sparse batch buffer
306 let any_sparse = points.iter().any(Point::has_sparse_vectors);
307 !any_sparse
308 }
309
310 /// Phase 2: Per-point updates that don't need storage write locks.
311 ///
312 /// Tracks the effective "old payload" per ID to handle within-batch
313 /// duplicates correctly: when id=5 appears twice, the second occurrence
314 /// sees the first occurrence's payload as its "old" (not the pre-batch
315 /// original), ensuring secondary indexes stay consistent.
316 ///
317 /// Issue #425: Fast-path skips the entire loop when no secondary
318 /// processing is needed (see `can_skip_phase2`).
319 fn per_point_updates(
320 &self,
321 points: &[Point],
322 old_payloads: &[Option<serde_json::Value>],
323 storage_mode: StorageMode,
324 ) -> Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)> {
325 // Issue #425: Fast-path — skip Phase 2 entirely when no secondary
326 // processing is needed. Avoids lock acquisition, HashMap allocation,
327 // and the per-point loop for the common StorageMode::Full case.
328 if self.can_skip_phase2(points, storage_mode) {
329 return Vec::new();
330 }
331
332 let mut quant_guards = QuantizationGuards::acquire(self, storage_mode);
333 let mut sparse_batch = Vec::new();
334 // Track effective old payload per ID for within-batch duplicate handling.
335 // When id=5 appears twice, the second occurrence uses the first's payload
336 // as "old" — not the pre-batch original — so secondary indexes stay correct.
337 //
338 // Uses `Option<Option<&Value>>`: outer Option = "seen this ID?",
339 // inner Option = "had a payload?". This distinguishes "seen with None"
340 // from "not seen" — `.flatten()` would collapse both to None.
341 let mut seen_payloads: HashMap<u64, Option<&serde_json::Value>> = HashMap::new();
342
343 // Issue #425: BM25 skip — pre-check whether any point carries a payload
344 // or the BM25 index has existing documents. When both are false, the
345 // text index loop body is a no-op (add_document never called, remove_document
346 // on non-existent docs is free). Skip to avoid per-point function call overhead.
347 let skip_bm25 = self.text_index.is_empty() && !points.iter().any(|p| p.payload.is_some());
348
349 for (point, pre_batch_old) in points.iter().zip(old_payloads) {
350 let effective_old: Option<&serde_json::Value> =
351 if let Some(&inner) = seen_payloads.get(&point.id) {
352 // ID was seen earlier in this batch — use that point's payload as "old"
353 inner
354 } else {
355 // First occurrence — use the pre-batch original
356 pre_batch_old.as_ref()
357 };
358
359 let (sq8, binary, pq) = (
360 quant_guards.sq8.as_deref_mut(),
361 quant_guards.binary.as_deref_mut(),
362 quant_guards.pq.as_deref_mut(),
363 );
364 self.cache_quantized_vector(point, storage_mode, sq8, binary, pq);
365
366 self.update_secondary_indexes_on_upsert(
367 point.id,
368 effective_old,
369 point.payload.as_ref(),
370 );
371 if !skip_bm25 {
372 Self::update_text_index(&self.text_index, point);
373 }
374 Self::collect_sparse_vectors(point, &mut sparse_batch);
375
376 // Record this point's payload ref — zero-cost for the common case (no clone)
377 seen_payloads.insert(point.id, point.payload.as_ref());
378 }
379
380 sparse_batch
381 }
382
383 fn collect_sparse_vectors(
384 point: &Point,
385 sparse_batch: &mut Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>,
386 ) {
387 if let Some(sv_map) = &point.sparse_vectors {
388 if !sv_map.is_empty() {
389 sparse_batch.push((point.id, sv_map.clone()));
390 }
391 }
392 }
393
394 /// Updates the BM25 text index for a single point.
395 pub(super) fn update_text_index(text_index: &crate::index::Bm25Index, point: &Point) {
396 if let Some(payload) = &point.payload {
397 let text = Self::extract_text_from_payload(payload);
398 if !text.is_empty() {
399 text_index.add_document(point.id, &text);
400 }
401 } else {
402 text_index.remove_document(point.id);
403 }
404 }
405
406 /// Applies buffered sparse vector upserts with WAL-before-apply semantics.
407 fn apply_sparse_batch_upsert(
408 &self,
409 sparse_batch: &[(u64, BTreeMap<String, crate::index::sparse::SparseVector>)],
410 ) -> Result<()> {
411 if sparse_batch.is_empty() {
412 return Ok(());
413 }
414 #[cfg(feature = "persistence")]
415 {
416 for (point_id, sv_map) in sparse_batch {
417 for (name, sv) in sv_map {
418 let wal_path =
419 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
420 crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
421 }
422 }
423 }
424 let mut indexes = self.sparse_indexes.write();
425 for (point_id, sv_map) in sparse_batch {
426 for (name, sv) in sv_map {
427 let idx = indexes.entry(name.clone()).or_default();
428 idx.insert(*point_id, sv);
429 }
430 }
431 Ok(())
432 }
433
434 /// Invalidates stats cache and bumps write generation.
435 pub(super) fn invalidate_caches_and_bump_generation(&self) {
436 *self.cached_stats.lock() = None;
437 self.write_generation
438 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
439 }
440
/// Drains the deferred indexer and batch-inserts into HNSW.
///
/// Two filters are applied to the drained buffer before insertion:
///
/// - **Last occurrence per ID.** If the same ID was buffered more than once
///   since the previous merge (e.g. it was upserted twice), only the most
///   recently buffered vector is kept. This matches the last-writer-wins rule
///   used for vector storage. Otherwise every copy would reach the parallel
///   batch insert, and whichever copy survived there could be stale. This
///   assumes the drained buffer preserves insertion order (TODO confirm in
///   `DeferredIndexer::swap_and_drain`).
/// - **Still stored.** IDs deleted from vector storage since they were
///   buffered are dropped, preventing ghost vectors from being re-inserted
///   into HNSW after a concurrent delete.
///
/// Logs a warning if fewer vectors were inserted than expected, which
/// indicates a partial failure (e.g. ghost-vector filtering or a graph
/// insertion error). The drained vectors are not retried.
#[cfg(feature = "persistence")]
fn merge_deferred_batch(&self, di: &crate::collection::streaming::DeferredIndexer) {
    let drained = di.swap_and_drain();
    if drained.is_empty() {
        return;
    }

    // id -> position of its last occurrence in the drained buffer.
    let last_occurrence: HashMap<u64, usize> = drained
        .iter()
        .enumerate()
        .map(|(i, (id, _))| (*id, i))
        .collect();

    // Filter out vectors deleted from storage during the buffer's
    // lifetime to prevent ghost re-insertion into HNSW.
    let storage = self.vector_storage.read();
    let valid: Vec<(u64, &[f32])> = drained
        .iter()
        .enumerate()
        .filter(|(i, (id, _))| last_occurrence.get(id) == Some(i))
        .filter(|(_, (id, _))| storage.retrieve(*id).ok().flatten().is_some())
        .map(|(_, (id, v))| (*id, v.as_slice()))
        .collect();
    drop(storage); // Release read lock before batch insert

    let expected = valid.len();
    if valid.is_empty() {
        return;
    }
    let inserted = self.index.insert_batch_parallel(valid);
    if inserted < expected {
        tracing::warn!("merge_deferred_batch: inserted {inserted}/{expected} vectors");
    }
}
474 }
475
476 /// Batch-inserts into HNSW or defers into the deferred indexer.
477 ///
478 /// Returns the number of vectors processed (whether indexed directly
479 /// or deferred for later merge).
480 ///
481 /// Since v1.7.2, both `upsert()` and `upsert_bulk()` route through this
482 /// method. The direct path calls `insert_batch_parallel` (rayon), which
483 /// yields non-deterministic HNSW graph topology across runs. Search
484 /// correctness and recall are unaffected.
485 ///
486 /// Invariant: `self.deferred_indexer` is `Some` only when enabled
487 /// (`build_deferred_indexer` filters on `cfg.enabled`), so no
488 /// redundant `is_enabled()` check is needed here.
489 pub(super) fn bulk_index_or_defer(&self, vector_refs: Vec<(u64, &[f32])>) -> usize {
490 let count = vector_refs.len();
491 #[cfg(feature = "persistence")]
492 if let Some(ref di) = self.deferred_indexer {
493 di.extend(vector_refs.iter().map(|(id, v)| (*id, v.to_vec())));
494 if di.should_merge() {
495 self.merge_deferred_batch(di);
496 }
497 // Issue #423 Component 3: Track inserts for periodic HNSW save.
498 // Reason: count fits in u64 (vector batch size bounded by memory).
499 #[allow(clippy::cast_possible_truncation)]
500 self.inserts_since_last_hnsw_save
501 .fetch_add(count as u64, std::sync::atomic::Ordering::Relaxed);
502 return count;
503 }
504 let inserted = self.index.insert_batch_parallel(vector_refs);
505 self.index.set_searching_mode();
506 // Issue #423 Component 3: Track inserts for periodic HNSW save.
507 // Reason: count fits in u64 (vector batch size bounded by memory).
508 #[allow(clippy::cast_possible_truncation)]
509 self.inserts_since_last_hnsw_save
510 .fetch_add(count as u64, std::sync::atomic::Ordering::Relaxed);
511 inserted
512 }
513
514 /// Inserts or updates metadata-only points (no vectors).
515 ///
516 /// This method is for metadata-only collections. Points should have
517 /// empty vectors and only contain payload data.
518 ///
519 /// # Errors
520 ///
521 /// Returns an error if storage operations fail.
522 pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
523 let points: Vec<Point> = points.into_iter().collect();
524
525 let mut payload_storage = self.payload_storage.write();
526
527 for point in &points {
528 let old_payload = payload_storage.retrieve(point.id).ok().flatten();
529 if let Some(payload) = &point.payload {
530 payload_storage.store(point.id, payload)?;
531 } else {
532 let _ = payload_storage.delete(point.id);
533 }
534 Self::update_text_index(&self.text_index, point);
535 self.update_secondary_indexes_on_upsert(
536 point.id,
537 old_payload.as_ref(),
538 point.payload.as_ref(),
539 );
540 }
541
542 // LOCK ORDER: flush while payload_storage(3) still held, then drop before acquiring config(1).
543 let point_count = payload_storage.ids().len();
544 payload_storage.flush()?;
545 drop(payload_storage);
546
547 // config(1) only — all higher-numbered locks released above.
548 self.config.write().point_count = point_count;
549 self.invalidate_caches_and_bump_generation();
550 Ok(())
551 }
552}