velesdb_core/database/
persistence.rs1use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::Result;
5
6use super::Database;
7
8impl Database {
9 pub fn load_collections(&self) -> Result<()> {
21 let mut loaded_count: usize = 0;
22
23 for entry in std::fs::read_dir(&self.data_dir)? {
24 let entry = entry?;
25 if let Some(name) = self.loadable_collection_name(&entry) {
26 if self.try_load_single_collection(&entry.path(), &name) {
27 loaded_count += 1;
28 }
29 }
30 }
31
32 if loaded_count > 0 {
39 self.schema_version
40 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41 }
42
43 Ok(())
44 }
45
46 fn loadable_collection_name(&self, entry: &std::fs::DirEntry) -> Option<String> {
52 let path = entry.path();
53 if !path.is_dir() || !path.join("config.json").exists() {
54 return None;
55 }
56 let name = Self::validated_collection_name(&path)?;
57 if self.is_already_registered(&name) {
58 return None;
59 }
60 Some(name)
61 }
62
63 fn validated_collection_name(path: &std::path::Path) -> Option<String> {
68 let name = path.file_name()?.to_str()?.to_string();
69 if crate::validation::validate_collection_name(&name).is_err() {
70 tracing::warn!(
71 name = %name,
72 path = %path.display(),
73 "Skipping directory with invalid collection name"
74 );
75 return None;
76 }
77 Some(name)
78 }
79
80 fn is_already_registered(&self, name: &str) -> bool {
82 self.vector_colls.read().contains_key(name)
83 || self.graph_colls.read().contains_key(name)
84 || self.metadata_colls.read().contains_key(name)
85 }
86
87 fn try_load_single_collection(&self, path: &std::path::Path, name: &str) -> bool {
89 let config_path = path.join("config.json");
90
91 let cfg_data = match std::fs::read_to_string(&config_path) {
93 Ok(d) => d,
94 Err(e) => {
95 tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
96 return false;
97 }
98 };
99 let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
100 Ok(c) => c,
101 Err(e) => {
102 tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
103 return false;
104 }
105 };
106
107 if cfg.graph_schema.is_some() {
108 self.load_graph_collection(path, name)
109 } else if cfg.metadata_only {
110 self.load_metadata_collection(path, name)
111 } else {
112 self.load_vector_collection(path, name)
113 }
114 }
115
116 fn load_graph_collection(&self, path: &std::path::Path, name: &str) -> bool {
118 self.try_open_and_register(path, name, "graph", |p| {
119 GraphCollection::open(p).map(TypedColl::Graph)
120 })
121 }
122
123 fn load_metadata_collection(&self, path: &std::path::Path, name: &str) -> bool {
125 self.try_open_and_register(path, name, "metadata", |p| {
126 MetadataCollection::open(p).map(TypedColl::Metadata)
127 })
128 }
129
130 fn load_vector_collection(&self, path: &std::path::Path, name: &str) -> bool {
132 self.try_open_and_register(path, name, "vector", |p| {
133 VectorCollection::open(p).map(TypedColl::Vector)
134 })
135 }
136
137 fn try_open_and_register(
142 &self,
143 path: &std::path::Path,
144 name: &str,
145 kind: &str,
146 open_fn: impl FnOnce(std::path::PathBuf) -> crate::Result<TypedColl>,
147 ) -> bool {
148 match open_fn(path.to_path_buf()) {
149 Ok(typed) => {
150 typed.insert_into(
151 &self.vector_colls,
152 &self.graph_colls,
153 &self.metadata_colls,
154 name,
155 );
156 true
157 }
158 Err(e) => {
159 tracing::warn!(
160 error = %e,
161 name = %path.display(),
162 "Failed to load {kind} collection"
163 );
164 false
165 }
166 }
167 }
168
169 pub fn flush_all(&self) -> usize {
183 let mut failures: usize = 0;
184
185 failures += flush_registry(&self.vector_colls, "vector");
186 failures += flush_registry(&self.graph_colls, "graph");
187 failures += flush_registry(&self.metadata_colls, "metadata");
188
189 failures
190 }
191}
192
193enum TypedColl {
198 Vector(VectorCollection),
199 Graph(GraphCollection),
200 Metadata(MetadataCollection),
201}
202
203impl TypedColl {
204 fn insert_into(
205 self,
206 vectors: &parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
207 graphs: &parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
208 metadata: &parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
209 name: &str,
210 ) {
211 match self {
212 Self::Vector(c) => {
213 vectors.write().insert(name.to_string(), c);
214 }
215 Self::Graph(c) => {
216 graphs.write().insert(name.to_string(), c);
217 }
218 Self::Metadata(c) => {
219 metadata.write().insert(name.to_string(), c);
220 }
221 }
222 }
223}
224
225fn flush_registry<T: Flushable>(
227 registry: &parking_lot::RwLock<std::collections::HashMap<String, T>>,
228 kind: &str,
229) -> usize {
230 let mut failures = 0;
231 for (name, coll) in registry.read().iter() {
232 if let Err(e) = coll.flush() {
233 tracing::warn!(
234 error = %e,
235 collection = %name,
236 "Failed to flush {kind} collection"
237 );
238 failures += 1;
239 }
240 }
241 failures
242}
243
244trait Flushable {
249 fn flush(&self) -> crate::Result<()>;
250}
251
252impl Flushable for VectorCollection {
253 fn flush(&self) -> crate::Result<()> {
254 self.flush_full()
255 }
256}
257
258impl Flushable for GraphCollection {
259 fn flush(&self) -> crate::Result<()> {
260 self.flush_full()
261 }
262}
263
264impl Flushable for MetadataCollection {
265 fn flush(&self) -> crate::Result<()> {
266 self.flush_full()
267 }
268}