dbx_core/storage/backup/
snapshot.rs1use crate::engine::metadata::SchemaMetadata;
4pub use crate::engine::snapshot::DatabaseSnapshot;
5use crate::engine::snapshot::TableData;
6use crate::engine::{Database, WosVariant};
7use crate::error::{DbxError, DbxResult};
8use arrow::datatypes::Schema;
9use std::path::Path;
10use std::sync::Arc;
11
12impl Database {
13 pub fn save_to_file<P: AsRef<Path>>(&self, path: P) -> DbxResult<()> {
33 if !self.is_in_memory() {
35 return Err(DbxError::InvalidOperation {
36 message: "save_to_file only works for in-memory databases".to_string(),
37 context: "Use flush() for file-based databases".to_string(),
38 });
39 }
40
41 let snapshot = self.create_snapshot()?;
43
44 let json = serde_json::to_string_pretty(&snapshot)
46 .map_err(|e| DbxError::Serialization(e.to_string()))?;
47
48 std::fs::write(path, json)?;
50
51 Ok(())
52 }
53
54 pub fn load_from_file<P: AsRef<Path>>(path: P) -> DbxResult<Self> {
73 let json = std::fs::read_to_string(path)?;
75
76 let snapshot: DatabaseSnapshot =
78 serde_json::from_str(&json).map_err(|e| DbxError::Serialization(e.to_string()))?;
79
80 let db = Self::open_in_memory()?;
82
83 db.restore_snapshot(snapshot)?;
85
86 Ok(db)
87 }
88
89 fn is_in_memory(&self) -> bool {
91 matches!(self.wos, WosVariant::InMemory(_))
92 }
93
94 fn create_snapshot(&self) -> DbxResult<DatabaseSnapshot> {
96 let mut snapshot = DatabaseSnapshot::new();
97
98 let schemas = self.table_schemas.read().unwrap();
100 for (table_name, schema) in schemas.iter() {
101 let metadata = SchemaMetadata::from(schema.as_ref());
102 snapshot.schemas.insert(table_name.clone(), metadata);
103 }
104 drop(schemas);
105
106 let indexes = self.index_registry.read().unwrap();
108 snapshot.indexes = indexes.clone();
109 drop(indexes);
110
111 let table_list: Vec<String> = self
114 .row_counters
115 .iter()
116 .map(|entry| entry.key().clone())
117 .collect();
118
119 for table_name in table_list {
120 if table_name.starts_with("__meta__") {
122 continue;
123 }
124
125 let entries = self.wos.scan(&table_name, ..)?;
126 snapshot.tables.insert(table_name, TableData { entries });
127 }
128
129 for entry in self.row_counters.iter() {
131 let table = entry.key().clone();
132 let counter = entry.value().load(std::sync::atomic::Ordering::SeqCst);
133 snapshot.row_counters.insert(table, counter);
134 }
135
136 Ok(snapshot)
137 }
138
139 fn restore_snapshot(&self, snapshot: DatabaseSnapshot) -> DbxResult<()> {
141 if snapshot.version != DatabaseSnapshot::CURRENT_VERSION {
143 return Err(DbxError::InvalidOperation {
144 message: format!("Unsupported snapshot version: {}", snapshot.version),
145 context: format!("Expected version {}", DatabaseSnapshot::CURRENT_VERSION),
146 });
147 }
148
149 let mut table_schemas = self.table_schemas.write().unwrap();
151 let mut schemas = self.schemas.write().unwrap();
152 for (table_name, metadata) in snapshot.schemas {
153 let schema = Arc::new(
154 Schema::try_from(metadata)
155 .map_err(|e| DbxError::Schema(format!("Failed to restore schema: {}", e)))?,
156 );
157 table_schemas.insert(table_name.clone(), schema.clone());
158 schemas.insert(table_name, schema);
159 }
160 drop(table_schemas);
161 drop(schemas);
162
163 let mut indexes = self.index_registry.write().unwrap();
165 *indexes = snapshot.indexes;
166 drop(indexes);
167
168 for (table_name, table_data) in snapshot.tables {
170 for (key, value) in table_data.entries {
171 self.wos.insert(&table_name, &key, &value)?;
172 }
173 }
174
175 for (table, count) in snapshot.row_counters {
177 self.row_counters
178 .insert(table, std::sync::atomic::AtomicUsize::new(count));
179 }
180
181 Ok(())
182 }
183}
184
185impl crate::traits::DatabaseSnapshot for Database {
190 fn save_to_file(&self, path: &str) -> DbxResult<()> {
191 Database::save_to_file(self, path)
193 }
194
195 fn load_from_file(path: &str) -> DbxResult<Self> {
196 Database::load_from_file(path)
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    /// A database opened with `open_in_memory` reports itself as in-memory.
    #[test]
    fn test_is_in_memory() {
        let in_memory = Database::open_in_memory().unwrap();
        assert!(in_memory.is_in_memory());
    }

    /// `save_to_file` must refuse a database opened on a directory, and the
    /// error text must mention "in-memory".
    #[test]
    fn test_file_based_db_rejects_save() {
        let db_dir = tempfile::tempdir().unwrap();
        let file_backed = Database::open(db_dir.path()).unwrap();
        let target = tempfile::NamedTempFile::new().unwrap();

        // `unwrap_err` panics if the save unexpectedly succeeds.
        let failure = file_backed.save_to_file(target.path()).unwrap_err();
        let rendered = failure.to_string();

        assert!(rendered.contains("in-memory"));
    }
}