velesdb_core/collection/core/
crud.rs1use crate::collection::types::Collection;
6use crate::error::{Error, Result};
7use crate::index::VectorIndex;
8use crate::point::Point;
9use crate::quantization::StorageMode;
10use crate::storage::{PayloadStorage, VectorStorage};
11
12use std::collections::BTreeMap;
13
14impl Collection {
15 #[allow(clippy::too_many_lines)] pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
25 let points: Vec<Point> = points.into_iter().collect();
26 let config = self.config.read();
27 let dimension = config.dimension;
28 let storage_mode = config.storage_mode;
29 let metadata_only = config.metadata_only;
30
31 if metadata_only {
32 for point in &points {
33 if !point.vector.is_empty() {
34 return Err(Error::VectorNotAllowed(config.name.clone()));
36 }
37 }
38 drop(config);
39 return self.upsert_metadata(points);
40 }
41 drop(config);
42
43 for point in &points {
44 if point.dimension() != dimension {
45 return Err(Error::DimensionMismatch {
46 expected: dimension,
47 actual: point.dimension(),
48 });
49 }
50 }
51
52 let mut sparse_batch: Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)> =
55 Vec::new();
56
57 let mut vector_storage = self.vector_storage.write();
58 let mut payload_storage = self.payload_storage.write();
59
60 let mut sq8_cache = match storage_mode {
61 StorageMode::SQ8 => Some(self.sq8_cache.write()),
62 _ => None,
63 };
64 let mut binary_cache = match storage_mode {
65 StorageMode::Binary => Some(self.binary_cache.write()),
66 _ => None,
67 };
68 let mut pq_cache = match storage_mode {
69 StorageMode::ProductQuantization => Some(self.pq_cache.write()),
70 _ => None,
71 };
72
73 for point in points {
74 let old_payload = payload_storage.retrieve(point.id).ok().flatten();
75 vector_storage
76 .store(point.id, &point.vector)
77 .map_err(Error::Io)?;
78
79 self.cache_quantized_vector(
80 &point,
81 storage_mode,
82 sq8_cache.as_deref_mut(),
83 binary_cache.as_deref_mut(),
84 pq_cache.as_deref_mut(),
85 );
86
87 if let Some(payload) = &point.payload {
88 payload_storage
89 .store(point.id, payload)
90 .map_err(Error::Io)?;
91 } else {
92 let _ = payload_storage.delete(point.id);
93 }
94
95 self.update_secondary_indexes_on_upsert(
96 point.id,
97 old_payload.as_ref(),
98 point.payload.as_ref(),
99 );
100
101 self.index.insert(point.id, &point.vector);
102
103 if let Some(payload) = &point.payload {
104 let text = Self::extract_text_from_payload(payload);
105 if !text.is_empty() {
106 self.text_index.add_document(point.id, &text);
107 }
108 } else {
109 self.text_index.remove_document(point.id);
110 }
111
112 if let Some(sv_map) = point.sparse_vectors {
114 if !sv_map.is_empty() {
115 sparse_batch.push((point.id, sv_map));
116 }
117 }
118 }
119
120 let point_count = vector_storage.len();
123 vector_storage.flush().map_err(Error::Io)?;
124 payload_storage.flush().map_err(Error::Io)?;
125 drop(vector_storage);
126 drop(payload_storage);
127
128 let mut config = self.config.write();
130 config.point_count = point_count;
131 drop(config);
132
133 self.index.save(&self.path).map_err(Error::Io)?;
134
135 if !sparse_batch.is_empty() {
137 #[cfg(feature = "persistence")]
142 {
143 for (point_id, sv_map) in &sparse_batch {
144 for (name, sv) in sv_map {
145 let wal_path =
146 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
147 crate::index::sparse::persistence::wal_append_upsert(
148 &wal_path, *point_id, sv,
149 )?;
150 }
151 }
152 }
153
154 let mut indexes = self.sparse_indexes.write();
155 for (point_id, sv_map) in &sparse_batch {
156 for (name, sv) in sv_map {
157 let idx = indexes.entry(name.clone()).or_default();
158 idx.insert(*point_id, sv);
159 }
160 }
161 }
162
163 *self.cached_stats.lock() = None;
165
166 self.write_generation
168 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
169
170 Ok(())
171 }
172
173 pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
182 let points: Vec<Point> = points.into_iter().collect();
183
184 let mut payload_storage = self.payload_storage.write();
185
186 for point in &points {
187 let old_payload = payload_storage.retrieve(point.id).ok().flatten();
188 if let Some(payload) = &point.payload {
190 payload_storage
191 .store(point.id, payload)
192 .map_err(Error::Io)?;
193
194 let text = Self::extract_text_from_payload(payload);
196 if !text.is_empty() {
197 self.text_index.add_document(point.id, &text);
198 }
199 } else {
200 let _ = payload_storage.delete(point.id);
201 self.text_index.remove_document(point.id);
202 }
203
204 self.update_secondary_indexes_on_upsert(
205 point.id,
206 old_payload.as_ref(),
207 point.payload.as_ref(),
208 );
209 }
210
211 let point_count = payload_storage.ids().len();
213 payload_storage.flush().map_err(Error::Io)?;
214 drop(payload_storage);
215
216 let mut config = self.config.write();
218 config.point_count = point_count;
219 drop(config);
220
221 *self.cached_stats.lock() = None;
223
224 self.write_generation
226 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
227
228 Ok(())
229 }
230
231 pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
246 if points.is_empty() {
247 return Ok(0);
248 }
249
250 let config = self.config.read();
251 let dimension = config.dimension;
252 drop(config);
253
254 for point in points {
255 if point.dimension() != dimension {
256 return Err(Error::DimensionMismatch {
257 expected: dimension,
258 actual: point.dimension(),
259 });
260 }
261 }
262
263 let vectors_for_hnsw: Vec<(u64, Vec<f32>)> =
265 points.iter().map(|p| (p.id, p.vector.clone())).collect();
266
267 let vectors_for_storage: Vec<(u64, &[f32])> = vectors_for_hnsw
270 .iter()
271 .map(|(id, v)| (*id, v.as_slice()))
272 .collect();
273
274 {
275 let mut vector_storage = self.vector_storage.write();
276 vector_storage
277 .store_batch(&vectors_for_storage)
278 .map_err(Error::Io)?;
279 vector_storage.flush().map_err(Error::Io)?;
281 }
282
283 {
285 let mut payload_storage = self.payload_storage.write();
286 for point in points {
287 if let Some(payload) = &point.payload {
288 payload_storage
289 .store(point.id, payload)
290 .map_err(Error::Io)?;
291
292 let text = Self::extract_text_from_payload(payload);
294 if !text.is_empty() {
295 self.text_index.add_document(point.id, &text);
296 }
297 }
298 }
299 payload_storage.flush().map_err(Error::Io)?;
301 }
302
303 let inserted = self.index.insert_batch_parallel(vectors_for_hnsw);
305 self.index.set_searching_mode();
306
307 let mut config = self.config.write();
309 config.point_count = self.vector_storage.read().len();
310 drop(config);
311 *self.cached_stats.lock() = None;
316
317 self.write_generation
327 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
328
329 Ok(inserted)
330 }
331
332 #[must_use]
334 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
335 let config = self.config.read();
336 let is_metadata_only = config.metadata_only;
337 drop(config);
338
339 let payload_storage = self.payload_storage.read();
340
341 if is_metadata_only {
342 ids.iter()
344 .map(|&id| {
345 let payload = payload_storage.retrieve(id).ok().flatten()?;
346 Some(Point {
347 id,
348 vector: Vec::new(),
349 payload: Some(payload),
350 sparse_vectors: None,
351 })
352 })
353 .collect()
354 } else {
355 let vector_storage = self.vector_storage.read();
357 ids.iter()
358 .map(|&id| {
359 let vector = vector_storage.retrieve(id).ok().flatten()?;
360 let payload = payload_storage.retrieve(id).ok().flatten();
361 Some(Point {
362 id,
363 vector,
364 payload,
365 sparse_vectors: None,
366 })
367 })
368 .collect()
369 }
370 }
371
372 pub fn delete(&self, ids: &[u64]) -> Result<()> {
378 let config = self.config.read();
379 let is_metadata_only = config.metadata_only;
380 drop(config);
381
382 let mut payload_storage = self.payload_storage.write();
383
384 if is_metadata_only {
385 for &id in ids {
387 let old_payload = payload_storage.retrieve(id).ok().flatten();
388 payload_storage.delete(id).map_err(Error::Io)?;
389 self.text_index.remove_document(id);
390 self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
391 }
392
393 let point_count = payload_storage.ids().len();
395 drop(payload_storage);
396 let mut config = self.config.write();
398 config.point_count = point_count;
399 drop(config);
400 } else {
401 let mut vector_storage = self.vector_storage.write();
403 let mut sq8_cache = self.sq8_cache.write();
405 let mut binary_cache = self.binary_cache.write();
406 let mut pq_cache = self.pq_cache.write();
407
408 for &id in ids {
409 let old_payload = payload_storage.retrieve(id).ok().flatten();
410 vector_storage.delete(id).map_err(Error::Io)?;
411 payload_storage.delete(id).map_err(Error::Io)?;
412 self.index.remove(id);
413 sq8_cache.remove(&id);
414 binary_cache.remove(&id);
415 pq_cache.remove(&id);
416 self.text_index.remove_document(id);
417 self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
418 }
419
420 let point_count = vector_storage.len();
422 drop(vector_storage);
423 drop(payload_storage);
424 drop(sq8_cache);
425 drop(binary_cache);
426 drop(pq_cache);
427 let mut config = self.config.write();
429 config.point_count = point_count;
430 drop(config);
431
432 #[cfg(feature = "persistence")]
435 {
436 let indexes = self.sparse_indexes.read();
437 if !indexes.is_empty() {
438 for (name, _) in indexes.iter() {
439 let wal_path =
440 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
441 for &id in ids {
442 crate::index::sparse::persistence::wal_append_delete(&wal_path, id)?;
443 }
444 }
445 }
446 }
447
448 {
449 let indexes = self.sparse_indexes.read();
450 for idx in indexes.values() {
451 for &id in ids {
452 idx.delete(id);
453 }
454 }
455 }
456 }
457
458 *self.cached_stats.lock() = None;
460
461 self.write_generation
463 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
464
465 Ok(())
466 }
467
468 #[must_use]
471 pub fn len(&self) -> usize {
472 self.config.read().point_count
473 }
474
475 #[must_use]
478 pub fn is_empty(&self) -> bool {
479 self.config.read().point_count == 0
480 }
481
482 #[must_use]
484 pub fn all_ids(&self) -> Vec<u64> {
485 self.payload_storage.read().ids()
486 }
487}