Skip to main content

velesdb_core/collection/core/
lifecycle.rs

1//! Collection lifecycle methods (create, open, flush).
2
3use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
4use crate::collection::types::{Collection, CollectionConfig, CollectionType};
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::guardrails::GuardRails;
8use crate::index::{Bm25Index, HnswIndex};
9use crate::quantization::StorageMode;
10use crate::sparse_index::DEFAULT_SPARSE_INDEX_NAME;
11use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension;
13use crate::velesql::{QueryCache, QueryPlanner};
14
15use crate::index::sparse::SparseInvertedIndex;
16
17use std::collections::{BTreeMap, HashMap, VecDeque};
18
19use parking_lot::{Mutex, RwLock};
20use std::path::PathBuf;
21use std::sync::Arc;
22
/// Pre-built components needed to assemble a [`Collection`].
///
/// Consumed by [`Collection::assemble`], which is the single place where the
/// `Collection` struct literal is written. Constructors build a
/// `CollectionParts` and hand it over instead of repeating that literal.
struct CollectionParts {
    /// Collection directory on disk.
    path: PathBuf,
    /// Configuration the collection is assembled with.
    config: CollectionConfig,
    /// Shared handle to the vector storage.
    vector_storage: Arc<RwLock<MmapStorage>>,
    /// Shared handle to the payload storage.
    payload_storage: Arc<RwLock<LogPayloadStorage>>,
    /// Shared HNSW vector index.
    index: Arc<HnswIndex>,
    /// Shared BM25 full-text index.
    text_index: Arc<Bm25Index>,
    /// Property index (empty for new collections, loaded from disk on open).
    property_index: PropertyIndex,
    /// Range index (empty for new collections, loaded from disk on open).
    range_index: RangeIndex,
    /// Edge store (empty for new collections, loaded from disk on open).
    edge_store: EdgeStore,
    /// Sparse inverted indexes keyed by sparse vector name.
    sparse_indexes: BTreeMap<String, SparseInvertedIndex>,
}
39
40impl CollectionParts {
41    /// Returns a new `CollectionParts` with empty graph and sparse indexes.
42    ///
43    /// The six storage/index fields must be supplied by the caller; only the
44    /// four optional index fields (`property_index`, `range_index`,
45    /// `edge_store`, `sparse_indexes`) default to empty.
46    fn new_with_empty_indexes(
47        path: PathBuf,
48        config: CollectionConfig,
49        vector_storage: Arc<RwLock<MmapStorage>>,
50        payload_storage: Arc<RwLock<LogPayloadStorage>>,
51        index: Arc<HnswIndex>,
52        text_index: Arc<Bm25Index>,
53    ) -> Self {
54        Self {
55            path,
56            config,
57            vector_storage,
58            payload_storage,
59            index,
60            text_index,
61            property_index: PropertyIndex::new(),
62            range_index: RangeIndex::new(),
63            edge_store: EdgeStore::new(),
64            sparse_indexes: BTreeMap::new(),
65        }
66    }
67}
68
69impl Collection {
70    /// Assembles a `Collection` from pre-built components and default caches.
71    ///
72    /// This is the single point of truth for the `Self { .. }` struct literal,
73    /// eliminating duplication across the five public constructors.
74    fn assemble(parts: CollectionParts) -> Self {
75        #[cfg(feature = "persistence")]
76        let deferred_indexer = Self::build_deferred_indexer(&parts.config);
77
78        Self {
79            path: parts.path,
80            config: Arc::new(RwLock::new(parts.config)),
81            vector_storage: parts.vector_storage,
82            payload_storage: parts.payload_storage,
83            index: parts.index,
84            text_index: parts.text_index,
85            sq8_cache: Arc::new(RwLock::new(HashMap::new())),
86            binary_cache: Arc::new(RwLock::new(HashMap::new())),
87            pq_cache: Arc::new(RwLock::new(HashMap::new())),
88            pq_quantizer: Arc::new(RwLock::new(None)),
89            pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
90            property_index: Arc::new(RwLock::new(parts.property_index)),
91            range_index: Arc::new(RwLock::new(parts.range_index)),
92            edge_store: Arc::new(RwLock::new(parts.edge_store)),
93            sparse_indexes: Arc::new(RwLock::new(parts.sparse_indexes)),
94            secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
95            guard_rails: Arc::new(GuardRails::default()),
96            query_planner: Arc::new(QueryPlanner::new()),
97            query_cache: Arc::new(QueryCache::new(256)),
98            cached_stats: Arc::new(Mutex::new(None)),
99            write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100            #[cfg(feature = "persistence")]
101            stream_ingester: Arc::new(RwLock::new(None)),
102            #[cfg(feature = "persistence")]
103            delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
104            #[cfg(feature = "persistence")]
105            deferred_indexer,
106        }
107    }
108
109    /// Builds the optional `DeferredIndexer` from config.
110    ///
111    /// Returns `Some(Arc<DeferredIndexer>)` when `deferred_indexing` is
112    /// configured and enabled; `None` otherwise.
113    #[cfg(feature = "persistence")]
114    fn build_deferred_indexer(
115        config: &CollectionConfig,
116    ) -> Option<Arc<crate::collection::streaming::DeferredIndexer>> {
117        config
118            .deferred_indexing
119            .as_ref()
120            .filter(|cfg| cfg.enabled)
121            .map(|cfg| {
122                Arc::new(crate::collection::streaming::DeferredIndexer::new(
123                    cfg.clone(),
124                ))
125            })
126    }
127
128    /// Initialises persistent storages and indexes for a new collection.
129    ///
130    /// Returns a complete `CollectionParts` with empty graph/sparse indexes,
131    /// ready to be passed to [`Self::assemble`].
132    fn init_collection_parts(
133        path: PathBuf,
134        config: CollectionConfig,
135        hnsw_params: Option<crate::index::hnsw::HnswParams>,
136    ) -> Result<CollectionParts> {
137        let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
138        let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
139        let index = if let Some(params) = hnsw_params {
140            Arc::new(HnswIndex::with_params(
141                config.dimension,
142                config.metric,
143                params,
144            )?)
145        } else {
146            Arc::new(HnswIndex::new(config.dimension, config.metric)?)
147        };
148        let text_index = Arc::new(Bm25Index::new());
149        Ok(CollectionParts::new_with_empty_indexes(
150            path,
151            config,
152            vector_storage,
153            payload_storage,
154            index,
155            text_index,
156        ))
157    }
158
159    /// Rebuilds the BM25 full-text index from persisted payloads.
160    fn rebuild_bm25_index(
161        payload_storage: &Arc<RwLock<LogPayloadStorage>>,
162        text_index: &Arc<Bm25Index>,
163    ) {
164        let storage = payload_storage.read();
165        let ids = storage.ids();
166        for id in ids {
167            if let Ok(Some(payload)) = storage.retrieve(id) {
168                let text = Self::extract_text_from_payload(&payload);
169                if !text.is_empty() {
170                    text_index.add_document(id, &text);
171                }
172            }
173        }
174    }
175
176    /// Creates a new collection at the specified path.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if the directory cannot be created or the config cannot be saved.
181    pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
182        Self::create_with_options(path, dimension, metric, StorageMode::default())
183    }
184
185    /// Derives the collection name from the directory path.
186    fn name_from_path(path: &std::path::Path) -> String {
187        path.file_name()
188            .and_then(|n| n.to_str())
189            .unwrap_or("unknown")
190            .to_string()
191    }
192
193    /// Shared init-and-persist pipeline for all `create_*` constructors.
194    ///
195    /// Validates dimensions (when non-zero), creates the directory, assembles
196    /// the collection from the supplied config, and persists `config.json`.
197    fn create_from_config(
198        path: PathBuf,
199        config: CollectionConfig,
200        hnsw_params: Option<crate::index::hnsw::HnswParams>,
201    ) -> Result<Self> {
202        let skip_dimension_check = config.metadata_only
203            || (config.graph_schema.is_some() && config.embedding_dimension.is_none());
204        if skip_dimension_check {
205            // dimension=0 is valid for metadata-only and graph-without-embedding
206        } else {
207            validate_dimension(config.dimension)?;
208        }
209        std::fs::create_dir_all(&path)?;
210
211        let collection = Self::assemble(Self::init_collection_parts(path, config, hnsw_params)?);
212        collection.save_config()?;
213        Ok(collection)
214    }
215
216    /// Creates a new collection with custom storage options.
217    ///
218    /// # Arguments
219    ///
220    /// * `path` - Path to the collection directory
221    /// * `dimension` - Vector dimension
222    /// * `metric` - Distance metric
223    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the directory cannot be created or the config cannot be saved.
228    pub fn create_with_options(
229        path: PathBuf,
230        dimension: usize,
231        metric: DistanceMetric,
232        storage_mode: StorageMode,
233    ) -> Result<Self> {
234        let config = CollectionConfig {
235            name: Self::name_from_path(&path),
236            dimension,
237            metric,
238            point_count: 0,
239            storage_mode,
240            metadata_only: false,
241            graph_schema: None,
242            embedding_dimension: None,
243            pq_rescore_oversampling: Some(4),
244            hnsw_params: None,
245            #[cfg(feature = "persistence")]
246            deferred_indexing: None,
247        };
248        Self::create_from_config(path, config, None)
249    }
250
251    /// Creates a new collection with custom HNSW parameters.
252    ///
253    /// This is the lowest-level vector collection constructor, giving full
254    /// control over the HNSW graph topology (M, `ef_construction`) while
255    /// retaining the standard storage pipeline.
256    ///
257    /// # Arguments
258    ///
259    /// * `path` - Path to the collection directory
260    /// * `dimension` - Vector dimension
261    /// * `metric` - Distance metric
262    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
263    /// * `hnsw_params` - Custom HNSW index parameters
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the directory cannot be created or the config cannot be saved.
268    pub fn create_with_hnsw_params(
269        path: PathBuf,
270        dimension: usize,
271        metric: DistanceMetric,
272        storage_mode: StorageMode,
273        hnsw_params: crate::index::hnsw::HnswParams,
274    ) -> Result<Self> {
275        let config = CollectionConfig {
276            name: Self::name_from_path(&path),
277            dimension,
278            metric,
279            point_count: 0,
280            storage_mode,
281            metadata_only: false,
282            graph_schema: None,
283            embedding_dimension: None,
284            pq_rescore_oversampling: Some(4),
285            hnsw_params: Some(hnsw_params),
286            #[cfg(feature = "persistence")]
287            deferred_indexing: None,
288        };
289        Self::create_from_config(path, config, Some(hnsw_params))
290    }
291
292    /// Creates a new collection with a specific type (Vector or `MetadataOnly`).
293    ///
294    /// # Arguments
295    ///
296    /// * `path` - Path to the collection directory
297    /// * `name` - Name of the collection
298    /// * `collection_type` - Type of collection to create
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the directory cannot be created or the config cannot be saved.
303    pub fn create_typed(
304        path: PathBuf,
305        name: &str,
306        collection_type: &CollectionType,
307    ) -> Result<Self> {
308        match collection_type {
309            CollectionType::Vector {
310                dimension,
311                metric,
312                storage_mode,
313            } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
314            CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
315            CollectionType::Graph { .. } => {
316                // Graph collections will be implemented in EPIC-004
317                // For now, return an error indicating this is not yet supported
318                Err(crate::Error::GraphNotSupported(
319                    "Graph collection creation not yet implemented".to_string(),
320                ))
321            }
322        }
323    }
324
325    /// Creates a new metadata-only collection (no vectors, no HNSW index).
326    ///
327    /// Metadata-only collections are optimized for storing reference data,
328    /// catalogs, and other non-vector data. They support CRUD operations
329    /// and `VelesQL` queries on payload, but NOT vector search.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if the directory cannot be created or the config cannot be saved.
334    pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
335        let config = CollectionConfig {
336            name: name.to_string(),
337            dimension: 0,                   // No vector dimension
338            metric: DistanceMetric::Cosine, // Default, not used
339            point_count: 0,
340            storage_mode: StorageMode::Full, // Default, not used
341            metadata_only: true,
342            graph_schema: None,
343            embedding_dimension: None,
344            pq_rescore_oversampling: Some(4),
345            hnsw_params: None,
346            #[cfg(feature = "persistence")]
347            deferred_indexing: None,
348        };
349        Self::create_from_config(path, config, None)
350    }
351
352    /// Returns true if this is a metadata-only collection.
353    #[must_use]
354    pub fn is_metadata_only(&self) -> bool {
355        self.config.read().metadata_only
356    }
357
358    /// Opens an existing collection from the specified path.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if the config file cannot be read or parsed.
363    ///
364    /// # INVARIANT(CACHE-01): write_generation starts at 0 on open
365    ///
366    /// Every call to `Collection::open` initialises `write_generation` to 0.
367    /// This is **safe** for cache correctness because:
368    ///
369    /// 1. The plan cache is **not persisted** across process restarts — it is
370    ///    always empty when the database opens. There are therefore no stale
371    ///    cached plans that could be incorrectly served.
372    ///
373    /// 2. `Database::load_collections` bumps `schema_version` after loading
374    ///    at least one collection (C-3). Any plan key built before the load
375    ///    would carry the pre-load `schema_version` and would miss the cache
376    ///    even if the `write_generation` happened to match.
377    ///
378    /// 3. Within a single process lifetime the `write_generation` is only ever
379    ///    incremented (never reset), so a cache key built with generation N
380    ///    will never be reused once the generation advances past N.
381    pub fn open(path: PathBuf) -> Result<Self> {
382        let config_path = path.join("config.json");
383        let config_data = std::fs::read_to_string(&config_path)?;
384        let config: CollectionConfig =
385            serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
386
387        // Open persistent storages
388        let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
389        let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
390
391        // Load HNSW index if it exists, otherwise create new (empty).
392        // When hnsw.bin is absent and config.hnsw_params is set, honour the
393        // persisted custom params so they survive collection reopen.
394        let index = Self::load_or_create_hnsw(&path, &config)?;
395        let text_index = Arc::new(Bm25Index::new());
396
397        // Rebuild BM25 index from persisted payloads
398        Self::rebuild_bm25_index(&payload_storage, &text_index);
399
400        // Load persisted graph/sparse indexes (EPIC-009, EPIC-062)
401        let property_index = Self::load_property_index(&path);
402        let range_index = Self::load_range_index(&path);
403        let edge_store = Self::load_edge_store(&path);
404        let sparse_indexes = Self::load_named_sparse_indexes(&path);
405
406        // Reconcile point_count from storage (config.json may be stale if
407        // the previous process exited without calling save_config).
408        let actual_count = if config.metadata_only {
409            payload_storage.read().ids().len()
410        } else {
411            vector_storage.read().len()
412        };
413        let mut config = config;
414        config.point_count = actual_count;
415
416        // Crash recovery: detect vectors in storage but not in HNSW (gap from
417        // crash during deferred merge, delta drain, or normal insert).
418        #[cfg(feature = "persistence")]
419        if !config.metadata_only && config.dimension > 0 {
420            let recovered =
421                super::recovery::recover_hnsw_gap(&vector_storage, &index, config.dimension)?;
422            if recovered > 0 {
423                tracing::info!(
424                    collection = %config.name,
425                    recovered,
426                    "Collection gap recovery completed on open"
427                );
428            }
429        }
430
431        Ok(Self::assemble(CollectionParts {
432            path,
433            config,
434            vector_storage,
435            payload_storage,
436            index,
437            text_index,
438            property_index,
439            range_index,
440            edge_store,
441            sparse_indexes,
442        }))
443    }
444
445    /// Creates a new graph collection (with optional node embeddings).
446    ///
447    /// Persists `graph_schema` and `embedding_dimension` in `config.json`.
448    ///
449    /// # Errors
450    ///
451    /// Returns an error if the directory cannot be created or the config cannot be saved.
452    pub fn create_graph_collection(
453        path: PathBuf,
454        name: &str,
455        schema: crate::collection::graph::GraphSchema,
456        embedding_dim: Option<usize>,
457        metric: DistanceMetric,
458    ) -> Result<Self> {
459        let config = CollectionConfig {
460            name: name.to_string(),
461            dimension: embedding_dim.unwrap_or(0),
462            metric,
463            point_count: 0,
464            storage_mode: StorageMode::Full,
465            metadata_only: false,
466            graph_schema: Some(schema),
467            embedding_dimension: embedding_dim,
468            pq_rescore_oversampling: Some(4),
469            hnsw_params: None,
470            #[cfg(feature = "persistence")]
471            deferred_indexing: None,
472        };
473        // NOTE: create_from_config validates dimension only when > 0,
474        // so embedding_dim=None (dimension=0) skips validation correctly.
475        Self::create_from_config(path, config, None)
476    }
477
478    /// Loads the HNSW index from `hnsw.bin` or creates an empty one.
479    ///
480    /// When `hnsw.bin` is absent and `config.hnsw_params` is set, the
481    /// persisted custom params are honoured so they survive collection reopen.
482    fn load_or_create_hnsw(
483        path: &std::path::Path,
484        config: &CollectionConfig,
485    ) -> Result<Arc<HnswIndex>> {
486        if path.join("hnsw.bin").exists() {
487            let idx = HnswIndex::load(path, config.dimension, config.metric)?;
488            Ok(Arc::new(idx))
489        } else if let Some(params) = config.hnsw_params {
490            Ok(Arc::new(HnswIndex::with_params(
491                config.dimension,
492                config.metric,
493                params,
494            )?))
495        } else {
496            Ok(Arc::new(HnswIndex::new(config.dimension, config.metric)?))
497        }
498    }
499
500    /// Loads all named sparse indexes from disk.
501    ///
502    /// Scans for `sparse.meta` (default name `""`) and `sparse-{name}.meta` files.
503    /// Returns a `BTreeMap` keyed by sparse vector name.
504    ///
505    /// # Concurrency safety of `read_dir`
506    ///
507    /// The `read_dir` scan below is safe from race conditions for two reasons:
508    ///
509    /// 1. **Single-threaded open**: `Collection::open` (and therefore this
510    ///    function) is always called from `Database::open`, which runs
511    ///    single-threaded during startup. No concurrent writers exist at this
512    ///    point.
513    ///
514    /// 2. **Atomic rename in compaction**: `compact_with_prefix` writes new
515    ///    data to `{prefix}.*.tmp` staging files and only promotes them to
516    ///    their final names via an atomic `rename(2)`. A `read_dir` scan
517    ///    therefore never observes a partially-written `sparse-*.meta` file;
518    ///    it either sees the complete previous version or the complete new
519    ///    version — never a torn write.
520    fn load_named_sparse_indexes(
521        path: &std::path::Path,
522    ) -> BTreeMap<String, crate::index::sparse::SparseInvertedIndex> {
523        let mut indexes = BTreeMap::new();
524
525        // Load default (unprefixed) sparse index: sparse.meta / sparse.wal
526        match crate::index::sparse::persistence::load_from_disk(path) {
527            Ok(Some(idx)) => {
528                indexes.insert(DEFAULT_SPARSE_INDEX_NAME.to_string(), idx);
529            }
530            Ok(None) => {}
531            Err(e) => {
532                tracing::warn!(
533                    "Failed to load default sparse index from {:?}: {}. Skipping.",
534                    path,
535                    e
536                );
537            }
538        }
539
540        // Scan for named sparse indexes: sparse-{name}.meta files.
541        // The `.meta` suffix is the sentinel for a fully compacted (committed)
542        // index file; stale `.tmp` artefacts from interrupted compactions are
543        // ignored because they do not match the `strip_suffix(".meta")` filter.
544        if let Ok(entries) = std::fs::read_dir(path) {
545            for entry in entries.flatten() {
546                let file_name = entry.file_name();
547                let name_str = file_name.to_string_lossy();
548                if let Some(sparse_name) = name_str
549                    .strip_prefix("sparse-")
550                    .and_then(|s| s.strip_suffix(".meta"))
551                {
552                    let sparse_name = sparse_name.to_string();
553                    match crate::index::sparse::persistence::load_named_from_disk(
554                        path,
555                        &sparse_name,
556                    ) {
557                        Ok(Some(idx)) => {
558                            indexes.insert(sparse_name, idx);
559                        }
560                        Ok(None) => {}
561                        Err(e) => {
562                            tracing::warn!(
563                                "Failed to load sparse index '{}' from {:?}: {}. Skipping.",
564                                sparse_name,
565                                path,
566                                e
567                            );
568                        }
569                    }
570                }
571            }
572        }
573
574        indexes
575    }
576
577    /// Loads a persisted index from disk, falling back to a default on missing
578    /// file or deserialization error.
579    ///
580    /// This is the single implementation for the load-or-default pattern shared
581    /// by `PropertyIndex`, `RangeIndex`, and `EdgeStore`.
582    fn load_or_default<T>(
583        path: &std::path::Path,
584        file_name: &str,
585        load_fn: impl FnOnce(&std::path::Path) -> std::io::Result<T>,
586        default: impl FnOnce() -> T,
587    ) -> T {
588        let full_path = path.join(file_name);
589        if full_path.exists() {
590            match load_fn(&full_path) {
591                Ok(val) => return val,
592                Err(e) => tracing::warn!(
593                    "Failed to load {} from {:?}: {}. Starting with empty default.",
594                    file_name,
595                    full_path,
596                    e
597                ),
598            }
599        }
600        default()
601    }
602
603    fn load_edge_store(path: &std::path::Path) -> EdgeStore {
604        Self::load_or_default(
605            path,
606            "edge_store.bin",
607            EdgeStore::load_from_file,
608            EdgeStore::new,
609        )
610    }
611
612    fn load_property_index(path: &std::path::Path) -> PropertyIndex {
613        Self::load_or_default(
614            path,
615            "property_index.bin",
616            PropertyIndex::load_from_file,
617            PropertyIndex::new,
618        )
619    }
620
621    fn load_range_index(path: &std::path::Path) -> RangeIndex {
622        Self::load_or_default(
623            path,
624            "range_index.bin",
625            RangeIndex::load_from_file,
626            RangeIndex::new,
627        )
628    }
629
630    /// Returns a reference to the collection's guard rails.
631    #[must_use]
632    pub fn guard_rails(&self) -> &std::sync::Arc<crate::guardrails::GuardRails> {
633        &self.guard_rails
634    }
635
636    /// Returns the collection configuration.
637    #[must_use]
638    pub fn config(&self) -> CollectionConfig {
639        self.config.read().clone()
640    }
641
    /// Saves the collection configuration, storages and indexes to disk.
    ///
    /// Steps run in this order: config, vector storage, payload storage,
    /// delta-buffer drain, deferred-indexer drain, HNSW index, secondary
    /// indexes, sparse indexes. The first failing step aborts the flush and
    /// its error is returned; later steps are not attempted.
    ///
    /// If the delta buffer is active (HNSW rebuild in progress), drains
    /// buffered vectors into the HNSW index before persisting it. This
    /// ensures graceful shutdown does not lose vectors that were accepted
    /// during the rebuild window. Both drains are no-ops when the
    /// `persistence` feature is disabled.
    ///
    /// # Errors
    ///
    /// Returns an error if storage operations fail.
    pub fn flush(&self) -> Result<()> {
        self.save_config()?;
        // Each write guard below is a temporary released at the end of its
        // statement, so no storage lock is held when the drains run.
        self.vector_storage.write().flush()?;
        self.payload_storage.write().flush()?;
        // Drain delta buffer into HNSW before persisting the index.
        // Lock order: delta_buffer(10) is acquired after vector_storage(2)
        // and payload_storage(3) — both already released above.
        self.drain_delta_into_index();
        // Drain deferred indexer into HNSW (position 11, after delta at 10).
        self.drain_deferred_into_index();
        // The index is saved only after both drains so it includes them.
        self.index.save(&self.path)?;
        self.flush_secondary_indexes()?;
        self.flush_sparse_indexes()
    }
