Skip to main content

velesdb_core/collection/core/
lifecycle.rs

1//! Collection lifecycle methods (create, open, flush).
2
3use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
4use crate::collection::types::{Collection, CollectionConfig, CollectionType};
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::guardrails::GuardRails;
8use crate::index::{Bm25Index, HnswIndex};
9use crate::quantization::StorageMode;
10use crate::sparse_index::DEFAULT_SPARSE_INDEX_NAME;
11use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension;
13use crate::velesql::{QueryCache, QueryPlanner};
14
15use crate::index::sparse::SparseInvertedIndex;
16
17use std::collections::{BTreeMap, HashMap, VecDeque};
18
19use parking_lot::{Mutex, RwLock};
20use std::path::PathBuf;
21use std::sync::Arc;
22
/// Pre-built components needed to assemble a [`Collection`].
///
/// Consumed by [`Collection::assemble`], which is the single place where the
/// `Collection` struct literal is written. Constructors build a
/// `CollectionParts` and hand it over instead of repeating that literal.
struct CollectionParts {
    /// Directory holding the collection's files (e.g. `config.json`).
    path: PathBuf,
    /// Collection configuration; `assemble` wraps it in `Arc<RwLock<_>>`.
    config: CollectionConfig,
    /// Shared handle to the vector storage.
    vector_storage: Arc<RwLock<MmapStorage>>,
    /// Shared handle to the payload storage.
    payload_storage: Arc<RwLock<LogPayloadStorage>>,
    /// HNSW vector index.
    index: Arc<HnswIndex>,
    /// BM25 full-text index.
    text_index: Arc<Bm25Index>,
    /// Property index (empty for new collections, loaded from disk on open).
    property_index: PropertyIndex,
    /// Range index (empty for new collections, loaded from disk on open).
    range_index: RangeIndex,
    /// Graph edge store (empty for new collections, loaded from disk on open).
    edge_store: EdgeStore,
    /// Sparse inverted indexes keyed by sparse vector name.
    sparse_indexes: BTreeMap<String, SparseInvertedIndex>,
}
39
40impl CollectionParts {
41    /// Returns a new `CollectionParts` with empty graph and sparse indexes.
42    ///
43    /// The six storage/index fields must be supplied by the caller; only the
44    /// four optional index fields (`property_index`, `range_index`,
45    /// `edge_store`, `sparse_indexes`) default to empty.
46    fn new_with_empty_indexes(
47        path: PathBuf,
48        config: CollectionConfig,
49        vector_storage: Arc<RwLock<MmapStorage>>,
50        payload_storage: Arc<RwLock<LogPayloadStorage>>,
51        index: Arc<HnswIndex>,
52        text_index: Arc<Bm25Index>,
53    ) -> Self {
54        Self {
55            path,
56            config,
57            vector_storage,
58            payload_storage,
59            index,
60            text_index,
61            property_index: PropertyIndex::new(),
62            range_index: RangeIndex::new(),
63            edge_store: EdgeStore::new(),
64            sparse_indexes: BTreeMap::new(),
65        }
66    }
67}
68
69impl Collection {
70    /// Assembles a `Collection` from pre-built components and default caches.
71    ///
72    /// This is the single point of truth for the `Self { .. }` struct literal,
73    /// eliminating duplication across the five public constructors.
74    fn assemble(parts: CollectionParts) -> Self {
75        #[cfg(feature = "persistence")]
76        let deferred_indexer = Self::build_deferred_indexer(&parts.config);
77
78        Self {
79            path: parts.path,
80            config: Arc::new(RwLock::new(parts.config)),
81            vector_storage: parts.vector_storage,
82            payload_storage: parts.payload_storage,
83            index: parts.index,
84            text_index: parts.text_index,
85            sq8_cache: Arc::new(RwLock::new(HashMap::new())),
86            binary_cache: Arc::new(RwLock::new(HashMap::new())),
87            pq_cache: Arc::new(RwLock::new(HashMap::new())),
88            pq_quantizer: Arc::new(RwLock::new(None)),
89            pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
90            property_index: Arc::new(RwLock::new(parts.property_index)),
91            range_index: Arc::new(RwLock::new(parts.range_index)),
92            edge_store: Arc::new(RwLock::new(parts.edge_store)),
93            sparse_indexes: Arc::new(RwLock::new(parts.sparse_indexes)),
94            secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
95            guard_rails: Arc::new(GuardRails::default()),
96            query_planner: Arc::new(QueryPlanner::new()),
97            query_cache: Arc::new(QueryCache::new(256)),
98            cached_stats: Arc::new(Mutex::new(None)),
99            write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100            #[cfg(feature = "persistence")]
101            stream_ingester: Arc::new(RwLock::new(None)),
102            #[cfg(feature = "persistence")]
103            delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
104            #[cfg(feature = "persistence")]
105            deferred_indexer,
106        }
107    }
108
109    /// Builds the optional `DeferredIndexer` from config.
110    ///
111    /// Returns `Some(Arc<DeferredIndexer>)` when `deferred_indexing` is
112    /// configured and enabled; `None` otherwise.
113    #[cfg(feature = "persistence")]
114    fn build_deferred_indexer(
115        config: &CollectionConfig,
116    ) -> Option<Arc<crate::collection::streaming::DeferredIndexer>> {
117        config
118            .deferred_indexing
119            .as_ref()
120            .filter(|cfg| cfg.enabled)
121            .map(|cfg| {
122                Arc::new(crate::collection::streaming::DeferredIndexer::new(
123                    cfg.clone(),
124                ))
125            })
126    }
127
128    /// Initialises persistent storages and indexes for a new collection.
129    ///
130    /// Returns a complete `CollectionParts` with empty graph/sparse indexes,
131    /// ready to be passed to [`Self::assemble`].
132    fn init_collection_parts(
133        path: PathBuf,
134        config: CollectionConfig,
135        hnsw_params: Option<crate::index::hnsw::HnswParams>,
136    ) -> Result<CollectionParts> {
137        let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
138        let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
139        let index = if let Some(params) = hnsw_params {
140            Arc::new(HnswIndex::with_params(
141                config.dimension,
142                config.metric,
143                params,
144            )?)
145        } else {
146            Arc::new(HnswIndex::new(config.dimension, config.metric)?)
147        };
148        let text_index = Arc::new(Bm25Index::new());
149        Ok(CollectionParts::new_with_empty_indexes(
150            path,
151            config,
152            vector_storage,
153            payload_storage,
154            index,
155            text_index,
156        ))
157    }
158
159    /// Rebuilds the BM25 full-text index from persisted payloads.
160    fn rebuild_bm25_index(
161        payload_storage: &Arc<RwLock<LogPayloadStorage>>,
162        text_index: &Arc<Bm25Index>,
163    ) {
164        let storage = payload_storage.read();
165        let ids = storage.ids();
166        for id in ids {
167            if let Ok(Some(payload)) = storage.retrieve(id) {
168                let text = Self::extract_text_from_payload(&payload);
169                if !text.is_empty() {
170                    text_index.add_document(id, &text);
171                }
172            }
173        }
174    }
175
176    /// Creates a new collection at the specified path.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if the directory cannot be created or the config cannot be saved.
181    pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
182        Self::create_with_options(path, dimension, metric, StorageMode::default())
183    }
184
185    /// Derives the collection name from the directory path.
186    fn name_from_path(path: &std::path::Path) -> String {
187        path.file_name()
188            .and_then(|n| n.to_str())
189            .unwrap_or("unknown")
190            .to_string()
191    }
192
193    /// Shared init-and-persist pipeline for all `create_*` constructors.
194    ///
195    /// Validates dimensions (when non-zero), creates the directory, assembles
196    /// the collection from the supplied config, and persists `config.json`.
197    fn create_from_config(
198        path: PathBuf,
199        config: CollectionConfig,
200        hnsw_params: Option<crate::index::hnsw::HnswParams>,
201    ) -> Result<Self> {
202        let skip_dimension_check = config.metadata_only
203            || (config.graph_schema.is_some() && config.embedding_dimension.is_none());
204        if skip_dimension_check {
205            // dimension=0 is valid for metadata-only and graph-without-embedding
206        } else {
207            validate_dimension(config.dimension)?;
208        }
209        std::fs::create_dir_all(&path)?;
210
211        let collection = Self::assemble(Self::init_collection_parts(path, config, hnsw_params)?);
212        collection.save_config()?;
213        Ok(collection)
214    }
215
216    /// Creates a new collection with custom storage options.
217    ///
218    /// # Arguments
219    ///
220    /// * `path` - Path to the collection directory
221    /// * `dimension` - Vector dimension
222    /// * `metric` - Distance metric
223    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the directory cannot be created or the config cannot be saved.
228    pub fn create_with_options(
229        path: PathBuf,
230        dimension: usize,
231        metric: DistanceMetric,
232        storage_mode: StorageMode,
233    ) -> Result<Self> {
234        let config = CollectionConfig {
235            name: Self::name_from_path(&path),
236            dimension,
237            metric,
238            point_count: 0,
239            storage_mode,
240            metadata_only: false,
241            graph_schema: None,
242            embedding_dimension: None,
243            pq_rescore_oversampling: Some(4),
244            hnsw_params: None,
245            #[cfg(feature = "persistence")]
246            deferred_indexing: None,
247        };
248        Self::create_from_config(path, config, None)
249    }
250
251    /// Creates a new collection with custom HNSW parameters.
252    ///
253    /// This is the lowest-level vector collection constructor, giving full
254    /// control over the HNSW graph topology (M, `ef_construction`) while
255    /// retaining the standard storage pipeline.
256    ///
257    /// # Arguments
258    ///
259    /// * `path` - Path to the collection directory
260    /// * `dimension` - Vector dimension
261    /// * `metric` - Distance metric
262    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
263    /// * `hnsw_params` - Custom HNSW index parameters
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the directory cannot be created or the config cannot be saved.
268    pub fn create_with_hnsw_params(
269        path: PathBuf,
270        dimension: usize,
271        metric: DistanceMetric,
272        storage_mode: StorageMode,
273        hnsw_params: crate::index::hnsw::HnswParams,
274    ) -> Result<Self> {
275        let config = CollectionConfig {
276            name: Self::name_from_path(&path),
277            dimension,
278            metric,
279            point_count: 0,
280            storage_mode,
281            metadata_only: false,
282            graph_schema: None,
283            embedding_dimension: None,
284            pq_rescore_oversampling: Some(4),
285            hnsw_params: Some(hnsw_params),
286            #[cfg(feature = "persistence")]
287            deferred_indexing: None,
288        };
289        Self::create_from_config(path, config, Some(hnsw_params))
290    }
291
292    /// Creates a new collection with a specific type (Vector or `MetadataOnly`).
293    ///
294    /// # Arguments
295    ///
296    /// * `path` - Path to the collection directory
297    /// * `name` - Name of the collection
298    /// * `collection_type` - Type of collection to create
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the directory cannot be created or the config cannot be saved.
303    pub fn create_typed(
304        path: PathBuf,
305        name: &str,
306        collection_type: &CollectionType,
307    ) -> Result<Self> {
308        match collection_type {
309            CollectionType::Vector {
310                dimension,
311                metric,
312                storage_mode,
313            } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
314            CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
315            CollectionType::Graph { .. } => {
316                // Graph collections will be implemented in EPIC-004
317                // For now, return an error indicating this is not yet supported
318                Err(crate::Error::GraphNotSupported(
319                    "Graph collection creation not yet implemented".to_string(),
320                ))
321            }
322        }
323    }
324
325    /// Creates a new metadata-only collection (no vectors, no HNSW index).
326    ///
327    /// Metadata-only collections are optimized for storing reference data,
328    /// catalogs, and other non-vector data. They support CRUD operations
329    /// and `VelesQL` queries on payload, but NOT vector search.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if the directory cannot be created or the config cannot be saved.
334    pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
335        let config = CollectionConfig {
336            name: name.to_string(),
337            dimension: 0,                   // No vector dimension
338            metric: DistanceMetric::Cosine, // Default, not used
339            point_count: 0,
340            storage_mode: StorageMode::Full, // Default, not used
341            metadata_only: true,
342            graph_schema: None,
343            embedding_dimension: None,
344            pq_rescore_oversampling: Some(4),
345            hnsw_params: None,
346            #[cfg(feature = "persistence")]
347            deferred_indexing: None,
348        };
349        Self::create_from_config(path, config, None)
350    }
351
352    /// Returns true if this is a metadata-only collection.
353    #[must_use]
354    pub fn is_metadata_only(&self) -> bool {
355        self.config.read().metadata_only
356    }
357
358    /// Opens an existing collection from the specified path.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if the config file cannot be read or parsed.
363    ///
364    /// # INVARIANT(CACHE-01): write_generation starts at 0 on open
365    ///
366    /// Every call to `Collection::open` initialises `write_generation` to 0.
367    /// This is **safe** for cache correctness because:
368    ///
369    /// 1. The plan cache is **not persisted** across process restarts — it is
370    ///    always empty when the database opens. There are therefore no stale
371    ///    cached plans that could be incorrectly served.
372    ///
373    /// 2. `Database::load_collections` bumps `schema_version` after loading
374    ///    at least one collection (C-3). Any plan key built before the load
375    ///    would carry the pre-load `schema_version` and would miss the cache
376    ///    even if the `write_generation` happened to match.
377    ///
378    /// 3. Within a single process lifetime the `write_generation` is only ever
379    ///    incremented (never reset), so a cache key built with generation N
380    ///    will never be reused once the generation advances past N.
381    pub fn open(path: PathBuf) -> Result<Self> {
382        let config_path = path.join("config.json");
383        let config_data = std::fs::read_to_string(&config_path)?;
384        let config: CollectionConfig =
385            serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
386
387        // Open persistent storages
388        let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
389        let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
390
391        // Load HNSW index if it exists, otherwise create new (empty).
392        // When hnsw.bin is absent and config.hnsw_params is set, honour the
393        // persisted custom params so they survive collection reopen.
394        let index = Self::load_or_create_hnsw(&path, &config)?;
395        let text_index = Arc::new(Bm25Index::new());
396
397        // Rebuild BM25 index from persisted payloads
398        Self::rebuild_bm25_index(&payload_storage, &text_index);
399
400        // Load persisted graph/sparse indexes (EPIC-009, EPIC-062)
401        let property_index = Self::load_property_index(&path);
402        let range_index = Self::load_range_index(&path);
403        let edge_store = Self::load_edge_store(&path);
404        let sparse_indexes = Self::load_named_sparse_indexes(&path);
405
406        // Reconcile point_count from storage (config.json may be stale if
407        // the previous process exited without calling save_config).
408        let actual_count = if config.metadata_only {
409            payload_storage.read().ids().len()
410        } else {
411            vector_storage.read().len()
412        };
413        let mut config = config;
414        config.point_count = actual_count;
415
416        // TODO #370: implement crash recovery gap detection for deferred indexer.
417        // After loading HNSW and vector storage, detect vectors in storage but
418        // not in HNSW (gap vectors from a crash during deferred merge) and
419        // re-index them. This requires comparing storage IDs against HNSW IDs
420        // which may be expensive for large collections.
421
422        Ok(Self::assemble(CollectionParts {
423            path,
424            config,
425            vector_storage,
426            payload_storage,
427            index,
428            text_index,
429            property_index,
430            range_index,
431            edge_store,
432            sparse_indexes,
433        }))
434    }
435
436    /// Creates a new graph collection (with optional node embeddings).
437    ///
438    /// Persists `graph_schema` and `embedding_dimension` in `config.json`.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if the directory cannot be created or the config cannot be saved.
443    pub fn create_graph_collection(
444        path: PathBuf,
445        name: &str,
446        schema: crate::collection::graph::GraphSchema,
447        embedding_dim: Option<usize>,
448        metric: DistanceMetric,
449    ) -> Result<Self> {
450        let config = CollectionConfig {
451            name: name.to_string(),
452            dimension: embedding_dim.unwrap_or(0),
453            metric,
454            point_count: 0,
455            storage_mode: StorageMode::Full,
456            metadata_only: false,
457            graph_schema: Some(schema),
458            embedding_dimension: embedding_dim,
459            pq_rescore_oversampling: Some(4),
460            hnsw_params: None,
461            #[cfg(feature = "persistence")]
462            deferred_indexing: None,
463        };
464        // NOTE: create_from_config validates dimension only when > 0,
465        // so embedding_dim=None (dimension=0) skips validation correctly.
466        Self::create_from_config(path, config, None)
467    }
468
469    /// Loads the HNSW index from `hnsw.bin` or creates an empty one.
470    ///
471    /// When `hnsw.bin` is absent and `config.hnsw_params` is set, the
472    /// persisted custom params are honoured so they survive collection reopen.
473    fn load_or_create_hnsw(
474        path: &std::path::Path,
475        config: &CollectionConfig,
476    ) -> Result<Arc<HnswIndex>> {
477        if path.join("hnsw.bin").exists() {
478            let idx = HnswIndex::load(path, config.dimension, config.metric)?;
479            Ok(Arc::new(idx))
480        } else if let Some(params) = config.hnsw_params {
481            Ok(Arc::new(HnswIndex::with_params(
482                config.dimension,
483                config.metric,
484                params,
485            )?))
486        } else {
487            Ok(Arc::new(HnswIndex::new(config.dimension, config.metric)?))
488        }
489    }
490
491    /// Loads all named sparse indexes from disk.
492    ///
493    /// Scans for `sparse.meta` (default name `""`) and `sparse-{name}.meta` files.
494    /// Returns a `BTreeMap` keyed by sparse vector name.
495    ///
496    /// # Concurrency safety of `read_dir`
497    ///
498    /// The `read_dir` scan below is safe from race conditions for two reasons:
499    ///
500    /// 1. **Single-threaded open**: `Collection::open` (and therefore this
501    ///    function) is always called from `Database::open`, which runs
502    ///    single-threaded during startup. No concurrent writers exist at this
503    ///    point.
504    ///
505    /// 2. **Atomic rename in compaction**: `compact_with_prefix` writes new
506    ///    data to `{prefix}.*.tmp` staging files and only promotes them to
507    ///    their final names via an atomic `rename(2)`. A `read_dir` scan
508    ///    therefore never observes a partially-written `sparse-*.meta` file;
509    ///    it either sees the complete previous version or the complete new
510    ///    version — never a torn write.
511    fn load_named_sparse_indexes(
512        path: &std::path::Path,
513    ) -> BTreeMap<String, crate::index::sparse::SparseInvertedIndex> {
514        let mut indexes = BTreeMap::new();
515
516        // Load default (unprefixed) sparse index: sparse.meta / sparse.wal
517        match crate::index::sparse::persistence::load_from_disk(path) {
518            Ok(Some(idx)) => {
519                indexes.insert(DEFAULT_SPARSE_INDEX_NAME.to_string(), idx);
520            }
521            Ok(None) => {}
522            Err(e) => {
523                tracing::warn!(
524                    "Failed to load default sparse index from {:?}: {}. Skipping.",
525                    path,
526                    e
527                );
528            }
529        }
530
531        // Scan for named sparse indexes: sparse-{name}.meta files.
532        // The `.meta` suffix is the sentinel for a fully compacted (committed)
533        // index file; stale `.tmp` artefacts from interrupted compactions are
534        // ignored because they do not match the `strip_suffix(".meta")` filter.
535        if let Ok(entries) = std::fs::read_dir(path) {
536            for entry in entries.flatten() {
537                let file_name = entry.file_name();
538                let name_str = file_name.to_string_lossy();
539                if let Some(sparse_name) = name_str
540                    .strip_prefix("sparse-")
541                    .and_then(|s| s.strip_suffix(".meta"))
542                {
543                    let sparse_name = sparse_name.to_string();
544                    match crate::index::sparse::persistence::load_named_from_disk(
545                        path,
546                        &sparse_name,
547                    ) {
548                        Ok(Some(idx)) => {
549                            indexes.insert(sparse_name, idx);
550                        }
551                        Ok(None) => {}
552                        Err(e) => {
553                            tracing::warn!(
554                                "Failed to load sparse index '{}' from {:?}: {}. Skipping.",
555                                sparse_name,
556                                path,
557                                e
558                            );
559                        }
560                    }
561                }
562            }
563        }
564
565        indexes
566    }
567
568    /// Loads a persisted index from disk, falling back to a default on missing
569    /// file or deserialization error.
570    ///
571    /// This is the single implementation for the load-or-default pattern shared
572    /// by `PropertyIndex`, `RangeIndex`, and `EdgeStore`.
573    fn load_or_default<T>(
574        path: &std::path::Path,
575        file_name: &str,
576        load_fn: impl FnOnce(&std::path::Path) -> std::io::Result<T>,
577        default: impl FnOnce() -> T,
578    ) -> T {
579        let full_path = path.join(file_name);
580        if full_path.exists() {
581            match load_fn(&full_path) {
582                Ok(val) => return val,
583                Err(e) => tracing::warn!(
584                    "Failed to load {} from {:?}: {}. Starting with empty default.",
585                    file_name,
586                    full_path,
587                    e
588                ),
589            }
590        }
591        default()
592    }
593
594    fn load_edge_store(path: &std::path::Path) -> EdgeStore {
595        Self::load_or_default(
596            path,
597            "edge_store.bin",
598            EdgeStore::load_from_file,
599            EdgeStore::new,
600        )
601    }
602
603    fn load_property_index(path: &std::path::Path) -> PropertyIndex {
604        Self::load_or_default(
605            path,
606            "property_index.bin",
607            PropertyIndex::load_from_file,
608            PropertyIndex::new,
609        )
610    }
611
612    fn load_range_index(path: &std::path::Path) -> RangeIndex {
613        Self::load_or_default(
614            path,
615            "range_index.bin",
616            RangeIndex::load_from_file,
617            RangeIndex::new,
618        )
619    }
620
    /// Returns a reference to the collection's guard rails.
    ///
    /// The value is handed out as `&Arc<GuardRails>`, so a caller that needs
    /// shared ownership can clone the `Arc` cheaply.
    #[must_use]
    pub fn guard_rails(&self) -> &std::sync::Arc<crate::guardrails::GuardRails> {
        &self.guard_rails
    }
