use crate::backend::native::NativeResult;
use crate::backend::native::graph_file::GraphFile;
use crate::backend::native::v2::wal::{
BulkIngestConfig, BulkIngestExt, V2WALConfig, V2WALManager, V2WALRecord,
};
use tempfile::tempdir;
/// Compares WAL metrics for 1000 `NodeInsert` writes issued directly against
/// 1000 issued while a bulk-ingest guard is held.
///
/// Passes when the bulk run reports no more `group_commit_batches` than the
/// baseline run and both runs report at least 1000 `total_records_written`.
#[test]
fn test_bulk_ingest_batches_flushes() -> NativeResult<()> {
    let temp_dir = tempdir()?;
    let root = temp_dir.path();

    // Baseline run: no bulk-ingest guard, `group_commit_timeout_ms` set to 0.
    let plain_config = V2WALConfig {
        checkpoint_interval: 1000,
        max_wal_size: 1024 * 1024,
        group_commit_timeout_ms: 0,
        checkpoint_path: root.join("baseline.checkpoint"),
        wal_path: root.join("baseline.wal"),
        ..Default::default()
    };
    let plain_graph_path = root.join("baseline.v2");
    let _plain_graph_file = GraphFile::create(&plain_graph_path)?;
    let plain_manager = V2WALManager::create(plain_config)?;
    for node_id in 0..1000 {
        plain_manager.write_record(V2WALRecord::NodeInsert {
            node_id,
            slot_offset: 0,
            node_data: vec![1, 2, 3],
        })?;
    }
    let plain_metrics = plain_manager.get_metrics();
    println!(
        "Baseline metrics - Total records: {}, Group commits: {}",
        plain_metrics.total_records_written, plain_metrics.group_commit_batches
    );

    // Bulk run: separate WAL/checkpoint files, `group_commit_timeout_ms` set to
    // 1000, and every write happens while the bulk-ingest guard is alive.
    let ingest_config = V2WALConfig {
        checkpoint_interval: 1000,
        max_wal_size: 1024 * 1024,
        group_commit_timeout_ms: 1000,
        checkpoint_path: root.join("bulk.checkpoint"),
        wal_path: root.join("bulk.wal"),
        ..Default::default()
    };
    let ingest_graph_path = root.join("bulk.v2");
    let _ingest_graph_file = GraphFile::create(&ingest_graph_path)?;
    let ingest_manager = V2WALManager::create(ingest_config)?;
    let ingest_guard = ingest_manager.begin_bulk_ingest(BulkIngestConfig::default())?;
    // Node ids 1000..2000 keep the bulk records distinct from the baseline ids.
    for node_id in 1000..2000 {
        ingest_manager.write_record(V2WALRecord::NodeInsert {
            node_id,
            slot_offset: 0,
            node_data: vec![1, 2, 3],
        })?;
    }
    // NOTE(review): metrics are sampled before the guard is dropped, so anything
    // the guard does on drop is not reflected below — confirm this is intended.
    let ingest_metrics = ingest_manager.get_metrics();
    println!(
        "Bulk metrics - Total records: {}, Group commits: {}",
        ingest_metrics.total_records_written, ingest_metrics.group_commit_batches
    );
    drop(ingest_guard);

    assert!(
        ingest_metrics.group_commit_batches <= plain_metrics.group_commit_batches,
        "Bulk ingest should reduce or maintain group commit operations"
    );
    assert!(plain_metrics.total_records_written >= 1000);
    assert!(ingest_metrics.total_records_written >= 1000);
    println!(
        "Records written - Baseline: {}, Bulk: {}",
        plain_metrics.total_records_written, ingest_metrics.total_records_written
    );
    Ok(())
}
/// Writes 500 `NodeInsert` and 500 `ClusterCreate` records while a bulk-ingest
/// guard is held, checks that the manager counted exactly 1000 written records,
/// then drops the guard and manager and creates a second manager over the same
/// WAL and checkpoint paths.
///
/// The final assertions only verify that the WAL and checkpoint files exist on
/// disk once the second manager has been created.
// NOTE(review): the contents seen by the second manager are never inspected, so
// this does not yet prove the records themselves survive — consider asserting
// on recovered state once the expected metrics are known.
#[test]
fn test_bulk_ingest_recovery_consistency() -> NativeResult<()> {
    let temp_dir = tempdir()?;
    let wal_path = temp_dir.path().join("recovery.wal");
    let checkpoint_path = temp_dir.path().join("recovery.checkpoint");
    let graph_path = temp_dir.path().join("recovery.v2");
    let _graph_file = GraphFile::create(&graph_path)?;

    let config = V2WALConfig {
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        group_commit_timeout_ms: 1000,
        max_wal_size: 1024 * 1024,
        checkpoint_interval: 100,
        ..Default::default()
    };
    let manager = V2WALManager::create(config)?;
    let bulk_guard = manager.begin_bulk_ingest(BulkIngestConfig::default())?;

    // Two records per iteration: one node insert and one cluster for that node.
    for i in 0..500 {
        let node_record = V2WALRecord::NodeInsert {
            node_id: i,
            slot_offset: 0,
            node_data: vec![1, 2, 3],
        };
        manager.write_record(node_record)?;

        let edge_record = V2WALRecord::ClusterCreate {
            node_id: i,
            direction: crate::backend::native::v2::Direction::Outgoing,
            cluster_offset: 100 + i as u64,
            cluster_size: 10,
            edge_data: vec![1, 2, 3],
        };
        manager.write_record(edge_record)?;
    }

    let write_metrics = manager.get_metrics();
    assert_eq!(write_metrics.total_records_written, 1000);

    // Release the guard and then the manager before reopening the same paths.
    drop(bulk_guard);
    drop(manager);

    let recovery_manager = V2WALManager::create(V2WALConfig {
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        group_commit_timeout_ms: 0,
        max_wal_size: 1024 * 1024,
        checkpoint_interval: 1000,
        ..Default::default()
    })?;
    // Metrics are fetched to exercise the call on the reopened manager; the
    // value is intentionally not asserted on, hence the underscore prefix.
    let _recovery_metrics = recovery_manager.get_metrics();

    assert!(wal_path.exists(), "WAL file should persist after recovery");
    assert!(
        checkpoint_path.exists(),
        "Checkpoint file should exist after recovery"
    );
    println!(
        "Recovery test completed - WAL file: {:?}, Checkpoint: {:?}",
        wal_path.exists(),
        checkpoint_path.exists()
    );
    Ok(())
}
/// Writes 100 `NodeInsert` records inside a `ReadCommitted` transaction while a
/// bulk-ingest guard is held, rolls the transaction back, and then checks that
/// the WAL file still exists and that a new manager can be created over the
/// same WAL and checkpoint paths.
///
/// Also asserts that `total_records_written` grew between manager creation and
/// the end of the write loop.
#[test]
fn test_bulk_ingest_rollback() -> NativeResult<()> {
    let temp_dir = tempdir()?;
    let wal_path = temp_dir.path().join("rollback.wal");
    let checkpoint_path = temp_dir.path().join("rollback.checkpoint");
    let graph_path = temp_dir.path().join("rollback.v2");
    let _graph_file = GraphFile::create(&graph_path)?;

    let config = V2WALConfig {
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        group_commit_timeout_ms: 1000,
        max_wal_size: 1024 * 1024,
        checkpoint_interval: 1000,
        ..Default::default()
    };
    let manager = V2WALManager::create(config)?;
    let initial_metrics = manager.get_metrics();

    // The transaction is opened first; the bulk-ingest guard is taken inside it.
    let tx_id = manager
        .begin_transaction(crate::backend::native::v2::wal::IsolationLevel::ReadCommitted)?;
    let bulk_guard = manager.begin_bulk_ingest(BulkIngestConfig::default())?;

    for i in 0..100 {
        let record = V2WALRecord::NodeInsert {
            node_id: i,
            slot_offset: 0,
            node_data: vec![1, 2, 3],
        };
        manager.write_transaction_record(tx_id, record)?;
    }

    let write_metrics = manager.get_metrics();
    assert!(write_metrics.total_records_written > initial_metrics.total_records_written);

    // Roll back while the guard is still alive, then release guard and manager.
    manager.rollback_transaction(tx_id)?;
    drop(bulk_guard);
    drop(manager);

    assert!(wal_path.exists(), "WAL file should exist after rollback");

    let reopened_manager = V2WALManager::create(V2WALConfig {
        wal_path: wal_path.clone(),
        checkpoint_path: checkpoint_path.clone(),
        group_commit_timeout_ms: 0,
        max_wal_size: 1024 * 1024,
        checkpoint_interval: 1000,
        ..Default::default()
    })?;
    // Metrics are fetched to exercise the call on the reopened manager; the
    // value is intentionally not asserted on, hence the underscore prefix.
    let _reopened_metrics = reopened_manager.get_metrics();

    println!("Rollback test completed - WAL persisted with rollback records");
    Ok(())
}
/// Builds a `V2WALRecord::NodeInsert` fixture for `node_id`, using slot offset
/// 0 and the fixed payload `[1, 2, 3]`.
fn create_test_node_record(node_id: i64) -> V2WALRecord {
    let slot_offset = 0;
    let node_data = vec![1, 2, 3];
    V2WALRecord::NodeInsert {
        node_id,
        slot_offset,
        node_data,
    }
}
/// Builds a `V2WALRecord::ClusterCreate` fixture for `node_id` in the outgoing
/// direction, with cluster size 10, payload `[1, 2, 3]`, and a cluster offset
/// of `100 + node_id`.
fn create_test_cluster_record(node_id: i64) -> V2WALRecord {
    // NOTE(review): `as u64` reinterprets negative ids as very large values;
    // presumably callers only pass non-negative ids — confirm.
    let cluster_offset = 100 + node_id as u64;
    let edge_data = vec![1, 2, 3];
    V2WALRecord::ClusterCreate {
        node_id,
        direction: crate::backend::native::v2::Direction::Outgoing,
        cluster_offset,
        cluster_size: 10,
        edge_data,
    }
}