Skip to main content

velesdb_core/database/
persistence.rs

//! Collection loading from disk at database startup, plus best-effort flushing of all typed registries.
2
3use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::Result;
5
6use super::Database;
7
8impl Database {
9    /// Loads existing collections from disk.
10    ///
11    /// # Deprecation note
12    ///
13    /// **This method is called automatically by [`Database::open`].**
14    /// There is no need to call it manually. It is kept public only for
15    /// backward compatibility with code that relied on the old two-step pattern.
16    ///
17    /// # Errors
18    ///
19    /// Returns an error if collection directories cannot be read.
20    pub fn load_collections(&self) -> Result<()> {
21        let mut loaded_count: usize = 0;
22
23        for entry in std::fs::read_dir(&self.data_dir)? {
24            let entry = entry?;
25            if let Some(name) = self.loadable_collection_name(&entry) {
26                if self.try_load_single_collection(&entry.path(), &name) {
27                    loaded_count += 1;
28                }
29            }
30        }
31
32        // Bump schema_version if at least one collection was loaded from disk (C-3).
33        //
34        // This ensures that any plan key built before load_collections() ran
35        // (schema_version = 0) will never match a key built after it
36        // (schema_version >= 1), preventing the plan cache from serving a stale
37        // plan for a collection that was not yet visible in the registry.
38        if loaded_count > 0 {
39            self.schema_version
40                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41        }
42
43        Ok(())
44    }
45
46    /// Returns the collection name if the directory entry is a loadable collection.
47    ///
48    /// A directory is loadable when it contains `config.json` and is not
49    /// already registered in the legacy collections map.
50    fn loadable_collection_name(&self, entry: &std::fs::DirEntry) -> Option<String> {
51        let path = entry.path();
52        if !path.is_dir() {
53            return None;
54        }
55        if !path.join("config.json").exists() {
56            return None;
57        }
58        let name = path.file_name()?.to_str().unwrap_or("unknown").to_string();
59        if self.collections.read().contains_key(&name) {
60            return None;
61        }
62        Some(name)
63    }
64
65    /// Attempts to load a single collection directory, returning `true` on success.
66    fn try_load_single_collection(&self, path: &std::path::Path, name: &str) -> bool {
67        let config_path = path.join("config.json");
68
69        // Read config to determine the concrete type before opening.
70        let cfg_data = match std::fs::read_to_string(&config_path) {
71            Ok(d) => d,
72            Err(e) => {
73                tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
74                return false;
75            }
76        };
77        let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
78            Ok(c) => c,
79            Err(e) => {
80                tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
81                return false;
82            }
83        };
84
85        if cfg.graph_schema.is_some() {
86            self.load_graph_collection(path, name)
87        } else if cfg.metadata_only {
88            self.load_metadata_collection(path, name)
89        } else {
90            self.load_vector_collection(path, name)
91        }
92    }
93
94    /// Loads a graph collection from disk, registering it in both registries.
95    fn load_graph_collection(&self, path: &std::path::Path, name: &str) -> bool {
96        self.try_open_and_register(path, name, "graph", |p| {
97            GraphCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Graph(c)))
98        })
99    }
100
101    /// Loads a metadata collection from disk, registering it in both registries.
102    fn load_metadata_collection(&self, path: &std::path::Path, name: &str) -> bool {
103        self.try_open_and_register(path, name, "metadata", |p| {
104            MetadataCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Metadata(c)))
105        })
106    }
107
108    /// Loads a vector collection from disk, registering it in both registries.
109    fn load_vector_collection(&self, path: &std::path::Path, name: &str) -> bool {
110        self.try_open_and_register(path, name, "vector", |p| {
111            VectorCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Vector(c)))
112        })
113    }
114
115    /// Opens a collection from disk and registers it in the legacy + typed registries.
116    ///
117    /// The `open_fn` closure returns `(inner Collection clone, TypedColl variant)`.
118    /// Returns `true` on success, `false` on failure (logged as warning).
119    #[allow(deprecated)]
120    fn try_open_and_register(
121        &self,
122        path: &std::path::Path,
123        name: &str,
124        kind: &str,
125        open_fn: impl FnOnce(std::path::PathBuf) -> crate::Result<(crate::Collection, TypedColl)>,
126    ) -> bool {
127        match open_fn(path.to_path_buf()) {
128            Ok((inner, typed)) => {
129                self.collections.write().insert(name.to_string(), inner);
130                typed.insert_into(
131                    &self.vector_colls,
132                    &self.graph_colls,
133                    &self.metadata_colls,
134                    name,
135                );
136                true
137            }
138            Err(e) => {
139                tracing::warn!(
140                    error = %e,
141                    name = %path.display(),
142                    "Failed to load {kind} collection"
143                );
144                false
145            }
146        }
147    }
148
149    /// Flushes all WALs across the typed collection registries.
150    ///
151    /// Best-effort: logs warnings for individual flush failures but continues
152    /// flushing remaining collections. Returns the count of failures.
153    ///
154    /// The legacy `collections` registry is **not** iterated because it shares
155    /// the same `Arc`'d inner storage as the typed registries. Flushing both
156    /// would double-flush every collection, causing redundant I/O and
157    /// potentially double-counting failures.
158    pub fn flush_all(&self) -> usize {
159        let mut failures: usize = 0;
160
161        failures += flush_registry(&self.vector_colls, "vector");
162        failures += flush_registry(&self.graph_colls, "graph");
163        failures += flush_registry(&self.metadata_colls, "metadata");
164
165        failures
166    }
167}
168
169/// Discriminated union for the three typed collection registries.
170///
171/// Used by [`Database::try_open_and_register`] to route a freshly opened
172/// collection into the correct registry without duplicating match arms.
173enum TypedColl {
174    Vector(VectorCollection),
175    Graph(GraphCollection),
176    Metadata(MetadataCollection),
177}
178
179impl TypedColl {
180    fn insert_into(
181        self,
182        vectors: &parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
183        graphs: &parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
184        metadata: &parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
185        name: &str,
186    ) {
187        match self {
188            Self::Vector(c) => {
189                vectors.write().insert(name.to_string(), c);
190            }
191            Self::Graph(c) => {
192                graphs.write().insert(name.to_string(), c);
193            }
194            Self::Metadata(c) => {
195                metadata.write().insert(name.to_string(), c);
196            }
197        }
198    }
199}
200
201/// Flushes all collections in a registry, logging failures. Returns failure count.
202fn flush_registry<T: Flushable>(
203    registry: &parking_lot::RwLock<std::collections::HashMap<String, T>>,
204    kind: &str,
205) -> usize {
206    let mut failures = 0;
207    for (name, coll) in registry.read().iter() {
208        if let Err(e) = coll.flush() {
209            tracing::warn!(
210                error = %e,
211                collection = %name,
212                "Failed to flush {kind} collection"
213            );
214            failures += 1;
215        }
216    }
217    failures
218}
219
220/// Internal trait for deduplicating `flush_all` iteration across collection types.
221trait Flushable {
222    fn flush(&self) -> crate::Result<()>;
223}
224
225impl Flushable for VectorCollection {
226    fn flush(&self) -> crate::Result<()> {
227        self.flush()
228    }
229}
230
231impl Flushable for GraphCollection {
232    fn flush(&self) -> crate::Result<()> {
233        self.flush()
234    }
235}
236
237impl Flushable for MetadataCollection {
238    fn flush(&self) -> crate::Result<()> {
239        self.flush()
240    }
241}