626
627    /// Returns the collection configuration.
628    #[must_use]
629    pub fn config(&self) -> CollectionConfig {
630        self.config.read().clone()
631    }
632
    /// Persists the whole collection to disk.
    ///
    /// In order: saves the config, flushes the vector and payload storages,
    /// drains pending vectors into the HNSW index, saves the HNSW index, and
    /// finally writes the secondary (property/range/edge) and sparse indexes.
    ///
    /// If the delta buffer is active (HNSW rebuild in progress), drains
    /// buffered vectors into the HNSW index before persisting it. This
    /// ensures graceful shutdown does not lose vectors that were accepted
    /// during the rebuild window. The deferred indexer is drained the same
    /// way. Both drain steps are no-ops when the `persistence` feature is
    /// disabled.
    ///
    /// The first failing step aborts the flush; later steps are not attempted.
    ///
    /// # Errors
    ///
    /// Returns an error if storage operations fail.
    pub fn flush(&self) -> Result<()> {
        self.save_config()?;
        // Each write guard is a temporary that is released at the end of its
        // statement, so neither storage lock is held during the drains below.
        self.vector_storage.write().flush()?;
        self.payload_storage.write().flush()?;
        // Drain delta buffer into HNSW before persisting the index.
        // Lock order: delta_buffer(10) is acquired after vector_storage(2)
        // and payload_storage(3) — both already released above.
        self.drain_delta_into_index();
        // Drain deferred indexer into HNSW (position 11, after delta at 10).
        self.drain_deferred_into_index();
        // Must come after both drains so the saved index includes them.
        self.index.save(&self.path)?;
        self.flush_secondary_indexes()?;
        self.flush_sparse_indexes()
    }
