velesdb_core/collection/core/
crud.rs1use crate::collection::types::Collection;
6use crate::error::{Error, Result};
7use crate::index::VectorIndex;
8use crate::point::Point;
9use crate::quantization::{BinaryQuantizedVector, PQVector, QuantizedVector, StorageMode};
10use crate::storage::{LogPayloadStorage, PayloadStorage, VectorStorage};
11use crate::validation::validate_dimension_match;
12
13use parking_lot::RwLockWriteGuard;
14use std::collections::{BTreeMap, HashMap};
15
16struct QuantizationGuards<'a> {
17 sq8: Option<RwLockWriteGuard<'a, HashMap<u64, QuantizedVector>>>,
18 binary: Option<RwLockWriteGuard<'a, HashMap<u64, BinaryQuantizedVector>>>,
19 pq: Option<RwLockWriteGuard<'a, HashMap<u64, PQVector>>>,
20}
21
22impl<'a> QuantizationGuards<'a> {
23 fn acquire(collection: &'a Collection, mode: StorageMode) -> Self {
24 Self {
25 sq8: matches!(mode, StorageMode::SQ8).then(|| collection.sq8_cache.write()),
26 binary: matches!(mode, StorageMode::Binary).then(|| collection.binary_cache.write()),
27 pq: matches!(mode, StorageMode::ProductQuantization)
28 .then(|| collection.pq_cache.write()),
29 }
30 }
31}
32
33impl Collection {
34 pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
43 let points: Vec<Point> = points.into_iter().collect();
44 let config = self.config.read();
45 let dimension = config.dimension;
46 let storage_mode = config.storage_mode;
47
48 if config.metadata_only {
49 for point in &points {
50 if !point.vector.is_empty() {
51 return Err(Error::VectorNotAllowed(config.name.clone()));
52 }
53 }
54 drop(config);
55 return self.upsert_metadata(points);
56 }
57 drop(config);
58
59 for point in &points {
60 validate_dimension_match(dimension, point.dimension())?;
61 }
62
63 let sparse_batch = self.upsert_storage_and_index(&points, storage_mode)?;
64
65 self.apply_sparse_batch_upsert(&sparse_batch)?;
66 self.invalidate_caches_and_bump_generation();
67 Ok(())
68 }
69
70 fn upsert_storage_and_index(
74 &self,
75 points: &[Point],
76 storage_mode: StorageMode,
77 ) -> Result<Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>> {
78 let mut vector_storage = self.vector_storage.write();
79 let mut payload_storage = self.payload_storage.write();
80 let mut quant_guards = QuantizationGuards::acquire(self, storage_mode);
81
82 let mut sparse_batch = Vec::new();
83 for point in points {
84 let old_payload = payload_storage.retrieve(point.id).ok().flatten();
85 vector_storage.store(point.id, &point.vector)?;
86
87 let (sq8, binary, pq) = (
88 quant_guards.sq8.as_deref_mut(),
89 quant_guards.binary.as_deref_mut(),
90 quant_guards.pq.as_deref_mut(),
91 );
92 self.cache_quantized_vector(point, storage_mode, sq8, binary, pq);
93
94 Self::store_or_delete_payload(&mut payload_storage, point)?;
95 self.update_secondary_indexes_on_upsert(
96 point.id,
97 old_payload.as_ref(),
98 point.payload.as_ref(),
99 );
100 self.insert_or_defer(point.id, &point.vector);
101 Self::update_text_index(&self.text_index, point);
102 Self::collect_sparse_vectors(point, &mut sparse_batch);
103 }
104
105 let point_count = vector_storage.len();
106 vector_storage.flush()?;
107 payload_storage.flush()?;
108 drop(vector_storage);
109 drop(payload_storage);
110
111 self.config.write().point_count = point_count;
112 self.maybe_merge_deferred();
113
114 Ok(sparse_batch)
115 }
116
117 fn store_or_delete_payload(
118 payload_storage: &mut LogPayloadStorage,
119 point: &Point,
120 ) -> Result<()> {
121 if let Some(payload) = &point.payload {
122 payload_storage.store(point.id, payload)?;
123 } else {
124 let _ = payload_storage.delete(point.id);
125 }
126 Ok(())
127 }
128
129 fn collect_sparse_vectors(
130 point: &Point,
131 sparse_batch: &mut Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>,
132 ) {
133 if let Some(sv_map) = &point.sparse_vectors {
134 if !sv_map.is_empty() {
135 sparse_batch.push((point.id, sv_map.clone()));
136 }
137 }
138 }
139
140 fn update_text_index(text_index: &crate::index::Bm25Index, point: &Point) {
142 if let Some(payload) = &point.payload {
143 let text = Self::extract_text_from_payload(payload);
144 if !text.is_empty() {
145 text_index.add_document(point.id, &text);
146 }
147 } else {
148 text_index.remove_document(point.id);
149 }
150 }
151
152 fn apply_sparse_batch_upsert(
154 &self,
155 sparse_batch: &[(u64, BTreeMap<String, crate::index::sparse::SparseVector>)],
156 ) -> Result<()> {
157 if sparse_batch.is_empty() {
158 return Ok(());
159 }
160 #[cfg(feature = "persistence")]
161 {
162 for (point_id, sv_map) in sparse_batch {
163 for (name, sv) in sv_map {
164 let wal_path =
165 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
166 crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
167 }
168 }
169 }
170 let mut indexes = self.sparse_indexes.write();
171 for (point_id, sv_map) in sparse_batch {
172 for (name, sv) in sv_map {
173 let idx = indexes.entry(name.clone()).or_default();
174 idx.insert(*point_id, sv);
175 }
176 }
177 Ok(())
178 }
179
180 fn invalidate_caches_and_bump_generation(&self) {
182 *self.cached_stats.lock() = None;
183 self.write_generation
184 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
185 }
186
187 fn insert_or_defer(&self, id: u64, vector: &[f32]) {
197 #[cfg(feature = "persistence")]
198 if let Some(ref di) = self.deferred_indexer {
199 di.push(id, vector.to_vec());
200 return;
201 }
202 self.index.insert(id, vector);
203 }
204
205 fn maybe_merge_deferred(&self) {
210 #[cfg(feature = "persistence")]
211 if let Some(ref di) = self.deferred_indexer {
212 if di.should_merge() {
213 self.merge_deferred_batch(di);
214 }
215 }
216 }
217
/// Drains the deferred indexer and inserts the drained vectors into the index.
///
/// Entries whose id can no longer be retrieved from vector storage are
/// skipped. The storage read lock is released before the batch insert. A
/// warning is logged when the index reports fewer insertions than were
/// submitted.
#[cfg(feature = "persistence")]
fn merge_deferred_batch(&self, di: &crate::collection::streaming::DeferredIndexer) {
    let drained = di.swap_and_drain();
    if drained.is_empty() {
        return;
    }

    let storage = self.vector_storage.read();
    let valid: Vec<(u64, &[f32])> = drained
        .iter()
        .filter_map(|(id, v)| {
            storage
                .retrieve(*id)
                .ok()
                .flatten()
                .map(|_| (*id, v.as_slice()))
        })
        .collect();
    drop(storage);

    let expected = valid.len();
    if expected == 0 {
        return;
    }

    let inserted = self.index.insert_batch_parallel(valid);
    if inserted < expected {
        tracing::warn!("merge_deferred_batch: inserted {inserted}/{expected} vectors");
    }
}
252
253 pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
262 let points: Vec<Point> = points.into_iter().collect();
263
264 let mut payload_storage = self.payload_storage.write();
265
266 for point in &points {
267 let old_payload = payload_storage.retrieve(point.id).ok().flatten();
268 Self::store_or_delete_payload(&mut payload_storage, point)?;
269 Self::update_text_index(&self.text_index, point);
270 self.update_secondary_indexes_on_upsert(
271 point.id,
272 old_payload.as_ref(),
273 point.payload.as_ref(),
274 );
275 }
276
277 let point_count = payload_storage.ids().len();
279 payload_storage.flush()?;
280 drop(payload_storage);
281
282 self.config.write().point_count = point_count;
284 self.invalidate_caches_and_bump_generation();
285 Ok(())
286 }
287
288 pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
303 if points.is_empty() {
304 return Ok(0);
305 }
306
307 let dimension = self.config.read().dimension;
308 for point in points {
309 validate_dimension_match(dimension, point.dimension())?;
310 }
311
312 let vector_refs: Vec<(u64, &[f32])> =
313 points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
314 let sparse_batch = Self::collect_sparse_batch(points);
315
316 self.bulk_store_vectors(&vector_refs)?;
317 self.bulk_store_payloads(points)?;
318
319 let inserted = self.bulk_index_or_defer(vector_refs);
320 self.config.write().point_count = self.vector_storage.read().len();
321
322 self.apply_sparse_batch_bulk(&sparse_batch)?;
323 self.invalidate_caches_and_bump_generation();
324
325 Ok(inserted)
326 }
327
328 fn bulk_index_or_defer(&self, vector_refs: Vec<(u64, &[f32])>) -> usize {
337 #[cfg(feature = "persistence")]
338 if let Some(ref di) = self.deferred_indexer {
339 di.extend(vector_refs.iter().map(|(id, v)| (*id, v.to_vec())));
340 if di.should_merge() {
341 self.merge_deferred_batch(di);
342 }
343 return vector_refs.len();
344 }
345 let inserted = self.index.insert_batch_parallel(vector_refs);
346 self.index.set_searching_mode();
347 inserted
348 }
349
350 fn collect_sparse_batch(
352 points: &[Point],
353 ) -> BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> {
354 let mut batch: BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> =
355 BTreeMap::new();
356 for point in points {
357 if let Some(sv_map) = &point.sparse_vectors {
358 for (name, sv) in sv_map {
359 batch
360 .entry(name.clone())
361 .or_default()
362 .push((point.id, sv.clone()));
363 }
364 }
365 }
366 batch
367 }
368
369 fn bulk_store_vectors(&self, vectors: &[(u64, &[f32])]) -> Result<()> {
371 let mut storage = self.vector_storage.write();
372 storage.store_batch(vectors)?;
373 storage.flush()?;
374 Ok(())
375 }
376
377 fn bulk_store_payloads(&self, points: &[Point]) -> Result<()> {
382 let entries: Vec<(u64, &serde_json::Value)> = points
383 .iter()
384 .filter_map(|p| p.payload.as_ref().map(|pl| (p.id, pl)))
385 .collect();
386
387 self.payload_storage.write().store_batch(&entries)?;
388
389 for point in points {
390 Self::update_text_index(&self.text_index, point);
391 }
392
393 Ok(())
394 }
395
396 fn apply_sparse_batch_bulk(
398 &self,
399 sparse_batch: &BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>>,
400 ) -> Result<()> {
401 if sparse_batch.is_empty() {
402 return Ok(());
403 }
404 #[cfg(feature = "persistence")]
405 {
406 for (name, docs) in sparse_batch {
407 let wal_path =
408 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
409 for (point_id, sv) in docs {
410 crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
411 }
412 }
413 }
414 let mut indexes = self.sparse_indexes.write();
415 for (name, docs) in sparse_batch {
416 let idx = indexes.entry(name.clone()).or_default();
417 idx.insert_batch_chunk(docs);
418 }
419 Ok(())
420 }
421
422 #[must_use]
424 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
425 let config = self.config.read();
426 let is_metadata_only = config.metadata_only;
427 drop(config);
428
429 let payload_storage = self.payload_storage.read();
430
431 if is_metadata_only {
432 ids.iter()
434 .map(|&id| {
435 let payload = payload_storage.retrieve(id).ok().flatten()?;
436 Some(Point {
437 id,
438 vector: Vec::new(),
439 payload: Some(payload),
440 sparse_vectors: None,
441 })
442 })
443 .collect()
444 } else {
445 let vector_storage = self.vector_storage.read();
447 ids.iter()
448 .map(|&id| {
449 let vector = vector_storage.retrieve(id).ok().flatten()?;
450 let payload = payload_storage.retrieve(id).ok().flatten();
451 Some(Point {
452 id,
453 vector,
454 payload,
455 sparse_vectors: None,
456 })
457 })
458 .collect()
459 }
460 }
461
462 pub fn delete(&self, ids: &[u64]) -> Result<()> {
468 if self.config.read().metadata_only {
469 self.delete_metadata_only(ids)?;
470 } else {
471 self.delete_vector_points(ids)?;
472 }
473 self.invalidate_caches_and_bump_generation();
474 Ok(())
475 }
476
477 fn delete_metadata_only(&self, ids: &[u64]) -> Result<()> {
479 let mut payload_storage = self.payload_storage.write();
480 for &id in ids {
481 let old_payload = payload_storage.retrieve(id).ok().flatten();
482 payload_storage.delete(id)?;
483 self.text_index.remove_document(id);
484 self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
485 }
486 let point_count = payload_storage.ids().len();
487 drop(payload_storage);
488 self.config.write().point_count = point_count;
489 Ok(())
490 }
491
492 fn delete_vector_points(&self, ids: &[u64]) -> Result<()> {
494 let mut payload_storage = self.payload_storage.write();
495 let mut vector_storage = self.vector_storage.write();
496 let mut sq8_cache = self.sq8_cache.write();
497 let mut binary_cache = self.binary_cache.write();
498 let mut pq_cache = self.pq_cache.write();
499
500 for &id in ids {
501 let old_payload = payload_storage.retrieve(id).ok().flatten();
502 vector_storage.delete(id)?;
503 payload_storage.delete(id)?;
504 self.index.remove(id);
505 sq8_cache.remove(&id);
506 binary_cache.remove(&id);
507 pq_cache.remove(&id);
508 self.text_index.remove_document(id);
509 self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
510 }
511
512 let point_count = vector_storage.len();
513 drop(vector_storage);
514 drop(payload_storage);
515 drop(sq8_cache);
516 drop(binary_cache);
517 drop(pq_cache);
518 self.config.write().point_count = point_count;
519
520 self.delete_from_sparse_indexes(ids)?;
521
522 #[cfg(feature = "persistence")]
524 for &id in ids {
525 self.delta_buffer.remove(id);
526 }
527
528 #[cfg(feature = "persistence")]
530 if let Some(ref di) = self.deferred_indexer {
531 for &id in ids {
532 di.remove(id);
533 }
534 }
535
536 Ok(())
537 }
538
539 fn delete_from_sparse_indexes(&self, ids: &[u64]) -> Result<()> {
541 #[cfg(feature = "persistence")]
542 {
543 let indexes = self.sparse_indexes.read();
544 for (name, _) in indexes.iter() {
545 let wal_path =
546 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
547 for &id in ids {
548 crate::index::sparse::persistence::wal_append_delete(&wal_path, id)?;
549 }
550 }
551 }
552 let indexes = self.sparse_indexes.read();
553 for idx in indexes.values() {
554 for &id in ids {
555 idx.delete(id);
556 }
557 }
558 Ok(())
559 }
560
561 #[must_use]
564 pub fn len(&self) -> usize {
565 self.config.read().point_count
566 }
567
568 #[must_use]
571 pub fn is_empty(&self) -> bool {
572 self.config.read().point_count == 0
573 }
574
575 #[must_use]
577 pub fn all_ids(&self) -> Vec<u64> {
578 self.payload_storage.read().ids()
579 }
580}