use std::io::Write;
use std::ops::Bound;
use manifoldb_core::encoding::Decoder;
use manifoldb_storage::{Cursor, Transaction};
use super::error::{BackupError, BackupResult};
use super::types::{BackupMetadata, BackupRecord, BackupStatistics, EdgeRecord, EntityRecord};
use crate::Database;
/// Names of the storage tables this module reads while exporting a backup.
mod tables {
/// Table scanned by `export_entities`.
pub const NODES: &str = "nodes";
/// Table scanned by `export_edges`.
pub const EDGES: &str = "edges";
/// Table scanned by `export_metadata`; also holds the backup sequence number.
pub const METADATA: &str = "metadata";
}
/// Well-known keys inside the metadata table.
mod metadata_keys {
/// Key under which the backup sequence number is looked up; `export_metadata`
/// skips this key when copying the metadata table into a backup.
pub const BACKUP_SEQUENCE: &[u8] = b"_backup_sequence";
}
/// Writes backup records to an underlying [`Write`] sink, one serialized JSON
/// record per line, while keeping running statistics about what was written.
pub struct BackupWriter<W: Write> {
/// Destination that serialized records are written to.
writer: W,
/// Running statistics, updated as records are written.
statistics: BackupStatistics,
/// Number of records of any kind written so far.
records_written: u64,
}
impl<W: Write> BackupWriter<W> {
    /// Creates a writer that emits backup records to `writer`.
    ///
    /// Statistics start at `BackupStatistics::default()` and no records are
    /// counted yet.
    pub fn new(writer: W) -> Self {
        Self { writer, statistics: BackupStatistics::default(), records_written: 0 }
    }

    /// Writes the backup metadata record.
    ///
    /// Only the generic per-record accounting done for every record applies;
    /// no per-kind counter is bumped for this record.
    ///
    /// # Errors
    ///
    /// Fails if the record cannot be serialized or written.
    pub fn write_metadata(&mut self, metadata: &BackupMetadata) -> BackupResult<()> {
        let record = BackupRecord::metadata(metadata.clone());
        self.write_record(&record)
    }

    /// Writes `entity` as an entity record and counts it in the statistics.
    ///
    /// The entity counter is only bumped once the record was written
    /// successfully.
    ///
    /// # Errors
    ///
    /// Fails if the record cannot be serialized or written.
    pub fn write_entity(&mut self, entity: &manifoldb_core::Entity) -> BackupResult<()> {
        let record = BackupRecord::entity(EntityRecord::from_entity(entity));
        self.write_record(&record)?;
        self.statistics.add_entity();
        Ok(())
    }

    /// Writes `edge` as an edge record and counts it in the statistics.
    ///
    /// The edge counter is only bumped once the record was written
    /// successfully.
    ///
    /// # Errors
    ///
    /// Fails if the record cannot be serialized or written.
    pub fn write_edge(&mut self, edge: &manifoldb_core::Edge) -> BackupResult<()> {
        let record = BackupRecord::edge(EdgeRecord::from_edge(edge));
        self.write_record(&record)?;
        self.statistics.add_edge();
        Ok(())
    }