666
667    /// Drains the delta buffer into the HNSW index (if active).
668    ///
669    /// No-op when the delta buffer is inactive (no rebuild in progress).
670    /// After draining, the buffer is empty and inactive.
671    ///
672    /// Filters out IDs that have been deleted from vector storage since they
673    /// were buffered, preventing ghost vectors from being re-inserted into
674    /// HNSW after a concurrent delete.
675    ///
676    /// Uses `insert_batch_parallel` for consistent batch insert performance
677    /// (same strategy as `merge_deferred_batch` in crud.rs).
678    ///
679    /// # Lock ordering
680    ///
681    /// Acquires `vector_storage` (position 2) briefly for the validity
682    /// check, releases it, then inserts into the index (no lock).
683    /// `delta_buffer` (position 10) is acquired first via `deactivate_and_drain`.
684    /// The caller must NOT hold any lower-numbered lock when calling this method.
685    #[cfg(feature = "persistence")]
686    fn drain_delta_into_index(&self) {
687        let drained = self.delta_buffer.deactivate_and_drain();
688        if drained.is_empty() {
689            return;
690        }
691        // Filter out vectors deleted from storage during the buffer's
692        // lifetime to prevent ghost re-insertion into HNSW.
693        let storage = self.vector_storage.read();
694        let valid: Vec<(u64, &[f32])> = drained
695            .iter()
696            .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
697            .map(|(id, v)| (*id, v.as_slice()))
698            .collect();
699        drop(storage); // Release read lock before batch insert
700        if !valid.is_empty() {
701            self.index.insert_batch_parallel(valid);
702        }
703    }
704
705    /// No-op stub when persistence is disabled.
706    #[cfg(not(feature = "persistence"))]
707    fn drain_delta_into_index(&self) {}
708
709    /// Drains the deferred indexer into the HNSW index (if configured).
710    ///
711    /// No-op when deferred indexing is not configured or disabled.
712    /// After draining, both buffers are empty and inactive.
713    ///
714    /// Filters out IDs that have been deleted from vector storage since they
715    /// were buffered, preventing ghost vectors from being re-inserted into
716    /// HNSW after a concurrent delete.
717    ///
718    /// Uses `insert_batch_parallel` for consistent batch insert performance
719    /// (same strategy as `merge_deferred_batch` in crud.rs).
720    ///
721    /// # Lock ordering
722    ///
723    /// Acquires `vector_storage` (position 2) briefly for the validity
724    /// check, releases it, then inserts into the index (no lock).
725    /// `deferred_indexer` (position 11) is acquired first via `drain_all`.
726    /// The caller must NOT hold any lower-numbered lock.
727    #[cfg(feature = "persistence")]
728    fn drain_deferred_into_index(&self) {
729        if let Some(ref di) = self.deferred_indexer {
730            let drained = di.drain_all();
731            if drained.is_empty() {
732                return;
733            }
734            // Filter out vectors deleted from storage during the buffer's
735            // lifetime to prevent ghost re-insertion into HNSW.
736            let storage = self.vector_storage.read();
737            let valid: Vec<(u64, &[f32])> = drained
738                .iter()
739                .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
740                .map(|(id, v)| (*id, v.as_slice()))
741                .collect();
742            drop(storage); // Release read lock before batch insert
743            if !valid.is_empty() {
744                self.index.insert_batch_parallel(valid);
745            }
746        }
747    }
748
749    /// No-op stub when persistence is disabled.
750    #[cfg(not(feature = "persistence"))]
751    fn drain_deferred_into_index(&self) {}
752
753    /// Persists property index, range index, and edge store (EPIC-009 US-005).
754    fn flush_secondary_indexes(&self) -> Result<()> {
755        let property_index_path = self.path.join("property_index.bin");
756        self.property_index
757            .read()
758            .save_to_file(&property_index_path)?;
759
760        let range_index_path = self.path.join("range_index.bin");
761        self.range_index.read().save_to_file(&range_index_path)?;
762
763        // Save EdgeStore for graph collections (BUG-1: was never persisted)
764        if self.config.read().graph_schema.is_some() {
765            let edge_store_path = self.path.join("edge_store.bin");
766            self.edge_store.read().save_to_file(&edge_store_path)?;
767        }
768
769        Ok(())
770    }
771
772    /// Compacts all named sparse indexes to disk (EPIC-062 / SPARSE-04).
773    fn flush_sparse_indexes(&self) -> Result<()> {
774        let indexes = self.sparse_indexes.read();
775        for (name, idx) in indexes.iter() {
776            crate::index::sparse::persistence::compact_named(&self.path, name, idx)?;
777        }
778        Ok(())
779    }
780
781    /// Returns a reference to the collection's data path.
782    #[must_use]
783    pub(crate) fn data_path(&self) -> &std::path::Path {
784        &self.path
785    }
786
787    /// Returns a write guard on the collection config for mutation.
788    pub(crate) fn config_write(
789        &self,
790    ) -> parking_lot::RwLockWriteGuard<'_, crate::collection::types::CollectionConfig> {
791        self.config.write()
792    }
793
794    /// Returns a write guard on the PQ quantizer slot.
795    pub(crate) fn pq_quantizer_write(
796        &self,
797    ) -> parking_lot::RwLockWriteGuard<'_, Option<crate::quantization::ProductQuantizer>> {
798        self.pq_quantizer.write()
799    }
800
801    /// Returns a read guard on the PQ quantizer slot.
802    pub(crate) fn pq_quantizer_read(
803        &self,
804    ) -> parking_lot::RwLockReadGuard<'_, Option<crate::quantization::ProductQuantizer>> {
805        self.pq_quantizer.read()
806    }
807
808    /// Saves the collection configuration to disk.
809    ///
810    /// Uses atomic write-tmp-fsync-rename to prevent torn writes on crash.
811    pub(crate) fn save_config(&self) -> Result<()> {
812        use std::io::Write;
813
814        let config = self.config.read();
815        let config_path = self.path.join("config.json");
816        let tmp_path = self.path.join("config.json.tmp");
817        let config_data = serde_json::to_string_pretty(&*config)
818            .map_err(|e| Error::Serialization(e.to_string()))?;
819
820        let file = std::fs::File::create(&tmp_path)?;
821        let mut writer = std::io::BufWriter::new(file);
822        writer.write_all(config_data.as_bytes())?;
823        writer.flush()?;
824        writer.get_ref().sync_all()?;
825        std::fs::rename(&tmp_path, &config_path)?;
826        Ok(())
827    }
828}