// velesdb_core/collection/core/crud.rs

//! CRUD operations for Collection (upsert, get, delete).
//!
//! Quantization caching helpers and secondary-index update helpers are in `crud_helpers.rs`.

use std::collections::BTreeMap;

use crate::collection::types::Collection;
use crate::error::{Error, Result};
use crate::index::VectorIndex;
use crate::point::Point;
use crate::quantization::StorageMode;
use crate::storage::{PayloadStorage, VectorStorage};
13
14impl Collection {
    /// Inserts or updates points in the collection.
    ///
    /// Accepts any iterator of points (Vec, slice, array, etc.)
    ///
    /// # Errors
    ///
    /// Returns an error if any point has a mismatched dimension, or if
    /// attempting to insert vectors into a metadata-only collection.
    #[allow(clippy::too_many_lines)] // Monolithic for coherent lock-ordering; refactor tracked separately.
    pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
        let points: Vec<Point> = points.into_iter().collect();
        // Snapshot config under a short-lived read lock (config is lock #1).
        let config = self.config.read();
        let dimension = config.dimension;
        let storage_mode = config.storage_mode;
        let metadata_only = config.metadata_only;

        if metadata_only {
            for point in &points {
                if !point.vector.is_empty() {
                    // Lazy clone: name only allocated on this error path.
                    return Err(Error::VectorNotAllowed(config.name.clone()));
                }
            }
            drop(config);
            return self.upsert_metadata(points);
        }
        drop(config);

        // Validate every dimension up front so no storage is mutated on a bad batch.
        for point in &points {
            if point.dimension() != dimension {
                return Err(Error::DimensionMismatch {
                    expected: dimension,
                    actual: point.dimension(),
                });
            }
        }

        // Buffer sparse data for batch insert after storage locks are released.
        // LOCK ORDER: sparse_indexes(9) acquired AFTER vector_storage(2) + payload_storage(3).
        let mut sparse_batch: Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)> =
            Vec::new();

        let mut vector_storage = self.vector_storage.write();
        let mut payload_storage = self.payload_storage.write();

        // Only the cache matching the active storage mode is locked; the others stay None.
        let mut sq8_cache = match storage_mode {
            StorageMode::SQ8 => Some(self.sq8_cache.write()),
            _ => None,
        };
        let mut binary_cache = match storage_mode {
            StorageMode::Binary => Some(self.binary_cache.write()),
            _ => None,
        };
        let mut pq_cache = match storage_mode {
            StorageMode::ProductQuantization => Some(self.pq_cache.write()),
            _ => None,
        };

        for point in points {
            // Old payload is needed to diff secondary-index entries below.
            let old_payload = payload_storage.retrieve(point.id).ok().flatten();
            vector_storage
                .store(point.id, &point.vector)
                .map_err(Error::Io)?;

            self.cache_quantized_vector(
                &point,
                storage_mode,
                sq8_cache.as_deref_mut(),
                binary_cache.as_deref_mut(),
                pq_cache.as_deref_mut(),
            );

            if let Some(payload) = &point.payload {
                payload_storage
                    .store(point.id, payload)
                    .map_err(Error::Io)?;
            } else {
                // Upsert without a payload clears any previously stored payload;
                // a failed delete (e.g. key absent) is intentionally best-effort.
                let _ = payload_storage.delete(point.id);
            }

            self.update_secondary_indexes_on_upsert(
                point.id,
                old_payload.as_ref(),
                point.payload.as_ref(),
            );

            self.index.insert(point.id, &point.vector);

            // Keep the BM25 text index in sync with the payload's text content.
            if let Some(payload) = &point.payload {
                let text = Self::extract_text_from_payload(payload);
                if !text.is_empty() {
                    self.text_index.add_document(point.id, &text);
                }
            } else {
                self.text_index.remove_document(point.id);
            }

            // Buffer sparse vectors for batch insert after releasing storage locks.
            if let Some(sv_map) = point.sparse_vectors {
                if !sv_map.is_empty() {
                    sparse_batch.push((point.id, sv_map));
                }
            }
        }

        // LOCK ORDER: flush while vector_storage(2) + payload_storage(3) still held,
        // then drop both before acquiring config(1) alone to avoid inversion.
        let point_count = vector_storage.len();
        vector_storage.flush().map_err(Error::Io)?;
        payload_storage.flush().map_err(Error::Io)?;
        drop(vector_storage);
        drop(payload_storage);

        // config(1) only — all higher-numbered locks released above.
        let mut config = self.config.write();
        config.point_count = point_count;
        drop(config);

        self.index.save(&self.path).map_err(Error::Io)?;

        // LOCK ORDER: sparse_indexes(9) — acquired after all lower-numbered locks released.
        if !sparse_batch.is_empty() {
            // WAL-before-apply: persist the intent to disk BEFORE mutating the
            // in-memory index. A crash between WAL write and index insert is safe
            // because the WAL is replayed on recovery; a crash after index insert
            // but before WAL write would lose the update.
            #[cfg(feature = "persistence")]
            {
                for (point_id, sv_map) in &sparse_batch {
                    for (name, sv) in sv_map {
                        let wal_path =
                            crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
                        crate::index::sparse::persistence::wal_append_upsert(
                            &wal_path, *point_id, sv,
                        )?;
                    }
                }
            }

            let mut indexes = self.sparse_indexes.write();
            for (point_id, sv_map) in &sparse_batch {
                for (name, sv) in sv_map {
                    let idx = indexes.entry(name.clone()).or_default();
                    idx.insert(*point_id, sv);
                }
            }
        }

        // Invalidate stats cache so the next get_stats() recomputes fresh data.
        *self.cached_stats.lock() = None;

        // Bump write generation once per batch (CACHE-01 invalidation counter).
        self.write_generation
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        Ok(())
    }

