use sqlitegraph::backend::native::GraphFile;
use sqlitegraph::backend::native::v2::wal::{
V2WALConfig, V2WALHeader, V2WALManager, V2WALRecord, transaction_coordinator::IsolationLevel,
};
use sqlitegraph::backend::native::{NativeBackendError, NativeResult};
use std::path::PathBuf;
use tempfile::TempDir;
/// Per-test fixture: a temporary directory plus the file paths handed to the
/// WAL manager configuration.
struct RecoveryTestSetup {
    // Never read after construction; stored so the `TempDir` value stays alive
    // for as long as the fixture does (tempfile's `TempDir` removes the
    // directory when dropped).
    #[allow(dead_code)]
    temp_dir: TempDir,
    /// Location of the write-ahead log file.
    wal_path: PathBuf,
    /// Location of the graph database file.
    db_path: PathBuf,
    /// Location of the checkpoint file.
    checkpoint_path: PathBuf,
}
impl RecoveryTestSetup {
    /// Creates a temporary directory, derives the WAL / database / checkpoint
    /// paths inside it, and creates the graph file at the database path.
    ///
    /// # Errors
    /// Returns an `IoError` if the temporary directory cannot be created, or
    /// whatever error `GraphFile::create` reports.
    fn new() -> NativeResult<Self> {
        let temp_dir = TempDir::new().map_err(|err| NativeBackendError::IoError {
            context: String::from("Failed to create temp dir"),
            source: err.into(),
        })?;

        let root = temp_dir.path();
        let wal_path = root.join("test.wal");
        let db_path = root.join("test.v2");
        let checkpoint_path = root.join("test.checkpoint");

        // The handle is kept only until this function returns; the tests work
        // with the path, not with this handle.
        let _graph = GraphFile::create(&db_path)?;

        Ok(Self {
            temp_dir,
            wal_path,
            db_path,
            checkpoint_path,
        })
    }

    /// Builds a WAL configuration pointing at this fixture's files; every
    /// other setting comes from `V2WALConfig::default()`.
    fn config(&self) -> V2WALConfig {
        V2WALConfig {
            graph_path: self.db_path.clone(),
            wal_path: self.wal_path.clone(),
            checkpoint_path: self.checkpoint_path.clone(),
            ..Default::default()
        }
    }

    /// Replaces the contents of the WAL file with `data`.
    ///
    /// # Errors
    /// Returns an `IoError` if the file cannot be written.
    fn write_corrupted_wal(&self, data: &[u8]) -> NativeResult<()> {
        std::fs::write(&self.wal_path, data).map_err(|err| NativeBackendError::IoError {
            context: String::from("Failed to write corrupted WAL"),
            source: err.into(),
        })
    }

    /// Size of the WAL file in bytes, or `0` when its metadata cannot be read
    /// (for example because the file does not exist).
    fn wal_size(&self) -> u64 {
        match std::fs::metadata(&self.wal_path) {
            Ok(meta) => meta.len(),
            Err(_) => 0,
        }
    }

