use sqlitegraph::backend::native::v2::edge_cluster::{CompactEdgeRecord, Direction};
use sqlitegraph::backend::native::v2::storage::JsonLimits;
use sqlitegraph::backend::native::v2::wal::{
V2WALConfig, V2WALRecord, V2WALRecordType, V2WALWriter,
};
use sqlitegraph::backend::native::{NativeBackendError, NativeResult};
use std::path::PathBuf;
use tempfile::tempdir;
#[test]
fn test_v2_wal_writer_creation_and_basic_writes() -> NativeResult<()> {
    // Creating a V2 WAL writer must materialize the WAL file on disk, and
    // node/edge records must receive positive, monotonically increasing LSNs.
    let temp_dir = tempdir()?;
    let wal_path = temp_dir.path().join("v2_graph_wal.wal");
    let config = V2WALConfig {
        graph_path: PathBuf::from("v2_graph.db"),
        wal_path: wal_path.clone(),
        checkpoint_path: temp_dir.path().join("checkpoint.tracker"),
        max_wal_size: 32 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 1000,
        group_commit_timeout_ms: 100,
        max_group_commit_size: 8,
        enable_compression: false,
        compression_level: 3,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: JsonLimits::default(),
    };
    let writer = V2WALWriter::create(config)?;
    assert!(wal_path.exists(), "V2 WAL file should be created");

    // Minimal serialized node payload; the bytes are opaque to the WAL layer.
    let node_record = V2WALRecord::NodeInsert {
        node_id: 1001,
        slot_offset: 4096,
        node_data: vec![
            0x01, 0x00, 0x02, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ],
    };
    let lsn = writer.write_record(node_record)?;
    assert!(lsn > 0, "LSN should be positive after writing record");

    // Target-id literal infers `i64` from the parameter type; no cast needed.
    let edge_record = CompactEdgeRecord::new(
        1002,
        0,
        vec![
            0x01, 0x04, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ],
    );
    let edge_cluster_record = V2WALRecord::EdgeInsert {
        cluster_key: (1001, Direction::Outgoing),
        edge_record,
        insertion_point: 0,
    };
    let edge_lsn = writer.write_record(edge_cluster_record)?;
    assert!(edge_lsn > lsn, "Edge LSN should be greater than node LSN");
    writer.shutdown()?;
    Ok(())
}
#[test]
fn test_cluster_affinity_grouping_v2_operations() -> NativeResult<()> {
    // Write a mix of node and edge operations spread across several clusters,
    // verify every write receives a valid LSN, and tally how many node
    // inserts land in each node-id range.
    let temp_dir = tempdir()?;
    let config = V2WALConfig {
        graph_path: PathBuf::from("v2_graph.db"),
        wal_path: temp_dir.path().join("cluster_affinity_wal.wal"),
        checkpoint_path: temp_dir.path().join("checkpoint.tracker"),
        max_wal_size: 16 * 1024 * 1024,
        buffer_size: 512 * 1024,
        checkpoint_interval: 1000,
        group_commit_timeout_ms: 50,
        max_group_commit_size: 4,
        enable_compression: false,
        compression_level: 3,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: JsonLimits::default(),
    };
    let writer = V2WALWriter::create(config)?;
    let cluster_operations = vec![
        V2WALRecord::NodeInsert {
            node_id: 1001,
            slot_offset: 8192,
            node_data: create_v2_node_record(1001, "function", "malloc"),
        },
        V2WALRecord::EdgeInsert {
            cluster_key: (1001, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(1002, 0, create_v2_edge_data(1.0, Some(0))),
            insertion_point: 0,
        },
        V2WALRecord::EdgeInsert {
            cluster_key: (1002, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(1001, 1, create_v2_edge_data(2.0, Some(1))),
            insertion_point: 0,
        },
        V2WALRecord::NodeInsert {
            node_id: 2001,
            slot_offset: 12288,
            node_data: create_v2_node_record(2001, "variable", "buffer"),
        },
        V2WALRecord::EdgeInsert {
            cluster_key: (2001, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(1001, 0, create_v2_edge_data(1.0, Some(0))),
            insertion_point: 0,
        },
        V2WALRecord::NodeInsert {
            node_id: 3001,
            slot_offset: 16384,
            node_data: create_v2_node_record(3001, "function", "free"),
        },
        V2WALRecord::EdgeInsert {
            cluster_key: (1002, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(1001, 2, create_v2_edge_data(1.0, Some(2))),
            insertion_point: 0,
        },
    ];
    let mut cluster_1001_count = 0;
    let mut cluster_2001_count = 0;
    let mut cluster_3001_count = 0;
    for operation in cluster_operations {
        // Tally node inserts by reference BEFORE handing ownership to the
        // writer; this removes the two `clone()`s per iteration the previous
        // version needed to inspect the record after moving it.
        if let V2WALRecord::NodeInsert { node_id, .. } = &operation {
            match *node_id {
                1001..=1999 => cluster_1001_count += 1,
                2001..=2999 => cluster_2001_count += 1,
                3001..=3999 => cluster_3001_count += 1,
                _ => {}
            }
        }
        let lsn = writer.write_record(operation)?;
        assert!(lsn > 0, "LSN should be positive for V2 operation");
    }
    assert!(
        cluster_1001_count >= 2,
        "Cluster 1001 should have multiple operations"
    );
    assert!(
        cluster_2001_count >= 1,
        "Cluster 2001 should have operations"
    );
    assert!(
        cluster_3001_count >= 1,
        "Cluster 3001 should have operations"
    );
    writer.shutdown()?;
    Ok(())
}
#[test]
fn test_v2_graph_batch_write_operations() -> NativeResult<()> {
    // Simulate one transactional batch: BEGIN, 20 function nodes, a 19-edge
    // chain between consecutive functions, 10 variable nodes each paired with
    // an extra function-cluster edge, then COMMIT. Verify the total record
    // count and that LSNs are strictly increasing.
    let temp_dir = tempdir()?;
    let config = V2WALConfig {
        graph_path: PathBuf::from("v2_graph.db"),
        wal_path: temp_dir.path().join("v2_batch_wal.wal"),
        checkpoint_path: temp_dir.path().join("checkpoint.tracker"),
        max_wal_size: 64 * 1024 * 1024,
        buffer_size: 2 * 1024 * 1024,
        checkpoint_interval: 1000,
        group_commit_timeout_ms: 200,
        max_group_commit_size: 50,
        enable_compression: true,
        compression_level: 3,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: JsonLimits::default(),
    };
    let writer = V2WALWriter::create(config)?;
    let batch_size = 100;
    let mut batch_records = Vec::with_capacity(batch_size);
    // Wall-clock milliseconds since the Unix epoch, used by both the BEGIN
    // and COMMIT records (hoisted so the expression is written once).
    let now_ms = || {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    };
    batch_records.push(V2WALRecord::TransactionBegin {
        tx_id: 10001,
        timestamp: now_ms(),
    });
    // 20 function nodes with ids 5000..5020.
    for i in 0..20 {
        let function_id = 5000 + i;
        batch_records.push(V2WALRecord::NodeInsert {
            node_id: function_id,
            slot_offset: (i * 4096) as u64,
            node_data: create_v2_node_record(function_id, "function", &format!("func_{}", i)),
        });
    }
    // 19 edges chaining consecutive functions (5000 -> 5001 -> ... -> 5019).
    // The previous `for i in 0..20 { if i < 19 { ... } }` is just `0..19`.
    for i in 0..19 {
        batch_records.push(V2WALRecord::EdgeInsert {
            cluster_key: ((5000 + i) as i64, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(
                (5000 + i + 1) as i64,
                0,
                create_v2_edge_data(1.0, Some(i as u64)),
            ),
            insertion_point: 0,
        });
    }
    // 10 variable nodes, each accompanied by an extra edge written into a
    // function cluster.
    for i in 0..10 {
        let var_id = 7000 + i;
        batch_records.push(V2WALRecord::NodeInsert {
            node_id: var_id,
            slot_offset: ((20 + i) * 4096) as u64,
            node_data: create_v2_node_record(var_id, "variable", &format!("var_{}", i)),
        });
        let func_id = 5000 + (i % 20);
        batch_records.push(V2WALRecord::EdgeInsert {
            cluster_key: (func_id as i64, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(
                (func_id + 1) as i64,
                1,
                create_v2_edge_data(1.0, Some((i * 2) as u64)),
            ),
            insertion_point: 0,
        });
    }
    batch_records.push(V2WALRecord::TransactionCommit {
        tx_id: 10001,
        timestamp: now_ms(),
    });
    let mut lsns = Vec::new();
    for record in batch_records {
        lsns.push(writer.write_record(record)?);
    }
    // BEGIN + 20 function nodes + 19 chain edges + 10 var nodes + 10 extra
    // edges + COMMIT.
    assert_eq!(
        lsns.len(),
        1 + 20 + 19 + 10 + 10 + 1,
        "Should have written all records"
    );
    // Adjacent-pair check replaces the index-based loop.
    assert!(
        lsns.windows(2).all(|w| w[1] > w[0]),
        "LSNs should be strictly increasing"
    );
    writer.shutdown()?;
    Ok(())
}
#[test]
fn test_v2_free_space_and_string_table_operations() -> NativeResult<()> {
    // Verify that metadata-level records (free-space allocation, string-table
    // insert, cluster creation) are accepted and receive increasing LSNs.
    let temp_dir = tempdir()?;
    let config = V2WALConfig {
        graph_path: PathBuf::from("v2_graph.db"),
        wal_path: temp_dir.path().join("v2_metadata_wal.wal"),
        checkpoint_path: temp_dir.path().join("checkpoint.tracker"),
        max_wal_size: 32 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 1000,
        group_commit_timeout_ms: 100,
        max_group_commit_size: 8,
        enable_compression: false,
        compression_level: 3,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: JsonLimits::default(),
    };
    let writer = V2WALWriter::create(config)?;
    let free_space_update = V2WALRecord::FreeSpaceAllocate {
        block_offset: 4096,
        block_size: 20480,
        block_type: 1,
    };
    let lsn1 = writer.write_record(free_space_update)?;
    assert!(lsn1 > 0);
    let string_table_update = V2WALRecord::StringInsert {
        string_id: 1001,
        string_value: "function_main".to_string(),
    };
    let lsn2 = writer.write_record(string_table_update)?;
    assert!(lsn2 > lsn1);
    // The literal infers the field's `i64` type; the old `3001 as i64` cast
    // was redundant.
    let cluster_create = V2WALRecord::ClusterCreate {
        node_id: 3001,
        direction: Direction::Outgoing,
        cluster_offset: 4096,
        cluster_size: 256,
        edge_data: vec![
            0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ],
    };
    let lsn3 = writer.write_record(cluster_create)?;
    assert!(lsn3 > lsn2);
    writer.shutdown()?;
    Ok(())
}
#[test]
fn test_v2_wal_write_performance() -> NativeResult<()> {
    // Throughput smoke test: write 10k mixed records and require at least
    // 5K ops/sec. NOTE(review): a wall-clock assertion can be flaky on slow
    // or heavily loaded CI machines — confirm this threshold is intentional.
    let temp_dir = tempdir()?;
    let config = V2WALConfig {
        graph_path: PathBuf::from("v2_graph.db"),
        wal_path: temp_dir.path().join("perf_v2_wal.wal"),
        checkpoint_path: temp_dir.path().join("checkpoint.tracker"),
        max_wal_size: 256 * 1024 * 1024,
        buffer_size: 8 * 1024 * 1024,
        checkpoint_interval: 1000,
        group_commit_timeout_ms: 1000,
        max_group_commit_size: 100,
        enable_compression: true,
        compression_level: 3,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: JsonLimits::default(),
    };
    let writer = V2WALWriter::create(config)?;
    let started = std::time::Instant::now();
    let target_operations = 10_000;
    for i in 0..target_operations {
        // Mix per block of ten: 1 node insert, 6 edge inserts, 1 string
        // insert, and 2 free-space allocations (residues 7 and 9).
        let record = match i % 10 {
            0 => V2WALRecord::NodeInsert {
                node_id: 10000 + i,
                slot_offset: (i * 512) as u64,
                node_data: create_v2_node_record(
                    10000 + i,
                    "function",
                    &format!("perf_func_{}", i),
                ),
            },
            1..=6 => {
                let cluster_key = 10000 + ((i / 10) * 10);
                V2WALRecord::EdgeInsert {
                    cluster_key: (cluster_key as i64, Direction::Outgoing),
                    edge_record: CompactEdgeRecord::new(
                        (cluster_key + (i % 10) + 1) as i64,
                        (i % 10) as u16,
                        create_v2_edge_data((i % 10) as f64, Some((i / 3) as u64)),
                    ),
                    insertion_point: 0,
                }
            }
            8 => V2WALRecord::StringInsert {
                string_id: (30000 + i) as u32,
                string_value: format!("perf_string_{}", i),
            },
            _ => V2WALRecord::FreeSpaceAllocate {
                block_offset: (i * 1024) as u64,
                block_size: ((i % 10) + 1) as u32 * 64,
                block_type: (i % 256) as u8,
            },
        };
        writer.write_record(record)?;
    }
    let elapsed = started.elapsed();
    let ops_per_second = target_operations as f64 / elapsed.as_secs_f64();
    assert!(
        ops_per_second >= 5_000.0,
        "V2 WAL should handle at least 5K ops/sec: {:.0} ops/sec",
        ops_per_second
    );
    writer.shutdown()?;
    Ok(())
}
#[test]
fn test_v2_wal_write_buffer_management() -> NativeResult<()> {
    // Exercise a deliberately small (64 KiB) write buffer with one hundred
    // 4 KiB payloads, forcing explicit flushes along the way, and verify that
    // every record was assigned a positive, increasing LSN.
    let temp_dir = tempdir()?;
    let config = V2WALConfig {
        graph_path: PathBuf::from("v2_graph.db"),
        wal_path: temp_dir.path().join("buffer_test_wal.wal"),
        checkpoint_path: temp_dir.path().join("checkpoint.tracker"),
        max_wal_size: 32 * 1024 * 1024,
        buffer_size: 64 * 1024,
        checkpoint_interval: 1000,
        group_commit_timeout_ms: 10,
        max_group_commit_size: 4,
        enable_compression: false,
        compression_level: 3,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: JsonLimits::default(),
    };
    let writer = V2WALWriter::create(config)?;
    // 4 KiB of filler so a handful of records overflow the 64 KiB buffer.
    let payload = vec![0x42; 4096];
    let record_count = 100;
    let mut lsns = Vec::new();
    for i in 0..record_count {
        let lsn = writer.write_record(V2WALRecord::NodeInsert {
            node_id: 9000 + i,
            slot_offset: (i * 8192) as u64,
            node_data: payload.clone(),
        })?;
        lsns.push(lsn);
        // Periodically force the buffer to disk mid-stream.
        if i % 15 == 0 {
            writer.flush_buffer()?;
        }
    }
    assert_eq!(
        lsns.len(),
        record_count as usize,
        "All records should be written"
    );
    for (i, &lsn) in lsns.iter().enumerate() {
        assert!(lsn > 0, "LSN {} should be positive", i);
        if i > 0 {
            assert!(lsn > lsns[i - 1], "LSNs should be increasing");
        }
    }
    writer.shutdown()?;
    Ok(())
}
#[test]
fn test_v2_wal_write_error_handling() -> NativeResult<()> {
    // Verify the writer stays usable around a potentially-oversized record:
    // normal writes succeed before it, the oversized write (if rejected) fails
    // with an expected error variant, and subsequent writes still succeed.
    let temp_dir = tempdir()?;
    let config = V2WALConfig {
        graph_path: PathBuf::from("v2_graph.db"),
        wal_path: temp_dir.path().join("error_test_wal.wal"),
        checkpoint_path: temp_dir.path().join("checkpoint.tracker"),
        // Small WAL (1 MiB) and buffer so a huge record stresses the limits.
        max_wal_size: 1024 * 1024,
        buffer_size: 64 * 1024,
        checkpoint_interval: 1000,
        group_commit_timeout_ms: 50,
        max_group_commit_size: 2,
        enable_compression: false,
        compression_level: 3,
        auto_checkpoint: false,
        background_checkpoint_thread: false,
        background_checkpoint_interval_secs: 60,
        json_limits: JsonLimits::default(),
    };
    let writer = V2WALWriter::create(config)?;
    // Baseline: ten ordinary writes must succeed before the fault injection.
    for i in 0..10 {
        let record = V2WALRecord::NodeInsert {
            node_id: 8000 + i,
            slot_offset: (i * 1024) as u64,
            node_data: create_v2_node_record(8000 + i, "test", &format!("node_{}", i)),
        };
        writer.write_record(record)?;
    }
    // 10 MiB payload — far larger than the 1 MiB max_wal_size configured above.
    let oversized_data = vec![0xFF; 10 * 1024 * 1024];
    let oversized_record = V2WALRecord::NodeInsert {
        node_id: 9999,
        slot_offset: 0,
        node_data: oversized_data,
    };
    // The write may succeed or fail depending on the writer's size policy;
    // if it fails, only these error variants are acceptable.
    let result = writer.write_record(oversized_record);
    if let Err(e) = result {
        match e {
            NativeBackendError::Io { .. } | NativeBackendError::InvalidConfiguration { .. } => {
                // Expected rejection paths — nothing to do.
            }
            _ => {
                panic!("Unexpected error type: {:?}", e);
            }
        }
    }
    // The writer must recover: a fresh write after the oversized attempt
    // must succeed.
    let recovery_record = V2WALRecord::NodeInsert {
        node_id: 8000,
        slot_offset: 10240,
        node_data: create_v2_node_record(8000, "recovery", "test_recovery"),
    };
    let result = writer.write_record(recovery_record);
    assert!(result.is_ok(), "Writer should recover and allow new writes");
    writer.shutdown()?;
    Ok(())
}
/// Builds a synthetic serialized V2 node record for WAL tests.
///
/// Layout: version (2 bytes) | flags (1 byte) | type-len (u16 LE) |
/// type bytes | name-len (u16 LE) | name bytes | node_id (i64 LE) |
/// 4 reserved bytes, zero-padded to at least 128 bytes.
fn create_v2_node_record(node_id: i64, node_type: &str, name: &str) -> Vec<u8> {
    let mut data = Vec::with_capacity(128);
    data.extend_from_slice(&[0x02, 0x00]); // record format version
    data.push(0x01); // flags: node record
    data.extend_from_slice(&(node_type.len() as u16).to_le_bytes());
    data.extend_from_slice(node_type.as_bytes());
    data.extend_from_slice(&(name.len() as u16).to_le_bytes());
    data.extend_from_slice(name.as_bytes());
    data.extend_from_slice(&node_id.to_le_bytes());
    data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // reserved
    // Pad to the 128-byte test record size. A plain `resize(128, 0)` would
    // silently TRUNCATE records whose strings push past 128 bytes (corrupting
    // the node_id tail); padding-only preserves long records intact.
    let padded_len = data.len().max(128);
    data.resize(padded_len, 0);
    data
}
/// Builds a synthetic serialized V2 edge payload for WAL tests.
///
/// Layout: version (2 bytes) | flags (1 byte) | weight (f64 LE) |
/// timestamp (u64 LE, zero when absent), zero-padded to 64 bytes.
fn create_v2_edge_data(weight: f64, timestamp: Option<u64>) -> Vec<u8> {
    let mut data = Vec::with_capacity(64);
    data.extend_from_slice(&[0x01, 0x00]); // record format version
    data.push(0x05); // flags: edge record
    data.extend_from_slice(&weight.to_le_bytes());
    // A missing timestamp serializes as eight zero bytes, which is exactly
    // the little-endian encoding of 0u64 — so unwrap_or(0) covers both arms.
    data.extend_from_slice(&timestamp.unwrap_or(0).to_le_bytes());
    data.resize(64, 0);
    data
}