173    /// Inserts or updates metadata-only points (no vectors).
174    ///
175    /// This method is for metadata-only collections. Points should have
176    /// empty vectors and only contain payload data.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if storage operations fail.
181    pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
182        let points: Vec<Point> = points.into_iter().collect();
183
184        let mut payload_storage = self.payload_storage.write();
185
186        for point in &points {
187            let old_payload = payload_storage.retrieve(point.id).ok().flatten();
188            // Store Payload (metadata-only points must have payload)
189            if let Some(payload) = &point.payload {
190                payload_storage
191                    .store(point.id, payload)
192                    .map_err(Error::Io)?;
193
194                // Update BM25 Text Index for full-text search
195                let text = Self::extract_text_from_payload(payload);
196                if !text.is_empty() {
197                    self.text_index.add_document(point.id, &text);
198                }
199            } else {
200                let _ = payload_storage.delete(point.id);
201                self.text_index.remove_document(point.id);
202            }
203
204            self.update_secondary_indexes_on_upsert(
205                point.id,
206                old_payload.as_ref(),
207                point.payload.as_ref(),
208            );
209        }
210
211        // LOCK ORDER: flush while payload_storage(3) still held, then drop before acquiring config(1).
212        let point_count = payload_storage.ids().len();
213        payload_storage.flush().map_err(Error::Io)?;
214        drop(payload_storage);
215
216        // config(1) only — all higher-numbered locks released above.
217        let mut config = self.config.write();
218        config.point_count = point_count;
219        drop(config);
220
221        // Invalidate stats cache so the next get_stats() recomputes fresh data.
222        *self.cached_stats.lock() = None;
223
224        // Bump write generation once per batch (CACHE-01 invalidation counter).
225        self.write_generation
226            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
227
228        Ok(())
229    }
230
    /// Bulk insert optimized for high-throughput import.
    ///
    /// # Performance
    ///
    /// This method is optimized for bulk loading:
    /// - Uses parallel HNSW insertion (rayon)
    /// - Single flush at the end (not per-point)
    /// - No HNSW index save (deferred for performance)
    /// - ~15x faster than previous sequential approach on large batches (5000+)
    /// - Benchmark: 25-30 Kvec/s on 768D vectors
    ///
    /// # Errors
    ///
    /// Returns an error if any point has a mismatched dimension.
    ///
    /// NOTE(review): unlike `upsert`, this fast path does not reject vectors on
    /// metadata-only collections, does not populate the SQ8/Binary/PQ
    /// quantization caches, does not update secondary indexes, and ignores
    /// `sparse_vectors` — confirm these omissions are intentional for bulk import.
    pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
        if points.is_empty() {
            return Ok(0);
        }

        // Snapshot dimension under a short config(1) read lock.
        let config = self.config.read();
        let dimension = config.dimension;
        drop(config);

        // Validate all dimensions before mutating any storage.
        for point in points {
            if point.dimension() != dimension {
                return Err(Error::DimensionMismatch {
                    expected: dimension,
                    actual: point.dimension(),
                });
            }
        }

        // Perf: Collect vectors for parallel HNSW insertion (needed for clone anyway)
        let vectors_for_hnsw: Vec<(u64, Vec<f32>)> =
            points.iter().map(|p| (p.id, p.vector.clone())).collect();

        // Perf: Single batch WAL write + contiguous mmap write
        // Use references from vectors_for_hnsw to avoid double allocation
        let vectors_for_storage: Vec<(u64, &[f32])> = vectors_for_hnsw
            .iter()
            .map(|(id, v)| (*id, v.as_slice()))
            .collect();

        {
            let mut vector_storage = self.vector_storage.write();
            vector_storage
                .store_batch(&vectors_for_storage)
                .map_err(Error::Io)?;
            // Perf: Flush while lock is already held — avoids a second write() acquisition
            vector_storage.flush().map_err(Error::Io)?;
        }

        // Store payloads and update BM25 (still sequential for now)
        {
            let mut payload_storage = self.payload_storage.write();
            for point in points {
                if let Some(payload) = &point.payload {
                    payload_storage
                        .store(point.id, payload)
                        .map_err(Error::Io)?;

                    // Update BM25 text index
                    let text = Self::extract_text_from_payload(payload);
                    if !text.is_empty() {
                        self.text_index.add_document(point.id, &text);
                    }
                }
            }
            // Perf: Flush while lock is already held — avoids a second write() acquisition
            payload_storage.flush().map_err(Error::Io)?;
        }

        // Perf: Parallel HNSW insertion (CPU bound - benefits from parallelism)
        let inserted = self.index.insert_batch_parallel(vectors_for_hnsw);
        self.index.set_searching_mode();

        // Update point count (vector_storage read lock is taken and released
        // within this statement, before config's write guard use below is done —
        // NOTE(review): confirm this nested acquisition respects the lock order).
        let mut config = self.config.write();
        config.point_count = self.vector_storage.read().len();
        drop(config);
        // NOTE: index.save() removed - too slow for batch operations
        // Call collection.flush() explicitly if durability is critical

        // Invalidate stats cache so the next get_stats() recomputes fresh data.
        *self.cached_stats.lock() = None;

        // Bump write generation once per batch (CACHE-01 invalidation counter).
        //
        // Intentional placement: the bump occurs AFTER all mutations (vector
        // storage write, payload storage write, HNSW insertion, config update)
        // have completed successfully. Bumping earlier would allow a concurrent
        // reader to see the new generation before the data is consistent,
        // causing it to build a fresh plan key that matches no cached entry —
        // harmless, but wasteful. Bumping here ensures cache invalidation is
        // visible only once all writes are durable.
        self.write_generation
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        Ok(inserted)
    }

