velesdb_core/collection/core/
crud_bulk.rs1use crate::collection::types::Collection;
7use crate::error::{Error, Result};
8use crate::point::Point;
9use crate::storage::VectorStorage;
10use crate::validation::validate_dimension_match;
11
12use std::collections::BTreeMap;
13
14impl Collection {
15 pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
31 if points.is_empty() {
32 return Ok(0);
33 }
34
35 let dimension = self.config.read().dimension;
36 for point in points {
37 validate_dimension_match(dimension, point.dimension())?;
38 }
39
40 let vector_refs: Vec<(u64, &[f32])> =
41 points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
42 let sparse_batch = Self::collect_sparse_batch(points);
43
44 self.store_vectors_and_payloads(&vector_refs, points)?;
45
46 let inserted = self.bulk_index_or_defer(vector_refs);
47 self.config.write().point_count = self.vector_storage.read().len();
48
49 self.apply_sparse_batch_bulk(&sparse_batch)?;
50 self.invalidate_caches_and_bump_generation();
51
52 Ok(inserted)
53 }
54
55 fn store_vectors_and_payloads(
57 &self,
58 vector_refs: &[(u64, &[f32])],
59 points: &[Point],
60 ) -> Result<()> {
61 #[cfg(feature = "persistence")]
62 {
63 let (vec_result, pay_result) = rayon::join(
64 || self.bulk_store_vectors(vector_refs),
65 || self.bulk_store_payloads(points),
66 );
67 vec_result?;
68 pay_result?;
69 }
70
71 #[cfg(not(feature = "persistence"))]
72 {
73 self.bulk_store_vectors(vector_refs)?;
74 self.bulk_store_payloads(points)?;
75 }
76
77 Ok(())
78 }
79
80 pub fn upsert_bulk_from_raw(
97 &self,
98 vectors: &[f32],
99 ids: &[u64],
100 dimension: usize,
101 payloads: Option<&[Option<serde_json::Value>]>,
102 ) -> Result<usize> {
103 let n = ids.len();
104 if n == 0 {
105 return Ok(0);
106 }
107
108 self.validate_raw_inputs(vectors, ids, dimension, payloads)?;
110
111 let vector_refs: Vec<(u64, &[f32])> = ids
113 .iter()
114 .enumerate()
115 .map(|(i, &id)| (id, &vectors[i * dimension..(i + 1) * dimension]))
116 .collect();
117
118 let payload_entries: Vec<(u64, &serde_json::Value)> = payloads
120 .into_iter()
121 .flat_map(|ps| {
122 ps.iter()
123 .enumerate()
124 .filter_map(|(i, opt)| opt.as_ref().map(|val| (ids[i], val)))
125 })
126 .collect();
127
128 self.store_vectors_and_payload_entries(&vector_refs, &payload_entries)?;
129
130 self.update_text_index_from_raw(ids, payloads);
131
132 let inserted = self.bulk_index_or_defer(vector_refs);
133 self.config.write().point_count = self.vector_storage.read().len();
134 self.invalidate_caches_and_bump_generation();
135
136 Ok(inserted)
137 }
138
139 fn validate_raw_inputs(
141 &self,
142 vectors: &[f32],
143 ids: &[u64],
144 dimension: usize,
145 payloads: Option<&[Option<serde_json::Value>]>,
146 ) -> Result<()> {
147 let n = ids.len();
148 let expected_len = n.checked_mul(dimension).ok_or_else(|| {
149 Error::InvalidVector(format!(
150 "overflow computing {n} * {dimension} for flat vector length"
151 ))
152 })?;
153 if vectors.len() != expected_len {
154 return Err(Error::InvalidVector(format!(
155 "flat vectors length {} != ids.len() ({n}) * dimension ({dimension}) = {expected_len}",
156 vectors.len()
157 )));
158 }
159 if let Some(ps) = payloads {
160 if ps.len() != n {
161 return Err(Error::InvalidVector(format!(
162 "payloads length ({}) must match ids length ({n})",
163 ps.len()
164 )));
165 }
166 }
167 let collection_dim = self.config.read().dimension;
168 validate_dimension_match(collection_dim, dimension)?;
169 Ok(())
170 }
171
172 fn bulk_store_payload_entries(&self, entries: &[(u64, &serde_json::Value)]) -> Result<()> {
177 if entries.is_empty() {
178 return Ok(());
179 }
180 self.payload_storage.write().store_batch(entries)?;
181 Ok(())
182 }
183
184 fn store_vectors_and_payload_entries(
186 &self,
187 vector_refs: &[(u64, &[f32])],
188 payload_entries: &[(u64, &serde_json::Value)],
189 ) -> Result<()> {
190 #[cfg(feature = "persistence")]
191 {
192 let (vec_result, pay_result) = rayon::join(
193 || self.bulk_store_vectors(vector_refs),
194 || self.bulk_store_payload_entries(payload_entries),
195 );
196 vec_result?;
197 pay_result?;
198 }
199
200 #[cfg(not(feature = "persistence"))]
201 {
202 self.bulk_store_vectors(vector_refs)?;
203 self.bulk_store_payload_entries(payload_entries)?;
204 }
205
206 Ok(())
207 }
208
209 fn update_text_index_from_raw(
215 &self,
216 ids: &[u64],
217 payloads: Option<&[Option<serde_json::Value>]>,
218 ) {
219 let Some(ps) = payloads else { return };
220 for (i, opt) in ps.iter().enumerate() {
221 if let Some(payload) = opt {
222 let text = Self::extract_text_from_payload(payload);
223 if !text.is_empty() {
224 self.text_index.add_document(ids[i], &text);
225 }
226 } else {
227 self.text_index.remove_document(ids[i]);
228 }
229 }
230 }
231
232 fn collect_sparse_batch(
234 points: &[Point],
235 ) -> BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> {
236 let mut batch: BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> =
237 BTreeMap::new();
238 for point in points {
239 if let Some(sv_map) = &point.sparse_vectors {
240 for (name, sv) in sv_map {
241 batch
242 .entry(name.clone())
243 .or_default()
244 .push((point.id, sv.clone()));
245 }
246 }
247 }
248 batch
249 }
250
251 fn bulk_store_vectors(&self, vectors: &[(u64, &[f32])]) -> Result<()> {
253 let mut storage = self.vector_storage.write();
254 storage.store_batch(vectors)?;
255 storage.flush()?;
256 Ok(())
257 }
258
259 fn bulk_store_payloads(&self, points: &[Point]) -> Result<()> {
264 let entries: Vec<(u64, &serde_json::Value)> = points
265 .iter()
266 .filter_map(|p| p.payload.as_ref().map(|pl| (p.id, pl)))
267 .collect();
268
269 self.payload_storage.write().store_batch(&entries)?;
270
271 if !entries.is_empty() || !self.text_index.is_empty() {
276 for point in points {
277 Self::update_text_index(&self.text_index, point);
278 }
279 }
280
281 Ok(())
282 }
283
284 fn apply_sparse_batch_bulk(
286 &self,
287 sparse_batch: &BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>>,
288 ) -> Result<()> {
289 if sparse_batch.is_empty() {
290 return Ok(());
291 }
292 #[cfg(feature = "persistence")]
293 {
294 for (name, docs) in sparse_batch {
295 let wal_path =
296 crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
297 for (point_id, sv) in docs {
298 crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
299 }
300 }
301 }
302 let mut indexes = self.sparse_indexes.write();
303 for (name, docs) in sparse_batch {
304 let idx = indexes.entry(name.clone()).or_default();
305 idx.insert_batch_chunk(docs);
306 }
307 Ok(())
308 }
309}