use sqlitegraph::backend::native::v2::wal::checkpoint::strategies::{
StrategyEvaluator, StrategyValidator,
};
use sqlitegraph::backend::native::v2::wal::checkpoint::{
CheckpointStrategy, V2WALCheckpointManager,
};
use sqlitegraph::backend::native::v2::wal::recovery::{RecoveryOptions, V2WALRecoveryEngine};
use sqlitegraph::backend::native::v2::wal::{V2WALConfig, V2WALReader, V2WALRecord, V2WALWriter};
use sqlitegraph::backend::native::{GraphFile, NativeResult};
use std::time::Duration;
use tempfile::TempDir;
/// Checks WAL creation and the initial state of the checkpoint machinery.
///
/// Writes three `NodeInsert` records, shuts the writer down, and then asserts:
/// - the WAL file exists and is non-empty;
/// - the WAL header carries `V2WALHeader::MAGIC` and a `current_lsn` of at least 1;
/// - a freshly created checkpoint manager is `Idle`, reports a last
///   checkpointed LSN of 0, and has no checkpoint in progress;
/// - a 16 MiB size-threshold strategy does not request a checkpoint.
#[test]
fn test_v2_wal_checkpoint_creation_and_validation() -> NativeResult<()> {
    use sqlitegraph::backend::native::v2::wal::checkpoint::core::CheckpointState;
    use sqlitegraph::backend::native::v2::wal::V2WALHeader;

    // All artifacts live in a temporary directory owned by `scratch`.
    let scratch = TempDir::new()?;
    let root = scratch.path();
    let graph_path = root.join("v2_checkpoint.v2");
    let wal_path = root.join("v2_checkpoint.wal");
    let checkpoint_path = root.join("v2_checkpoint.ckpt");
    let _graph_file = GraphFile::create(&graph_path)?;

    let wal_config = V2WALConfig {
        graph_path: graph_path.clone(),
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        max_wal_size: 32 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 5000,
        group_commit_timeout_ms: 100,
        max_group_commit_size: 8,
        enable_compression: false,
        compression_level: 0,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: Default::default(),
    };

    // Write one "function" node per (id, slot offset, name) row.
    let wal_writer = V2WALWriter::create(wal_config.clone())?;
    for (node_id, slot_offset, name) in [
        (1001, 4096, "main"),
        (1002, 8192, "helper"),
        (1003, 12288, "util"),
    ] {
        wal_writer.write_record(V2WALRecord::NodeInsert {
            node_id,
            slot_offset,
            node_data: create_v2_node_record(node_id, "function", name),
        })?;
    }
    wal_writer.shutdown()?;

    // The WAL must be on disk and hold data once the writer is shut down.
    assert!(
        wal_path.exists(),
        "WAL file should be created after writing records"
    );
    let wal_len = std::fs::metadata(&wal_path)?.len();
    assert!(wal_len > 0, "WAL file should contain data");

    // The header must be readable and look like a V2 WAL header.
    let wal_reader = V2WALReader::open(&wal_path)?;
    let wal_header = wal_reader.header();
    assert_eq!(wal_header.magic, V2WALHeader::MAGIC);
    assert!(wal_header.current_lsn >= 1, "Header should have valid LSN");

    // A new manager has not checkpointed anything yet.
    let checkpoint_manager = V2WALCheckpointManager::create(
        wal_config.clone(),
        CheckpointStrategy::SizeThreshold(16 * 1024 * 1024),
    )?;
    assert_eq!(checkpoint_manager.get_state(), CheckpointState::Idle);
    assert_eq!(
        checkpoint_manager.get_last_checkpointed_lsn(),
        0,
        "LSN should be tracked"
    );
    assert!(!checkpoint_manager.is_checkpoint_in_progress());

    // Three small records are expected to stay far below the 16 MiB threshold.
    let size_threshold = CheckpointStrategy::SizeThreshold(16 * 1024 * 1024);
    let strategy_evaluator = StrategyEvaluator::new(wal_config.clone());
    let (checkpoint_due, _) =
        strategy_evaluator.should_checkpoint(&size_threshold, std::time::SystemTime::now(), 0)?;
    assert!(
        !checkpoint_due,
        "Should not trigger checkpoint for small WAL"
    );

    Ok(())
}
/// Exercises validation and evaluation of each `CheckpointStrategy` variant.
///
/// Scenarios, each in its own scope:
/// 1. `SizeThreshold` after writing ten records to the primary WAL;
/// 2. `TransactionCount` after writing five records to a second WAL;
/// 3. `TimeInterval` validation only;
/// 4. `Adaptive` validation only;
/// 5. `CheckpointStrategy::default()` must be `Adaptive` with non-zero limits.
///
/// For the evaluated strategies the test only requires that a positive
/// decision comes with trigger information; it does not pin the decision.
#[test]
fn test_checkpoint_strategies_v2_workloads() -> NativeResult<()> {
    let scratch = TempDir::new()?;
    let root = scratch.path();
    let graph_path = root.join("v2_strategy.v2");
    let wal_path = root.join("v2_strategy.wal");
    let checkpoint_path = root.join("v2_strategy.ckpt");
    let _graph_file = GraphFile::create(&graph_path)?;

    // Scenario 1: size-threshold strategy.
    {
        let wal_config = V2WALConfig {
            graph_path: graph_path.clone(),
            wal_path: wal_path.clone(),
            checkpoint_path: checkpoint_path.clone(),
            max_wal_size: 32 * 1024 * 1024,
            buffer_size: 1024 * 1024,
            checkpoint_interval: 5000,
            group_commit_timeout_ms: 100,
            max_group_commit_size: 8,
            enable_compression: false,
            compression_level: 0,
            auto_checkpoint: false,
            background_checkpoint_thread: false,
            background_checkpoint_interval_secs: 60,
            json_limits: Default::default(),
        };

        let wal_writer = V2WALWriter::create(wal_config.clone())?;
        for idx in 0..10 {
            let node_id = 1000 + idx;
            wal_writer.write_record(V2WALRecord::NodeInsert {
                node_id,
                slot_offset: 4096 + (idx * 128) as u64,
                node_data: create_v2_node_record(node_id, "test", "node"),
            })?;
        }
        wal_writer.shutdown()?;

        let size_threshold = CheckpointStrategy::SizeThreshold(16 * 1024 * 1024);
        assert!(StrategyValidator::validate_strategy(&size_threshold).is_ok());

        let strategy_evaluator = StrategyEvaluator::new(wal_config.clone());
        let (checkpoint_due, trigger_info) = strategy_evaluator.should_checkpoint(
            &size_threshold,
            std::time::SystemTime::now(),
            0,
        )?;
        assert!(
            trigger_info.is_some() || !checkpoint_due,
            "Should have trigger info or not trigger"
        );
    }

    // Scenario 2: transaction-count strategy on a separate WAL/checkpoint pair.
    {
        let tx_wal_path = root.join("v2_strategy_tx.wal");
        let tx_checkpoint_path = root.join("v2_strategy_tx.ckpt");
        let wal_config = V2WALConfig {
            graph_path: graph_path.clone(),
            wal_path: tx_wal_path.clone(),
            checkpoint_path: tx_checkpoint_path.clone(),
            max_wal_size: 32 * 1024 * 1024,
            buffer_size: 1024 * 1024,
            checkpoint_interval: 5000,
            group_commit_timeout_ms: 100,
            max_group_commit_size: 8,
            enable_compression: false,
            compression_level: 0,
            auto_checkpoint: false,
            background_checkpoint_thread: false,
            background_checkpoint_interval_secs: 60,
            json_limits: Default::default(),
        };

        let wal_writer = V2WALWriter::create(wal_config.clone())?;
        for idx in 0..5 {
            let node_id = 2000 + idx;
            wal_writer.write_record(V2WALRecord::NodeInsert {
                node_id,
                slot_offset: 8192 + (idx * 128) as u64,
                node_data: create_v2_node_record(node_id, "test", "tx"),
            })?;
        }
        wal_writer.shutdown()?;

        let tx_count = CheckpointStrategy::TransactionCount(5);
        assert!(StrategyValidator::validate_strategy(&tx_count).is_ok());

        let strategy_evaluator = StrategyEvaluator::new(wal_config);
        let (checkpoint_due, trigger_info) =
            strategy_evaluator.should_checkpoint(&tx_count, std::time::SystemTime::now(), 0)?;
        assert!(trigger_info.is_some() || !checkpoint_due);
    }

    // Scenario 3: a 60-second time interval is a valid strategy.
    {
        let time_interval = CheckpointStrategy::TimeInterval(Duration::from_secs(60));
        assert!(StrategyValidator::validate_strategy(&time_interval).is_ok());
    }

    // Scenario 4: an explicit adaptive strategy is valid.
    {
        let adaptive = CheckpointStrategy::Adaptive {
            min_interval: Duration::from_secs(60),
            max_wal_size: 1024 * 1024,
            max_transactions: 100,
        };
        assert!(StrategyValidator::validate_strategy(&adaptive).is_ok());
    }

    // Scenario 5: the default strategy is adaptive with strictly positive limits.
    {
        if let CheckpointStrategy::Adaptive {
            min_interval,
            max_wal_size,
            max_transactions,
        } = CheckpointStrategy::default()
        {
            assert!(min_interval.as_secs() > 0);
            assert!(max_wal_size > 0);
            assert!(max_transactions > 0);
        } else {
            panic!("Default should be Adaptive");
        }
    }

    Ok(())
}
/// Checks the state of a recovery engine created over a populated WAL.
///
/// Two `NodeInsert` records are written and the writer is shut down. A
/// recovery engine is then created with consistency checks and backups
/// disabled. The test asserts that the engine reports `Idle` through both its
/// progress and its state, and that every inspected metric is zero.
///
/// NOTE(review): no recovery pass is invoked here, so despite the test name
/// nothing is replayed; only the engine's initial state is verified.
#[test]
fn test_v2_wal_crash_recovery_transaction_replay() -> NativeResult<()> {
    use sqlitegraph::backend::native::v2::wal::recovery::core::RecoveryState;

    let scratch = TempDir::new()?;
    let root = scratch.path();
    let graph_path = root.join("v2_recovery.v2");
    let wal_path = root.join("v2_recovery.wal");
    let checkpoint_path = root.join("v2_recovery.ckpt");
    let _graph_file = GraphFile::create(&graph_path)?;

    let wal_config = V2WALConfig {
        graph_path: graph_path.clone(),
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        max_wal_size: 32 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 5000,
        group_commit_timeout_ms: 100,
        max_group_commit_size: 8,
        enable_compression: false,
        compression_level: 0,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: Default::default(),
    };

    // Populate the WAL with two records, then close the writer.
    let wal_writer = V2WALWriter::create(wal_config.clone())?;
    wal_writer.write_record(V2WALRecord::NodeInsert {
        node_id: 1001,
        slot_offset: 4096,
        node_data: create_v2_node_record(1001, "function", "main"),
    })?;
    wal_writer.write_record(V2WALRecord::NodeInsert {
        node_id: 1002,
        slot_offset: 8192,
        node_data: create_v2_node_record(1002, "function", "helper"),
    })?;
    wal_writer.shutdown()?;

    assert!(
        wal_path.exists(),
        "WAL file should exist after crash simulation"
    );
    let wal_len = std::fs::metadata(&wal_path)?.len();
    assert!(wal_len > 0, "WAL should have data for recovery");

    let recovery_options = RecoveryOptions {
        perform_consistency_checks: false,
        create_backup: false,
        ..Default::default()
    };
    let engine =
        V2WALRecoveryEngine::create(wal_config.clone(), graph_path.clone(), recovery_options)?;

    // Progress and state are queried separately; both must report Idle.
    let progress = engine.get_progress();
    assert_eq!(progress.state, RecoveryState::Idle);
    let engine_state = engine.get_state();
    assert_eq!(engine_state, RecoveryState::Idle);

    // No work has been done yet, so every counter is still zero.
    let metrics = engine.get_metrics();
    assert_eq!(
        metrics.transactions_scanned, 0,
        "No transactions scanned yet"
    );
    assert_eq!(metrics.committed_transactions_replayed, 0);
    assert_eq!(metrics.rolled_back_transactions, 0);

    Ok(())
}
/// Checks recovery-engine creation over a WAL holding three records, plus
/// the field values of a validation-oriented `RecoveryOptions`.
///
/// The records are plain `NodeInsert`s. The "committed" / "incomplete" labels
/// exist only inside the payload bytes; no transaction markers are written.
/// The engine is created with checks and backups disabled and must be `Idle`
/// with zero scanned transactions. A second `RecoveryOptions` value is built
/// only to assert that its explicitly set fields hold the values given.
#[test]
fn test_recovery_multiple_incomplete_transactions() -> NativeResult<()> {
    use sqlitegraph::backend::native::v2::wal::recovery::core::RecoveryState;

    let scratch = TempDir::new()?;
    let root = scratch.path();
    let graph_path = root.join("v2_incomplete.v2");
    let wal_path = root.join("v2_incomplete.wal");
    let checkpoint_path = root.join("v2_incomplete.ckpt");
    let _graph_file = GraphFile::create(&graph_path)?;

    let wal_config = V2WALConfig {
        graph_path: graph_path.clone(),
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        max_wal_size: 32 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 5000,
        group_commit_timeout_ms: 100,
        max_group_commit_size: 8,
        enable_compression: false,
        compression_level: 0,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: Default::default(),
    };

    // Three records: two labelled "committed", one labelled "incomplete".
    let wal_writer = V2WALWriter::create(wal_config.clone())?;
    wal_writer.write_record(V2WALRecord::NodeInsert {
        node_id: 2001,
        slot_offset: 4096,
        node_data: create_v2_node_record(2001, "committed", "tx1"),
    })?;
    wal_writer.write_record(V2WALRecord::NodeInsert {
        node_id: 2002,
        slot_offset: 8192,
        node_data: create_v2_node_record(2002, "committed", "tx2"),
    })?;
    wal_writer.write_record(V2WALRecord::NodeInsert {
        node_id: 2999,
        slot_offset: 12288,
        node_data: create_v2_node_record(2999, "incomplete", "tx3"),
    })?;
    wal_writer.shutdown()?;
    assert!(wal_path.exists(), "WAL file should exist");

    let recovery_options = RecoveryOptions {
        perform_consistency_checks: false,
        create_backup: false,
        ..Default::default()
    };
    let engine =
        V2WALRecoveryEngine::create(wal_config.clone(), graph_path.clone(), recovery_options)?;

    let engine_state = engine.get_state();
    assert_eq!(engine_state, RecoveryState::Idle);
    let metrics = engine.get_metrics();
    assert_eq!(metrics.transactions_scanned, 0);

    // This options value is never handed to an engine; only its fields are checked.
    let strict_options = RecoveryOptions {
        perform_consistency_checks: true,
        create_backup: true,
        fast_recovery: false,
        ..Default::default()
    };
    assert!(strict_options.perform_consistency_checks);
    assert!(strict_options.create_backup);
    assert!(!strict_options.fast_recovery);

    Ok(())
}
/// Checks that a checkpoint manager stays in its initial state while two
/// successive writers append to the same WAL configuration.
///
/// Sequence: a first writer emits records 3000..3004 and shuts down; a
/// checkpoint manager is created and must be `Idle` with no checkpoint in
/// progress; a second writer (created from the same config) emits records
/// 3005..3009 and shuts down. Afterwards the WAL must be non-empty, the
/// manager's last checkpointed LSN must still be 0, and both the graph file
/// and the WAL file must exist.
///
/// NOTE(review): no checkpoint or recovery is actually executed here.
#[test]
fn test_checkpoint_recovery_integration_v2_graph() -> NativeResult<()> {
    use sqlitegraph::backend::native::v2::wal::checkpoint::core::CheckpointState;

    let scratch = TempDir::new()?;
    let root = scratch.path();
    let graph_path = root.join("v2_integration.v2");
    let wal_path = root.join("v2_integration.wal");
    let checkpoint_path = root.join("v2_integration.ckpt");
    let _graph_file = GraphFile::create(&graph_path)?;

    let wal_config = V2WALConfig {
        graph_path: graph_path.clone(),
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        max_wal_size: 32 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 5000,
        group_commit_timeout_ms: 100,
        max_group_commit_size: 8,
        enable_compression: false,
        compression_level: 0,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: Default::default(),
    };

    // Phase 1: records written before the manager exists.
    let first_writer = V2WALWriter::create(wal_config.clone())?;
    for idx in 0..5 {
        let node_id = 3000 + idx;
        first_writer.write_record(V2WALRecord::NodeInsert {
            node_id,
            slot_offset: 4096 + (idx * 128) as u64,
            node_data: create_v2_node_record(node_id, "pre", "checkpoint"),
        })?;
    }
    first_writer.shutdown()?;

    let checkpoint_manager = V2WALCheckpointManager::create(
        wal_config.clone(),
        CheckpointStrategy::SizeThreshold(16 * 1024 * 1024),
    )?;
    assert_eq!(checkpoint_manager.get_state(), CheckpointState::Idle);
    assert!(!checkpoint_manager.is_checkpoint_in_progress());

    // Phase 2: records written after the manager was created.
    let second_writer = V2WALWriter::create(wal_config.clone())?;
    for idx in 5..10 {
        let node_id = 3000 + idx;
        second_writer.write_record(V2WALRecord::NodeInsert {
            node_id,
            slot_offset: 4096 + (idx * 128) as u64,
            node_data: create_v2_node_record(node_id, "post", "checkpoint"),
        })?;
    }
    second_writer.shutdown()?;

    let wal_len = std::fs::metadata(&wal_path)?.len();
    assert!(wal_len > 0, "WAL should have data");

    // Nothing was checkpointed, so the manager's LSN is still zero.
    let last_checkpointed_lsn = checkpoint_manager.get_last_checkpointed_lsn();
    assert_eq!(last_checkpointed_lsn, 0, "LSN should be trackable");

    assert!(graph_path.exists(), "Graph file should exist");
    assert!(wal_path.exists(), "WAL file should exist for recovery");

    Ok(())
}
/// Checks that a recovery engine can be created with consistency checks
/// enabled and starts out idle.
///
/// One `NodeInsert` record is written and the writer is shut down. The engine
/// is created with `perform_consistency_checks: true` and backups disabled,
/// and must report zero scanned transactions and the `Idle` state.
///
/// NOTE(review): the consistency checks themselves are not exercised; no
/// recovery pass is invoked.
#[test]
fn test_recovery_validation_consistency_checking() -> NativeResult<()> {
    use sqlitegraph::backend::native::v2::wal::recovery::core::RecoveryState;

    let scratch = TempDir::new()?;
    let root = scratch.path();
    let graph_path = root.join("v2_validation.v2");
    let wal_path = root.join("v2_validation.wal");
    let checkpoint_path = root.join("v2_validation.ckpt");
    let _graph_file = GraphFile::create(&graph_path)?;

    let wal_config = V2WALConfig {
        graph_path: graph_path.clone(),
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        max_wal_size: 32 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 5000,
        group_commit_timeout_ms: 100,
        max_group_commit_size: 8,
        enable_compression: false,
        compression_level: 0,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: Default::default(),
    };

    // A single record is enough to give the engine a WAL to open.
    let wal_writer = V2WALWriter::create(wal_config.clone())?;
    wal_writer.write_record(V2WALRecord::NodeInsert {
        node_id: 5000,
        slot_offset: 4096,
        node_data: create_v2_node_record(5000, "validation", "test"),
    })?;
    wal_writer.shutdown()?;

    let recovery_options = RecoveryOptions {
        perform_consistency_checks: true,
        create_backup: false,
        ..Default::default()
    };
    // The config is moved into the engine; it is not needed afterwards.
    let engine = V2WALRecoveryEngine::create(wal_config, graph_path.clone(), recovery_options)?;

    let metrics = engine.get_metrics();
    assert_eq!(metrics.transactions_scanned, 0);
    let engine_state = engine.get_state();
    assert_eq!(engine_state, RecoveryState::Idle);

    Ok(())
}
/// Builds the synthetic payload bytes used as `node_data` in these tests.
///
/// The payload is the UTF-8 encoding of `V2_NODE_<node_id>_<kind>_<name>`,
/// i.e. the fixed prefix followed by the three arguments, joined with
/// underscores.
fn create_v2_node_record(node_id: i64, kind: &str, name: &str) -> Vec<u8> {
    let id_text = node_id.to_string();
    ["V2_NODE", id_text.as_str(), kind, name]
        .join("_")
        .into_bytes()
}