Skip to main content

velesdb_core/database/
persistence.rs

1//! Collection loading from disk at database startup.
2
3use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::Result;
5
6use super::Database;
7
8impl Database {
9    /// Loads existing collections from disk.
10    ///
11    /// # Deprecation note
12    ///
13    /// **This method is called automatically by [`Database::open`].**
14    /// There is no need to call it manually. It is kept public only for
15    /// backward compatibility with code that relied on the old two-step pattern.
16    ///
17    /// # Errors
18    ///
19    /// Returns an error if collection directories cannot be read.
20    pub fn load_collections(&self) -> Result<()> {
21        let mut loaded_count: usize = 0;
22
23        for entry in std::fs::read_dir(&self.data_dir)? {
24            let entry = entry?;
25            if let Some(name) = self.loadable_collection_name(&entry) {
26                if self.try_load_single_collection(&entry.path(), &name) {
27                    loaded_count += 1;
28                }
29            }
30        }
31
32        // Bump schema_version if at least one collection was loaded from disk (C-3).
33        //
34        // This ensures that any plan key built before load_collections() ran
35        // (schema_version = 0) will never match a key built after it
36        // (schema_version >= 1), preventing the plan cache from serving a stale
37        // plan for a collection that was not yet visible in the registry.
38        if loaded_count > 0 {
39            self.schema_version
40                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41        }
42
43        Ok(())
44    }
45
46    /// Returns the collection name if the directory entry is a loadable collection.
47    ///
48    /// A directory is loadable when it contains `config.json`, has a valid
49    /// collection name, and is not already registered in any typed registry.
50    /// Directories with invalid names are skipped with a warning.
51    fn loadable_collection_name(&self, entry: &std::fs::DirEntry) -> Option<String> {
52        let path = entry.path();
53        if !path.is_dir() || !path.join("config.json").exists() {
54            return None;
55        }
56        let name = Self::validated_collection_name(&path)?;
57        if self.is_already_registered(&name) {
58            return None;
59        }
60        Some(name)
61    }
62
63    /// Extracts and validates the collection name from a directory path.
64    ///
65    /// Returns `None` if the directory name is not valid UTF-8 or fails
66    /// collection name validation (logged as a warning).
67    fn validated_collection_name(path: &std::path::Path) -> Option<String> {
68        let name = path.file_name()?.to_str()?.to_string();
69        if crate::validation::validate_collection_name(&name).is_err() {
70            tracing::warn!(
71                name = %name,
72                path = %path.display(),
73                "Skipping directory with invalid collection name"
74            );
75            return None;
76        }
77        Some(name)
78    }
79
80    /// Checks whether a collection name is already registered in any typed registry.
81    fn is_already_registered(&self, name: &str) -> bool {
82        self.vector_colls.read().contains_key(name)
83            || self.graph_colls.read().contains_key(name)
84            || self.metadata_colls.read().contains_key(name)
85    }
86
87    /// Attempts to load a single collection directory, returning `true` on success.
88    fn try_load_single_collection(&self, path: &std::path::Path, name: &str) -> bool {
89        let config_path = path.join("config.json");
90
91        // Read config to determine the concrete type before opening.
92        let cfg_data = match std::fs::read_to_string(&config_path) {
93            Ok(d) => d,
94            Err(e) => {
95                tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
96                return false;
97            }
98        };
99        let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
100            Ok(c) => c,
101            Err(e) => {
102                tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
103                return false;
104            }
105        };
106
107        if cfg.graph_schema.is_some() {
108            self.load_graph_collection(path, name)
109        } else if cfg.metadata_only {
110            self.load_metadata_collection(path, name)
111        } else {
112            self.load_vector_collection(path, name)
113        }
114    }
115
116    /// Loads a graph collection from disk, registering it in the typed registry.
117    fn load_graph_collection(&self, path: &std::path::Path, name: &str) -> bool {
118        self.try_open_and_register(path, name, "graph", |p| {
119            GraphCollection::open(p).map(TypedColl::Graph)
120        })
121    }
122
123    /// Loads a metadata collection from disk, registering it in the typed registry.
124    fn load_metadata_collection(&self, path: &std::path::Path, name: &str) -> bool {
125        self.try_open_and_register(path, name, "metadata", |p| {
126            MetadataCollection::open(p).map(TypedColl::Metadata)
127        })
128    }
129
130    /// Loads a vector collection from disk, registering it in the typed registry.
131    fn load_vector_collection(&self, path: &std::path::Path, name: &str) -> bool {
132        self.try_open_and_register(path, name, "vector", |p| {
133            VectorCollection::open(p).map(TypedColl::Vector)
134        })
135    }
136
137    /// Opens a collection from disk and registers it in the typed registry.
138    ///
139    /// The `open_fn` closure returns a `TypedColl` variant.
140    /// Returns `true` on success, `false` on failure (logged as warning).
141    fn try_open_and_register(
142        &self,
143        path: &std::path::Path,
144        name: &str,
145        kind: &str,
146        open_fn: impl FnOnce(std::path::PathBuf) -> crate::Result<TypedColl>,
147    ) -> bool {
148        match open_fn(path.to_path_buf()) {
149            Ok(typed) => {
150                // Parity item E: thread the live LimitsConfig caps into the
151                // startup-loaded collection (not persisted — re-pushed here).
152                self.push_runtime_limits(typed.inner());
153                typed.insert_into(
154                    &self.vector_colls,
155                    &self.graph_colls,
156                    &self.metadata_colls,
157                    name,
158                );
159                true
160            }
161            Err(e) => {
162                tracing::warn!(
163                    error = %e,
164                    name = %path.display(),
165                    "Failed to load {kind} collection"
166                );
167                false
168            }
169        }
170    }
171
172    /// Flushes all collections including `vectors.idx` serialization.
173    ///
174    /// Issue #423: Uses `flush_full()` to ensure the vector index file is
175    /// up-to-date, avoiding a full WAL replay on the next startup. This is
176    /// the correct method for graceful shutdown.
177    ///
178    /// Best-effort: logs warnings for individual flush failures but continues
179    /// flushing remaining collections. Returns the count of failures.
180    ///
181    /// The legacy `collections` registry is **not** iterated because it shares
182    /// the same `Arc`'d inner storage as the typed registries. Flushing both
183    /// would double-flush every collection, causing redundant I/O and
184    /// potentially double-counting failures.
185    pub fn flush_all(&self) -> usize {
186        let mut failures: usize = 0;
187
188        failures += flush_registry(&self.vector_colls, "vector");
189        failures += flush_registry(&self.graph_colls, "graph");
190        failures += flush_registry(&self.metadata_colls, "metadata");
191
192        failures
193    }
194}
195
/// Discriminated union for the three typed collection registries.
///
/// Used by [`Database::try_open_and_register`] to route a freshly opened
/// collection into the correct registry without duplicating match arms.
/// Each variant owns the opened collection until `insert_into` moves it
/// into the registry that matches its type.
enum TypedColl {
    /// A vector collection, destined for the vector registry.
    Vector(VectorCollection),
    /// A graph collection, destined for the graph registry.
    Graph(GraphCollection),
    /// A metadata collection, destined for the metadata registry.
    Metadata(MetadataCollection),
}
205
206impl TypedColl {
207    /// Returns the wrapped `Collection` so the caller can push runtime limits
208    /// before the typed handle is moved into its registry (parity item E).
209    fn inner(&self) -> &crate::collection::Collection {
210        match self {
211            Self::Vector(c) => &c.inner,
212            Self::Graph(c) => &c.inner,
213            Self::Metadata(c) => &c.inner,
214        }
215    }
216
217    fn insert_into(
218        self,
219        vectors: &parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
220        graphs: &parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
221        metadata: &parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
222        name: &str,
223    ) {
224        match self {
225            Self::Vector(c) => {
226                vectors.write().insert(name.to_string(), c);
227            }
228            Self::Graph(c) => {
229                graphs.write().insert(name.to_string(), c);
230            }
231            Self::Metadata(c) => {
232                metadata.write().insert(name.to_string(), c);
233            }
234        }
235    }
236}
237
238/// Flushes all collections in a registry, logging failures. Returns failure count.
239fn flush_registry<T: Flushable>(
240    registry: &parking_lot::RwLock<std::collections::HashMap<String, T>>,
241    kind: &str,
242) -> usize {
243    let mut failures = 0;
244    for (name, coll) in registry.read().iter() {
245        if let Err(e) = coll.flush() {
246            tracing::warn!(
247                error = %e,
248                collection = %name,
249                "Failed to flush {kind} collection"
250            );
251            failures += 1;
252        }
253    }
254    failures
255}
256
/// Internal trait for deduplicating `flush_all` iteration across collection types.
///
/// Issue #423: Uses `flush_full()` to include `vectors.idx` serialization,
/// ensuring fast startup after graceful shutdown.
///
/// It gives `flush_registry` a single bound to be generic over, so one
/// loop can serve all three typed registries.
trait Flushable {
    /// Flushes the collection, returning an error if the flush fails.
    fn flush(&self) -> crate::Result<()>;
}
264
/// Lets vector collections be flushed through the generic `flush_registry` loop.
impl Flushable for VectorCollection {
    /// Delegates to the collection's `flush_full()`.
    fn flush(&self) -> crate::Result<()> {
        self.flush_full()
    }
}
270
/// Lets graph collections be flushed through the generic `flush_registry` loop.
impl Flushable for GraphCollection {
    /// Delegates to the collection's `flush_full()`.
    fn flush(&self) -> crate::Result<()> {
        self.flush_full()
    }
}
276
/// Lets metadata collections be flushed through the generic `flush_registry` loop.
impl Flushable for MetadataCollection {
    /// Delegates to the collection's `flush_full()`.
    fn flush(&self) -> crate::Result<()> {
        self.flush_full()
    }
}