657
658    /// Drains the delta buffer into the HNSW index (if active).
659    ///
660    /// No-op when the delta buffer is inactive (no rebuild in progress).
661    /// After draining, the buffer is empty and inactive.
662    ///
663    /// Filters out IDs that have been deleted from vector storage since they
664    /// were buffered, preventing ghost vectors from being re-inserted into
665    /// HNSW after a concurrent delete.
666    ///
667    /// Uses `insert_batch_parallel` for consistent batch insert performance
668    /// (same strategy as `merge_deferred_batch` in crud.rs).
669    ///
670    /// # Lock ordering
671    ///
672    /// Acquires `vector_storage` (position 2) briefly for the validity
673    /// check, releases it, then inserts into the index (no lock).
674    /// `delta_buffer` (position 10) is acquired first via `deactivate_and_drain`.
675    /// The caller must NOT hold any lower-numbered lock when calling this method.
676    #[cfg(feature = "persistence")]
677    fn drain_delta_into_index(&self) {
678        let drained = self.delta_buffer.deactivate_and_drain();
679        if drained.is_empty() {
680            return;
681        }
682        // Filter out vectors deleted from storage during the buffer's
683        // lifetime to prevent ghost re-insertion into HNSW.
684        let storage = self.vector_storage.read();
685        let valid: Vec<(u64, &[f32])> = drained
686            .iter()
687            .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
688            .map(|(id, v)| (*id, v.as_slice()))
689            .collect();
690        drop(storage); // Release read lock before batch insert
691        if !valid.is_empty() {
692            self.index.insert_batch_parallel(valid);
693        }
694    }
695
    /// No-op stub when persistence is disabled.
    ///
    /// Compiled only without the `persistence` feature, so that `flush` can
    /// call `drain_delta_into_index` unconditionally.
    #[cfg(not(feature = "persistence"))]
    fn drain_delta_into_index(&self) {}