332    /// Retrieves points by their IDs.
333    #[must_use]
334    pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
335        let config = self.config.read();
336        let is_metadata_only = config.metadata_only;
337        drop(config);
338
339        let payload_storage = self.payload_storage.read();
340
341        if is_metadata_only {
342            // For metadata-only collections, only retrieve payload
343            ids.iter()
344                .map(|&id| {
345                    let payload = payload_storage.retrieve(id).ok().flatten()?;
346                    Some(Point {
347                        id,
348                        vector: Vec::new(),
349                        payload: Some(payload),
350                        sparse_vectors: None,
351                    })
352                })
353                .collect()
354        } else {
355            // For vector collections, retrieve both vector and payload
356            let vector_storage = self.vector_storage.read();
357            ids.iter()
358                .map(|&id| {
359                    let vector = vector_storage.retrieve(id).ok().flatten()?;
360                    let payload = payload_storage.retrieve(id).ok().flatten();
361                    Some(Point {
362                        id,
363                        vector,
364                        payload,
365                        sparse_vectors: None,
366                    })
367                })
368                .collect()
369        }
370    }
371
    /// Deletes points by their IDs.
    ///
    /// # Errors
    ///
    /// Returns an error if storage operations fail.
    pub fn delete(&self, ids: &[u64]) -> Result<()> {
        // Snapshot the collection mode under a short config(1) read lock.
        let config = self.config.read();
        let is_metadata_only = config.metadata_only;
        drop(config);

        let mut payload_storage = self.payload_storage.write();

        if is_metadata_only {
            // For metadata-only collections, only delete from payload storage
            for &id in ids {
                // Old payload is needed to remove stale secondary-index entries.
                let old_payload = payload_storage.retrieve(id).ok().flatten();
                payload_storage.delete(id).map_err(Error::Io)?;
                self.text_index.remove_document(id);
                self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
            }

            // LOCK ORDER: drop payload_storage(3) before acquiring config(1).
            let point_count = payload_storage.ids().len();
            drop(payload_storage);
            // config(1) only — all higher-numbered locks released above.
            let mut config = self.config.write();
            config.point_count = point_count;
            drop(config);
        } else {
            // For vector collections, delete from all stores
            let mut vector_storage = self.vector_storage.write();
            // Acquire cache locks once outside the loop (was N×3 lock acquisitions)
            let mut sq8_cache = self.sq8_cache.write();
            let mut binary_cache = self.binary_cache.write();
            let mut pq_cache = self.pq_cache.write();

            for &id in ids {
                let old_payload = payload_storage.retrieve(id).ok().flatten();
                vector_storage.delete(id).map_err(Error::Io)?;
                payload_storage.delete(id).map_err(Error::Io)?;
                self.index.remove(id);
                sq8_cache.remove(&id);
                binary_cache.remove(&id);
                pq_cache.remove(&id);
                self.text_index.remove_document(id);
                self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
            }

            // LOCK ORDER: drop vector_storage(2), payload_storage(3), caches before acquiring config(1).
            let point_count = vector_storage.len();
            drop(vector_storage);
            drop(payload_storage);
            drop(sq8_cache);
            drop(binary_cache);
            drop(pq_cache);
            // config(1) only — all higher-numbered locks released above.
            let mut config = self.config.write();
            config.point_count = point_count;
            drop(config);

            // LOCK ORDER: sparse_indexes(9) — acquired after all lower-numbered locks released.
            // WAL-before-apply: write delete intent to WAL before mutating the index.
            #[cfg(feature = "persistence")]
            {
                let indexes = self.sparse_indexes.read();
                if !indexes.is_empty() {
                    for (name, _) in indexes.iter() {
                        let wal_path =
                            crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
                        for &id in ids {
                            crate::index::sparse::persistence::wal_append_delete(&wal_path, id)?;
                        }
                    }
                }
            }

            {
                // NOTE(review): deletes happen through a read guard, so
                // `idx.delete` presumably relies on interior mutability —
                // confirm it is safe under concurrent readers.
                let indexes = self.sparse_indexes.read();
                for idx in indexes.values() {
                    for &id in ids {
                        idx.delete(id);
                    }
                }
            }
        }

        // Invalidate stats cache so the next get_stats() recomputes fresh data.
        *self.cached_stats.lock() = None;

        // Bump write generation once per batch (CACHE-01 invalidation counter).
        self.write_generation
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        Ok(())
    }

468    /// Returns the number of points in the collection.
469    /// Perf: Uses cached `point_count` from config instead of acquiring storage lock
470    #[must_use]
471    pub fn len(&self) -> usize {
472        self.config.read().point_count
473    }
474
475    /// Returns true if the collection is empty.
476    /// Perf: Uses cached `point_count` from config instead of acquiring storage lock
477    #[must_use]
478    pub fn is_empty(&self) -> bool {
479        self.config.read().point_count == 0
480    }
481
482    /// Returns all point IDs in the collection.
483    #[must_use]
484    pub fn all_ids(&self) -> Vec<u64> {
485        self.payload_storage.read().ids()
486    }
487}