Skip to main content

velesdb_core/collection/core/
crud.rs

1//! CRUD operations for Collection (upsert, get, delete).
2
3use crate::collection::types::Collection;
4use crate::error::{Error, Result};
5use crate::index::VectorIndex;
6use crate::index::{JsonValue, SecondaryIndex};
7use crate::point::Point;
8use crate::quantization::{
9    BinaryQuantizedVector, PQVector, ProductQuantizer, QuantizedVector, StorageMode,
10};
11use crate::storage::{PayloadStorage, VectorStorage};
12
13impl Collection {
14    /// Inserts or updates points in the collection.
15    ///
16    /// Accepts any iterator of points (Vec, slice, array, etc.)
17    ///
18    /// # Errors
19    ///
20    /// Returns an error if any point has a mismatched dimension, or if
21    /// attempting to insert vectors into a metadata-only collection.
22    pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
23        let points: Vec<Point> = points.into_iter().collect();
24        let config = self.config.read();
25        let dimension = config.dimension;
26        let storage_mode = config.storage_mode;
27        let metadata_only = config.metadata_only;
28        let name = config.name.clone();
29        drop(config);
30
31        // Reject vectors on metadata-only collections
32        if metadata_only {
33            for point in &points {
34                if !point.vector.is_empty() {
35                    return Err(Error::VectorNotAllowed(name));
36                }
37            }
38            // Delegate to upsert_metadata for metadata-only collections
39            return self.upsert_metadata(points);
40        }
41
42        // Validate dimensions first
43        for point in &points {
44            if point.dimension() != dimension {
45                return Err(Error::DimensionMismatch {
46                    expected: dimension,
47                    actual: point.dimension(),
48                });
49            }
50        }
51
52        let mut vector_storage = self.vector_storage.write();
53        let mut payload_storage = self.payload_storage.write();
54
55        // Get quantized caches if needed
56        let mut sq8_cache = match storage_mode {
57            StorageMode::SQ8 => Some(self.sq8_cache.write()),
58            _ => None,
59        };
60        let mut binary_cache = match storage_mode {
61            StorageMode::Binary => Some(self.binary_cache.write()),
62            _ => None,
63        };
64        let mut pq_cache = match storage_mode {
65            StorageMode::ProductQuantization => Some(self.pq_cache.write()),
66            _ => None,
67        };
68
69        for point in points {
70            let old_payload = payload_storage.retrieve(point.id).ok().flatten();
71            // 1. Store Vector
72            vector_storage
73                .store(point.id, &point.vector)
74                .map_err(Error::Io)?;
75
76            // 2. Store quantized vector based on storage_mode
77            match storage_mode {
78                StorageMode::SQ8 => {
79                    if let Some(ref mut cache) = sq8_cache {
80                        let quantized = QuantizedVector::from_f32(&point.vector);
81                        cache.insert(point.id, quantized);
82                    }
83                }
84                StorageMode::Binary => {
85                    if let Some(ref mut cache) = binary_cache {
86                        let quantized = BinaryQuantizedVector::from_f32(&point.vector);
87                        cache.insert(point.id, quantized);
88                    }
89                }
90                StorageMode::ProductQuantization => {
91                    let maybe_code: Option<PQVector> = {
92                        let mut quantizer_guard = self.pq_quantizer.write();
93                        if quantizer_guard.is_none() {
94                            let mut buffer = self.pq_training_buffer.write();
95                            buffer.push_back(point.vector.clone());
96                            const PQ_TRAINING_SAMPLES: usize = 128;
97                            if buffer.len() >= PQ_TRAINING_SAMPLES {
98                                let training: Vec<Vec<f32>> = buffer.iter().cloned().collect();
99                                let dimension = point.vector.len();
100                                let mut num_subspaces = 8usize;
101                                while num_subspaces > 1 && !dimension.is_multiple_of(num_subspaces)
102                                {
103                                    num_subspaces /= 2;
104                                }
105                                let num_centroids = 256usize.min(training.len().max(2));
106                                *quantizer_guard = Some(ProductQuantizer::train(
107                                    &training,
108                                    num_subspaces.max(1),
109                                    num_centroids,
110                                ));
111                            }
112                        }
113
114                        quantizer_guard
115                            .as_ref()
116                            .map(|quantizer| quantizer.quantize(&point.vector))
117                    };
118
119                    if let (Some(ref mut cache), Some(code)) = (&mut pq_cache, maybe_code) {
120                        cache.insert(point.id, code);
121                    }
122                }
123                StorageMode::Full => {}
124            }
125
126            // 3. Store Payload (if present)
127            if let Some(payload) = &point.payload {
128                payload_storage
129                    .store(point.id, payload)
130                    .map_err(Error::Io)?;
131            } else {
132                let _ = payload_storage.delete(point.id);
133            }
134
135            self.update_secondary_indexes_on_upsert(
136                point.id,
137                old_payload.as_ref(),
138                point.payload.as_ref(),
139            );
140
141            // 4. Update Vector Index
142            self.index.insert(point.id, &point.vector);
143
144            // 5. Update BM25 Text Index
145            if let Some(payload) = &point.payload {
146                let text = Self::extract_text_from_payload(payload);
147                if !text.is_empty() {
148                    self.text_index.add_document(point.id, &text);
149                }
150            } else {
151                self.text_index.remove_document(point.id);
152            }
153        }
154
155        // Update point count
156        let mut config = self.config.write();
157        config.point_count = vector_storage.len();
158
159        // Auto-flush for durability
160        vector_storage.flush().map_err(Error::Io)?;
161        payload_storage.flush().map_err(Error::Io)?;
162        self.index.save(&self.path).map_err(Error::Io)?;
163
164        Ok(())
165    }
166
167    /// Inserts or updates metadata-only points (no vectors).
168    ///
169    /// This method is for metadata-only collections. Points should have
170    /// empty vectors and only contain payload data.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if storage operations fail.
175    pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
176        let points: Vec<Point> = points.into_iter().collect();
177
178        let mut payload_storage = self.payload_storage.write();
179
180        for point in &points {
181            let old_payload = payload_storage.retrieve(point.id).ok().flatten();
182            // Store Payload (metadata-only points must have payload)
183            if let Some(payload) = &point.payload {
184                payload_storage
185                    .store(point.id, payload)
186                    .map_err(Error::Io)?;
187
188                // Update BM25 Text Index for full-text search
189                let text = Self::extract_text_from_payload(payload);
190                if !text.is_empty() {
191                    self.text_index.add_document(point.id, &text);
192                }
193            } else {
194                let _ = payload_storage.delete(point.id);
195                self.text_index.remove_document(point.id);
196            }
197
198            self.update_secondary_indexes_on_upsert(
199                point.id,
200                old_payload.as_ref(),
201                point.payload.as_ref(),
202            );
203        }
204
205        // Update point count
206        let mut config = self.config.write();
207        config.point_count = payload_storage.ids().len();
208
209        // Auto-flush for durability
210        payload_storage.flush().map_err(Error::Io)?;
211
212        Ok(())
213    }
214
215    /// Bulk insert optimized for high-throughput import.
216    ///
217    /// # Performance
218    ///
219    /// This method is optimized for bulk loading:
220    /// - Uses parallel HNSW insertion (rayon)
221    /// - Single flush at the end (not per-point)
222    /// - No HNSW index save (deferred for performance)
223    /// - ~15x faster than previous sequential approach on large batches (5000+)
224    /// - Benchmark: 25-30 Kvec/s on 768D vectors
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if any point has a mismatched dimension.
229    pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
230        if points.is_empty() {
231            return Ok(0);
232        }
233
234        let config = self.config.read();
235        let dimension = config.dimension;
236        drop(config);
237
238        // Validate dimensions first
239        for point in points {
240            if point.dimension() != dimension {
241                return Err(Error::DimensionMismatch {
242                    expected: dimension,
243                    actual: point.dimension(),
244                });
245            }
246        }
247
248        // Perf: Collect vectors for parallel HNSW insertion (needed for clone anyway)
249        let vectors_for_hnsw: Vec<(u64, Vec<f32>)> =
250            points.iter().map(|p| (p.id, p.vector.clone())).collect();
251
252        // Perf: Single batch WAL write + contiguous mmap write
253        // Use references from vectors_for_hnsw to avoid double allocation
254        let vectors_for_storage: Vec<(u64, &[f32])> = vectors_for_hnsw
255            .iter()
256            .map(|(id, v)| (*id, v.as_slice()))
257            .collect();
258
259        let mut vector_storage = self.vector_storage.write();
260        vector_storage
261            .store_batch(&vectors_for_storage)
262            .map_err(Error::Io)?;
263        drop(vector_storage);
264
265        // Store payloads and update BM25 (still sequential for now)
266        let mut payload_storage = self.payload_storage.write();
267        for point in points {
268            if let Some(payload) = &point.payload {
269                payload_storage
270                    .store(point.id, payload)
271                    .map_err(Error::Io)?;
272
273                // Update BM25 text index
274                let text = Self::extract_text_from_payload(payload);
275                if !text.is_empty() {
276                    self.text_index.add_document(point.id, &text);
277                }
278            }
279        }
280        drop(payload_storage);
281
282        // Perf: Parallel HNSW insertion (CPU bound - benefits from parallelism)
283        let inserted = self.index.insert_batch_parallel(vectors_for_hnsw);
284        self.index.set_searching_mode();
285
286        // Update point count
287        let mut config = self.config.write();
288        config.point_count = self.vector_storage.read().len();
289        drop(config);
290
291        // Perf: Only flush vector/payload storage (fast mmap sync)
292        // Skip expensive HNSW index save - will be saved on collection close/explicit flush
293        // This is safe: HNSW is in-memory and rebuilt from vector storage on restart
294        self.vector_storage.write().flush().map_err(Error::Io)?;
295        self.payload_storage.write().flush().map_err(Error::Io)?;
296        // NOTE: index.save() removed - too slow for batch operations
297        // Call collection.flush() explicitly if durability is critical
298
299        Ok(inserted)
300    }
301
302    /// Retrieves points by their IDs.
303    #[must_use]
304    pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
305        let config = self.config.read();
306        let is_metadata_only = config.metadata_only;
307        drop(config);
308
309        let payload_storage = self.payload_storage.read();
310
311        if is_metadata_only {
312            // For metadata-only collections, only retrieve payload
313            ids.iter()
314                .map(|&id| {
315                    let payload = payload_storage.retrieve(id).ok().flatten()?;
316                    Some(Point {
317                        id,
318                        vector: Vec::new(),
319                        payload: Some(payload),
320                    })
321                })
322                .collect()
323        } else {
324            // For vector collections, retrieve both vector and payload
325            let vector_storage = self.vector_storage.read();
326            ids.iter()
327                .map(|&id| {
328                    let vector = vector_storage.retrieve(id).ok().flatten()?;
329                    let payload = payload_storage.retrieve(id).ok().flatten();
330                    Some(Point {
331                        id,
332                        vector,
333                        payload,
334                    })
335                })
336                .collect()
337        }
338    }
339
340    /// Deletes points by their IDs.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if storage operations fail.
345    pub fn delete(&self, ids: &[u64]) -> Result<()> {
346        let config = self.config.read();
347        let is_metadata_only = config.metadata_only;
348        drop(config);
349
350        let mut payload_storage = self.payload_storage.write();
351
352        if is_metadata_only {
353            // For metadata-only collections, only delete from payload storage
354            for &id in ids {
355                let old_payload = payload_storage.retrieve(id).ok().flatten();
356                payload_storage.delete(id).map_err(Error::Io)?;
357                self.text_index.remove_document(id);
358                self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
359            }
360
361            let mut config = self.config.write();
362            config.point_count = payload_storage.ids().len();
363        } else {
364            // For vector collections, delete from all stores
365            let mut vector_storage = self.vector_storage.write();
366
367            for &id in ids {
368                let old_payload = payload_storage.retrieve(id).ok().flatten();
369                vector_storage.delete(id).map_err(Error::Io)?;
370                payload_storage.delete(id).map_err(Error::Io)?;
371                self.index.remove(id);
372                self.sq8_cache.write().remove(&id);
373                self.binary_cache.write().remove(&id);
374                self.pq_cache.write().remove(&id);
375                self.text_index.remove_document(id);
376                self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
377            }
378
379            let mut config = self.config.write();
380            config.point_count = vector_storage.len();
381        }
382
383        Ok(())
384    }
385
386    fn update_secondary_indexes_on_upsert(
387        &self,
388        id: u64,
389        old_payload: Option<&serde_json::Value>,
390        new_payload: Option<&serde_json::Value>,
391    ) {
392        let indexes = self.secondary_indexes.read();
393        for (field, index) in indexes.iter() {
394            if let Some(old_value) = old_payload
395                .and_then(|p| p.get(field))
396                .and_then(JsonValue::from_json)
397            {
398                self.remove_from_secondary_index(index, &old_value, id);
399            }
400            if let Some(new_value) = new_payload
401                .and_then(|p| p.get(field))
402                .and_then(JsonValue::from_json)
403            {
404                self.insert_into_secondary_index(index, new_value, id);
405            }
406        }
407    }
408
409    fn update_secondary_indexes_on_delete(&self, id: u64, old_payload: Option<&serde_json::Value>) {
410        let Some(payload) = old_payload else {
411            return;
412        };
413        let indexes = self.secondary_indexes.read();
414        for (field, index) in indexes.iter() {
415            if let Some(old_value) = payload.get(field).and_then(JsonValue::from_json) {
416                self.remove_from_secondary_index(index, &old_value, id);
417            }
418        }
419    }
420
421    fn insert_into_secondary_index(&self, index: &SecondaryIndex, key: JsonValue, id: u64) {
422        match index {
423            SecondaryIndex::BTree(tree) => {
424                let mut tree = tree.write();
425                let ids = tree.entry(key).or_default();
426                if !ids.contains(&id) {
427                    ids.push(id);
428                }
429            }
430        }
431    }
432
433    fn remove_from_secondary_index(&self, index: &SecondaryIndex, key: &JsonValue, id: u64) {
434        match index {
435            SecondaryIndex::BTree(tree) => {
436                let mut tree = tree.write();
437                if let Some(ids) = tree.get_mut(key) {
438                    ids.retain(|existing| *existing != id);
439                    if ids.is_empty() {
440                        tree.remove(key);
441                    }
442                }
443            }
444        }
445    }
446
447    /// Returns the number of points in the collection.
448    /// Perf: Uses cached `point_count` from config instead of acquiring storage lock
449    #[must_use]
450    pub fn len(&self) -> usize {
451        self.config.read().point_count
452    }
453
454    /// Returns true if the collection is empty.
455    /// Perf: Uses cached `point_count` from config instead of acquiring storage lock
456    #[must_use]
457    pub fn is_empty(&self) -> bool {
458        self.config.read().point_count == 0
459    }
460
461    /// Returns all point IDs in the collection.
462    #[must_use]
463    pub fn all_ids(&self) -> Vec<u64> {
464        self.payload_storage.read().ids()
465    }
466}