Skip to main content

velesdb_core/collection/core/
crud.rs

1//! CRUD operations for Collection (upsert, get, delete).
2//!
3//! Quantization caching helpers and secondary-index update helpers are in `crud_helpers.rs`.
4
5use crate::collection::types::Collection;
6use crate::error::{Error, Result};
7use crate::index::VectorIndex;
8use crate::point::Point;
9use crate::quantization::{BinaryQuantizedVector, PQVector, QuantizedVector, StorageMode};
10use crate::storage::{LogPayloadStorage, PayloadStorage, VectorStorage};
11use crate::validation::validate_dimension_match;
12
13use parking_lot::RwLockWriteGuard;
14use std::collections::{BTreeMap, HashMap};
15
16struct QuantizationGuards<'a> {
17    sq8: Option<RwLockWriteGuard<'a, HashMap<u64, QuantizedVector>>>,
18    binary: Option<RwLockWriteGuard<'a, HashMap<u64, BinaryQuantizedVector>>>,
19    pq: Option<RwLockWriteGuard<'a, HashMap<u64, PQVector>>>,
20}
21
22impl<'a> QuantizationGuards<'a> {
23    fn acquire(collection: &'a Collection, mode: StorageMode) -> Self {
24        Self {
25            sq8: matches!(mode, StorageMode::SQ8).then(|| collection.sq8_cache.write()),
26            binary: matches!(mode, StorageMode::Binary).then(|| collection.binary_cache.write()),
27            pq: matches!(mode, StorageMode::ProductQuantization)
28                .then(|| collection.pq_cache.write()),
29        }
30    }
31}
32
33impl Collection {
34    /// Inserts or updates points in the collection.
35    ///
36    /// Accepts any iterator of points (Vec, slice, array, etc.)
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if any point has a mismatched dimension, or if
41    /// attempting to insert vectors into a metadata-only collection.
42    pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
43        let points: Vec<Point> = points.into_iter().collect();
44        let config = self.config.read();
45        let dimension = config.dimension;
46        let storage_mode = config.storage_mode;
47
48        if config.metadata_only {
49            for point in &points {
50                if !point.vector.is_empty() {
51                    return Err(Error::VectorNotAllowed(config.name.clone()));
52                }
53            }
54            drop(config);
55            return self.upsert_metadata(points);
56        }
57        drop(config);
58
59        for point in &points {
60            validate_dimension_match(dimension, point.dimension())?;
61        }
62
63        let sparse_batch = self.upsert_storage_and_index(&points, storage_mode)?;
64
65        self.apply_sparse_batch_upsert(&sparse_batch)?;
66        self.invalidate_caches_and_bump_generation();
67        Ok(())
68    }
69
70    /// Stores vectors, payloads, and indexes for a batch of points.
71    ///
72    /// Three-phase pipeline to minimize lock contention and I/O:
73    /// 1. Batch storage: `store_batch()` for vectors + payloads (1 fsync each)
74    /// 2. Per-point updates: secondary indexes, quantization, text, sparse
75    /// 3. Batch HNSW insert via `bulk_index_or_defer()`
76    ///
77    /// # Crash Recovery
78    ///
79    /// A crash between Phase 1 and Phase 3 leaves vectors durably stored but
80    /// absent from the HNSW index. On the next `Collection::open()`, gap
81    /// detection compares storage IDs against HNSW mappings and re-indexes
82    /// any missing vectors. The recovery window is bounded by one batch.
83    ///
84    /// Returns buffered sparse vectors for deferred insertion.
85    fn upsert_storage_and_index(
86        &self,
87        points: &[Point],
88        storage_mode: StorageMode,
89    ) -> Result<Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>> {
90        // Phase 1: Batch storage under write locks (1 fsync per storage)
91        let old_payloads = self.batch_store_all(points)?;
92
93        // Phase 2: Per-point updates (no storage locks held)
94        let sparse_batch = self.per_point_updates(points, &old_payloads, storage_mode);
95
96        // Phase 3: Batch HNSW insert
97        let vector_refs: Vec<(u64, &[f32])> =
98            points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
99        self.bulk_index_or_defer(vector_refs);
100
101        Ok(sparse_batch)
102    }
103
104    /// Phase 1: Batch-stores vectors and payloads with minimal lock scope.
105    ///
106    /// Pre-collects old payloads (needed for secondary index updates),
107    /// then writes all vectors and payloads in single batch calls (1 fsync each).
108    ///
109    /// Deduplicates intra-batch duplicate IDs using last-writer-wins semantics:
110    /// only the final occurrence per ID is written to the WAL, avoiding wasteful
111    /// intermediate entries that would bloat the log and slow replay.
112    ///
113    /// After this method returns, vectors and payloads are durable on disk.
114    /// A crash before Phase 3 (HNSW insertion) is recovered by gap detection
115    /// on the next `Collection::open()`.
116    ///
117    /// Returns the old payloads for Phase 2.
118    fn batch_store_all(&self, points: &[Point]) -> Result<Vec<Option<serde_json::Value>>> {
119        let mut payload_storage = self.payload_storage.write();
120
121        // Pre-collect old payloads, retrieving from storage only on the first
122        // occurrence of each ID. Subsequent occurrences get None because Phase 2
123        // (`per_point_updates`) uses `seen_payloads` to track intra-batch state.
124        let old_payloads = Self::collect_old_payloads(points, &payload_storage);
125
126        Self::write_deduped_payloads(points, &mut payload_storage)?;
127        payload_storage.flush()?;
128        drop(payload_storage);
129
130        self.write_deduped_vectors(points)?;
131
132        Ok(old_payloads)
133    }
134
135    /// Retrieves pre-batch payloads, querying storage only once per unique ID.
136    ///
137    /// For intra-batch duplicates, only the first occurrence needs the pre-batch
138    /// value; subsequent occurrences are handled by `seen_payloads` in Phase 2.
139    fn collect_old_payloads(
140        points: &[Point],
141        storage: &LogPayloadStorage,
142    ) -> Vec<Option<serde_json::Value>> {
143        let mut seen = std::collections::HashSet::new();
144        points
145            .iter()
146            .map(|p| {
147                if seen.insert(p.id) {
148                    // First occurrence — retrieve pre-batch payload from storage
149                    storage.retrieve(p.id).ok().flatten()
150                } else {
151                    None // Duplicate — Phase 2 uses seen_payloads instead
152                }
153            })
154            .collect()
155    }
156
157    /// Writes only the last payload per ID to the WAL, then deletes IDs whose
158    /// final occurrence has `payload=None`.
159    fn write_deduped_payloads(points: &[Point], storage: &mut LogPayloadStorage) -> Result<()> {
160        // Build last-writer-wins map: id -> (has_payload, index_of_last_occurrence)
161        let mut last_idx: HashMap<u64, usize> = HashMap::new();
162        for (i, p) in points.iter().enumerate() {
163            last_idx.insert(p.id, i);
164        }
165
166        // Only write the final payload per ID (skip intermediate duplicates)
167        let deduped: Vec<(u64, &serde_json::Value)> = points
168            .iter()
169            .enumerate()
170            .filter(|&(i, p)| last_idx.get(&p.id) == Some(&i) && p.payload.is_some())
171            .filter_map(|(_, p)| p.payload.as_ref().map(|pl| (p.id, pl)))
172            .collect();
173        storage.store_batch(&deduped)?;
174
175        // Delete IDs whose final occurrence has payload=None
176        for (i, p) in points.iter().enumerate() {
177            if last_idx.get(&p.id) == Some(&i) && p.payload.is_none() {
178                let _ = storage.delete(p.id);
179            }
180        }
181        Ok(())
182    }
183
184    /// Writes only the last vector per ID to vector storage.
185    fn write_deduped_vectors(&self, points: &[Point]) -> Result<()> {
186        let mut last_idx: HashMap<u64, usize> = HashMap::new();
187        for (i, p) in points.iter().enumerate() {
188            last_idx.insert(p.id, i);
189        }
190
191        let deduped: Vec<(u64, &[f32])> = points
192            .iter()
193            .enumerate()
194            .filter(|&(i, p)| last_idx.get(&p.id) == Some(&i))
195            .map(|(_, p)| (p.id, p.vector.as_slice()))
196            .collect();
197
198        let mut vector_storage = self.vector_storage.write();
199        vector_storage.store_batch(&deduped)?;
200        let point_count = vector_storage.len();
201        vector_storage.flush()?;
202        drop(vector_storage);
203
204        self.config.write().point_count = point_count;
205        Ok(())
206    }
207
208    /// Phase 2: Per-point updates that don't need storage write locks.
209    ///
210    /// Tracks the effective "old payload" per ID to handle within-batch
211    /// duplicates correctly: when id=5 appears twice, the second occurrence
212    /// sees the first occurrence's payload as its "old" (not the pre-batch
213    /// original), ensuring secondary indexes stay consistent.
214    fn per_point_updates(
215        &self,
216        points: &[Point],
217        old_payloads: &[Option<serde_json::Value>],
218        storage_mode: StorageMode,
219    ) -> Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)> {
220        let mut quant_guards = QuantizationGuards::acquire(self, storage_mode);
221        let mut sparse_batch = Vec::new();
222        // Track effective old payload per ID for within-batch duplicate handling.
223        // When id=5 appears twice, the second occurrence uses the first's payload
224        // as "old" — not the pre-batch original — so secondary indexes stay correct.
225        //
226        // Uses `Option<Option<&Value>>`: outer Option = "seen this ID?",
227        // inner Option = "had a payload?". This distinguishes "seen with None"
228        // from "not seen" — `.flatten()` would collapse both to None.
229        let mut seen_payloads: HashMap<u64, Option<&serde_json::Value>> = HashMap::new();
230
231        for (point, pre_batch_old) in points.iter().zip(old_payloads) {
232            let effective_old: Option<&serde_json::Value> =
233                if let Some(&inner) = seen_payloads.get(&point.id) {
234                    // ID was seen earlier in this batch — use that point's payload as "old"
235                    inner
236                } else {
237                    // First occurrence — use the pre-batch original
238                    pre_batch_old.as_ref()
239                };
240
241            let (sq8, binary, pq) = (
242                quant_guards.sq8.as_deref_mut(),
243                quant_guards.binary.as_deref_mut(),
244                quant_guards.pq.as_deref_mut(),
245            );
246            self.cache_quantized_vector(point, storage_mode, sq8, binary, pq);
247
248            self.update_secondary_indexes_on_upsert(
249                point.id,
250                effective_old,
251                point.payload.as_ref(),
252            );
253            Self::update_text_index(&self.text_index, point);
254            Self::collect_sparse_vectors(point, &mut sparse_batch);
255
256            // Record this point's payload ref — zero-cost for the common case (no clone)
257            seen_payloads.insert(point.id, point.payload.as_ref());
258        }
259
260        sparse_batch
261    }
262
263    fn collect_sparse_vectors(
264        point: &Point,
265        sparse_batch: &mut Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>,
266    ) {
267        if let Some(sv_map) = &point.sparse_vectors {
268            if !sv_map.is_empty() {
269                sparse_batch.push((point.id, sv_map.clone()));
270            }
271        }
272    }
273
274    /// Updates the BM25 text index for a single point.
275    fn update_text_index(text_index: &crate::index::Bm25Index, point: &Point) {
276        if let Some(payload) = &point.payload {
277            let text = Self::extract_text_from_payload(payload);
278            if !text.is_empty() {
279                text_index.add_document(point.id, &text);
280            }
281        } else {
282            text_index.remove_document(point.id);
283        }
284    }
285
286    /// Applies buffered sparse vector upserts with WAL-before-apply semantics.
287    fn apply_sparse_batch_upsert(
288        &self,
289        sparse_batch: &[(u64, BTreeMap<String, crate::index::sparse::SparseVector>)],
290    ) -> Result<()> {
291        if sparse_batch.is_empty() {
292            return Ok(());
293        }
294        #[cfg(feature = "persistence")]
295        {
296            for (point_id, sv_map) in sparse_batch {
297                for (name, sv) in sv_map {
298                    let wal_path =
299                        crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
300                    crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
301                }
302            }
303        }
304        let mut indexes = self.sparse_indexes.write();
305        for (point_id, sv_map) in sparse_batch {
306            for (name, sv) in sv_map {
307                let idx = indexes.entry(name.clone()).or_default();
308                idx.insert(*point_id, sv);
309            }
310        }
311        Ok(())
312    }
313
314    /// Invalidates stats cache and bumps write generation.
315    fn invalidate_caches_and_bump_generation(&self) {
316        *self.cached_stats.lock() = None;
317        self.write_generation
318            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
319    }
320
321    /// Drains the deferred indexer and batch-inserts into HNSW.
322    ///
323    /// Filters out IDs that have been deleted from vector storage since they
324    /// were buffered, preventing ghost vectors from being re-inserted into
325    /// HNSW after a concurrent delete.
326    ///
327    /// Logs a warning if fewer vectors were inserted than expected, which
328    /// indicates a partial failure (e.g., duplicate IDs filtered out,
329    /// ghost-vector filtering, or graph insertion error). The drained
330    /// vectors are not retried.
331    #[cfg(feature = "persistence")]
332    fn merge_deferred_batch(&self, di: &crate::collection::streaming::DeferredIndexer) {
333        let drained = di.swap_and_drain();
334        if drained.is_empty() {
335            return;
336        }
337        // Filter out vectors deleted from storage during the buffer's
338        // lifetime to prevent ghost re-insertion into HNSW.
339        let storage = self.vector_storage.read();
340        let valid: Vec<(u64, &[f32])> = drained
341            .iter()
342            .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
343            .map(|(id, v)| (*id, v.as_slice()))
344            .collect();
345        drop(storage); // Release read lock before batch insert
346        let expected = valid.len();
347        if valid.is_empty() {
348            return;
349        }
350        let inserted = self.index.insert_batch_parallel(valid);
351        if inserted < expected {
352            tracing::warn!("merge_deferred_batch: inserted {inserted}/{expected} vectors");
353        }
354    }
355
356    /// Inserts or updates metadata-only points (no vectors).
357    ///
358    /// This method is for metadata-only collections. Points should have
359    /// empty vectors and only contain payload data.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if storage operations fail.
364    pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
365        let points: Vec<Point> = points.into_iter().collect();
366
367        let mut payload_storage = self.payload_storage.write();
368
369        for point in &points {
370            let old_payload = payload_storage.retrieve(point.id).ok().flatten();
371            if let Some(payload) = &point.payload {
372                payload_storage.store(point.id, payload)?;
373            } else {
374                let _ = payload_storage.delete(point.id);
375            }
376            Self::update_text_index(&self.text_index, point);
377            self.update_secondary_indexes_on_upsert(
378                point.id,
379                old_payload.as_ref(),
380                point.payload.as_ref(),
381            );
382        }
383
384        // LOCK ORDER: flush while payload_storage(3) still held, then drop before acquiring config(1).
385        let point_count = payload_storage.ids().len();
386        payload_storage.flush()?;
387        drop(payload_storage);
388
389        // config(1) only — all higher-numbered locks released above.
390        self.config.write().point_count = point_count;
391        self.invalidate_caches_and_bump_generation();
392        Ok(())
393    }
394
395    /// Bulk insert optimized for high-throughput import.
396    ///
397    /// # Performance
398    ///
399    /// This method is optimized for bulk loading:
400    /// - Uses parallel HNSW insertion (rayon)
401    /// - Single flush at the end (not per-point)
402    /// - No HNSW index save (deferred for performance)
403    /// - ~15x faster than previous sequential approach on large batches (5000+)
404    /// - Benchmark: 25-30 Kvec/s on 768D vectors
405    ///
406    /// # Errors
407    ///
408    /// Returns an error if any point has a mismatched dimension.
409    pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
410        if points.is_empty() {
411            return Ok(0);
412        }
413
414        let dimension = self.config.read().dimension;
415        for point in points {
416            validate_dimension_match(dimension, point.dimension())?;
417        }
418
419        let vector_refs: Vec<(u64, &[f32])> =
420            points.iter().map(|p| (p.id, p.vector.as_slice())).collect();
421        let sparse_batch = Self::collect_sparse_batch(points);
422
423        self.bulk_store_vectors(&vector_refs)?;
424        self.bulk_store_payloads(points)?;
425
426        let inserted = self.bulk_index_or_defer(vector_refs);
427        self.config.write().point_count = self.vector_storage.read().len();
428
429        self.apply_sparse_batch_bulk(&sparse_batch)?;
430        self.invalidate_caches_and_bump_generation();
431
432        Ok(inserted)
433    }
434
435    /// Batch-inserts into HNSW or defers into the deferred indexer.
436    ///
437    /// Returns the number of vectors processed (whether indexed directly
438    /// or deferred for later merge).
439    ///
440    /// Since v1.7.2, both `upsert()` and `upsert_bulk()` route through this
441    /// method. The direct path calls `insert_batch_parallel` (rayon), which
442    /// yields non-deterministic HNSW graph topology across runs. Search
443    /// correctness and recall are unaffected.
444    ///
445    /// Invariant: `self.deferred_indexer` is `Some` only when enabled
446    /// (`build_deferred_indexer` filters on `cfg.enabled`), so no
447    /// redundant `is_enabled()` check is needed here.
448    fn bulk_index_or_defer(&self, vector_refs: Vec<(u64, &[f32])>) -> usize {
449        #[cfg(feature = "persistence")]
450        if let Some(ref di) = self.deferred_indexer {
451            di.extend(vector_refs.iter().map(|(id, v)| (*id, v.to_vec())));
452            if di.should_merge() {
453                self.merge_deferred_batch(di);
454            }
455            return vector_refs.len();
456        }
457        let inserted = self.index.insert_batch_parallel(vector_refs);
458        self.index.set_searching_mode();
459        inserted
460    }
461
462    /// Collects sparse vectors grouped by index name for batch insert.
463    fn collect_sparse_batch(
464        points: &[Point],
465    ) -> BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> {
466        let mut batch: BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> =
467            BTreeMap::new();
468        for point in points {
469            if let Some(sv_map) = &point.sparse_vectors {
470                for (name, sv) in sv_map {
471                    batch
472                        .entry(name.clone())
473                        .or_default()
474                        .push((point.id, sv.clone()));
475                }
476            }
477        }
478        batch
479    }
480
481    /// Stores vectors in bulk via batch WAL + mmap write.
482    fn bulk_store_vectors(&self, vectors: &[(u64, &[f32])]) -> Result<()> {
483        let mut storage = self.vector_storage.write();
484        storage.store_batch(vectors)?;
485        storage.flush()?;
486        Ok(())
487    }
488
489    /// Stores payloads and updates BM25 text index in bulk.
490    ///
491    /// Uses `LogPayloadStorage::store_batch()` for a single WAL sync instead
492    /// of per-point fsync, improving bulk insert throughput by 10-50x.
493    fn bulk_store_payloads(&self, points: &[Point]) -> Result<()> {
494        let entries: Vec<(u64, &serde_json::Value)> = points
495            .iter()
496            .filter_map(|p| p.payload.as_ref().map(|pl| (p.id, pl)))
497            .collect();
498
499        self.payload_storage.write().store_batch(&entries)?;
500
501        for point in points {
502            Self::update_text_index(&self.text_index, point);
503        }
504
505        Ok(())
506    }
507
508    /// Applies sparse batch with WAL-before-apply for bulk insert.
509    fn apply_sparse_batch_bulk(
510        &self,
511        sparse_batch: &BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>>,
512    ) -> Result<()> {
513        if sparse_batch.is_empty() {
514            return Ok(());
515        }
516        #[cfg(feature = "persistence")]
517        {
518            for (name, docs) in sparse_batch {
519                let wal_path =
520                    crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
521                for (point_id, sv) in docs {
522                    crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
523                }
524            }
525        }
526        let mut indexes = self.sparse_indexes.write();
527        for (name, docs) in sparse_batch {
528            let idx = indexes.entry(name.clone()).or_default();
529            idx.insert_batch_chunk(docs);
530        }
531        Ok(())
532    }
533
534    /// Retrieves points by their IDs.
535    #[must_use]
536    pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
537        let config = self.config.read();
538        let is_metadata_only = config.metadata_only;
539        drop(config);
540
541        let payload_storage = self.payload_storage.read();
542
543        if is_metadata_only {
544            // For metadata-only collections, only retrieve payload
545            ids.iter()
546                .map(|&id| {
547                    let payload = payload_storage.retrieve(id).ok().flatten()?;
548                    Some(Point {
549                        id,
550                        vector: Vec::new(),
551                        payload: Some(payload),
552                        sparse_vectors: None,
553                    })
554                })
555                .collect()
556        } else {
557            // For vector collections, retrieve both vector and payload
558            let vector_storage = self.vector_storage.read();
559            ids.iter()
560                .map(|&id| {
561                    let vector = vector_storage.retrieve(id).ok().flatten()?;
562                    let payload = payload_storage.retrieve(id).ok().flatten();
563                    Some(Point {
564                        id,
565                        vector,
566                        payload,
567                        sparse_vectors: None,
568                    })
569                })
570                .collect()
571        }
572    }
573
574    /// Deletes points by their IDs.
575    ///
576    /// # Errors
577    ///
578    /// Returns an error if storage operations fail.
579    pub fn delete(&self, ids: &[u64]) -> Result<()> {
580        if self.config.read().metadata_only {
581            self.delete_metadata_only(ids)?;
582        } else {
583            self.delete_vector_points(ids)?;
584        }
585        self.invalidate_caches_and_bump_generation();
586        Ok(())
587    }
588
589    /// Deletes metadata-only points.
590    fn delete_metadata_only(&self, ids: &[u64]) -> Result<()> {
591        let mut payload_storage = self.payload_storage.write();
592        for &id in ids {
593            let old_payload = payload_storage.retrieve(id).ok().flatten();
594            payload_storage.delete(id)?;
595            self.text_index.remove_document(id);
596            self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
597        }
598        let point_count = payload_storage.ids().len();
599        drop(payload_storage);
600        self.config.write().point_count = point_count;
601        Ok(())
602    }
603
604    /// Deletes vector points from all stores (vector, payload, index, caches, sparse, delta).
605    fn delete_vector_points(&self, ids: &[u64]) -> Result<()> {
606        let mut payload_storage = self.payload_storage.write();
607        let mut vector_storage = self.vector_storage.write();
608        let mut sq8_cache = self.sq8_cache.write();
609        let mut binary_cache = self.binary_cache.write();
610        let mut pq_cache = self.pq_cache.write();
611
612        for &id in ids {
613            let old_payload = payload_storage.retrieve(id).ok().flatten();
614            vector_storage.delete(id)?;
615            payload_storage.delete(id)?;
616            self.index.remove(id);
617            sq8_cache.remove(&id);
618            binary_cache.remove(&id);
619            pq_cache.remove(&id);
620            self.text_index.remove_document(id);
621            self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
622        }
623
624        let point_count = vector_storage.len();
625        drop(vector_storage);
626        drop(payload_storage);
627        drop(sq8_cache);
628        drop(binary_cache);
629        drop(pq_cache);
630        self.config.write().point_count = point_count;
631
632        self.delete_from_sparse_indexes(ids)?;
633
634        // Lock order: delta_buffer(10) acquired after sparse_indexes(9) released.
635        #[cfg(feature = "persistence")]
636        for &id in ids {
637            self.delta_buffer.remove(id);
638        }
639
640        // Lock order: deferred_indexer(11) acquired after delta_buffer(10).
641        #[cfg(feature = "persistence")]
642        if let Some(ref di) = self.deferred_indexer {
643            for &id in ids {
644                di.remove(id);
645            }
646        }
647
648        Ok(())
649    }
650
651    /// Deletes IDs from sparse indexes with WAL-before-apply.
652    fn delete_from_sparse_indexes(&self, ids: &[u64]) -> Result<()> {
653        #[cfg(feature = "persistence")]
654        {
655            let indexes = self.sparse_indexes.read();
656            for (name, _) in indexes.iter() {
657                let wal_path =
658                    crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
659                for &id in ids {
660                    crate::index::sparse::persistence::wal_append_delete(&wal_path, id)?;
661                }
662            }
663        }
664        let indexes = self.sparse_indexes.read();
665        for idx in indexes.values() {
666            for &id in ids {
667                idx.delete(id);
668            }
669        }
670        Ok(())
671    }
672
673    /// Returns the number of points stored in the collection.
674    ///
675    /// This reflects the **storage count** (vectors written to disk), not the
676    /// number of points currently indexed in the HNSW graph. During a batch
677    /// upsert or when deferred indexing is active, `len()` may temporarily
678    /// exceed the HNSW-indexed count until the deferred merge completes.
679    ///
680    /// Perf: Uses cached `point_count` from config instead of acquiring storage lock.
681    #[must_use]
682    pub fn len(&self) -> usize {
683        self.config.read().point_count
684    }
685
686    /// Returns true if the collection is empty.
687    ///
688    /// Uses the same cached `point_count` as [`len()`](Self::len), reflecting
689    /// the storage count rather than the HNSW-indexed count.
690    #[must_use]
691    pub fn is_empty(&self) -> bool {
692        self.config.read().point_count == 0
693    }
694
695    /// Returns all point IDs in the collection.
696    #[must_use]
697    pub fn all_ids(&self) -> Vec<u64> {
698        self.payload_storage.read().ids()
699    }
700}