velesdb_core/collection/core/
crud.rs1use crate::collection::types::Collection;
4use crate::error::{Error, Result};
5use crate::index::VectorIndex;
6use crate::index::{JsonValue, SecondaryIndex};
7use crate::point::Point;
8use crate::quantization::{
9 BinaryQuantizedVector, PQVector, ProductQuantizer, QuantizedVector, StorageMode,
10};
11use crate::storage::{PayloadStorage, VectorStorage};
12
13impl Collection {
14 pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
23 let points: Vec<Point> = points.into_iter().collect();
24 let config = self.config.read();
25 let dimension = config.dimension;
26 let storage_mode = config.storage_mode;
27 let metadata_only = config.metadata_only;
28 let name = config.name.clone();
29 drop(config);
30
31 if metadata_only {
33 for point in &points {
34 if !point.vector.is_empty() {
35 return Err(Error::VectorNotAllowed(name));
36 }
37 }
38 return self.upsert_metadata(points);
40 }
41
42 for point in &points {
44 if point.dimension() != dimension {
45 return Err(Error::DimensionMismatch {
46 expected: dimension,
47 actual: point.dimension(),
48 });
49 }
50 }
51
52 let mut vector_storage = self.vector_storage.write();
53 let mut payload_storage = self.payload_storage.write();
54
55 let mut sq8_cache = match storage_mode {
57 StorageMode::SQ8 => Some(self.sq8_cache.write()),
58 _ => None,
59 };
60 let mut binary_cache = match storage_mode {
61 StorageMode::Binary => Some(self.binary_cache.write()),
62 _ => None,
63 };
64 let mut pq_cache = match storage_mode {
65 StorageMode::ProductQuantization => Some(self.pq_cache.write()),
66 _ => None,
67 };
68
69 for point in points {
70 let old_payload = payload_storage.retrieve(point.id).ok().flatten();
71 vector_storage
73 .store(point.id, &point.vector)
74 .map_err(Error::Io)?;
75
76 match storage_mode {
78 StorageMode::SQ8 => {
79 if let Some(ref mut cache) = sq8_cache {
80 let quantized = QuantizedVector::from_f32(&point.vector);
81 cache.insert(point.id, quantized);
82 }
83 }
84 StorageMode::Binary => {
85 if let Some(ref mut cache) = binary_cache {
86 let quantized = BinaryQuantizedVector::from_f32(&point.vector);
87 cache.insert(point.id, quantized);
88 }
89 }
90 StorageMode::ProductQuantization => {
91 let maybe_code: Option<PQVector> = {
92 let mut quantizer_guard = self.pq_quantizer.write();
93 if quantizer_guard.is_none() {
94 let mut buffer = self.pq_training_buffer.write();
95 buffer.push_back(point.vector.clone());
96 const PQ_TRAINING_SAMPLES: usize = 128;
97 if buffer.len() >= PQ_TRAINING_SAMPLES {
98 let training: Vec<Vec<f32>> = buffer.iter().cloned().collect();
99 let dimension = point.vector.len();
100 let mut num_subspaces = 8usize;
101 while num_subspaces > 1 && !dimension.is_multiple_of(num_subspaces)
102 {
103 num_subspaces /= 2;
104 }
105 let num_centroids = 256usize.min(training.len().max(2));
106 *quantizer_guard = Some(ProductQuantizer::train(
107 &training,
108 num_subspaces.max(1),
109 num_centroids,
110 ));
111 }
112 }
113
114 quantizer_guard
115 .as_ref()
116 .map(|quantizer| quantizer.quantize(&point.vector))
117 };
118
119 if let (Some(ref mut cache), Some(code)) = (&mut pq_cache, maybe_code) {
120 cache.insert(point.id, code);
121 }
122 }
123 StorageMode::Full => {}
124 }
125
126 if let Some(payload) = &point.payload {
128 payload_storage
129 .store(point.id, payload)
130 .map_err(Error::Io)?;
131 } else {
132 let _ = payload_storage.delete(point.id);
133 }
134
135 self.update_secondary_indexes_on_upsert(
136 point.id,
137 old_payload.as_ref(),
138 point.payload.as_ref(),
139 );
140
141 self.index.insert(point.id, &point.vector);
143
144 if let Some(payload) = &point.payload {
146 let text = Self::extract_text_from_payload(payload);
147 if !text.is_empty() {
148 self.text_index.add_document(point.id, &text);
149 }
150 } else {
151 self.text_index.remove_document(point.id);
152 }
153 }
154
155 let mut config = self.config.write();
157 config.point_count = vector_storage.len();
158
159 vector_storage.flush().map_err(Error::Io)?;
161 payload_storage.flush().map_err(Error::Io)?;
162 self.index.save(&self.path).map_err(Error::Io)?;
163
164 Ok(())
165 }
166
167 pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
176 let points: Vec<Point> = points.into_iter().collect();
177
178 let mut payload_storage = self.payload_storage.write();
179
180 for point in &points {
181 let old_payload = payload_storage.retrieve(point.id).ok().flatten();
182 if let Some(payload) = &point.payload {
184 payload_storage
185 .store(point.id, payload)
186 .map_err(Error::Io)?;
187
188 let text = Self::extract_text_from_payload(payload);
190 if !text.is_empty() {
191 self.text_index.add_document(point.id, &text);
192 }
193 } else {
194 let _ = payload_storage.delete(point.id);
195 self.text_index.remove_document(point.id);
196 }
197
198 self.update_secondary_indexes_on_upsert(
199 point.id,
200 old_payload.as_ref(),
201 point.payload.as_ref(),
202 );
203 }
204
205 let mut config = self.config.write();
207 config.point_count = payload_storage.ids().len();
208
209 payload_storage.flush().map_err(Error::Io)?;
211
212 Ok(())
213 }
214
215 pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
230 if points.is_empty() {
231 return Ok(0);
232 }
233
234 let config = self.config.read();
235 let dimension = config.dimension;
236 drop(config);
237
238 for point in points {
240 if point.dimension() != dimension {
241 return Err(Error::DimensionMismatch {
242 expected: dimension,
243 actual: point.dimension(),
244 });
245 }
246 }
247
248 let vectors_for_hnsw: Vec<(u64, Vec<f32>)> =
250 points.iter().map(|p| (p.id, p.vector.clone())).collect();
251
252 let vectors_for_storage: Vec<(u64, &[f32])> = vectors_for_hnsw
255 .iter()
256 .map(|(id, v)| (*id, v.as_slice()))
257 .collect();
258
259 let mut vector_storage = self.vector_storage.write();
260 vector_storage
261 .store_batch(&vectors_for_storage)
262 .map_err(Error::Io)?;
263 drop(vector_storage);
264
265 let mut payload_storage = self.payload_storage.write();
267 for point in points {
268 if let Some(payload) = &point.payload {
269 payload_storage
270 .store(point.id, payload)
271 .map_err(Error::Io)?;
272
273 let text = Self::extract_text_from_payload(payload);
275 if !text.is_empty() {
276 self.text_index.add_document(point.id, &text);
277 }
278 }
279 }
280 drop(payload_storage);
281
282 let inserted = self.index.insert_batch_parallel(vectors_for_hnsw);
284 self.index.set_searching_mode();
285
286 let mut config = self.config.write();
288 config.point_count = self.vector_storage.read().len();
289 drop(config);
290
291 self.vector_storage.write().flush().map_err(Error::Io)?;
295 self.payload_storage.write().flush().map_err(Error::Io)?;
296 Ok(inserted)
300 }
301
302 #[must_use]
304 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
305 let config = self.config.read();
306 let is_metadata_only = config.metadata_only;
307 drop(config);
308
309 let payload_storage = self.payload_storage.read();
310
311 if is_metadata_only {
312 ids.iter()
314 .map(|&id| {
315 let payload = payload_storage.retrieve(id).ok().flatten()?;
316 Some(Point {
317 id,
318 vector: Vec::new(),
319 payload: Some(payload),
320 })
321 })
322 .collect()
323 } else {
324 let vector_storage = self.vector_storage.read();
326 ids.iter()
327 .map(|&id| {
328 let vector = vector_storage.retrieve(id).ok().flatten()?;
329 let payload = payload_storage.retrieve(id).ok().flatten();
330 Some(Point {
331 id,
332 vector,
333 payload,
334 })
335 })
336 .collect()
337 }
338 }
339
340 pub fn delete(&self, ids: &[u64]) -> Result<()> {
346 let config = self.config.read();
347 let is_metadata_only = config.metadata_only;
348 drop(config);
349
350 let mut payload_storage = self.payload_storage.write();
351
352 if is_metadata_only {
353 for &id in ids {
355 let old_payload = payload_storage.retrieve(id).ok().flatten();
356 payload_storage.delete(id).map_err(Error::Io)?;
357 self.text_index.remove_document(id);
358 self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
359 }
360
361 let mut config = self.config.write();
362 config.point_count = payload_storage.ids().len();
363 } else {
364 let mut vector_storage = self.vector_storage.write();
366
367 for &id in ids {
368 let old_payload = payload_storage.retrieve(id).ok().flatten();
369 vector_storage.delete(id).map_err(Error::Io)?;
370 payload_storage.delete(id).map_err(Error::Io)?;
371 self.index.remove(id);
372 self.sq8_cache.write().remove(&id);
373 self.binary_cache.write().remove(&id);
374 self.pq_cache.write().remove(&id);
375 self.text_index.remove_document(id);
376 self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
377 }
378
379 let mut config = self.config.write();
380 config.point_count = vector_storage.len();
381 }
382
383 Ok(())
384 }
385
386 fn update_secondary_indexes_on_upsert(
387 &self,
388 id: u64,
389 old_payload: Option<&serde_json::Value>,
390 new_payload: Option<&serde_json::Value>,
391 ) {
392 let indexes = self.secondary_indexes.read();
393 for (field, index) in indexes.iter() {
394 if let Some(old_value) = old_payload
395 .and_then(|p| p.get(field))
396 .and_then(JsonValue::from_json)
397 {
398 self.remove_from_secondary_index(index, &old_value, id);
399 }
400 if let Some(new_value) = new_payload
401 .and_then(|p| p.get(field))
402 .and_then(JsonValue::from_json)
403 {
404 self.insert_into_secondary_index(index, new_value, id);
405 }
406 }
407 }
408
409 fn update_secondary_indexes_on_delete(&self, id: u64, old_payload: Option<&serde_json::Value>) {
410 let Some(payload) = old_payload else {
411 return;
412 };
413 let indexes = self.secondary_indexes.read();
414 for (field, index) in indexes.iter() {
415 if let Some(old_value) = payload.get(field).and_then(JsonValue::from_json) {
416 self.remove_from_secondary_index(index, &old_value, id);
417 }
418 }
419 }
420
421 fn insert_into_secondary_index(&self, index: &SecondaryIndex, key: JsonValue, id: u64) {
422 match index {
423 SecondaryIndex::BTree(tree) => {
424 let mut tree = tree.write();
425 let ids = tree.entry(key).or_default();
426 if !ids.contains(&id) {
427 ids.push(id);
428 }
429 }
430 }
431 }
432
433 fn remove_from_secondary_index(&self, index: &SecondaryIndex, key: &JsonValue, id: u64) {
434 match index {
435 SecondaryIndex::BTree(tree) => {
436 let mut tree = tree.write();
437 if let Some(ids) = tree.get_mut(key) {
438 ids.retain(|existing| *existing != id);
439 if ids.is_empty() {
440 tree.remove(key);
441 }
442 }
443 }
444 }
445 }
446
447 #[must_use]
450 pub fn len(&self) -> usize {
451 self.config.read().point_count
452 }
453
454 #[must_use]
457 pub fn is_empty(&self) -> bool {
458 self.config.read().point_count == 0
459 }
460
461 #[must_use]
463 pub fn all_ids(&self) -> Vec<u64> {
464 self.payload_storage.read().ids()
465 }
466}