    /// Whether a file currently exists at the WAL path.
    fn wal_exists(&self) -> bool {
        self.wal_path.as_path().exists()
    }
}
/// Writes one committed transaction, cuts the WAL file to half its length and
/// then reopens a manager over the truncated file.
///
/// Reopening may either succeed or report an error; the test only requires
/// that it returns instead of panicking and that the WAL file is still there
/// when it succeeds.
#[test]
fn test_recovery_from_truncated_wal() -> NativeResult<()> {
    let setup = RecoveryTestSetup::new()?;
    let config = setup.config();

    let manager = V2WALManager::create(config.clone())?;
    let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
    manager.write_transaction_record(
        tx_id,
        V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        },
    )?;
    manager.commit_transaction(tx_id)?;
    manager.flush()?;
    drop(manager);

    // Keep only the first half of the bytes that were flushed to disk.
    let original_size = setup.wal_size();
    let mut wal_data = std::fs::read(&setup.wal_path)?;
    wal_data.truncate(wal_data.len() / 2);
    std::fs::write(&setup.wal_path, &wal_data)?;

    assert!(setup.wal_exists(), "WAL file should still exist");
    assert!(setup.wal_size() < original_size, "WAL should be truncated");

    // Exercise the recovery path itself: open a manager on the damaged log.
    let reopened = V2WALManager::create(config);
    match reopened {
        Ok(_) => {
            assert!(
                setup.wal_exists(),
                "WAL file should exist after reopening a truncated log"
            );
        }
        Err(_) => {
            // A reported error is an acceptable way to handle a truncated log.
        }
    }
    Ok(())
}
/// Fills the WAL file with eight `0xFF` bytes and opens a manager over it.
///
/// Either outcome is accepted: on success the WAL file must still exist; on
/// failure the error text must mention corruption, the magic value, an
/// invalid state, or the header.
#[test]
fn test_recovery_with_invalid_magic_bytes() -> NativeResult<()> {
    let setup = RecoveryTestSetup::new()?;

    let invalid_data = vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF];
    setup.write_corrupted_wal(&invalid_data)?;

    let config = setup.config();
    let result = V2WALManager::create(config);
    match result {
        Ok(_) => {
            assert!(setup.wal_exists());
        }
        Err(e) => {
            // Compare case-insensitively so a capitalised message
            // ("Invalid header", "Corruption detected", ...) still matches.
            let error_msg = e.to_string().to_lowercase();
            assert!(
                error_msg.contains("corruption")
                    || error_msg.contains("magic")
                    || error_msg.contains("invalid")
                    || error_msg.contains("header"),
                "Error should indicate corruption: {}",
                e
            );
        }
    }
    Ok(())
}
/// Writes one committed transaction, bumps ten bytes located right after
/// `size_of::<V2WALHeader>()` in the WAL file, and reopens a manager.
///
/// Either outcome is accepted: on success the WAL file must still exist; on
/// failure the error text must mention corruption, a checksum, or an invalid
/// state (compared case-insensitively).
#[test]
fn test_recovery_with_corrupted_payload() -> NativeResult<()> {
    let setup = RecoveryTestSetup::new()?;
    let config = setup.config();

    let manager = V2WALManager::create(config.clone())?;
    let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
    manager.write_transaction_record(
        tx_id,
        V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        },
    )?;
    manager.commit_transaction(tx_id)?;
    manager.flush()?;
    drop(manager);

    let mut wal_data = std::fs::read(&setup.wal_path)?;
    // NOTE(review): assumes the on-disk header is as large as the in-memory
    // `V2WALHeader` struct - confirm against the WAL format.
    let header_size = std::mem::size_of::<V2WALHeader>();
    // When the file is too short nothing is corrupted and the reopen below
    // runs against an intact log.
    if wal_data.len() > header_size + 10 {
        for byte in &mut wal_data[header_size..header_size + 10] {
            *byte = byte.wrapping_add(1);
        }
        std::fs::write(&setup.wal_path, &wal_data)?;
    }

    let result = V2WALManager::create(config);
    match result {
        Ok(_) => {
            assert!(setup.wal_exists());
        }
        Err(e) => {
            let error_msg = e.to_string().to_lowercase();
            assert!(
                error_msg.contains("corruption")
                    || error_msg.contains("checksum")
                    || error_msg.contains("invalid"),
                "Error should indicate corruption: {}",
                e
            );
        }
    }
    Ok(())
}
/// Writes one committed transaction, bumps the byte at offset 100 of the WAL
/// file, and reopens a manager.
///
/// Either outcome is accepted: on success the WAL file must still exist; on
/// failure the error text must mention corruption, a checksum, or an invalid
/// state (compared case-insensitively).
#[test]
fn test_recovery_with_corrupted_checksum() -> NativeResult<()> {
    let setup = RecoveryTestSetup::new()?;
    let config = setup.config();

    let manager = V2WALManager::create(config.clone())?;
    let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
    manager.write_transaction_record(
        tx_id,
        V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        },
    )?;
    manager.commit_transaction(tx_id)?;
    manager.flush()?;
    drop(manager);

    let mut wal_data = std::fs::read(&setup.wal_path)?;
    // NOTE(review): offset 100 is presumably meant to land inside checksummed
    // data - confirm against the WAL layout. Files of 100 bytes or fewer are
    // left untouched.
    if let Some(byte) = wal_data.get_mut(100) {
        *byte = byte.wrapping_add(1);
        std::fs::write(&setup.wal_path, &wal_data)?;
    }

    let result = V2WALManager::create(config);
    match result {
        Ok(_) => {
            assert!(setup.wal_exists());
        }
        Err(e) => {
            let error_msg = e.to_string().to_lowercase();
            assert!(
                error_msg.contains("corruption")
                    || error_msg.contains("checksum")
                    || error_msg.contains("invalid"),
                "Error should indicate data integrity issue: {}",
                e
            );
        }
    }
    Ok(())
}
/// Commits one transaction, leaves a second one open, drops the manager, and
/// checks that a new manager can still be created over the same files.
#[test]
fn test_recovery_with_incomplete_transaction() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    // First transaction: written and committed.
    let committed_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let first_record = V2WALRecord::NodeInsert {
        node_id: 1,
        slot_offset: 1024,
        node_data: vec![1, 2, 3],
    };
    wal.write_transaction_record(committed_tx, first_record)?;
    wal.commit_transaction(committed_tx)?;

    // Second transaction: written but neither committed nor rolled back.
    let open_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let second_record = V2WALRecord::NodeInsert {
        node_id: 2,
        slot_offset: 2048,
        node_data: vec![4, 5, 6],
    };
    wal.write_transaction_record(open_tx, second_record)?;

    // Dropping the manager with a transaction still open stands in for a crash.
    drop(wal);
    assert!(fixture.wal_exists(), "WAL should exist after crash");

    let reopened = V2WALManager::create(config);
    assert!(
        reopened.is_ok(),
        "Should recover despite incomplete transaction"
    );
    Ok(())
}
/// Rolls back a transaction that already wrote two records, then commits a
/// fresh transaction on the same manager and checks both metric counters.
#[test]
fn test_recovery_rollback_after_partial_writes() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    // Transaction that is abandoned after two writes.
    let aborted_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let first_record = V2WALRecord::NodeInsert {
        node_id: 1,
        slot_offset: 1024,
        node_data: vec![1, 2, 3],
    };
    wal.write_transaction_record(aborted_tx, first_record)?;
    let second_record = V2WALRecord::NodeInsert {
        node_id: 2,
        slot_offset: 2048,
        node_data: vec![4, 5, 6],
    };
    wal.write_transaction_record(aborted_tx, second_record)?;
    wal.rollback_transaction(aborted_tx)?;

    let after_rollback = wal.get_metrics();
    assert_eq!(
        after_rollback.rolled_back_transactions, 1,
        "Should have 1 rolled back transaction"
    );

    // The manager must remain usable for a subsequent transaction.
    let committed_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let third_record = V2WALRecord::NodeInsert {
        node_id: 3,
        slot_offset: 3072,
        node_data: vec![7, 8, 9],
    };
    wal.write_transaction_record(committed_tx, third_record)?;
    wal.commit_transaction(committed_tx)?;

    let after_commit = wal.get_metrics();
    assert_eq!(
        after_commit.committed_transactions, 1,
        "Should have 1 committed transaction"
    );
    Ok(())
}
/// Runs commit, rollback, commit in sequence on one manager and checks that
/// the metrics report two commits and one rollback.
#[test]
fn test_recovery_mixed_commit_rollback_transactions() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    // 1) committed
    let first_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let first_record = V2WALRecord::NodeInsert {
        node_id: 1,
        slot_offset: 1024,
        node_data: vec![],
    };
    wal.write_transaction_record(first_tx, first_record)?;
    wal.commit_transaction(first_tx)?;

    // 2) rolled back
    let second_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let second_record = V2WALRecord::NodeInsert {
        node_id: 2,
        slot_offset: 2048,
        node_data: vec![],
    };
    wal.write_transaction_record(second_tx, second_record)?;
    wal.rollback_transaction(second_tx)?;

    // 3) committed
    let third_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let third_record = V2WALRecord::NodeInsert {
        node_id: 3,
        slot_offset: 3072,
        node_data: vec![],
    };
    wal.write_transaction_record(third_tx, third_record)?;
    wal.commit_transaction(third_tx)?;

    let stats = wal.get_metrics();
    assert_eq!(
        stats.committed_transactions, 2,
        "Should have 2 committed transactions"
    );
    assert_eq!(
        stats.rolled_back_transactions, 1,
        "Should have 1 rolled back transaction"
    );
    Ok(())
}
/// Writes ten records inside a single `Serializable` transaction, commits it,
/// and checks the committed-transaction and records-written counters.
#[test]
fn test_recovery_transaction_with_multiple_records() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    let txn = wal.begin_transaction(IsolationLevel::Serializable)?;
    for n in 1..=10 {
        let record = V2WALRecord::NodeInsert {
            node_id: n,
            slot_offset: (n * 1024) as u64,
            node_data: vec![n as u8],
        };
        wal.write_transaction_record(txn, record)?;
    }
    wal.commit_transaction(txn)?;

    let stats = wal.get_metrics();
    assert_eq!(stats.committed_transactions, 1);
    assert!(
        stats.total_records_written >= 10,
        "Should have written at least 10 records"
    );
    Ok(())
}
/// Commits five single-record transactions, forces a checkpoint, and checks
/// that the checkpoint succeeded and was counted in the metrics.
#[test]
fn test_recovery_incomplete_checkpoint() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    for n in 1..=5 {
        let txn = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
        let record = V2WALRecord::NodeInsert {
            node_id: n,
            slot_offset: (n * 1024) as u64,
            node_data: vec![n as u8],
        };
        wal.write_transaction_record(txn, record)?;
        wal.commit_transaction(txn)?;
    }

    let checkpoint_outcome = wal.force_checkpoint();
    assert!(checkpoint_outcome.is_ok(), "Checkpoint should succeed");

    let stats = wal.get_metrics();
    assert!(
        stats.checkpoint_count > 0,
        "Should have performed checkpoint"
    );
    Ok(())
}
/// Commits one transaction, rolls back another, then checks that a forced
/// checkpoint still succeeds.
#[test]
fn test_checkpoint_after_rollback() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    let committed_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let kept_record = V2WALRecord::NodeInsert {
        node_id: 1,
        slot_offset: 1024,
        node_data: vec![1],
    };
    wal.write_transaction_record(committed_tx, kept_record)?;
    wal.commit_transaction(committed_tx)?;

    let aborted_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let discarded_record = V2WALRecord::NodeInsert {
        node_id: 2,
        slot_offset: 2048,
        node_data: vec![2],
    };
    wal.write_transaction_record(aborted_tx, discarded_record)?;
    wal.rollback_transaction(aborted_tx)?;

    let checkpoint_outcome = wal.force_checkpoint();
    assert!(
        checkpoint_outcome.is_ok(),
        "Checkpoint should succeed after rollback"
    );
    Ok(())
}
/// Performs three rounds of "commit two transactions, force a checkpoint" and
/// checks that the metrics report three checkpoints and six commits.
#[test]
fn test_multiple_checkpoints() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    for round in 1..=3 {
        for step in 1..=2 {
            // Distinct value per (round, step) pair: 11, 12, 21, 22, 31, 32.
            let seq = round * 10 + step;
            let txn = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
            let record = V2WALRecord::NodeInsert {
                node_id: seq as i64,
                slot_offset: (seq * 1024) as u64,
                node_data: vec![seq as u8],
            };
            wal.write_transaction_record(txn, record)?;
            wal.commit_transaction(txn)?;
        }
        wal.force_checkpoint()?;
    }

    let stats = wal.get_metrics();
    assert_eq!(stats.checkpoint_count, 3, "Should have 3 checkpoints");
    assert_eq!(
        stats.committed_transactions, 6,
        "Should have 6 committed transactions"
    );
    Ok(())
}
/// Forces a checkpoint on a freshly created manager that has seen no
/// transactions and checks that it succeeds with zero commits recorded.
#[test]
fn test_checkpoint_with_empty_wal() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    let checkpoint_outcome = wal.force_checkpoint();
    assert!(
        checkpoint_outcome.is_ok(),
        "Checkpoint should succeed with empty WAL"
    );

    let stats = wal.get_metrics();
    assert_eq!(
        stats.committed_transactions, 0,
        "Should have no committed transactions"
    );
    Ok(())
}
/// Creates a zero-length WAL file and opens a manager over it.
///
/// Both success and failure are accepted; the test only requires that the
/// call returns rather than panicking.
#[test]
fn test_recovery_from_empty_wal() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;

    // Create the empty file and close the handle straight away.
    drop(std::fs::File::create(&fixture.wal_path)?);

    let config = fixture.config();
    let outcome = V2WALManager::create(config);
    // Nothing is asserted about the outcome: `Ok` and `Err` are both fine.
    drop(outcome);
    Ok(())
}
/// Commits five transactions, flushes and drops the manager, then reopens it
/// and checks that the WAL file persists, that no transaction is active, and
/// that a new transaction can be committed.
#[test]
fn test_recovery_with_only_committed_transactions() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    for n in 1..=5 {
        let txn = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
        let record = V2WALRecord::NodeInsert {
            node_id: n,
            slot_offset: (n * 1024) as u64,
            node_data: vec![n as u8],
        };
        wal.write_transaction_record(txn, record)?;
        wal.commit_transaction(txn)?;
    }
    wal.flush()?;
    drop(wal);

    // Second manager over the same files.
    let reopened = V2WALManager::create(config)?;
    let _metrics = reopened.get_metrics();
    assert!(
        fixture.wal_exists(),
        "WAL file should persist after manager drop"
    );
    let active = reopened.get_active_transaction_count();
    assert!(
        active == 0,
        "New manager should have no active transactions"
    );

    // The reopened manager must accept new work.
    let txn = reopened.begin_transaction(IsolationLevel::ReadCommitted)?;
    let record = V2WALRecord::NodeInsert {
        node_id: 10,
        slot_offset: 10240,
        node_data: vec![10],
    };
    reopened.write_transaction_record(txn, record)?;
    reopened.commit_transaction(txn)?;
    Ok(())
}
/// Commits three transactions and rolls back three more, flushes and drops
/// the manager, then reopens it and commits one further transaction.
#[test]
fn test_recovery_with_mixed_committed_rolled_back() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    // Nodes 1..=3 are committed.
    for n in 1..=3 {
        let txn = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
        let record = V2WALRecord::NodeInsert {
            node_id: n,
            slot_offset: (n * 1024) as u64,
            node_data: vec![n as u8],
        };
        wal.write_transaction_record(txn, record)?;
        wal.commit_transaction(txn)?;
    }

    // Nodes 4..=6 are written and then rolled back.
    for n in 4..=6 {
        let txn = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
        let record = V2WALRecord::NodeInsert {
            node_id: n,
            slot_offset: (n * 1024) as u64,
            node_data: vec![n as u8],
        };
        wal.write_transaction_record(txn, record)?;
        wal.rollback_transaction(txn)?;
    }
    wal.flush()?;
    drop(wal);

    let reopened = V2WALManager::create(config)?;
    assert!(fixture.wal_exists(), "WAL should exist after restart");

    let txn = reopened.begin_transaction(IsolationLevel::ReadCommitted)?;
    let record = V2WALRecord::NodeInsert {
        node_id: 100,
        slot_offset: 102400,
        node_data: vec![100],
    };
    reopened.write_transaction_record(txn, record)?;
    reopened.commit_transaction(txn)?;
    Ok(())
}
/// Lets a manager go out of scope after three flushed commits, then opens a
/// second manager, commits through it, and checks that its metrics count the
/// new transaction.
#[test]
fn test_recovery_after_manager_drop() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();

    {
        // The first manager lives only inside this scope.
        let wal = V2WALManager::create(config.clone())?;
        for n in 1..=3 {
            let txn = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
            let record = V2WALRecord::NodeInsert {
                node_id: n,
                slot_offset: (n * 1024) as u64,
                node_data: vec![n as u8],
            };
            wal.write_transaction_record(txn, record)?;
            wal.commit_transaction(txn)?;
        }
        wal.flush()?;
    }

    let reopened = V2WALManager::create(config)?;
    assert!(fixture.wal_exists(), "WAL should persist after manager drop");

    let txn = reopened.begin_transaction(IsolationLevel::ReadCommitted)?;
    let record = V2WALRecord::NodeInsert {
        node_id: 999,
        slot_offset: 999999,
        node_data: vec![99],
    };
    reopened.write_transaction_record(txn, record)?;
    reopened.commit_transaction(txn)?;

    let stats = reopened.get_metrics();
    let tracks_new_work = stats.total_transactions >= 1 || stats.committed_transactions >= 1;
    assert!(tracks_new_work, "New manager should track new transactions");
    Ok(())
}
/// Opens two transactions with different isolation levels before writing to
/// either, interleaves their writes, commits both, and checks the commit count.
#[test]
fn test_recovery_concurrent_transactions() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    // Both transactions are open at the same time.
    let read_committed_tx = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
    let serializable_tx = wal.begin_transaction(IsolationLevel::Serializable)?;

    let first_record = V2WALRecord::NodeInsert {
        node_id: 1,
        slot_offset: 1024,
        node_data: vec![1],
    };
    wal.write_transaction_record(read_committed_tx, first_record)?;

    let second_record = V2WALRecord::NodeInsert {
        node_id: 2,
        slot_offset: 2048,
        node_data: vec![2],
    };
    wal.write_transaction_record(serializable_tx, second_record)?;

    wal.commit_transaction(read_committed_tx)?;
    wal.commit_transaction(serializable_tx)?;

    let stats = wal.get_metrics();
    assert_eq!(stats.committed_transactions, 2);
    Ok(())
}
/// Writes one hundred records of one hundred bytes each inside a single
/// `Serializable` transaction, commits it, and checks that the metrics count
/// exactly one committed transaction.
#[test]
fn test_recovery_large_transaction() -> NativeResult<()> {
    let setup = RecoveryTestSetup::new()?;
    let config = setup.config();
    let manager = V2WALManager::create(config)?;

    let tx_id = manager.begin_transaction(IsolationLevel::Serializable)?;
    for i in 1..=100 {
        manager.write_transaction_record(
            tx_id,
            V2WALRecord::NodeInsert {
                node_id: i,
                slot_offset: (i * 1024) as u64,
                // 100-byte payload filled with the (truncated) node id.
                node_data: vec![i as u8; 100],
            },
        )?;
    }
    manager.commit_transaction(tx_id)?;

    // Verify the commit was recorded instead of discarding the metrics.
    let metrics = manager.get_metrics();
    assert_eq!(
        metrics.committed_transactions, 1,
        "Large transaction should be counted as one committed transaction"
    );
    Ok(())
}
/// Commits twenty single-record transactions back to back and checks that the
/// metrics count all twenty.
#[test]
fn test_recovery_rapid_commits() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();
    let wal = V2WALManager::create(config.clone())?;

    for n in 1..=20 {
        let txn = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
        let record = V2WALRecord::NodeInsert {
            node_id: n,
            slot_offset: (n * 1024) as u64,
            node_data: vec![n as u8],
        };
        wal.write_transaction_record(txn, record)?;
        wal.commit_transaction(txn)?;
    }

    let stats = wal.get_metrics();
    assert_eq!(stats.committed_transactions, 20);
    Ok(())
}
/// Deletes the WAL file after a first manager has gone out of scope, then
/// checks that creating a new manager brings the file back and that the new
/// manager can commit a transaction.
#[test]
fn test_wal_recreation_after_deletion() -> NativeResult<()> {
    let fixture = RecoveryTestSetup::new()?;
    let config = fixture.config();

    {
        // First manager: one flushed commit, dropped at the end of this scope.
        let wal = V2WALManager::create(config.clone())?;
        let txn = wal.begin_transaction(IsolationLevel::ReadCommitted)?;
        let record = V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 1024,
            node_data: vec![1],
        };
        wal.write_transaction_record(txn, record)?;
        wal.commit_transaction(txn)?;
        wal.flush()?;
    }

    std::fs::remove_file(&fixture.wal_path)?;

    let recreated = V2WALManager::create(config)?;
    let _metrics = recreated.get_metrics();
    assert!(fixture.wal_exists(), "WAL should be recreated");

    let txn = recreated.begin_transaction(IsolationLevel::ReadCommitted)?;
    let record = V2WALRecord::NodeInsert {
        node_id: 2,
        slot_offset: 2048,
        node_data: vec![2],
    };
    recreated.write_transaction_record(txn, record)?;
    recreated.commit_transaction(txn)?;
    Ok(())
}