699
700    /// Drains the deferred indexer into the HNSW index (if configured).
701    ///
702    /// No-op when deferred indexing is not configured or disabled.
703    /// After draining, both buffers are empty and inactive.
704    ///
705    /// Filters out IDs that have been deleted from vector storage since they
706    /// were buffered, preventing ghost vectors from being re-inserted into
707    /// HNSW after a concurrent delete.
708    ///
709    /// Uses `insert_batch_parallel` for consistent batch insert performance
710    /// (same strategy as `merge_deferred_batch` in crud.rs).
711    ///
712    /// # Lock ordering
713    ///
714    /// Acquires `vector_storage` (position 2) briefly for the validity
715    /// check, releases it, then inserts into the index (no lock).
716    /// `deferred_indexer` (position 11) is acquired first via `drain_all`.
717    /// The caller must NOT hold any lower-numbered lock.
718    #[cfg(feature = "persistence")]
719    fn drain_deferred_into_index(&self) {
720        if let Some(ref di) = self.deferred_indexer {
721            let drained = di.drain_all();
722            if drained.is_empty() {
723                return;
724            }
725            // Filter out vectors deleted from storage during the buffer's
726            // lifetime to prevent ghost re-insertion into HNSW.
727            let storage = self.vector_storage.read();
728            let valid: Vec<(u64, &[f32])> = drained
729                .iter()
730                .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
731                .map(|(id, v)| (*id, v.as_slice()))
732                .collect();
733            drop(storage); // Release read lock before batch insert
734            if !valid.is_empty() {
735                self.index.insert_batch_parallel(valid);
736            }
737        }
738    }
739
    /// No-op stub when persistence is disabled.
    ///
    /// Compiled only without the `persistence` feature, so that `flush` can
    /// call `drain_deferred_into_index` unconditionally.
    #[cfg(not(feature = "persistence"))]
    fn drain_deferred_into_index(&self) {}
