use crate::engine::Database;
use crate::engine::metadata::SchemaMetadata;
pub use crate::engine::snapshot::DatabaseSnapshot;
use crate::engine::snapshot::TableData;
use crate::error::{DbxError, DbxResult};
use arrow::datatypes::Schema;
use std::path::Path;
use std::sync::Arc;
impl Database {
/// Writes a pretty-printed JSON snapshot of this in-memory database to `path`.
///
/// # Errors
///
/// - [`DbxError::InvalidOperation`] when the database is not in-memory.
/// - [`DbxError::Serialization`] when the snapshot cannot be encoded as JSON.
/// - Any error from building the snapshot or writing the file is propagated.
pub fn save_to_file<P: AsRef<Path>>(&self, path: P) -> DbxResult<()> {
    if self.is_in_memory() {
        let snapshot = self.create_snapshot()?;
        let encoded = match serde_json::to_string_pretty(&snapshot) {
            Ok(text) => text,
            Err(err) => return Err(DbxError::Serialization(err.to_string())),
        };
        std::fs::write(path, encoded)?;
        return Ok(());
    }

    // Anything that is not in-memory is rejected; the error points callers at flush().
    Err(DbxError::InvalidOperation {
        message: "save_to_file only works for in-memory databases".to_string(),
        context: "Use flush() for file-based databases".to_string(),
    })
}
/// Builds a fresh in-memory database from a JSON snapshot stored at `path`.
///
/// # Errors
///
/// - Reading the file fails (propagated via `?`).
/// - [`DbxError::Serialization`] when the contents are not a valid snapshot.
/// - Opening the in-memory database or restoring the snapshot fails.
pub fn load_from_file<P: AsRef<Path>>(path: P) -> DbxResult<Self> {
    let contents = std::fs::read_to_string(path)?;

    let parsed = serde_json::from_str::<DatabaseSnapshot>(&contents)
        .map_err(|err| DbxError::Serialization(err.to_string()))?;

    // The snapshot is always replayed into a brand-new in-memory instance.
    let database = Self::open_in_memory()?;
    database.restore_snapshot(parsed)?;
    Ok(database)
}
/// Returns `true` when no `file_wos` is configured on this database.
fn is_in_memory(&self) -> bool {
    matches!(self.file_wos, None)
}
/// Captures schemas, the index registry, table contents and row counters
/// into a new [`DatabaseSnapshot`].
///
/// Tables are discovered through the keys of `row_counters`. Names starting
/// with `__meta__` are skipped when copying table contents, but their row
/// counters are still recorded.
///
/// # Errors
///
/// Propagates any error returned while scanning a table.
///
/// # Panics
///
/// Panics if acquiring the `table_schemas` or `index_registry` read lock
/// returns an error (the result is unwrapped).
fn create_snapshot(&self) -> DbxResult<DatabaseSnapshot> {
    use std::sync::atomic::Ordering;

    let mut snap = DatabaseSnapshot::new();

    // Schemas: the read guard is confined to this scope.
    {
        let guard = self.table_schemas.read().unwrap();
        for (name, schema) in guard.iter() {
            let metadata = SchemaMetadata::from(schema.as_ref());
            snap.schemas.insert(name.clone(), metadata);
        }
    }

    // Index registry: the temporary guard is released at the end of the statement.
    snap.indexes = self.index_registry.read().unwrap().clone();

    // Table names are collected up front so `row_counters` is not being
    // iterated while the tables themselves are scanned.
    let data_tables: Vec<String> = self
        .row_counters
        .iter()
        .map(|entry| entry.key().clone())
        .filter(|name| !name.starts_with("__meta__"))
        .collect();
    for name in data_tables {
        let entries = self.wos_for_table(&name).scan(&name, ..)?;
        snap.tables.insert(name, TableData { entries });
    }

    // Row counters are recorded for every key, including `__meta__` ones.
    for entry in self.row_counters.iter() {
        let count = entry.value().load(Ordering::SeqCst);
        snap.row_counters.insert(entry.key().clone(), count);
    }

    Ok(snap)
}
/// Replays `snapshot` into this database: schemas, index registry, table
/// entries and row counters, in that order.
///
/// All schemas are converted *before* any shared state is modified, so a
/// schema that fails to convert leaves the database untouched and no write
/// lock is held while the fallible conversion runs. Table entries are still
/// inserted one by one; an insert error aborts the restore with the earlier
/// entries already written.
///
/// # Errors
///
/// - [`DbxError::InvalidOperation`] when `snapshot.version` differs from
///   `DatabaseSnapshot::CURRENT_VERSION`.
/// - [`DbxError::Schema`] when a stored schema cannot be converted back.
/// - Any error returned while inserting a table entry.
///
/// # Panics
///
/// Panics if acquiring the `table_schemas`, `schemas` or `index_registry`
/// write lock returns an error (the result is unwrapped).
fn restore_snapshot(&self, snapshot: DatabaseSnapshot) -> DbxResult<()> {
    use std::sync::atomic::AtomicUsize;

    if snapshot.version != DatabaseSnapshot::CURRENT_VERSION {
        return Err(DbxError::InvalidOperation {
            message: format!("Unsupported snapshot version: {}", snapshot.version),
            context: format!("Expected version {}", DatabaseSnapshot::CURRENT_VERSION),
        });
    }

    // Validate and convert every schema first; `collect` stops at the first
    // failure, before either schema map has been touched.
    let restored_schemas = snapshot
        .schemas
        .into_iter()
        .map(|(table_name, metadata)| {
            Schema::try_from(metadata)
                .map(|schema| (table_name, Arc::new(schema)))
                .map_err(|e| DbxError::Schema(format!("Failed to restore schema: {}", e)))
        })
        .collect::<DbxResult<Vec<_>>>()?;

    // Install the schemas; nothing in this scope can fail, so both maps are
    // always updated together. Lock order matches the original code.
    {
        let mut table_schemas = self.table_schemas.write().unwrap();
        let mut schemas = self.schemas.write().unwrap();
        for (table_name, schema) in restored_schemas {
            table_schemas.insert(table_name.clone(), Arc::clone(&schema));
            schemas.insert(table_name, schema);
        }
    }

    *self.index_registry.write().unwrap() = snapshot.indexes;

    for (table_name, table_data) in snapshot.tables {
        // The store lookup depends only on the table, so do it once per table
        // rather than once per entry.
        let wos = self.wos_for_table(&table_name);
        for (key, value) in table_data.entries {
            wos.insert(&table_name, &key, &value)?;
        }
    }

    for (table, count) in snapshot.row_counters {
        self.row_counters.insert(table, AtomicUsize::new(count));
    }

    Ok(())
}
}
impl crate::traits::DatabaseSnapshot for Database {
fn save_to_file(&self, path: &str) -> DbxResult<()> {
Database::save_to_file(self, path)
}
fn load_from_file(path: &str) -> DbxResult<Self> {
Database::load_from_file(path)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A database opened with `open_in_memory` reports itself as in-memory.
    #[test]
    fn test_is_in_memory() {
        let database = Database::open_in_memory().unwrap();
        assert!(database.is_in_memory());
    }

    /// A database opened on a directory must refuse `save_to_file`, and the
    /// error text must mention "in-memory".
    #[test]
    fn test_file_based_db_rejects_save() {
        let data_dir = tempfile::tempdir().unwrap();
        let database = Database::open(data_dir.path()).unwrap();

        let target = tempfile::NamedTempFile::new().unwrap();
        let outcome = database.save_to_file(target.path());

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("in-memory"));
    }
}