Skip to main content

velesdb_core/collection/core/
crud_bulk.rs

1//! Bulk CRUD operations for Collection (`upsert_bulk`, `upsert_bulk_from_raw`).
2//!
3//! Extracted from `crud.rs` (Issue #425) to keep each file under 500 NLOC.
4//! These methods are optimized for high-throughput import with parallel I/O.
5
6use crate::collection::types::Collection;
7use crate::error::{Error, Result};
8use crate::point::Point;
9use crate::storage::VectorStorage;
10use crate::validation::validate_dimension_match;
11
12use std::collections::BTreeMap;
13
14impl Collection {
15    /// Bulk insert optimized for high-throughput import.
16    ///
17    /// # Performance
18    ///
19    /// This method is optimized for bulk loading:
20    /// - Uses parallel HNSW insertion (rayon)
21    /// - Parallel payload + vector I/O via `rayon::join` (Issue #424)
22    /// - Single flush at the end (not per-point)
23    /// - No HNSW index save (deferred for performance)
24    /// - ~15x faster than previous sequential approach on large batches (5000+)
25    /// - Benchmark: 25-30 Kvec/s on 768D vectors
26    ///
27    /// # Errors
28    ///
29    /// Returns an error if any point has a mismatched dimension.
30    pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
31        if points.is_empty() {
32            return Ok(0);
33        }
34
35        let dimension = self.config.read().dimension;
36        for point in points {
37            validate_dimension_match(dimension, point.dimension())?;
38        }
39
40        let vector_refs: Vec<(u64, &[f32])> =
41            points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
42        let sparse_batch = Self::collect_sparse_batch(points);
43
44        self.store_vectors_and_payloads(&vector_refs, points)?;
45
46        let inserted = self.bulk_index_or_defer(vector_refs);
47        self.config.write().point_count = self.vector_storage.read().len();
48
49        self.apply_sparse_batch_bulk(&sparse_batch)?;
50        self.invalidate_caches_and_bump_generation();
51
52        Ok(inserted)
53    }
54
55    /// Writes vectors and payloads to storage (parallel with rayon when available).
56    fn store_vectors_and_payloads(
57        &self,
58        vector_refs: &[(u64, &[f32])],
59        points: &[Point],
60    ) -> Result<()> {
61        #[cfg(feature = "persistence")]
62        {
63            let (vec_result, pay_result) = rayon::join(
64                || self.bulk_store_vectors(vector_refs),
65                || self.bulk_store_payloads(points),
66            );
67            vec_result?;
68            pay_result?;
69        }
70
71        #[cfg(not(feature = "persistence"))]
72        {
73            self.bulk_store_vectors(vector_refs)?;
74            self.bulk_store_payloads(points)?;
75        }
76
77        Ok(())
78    }
79
80    /// Bulk insert from contiguous flat slices (zero-copy from numpy / FFI).
81    ///
82    /// Accepts a flat `f32` slice of shape `(n, dimension)` in row-major order
83    /// plus a matching `u64` ID slice of length `n`. This avoids per-row
84    /// `Vec<f32>` allocation that `upsert_bulk` requires through `Point`.
85    ///
86    /// # Performance
87    ///
88    /// Eliminates `n * dimension * 4` bytes of intermediate copies compared
89    /// to the `Point`-based `upsert_bulk` path. For 100K vectors at 768D
90    /// this saves ~293 MB of heap allocations.
91    ///
92    /// # Errors
93    ///
94    /// - Returns [`Error::InvalidVector`] if `vectors.len() != ids.len() * dimension`.
95    /// - Returns [`Error::DimensionMismatch`] if `dimension` does not match the collection.
96    pub fn upsert_bulk_from_raw(
97        &self,
98        vectors: &[f32],
99        ids: &[u64],
100        dimension: usize,
101        payloads: Option<&[Option<serde_json::Value>]>,
102    ) -> Result<usize> {
103        let n = ids.len();
104        if n == 0 {
105            return Ok(0);
106        }
107
108        // Validate inputs BEFORE any state mutation.
109        self.validate_raw_inputs(vectors, ids, dimension, payloads)?;
110
111        // Build (id, &[f32]) pairs by slicing the flat buffer — zero copy.
112        let vector_refs: Vec<(u64, &[f32])> = ids
113            .iter()
114            .enumerate()
115            .map(|(i, &id)| (id, &vectors[i * dimension..(i + 1) * dimension]))
116            .collect();
117
118        // Payload entries for batch WAL write (only ids that have payloads).
119        let payload_entries: Vec<(u64, &serde_json::Value)> = payloads
120            .into_iter()
121            .flat_map(|ps| {
122                ps.iter()
123                    .enumerate()
124                    .filter_map(|(i, opt)| opt.as_ref().map(|val| (ids[i], val)))
125            })
126            .collect();
127
128        self.store_vectors_and_payload_entries(&vector_refs, &payload_entries)?;
129
130        self.update_text_index_from_raw(ids, payloads);
131
132        let inserted = self.bulk_index_or_defer(vector_refs);
133        self.config.write().point_count = self.vector_storage.read().len();
134        self.invalidate_caches_and_bump_generation();
135
136        Ok(inserted)
137    }
138
139    /// Validates raw bulk-insert inputs before any state mutation.
140    fn validate_raw_inputs(
141        &self,
142        vectors: &[f32],
143        ids: &[u64],
144        dimension: usize,
145        payloads: Option<&[Option<serde_json::Value>]>,
146    ) -> Result<()> {
147        let n = ids.len();
148        let expected_len = n.checked_mul(dimension).ok_or_else(|| {
149            Error::InvalidVector(format!(
150                "overflow computing {n} * {dimension} for flat vector length"
151            ))
152        })?;
153        if vectors.len() != expected_len {
154            return Err(Error::InvalidVector(format!(
155                "flat vectors length {} != ids.len() ({n}) * dimension ({dimension}) = {expected_len}",
156                vectors.len()
157            )));
158        }
159        if let Some(ps) = payloads {
160            if ps.len() != n {
161                return Err(Error::InvalidVector(format!(
162                    "payloads length ({}) must match ids length ({n})",
163                    ps.len()
164                )));
165            }
166        }
167        let collection_dim = self.config.read().dimension;
168        validate_dimension_match(collection_dim, dimension)?;
169        Ok(())
170    }
171
172    /// Stores pre-built payload entries via batch WAL write + flush.
173    ///
174    /// Extracted from `bulk_store_payloads` to accept `(u64, &Value)` pairs
175    /// directly, avoiding the need to reconstruct `Point` structs.
176    fn bulk_store_payload_entries(&self, entries: &[(u64, &serde_json::Value)]) -> Result<()> {
177        if entries.is_empty() {
178            return Ok(());
179        }
180        self.payload_storage.write().store_batch(entries)?;
181        Ok(())
182    }
183
184    /// Writes vectors and raw payload entries to storage (parallel when available).
185    fn store_vectors_and_payload_entries(
186        &self,
187        vector_refs: &[(u64, &[f32])],
188        payload_entries: &[(u64, &serde_json::Value)],
189    ) -> Result<()> {
190        #[cfg(feature = "persistence")]
191        {
192            let (vec_result, pay_result) = rayon::join(
193                || self.bulk_store_vectors(vector_refs),
194                || self.bulk_store_payload_entries(payload_entries),
195            );
196            vec_result?;
197            pay_result?;
198        }
199
200        #[cfg(not(feature = "persistence"))]
201        {
202            self.bulk_store_vectors(vector_refs)?;
203            self.bulk_store_payload_entries(payload_entries)?;
204        }
205
206        Ok(())
207    }
208
209    /// Updates BM25 text index from raw payload slices.
210    ///
211    /// Points with `Some(payload)` get their text indexed.
212    /// Points with `None` payload get their stale BM25 entry removed
213    /// (consistent with `update_text_index` in `crud.rs`).
214    fn update_text_index_from_raw(
215        &self,
216        ids: &[u64],
217        payloads: Option<&[Option<serde_json::Value>]>,
218    ) {
219        let Some(ps) = payloads else { return };
220        for (i, opt) in ps.iter().enumerate() {
221            if let Some(payload) = opt {
222                let text = Self::extract_text_from_payload(payload);
223                if !text.is_empty() {
224                    self.text_index.add_document(ids[i], &text);
225                }
226            } else {
227                self.text_index.remove_document(ids[i]);
228            }
229        }
230    }
231
232    /// Collects sparse vectors grouped by index name for batch insert.
233    fn collect_sparse_batch(
234        points: &[Point],
235    ) -> BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> {
236        let mut batch: BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> =
237            BTreeMap::new();
238        for point in points {
239            if let Some(sv_map) = &point.sparse_vectors {
240                for (name, sv) in sv_map {
241                    batch
242                        .entry(name.clone())
243                        .or_default()
244                        .push((point.id, sv.clone()));
245                }
246            }
247        }
248        batch
249    }
250
251    /// Stores vectors in bulk via batch WAL + mmap write.
252    fn bulk_store_vectors(&self, vectors: &[(u64, &[f32])]) -> Result<()> {
253        let mut storage = self.vector_storage.write();
254        storage.store_batch(vectors)?;
255        storage.flush()?;
256        Ok(())
257    }
258
259    /// Stores payloads and updates BM25 text index in bulk.
260    ///
261    /// Uses `LogPayloadStorage::store_batch()` for a single WAL sync instead
262    /// of per-point fsync, improving bulk insert throughput by 10-50x.
263    fn bulk_store_payloads(&self, points: &[Point]) -> Result<()> {
264        let entries: Vec<(u64, &serde_json::Value)> = points
265            .iter()
266            .filter_map(|p| p.payload.as_ref().map(|pl| (p.id, pl)))
267            .collect();
268
269        self.payload_storage.write().store_batch(&entries)?;
270
271        // Issue #425: BM25 skip — when no point has a payload AND the BM25
272        // index is empty, skip the text index loop entirely. The bulk path
273        // inserts fresh points (no old documents to remove), so the loop
274        // body would be a no-op for every point.
275        if !entries.is_empty() || !self.text_index.is_empty() {
276            for point in points {
277                Self::update_text_index(&self.text_index, point);
278            }
279        }
280
281        Ok(())
282    }
283
284    /// Applies sparse batch with WAL-before-apply for bulk insert.
285    fn apply_sparse_batch_bulk(
286        &self,
287        sparse_batch: &BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>>,
288    ) -> Result<()> {
289        if sparse_batch.is_empty() {
290            return Ok(());
291        }
292        #[cfg(feature = "persistence")]
293        {
294            for (name, docs) in sparse_batch {
295                let wal_path =
296                    crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
297                for (point_id, sv) in docs {
298                    crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
299                }
300            }
301        }
302        let mut indexes = self.sparse_indexes.write();
303        for (name, docs) in sparse_batch {
304            let idx = indexes.entry(name.clone()).or_default();
305            idx.insert_batch_chunk(docs);
306        }
307        Ok(())
308    }
309}