743
744    /// Persists property index, range index, and edge store (EPIC-009 US-005).
745    fn flush_secondary_indexes(&self) -> Result<()> {
746        let property_index_path = self.path.join("property_index.bin");
747        self.property_index
748            .read()
749            .save_to_file(&property_index_path)?;
750
751        let range_index_path = self.path.join("range_index.bin");
752        self.range_index.read().save_to_file(&range_index_path)?;
753
754        // Save EdgeStore for graph collections (BUG-1: was never persisted)
755        if self.config.read().graph_schema.is_some() {
756            let edge_store_path = self.path.join("edge_store.bin");
757            self.edge_store.read().save_to_file(&edge_store_path)?;
758        }
759
760        Ok(())
761    }
762
763    /// Compacts all named sparse indexes to disk (EPIC-062 / SPARSE-04).
764    fn flush_sparse_indexes(&self) -> Result<()> {
765        let indexes = self.sparse_indexes.read();
766        for (name, idx) in indexes.iter() {
767            crate::index::sparse::persistence::compact_named(&self.path, name, idx)?;
768        }
769        Ok(())
770    }
771
    /// Returns a reference to the collection's data path.
    ///
    /// This is the directory the collection was created in or opened from;
    /// files such as `config.json` are resolved relative to it.
    #[must_use]
    pub(crate) fn data_path(&self) -> &std::path::Path {
        &self.path
    }
