velesdb_core/database/
persistence.rs1use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::Result;
5
6use super::Database;
7
8impl Database {
9 pub fn load_collections(&self) -> Result<()> {
21 let mut loaded_count: usize = 0;
22
23 for entry in std::fs::read_dir(&self.data_dir)? {
24 let entry = entry?;
25 if let Some(name) = self.loadable_collection_name(&entry) {
26 if self.try_load_single_collection(&entry.path(), &name) {
27 loaded_count += 1;
28 }
29 }
30 }
31
32 if loaded_count > 0 {
39 self.schema_version
40 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41 }
42
43 Ok(())
44 }
45
46 fn loadable_collection_name(&self, entry: &std::fs::DirEntry) -> Option<String> {
52 let path = entry.path();
53 if !path.is_dir() || !path.join("config.json").exists() {
54 return None;
55 }
56 let name = Self::validated_collection_name(&path)?;
57 if self.is_already_registered(&name) {
58 return None;
59 }
60 Some(name)
61 }
62
63 fn validated_collection_name(path: &std::path::Path) -> Option<String> {
68 let name = path.file_name()?.to_str()?.to_string();
69 if crate::validation::validate_collection_name(&name).is_err() {
70 tracing::warn!(
71 name = %name,
72 path = %path.display(),
73 "Skipping directory with invalid collection name"
74 );
75 return None;
76 }
77 Some(name)
78 }
79
80 fn is_already_registered(&self, name: &str) -> bool {
82 self.vector_colls.read().contains_key(name)
83 || self.graph_colls.read().contains_key(name)
84 || self.metadata_colls.read().contains_key(name)
85 }
86
87 fn try_load_single_collection(&self, path: &std::path::Path, name: &str) -> bool {
89 let config_path = path.join("config.json");
90
91 let cfg_data = match std::fs::read_to_string(&config_path) {
93 Ok(d) => d,
94 Err(e) => {
95 tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
96 return false;
97 }
98 };
99 let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
100 Ok(c) => c,
101 Err(e) => {
102 tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
103 return false;
104 }
105 };
106
107 if cfg.graph_schema.is_some() {
108 self.load_graph_collection(path, name)
109 } else if cfg.metadata_only {
110 self.load_metadata_collection(path, name)
111 } else {
112 self.load_vector_collection(path, name)
113 }
114 }
115
116 fn load_graph_collection(&self, path: &std::path::Path, name: &str) -> bool {
118 self.try_open_and_register(path, name, "graph", |p| {
119 GraphCollection::open(p).map(TypedColl::Graph)
120 })
121 }
122
123 fn load_metadata_collection(&self, path: &std::path::Path, name: &str) -> bool {
125 self.try_open_and_register(path, name, "metadata", |p| {
126 MetadataCollection::open(p).map(TypedColl::Metadata)
127 })
128 }
129
130 fn load_vector_collection(&self, path: &std::path::Path, name: &str) -> bool {
132 self.try_open_and_register(path, name, "vector", |p| {
133 VectorCollection::open(p).map(TypedColl::Vector)
134 })
135 }
136
137 fn try_open_and_register(
142 &self,
143 path: &std::path::Path,
144 name: &str,
145 kind: &str,
146 open_fn: impl FnOnce(std::path::PathBuf) -> crate::Result<TypedColl>,
147 ) -> bool {
148 match open_fn(path.to_path_buf()) {
149 Ok(typed) => {
150 self.push_runtime_limits(typed.inner());
153 typed.insert_into(
154 &self.vector_colls,
155 &self.graph_colls,
156 &self.metadata_colls,
157 name,
158 );
159 true
160 }
161 Err(e) => {
162 tracing::warn!(
163 error = %e,
164 name = %path.display(),
165 "Failed to load {kind} collection"
166 );
167 false
168 }
169 }
170 }
171
172 pub fn flush_all(&self) -> usize {
186 let mut failures: usize = 0;
187
188 failures += flush_registry(&self.vector_colls, "vector");
189 failures += flush_registry(&self.graph_colls, "graph");
190 failures += flush_registry(&self.metadata_colls, "metadata");
191
192 failures
193 }
194}
195
196enum TypedColl {
201 Vector(VectorCollection),
202 Graph(GraphCollection),
203 Metadata(MetadataCollection),
204}
205
206impl TypedColl {
207 fn inner(&self) -> &crate::collection::Collection {
210 match self {
211 Self::Vector(c) => &c.inner,
212 Self::Graph(c) => &c.inner,
213 Self::Metadata(c) => &c.inner,
214 }
215 }
216
217 fn insert_into(
218 self,
219 vectors: &parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
220 graphs: &parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
221 metadata: &parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
222 name: &str,
223 ) {
224 match self {
225 Self::Vector(c) => {
226 vectors.write().insert(name.to_string(), c);
227 }
228 Self::Graph(c) => {
229 graphs.write().insert(name.to_string(), c);
230 }
231 Self::Metadata(c) => {
232 metadata.write().insert(name.to_string(), c);
233 }
234 }
235 }
236}
237
238fn flush_registry<T: Flushable>(
240 registry: &parking_lot::RwLock<std::collections::HashMap<String, T>>,
241 kind: &str,
242) -> usize {
243 let mut failures = 0;
244 for (name, coll) in registry.read().iter() {
245 if let Err(e) = coll.flush() {
246 tracing::warn!(
247 error = %e,
248 collection = %name,
249 "Failed to flush {kind} collection"
250 );
251 failures += 1;
252 }
253 }
254 failures
255}
256
257trait Flushable {
262 fn flush(&self) -> crate::Result<()>;
263}
264
265impl Flushable for VectorCollection {
266 fn flush(&self) -> crate::Result<()> {
267 self.flush_full()
268 }
269}
270
271impl Flushable for GraphCollection {
272 fn flush(&self) -> crate::Result<()> {
273 self.flush_full()
274 }
275}
276
277impl Flushable for MetadataCollection {
278 fn flush(&self) -> crate::Result<()> {
279 self.flush_full()
280 }
281}