Skip to main content

velesdb_core/collection/core/
lifecycle.rs

1//! Collection lifecycle methods (create, open, flush).
2
3use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
4use crate::collection::types::{Collection, CollectionConfig, CollectionType};
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::guardrails::GuardRails;
8use crate::index::{Bm25Index, HnswIndex};
9use crate::quantization::StorageMode;
10use crate::sparse_index::DEFAULT_SPARSE_INDEX_NAME;
11use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension;
13use crate::velesql::{QueryCache, QueryPlanner};
14
15use crate::index::sparse::SparseInvertedIndex;
16
17use std::collections::{BTreeMap, HashMap, VecDeque};
18
19use parking_lot::{Mutex, RwLock};
20use std::path::PathBuf;
21use std::sync::Arc;
22
/// Pre-built components needed to assemble a [`Collection`].
///
/// Used by [`Collection::assemble`] as the single point of truth for the
/// struct literal, eliminating duplication across the public constructors.
///
/// The storage and primary-index fields are already wrapped in `Arc`; the
/// graph and sparse index fields are held by value and get wrapped in
/// `Arc<RwLock<..>>` by `Collection::assemble`.
struct CollectionParts {
    /// Collection directory on disk.
    path: PathBuf,
    /// Collection configuration; moved into the collection by `assemble`.
    config: CollectionConfig,
    /// Shared, lock-protected vector storage.
    vector_storage: Arc<RwLock<MmapStorage>>,
    /// Shared, lock-protected payload storage.
    payload_storage: Arc<RwLock<LogPayloadStorage>>,
    /// Shared HNSW index.
    index: Arc<HnswIndex>,
    /// Shared BM25 full-text index.
    text_index: Arc<Bm25Index>,
    /// Property index (empty for new collections, loaded from disk on open).
    property_index: PropertyIndex,
    /// Range index (empty for new collections, loaded from disk on open).
    range_index: RangeIndex,
    /// Edge store (empty for new collections, loaded from disk on open).
    edge_store: EdgeStore,
    /// Sparse inverted indexes keyed by sparse vector name.
    sparse_indexes: BTreeMap<String, SparseInvertedIndex>,
}
39
40impl CollectionParts {
41    /// Returns a new `CollectionParts` with empty graph and sparse indexes.
42    ///
43    /// The six storage/index fields must be supplied by the caller; only the
44    /// four optional index fields (`property_index`, `range_index`,
45    /// `edge_store`, `sparse_indexes`) default to empty.
46    fn new_with_empty_indexes(
47        path: PathBuf,
48        config: CollectionConfig,
49        vector_storage: Arc<RwLock<MmapStorage>>,
50        payload_storage: Arc<RwLock<LogPayloadStorage>>,
51        index: Arc<HnswIndex>,
52        text_index: Arc<Bm25Index>,
53    ) -> Self {
54        Self {
55            path,
56            config,
57            vector_storage,
58            payload_storage,
59            index,
60            text_index,
61            property_index: PropertyIndex::new(),
62            range_index: RangeIndex::new(),
63            edge_store: EdgeStore::new(),
64            sparse_indexes: BTreeMap::new(),
65        }
66    }
67}
68
69impl Collection {
70    /// Assembles a `Collection` from pre-built components and default caches.
71    ///
72    /// This is the single point of truth for the `Self { .. }` struct literal,
73    /// eliminating duplication across the five public constructors.
74    fn assemble(parts: CollectionParts) -> Self {
75        #[cfg(feature = "persistence")]
76        let deferred_indexer = Self::build_deferred_indexer(&parts.config);
77
78        Self {
79            path: parts.path,
80            config: Arc::new(RwLock::new(parts.config)),
81            vector_storage: parts.vector_storage,
82            payload_storage: parts.payload_storage,
83            index: parts.index,
84            text_index: parts.text_index,
85            sq8_cache: Arc::new(RwLock::new(HashMap::new())),
86            binary_cache: Arc::new(RwLock::new(HashMap::new())),
87            pq_cache: Arc::new(RwLock::new(HashMap::new())),
88            pq_quantizer: Arc::new(RwLock::new(None)),
89            pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
90            property_index: Arc::new(RwLock::new(parts.property_index)),
91            range_index: Arc::new(RwLock::new(parts.range_index)),
92            edge_store: Arc::new(RwLock::new(parts.edge_store)),
93            sparse_indexes: Arc::new(RwLock::new(parts.sparse_indexes)),
94            secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
95            guard_rails: Arc::new(GuardRails::default()),
96            query_planner: Arc::new(QueryPlanner::new()),
97            query_cache: Arc::new(QueryCache::new(256)),
98            cached_stats: Arc::new(Mutex::new(None)),
99            write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100            inserts_since_last_hnsw_save: Arc::new(std::sync::atomic::AtomicU64::new(0)),
101            #[cfg(feature = "persistence")]
102            stream_ingester: Arc::new(RwLock::new(None)),
103            #[cfg(feature = "persistence")]
104            delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
105            #[cfg(feature = "persistence")]
106            deferred_indexer,
107        }
108    }
109
/// Creates the `DeferredIndexer` when the config asks for one.
///
/// Yields `Some(Arc<DeferredIndexer>)` only if `deferred_indexing` is
/// present in the config **and** its `enabled` flag is set; in every other
/// case the result is `None`.
#[cfg(feature = "persistence")]
fn build_deferred_indexer(
    config: &CollectionConfig,
) -> Option<Arc<crate::collection::streaming::DeferredIndexer>> {
    use crate::collection::streaming::DeferredIndexer;

    match config.deferred_indexing.as_ref() {
        Some(settings) if settings.enabled => {
            Some(Arc::new(DeferredIndexer::new(settings.clone())))
        }
        _ => None,
    }
}
128
129    /// Initialises persistent storages and indexes for a new collection.
130    ///
131    /// Returns a complete `CollectionParts` with empty graph/sparse indexes,
132    /// ready to be passed to [`Self::assemble`].
133    fn init_collection_parts(
134        path: PathBuf,
135        config: CollectionConfig,
136        hnsw_params: Option<crate::index::hnsw::HnswParams>,
137    ) -> Result<CollectionParts> {
138        let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
139        let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
140        let index = if let Some(params) = hnsw_params {
141            Arc::new(HnswIndex::with_params(
142                config.dimension,
143                config.metric,
144                params,
145            )?)
146        } else {
147            Arc::new(HnswIndex::new(config.dimension, config.metric)?)
148        };
149        let text_index = Arc::new(Bm25Index::new());
150        Ok(CollectionParts::new_with_empty_indexes(
151            path,
152            config,
153            vector_storage,
154            payload_storage,
155            index,
156            text_index,
157        ))
158    }
159
160    /// Rebuilds the BM25 full-text index from persisted payloads.
161    fn rebuild_bm25_index(
162        payload_storage: &Arc<RwLock<LogPayloadStorage>>,
163        text_index: &Arc<Bm25Index>,
164    ) {
165        let storage = payload_storage.read();
166        let ids = storage.ids();
167        for id in ids {
168            if let Ok(Some(payload)) = storage.retrieve(id) {
169                let text = Self::extract_text_from_payload(&payload);
170                if !text.is_empty() {
171                    text_index.add_document(id, &text);
172                }
173            }
174        }
175    }
176
177    /// Creates a new collection at the specified path.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if the directory cannot be created or the config cannot be saved.
182    pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
183        Self::create_with_options(path, dimension, metric, StorageMode::default())
184    }
185
/// Derives the collection name from the last component of `path`.
///
/// Falls back to `"unknown"` when the path has no final component (for
/// example `/` or `..`) or when that component is not valid UTF-8.
fn name_from_path(path: &std::path::Path) -> String {
    match path.file_name().and_then(std::ffi::OsStr::to_str) {
        Some(name) => name.to_owned(),
        None => String::from("unknown"),
    }
}
193
194    /// Shared init-and-persist pipeline for all `create_*` constructors.
195    ///
196    /// Validates dimensions (when non-zero), creates the directory, assembles
197    /// the collection from the supplied config, and persists `config.json`.
198    fn create_from_config(
199        path: PathBuf,
200        config: CollectionConfig,
201        hnsw_params: Option<crate::index::hnsw::HnswParams>,
202    ) -> Result<Self> {
203        // dimension=0 is valid for metadata-only and graph-without-embedding
204        let skip_dimension_check = config.metadata_only
205            || (config.graph_schema.is_some() && config.embedding_dimension.is_none());
206        if !skip_dimension_check {
207            validate_dimension(config.dimension)?;
208        }
209        std::fs::create_dir_all(&path)?;
210
211        let collection = Self::assemble(Self::init_collection_parts(path, config, hnsw_params)?);
212        collection.save_config()?;
213        Ok(collection)
214    }
215
216    /// Creates a new collection with custom storage options.
217    ///
218    /// # Arguments
219    ///
220    /// * `path` - Path to the collection directory
221    /// * `dimension` - Vector dimension
222    /// * `metric` - Distance metric
223    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the directory cannot be created or the config cannot be saved.
228    pub fn create_with_options(
229        path: PathBuf,
230        dimension: usize,
231        metric: DistanceMetric,
232        storage_mode: StorageMode,
233    ) -> Result<Self> {
234        let config = CollectionConfig {
235            name: Self::name_from_path(&path),
236            dimension,
237            metric,
238            point_count: 0,
239            storage_mode,
240            metadata_only: false,
241            graph_schema: None,
242            embedding_dimension: None,
243            pq_rescore_oversampling: Some(4),
244            hnsw_params: None,
245            #[cfg(feature = "persistence")]
246            deferred_indexing: None,
247        };
248        Self::create_from_config(path, config, None)
249    }
250
251    /// Creates a new collection with custom HNSW parameters.
252    ///
253    /// This is the lowest-level vector collection constructor, giving full
254    /// control over the HNSW graph topology (M, `ef_construction`) while
255    /// retaining the standard storage pipeline.
256    ///
257    /// # Arguments
258    ///
259    /// * `path` - Path to the collection directory
260    /// * `dimension` - Vector dimension
261    /// * `metric` - Distance metric
262    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
263    /// * `hnsw_params` - Custom HNSW index parameters
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the directory cannot be created or the config cannot be saved.
268    pub fn create_with_hnsw_params(
269        path: PathBuf,
270        dimension: usize,
271        metric: DistanceMetric,
272        storage_mode: StorageMode,
273        hnsw_params: crate::index::hnsw::HnswParams,
274    ) -> Result<Self> {
275        let config = CollectionConfig {
276            name: Self::name_from_path(&path),
277            dimension,
278            metric,
279            point_count: 0,
280            storage_mode,
281            metadata_only: false,
282            graph_schema: None,
283            embedding_dimension: None,
284            pq_rescore_oversampling: Some(4),
285            hnsw_params: Some(hnsw_params),
286            #[cfg(feature = "persistence")]
287            deferred_indexing: None,
288        };
289        Self::create_from_config(path, config, Some(hnsw_params))
290    }
291
292    /// Creates a new collection with a specific type (Vector or `MetadataOnly`).
293    ///
294    /// # Arguments
295    ///
296    /// * `path` - Path to the collection directory
297    /// * `name` - Name of the collection
298    /// * `collection_type` - Type of collection to create
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the directory cannot be created or the config cannot be saved.
303    pub fn create_typed(
304        path: PathBuf,
305        name: &str,
306        collection_type: &CollectionType,
307    ) -> Result<Self> {
308        match collection_type {
309            CollectionType::Vector {
310                dimension,
311                metric,
312                storage_mode,
313            } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
314            CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
315            CollectionType::Graph { .. } => {
316                // Graph collections will be implemented in EPIC-004
317                // For now, return an error indicating this is not yet supported
318                Err(crate::Error::GraphNotSupported(
319                    "Graph collection creation not yet implemented".to_string(),
320                ))
321            }
322        }
323    }
324
325    /// Creates a new metadata-only collection (no vectors, no HNSW index).
326    ///
327    /// Metadata-only collections are optimized for storing reference data,
328    /// catalogs, and other non-vector data. They support CRUD operations
329    /// and `VelesQL` queries on payload, but NOT vector search.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if the directory cannot be created or the config cannot be saved.
334    pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
335        let config = CollectionConfig {
336            name: name.to_string(),
337            dimension: 0,                   // No vector dimension
338            metric: DistanceMetric::Cosine, // Default, not used
339            point_count: 0,
340            storage_mode: StorageMode::Full, // Default, not used
341            metadata_only: true,
342            graph_schema: None,
343            embedding_dimension: None,
344            pq_rescore_oversampling: Some(4),
345            hnsw_params: None,
346            #[cfg(feature = "persistence")]
347            deferred_indexing: None,
348        };
349        Self::create_from_config(path, config, None)
350    }
351
352    /// Returns true if this is a metadata-only collection.
353    #[must_use]
354    pub fn is_metadata_only(&self) -> bool {
355        self.config.read().metadata_only
356    }
357
358    /// Opens an existing collection from the specified path.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if the config file cannot be read or parsed.
363    ///
364    /// # INVARIANT(CACHE-01): write_generation starts at 0 on open
365    ///
366    /// Every call to `Collection::open` initialises `write_generation` to 0.
367    /// This is **safe** for cache correctness because:
368    ///
369    /// 1. The plan cache is **not persisted** across process restarts — it is
370    ///    always empty when the database opens. There are therefore no stale
371    ///    cached plans that could be incorrectly served.
372    ///
373    /// 2. `Database::load_collections` bumps `schema_version` after loading
374    ///    at least one collection (C-3). Any plan key built before the load
375    ///    would carry the pre-load `schema_version` and would miss the cache
376    ///    even if the `write_generation` happened to match.
377    ///
378    /// 3. Within a single process lifetime the `write_generation` is only ever
379    ///    incremented (never reset), so a cache key built with generation N
380    ///    will never be reused once the generation advances past N.
381    pub fn open(path: PathBuf) -> Result<Self> {
382        let mut config = Self::load_config(&path)?;
383
384        let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
385        let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
386        let index = Self::load_or_create_hnsw(&path, &config)?;
387        let text_index = Arc::new(Bm25Index::new());
388
389        Self::rebuild_bm25_index(&payload_storage, &text_index);
390
391        let property_index = Self::load_property_index(&path);
392        let range_index = Self::load_range_index(&path);
393        let edge_store = Self::load_edge_store(&path);
394        let sparse_indexes = Self::load_named_sparse_indexes(&path);
395
396        config.point_count =
397            Self::reconcile_point_count(&config, &vector_storage, &payload_storage);
398
399        Self::run_crash_recovery(&config, &vector_storage, &index)?;
400
401        Ok(Self::assemble(CollectionParts {
402            path,
403            config,
404            vector_storage,
405            payload_storage,
406            index,
407            text_index,
408            property_index,
409            range_index,
410            edge_store,
411            sparse_indexes,
412        }))
413    }
414
415    /// Reads and deserializes the collection config from disk.
416    fn load_config(path: &std::path::Path) -> Result<CollectionConfig> {
417        let config_path = path.join("config.json");
418        let config_data = std::fs::read_to_string(&config_path)?;
419        serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))
420    }
421
422    /// Reconciles `point_count` from the actual storage (config.json may be
423    /// stale if the previous process exited without calling `save_config`).
424    fn reconcile_point_count(
425        config: &CollectionConfig,
426        vector_storage: &Arc<RwLock<MmapStorage>>,
427        payload_storage: &Arc<RwLock<LogPayloadStorage>>,
428    ) -> usize {
429        if config.metadata_only {
430            payload_storage.read().ids().len()
431        } else {
432            vector_storage.read().len()
433        }
434    }
435
/// Re-indexes vectors that are in storage but missing from HNSW.
///
/// Such a gap can be left behind by a crash during a deferred merge, a
/// delta drain, or a normal insert. Collections without vectors
/// (metadata-only, or dimension 0) are skipped. A log line is emitted when
/// at least one vector was recovered.
///
/// # Errors
///
/// Propagates any error returned by `recover_hnsw_gap`.
#[cfg(feature = "persistence")]
fn run_crash_recovery(
    config: &CollectionConfig,
    vector_storage: &Arc<RwLock<MmapStorage>>,
    index: &Arc<HnswIndex>,
) -> Result<()> {
    let has_vectors = !config.metadata_only && config.dimension != 0;
    if !has_vectors {
        return Ok(());
    }

    let recovered = super::recovery::recover_hnsw_gap(vector_storage, index, config.dimension)?;
    if recovered > 0 {
        tracing::info!(
            collection = %config.name,
            recovered,
            "Collection gap recovery completed on open"
        );
    }
    Ok(())
}
457
/// No-op stub when persistence is disabled.
///
/// Has the same signature as the `persistence` variant so that `open` can
/// call it unconditionally; all arguments are ignored and the result is
/// always `Ok(())`.
#[cfg(not(feature = "persistence"))]
fn run_crash_recovery(
    _config: &CollectionConfig,
    _vector_storage: &Arc<RwLock<MmapStorage>>,
    _index: &Arc<HnswIndex>,
) -> Result<()> {
    Ok(())
}
467
468    /// Creates a new graph collection (with optional node embeddings).
469    ///
470    /// Persists `graph_schema` and `embedding_dimension` in `config.json`.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if the directory cannot be created or the config cannot be saved.
475    pub fn create_graph_collection(
476        path: PathBuf,
477        name: &str,
478        schema: crate::collection::graph::GraphSchema,
479        embedding_dim: Option<usize>,
480        metric: DistanceMetric,
481    ) -> Result<Self> {
482        let config = CollectionConfig {
483            name: name.to_string(),
484            dimension: embedding_dim.unwrap_or(0),
485            metric,
486            point_count: 0,
487            storage_mode: StorageMode::Full,
488            metadata_only: false,
489            graph_schema: Some(schema),
490            embedding_dimension: embedding_dim,
491            pq_rescore_oversampling: Some(4),
492            hnsw_params: None,
493            #[cfg(feature = "persistence")]
494            deferred_indexing: None,
495        };
496        // NOTE: create_from_config validates dimension only when > 0,
497        // so embedding_dim=None (dimension=0) skips validation correctly.
498        Self::create_from_config(path, config, None)
499    }
500
501    /// Loads the HNSW index from `hnsw.bin` or creates an empty one.
502    ///
503    /// When `hnsw.bin` is absent and `config.hnsw_params` is set, the
504    /// persisted custom params are honoured so they survive collection reopen.
505    fn load_or_create_hnsw(
506        path: &std::path::Path,
507        config: &CollectionConfig,
508    ) -> Result<Arc<HnswIndex>> {
509        if path.join("hnsw.bin").exists() {
510            let idx = HnswIndex::load(path, config.dimension, config.metric)?;
511            Ok(Arc::new(idx))
512        } else if let Some(params) = config.hnsw_params {
513            Ok(Arc::new(HnswIndex::with_params(
514                config.dimension,
515                config.metric,
516                params,
517            )?))
518        } else {
519            Ok(Arc::new(HnswIndex::new(config.dimension, config.metric)?))
520        }
521    }
522
523    /// Loads all named sparse indexes from disk.
524    ///
525    /// Scans for `sparse.meta` (default name `""`) and `sparse-{name}.meta` files.
526    /// Returns a `BTreeMap` keyed by sparse vector name.
527    ///
528    /// # Concurrency safety of `read_dir`
529    ///
530    /// The `read_dir` scan below is safe from race conditions for two reasons:
531    ///
532    /// 1. **Single-threaded open**: `Collection::open` (and therefore this
533    ///    function) is always called from `Database::open`, which runs
534    ///    single-threaded during startup. No concurrent writers exist at this
535    ///    point.
536    ///
537    /// 2. **Atomic rename in compaction**: `compact_with_prefix` writes new
538    ///    data to `{prefix}.*.tmp` staging files and only promotes them to
539    ///    their final names via an atomic `rename(2)`. A `read_dir` scan
540    ///    therefore never observes a partially-written `sparse-*.meta` file;
541    ///    it either sees the complete previous version or the complete new
542    ///    version — never a torn write.
543    fn load_named_sparse_indexes(
544        path: &std::path::Path,
545    ) -> BTreeMap<String, crate::index::sparse::SparseInvertedIndex> {
546        let mut indexes = BTreeMap::new();
547
548        // Load default (unprefixed) sparse index: sparse.meta / sparse.wal
549        match crate::index::sparse::persistence::load_from_disk(path) {
550            Ok(Some(idx)) => {
551                indexes.insert(DEFAULT_SPARSE_INDEX_NAME.to_string(), idx);
552            }
553            Ok(None) => {}
554            Err(e) => {
555                tracing::warn!(
556                    "Failed to load default sparse index from {:?}: {}. Skipping.",
557                    path,
558                    e
559                );
560            }
561        }
562
563        // Scan for named sparse indexes: sparse-{name}.meta files.
564        // The `.meta` suffix is the sentinel for a fully compacted (committed)
565        // index file; stale `.tmp` artefacts from interrupted compactions are
566        // ignored because they do not match the `strip_suffix(".meta")` filter.
567        if let Ok(entries) = std::fs::read_dir(path) {
568            for entry in entries.flatten() {
569                let file_name = entry.file_name();
570                let name_str = file_name.to_string_lossy();
571                if let Some(sparse_name) = name_str
572                    .strip_prefix("sparse-")
573                    .and_then(|s| s.strip_suffix(".meta"))
574                {
575                    let sparse_name = sparse_name.to_string();
576                    match crate::index::sparse::persistence::load_named_from_disk(
577                        path,
578                        &sparse_name,
579                    ) {
580                        Ok(Some(idx)) => {
581                            indexes.insert(sparse_name, idx);
582                        }
583                        Ok(None) => {}
584                        Err(e) => {
585                            tracing::warn!(
586                                "Failed to load sparse index '{}' from {:?}: {}. Skipping.",
587                                sparse_name,
588                                path,
589                                e
590                            );
591                        }
592                    }
593                }
594            }
595        }
596
597        indexes
598    }
599
600    /// Loads a persisted index from disk, falling back to a default on missing
601    /// file or deserialization error.
602    ///
603    /// This is the single implementation for the load-or-default pattern shared
604    /// by `PropertyIndex`, `RangeIndex`, and `EdgeStore`.
605    fn load_or_default<T>(
606        path: &std::path::Path,
607        file_name: &str,
608        load_fn: impl FnOnce(&std::path::Path) -> std::io::Result<T>,
609        default: impl FnOnce() -> T,
610    ) -> T {
611        let full_path = path.join(file_name);
612        if full_path.exists() {
613            match load_fn(&full_path) {
614                Ok(val) => return val,
615                Err(e) => tracing::warn!(
616                    "Failed to load {} from {:?}: {}. Starting with empty default.",
617                    file_name,
618                    full_path,
619                    e
620                ),
621            }
622        }
623        default()
624    }
625
626    fn load_edge_store(path: &std::path::Path) -> EdgeStore {
627        Self::load_or_default(
628            path,
629            "edge_store.bin",
630            EdgeStore::load_from_file,
631            EdgeStore::new,
632        )
633    }
634
635    fn load_property_index(path: &std::path::Path) -> PropertyIndex {
636        Self::load_or_default(
637            path,
638            "property_index.bin",
639            PropertyIndex::load_from_file,
640            PropertyIndex::new,
641        )
642    }
643
644    fn load_range_index(path: &std::path::Path) -> RangeIndex {
645        Self::load_or_default(
646            path,
647            "range_index.bin",
648            RangeIndex::load_from_file,
649            RangeIndex::new,
650        )
651    }
652
/// Returns a reference to the collection's guard rails.
///
/// The returned `Arc` is borrowed, not cloned; clone it if shared
/// ownership is needed beyond the lifetime of `&self`.
#[must_use]
pub fn guard_rails(&self) -> &std::sync::Arc<crate::guardrails::GuardRails> {
    &self.guard_rails
}
658
659    /// Returns the collection configuration.
660    #[must_use]
661    pub fn config(&self) -> CollectionConfig {
662        self.config.read().clone()
663    }
664
/// Issue #423 Component 3: Threshold for periodic HNSW save in `flush()`.
///
/// When `inserts_since_last_hnsw_save` exceeds this value, `flush()`
/// saves the HNSW graph as a safety measure to limit recovery time.
/// The comparison is strict (`>`), so exactly this many inserts does not
/// yet trigger a save.
const HNSW_SAVE_THRESHOLD: u64 = 10_000;
670
/// Fast durability flush — persists WAL + mmap but defers HNSW save.
///
/// Issue #423 Component 3: `index.save()` is skipped unless the insert
/// counter exceeds [`Self::HNSW_SAVE_THRESHOLD`]. Gap recovery on
/// `Collection::open()` handles missing/stale HNSW data.
///
/// Steps run in this order: save config, flush vector storage, flush
/// payload storage, drain the delta buffer, drain the deferred indexer,
/// conditionally save HNSW, flush secondary indexes, flush sparse indexes.
/// The first failing step aborts the flush.
///
/// Use [`flush_full()`](Self::flush_full) for shutdown or compaction.
///
/// # Errors
///
/// Returns an error if storage operations fail.
pub fn flush(&self) -> Result<()> {
    self.save_config()?;
    // Issue #423: vector_storage.flush() is now a fast path (WAL + mmap
    // only, no vectors.idx serialization). The WAL provides crash recovery
    // even with a stale index file.
    // Each write guard below is a temporary and is released at the end of
    // its own statement.
    self.vector_storage.write().flush()?;
    self.payload_storage.write().flush()?;
    // Drain delta buffer into HNSW before persisting the index.
    // Lock order: delta_buffer(10) is acquired after vector_storage(2)
    // and payload_storage(3) — both already released above.
    self.drain_delta_into_index();
    // Drain deferred indexer into HNSW (position 11, after delta at 10).
    self.drain_deferred_into_index();
    // Issue #423 Component 3: Save HNSW only when insert threshold
    // exceeded. Otherwise defer to flush_full() (shutdown/compaction).
    self.save_hnsw_if_threshold_exceeded()?;
    self.flush_secondary_indexes()?;
    self.flush_sparse_indexes()
}
701
/// Full durability flush including HNSW save and `vectors.idx`.
///
/// Issue #423: This is equivalent to the pre-#423 `flush()` behavior.
/// Use on graceful shutdown or before compaction to ensure the HNSW
/// graph and vector index file are up-to-date, avoiding gap recovery
/// and WAL replay on the next startup.
///
/// Compared with [`flush()`](Self::flush), this always saves the HNSW
/// index, resets the insert counter to 0, and finishes by writing the
/// vector index file via `flush_index()`.
///
/// # Errors
///
/// Returns an error if storage operations fail.
pub fn flush_full(&self) -> Result<()> {
    self.save_config()?;
    self.vector_storage.write().flush()?;
    self.payload_storage.write().flush()?;
    // Both buffers are drained before the save so the persisted HNSW
    // graph includes their contents.
    self.drain_delta_into_index();
    self.drain_deferred_into_index();
    // Always save HNSW on full flush and reset the counter.
    self.index.save(&self.path)?;
    self.inserts_since_last_hnsw_save
        .store(0, std::sync::atomic::Ordering::Relaxed);
    self.flush_secondary_indexes()?;
    self.flush_sparse_indexes()?;
    // Write the deferred vectors.idx after all other flush steps.
    self.vector_storage.read().flush_index()?;
    Ok(())
}
728
729    /// Saves HNSW to disk only when the insert counter exceeds the threshold.
730    ///
731    /// Issue #423 Component 3: periodic safety save to limit crash recovery
732    /// time for high-throughput workloads.
733    fn save_hnsw_if_threshold_exceeded(&self) -> Result<()> {
734        let count = self
735            .inserts_since_last_hnsw_save
736            .load(std::sync::atomic::Ordering::Relaxed);
737        if count > Self::HNSW_SAVE_THRESHOLD {
738            self.index.save(&self.path)?;
739            self.inserts_since_last_hnsw_save
740                .store(0, std::sync::atomic::Ordering::Relaxed);
741        }
742        Ok(())
743    }
744
/// Moves everything buffered in the delta buffer into the HNSW index.
///
/// Calls `deactivate_and_drain` on the delta buffer and returns immediately
/// if nothing was drained.
///
/// Entries whose id can no longer be retrieved from vector storage are
/// dropped before insertion: they were deleted while sitting in the buffer,
/// and re-inserting them would leave ghost vectors in HNSW.
///
/// The surviving entries are inserted with `insert_batch_parallel` (same
/// strategy as `merge_deferred_batch` in crud.rs).
///
/// # Lock ordering
///
/// `delta_buffer` (position 10) is acquired first via
/// `deactivate_and_drain`. `vector_storage` (position 2) is then read-locked
/// only for the validity check and released before the batch insert, which
/// takes no lock here. The caller must NOT hold any lower-numbered lock
/// when calling this method.
#[cfg(feature = "persistence")]
fn drain_delta_into_index(&self) {
    let drained = self.delta_buffer.deactivate_and_drain();
    if drained.is_empty() {
        return;
    }

    // The storage read guard lives only inside this block, so it is
    // released before the batch insert below.
    let valid: Vec<(u64, &[f32])> = {
        let storage = self.vector_storage.read();
        drained
            .iter()
            // Keep only ids that are still present in storage.
            .filter(|(id, _)| matches!(storage.retrieve(*id), Ok(Some(_))))
            .map(|(id, v)| (*id, v.as_slice()))
            .collect()
    };

    if !valid.is_empty() {
        self.index.insert_batch_parallel(valid);
    }
}
782
783    /// No-op stub when persistence is disabled.
784    #[cfg(not(feature = "persistence"))]
785    fn drain_delta_into_index(&self) {}
786
787    /// Drains the deferred indexer into the HNSW index (if configured).
788    ///
789    /// No-op when deferred indexing is not configured or disabled.
790    /// After draining, both buffers are empty and inactive.
791    ///
792    /// Filters out IDs that have been deleted from vector storage since they
793    /// were buffered, preventing ghost vectors from being re-inserted into
794    /// HNSW after a concurrent delete.
795    ///
796    /// Uses `insert_batch_parallel` for consistent batch insert performance
797    /// (same strategy as `merge_deferred_batch` in crud.rs).
798    ///
799    /// # Lock ordering
800    ///
801    /// Acquires `vector_storage` (position 2) briefly for the validity
802    /// check, releases it, then inserts into the index (no lock).
803    /// `deferred_indexer` (position 11) is acquired first via `drain_all`.
804    /// The caller must NOT hold any lower-numbered lock.
805    #[cfg(feature = "persistence")]
806    fn drain_deferred_into_index(&self) {
807        if let Some(ref di) = self.deferred_indexer {
808            let drained = di.drain_all();
809            if drained.is_empty() {
810                return;
811            }
812            // Filter out vectors deleted from storage during the buffer's
813            // lifetime to prevent ghost re-insertion into HNSW.
814            let storage = self.vector_storage.read();
815            let valid: Vec<(u64, &[f32])> = drained
816                .iter()
817                .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
818                .map(|(id, v)| (*id, v.as_slice()))
819                .collect();
820            drop(storage); // Release read lock before batch insert
821            if !valid.is_empty() {
822                self.index.insert_batch_parallel(valid);
823            }
824        }
825    }
826
827    /// No-op stub when persistence is disabled.
828    #[cfg(not(feature = "persistence"))]
829    fn drain_deferred_into_index(&self) {}
830
831    /// Persists property index, range index, and edge store (EPIC-009 US-005).
832    fn flush_secondary_indexes(&self) -> Result<()> {
833        let property_index_path = self.path.join("property_index.bin");
834        self.property_index
835            .read()
836            .save_to_file(&property_index_path)?;
837
838        let range_index_path = self.path.join("range_index.bin");
839        self.range_index.read().save_to_file(&range_index_path)?;
840
841        // Save EdgeStore for graph collections (BUG-1: was never persisted)
842        if self.config.read().graph_schema.is_some() {
843            let edge_store_path = self.path.join("edge_store.bin");
844            self.edge_store.read().save_to_file(&edge_store_path)?;
845        }
846
847        Ok(())
848    }
849
850    /// Compacts all named sparse indexes to disk (EPIC-062 / SPARSE-04).
851    fn flush_sparse_indexes(&self) -> Result<()> {
852        let indexes = self.sparse_indexes.read();
853        for (name, idx) in indexes.iter() {
854            crate::index::sparse::persistence::compact_named(&self.path, name, idx)?;
855        }
856        Ok(())
857    }
858
859    /// Returns a reference to the collection's data path.
860    #[must_use]
861    pub(crate) fn data_path(&self) -> &std::path::Path {
862        &self.path
863    }
864
865    /// Returns a write guard on the collection config for mutation.
866    pub(crate) fn config_write(
867        &self,
868    ) -> parking_lot::RwLockWriteGuard<'_, crate::collection::types::CollectionConfig> {
869        self.config.write()
870    }
871
872    /// Returns a write guard on the PQ quantizer slot.
873    pub(crate) fn pq_quantizer_write(
874        &self,
875    ) -> parking_lot::RwLockWriteGuard<'_, Option<crate::quantization::ProductQuantizer>> {
876        self.pq_quantizer.write()
877    }
878
879    /// Returns a read guard on the PQ quantizer slot.
880    pub(crate) fn pq_quantizer_read(
881        &self,
882    ) -> parking_lot::RwLockReadGuard<'_, Option<crate::quantization::ProductQuantizer>> {
883        self.pq_quantizer.read()
884    }
885
886    /// Saves the collection configuration to disk.
887    ///
888    /// Uses atomic write-tmp-fsync-rename to prevent torn writes on crash.
889    pub(crate) fn save_config(&self) -> Result<()> {
890        use std::io::Write;
891
892        let config = self.config.read();
893        let config_path = self.path.join("config.json");
894        let tmp_path = self.path.join("config.json.tmp");
895        let config_data = serde_json::to_string_pretty(&*config)
896            .map_err(|e| Error::Serialization(e.to_string()))?;
897
898        let file = std::fs::File::create(&tmp_path)?;
899        let mut writer = std::io::BufWriter::new(file);
900        writer.write_all(config_data.as_bytes())?;
901        writer.flush()?;
902        writer.get_ref().sync_all()?;
903        std::fs::rename(&tmp_path, &config_path)?;
904        Ok(())
905    }
906}