777
    /// Returns a write guard on the collection config for mutation.
    ///
    /// The config's write lock stays held until the returned guard is
    /// dropped, so keep the guard short-lived. Changes are in-memory only
    /// until the config is saved.
    pub(crate) fn config_write(
        &self,
    ) -> parking_lot::RwLockWriteGuard<'_, crate::collection::types::CollectionConfig> {
        self.config.write()
    }
784
    /// Returns a write guard on the PQ quantizer slot.
    ///
    /// The slot is an `Option`: `None` means no product quantizer is set.
    /// The write lock stays held until the returned guard is dropped.
    pub(crate) fn pq_quantizer_write(
        &self,
    ) -> parking_lot::RwLockWriteGuard<'_, Option<crate::quantization::ProductQuantizer>> {
        self.pq_quantizer.write()
    }
791
    /// Returns a read guard on the PQ quantizer slot.
    ///
    /// The slot is an `Option`: `None` means no product quantizer is set.
    /// The read lock stays held until the returned guard is dropped.
    pub(crate) fn pq_quantizer_read(
        &self,
    ) -> parking_lot::RwLockReadGuard<'_, Option<crate::quantization::ProductQuantizer>> {
        self.pq_quantizer.read()
    }
798
799    /// Saves the collection configuration to disk.
800    ///
801    /// Uses atomic write-tmp-fsync-rename to prevent torn writes on crash.
802    pub(crate) fn save_config(&self) -> Result<()> {
803        use std::io::Write;
804
805        let config = self.config.read();
806        let config_path = self.path.join("config.json");
807        let tmp_path = self.path.join("config.json.tmp");
808        let config_data = serde_json::to_string_pretty(&*config)
809            .map_err(|e| Error::Serialization(e.to_string()))?;
810
811        let file = std::fs::File::create(&tmp_path)?;
812        let mut writer = std::io::BufWriter::new(file);
813        writer.write_all(config_data.as_bytes())?;
814        writer.flush()?;
815        writer.get_ref().sync_all()?;
816        std::fs::rename(&tmp_path, &config_path)?;
817        Ok(())
818    }
819}