velesdb_core/collection/core/
crud.rs1use crate::collection::types::Collection;
6use crate::error::{Error, Result};
7use crate::index::VectorIndex;
8use crate::point::Point;
9use crate::quantization::{BinaryQuantizedVector, PQVector, QuantizedVector, StorageMode};
10use crate::storage::{LogPayloadStorage, PayloadStorage, VectorStorage};
11use crate::validation::validate_dimension_match;
12
13use parking_lot::RwLockWriteGuard;
14use std::collections::{BTreeMap, HashMap};
15
16struct QuantizationGuards<'a> {
17 sq8: Option<RwLockWriteGuard<'a, HashMap<u64, QuantizedVector>>>,
18 binary: Option<RwLockWriteGuard<'a, HashMap<u64, BinaryQuantizedVector>>>,
19 pq: Option<RwLockWriteGuard<'a, HashMap<u64, PQVector>>>,
20}
21
22impl<'a> QuantizationGuards<'a> {
23 fn acquire(collection: &'a Collection, mode: StorageMode) -> Self {
24 Self {
25 sq8: matches!(mode, StorageMode::SQ8).then(|| collection.sq8_cache.write()),
26 binary: matches!(mode, StorageMode::Binary).then(|| collection.binary_cache.write()),
27 pq: matches!(mode, StorageMode::ProductQuantization)
28 .then(|| collection.pq_cache.write()),
29 }
30 }
31}
32
33impl Collection {
34 pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
43 let points: Vec<Point> = points.into_iter().collect();
44 let config = self.config.read();
45 let dimension = config.dimension;
46 let storage_mode = config.storage_mode;
47
48 if config.metadata_only {
49 for point in &points {
50 if !point.vector.is_empty() {
51 return Err(Error::VectorNotAllowed(config.name.clone()));
52 }
53 }
54 drop(config);
55 return self.upsert_metadata(points);
56 }
57 drop(config);
58
59 for point in &points {
60 validate_dimension_match(dimension, point.dimension())?;
61 }
62
63 let sparse_batch = self.upsert_storage_and_index(&points, storage_mode)?;
64
65 self.apply_sparse_batch_upsert(&sparse_batch)?;
66 self.invalidate_caches_and_bump_generation();
67 Ok(())
68 }
69
70 fn upsert_storage_and_index(
86 &self,
87 points: &[Point],
88 storage_mode: StorageMode,
89 ) -> Result<Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>> {
90 let old_payloads = self.batch_store_all(points)?;
92
93 let sparse_batch = self.per_point_updates(points, &old_payloads, storage_mode);
95
96 let vector_refs: Vec<(u64, &[f32])> =
98 points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
99 self.bulk_index_or_defer(vector_refs);
100
101 Ok(sparse_batch)
102 }
103
104 fn batch_store_all(&self, points: &[Point]) -> Result<Vec<Option<serde_json::Value>>> {
119 let mut payload_storage = self.payload_storage.write();
120
121 let old_payloads = Self::collect_old_payloads(points, &payload_storage);
125
126 Self::write_deduped_payloads(points, &mut payload_storage)?;
127 payload_storage.flush()?;
128 drop(payload_storage);
129
130 self.write_deduped_vectors(points)?;
131
132 Ok(old_payloads)
133 }
134
135 fn collect_old_payloads(
140 points: &[Point],
141 storage: &LogPayloadStorage,
142 ) -> Vec<Option<serde_json::Value>> {
143 let mut seen = std::collections::HashSet::new();
144 points
145 .iter()
146 .map(|p| {
147 if seen.insert(p.id) {
148 storage.retrieve(p.id).ok().flatten()
150 } else {
151 None }
153 })
154 .collect()
155 }
156
157 fn write_deduped_payloads(points: &[Point], storage: &mut LogPayloadStorage) -> Result<()> {
160 let mut last_idx: HashMap<u64, usize> = HashMap::new();
162 for (i, p) in points.iter().enumerate() {
163 last_idx.insert(p.id, i);
164 }
165
166 let deduped: Vec<(u64, &serde_json::Value)> = points
168 .iter()
169 .enumerate()
170 .filter(|&(i, p)| last_idx.get(&p.id) == Some(&i) && p.payload.is_some())
171 .filter_map(|(_, p)| p.payload.as_ref().map(|pl| (p.id, pl)))
172 .collect();
173 storage.store_batch(&deduped)?;
174
175 for (i, p) in points.iter().enumerate() {
177 if last_idx.get(&p.id) == Some(&i) && p.payload.is_none() {
178 let _ = storage.delete(p.id);
179 }
180 }
181 Ok(())
182 }
183
184 fn write_deduped_vectors(&self, points: &[Point]) -> Result<()> {
186 let mut last_idx: HashMap<u64, usize> = HashMap::new();
187 for (i, p) in points.iter().enumerate() {
188 last_idx.insert(p.id, i);
189 }
190
191 let deduped: Vec<(u64, &[f32])> = points
192 .iter()
193 .enumerate()
194 .filter(|&(i, p)| last_idx.get(&p.id) == Some(&i))
195 .map(|(_, p)| (p.id, p.vector.as_slice()))
196 .collect();
197
198 let mut vector_storage = self.vector_storage.write();
199 vector_storage.store_batch(&deduped)?;
200 let point_count = vector_storage.len();
201 vector_storage.flush()?;
202 drop(vector_storage);
203
204 self.config.write().point_count = point_count;
205 Ok(())
206 }
207
208 fn per_point_updates(
215 &self,
216 points: &[Point],
217 old_payloads: &[Option<serde_json::Value>],
218 storage_mode: StorageMode,
219 ) -> Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)> {
220 let mut quant_guards = QuantizationGuards::acquire(self, storage_mode);
221 let mut sparse_batch = Vec::new();
222 let mut seen_payloads: HashMap<u64, Option<&serde_json::Value>> = HashMap::new();
230
231 for (point, pre_batch_old) in points.iter().zip(old_payloads) {
232 let effective_old: Option<&serde_json::Value> =
233 if let Some(&inner) = seen_payloads.get(&point.id) {
234 inner
236 } else {
237 pre_batch_old.as_ref()
239 };
240
241 let (sq8, binary, pq) = (
242 quant_guards.sq8.as_deref_mut(),
243 quant_guards.binary.as_deref_mut(),
244 quant_guards.pq.as_deref_mut(),
245 );
246 self.cache_quantized_vector(point, storage_mode, sq8, binary, pq);
247
248 self.update_secondary_indexes_on_upsert(
249 point.id,
250 effective_old,
251 point.payload.as_ref(),
252 );
253 Self::update_text_index(&self.text_index, point);
254 Self::collect_sparse_vectors(point, &mut sparse_batch);
255
256 seen_payloads.insert(point.id, point.payload.as_ref());
258 }
259
260 sparse_batch
261 }
262
263 fn collect_sparse_vectors(
264 point: &Point,
265 sparse_batch: &mut Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>,
266 ) {
267 if let Some(sv_map) = &point.sparse_vectors {
268 if !sv_map.is_empty() {
269 sparse_batch.push((point.id, sv_map.clone()));
270 }
271 }
272 }
273
274 fn update_text_index(text_index: &crate::index::Bm25Index, point: &Point) {
276 if let Some(payload) = &point.payload {
277 let text = Self::extract_text_from_payload(payload);
278 if !text.is_empty() {
279 text_index.add_document(point.id, &text);
280 }
281 } else {
282 text_index.remove_document(point.id);
283 }
284 }
285
286 fn apply_sparse_batch_upsert(
288 &self,
289 sparse_batch: &[(u64, BTreeMap<String, crate::index::sparse::SparseVector>)],
290 ) -> Result<()> {
291 if sparse_batch.is_empty() {
292 return Ok(());
293 }
294 #[cfg(feature = "persistence")]
295 {
296 for (point_id, sv_map) in sparse_batch {
297 for (name, sv) in sv_map {
298 let wal_path =
299 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
300 crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
301 }
302 }
303 }
304 let mut indexes = self.sparse_indexes.write();
305 for (point_id, sv_map) in sparse_batch {
306 for (name, sv) in sv_map {
307 let idx = indexes.entry(name.clone()).or_default();
308 idx.insert(*point_id, sv);
309 }
310 }
311 Ok(())
312 }
313
314 fn invalidate_caches_and_bump_generation(&self) {
316 *self.cached_stats.lock() = None;
317 self.write_generation
318 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
319 }
320
/// Drains the deferred indexer and inserts the drained vectors into the index.
///
/// Entries whose id can no longer be retrieved from vector storage are skipped.
/// A warning is logged when the index reports fewer insertions than expected.
#[cfg(feature = "persistence")]
fn merge_deferred_batch(&self, di: &crate::collection::streaming::DeferredIndexer) {
    let pending = di.swap_and_drain();
    if pending.is_empty() {
        return;
    }

    // Hold the storage read lock only while filtering.
    let live: Vec<(u64, &[f32])> = {
        let vectors = self.vector_storage.read();
        pending
            .iter()
            .filter_map(|(id, vector)| {
                let still_stored = vectors.retrieve(*id).ok().flatten().is_some();
                still_stored.then(|| (*id, vector.as_slice()))
            })
            .collect()
    };

    if live.is_empty() {
        return;
    }
    let expected = live.len();
    let inserted = self.index.insert_batch_parallel(live);
    if inserted < expected {
        tracing::warn!("merge_deferred_batch: inserted {inserted}/{expected} vectors");
    }
}
355
356 pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
365 let points: Vec<Point> = points.into_iter().collect();
366
367 let mut payload_storage = self.payload_storage.write();
368
369 for point in &points {
370 let old_payload = payload_storage.retrieve(point.id).ok().flatten();
371 if let Some(payload) = &point.payload {
372 payload_storage.store(point.id, payload)?;
373 } else {
374 let _ = payload_storage.delete(point.id);
375 }
376 Self::update_text_index(&self.text_index, point);
377 self.update_secondary_indexes_on_upsert(
378 point.id,
379 old_payload.as_ref(),
380 point.payload.as_ref(),
381 );
382 }
383
384 let point_count = payload_storage.ids().len();
386 payload_storage.flush()?;
387 drop(payload_storage);
388
389 self.config.write().point_count = point_count;
391 self.invalidate_caches_and_bump_generation();
392 Ok(())
393 }
394
395 pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
410 if points.is_empty() {
411 return Ok(0);
412 }
413
414 let dimension = self.config.read().dimension;
415 for point in points {
416 validate_dimension_match(dimension, point.dimension())?;
417 }
418
419 let vector_refs: Vec<(u64, &[f32])> =
420 points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
421 let sparse_batch = Self::collect_sparse_batch(points);
422
423 self.bulk_store_vectors(&vector_refs)?;
424 self.bulk_store_payloads(points)?;
425
426 let inserted = self.bulk_index_or_defer(vector_refs);
427 self.config.write().point_count = self.vector_storage.read().len();
428
429 self.apply_sparse_batch_bulk(&sparse_batch)?;
430 self.invalidate_caches_and_bump_generation();
431
432 Ok(inserted)
433 }
434
435 fn bulk_index_or_defer(&self, vector_refs: Vec<(u64, &[f32])>) -> usize {
449 #[cfg(feature = "persistence")]
450 if let Some(ref di) = self.deferred_indexer {
451 di.extend(vector_refs.iter().map(|(id, v)| (*id, v.to_vec())));
452 if di.should_merge() {
453 self.merge_deferred_batch(di);
454 }
455 return vector_refs.len();
456 }
457 let inserted = self.index.insert_batch_parallel(vector_refs);
458 self.index.set_searching_mode();
459 inserted
460 }
461
462 fn collect_sparse_batch(
464 points: &[Point],
465 ) -> BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> {
466 let mut batch: BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> =
467 BTreeMap::new();
468 for point in points {
469 if let Some(sv_map) = &point.sparse_vectors {
470 for (name, sv) in sv_map {
471 batch
472 .entry(name.clone())
473 .or_default()
474 .push((point.id, sv.clone()));
475 }
476 }
477 }
478 batch
479 }
480
481 fn bulk_store_vectors(&self, vectors: &[(u64, &[f32])]) -> Result<()> {
483 let mut storage = self.vector_storage.write();
484 storage.store_batch(vectors)?;
485 storage.flush()?;
486 Ok(())
487 }
488
489 fn bulk_store_payloads(&self, points: &[Point]) -> Result<()> {
494 let entries: Vec<(u64, &serde_json::Value)> = points
495 .iter()
496 .filter_map(|p| p.payload.as_ref().map(|pl| (p.id, pl)))
497 .collect();
498
499 self.payload_storage.write().store_batch(&entries)?;
500
501 for point in points {
502 Self::update_text_index(&self.text_index, point);
503 }
504
505 Ok(())
506 }
507
508 fn apply_sparse_batch_bulk(
510 &self,
511 sparse_batch: &BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>>,
512 ) -> Result<()> {
513 if sparse_batch.is_empty() {
514 return Ok(());
515 }
516 #[cfg(feature = "persistence")]
517 {
518 for (name, docs) in sparse_batch {
519 let wal_path =
520 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
521 for (point_id, sv) in docs {
522 crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
523 }
524 }
525 }
526 let mut indexes = self.sparse_indexes.write();
527 for (name, docs) in sparse_batch {
528 let idx = indexes.entry(name.clone()).or_default();
529 idx.insert_batch_chunk(docs);
530 }
531 Ok(())
532 }
533
534 #[must_use]
536 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
537 let config = self.config.read();
538 let is_metadata_only = config.metadata_only;
539 drop(config);
540
541 let payload_storage = self.payload_storage.read();
542
543 if is_metadata_only {
544 ids.iter()
546 .map(|&id| {
547 let payload = payload_storage.retrieve(id).ok().flatten()?;
548 Some(Point {
549 id,
550 vector: Vec::new(),
551 payload: Some(payload),
552 sparse_vectors: None,
553 })
554 })
555 .collect()
556 } else {
557 let vector_storage = self.vector_storage.read();
559 ids.iter()
560 .map(|&id| {
561 let vector = vector_storage.retrieve(id).ok().flatten()?;
562 let payload = payload_storage.retrieve(id).ok().flatten();
563 Some(Point {
564 id,
565 vector,
566 payload,
567 sparse_vectors: None,
568 })
569 })
570 .collect()
571 }
572 }
573
574 pub fn delete(&self, ids: &[u64]) -> Result<()> {
580 if self.config.read().metadata_only {
581 self.delete_metadata_only(ids)?;
582 } else {
583 self.delete_vector_points(ids)?;
584 }
585 self.invalidate_caches_and_bump_generation();
586 Ok(())
587 }
588
589 fn delete_metadata_only(&self, ids: &[u64]) -> Result<()> {
591 let mut payload_storage = self.payload_storage.write();
592 for &id in ids {
593 let old_payload = payload_storage.retrieve(id).ok().flatten();
594 payload_storage.delete(id)?;
595 self.text_index.remove_document(id);
596 self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
597 }
598 let point_count = payload_storage.ids().len();
599 drop(payload_storage);
600 self.config.write().point_count = point_count;
601 Ok(())
602 }
603
604 fn delete_vector_points(&self, ids: &[u64]) -> Result<()> {
606 let mut payload_storage = self.payload_storage.write();
607 let mut vector_storage = self.vector_storage.write();
608 let mut sq8_cache = self.sq8_cache.write();
609 let mut binary_cache = self.binary_cache.write();
610 let mut pq_cache = self.pq_cache.write();
611
612 for &id in ids {
613 let old_payload = payload_storage.retrieve(id).ok().flatten();
614 vector_storage.delete(id)?;
615 payload_storage.delete(id)?;
616 self.index.remove(id);
617 sq8_cache.remove(&id);
618 binary_cache.remove(&id);
619 pq_cache.remove(&id);
620 self.text_index.remove_document(id);
621 self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
622 }
623
624 let point_count = vector_storage.len();
625 drop(vector_storage);
626 drop(payload_storage);
627 drop(sq8_cache);
628 drop(binary_cache);
629 drop(pq_cache);
630 self.config.write().point_count = point_count;
631
632 self.delete_from_sparse_indexes(ids)?;
633
634 #[cfg(feature = "persistence")]
636 for &id in ids {
637 self.delta_buffer.remove(id);
638 }
639
640 #[cfg(feature = "persistence")]
642 if let Some(ref di) = self.deferred_indexer {
643 for &id in ids {
644 di.remove(id);
645 }
646 }
647
648 Ok(())
649 }
650
651 fn delete_from_sparse_indexes(&self, ids: &[u64]) -> Result<()> {
653 #[cfg(feature = "persistence")]
654 {
655 let indexes = self.sparse_indexes.read();
656 for (name, _) in indexes.iter() {
657 let wal_path =
658 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
659 for &id in ids {
660 crate::index::sparse::persistence::wal_append_delete(&wal_path, id)?;
661 }
662 }
663 }
664 let indexes = self.sparse_indexes.read();
665 for idx in indexes.values() {
666 for &id in ids {
667 idx.delete(id);
668 }
669 }
670 Ok(())
671 }
672
673 #[must_use]
682 pub fn len(&self) -> usize {
683 self.config.read().point_count
684 }
685
686 #[must_use]
691 pub fn is_empty(&self) -> bool {
692 self.config.read().point_count == 0
693 }
694
695 #[must_use]
697 pub fn all_ids(&self) -> Vec<u64> {
698 self.payload_storage.read().ids()
699 }
700}