// velesdb_core/collection/core/lifecycle.rs
//! Collection lifecycle methods (create, open, flush).

use std::collections::{BTreeMap, HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;

use parking_lot::{Mutex, RwLock};

use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
use crate::collection::types::{Collection, CollectionConfig, CollectionType};
use crate::distance::DistanceMetric;
use crate::error::{Error, Result};
use crate::guardrails::GuardRails;
use crate::index::{Bm25Index, HnswIndex};
use crate::quantization::StorageMode;
use crate::sparse_index::DEFAULT_SPARSE_INDEX_NAME;
use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
use crate::velesql::{QueryCache, QueryPlanner};

20impl Collection {
21    /// Creates a new collection at the specified path.
22    ///
23    /// # Errors
24    ///
25    /// Returns an error if the directory cannot be created or the config cannot be saved.
26    pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
27        Self::create_with_options(path, dimension, metric, StorageMode::default())
28    }
29
30    /// Creates a new collection with custom storage options.
31    ///
32    /// # Arguments
33    ///
34    /// * `path` - Path to the collection directory
35    /// * `dimension` - Vector dimension
36    /// * `metric` - Distance metric
37    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
38    ///
39    /// # Errors
40    ///
41    /// Returns an error if the directory cannot be created or the config cannot be saved.
42    pub fn create_with_options(
43        path: PathBuf,
44        dimension: usize,
45        metric: DistanceMetric,
46        storage_mode: StorageMode,
47    ) -> Result<Self> {
48        std::fs::create_dir_all(&path)?;
49
50        let name = path
51            .file_name()
52            .and_then(|n| n.to_str())
53            .unwrap_or("unknown")
54            .to_string();
55
56        let config = CollectionConfig {
57            name,
58            dimension,
59            metric,
60            point_count: 0,
61            storage_mode,
62            metadata_only: false,
63            graph_schema: None,
64            embedding_dimension: None,
65            pq_rescore_oversampling: Some(4),
66        };
67
68        // Initialize persistent storages
69        let vector_storage = Arc::new(RwLock::new(
70            MmapStorage::new(&path, dimension).map_err(Error::Io)?,
71        ));
72
73        let payload_storage = Arc::new(RwLock::new(
74            LogPayloadStorage::new(&path).map_err(Error::Io)?,
75        ));
76
77        // Create HNSW index
78        let index = Arc::new(HnswIndex::new(dimension, metric));
79
80        // Create BM25 index for full-text search
81        let text_index = Arc::new(Bm25Index::new());
82
83        let collection = Self {
84            path,
85            config: Arc::new(RwLock::new(config)),
86            vector_storage,
87            payload_storage,
88            index,
89            text_index,
90            sq8_cache: Arc::new(RwLock::new(HashMap::new())),
91            binary_cache: Arc::new(RwLock::new(HashMap::new())),
92            pq_cache: Arc::new(RwLock::new(HashMap::new())),
93            pq_quantizer: Arc::new(RwLock::new(None)),
94            pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
95            property_index: Arc::new(RwLock::new(PropertyIndex::new())),
96            range_index: Arc::new(RwLock::new(RangeIndex::new())),
97            edge_store: Arc::new(RwLock::new(EdgeStore::new())),
98            sparse_indexes: Arc::new(RwLock::new(BTreeMap::new())),
99            secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
100            guard_rails: Arc::new(GuardRails::default()),
101            query_planner: Arc::new(QueryPlanner::new()),
102            query_cache: Arc::new(QueryCache::new(256)),
103            cached_stats: Arc::new(Mutex::new(None)),
104            write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
105            #[cfg(feature = "persistence")]
106            stream_ingester: Arc::new(RwLock::new(None)),
107            #[cfg(feature = "persistence")]
108            delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
109        };
110
111        collection.save_config()?;
112
113        Ok(collection)
114    }
115
116    /// Creates a new collection with a specific type (Vector or `MetadataOnly`).
117    ///
118    /// # Arguments
119    ///
120    /// * `path` - Path to the collection directory
121    /// * `name` - Name of the collection
122    /// * `collection_type` - Type of collection to create
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if the directory cannot be created or the config cannot be saved.
127    pub fn create_typed(
128        path: PathBuf,
129        name: &str,
130        collection_type: &CollectionType,
131    ) -> Result<Self> {
132        match collection_type {
133            CollectionType::Vector {
134                dimension,
135                metric,
136                storage_mode,
137            } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
138            CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
139            CollectionType::Graph { .. } => {
140                // Graph collections will be implemented in EPIC-004
141                // For now, return an error indicating this is not yet supported
142                Err(crate::Error::GraphNotSupported(
143                    "Graph collection creation not yet implemented".to_string(),
144                ))
145            }
146        }
147    }
148
149    /// Creates a new metadata-only collection (no vectors, no HNSW index).
150    ///
151    /// Metadata-only collections are optimized for storing reference data,
152    /// catalogs, and other non-vector data. They support CRUD operations
153    /// and `VelesQL` queries on payload, but NOT vector search.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the directory cannot be created or the config cannot be saved.
158    pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
159        std::fs::create_dir_all(&path)?;
160
161        let config = CollectionConfig {
162            name: name.to_string(),
163            dimension: 0,                   // No vector dimension
164            metric: DistanceMetric::Cosine, // Default, not used
165            point_count: 0,
166            storage_mode: StorageMode::Full, // Default, not used
167            metadata_only: true,
168            graph_schema: None,
169            embedding_dimension: None,
170            pq_rescore_oversampling: Some(4),
171        };
172
173        // For metadata-only, we only need payload storage
174        // Vector storage with dimension 0 won't allocate space
175        let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, 0).map_err(Error::Io)?));
176
177        let payload_storage = Arc::new(RwLock::new(
178            LogPayloadStorage::new(&path).map_err(Error::Io)?,
179        ));
180
181        // Create minimal HNSW index (won't be used)
182        let index = Arc::new(HnswIndex::new(0, DistanceMetric::Cosine));
183
184        // BM25 index for full-text search (still useful for metadata-only)
185        let text_index = Arc::new(Bm25Index::new());
186
187        let collection = Self {
188            path,
189            config: Arc::new(RwLock::new(config)),
190            vector_storage,
191            payload_storage,
192            index,
193            text_index,
194            sq8_cache: Arc::new(RwLock::new(HashMap::new())),
195            binary_cache: Arc::new(RwLock::new(HashMap::new())),
196            pq_cache: Arc::new(RwLock::new(HashMap::new())),
197            pq_quantizer: Arc::new(RwLock::new(None)),
198            pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
199            property_index: Arc::new(RwLock::new(PropertyIndex::new())),
200            range_index: Arc::new(RwLock::new(RangeIndex::new())),
201            edge_store: Arc::new(RwLock::new(EdgeStore::new())),
202            sparse_indexes: Arc::new(RwLock::new(BTreeMap::new())),
203            secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
204            guard_rails: Arc::new(GuardRails::default()),
205            query_planner: Arc::new(QueryPlanner::new()),
206            query_cache: Arc::new(QueryCache::new(256)),
207            cached_stats: Arc::new(Mutex::new(None)),
208            write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
209            #[cfg(feature = "persistence")]
210            stream_ingester: Arc::new(RwLock::new(None)),
211            #[cfg(feature = "persistence")]
212            delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
213        };
214
215        collection.save_config()?;
216
217        Ok(collection)
218    }
219
220    /// Returns true if this is a metadata-only collection.
221    #[must_use]
222    pub fn is_metadata_only(&self) -> bool {
223        self.config.read().metadata_only
224    }
225
226    /// Opens an existing collection from the specified path.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the config file cannot be read or parsed.
231    ///
232    /// # INVARIANT(CACHE-01): write_generation starts at 0 on open
233    ///
234    /// Every call to `Collection::open` initialises `write_generation` to 0.
235    /// This is **safe** for cache correctness because:
236    ///
237    /// 1. The plan cache is **not persisted** across process restarts — it is
238    ///    always empty when the database opens. There are therefore no stale
239    ///    cached plans that could be incorrectly served.
240    ///
241    /// 2. `Database::load_collections` bumps `schema_version` after loading
242    ///    at least one collection (C-3). Any plan key built before the load
243    ///    would carry the pre-load `schema_version` and would miss the cache
244    ///    even if the `write_generation` happened to match.
245    ///
246    /// 3. Within a single process lifetime the `write_generation` is only ever
247    ///    incremented (never reset), so a cache key built with generation N
248    ///    will never be reused once the generation advances past N.
249    pub fn open(path: PathBuf) -> Result<Self> {
250        let config_path = path.join("config.json");
251        let config_data = std::fs::read_to_string(&config_path)?;
252        let config: CollectionConfig =
253            serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
254
255        // Open persistent storages
256        let vector_storage = Arc::new(RwLock::new(
257            MmapStorage::new(&path, config.dimension).map_err(Error::Io)?,
258        ));
259
260        let payload_storage = Arc::new(RwLock::new(
261            LogPayloadStorage::new(&path).map_err(Error::Io)?,
262        ));
263
264        // Load HNSW index if it exists, otherwise create new (empty)
265        let index = if path.join("hnsw.bin").exists() {
266            Arc::new(HnswIndex::load(&path, config.dimension, config.metric).map_err(Error::Io)?)
267        } else {
268            Arc::new(HnswIndex::new(config.dimension, config.metric))
269        };
270
271        // Create and rebuild BM25 index from existing payloads
272        let text_index = Arc::new(Bm25Index::new());
273
274        // Rebuild BM25 index from persisted payloads
275        {
276            let storage = payload_storage.read();
277            let ids = storage.ids();
278            for id in ids {
279                if let Ok(Some(payload)) = storage.retrieve(id) {
280                    let text = Self::extract_text_from_payload(&payload);
281                    if !text.is_empty() {
282                        text_index.add_document(id, &text);
283                    }
284                }
285            }
286        }
287
288        // Load PropertyIndex and RangeIndex if they exist (EPIC-009 US-005)
289        let property_index = Self::load_property_index(&path);
290        let range_index = Self::load_range_index(&path);
291        // Load EdgeStore if present (graph collections -- D-02)
292        let edge_store = Self::load_edge_store(&path);
293        // Load named sparse indexes if present (EPIC-062 / SPARSE-04)
294        let sparse_indexes = Self::load_named_sparse_indexes(&path);
295
296        Ok(Self {
297            path,
298            config: Arc::new(RwLock::new(config)),
299            vector_storage,
300            payload_storage,
301            index,
302            text_index,
303            sq8_cache: Arc::new(RwLock::new(HashMap::new())),
304            binary_cache: Arc::new(RwLock::new(HashMap::new())),
305            pq_cache: Arc::new(RwLock::new(HashMap::new())),
306            pq_quantizer: Arc::new(RwLock::new(None)),
307            pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
308            property_index: Arc::new(RwLock::new(property_index)),
309            range_index: Arc::new(RwLock::new(range_index)),
310            edge_store: Arc::new(RwLock::new(edge_store)),
311            sparse_indexes: Arc::new(RwLock::new(sparse_indexes)),
312            secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
313            guard_rails: Arc::new(GuardRails::default()),
314            query_planner: Arc::new(QueryPlanner::new()),
315            query_cache: Arc::new(QueryCache::new(256)),
316            cached_stats: Arc::new(Mutex::new(None)),
317            write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
318            #[cfg(feature = "persistence")]
319            stream_ingester: Arc::new(RwLock::new(None)),
320            #[cfg(feature = "persistence")]
321            delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
322        })
323    }
324
325    /// Creates a new graph collection (with optional node embeddings).
326    ///
327    /// Persists `graph_schema` and `embedding_dimension` in `config.json`.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the directory cannot be created or the config cannot be saved.
332    pub fn create_graph_collection(
333        path: PathBuf,
334        name: &str,
335        schema: crate::collection::graph::GraphSchema,
336        embedding_dim: Option<usize>,
337        metric: DistanceMetric,
338    ) -> Result<Self> {
339        let dim = embedding_dim.unwrap_or(0);
340        std::fs::create_dir_all(&path)?;
341
342        let config = CollectionConfig {
343            name: name.to_string(),
344            dimension: dim,
345            metric,
346            point_count: 0,
347            storage_mode: StorageMode::Full,
348            metadata_only: false,
349            graph_schema: Some(schema),
350            embedding_dimension: embedding_dim,
351            pq_rescore_oversampling: Some(4),
352        };
353
354        let vector_storage = Arc::new(RwLock::new(
355            MmapStorage::new(&path, dim).map_err(Error::Io)?,
356        ));
357        let payload_storage = Arc::new(RwLock::new(
358            LogPayloadStorage::new(&path).map_err(Error::Io)?,
359        ));
360        let index = Arc::new(HnswIndex::new(dim, metric));
361        let text_index = Arc::new(Bm25Index::new());
362
363        let collection = Self {
364            path,
365            config: Arc::new(RwLock::new(config)),
366            vector_storage,
367            payload_storage,
368            index,
369            text_index,
370            sq8_cache: Arc::new(RwLock::new(HashMap::new())),
371            binary_cache: Arc::new(RwLock::new(HashMap::new())),
372            pq_cache: Arc::new(RwLock::new(HashMap::new())),
373            pq_quantizer: Arc::new(RwLock::new(None)),
374            pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
375            property_index: Arc::new(RwLock::new(PropertyIndex::new())),
376            range_index: Arc::new(RwLock::new(RangeIndex::new())),
377            edge_store: Arc::new(RwLock::new(EdgeStore::new())),
378            sparse_indexes: Arc::new(RwLock::new(BTreeMap::new())),
379            secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
380            guard_rails: Arc::new(GuardRails::default()),
381            query_planner: Arc::new(QueryPlanner::new()),
382            query_cache: Arc::new(QueryCache::new(256)),
383            cached_stats: Arc::new(Mutex::new(None)),
384            write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
385            #[cfg(feature = "persistence")]
386            stream_ingester: Arc::new(RwLock::new(None)),
387            #[cfg(feature = "persistence")]
388            delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
389        };
390
391        collection.save_config()?;
392        Ok(collection)
393    }
394
395    /// Loads all named sparse indexes from disk.
396    ///
397    /// Scans for `sparse.meta` (default name `""`) and `sparse-{name}.meta` files.
398    /// Returns a `BTreeMap` keyed by sparse vector name.
399    ///
400    /// # Concurrency safety of `read_dir`
401    ///
402    /// The `read_dir` scan below is safe from race conditions for two reasons:
403    ///
404    /// 1. **Single-threaded open**: `Collection::open` (and therefore this
405    ///    function) is always called from `Database::open`, which runs
406    ///    single-threaded during startup. No concurrent writers exist at this
407    ///    point.
408    ///
409    /// 2. **Atomic rename in compaction**: `compact_with_prefix` writes new
410    ///    data to `{prefix}.*.tmp` staging files and only promotes them to
411    ///    their final names via an atomic `rename(2)`. A `read_dir` scan
412    ///    therefore never observes a partially-written `sparse-*.meta` file;
413    ///    it either sees the complete previous version or the complete new
414    ///    version — never a torn write.
415    fn load_named_sparse_indexes(
416        path: &std::path::Path,
417    ) -> BTreeMap<String, crate::index::sparse::SparseInvertedIndex> {
418        let mut indexes = BTreeMap::new();
419
420        // Load default (unprefixed) sparse index: sparse.meta / sparse.wal
421        match crate::index::sparse::persistence::load_from_disk(path) {
422            Ok(Some(idx)) => {
423                indexes.insert(DEFAULT_SPARSE_INDEX_NAME.to_string(), idx);
424            }
425            Ok(None) => {}
426            Err(e) => {
427                tracing::warn!(
428                    "Failed to load default sparse index from {:?}: {}. Skipping.",
429                    path,
430                    e
431                );
432            }
433        }
434
435        // Scan for named sparse indexes: sparse-{name}.meta files.
436        // The `.meta` suffix is the sentinel for a fully compacted (committed)
437        // index file; stale `.tmp` artefacts from interrupted compactions are
438        // ignored because they do not match the `strip_suffix(".meta")` filter.
439        if let Ok(entries) = std::fs::read_dir(path) {
440            for entry in entries.flatten() {
441                let file_name = entry.file_name();
442                let name_str = file_name.to_string_lossy();
443                if let Some(sparse_name) = name_str
444                    .strip_prefix("sparse-")
445                    .and_then(|s| s.strip_suffix(".meta"))
446                {
447                    let sparse_name = sparse_name.to_string();
448                    match crate::index::sparse::persistence::load_named_from_disk(
449                        path,
450                        &sparse_name,
451                    ) {
452                        Ok(Some(idx)) => {
453                            indexes.insert(sparse_name, idx);
454                        }
455                        Ok(None) => {}
456                        Err(e) => {
457                            tracing::warn!(
458                                "Failed to load sparse index '{}' from {:?}: {}. Skipping.",
459                                sparse_name,
460                                path,
461                                e
462                            );
463                        }
464                    }
465                }
466            }
467        }
468
469        indexes
470    }
471
472    fn load_edge_store(path: &std::path::Path) -> EdgeStore {
473        let edge_path = path.join("edge_store.bin");
474        if edge_path.exists() {
475            match EdgeStore::load_from_file(&edge_path) {
476                Ok(store) => return store,
477                Err(e) => tracing::warn!(
478                    "Failed to load EdgeStore from {:?}: {}. Starting empty.",
479                    edge_path,
480                    e
481                ),
482            }
483        }
484        EdgeStore::new()
485    }
486
487    fn load_property_index(path: &std::path::Path) -> PropertyIndex {
488        let index_path = path.join("property_index.bin");
489        if index_path.exists() {
490            match PropertyIndex::load_from_file(&index_path) {
491                Ok(idx) => return idx,
492                Err(e) => tracing::warn!(
493                    "Failed to load PropertyIndex from {:?}: {}. Starting with empty index.",
494                    index_path,
495                    e
496                ),
497            }
498        }
499        PropertyIndex::new()
500    }
501
502    fn load_range_index(path: &std::path::Path) -> RangeIndex {
503        let index_path = path.join("range_index.bin");
504        if index_path.exists() {
505            match RangeIndex::load_from_file(&index_path) {
506                Ok(idx) => return idx,
507                Err(e) => tracing::warn!(
508                    "Failed to load RangeIndex from {:?}: {}. Starting with empty index.",
509                    index_path,
510                    e
511                ),
512            }
513        }
514        RangeIndex::new()
515    }
516
517    /// Returns the collection configuration.
518    #[must_use]
519    pub fn config(&self) -> CollectionConfig {
520        self.config.read().clone()
521    }
522
523    /// Saves the collection configuration and index to disk.
524    ///
525    /// # Errors
526    ///
527    /// Returns an error if storage operations fail.
528    pub fn flush(&self) -> Result<()> {
529        self.save_config()?;
530        self.vector_storage.write().flush().map_err(Error::Io)?;
531        self.payload_storage.write().flush().map_err(Error::Io)?;
532        self.index.save(&self.path).map_err(Error::Io)?;
533
534        // Save PropertyIndex (EPIC-009 US-005)
535        let property_index_path = self.path.join("property_index.bin");
536        self.property_index
537            .read()
538            .save_to_file(&property_index_path)
539            .map_err(Error::Io)?;
540
541        // Save RangeIndex (EPIC-009 US-005)
542        let range_index_path = self.path.join("range_index.bin");
543        self.range_index
544            .read()
545            .save_to_file(&range_index_path)
546            .map_err(Error::Io)?;
547
548        // Save EdgeStore for graph collections (BUG-1: was never persisted)
549        if self.config.read().graph_schema.is_some() {
550            let edge_store_path = self.path.join("edge_store.bin");
551            self.edge_store
552                .read()
553                .save_to_file(&edge_store_path)
554                .map_err(Error::Io)?;
555        }
556
557        // Compact all named sparse indexes to disk (EPIC-062 / SPARSE-04)
558        {
559            let indexes = self.sparse_indexes.read();
560            for (name, idx) in indexes.iter() {
561                crate::index::sparse::persistence::compact_named(&self.path, name, idx)?;
562            }
563        }
564
565        Ok(())
566    }
567
568    /// Returns a reference to the collection's data path.
569    #[must_use]
570    pub(crate) fn data_path(&self) -> &std::path::Path {
571        &self.path
572    }
573
574    /// Returns a write guard on the collection config for mutation.
575    pub(crate) fn config_write(
576        &self,
577    ) -> parking_lot::RwLockWriteGuard<'_, crate::collection::types::CollectionConfig> {
578        self.config.write()
579    }
580
581    /// Returns a write guard on the PQ quantizer slot.
582    pub(crate) fn pq_quantizer_write(
583        &self,
584    ) -> parking_lot::RwLockWriteGuard<'_, Option<crate::quantization::ProductQuantizer>> {
585        self.pq_quantizer.write()
586    }
587
588    /// Returns a read guard on the PQ quantizer slot.
589    pub(crate) fn pq_quantizer_read(
590        &self,
591    ) -> parking_lot::RwLockReadGuard<'_, Option<crate::quantization::ProductQuantizer>> {
592        self.pq_quantizer.read()
593    }
594
595    /// Saves the collection configuration to disk.
596    pub(crate) fn save_config(&self) -> Result<()> {
597        let config = self.config.read();
598        let config_path = self.path.join("config.json");
599        let config_data = serde_json::to_string_pretty(&*config)
600            .map_err(|e| Error::Serialization(e.to_string()))?;
601        std::fs::write(config_path, config_data)?;
602        Ok(())
603    }
604}