Skip to main content

velesdb_core/database/
persistence.rs

1//! Collection loading from disk at database startup.
2
3use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::Result;
5
6use super::Database;
7
8impl Database {
9    /// Loads existing collections from disk.
10    ///
11    /// # Deprecation note
12    ///
13    /// **This method is called automatically by [`Database::open`].**
14    /// There is no need to call it manually. It is kept public only for
15    /// backward compatibility with code that relied on the old two-step pattern.
16    ///
17    /// # Errors
18    ///
19    /// Returns an error if collection directories cannot be read.
20    pub fn load_collections(&self) -> Result<()> {
21        let mut loaded_count: usize = 0;
22
23        for entry in std::fs::read_dir(&self.data_dir)? {
24            let entry = entry?;
25            if let Some(name) = self.loadable_collection_name(&entry) {
26                if self.try_load_single_collection(&entry.path(), &name) {
27                    loaded_count += 1;
28                }
29            }
30        }
31
32        // Bump schema_version if at least one collection was loaded from disk (C-3).
33        //
34        // This ensures that any plan key built before load_collections() ran
35        // (schema_version = 0) will never match a key built after it
36        // (schema_version >= 1), preventing the plan cache from serving a stale
37        // plan for a collection that was not yet visible in the registry.
38        if loaded_count > 0 {
39            self.schema_version
40                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41        }
42
43        Ok(())
44    }
45
46    /// Returns the collection name if the directory entry is a loadable collection.
47    ///
48    /// A directory is loadable when it contains `config.json`, has a valid
49    /// collection name, and is not already registered in the legacy collections
50    /// map. Directories with invalid names are skipped with a warning.
51    fn loadable_collection_name(&self, entry: &std::fs::DirEntry) -> Option<String> {
52        let path = entry.path();
53        if !path.is_dir() {
54            return None;
55        }
56        if !path.join("config.json").exists() {
57            return None;
58        }
59        let name = path.file_name()?.to_str()?.to_string();
60        if crate::validation::validate_collection_name(&name).is_err() {
61            tracing::warn!(
62                name = %name,
63                path = %path.display(),
64                "Skipping directory with invalid collection name"
65            );
66            return None;
67        }
68        if self.collections.read().contains_key(&name) {
69            return None;
70        }
71        Some(name)
72    }
73
74    /// Attempts to load a single collection directory, returning `true` on success.
75    fn try_load_single_collection(&self, path: &std::path::Path, name: &str) -> bool {
76        let config_path = path.join("config.json");
77
78        // Read config to determine the concrete type before opening.
79        let cfg_data = match std::fs::read_to_string(&config_path) {
80            Ok(d) => d,
81            Err(e) => {
82                tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
83                return false;
84            }
85        };
86        let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
87            Ok(c) => c,
88            Err(e) => {
89                tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
90                return false;
91            }
92        };
93
94        if cfg.graph_schema.is_some() {
95            self.load_graph_collection(path, name)
96        } else if cfg.metadata_only {
97            self.load_metadata_collection(path, name)
98        } else {
99            self.load_vector_collection(path, name)
100        }
101    }
102
103    /// Loads a graph collection from disk, registering it in both registries.
104    fn load_graph_collection(&self, path: &std::path::Path, name: &str) -> bool {
105        self.try_open_and_register(path, name, "graph", |p| {
106            GraphCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Graph(c)))
107        })
108    }
109
110    /// Loads a metadata collection from disk, registering it in both registries.
111    fn load_metadata_collection(&self, path: &std::path::Path, name: &str) -> bool {
112        self.try_open_and_register(path, name, "metadata", |p| {
113            MetadataCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Metadata(c)))
114        })
115    }
116
117    /// Loads a vector collection from disk, registering it in both registries.
118    fn load_vector_collection(&self, path: &std::path::Path, name: &str) -> bool {
119        self.try_open_and_register(path, name, "vector", |p| {
120            VectorCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Vector(c)))
121        })
122    }
123
124    /// Opens a collection from disk and registers it in the legacy + typed registries.
125    ///
126    /// The `open_fn` closure returns `(inner Collection clone, TypedColl variant)`.
127    /// Returns `true` on success, `false` on failure (logged as warning).
128    #[allow(deprecated)]
129    fn try_open_and_register(
130        &self,
131        path: &std::path::Path,
132        name: &str,
133        kind: &str,
134        open_fn: impl FnOnce(std::path::PathBuf) -> crate::Result<(crate::Collection, TypedColl)>,
135    ) -> bool {
136        match open_fn(path.to_path_buf()) {
137            Ok((inner, typed)) => {
138                self.collections.write().insert(name.to_string(), inner);
139                typed.insert_into(
140                    &self.vector_colls,
141                    &self.graph_colls,
142                    &self.metadata_colls,
143                    name,
144                );
145                true
146            }
147            Err(e) => {
148                tracing::warn!(
149                    error = %e,
150                    name = %path.display(),
151                    "Failed to load {kind} collection"
152                );
153                false
154            }
155        }
156    }
157
158    /// Flushes all collections including `vectors.idx` serialization.
159    ///
160    /// Issue #423: Uses `flush_full()` to ensure the vector index file is
161    /// up-to-date, avoiding a full WAL replay on the next startup. This is
162    /// the correct method for graceful shutdown.
163    ///
164    /// Best-effort: logs warnings for individual flush failures but continues
165    /// flushing remaining collections. Returns the count of failures.
166    ///
167    /// The legacy `collections` registry is **not** iterated because it shares
168    /// the same `Arc`'d inner storage as the typed registries. Flushing both
169    /// would double-flush every collection, causing redundant I/O and
170    /// potentially double-counting failures.
171    pub fn flush_all(&self) -> usize {
172        let mut failures: usize = 0;
173
174        failures += flush_registry(&self.vector_colls, "vector");
175        failures += flush_registry(&self.graph_colls, "graph");
176        failures += flush_registry(&self.metadata_colls, "metadata");
177
178        failures
179    }
180}
181
182/// Discriminated union for the three typed collection registries.
183///
184/// Used by [`Database::try_open_and_register`] to route a freshly opened
185/// collection into the correct registry without duplicating match arms.
186enum TypedColl {
187    Vector(VectorCollection),
188    Graph(GraphCollection),
189    Metadata(MetadataCollection),
190}
191
192impl TypedColl {
193    fn insert_into(
194        self,
195        vectors: &parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
196        graphs: &parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
197        metadata: &parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
198        name: &str,
199    ) {
200        match self {
201            Self::Vector(c) => {
202                vectors.write().insert(name.to_string(), c);
203            }
204            Self::Graph(c) => {
205                graphs.write().insert(name.to_string(), c);
206            }
207            Self::Metadata(c) => {
208                metadata.write().insert(name.to_string(), c);
209            }
210        }
211    }
212}
213
214/// Flushes all collections in a registry, logging failures. Returns failure count.
215fn flush_registry<T: Flushable>(
216    registry: &parking_lot::RwLock<std::collections::HashMap<String, T>>,
217    kind: &str,
218) -> usize {
219    let mut failures = 0;
220    for (name, coll) in registry.read().iter() {
221        if let Err(e) = coll.flush() {
222            tracing::warn!(
223                error = %e,
224                collection = %name,
225                "Failed to flush {kind} collection"
226            );
227            failures += 1;
228        }
229    }
230    failures
231}
232
233/// Internal trait for deduplicating `flush_all` iteration across collection types.
234///
235/// Issue #423: Uses `flush_full()` to include `vectors.idx` serialization,
236/// ensuring fast startup after graceful shutdown.
237trait Flushable {
238    fn flush(&self) -> crate::Result<()>;
239}
240
241impl Flushable for VectorCollection {
242    fn flush(&self) -> crate::Result<()> {
243        self.flush_full()
244    }
245}
246
247impl Flushable for GraphCollection {
248    fn flush(&self) -> crate::Result<()> {
249        self.flush_full()
250    }
251}
252
253impl Flushable for MetadataCollection {
254    fn flush(&self) -> crate::Result<()> {
255        self.flush_full()
256    }
257}