velesdb_core/database/
persistence.rs1use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::Result;
5
6use super::Database;
7
8impl Database {
9 pub fn load_collections(&self) -> Result<()> {
21 let mut loaded_count: usize = 0;
22
23 for entry in std::fs::read_dir(&self.data_dir)? {
24 let entry = entry?;
25 if let Some(name) = self.loadable_collection_name(&entry) {
26 if self.try_load_single_collection(&entry.path(), &name) {
27 loaded_count += 1;
28 }
29 }
30 }
31
32 if loaded_count > 0 {
39 self.schema_version
40 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41 }
42
43 Ok(())
44 }
45
46 fn loadable_collection_name(&self, entry: &std::fs::DirEntry) -> Option<String> {
52 let path = entry.path();
53 if !path.is_dir() {
54 return None;
55 }
56 if !path.join("config.json").exists() {
57 return None;
58 }
59 let name = path.file_name()?.to_str()?.to_string();
60 if crate::validation::validate_collection_name(&name).is_err() {
61 tracing::warn!(
62 name = %name,
63 path = %path.display(),
64 "Skipping directory with invalid collection name"
65 );
66 return None;
67 }
68 if self.collections.read().contains_key(&name) {
69 return None;
70 }
71 Some(name)
72 }
73
74 fn try_load_single_collection(&self, path: &std::path::Path, name: &str) -> bool {
76 let config_path = path.join("config.json");
77
78 let cfg_data = match std::fs::read_to_string(&config_path) {
80 Ok(d) => d,
81 Err(e) => {
82 tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
83 return false;
84 }
85 };
86 let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
87 Ok(c) => c,
88 Err(e) => {
89 tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
90 return false;
91 }
92 };
93
94 if cfg.graph_schema.is_some() {
95 self.load_graph_collection(path, name)
96 } else if cfg.metadata_only {
97 self.load_metadata_collection(path, name)
98 } else {
99 self.load_vector_collection(path, name)
100 }
101 }
102
103 fn load_graph_collection(&self, path: &std::path::Path, name: &str) -> bool {
105 self.try_open_and_register(path, name, "graph", |p| {
106 GraphCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Graph(c)))
107 })
108 }
109
110 fn load_metadata_collection(&self, path: &std::path::Path, name: &str) -> bool {
112 self.try_open_and_register(path, name, "metadata", |p| {
113 MetadataCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Metadata(c)))
114 })
115 }
116
117 fn load_vector_collection(&self, path: &std::path::Path, name: &str) -> bool {
119 self.try_open_and_register(path, name, "vector", |p| {
120 VectorCollection::open(p).map(|c| (c.inner.clone(), TypedColl::Vector(c)))
121 })
122 }
123
124 #[allow(deprecated)]
129 fn try_open_and_register(
130 &self,
131 path: &std::path::Path,
132 name: &str,
133 kind: &str,
134 open_fn: impl FnOnce(std::path::PathBuf) -> crate::Result<(crate::Collection, TypedColl)>,
135 ) -> bool {
136 match open_fn(path.to_path_buf()) {
137 Ok((inner, typed)) => {
138 self.collections.write().insert(name.to_string(), inner);
139 typed.insert_into(
140 &self.vector_colls,
141 &self.graph_colls,
142 &self.metadata_colls,
143 name,
144 );
145 true
146 }
147 Err(e) => {
148 tracing::warn!(
149 error = %e,
150 name = %path.display(),
151 "Failed to load {kind} collection"
152 );
153 false
154 }
155 }
156 }
157
158 pub fn flush_all(&self) -> usize {
172 let mut failures: usize = 0;
173
174 failures += flush_registry(&self.vector_colls, "vector");
175 failures += flush_registry(&self.graph_colls, "graph");
176 failures += flush_registry(&self.metadata_colls, "metadata");
177
178 failures
179 }
180}
181
182enum TypedColl {
187 Vector(VectorCollection),
188 Graph(GraphCollection),
189 Metadata(MetadataCollection),
190}
191
192impl TypedColl {
193 fn insert_into(
194 self,
195 vectors: &parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
196 graphs: &parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
197 metadata: &parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
198 name: &str,
199 ) {
200 match self {
201 Self::Vector(c) => {
202 vectors.write().insert(name.to_string(), c);
203 }
204 Self::Graph(c) => {
205 graphs.write().insert(name.to_string(), c);
206 }
207 Self::Metadata(c) => {
208 metadata.write().insert(name.to_string(), c);
209 }
210 }
211 }
212}
213
214fn flush_registry<T: Flushable>(
216 registry: &parking_lot::RwLock<std::collections::HashMap<String, T>>,
217 kind: &str,
218) -> usize {
219 let mut failures = 0;
220 for (name, coll) in registry.read().iter() {
221 if let Err(e) = coll.flush() {
222 tracing::warn!(
223 error = %e,
224 collection = %name,
225 "Failed to flush {kind} collection"
226 );
227 failures += 1;
228 }
229 }
230 failures
231}
232
233trait Flushable {
238 fn flush(&self) -> crate::Result<()>;
239}
240
241impl Flushable for VectorCollection {
242 fn flush(&self) -> crate::Result<()> {
243 self.flush_full()
244 }
245}
246
247impl Flushable for GraphCollection {
248 fn flush(&self) -> crate::Result<()> {
249 self.flush_full()
250 }
251}
252
253impl Flushable for MetadataCollection {
254 fn flush(&self) -> crate::Result<()> {
255 self.flush_full()
256 }
257}