use std::collections::HashSet;
use std::io::{BufRead, BufReader, Read};
use super::error::{BackupError, BackupResult};
use super::types::{
BackupMetadata, BackupRecord, BackupRecordType, BackupStatistics, KeyValueRecord, RecordData,
BACKUP_FORMAT_VERSION,
};
use crate::Database;
/// Table names that key-value backup records are matched against during import.
mod tables {
/// Key-value records whose `table` field equals this name are written with `put_metadata`.
pub const METADATA: &str = "metadata";
}
/// Settings that control how an [`Importer`] processes a backup stream.
#[derive(Debug, Clone)]
// The flags are independent on/off switches, hence the lint exemption.
#[allow(clippy::struct_excessive_bools)]
pub struct ImportOptions {
/// When `true`, an edge whose source or target ID was not seen as an entity record
/// earlier in the same backup aborts the run with a `MissingReference` error.
pub verify_references: bool,
/// NOTE(review): not read anywhere in this file — presumably meant to skip records
/// that already exist in the target database; confirm where it is honored.
pub skip_duplicates: bool,
/// NOTE(review): not read anywhere in this file — presumably requests an index
/// rebuild after import; confirm where it is honored.
pub rebuild_indexes: bool,
/// Maximum number of records read and committed per transaction during import;
/// `None` removes the limit.
pub batch_size: Option<usize>,
/// When `true`, `import_all` only parses and checks the backup and never writes
/// to the database.
pub dry_run: bool,
}
impl Default for ImportOptions {
fn default() -> Self {
Self {
verify_references: true,
skip_duplicates: false,
rebuild_indexes: true,
batch_size: Some(10_000),
dry_run: false,
}
}
}
impl ImportOptions {
pub fn fast() -> Self {
Self {
verify_references: false,
skip_duplicates: true,
rebuild_indexes: false,
batch_size: Some(50_000),
dry_run: false,
}
}
pub fn dry_run() -> Self {
Self { dry_run: true, ..Default::default() }
}
}
/// Reads a line-oriented JSON backup from `R` and imports or verifies its records.
pub struct Importer<R: Read> {
/// Buffered wrapper around the backup source; records are read from it line by line.
reader: BufReader<R>,
/// Metadata header of the backup, cached once `read_metadata` has succeeded.
metadata: Option<BackupMetadata>,
/// Settings selected when the importer was constructed.
options: ImportOptions,
/// Counters updated as entity, edge, and key-value records are processed.
statistics: BackupStatistics,
/// Number of lines consumed so far (blank lines included); used in error messages.
line_number: u64,
}
impl<R: Read> Importer<R> {
pub fn new(reader: R) -> Self {
Self::with_options(reader, ImportOptions::default())
}
/// Creates an importer over `reader` with explicit `options`.
///
/// The reader is wrapped in a [`BufReader`]; no metadata is cached yet, the
/// statistics start from their default value, and the line counter starts at zero.
pub fn with_options(reader: R, options: ImportOptions) -> Self {
    let reader = BufReader::new(reader);
    let statistics = BackupStatistics::default();
    Self { reader, metadata: None, options, statistics, line_number: 0 }
}
/// Returns the backup's metadata header, reading it from the stream on first use.
///
/// Subsequent calls return the cached header without touching the reader.
///
/// # Errors
///
/// * `Incomplete` when the stream contains no records at all.
/// * `MalformedRecord` when the first record is not a metadata record, or is
///   tagged as metadata but carries a different payload.
/// * `UnsupportedVersion` when the header's version is newer than
///   `BACKUP_FORMAT_VERSION`.
/// * Any error produced by `read_record`.
pub fn read_metadata(&mut self) -> BackupResult<&BackupMetadata> {
    // Keep the borrow inside the pattern so it only exists on the early-return path.
    if let Some(ref cached) = self.metadata {
        return Ok(cached);
    }

    let first = self.read_record()?;
    let record = first.ok_or_else(|| BackupError::incomplete("empty backup file"))?;

    if !matches!(record.record_type, BackupRecordType::Metadata) {
        return Err(BackupError::malformed_record(
            self.line_number,
            "first record must be metadata",
        ));
    }

    let meta = match record.data {
        RecordData::Metadata(meta) => meta,
        _ => {
            return Err(BackupError::malformed_record(
                self.line_number,
                "metadata record has wrong data type",
            ));
        }
    };

    if meta.version > BACKUP_FORMAT_VERSION {
        return Err(BackupError::UnsupportedVersion(meta.version));
    }

    Ok(self.metadata.insert(meta))
}
/// Returns the cached metadata header, or `None` if it has not been read yet.
pub fn metadata(&self) -> Option<&BackupMetadata> {
    Option::as_ref(&self.metadata)
}
pub fn statistics(&self) -> &BackupStatistics {
&self.statistics
}
/// Reads and deserializes the next record from the backup stream.
///
/// Blank (whitespace-only) lines are skipped but still counted in the line number
/// used for error messages. Returns `Ok(None)` once the end of the input is reached.
///
/// # Errors
///
/// Returns the underlying I/O error if reading a line fails, or
/// `BackupError::Deserialization` (prefixed with the line number) if a non-blank
/// line is not a valid record.
pub fn read_record(&mut self) -> BackupResult<Option<BackupRecord>> {
    let mut line = String::new();
    // Skip blank lines iteratively: recursing once per blank line could exhaust the
    // stack on an input that contains a very long run of empty lines.
    loop {
        // `read_line` appends, so the reused buffer must be emptied first.
        line.clear();
        if self.reader.read_line(&mut line)? == 0 {
            return Ok(None);
        }
        self.line_number += 1;

        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        let record: BackupRecord = serde_json::from_str(trimmed).map_err(|e| {
            BackupError::Deserialization(format!("line {}: {}", self.line_number, e))
        })?;
        return Ok(Some(record));
    }
}
/// Imports every record of the backup into `db` and returns the collected statistics.
///
/// The metadata header is read and validated first. In dry-run mode the call then
/// delegates to `verify_all` and `db` is never touched. Otherwise records are read in
/// batches of at most `options.batch_size` records (unbounded when `None`; a value of
/// zero is treated as one so the import always makes progress), and every batch is
/// written inside its own transaction. Reading stops at the first `EndOfBackup`
/// record or at the end of the input, whichever comes first; anything after an
/// `EndOfBackup` record is ignored.
///
/// # Errors
///
/// Fails if the metadata header is missing or invalid, a line cannot be
/// deserialized, a database operation fails, or — with `verify_references`
/// enabled — an edge refers to an entity ID that has not appeared earlier in this
/// backup. Batches committed before the failing one are not undone here.
pub fn import_all(mut self, db: &Database) -> BackupResult<BackupStatistics> {
    self.read_metadata()?;
    if self.options.dry_run {
        return self.verify_all();
    }

    let verify_references = self.options.verify_references;
    // A zero batch size would read nothing and silently report an empty import.
    let batch_size = self.options.batch_size.unwrap_or(usize::MAX).max(1);
    // Only populated when references are verified; otherwise it would grow for nothing.
    let mut entity_ids: HashSet<u64> = HashSet::new();
    let mut batch_records: Vec<BackupRecord> = Vec::with_capacity(batch_size.min(10_000));
    // Set once the end marker or the end of the input has been reached, so that no
    // further batch is read after the current one has been committed.
    let mut finished = false;

    while !finished {
        batch_records.clear();
        while batch_records.len() < batch_size {
            match self.read_record()? {
                Some(record) => {
                    if matches!(record.record_type, BackupRecordType::EndOfBackup) {
                        finished = true;
                        break;
                    }
                    batch_records.push(record);
                }
                None => {
                    finished = true;
                    break;
                }
            }
        }
        if batch_records.is_empty() {
            break;
        }

        let mut tx = db.begin()?;
        for record in &batch_records {
            match &record.record_type {
                BackupRecordType::Entity => {
                    if let RecordData::Entity(entity_record) = &record.data {
                        if verify_references {
                            entity_ids.insert(entity_record.id);
                        }
                        let entity = entity_record.to_entity();
                        tx.put_entity(&entity)?;
                        self.statistics.add_entity();
                    }
                }
                BackupRecordType::Edge => {
                    if let RecordData::Edge(edge_record) = &record.data {
                        if verify_references {
                            if !entity_ids.contains(&edge_record.source) {
                                return Err(BackupError::MissingReference(format!(
                                    "edge {} references missing source entity {}",
                                    edge_record.id, edge_record.source
                                )));
                            }
                            if !entity_ids.contains(&edge_record.target) {
                                return Err(BackupError::MissingReference(format!(
                                    "edge {} references missing target entity {}",
                                    edge_record.id, edge_record.target
                                )));
                            }
                        }
                        let edge = edge_record.to_edge();
                        tx.put_edge(&edge)?;
                        self.statistics.add_edge();
                    }
                }
                BackupRecordType::KeyValue => {
                    if let RecordData::KeyValue(kv) = &record.data {
                        // Key-value records without a table name are skipped.
                        if let Some(table) = &record.table {
                            self.import_key_value(&mut tx, table, kv)?;
                            self.statistics.add_metadata();
                        }
                    }
                }
                // Extra metadata records are ignored; end markers never reach a batch.
                BackupRecordType::Metadata | BackupRecordType::EndOfBackup => {}
            }
        }
        tx.commit()?;
    }

    Ok(self.statistics)
}
/// Writes one key-value record into the open transaction.
///
/// Only records belonging to the `tables::METADATA` table are stored (via
/// `put_metadata`); records for any other table are accepted and ignored.
///
/// # Errors
///
/// Propagates the error returned by `put_metadata`.
fn import_key_value<T: manifoldb_storage::Transaction>(
    &self,
    tx: &mut crate::transaction::DatabaseTransaction<T>,
    table: &str,
    kv: &KeyValueRecord,
) -> BackupResult<()> {
    if table != tables::METADATA {
        return Ok(());
    }
    tx.put_metadata(&kv.key, &kv.value)?;
    Ok(())
}
/// Walks the remaining records without writing anything and returns the statistics
/// a real import of the same stream would report.
///
/// Entities and edges are counted when their payload matches their record type.
/// Key-value records are counted under the same conditions `import_all` applies
/// (key-value payload and a table name present), so dry-run and real-import
/// statistics agree. Processing stops at the first `EndOfBackup` record or at the
/// end of the input.
///
/// # Errors
///
/// Fails if a line cannot be read or deserialized, or — with `verify_references`
/// enabled — if an edge refers to an entity ID that has not appeared earlier in
/// this backup.
fn verify_all(mut self) -> BackupResult<BackupStatistics> {
    let verify_references = self.options.verify_references;
    // Only populated when references are verified; otherwise it would grow for nothing.
    let mut entity_ids: HashSet<u64> = HashSet::new();

    while let Some(record) = self.read_record()? {
        match &record.record_type {
            BackupRecordType::Entity => {
                if let RecordData::Entity(entity_record) = &record.data {
                    if verify_references {
                        entity_ids.insert(entity_record.id);
                    }
                    self.statistics.add_entity();
                }
            }
            BackupRecordType::Edge => {
                if let RecordData::Edge(edge_record) = &record.data {
                    if verify_references {
                        if !entity_ids.contains(&edge_record.source) {
                            return Err(BackupError::MissingReference(format!(
                                "edge {} references missing source entity {}",
                                edge_record.id, edge_record.source
                            )));
                        }
                        if !entity_ids.contains(&edge_record.target) {
                            return Err(BackupError::MissingReference(format!(
                                "edge {} references missing target entity {}",
                                edge_record.id, edge_record.target
                            )));
                        }
                    }
                    self.statistics.add_edge();
                }
            }
            BackupRecordType::KeyValue => {
                // Mirror `import_all`, which skips key-value records that carry a
                // different payload or no table name.
                if matches!(record.data, RecordData::KeyValue(_)) && record.table.is_some() {
                    self.statistics.add_metadata();
                }
            }
            BackupRecordType::EndOfBackup => break,
            // Extra metadata records are ignored.
            BackupRecordType::Metadata => {}
        }
    }

    Ok(self.statistics)
}
}
pub fn import<R: Read>(db: &Database, reader: R) -> BackupResult<BackupStatistics> {
let importer = Importer::new(reader);
importer.import_all(db)
}
/// Checks the backup read from `reader` without importing it and returns the
/// statistics gathered while scanning it.
///
/// The metadata header is validated and every following record is parsed using the
/// dry-run preset of [`ImportOptions`]. No database is created or modified: the
/// verification pass is driven directly instead of going through `import_all`
/// with a throwaway in-memory database.
///
/// # Errors
///
/// Fails if the metadata header is missing or invalid, a line cannot be
/// deserialized, or an edge refers to an entity that has not appeared earlier in
/// the backup.
pub fn verify<R: Read>(reader: R) -> BackupResult<BackupStatistics> {
    let mut importer = Importer::with_options(reader, ImportOptions::dry_run());
    importer.read_metadata()?;
    importer.verify_all()
}
// Unit tests for the import options, export/import round trips, and the errors
// reported for malformed backup streams.
#[cfg(test)]
mod tests {
use super::*;
use crate::backup::export::export_full;
use std::io::Cursor;
// The default options verify references and rebuild indexes, and neither skip
// duplicates nor run dry.
#[test]
fn test_import_options_default() {
let opts = ImportOptions::default();
assert!(opts.verify_references);
assert!(!opts.skip_duplicates);
assert!(opts.rebuild_indexes);
assert!(!opts.dry_run);
}
// The fast preset turns off reference verification and index rebuilding and turns
// on duplicate skipping.
#[test]
fn test_import_options_fast() {
let opts = ImportOptions::fast();
assert!(!opts.verify_references);
assert!(opts.skip_duplicates);
assert!(!opts.rebuild_indexes);
}
// Exporting an empty database and importing the result reports zero entities and edges.
#[test]
fn test_roundtrip_empty_db() {
let source_db = Database::in_memory().unwrap();
let mut buffer = Vec::new();
let _stats = export_full(&source_db, &mut buffer).unwrap();
let target_db = Database::in_memory().unwrap();
let cursor = Cursor::new(buffer);
let stats = import(&target_db, cursor).unwrap();
assert_eq!(stats.entity_count, 0);
assert_eq!(stats.edge_count, 0);
}
// Two labelled entities and one edge survive an export followed by an import into
// a fresh database.
#[test]
fn test_roundtrip_with_data() {
let source_db = Database::in_memory().unwrap();
{
let mut tx = source_db.begin().unwrap();
let e1 =
tx.create_entity().unwrap().with_label("Person").with_property("name", "Alice");
let e2 = tx.create_entity().unwrap().with_label("Person").with_property("name", "Bob");
tx.put_entity(&e1).unwrap();
tx.put_entity(&e2).unwrap();
let edge =
tx.create_edge(e1.id, e2.id, "KNOWS").unwrap().with_property("since", 2024i64);
tx.put_edge(&edge).unwrap();
tx.commit().unwrap();
}
let mut buffer = Vec::new();
let export_stats = export_full(&source_db, &mut buffer).unwrap();
assert_eq!(export_stats.entity_count, 2);
assert_eq!(export_stats.edge_count, 1);
let target_db = Database::in_memory().unwrap();
let cursor = Cursor::new(buffer);
let import_stats = import(&target_db, cursor).unwrap();
assert_eq!(import_stats.entity_count, 2);
assert_eq!(import_stats.edge_count, 1);
// Confirm the entities are actually readable from the target database.
let tx = target_db.begin_read().unwrap();
let entities = tx.iter_entities(Some("Person")).unwrap();
assert_eq!(entities.len(), 2);
}
// `verify` reports the same entity and edge counts as the exported data.
#[test]
fn test_verify_backup() {
let db = Database::in_memory().unwrap();
{
let mut tx = db.begin().unwrap();
let e1 = tx.create_entity().unwrap().with_label("Test");
let e2 = tx.create_entity().unwrap().with_label("Test");
tx.put_entity(&e1).unwrap();
tx.put_entity(&e2).unwrap();
let edge = tx.create_edge(e1.id, e2.id, "LINKS").unwrap();
tx.put_edge(&edge).unwrap();
tx.commit().unwrap();
}
let mut buffer = Vec::new();
export_full(&db, &mut buffer).unwrap();
let cursor = Cursor::new(buffer);
let stats = verify(cursor).unwrap();
assert_eq!(stats.entity_count, 2);
assert_eq!(stats.edge_count, 1);
}
// An edge whose endpoints never appear as entity records is rejected with
// `MissingReference`.
#[test]
fn test_missing_reference_detection() {
let backup = r#"{"type":"metadata","data":{"version":1,"format":"json_lines","created_at":0,"sequence_number":0,"is_incremental":false,"previous_sequence":null,"statistics":{"entity_count":0,"edge_count":0,"metadata_count":0,"total_records":0,"uncompressed_size":0}}}
{"type":"edge","data":{"id":1,"source":999,"target":998,"edge_type":"BROKEN","properties":{}}}"#;
let cursor = Cursor::new(backup.as_bytes());
let result = verify(cursor);
assert!(result.is_err());
match result {
Err(BackupError::MissingReference(_)) => (),
_ => panic!("expected MissingReference error"),
}
}
// A zero-byte input is reported as an incomplete ("empty") backup.
#[test]
fn test_empty_backup_file() {
let cursor = Cursor::new(b"");
let result = verify(cursor);
assert!(result.is_err());
match result {
Err(BackupError::Incomplete(msg)) => {
assert!(msg.contains("empty"), "Expected 'empty' in message: {msg}");
}
other => panic!("expected Incomplete error, got: {other:?}"),
}
}
// A record cut off mid-JSON yields a deserialization error that names line 2.
#[test]
fn test_truncated_json_line() {
let backup = r#"{"type":"metadata","data":{"version":1,"format":"json_lines","created_at":0,"sequence_number":0,"is_incremental":false,"previous_sequence":null,"statistics":{"entity_count":0,"edge_count":0,"metadata_count":0,"total_records":0,"uncompressed_size":0}}}
{"type":"entity","data":{"id":1,"labels":["Test"],"properties":{"#;
let cursor = Cursor::new(backup.as_bytes());
let result = verify(cursor);
assert!(result.is_err());
match result {
Err(BackupError::Deserialization(msg)) => {
assert!(msg.contains("line 2"), "Expected line number in message: {msg}");
}
other => panic!("expected Deserialization error, got: {other:?}"),
}
}
// Input that is not JSON at all yields a deserialization error that names line 1.
#[test]
fn test_invalid_json_syntax() {
let backup = "not valid json at all";
let cursor = Cursor::new(backup.as_bytes());
let result = verify(cursor);
assert!(result.is_err());
match result {
Err(BackupError::Deserialization(msg)) => {
assert!(msg.contains("line 1"), "Expected line number in message: {msg}");
}
other => panic!("expected Deserialization error, got: {other:?}"),
}
}
// A backup that starts with an entity record instead of metadata is rejected as
// malformed at line 1.
#[test]
fn test_non_metadata_first_record() {
let backup = r#"{"type":"entity","data":{"id":1,"labels":["Test"],"properties":{}}}"#;
let cursor = Cursor::new(backup.as_bytes());
let result = verify(cursor);
assert!(result.is_err());
match result {
Err(BackupError::MalformedRecord { line, message }) => {
assert_eq!(line, 1);
assert!(
message.contains("first record must be metadata"),
"Unexpected message: {message}"
);
}
other => panic!("expected MalformedRecord error, got: {other:?}"),
}
}
// A metadata header declaring version 999 is rejected as an unsupported version.
#[test]
fn test_unsupported_version() {
let backup = r#"{"type":"metadata","data":{"version":999,"format":"json_lines","created_at":0,"sequence_number":0,"is_incremental":false,"previous_sequence":null,"statistics":{"entity_count":0,"edge_count":0,"metadata_count":0,"total_records":0,"uncompressed_size":0}}}"#;
let cursor = Cursor::new(backup.as_bytes());
let result = verify(cursor);
assert!(result.is_err());
match result {
Err(BackupError::UnsupportedVersion(version)) => {
assert_eq!(version, 999);
}
other => panic!("expected UnsupportedVersion error, got: {other:?}"),
}
}
// An entity record with unexpected fields makes verification fail; only the
// presence of an error is asserted, not its kind.
#[test]
fn test_corrupted_entity_record() {
let backup = r#"{"type":"metadata","data":{"version":1,"format":"json_lines","created_at":0,"sequence_number":0,"is_incremental":false,"previous_sequence":null,"statistics":{"entity_count":0,"edge_count":0,"metadata_count":0,"total_records":0,"uncompressed_size":0}}}
{"type":"entity","data":{"wrong_field":"bad_value"}}"#;
let cursor = Cursor::new(backup.as_bytes());
let result = verify(cursor);
assert!(result.is_err());
}
}