    /// Writes a raw key/value pair belonging to `table` and counts it as a
    /// metadata record in the statistics.
    ///
    /// # Errors
    ///
    /// Fails if the record cannot be serialized or written.
    pub fn write_key_value(
        &mut self,
        table: &str,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> BackupResult<()> {
        let record = BackupRecord::key_value(table.to_string(), key, value);
        self.write_record(&record)?;
        self.statistics.add_metadata();
        Ok(())
    }

    /// Terminates the backup: writes the end-of-backup record, flushes the
    /// underlying writer and returns the final statistics.
    ///
    /// The statistics embedded in the end-of-backup record are a snapshot taken
    /// before that record is written, so the accounting for the end record
    /// itself is only reflected in the returned value.
    ///
    /// # Errors
    ///
    /// Fails if the end record cannot be serialized or written, or if flushing
    /// the writer fails.
    pub fn finish(mut self) -> BackupResult<BackupStatistics> {
        let record = BackupRecord::end_of_backup(self.statistics.clone());
        self.write_record(&record)?;
        self.writer.flush()?;
        Ok(self.statistics)
    }

    /// Returns the statistics accumulated so far.
    pub fn statistics(&self) -> &BackupStatistics {
        &self.statistics
    }

    /// Returns how many records of any kind have been written so far.
    pub fn records_written(&self) -> u64 {
        self.records_written
    }

    /// Serializes `record` to JSON and writes it followed by a newline.
    ///
    /// Size and record-count accounting happen only after the write succeeded,
    /// so a failed write never leaves the statistics claiming bytes that were
    /// not emitted.
    fn write_record(&mut self, record: &BackupRecord) -> BackupResult<()> {
        let mut line = serde_json::to_string(record).map_err(BackupError::serialization)?;
        line.push('\n');
        // Hand the record and its terminating newline to the writer as one buffer.
        self.writer.write_all(line.as_bytes())?;
        // `line` already includes the newline, matching the JSON length + 1.
        self.statistics.add_size(line.len() as u64);
        self.records_written += 1;
        Ok(())
    }
}
/// Exports the whole database to `writer` as a full backup.
///
/// Opens a read transaction, writes a full-backup metadata header carrying the
/// current sequence number, then streams entities, edges and metadata
/// key/values before finishing the backup.
///
/// # Errors
///
/// Propagates any error from opening the transaction, reading storage,
/// decoding stored values, or writing to `writer`.
pub fn export_full<W: Write>(db: &Database, writer: W) -> BackupResult<BackupStatistics> {
    let tx = db.begin_read()?;
    let storage = tx.storage_ref()?;

    // The header is always the first record of the backup.
    let header = BackupMetadata::new_full(get_sequence_number(storage)?);
    let mut out = BackupWriter::new(writer);
    out.write_metadata(&header)?;

    export_entities(storage, &mut out)?;
    export_edges(storage, &mut out)?;
    export_metadata(storage, &mut out)?;

    out.finish()
}
/// Exports a backup whose metadata header is marked incremental relative to
/// `since_sequence`.
///
/// `since_sequence` is only recorded in the metadata header. The body of the
/// export is not filtered by it: every entity, edge and metadata key/value is
/// written, exactly as in `export_full`.
///
/// # Errors
///
/// Propagates any error from opening the transaction, reading storage,
/// decoding stored values, or writing to `writer`.
pub fn export_incremental<W: Write>(
    db: &Database,
    writer: W,
    since_sequence: u64,
) -> BackupResult<BackupStatistics> {
    let tx = db.begin_read()?;
    let storage = tx.storage_ref()?;

    // Header records both the current sequence and the base it is relative to.
    let header =
        BackupMetadata::new_incremental(get_sequence_number(storage)?, since_sequence);
    let mut out = BackupWriter::new(writer);
    out.write_metadata(&header)?;

    export_entities(storage, &mut out)?;
    export_edges(storage, &mut out)?;
    export_metadata(storage, &mut out)?;

    out.finish()
}
/// Reads the backup sequence number from the metadata table.
///
/// The value is stored as 8 big-endian bytes. Returns `0` when the key is
/// absent or the metadata table does not exist yet.
///
/// # Errors
///
/// * `BackupError::InvalidFormat` if a value is stored under the key but is
///   not exactly 8 bytes wide.
/// * `BackupError::Storage` for any other storage failure.
fn get_sequence_number<T: Transaction>(storage: &T) -> BackupResult<u64> {
    match storage.get(tables::METADATA, metadata_keys::BACKUP_SEQUENCE) {
        Ok(Some(bytes)) => {
            // A present value of the wrong width is corruption, not "no
            // sequence yet": report it instead of silently restarting at 0.
            let arr: [u8; 8] = bytes.try_into().map_err(|_| {
                BackupError::InvalidFormat("invalid sequence number format".to_string())
            })?;
            Ok(u64::from_be_bytes(arr))
        }
        // No sequence has been recorded yet.
        Ok(None) | Err(manifoldb_storage::StorageError::TableNotFound(_)) => Ok(0),
        Err(e) => Err(BackupError::Storage(e)),
    }
}
/// Streams every value in the nodes table into `writer` as an entity record.
///
/// A missing nodes table means there is nothing to export and yields `Ok(())`.
///
/// # Errors
///
/// Fails on storage errors, on values that cannot be decoded as an entity
/// (`BackupError::Deserialization`), or when writing a record fails.
fn export_entities<T: Transaction, W: Write>(
    storage: &T,
    writer: &mut BackupWriter<W>,
) -> BackupResult<()> {
    let mut rows = match storage.range(tables::NODES, Bound::Unbounded, Bound::Unbounded) {
        Ok(rows) => rows,
        Err(manifoldb_storage::StorageError::TableNotFound(_)) => return Ok(()),
        Err(other) => return Err(BackupError::Storage(other)),
    };

    loop {
        // Keys are not needed: the decoded entity is what gets written.
        let payload = match rows.next()? {
            Some((_, payload)) => payload,
            None => return Ok(()),
        };
        let (entity, _): (manifoldb_core::Entity, _) =
            bincode::serde::decode_from_slice(&payload, bincode::config::standard())
                .map_err(|err| BackupError::Deserialization(err.to_string()))?;
        writer.write_entity(&entity)?;
    }
}
/// Streams every value in the edges table into `writer` as an edge record.
///
/// A missing edges table means there is nothing to export and yields `Ok(())`.
///
/// # Errors
///
/// Fails on storage errors, on values that `Edge::decode` rejects
/// (`BackupError::Deserialization`), or when writing a record fails.
fn export_edges<T: Transaction, W: Write>(
    storage: &T,
    writer: &mut BackupWriter<W>,
) -> BackupResult<()> {
    let mut rows = match storage.range(tables::EDGES, Bound::Unbounded, Bound::Unbounded) {
        Ok(rows) => rows,
        Err(manifoldb_storage::StorageError::TableNotFound(_)) => return Ok(()),
        Err(other) => return Err(BackupError::Storage(other)),
    };

    loop {
        // Keys are not needed: the decoded edge is what gets written.
        let payload = match rows.next()? {
            Some((_, payload)) => payload,
            None => return Ok(()),
        };
        let edge = manifoldb_core::Edge::decode(&payload)
            .map_err(|err| BackupError::Deserialization(err.to_string()))?;
        writer.write_edge(&edge)?;
    }
}
/// Copies the metadata table into `writer` as raw key/value records.
///
/// The backup sequence key is skipped; every other entry is written verbatim.
/// A missing metadata table means there is nothing to export and yields
/// `Ok(())`.
///
/// # Errors
///
/// Fails on storage errors or when writing a record fails.
fn export_metadata<T: Transaction, W: Write>(
    storage: &T,
    writer: &mut BackupWriter<W>,
) -> BackupResult<()> {
    let mut rows = match storage.range(tables::METADATA, Bound::Unbounded, Bound::Unbounded) {
        Ok(rows) => rows,
        Err(manifoldb_storage::StorageError::TableNotFound(_)) => return Ok(()),
        Err(other) => return Err(BackupError::Storage(other)),
    };

    loop {
        match rows.next()? {
            Some((key, value)) => {
                // The sequence number is not copied as a key/value record.
                if key.as_slice() != metadata_keys::BACKUP_SEQUENCE {
                    writer.write_key_value(tables::METADATA, key, value)?;
                }
            }
            None => return Ok(()),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Writing a metadata header plus one entity yields one counted entity.
    #[test]
    fn test_backup_writer_basic() {
        let mut writer = BackupWriter::new(Cursor::new(Vec::<u8>::new()));

        let header = BackupMetadata::new_full(0);
        writer.write_metadata(&header).unwrap();

        let sample = manifoldb_core::Entity::new(manifoldb_core::EntityId::new(1))
            .with_label("Test")
            .with_property("name", "Test Entity");
        writer.write_entity(&sample).unwrap();

        let stats = writer.finish().unwrap();
        assert_eq!(stats.entity_count, 1);
        assert_eq!(stats.total_records, 1);
    }

    /// Exporting a freshly created in-memory database reports no entities or edges.
    #[test]
    fn test_export_full_empty_db() {
        let db = Database::in_memory().unwrap();

        let stats = export_full(&db, Cursor::new(Vec::<u8>::new())).unwrap();

        assert_eq!(stats.entity_count, 0);
        assert_eq!(stats.edge_count, 0);
    }

    /// Exporting a database holding two entities and one edge counts all three.
    #[test]
    fn test_export_full_with_data() {
        let db = Database::in_memory().unwrap();

        // Populate inside a scope so the write transaction is gone before exporting.
        {
            let mut tx = db.begin().unwrap();

            let alice =
                tx.create_entity().unwrap().with_label("Person").with_property("name", "Alice");
            let bob =
                tx.create_entity().unwrap().with_label("Person").with_property("name", "Bob");
            tx.put_entity(&alice).unwrap();
            tx.put_entity(&bob).unwrap();

            let knows = tx.create_edge(alice.id, bob.id, "KNOWS").unwrap();
            tx.put_edge(&knows).unwrap();

            tx.commit().unwrap();
        }

        let stats = export_full(&db, Cursor::new(Vec::<u8>::new())).unwrap();

        assert_eq!(stats.entity_count, 2);
        assert_eq!(stats.edge_count, 1);
    }
}