Skip to main content

velesdb_core/database/
persistence.rs

//! Collection loading from disk at database startup, and flushing of all collections (including `vectors.idx`) for graceful shutdown.
2
3use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::Result;
5
6use super::Database;
7
8impl Database {
9    /// Loads existing collections from disk.
10    ///
11    /// # Deprecation note
12    ///
13    /// **This method is called automatically by [`Database::open`].**
14    /// There is no need to call it manually. It is kept public only for
15    /// backward compatibility with code that relied on the old two-step pattern.
16    ///
17    /// # Errors
18    ///
19    /// Returns an error if collection directories cannot be read.
20    pub fn load_collections(&self) -> Result<()> {
21        let mut loaded_count: usize = 0;
22
23        for entry in std::fs::read_dir(&self.data_dir)? {
24            let entry = entry?;
25            if let Some(name) = self.loadable_collection_name(&entry) {
26                if self.try_load_single_collection(&entry.path(), &name) {
27                    loaded_count += 1;
28                }
29            }
30        }
31
32        // Bump schema_version if at least one collection was loaded from disk (C-3).
33        //
34        // This ensures that any plan key built before load_collections() ran
35        // (schema_version = 0) will never match a key built after it
36        // (schema_version >= 1), preventing the plan cache from serving a stale
37        // plan for a collection that was not yet visible in the registry.
38        if loaded_count > 0 {
39            self.schema_version
40                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41        }
42
43        Ok(())
44    }
45
46    /// Returns the collection name if the directory entry is a loadable collection.
47    ///
48    /// A directory is loadable when it contains `config.json`, has a valid
49    /// collection name, and is not already registered in any typed registry.
50    /// Directories with invalid names are skipped with a warning.
51    fn loadable_collection_name(&self, entry: &std::fs::DirEntry) -> Option<String> {
52        let path = entry.path();
53        if !path.is_dir() || !path.join("config.json").exists() {
54            return None;
55        }
56        let name = Self::validated_collection_name(&path)?;
57        if self.is_already_registered(&name) {
58            return None;
59        }
60        Some(name)
61    }
62
63    /// Extracts and validates the collection name from a directory path.
64    ///
65    /// Returns `None` if the directory name is not valid UTF-8 or fails
66    /// collection name validation (logged as a warning).
67    fn validated_collection_name(path: &std::path::Path) -> Option<String> {
68        let name = path.file_name()?.to_str()?.to_string();
69        if crate::validation::validate_collection_name(&name).is_err() {
70            tracing::warn!(
71                name = %name,
72                path = %path.display(),
73                "Skipping directory with invalid collection name"
74            );
75            return None;
76        }
77        Some(name)
78    }
79
80    /// Checks whether a collection name is already registered in any typed registry.
81    fn is_already_registered(&self, name: &str) -> bool {
82        self.vector_colls.read().contains_key(name)
83            || self.graph_colls.read().contains_key(name)
84            || self.metadata_colls.read().contains_key(name)
85    }
86
87    /// Attempts to load a single collection directory, returning `true` on success.
88    fn try_load_single_collection(&self, path: &std::path::Path, name: &str) -> bool {
89        let config_path = path.join("config.json");
90
91        // Read config to determine the concrete type before opening.
92        let cfg_data = match std::fs::read_to_string(&config_path) {
93            Ok(d) => d,
94            Err(e) => {
95                tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
96                return false;
97            }
98        };
99        let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
100            Ok(c) => c,
101            Err(e) => {
102                tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
103                return false;
104            }
105        };
106
107        if cfg.graph_schema.is_some() {
108            self.load_graph_collection(path, name)
109        } else if cfg.metadata_only {
110            self.load_metadata_collection(path, name)
111        } else {
112            self.load_vector_collection(path, name)
113        }
114    }
115
116    /// Loads a graph collection from disk, registering it in the typed registry.
117    fn load_graph_collection(&self, path: &std::path::Path, name: &str) -> bool {
118        self.try_open_and_register(path, name, "graph", |p| {
119            GraphCollection::open(p).map(TypedColl::Graph)
120        })
121    }
122
123    /// Loads a metadata collection from disk, registering it in the typed registry.
124    fn load_metadata_collection(&self, path: &std::path::Path, name: &str) -> bool {
125        self.try_open_and_register(path, name, "metadata", |p| {
126            MetadataCollection::open(p).map(TypedColl::Metadata)
127        })
128    }
129
130    /// Loads a vector collection from disk, registering it in the typed registry.
131    fn load_vector_collection(&self, path: &std::path::Path, name: &str) -> bool {
132        self.try_open_and_register(path, name, "vector", |p| {
133            VectorCollection::open(p).map(TypedColl::Vector)
134        })
135    }
136
137    /// Opens a collection from disk and registers it in the typed registry.
138    ///
139    /// The `open_fn` closure returns a `TypedColl` variant.
140    /// Returns `true` on success, `false` on failure (logged as warning).
141    fn try_open_and_register(
142        &self,
143        path: &std::path::Path,
144        name: &str,
145        kind: &str,
146        open_fn: impl FnOnce(std::path::PathBuf) -> crate::Result<TypedColl>,
147    ) -> bool {
148        match open_fn(path.to_path_buf()) {
149            Ok(typed) => {
150                typed.insert_into(
151                    &self.vector_colls,
152                    &self.graph_colls,
153                    &self.metadata_colls,
154                    name,
155                );
156                true
157            }
158            Err(e) => {
159                tracing::warn!(
160                    error = %e,
161                    name = %path.display(),
162                    "Failed to load {kind} collection"
163                );
164                false
165            }
166        }
167    }
168
169    /// Flushes all collections including `vectors.idx` serialization.
170    ///
171    /// Issue #423: Uses `flush_full()` to ensure the vector index file is
172    /// up-to-date, avoiding a full WAL replay on the next startup. This is
173    /// the correct method for graceful shutdown.
174    ///
175    /// Best-effort: logs warnings for individual flush failures but continues
176    /// flushing remaining collections. Returns the count of failures.
177    ///
178    /// The legacy `collections` registry is **not** iterated because it shares
179    /// the same `Arc`'d inner storage as the typed registries. Flushing both
180    /// would double-flush every collection, causing redundant I/O and
181    /// potentially double-counting failures.
182    pub fn flush_all(&self) -> usize {
183        let mut failures: usize = 0;
184
185        failures += flush_registry(&self.vector_colls, "vector");
186        failures += flush_registry(&self.graph_colls, "graph");
187        failures += flush_registry(&self.metadata_colls, "metadata");
188
189        failures
190    }
191}
192
193/// Discriminated union for the three typed collection registries.
194///
195/// Used by [`Database::try_open_and_register`] to route a freshly opened
196/// collection into the correct registry without duplicating match arms.
197enum TypedColl {
198    Vector(VectorCollection),
199    Graph(GraphCollection),
200    Metadata(MetadataCollection),
201}
202
203impl TypedColl {
204    fn insert_into(
205        self,
206        vectors: &parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
207        graphs: &parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
208        metadata: &parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
209        name: &str,
210    ) {
211        match self {
212            Self::Vector(c) => {
213                vectors.write().insert(name.to_string(), c);
214            }
215            Self::Graph(c) => {
216                graphs.write().insert(name.to_string(), c);
217            }
218            Self::Metadata(c) => {
219                metadata.write().insert(name.to_string(), c);
220            }
221        }
222    }
223}
224
225/// Flushes all collections in a registry, logging failures. Returns failure count.
226fn flush_registry<T: Flushable>(
227    registry: &parking_lot::RwLock<std::collections::HashMap<String, T>>,
228    kind: &str,
229) -> usize {
230    let mut failures = 0;
231    for (name, coll) in registry.read().iter() {
232        if let Err(e) = coll.flush() {
233            tracing::warn!(
234                error = %e,
235                collection = %name,
236                "Failed to flush {kind} collection"
237            );
238            failures += 1;
239        }
240    }
241    failures
242}
243
244/// Internal trait for deduplicating `flush_all` iteration across collection types.
245///
246/// Issue #423: Uses `flush_full()` to include `vectors.idx` serialization,
247/// ensuring fast startup after graceful shutdown.
248trait Flushable {
249    fn flush(&self) -> crate::Result<()>;
250}
251
252impl Flushable for VectorCollection {
253    fn flush(&self) -> crate::Result<()> {
254        self.flush_full()
255    }
256}
257
258impl Flushable for GraphCollection {
259    fn flush(&self) -> crate::Result<()> {
260        self.flush_full()
261    }
262}
263
264impl Flushable for MetadataCollection {
265    fn flush(&self) -> crate::Result<()> {
266        self.flush_full()
267    }
268}