use sqlitegraph::backend::native::graph_file::GraphFile;
use sqlitegraph::backend::native::v2::Direction;
use sqlitegraph::backend::native::v2::export::{
ExportConsistencyReport, ExportFactory, ExportManifest, ExportResult, V2ExportConfig,
V2Exporter, manifest::ManifestSerializer,
};
use sqlitegraph::backend::native::v2::import::{
ImportFactory, ImportMode, ImportResult, ImportValidationReport, V2ImportConfig, V2Importer,
};
use sqlitegraph::backend::native::v2::wal::recovery::states::{
Authority, RecoveryContext, RecoveryState as ExplicitRecoveryState,
};
use sqlitegraph::backend::native::v2::wal::{
BulkIngestConfig, BulkIngestExt, IsolationLevel, V2WALConfig, V2WALManager, V2WALReader,
V2WALRecord,
};
use sqlitegraph::backend::native::{NativeBackendError, NativeResult};
use tempfile::tempdir;
#[test]
fn test_export_clean_checkpoint_no_wal() -> NativeResult<()> {
let temp_dir = tempdir()?;
let graph_path = temp_dir.path().join("test.v2");
let export_dir = temp_dir.path().join("export");
let _graph_file = GraphFile::create(&graph_path)?;
let wal_path = graph_path.with_extension("wal");
let checkpoint_path = graph_path.with_extension("checkpoint");
let wal_config = V2WALConfig {
wal_path: wal_path.clone(),
checkpoint_path: checkpoint_path.clone(),
..Default::default()
};
let manager = V2WALManager::create(wal_config)?;
let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
manager.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 1,
slot_offset: 0,
node_data: vec![1, 2, 3, 4, 5],
},
)?;
manager.commit_transaction(tx_id)?;
manager.force_checkpoint()?;
let exporter = ExportFactory::create_checkpoint_aligned_exporter(&graph_path, &export_dir)?;
let result = exporter.export_checkpoint_aligned();
assert!(
result.is_err(),
"Should fail until export_checkpoint_aligned is implemented"
);
let manifest_path = export_dir.join("export.manifest");
let graph_file_path = export_dir.join("export.graph");
let wal_file_path = export_dir.join("export.wal");
Ok(())
}
#[test]
fn test_export_active_wal_tail() -> NativeResult<()> {
let temp_dir = tempdir()?;
let graph_path = temp_dir.path().join("test.v2");
let export_dir = temp_dir.path().join("export");
let _graph_file = GraphFile::create(&graph_path)?;
let wal_path = graph_path.with_extension("wal");
let checkpoint_path = graph_path.with_extension("checkpoint");
let wal_config = V2WALConfig {
wal_path: wal_path.clone(),
checkpoint_path: checkpoint_path.clone(),
..Default::default()
};
let manager = V2WALManager::create(wal_config)?;
let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
manager.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 2,
slot_offset: 0,
node_data: vec![6, 7, 8, 9, 10],
},
)?;
drop(manager);
let exporter = ExportFactory::create_full_exporter(&graph_path, &export_dir)?;
let result = exporter.export_full();
assert!(
result.is_err(),
"Should fail until export_full is implemented"
);
Ok(())
}
#[test]
fn test_import_into_empty_graph() -> NativeResult<()> {
let temp_dir = tempdir()?;
let export_dir = temp_dir.path().join("export");
let target_path = temp_dir.path().join("imported.v2");
std::fs::create_dir_all(&export_dir)?;
let manifest = ExportManifest::new();
let manifest_bytes = format!(
"V2EXPORT_MANIFEST\nversion=1\nrecovery_state=CleanShutdown\ngraph_checkpoint_lsn=50\nexport_mode=CheckpointAligned"
);
let manifest_path = export_dir.join("export.manifest");
std::fs::write(&manifest_path, manifest_bytes)?;
let graph_file_path = export_dir.join("export.graph");
std::fs::write(&graph_file_path, b"mock exported graph data")?;
let wal_file_path = export_dir.join("export.wal");
std::fs::write(&wal_file_path, b"mock exported wal data")?;
let config = V2ImportConfig {
target_graph_path: target_path.clone(),
export_dir_path: export_dir.clone(),
import_mode: ImportMode::Fresh,
validate_recovery: true,
force_checkpoint_after_import: true,
};
let importer_result = V2Importer::from_export_dir(&export_dir, &target_path, config);
assert!(
importer_result.is_err(),
"Should fail until V2Importer::from_export_dir is implemented"
);
if let Ok(importer) = importer_result {
let validation_result = importer.validate_export();
assert!(
validation_result.is_err(),
"Should fail until validate_export is implemented"
);
if let Ok(_validation) = validation_result {
let import_result = importer.import();
assert!(
import_result.is_err(),
"Should fail until import is implemented"
);
}
}
Ok(())
}
#[test]
fn test_import_recovery_validation() -> NativeResult<()> {
let temp_dir = tempdir()?;
let export_dir = temp_dir.path().join("export");
let target_path = temp_dir.path().join("imported.v2");
std::fs::create_dir_all(&export_dir)?;
let manifest_path = export_dir.join("export.manifest");
let manifest_bytes = format!(
"V2EXPORT_MANIFEST\nversion=1\nrecovery_state=DirtyShutdown\ngraph_checkpoint_lsn=100\nwal_start_lsn=101\nwal_end_lsn=150\nexport_mode=Full"
);
std::fs::write(&manifest_path, manifest_bytes)?;
let graph_file_path = export_dir.join("export.graph");
let wal_file_path = export_dir.join("export.wal");
std::fs::write(&graph_file_path, b"mock exported graph data")?;
std::fs::write(&wal_file_path, b"mock exported wal records")?;
let config = V2ImportConfig {
target_graph_path: target_path.clone(),
export_dir_path: export_dir.clone(),
import_mode: ImportMode::Fresh,
validate_recovery: true,
force_checkpoint_after_import: true,
};
let importer_result = V2Importer::from_export_dir(&export_dir, &target_path, config);
assert!(
importer_result.is_err(),
"Should fail until V2Importer is implemented"
);
Ok(())
}
#[test]
fn test_import_incompatible_manifest() -> NativeResult<()> {
let temp_dir = tempdir()?;
let export_dir = temp_dir.path().join("export");
let target_path = temp_dir.path().join("imported.v2");
std::fs::create_dir_all(&export_dir)?;
let manifest_path = export_dir.join("export.manifest");
let incompatible_manifest = format!(
"V2EXPORT_MANIFEST\nversion=999\nincompatible_format=true\ninvalid_field=bad_value"
);
std::fs::write(&manifest_path, incompatible_manifest)?;
let graph_file_path = export_dir.join("export.graph");
std::fs::write(&graph_file_path, b"mock exported graph data")?;
let config = V2ImportConfig {
target_graph_path: target_path.clone(),
export_dir_path: export_dir.clone(),
import_mode: ImportMode::Fresh,
validate_recovery: true,
force_checkpoint_after_import: true,
};
let result = V2Importer::from_export_dir(&export_dir, &target_path, config);
assert!(result.is_err(), "Should fail due to incompatible manifest");
if let Ok(importer) = result {
let validation_result = importer.validate_export();
assert!(
validation_result.is_err(),
"Should detect manifest incompatibility"
);
}
Ok(())
}
#[test]
fn test_end_to_end_export_import_roundtrip() -> NativeResult<()> {
let temp_dir = tempdir()?;
let source_path = temp_dir.path().join("source.v2");
let export_dir = temp_dir.path().join("export");
let target_path = temp_dir.path().join("restored.v2");
let _source_graph = GraphFile::create(&source_path)?;
let source_wal_path = source_path.with_extension("wal");
let source_checkpoint_path = source_path.with_extension("checkpoint");
let source_wal_config = V2WALConfig {
wal_path: source_wal_path.clone(),
checkpoint_path: source_checkpoint_path.clone(),
..Default::default()
};
let source_manager = V2WALManager::create(source_wal_config)?;
for i in 1..10 {
let tx_id = source_manager.begin_transaction(IsolationLevel::ReadCommitted)?;
source_manager.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: i,
slot_offset: (i * 100) as u64,
node_data: vec![i as u8; 10],
},
)?;
source_manager.write_transaction_record(
tx_id,
V2WALRecord::ClusterCreate {
node_id: i,
direction: Direction::Outgoing,
cluster_offset: (i * 1000) as u64,
cluster_size: 5,
edge_data: vec![i as u8; 20],
},
)?;
source_manager.commit_transaction(tx_id)?;
}
source_manager.force_checkpoint()?;
let source_exporter = ExportFactory::create_full_exporter(&source_path, &export_dir)?;
let export_result = source_exporter.export_full();
assert!(
export_result.is_err(),
"Should fail until export system is implemented"
);
let import_config = V2ImportConfig {
target_graph_path: target_path.clone(),
export_dir_path: export_dir.clone(),
import_mode: ImportMode::Fresh,
validate_recovery: true,
force_checkpoint_after_import: true,
};
let target_importer = V2Importer::from_export_dir(&export_dir, &target_path, import_config)?;
let import_result = target_importer.import();
assert!(
import_result.is_err(),
"Should fail until import system is implemented"
);
Ok(())
}
#[test]
fn test_lsn_bounded_export_validation() -> NativeResult<()> {
let temp_dir = tempdir()?;
let graph_path = temp_dir.path().join("test.v2");
let export_dir = temp_dir.path().join("export");
let _graph_file = GraphFile::create(&graph_path)?;
let wal_path = graph_path.with_extension("wal");
let checkpoint_path = graph_path.with_extension("checkpoint");
let wal_config = V2WALConfig {
wal_path: wal_path.clone(),
checkpoint_path: checkpoint_path.clone(),
..Default::default()
};
let manager = V2WALManager::create(wal_config)?;
for i in 1..5 {
let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
manager.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: i,
slot_offset: (i * 100) as u64,
node_data: vec![i as u8; 50],
},
)?;
manager.commit_transaction(tx_id)?;
}
let config = V2ExportConfig {
export_path: export_dir.join("export"),
include_wal_tail: true,
compression_enabled: false,
checksum_validation: true,
};
let exporter_result = V2Exporter::from_graph_file(&graph_path, config);
assert!(
exporter_result.is_err(),
"Should fail until V2Exporter is implemented"
);
if let Ok(exporter) = exporter_result {
let lsn_result = exporter.export_lsn_bounded(1, 100);
assert!(
lsn_result.is_err(),
"Should fail until LSN-bounded export is implemented"
);
}
Ok(())
}
#[test]
fn test_manifest_integrity_validation() -> NativeResult<()> {
let temp_dir = tempdir()?;
let export_dir = temp_dir.path().join("export");
std::fs::create_dir_all(&export_dir)?;
let manifest_path = export_dir.join("export.manifest");
std::fs::write(&manifest_path, b"CORRUPT_MANIFEST_DATA")?;
let read_result = ManifestSerializer::read_from_file(&manifest_path);
assert!(
read_result.is_err(),
"Should fail due to corrupted manifest"
);
if let Ok(_manifest) = read_result {
let validation_result = _manifest.validate();
assert!(
validation_result.is_err(),
"Should detect manifest corruption"
);
}
Ok(())
}
#[test]
fn test_bulk_ingest_integration() -> NativeResult<()> {
let temp_dir = tempdir()?;
let export_dir = temp_dir.path().join("export");
let target_path = temp_dir.path().join("bulk_imported.v2");
std::fs::create_dir_all(&export_dir)?;
let manifest_path = export_dir.join("export.manifest");
let manifest_bytes = format!(
"V2EXPORT_MANIFEST\nversion=1\nrecovery_state=DirtyShutdown\nwal_start_lsn=1\nwal_end_lsn=1000\nexport_mode=Full"
);
std::fs::write(&manifest_path, manifest_bytes)?;
let graph_file_path = export_dir.join("export.graph");
std::fs::write(&graph_file_path, b"mock exported graph data")?;
let wal_file_path = export_dir.join("export.wal");
let mut wal_data = Vec::new();
for i in 1..=100 {
wal_data.extend_from_slice(&[1u8]); wal_data.extend_from_slice(&(i as u32).to_le_bytes()); wal_data.extend_from_slice(&[i; 10]); }
std::fs::write(&wal_file_path, wal_data)?;
let config = V2ImportConfig {
target_graph_path: target_path.clone(),
export_dir_path: export_dir.clone(),
import_mode: ImportMode::Fresh,
validate_recovery: true,
force_checkpoint_after_import: true,
};
let result = V2Importer::from_export_dir(&export_dir, &target_path, config);
assert!(
result.is_err(),
"Should fail until import system is implemented"
);
Ok(())
}
#[test]
fn test_recovery_state_detection_accuracy() -> NativeResult<()> {
let temp_dir = tempdir()?;
let graph_path = temp_dir.path().join("test.v2");
let _graph_file = GraphFile::create(&graph_path)?;
let wal_path = graph_path.with_extension("wal");
let checkpoint_path = graph_path.with_extension("checkpoint");
let wal_config = V2WALConfig {
wal_path: wal_path.clone(),
checkpoint_path: checkpoint_path.clone(),
..Default::default()
};
let manager = V2WALManager::create(wal_config)?;
let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
manager.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 1,
slot_offset: 0,
node_data: vec![1, 2, 3],
},
)?;
manager.commit_transaction(tx_id)?;
manager.force_checkpoint()?;
let recovery_context =
RecoveryContext::analyze_files(&wal_path, &graph_path, &checkpoint_path)?;
assert_eq!(recovery_context.state, ExplicitRecoveryState::CleanShutdown);
assert_eq!(recovery_context.authority, Authority::GraphFile);
let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
manager.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 2,
slot_offset: 0,
node_data: vec![4, 5, 6],
},
)?;
drop(manager);
let recovery_context =
RecoveryContext::analyze_files(&wal_path, &graph_path, &checkpoint_path)?;
assert_eq!(recovery_context.state, ExplicitRecoveryState::DirtyShutdown);
assert_eq!(recovery_context.authority, Authority::WAL);
Ok(())
}