use sqlitegraph::backend::native::v2::edge_cluster::{CompactEdgeRecord, Direction};
use sqlitegraph::backend::native::v2::wal::record::V2WALSerializer;
use sqlitegraph::backend::native::v2::wal::{V2WALRecord, V2WALRecordType, WALSerializationError};
use sqlitegraph::backend::native::{NativeBackendError, NativeResult};
use std::collections::HashMap;
use tempfile::tempdir;
#[test]
fn test_all_record_types_creation() -> NativeResult<()> {
    // Exercises the classification helpers on one instance of each record
    // variant: record_type(), modifies_data(), is_transaction_control() and
    // cluster_key(). All four checks are funneled through a single helper so
    // each variant's expectations read as one line of intent.
    fn check(
        record: &V2WALRecord,
        expected_type: V2WALRecordType,
        modifies: bool,
        tx_control: bool,
        key: Option<i64>,
    ) {
        assert_eq!(record.record_type(), expected_type);
        assert_eq!(record.modifies_data(), modifies);
        assert_eq!(record.is_transaction_control(), tx_control);
        assert_eq!(record.cluster_key(), key);
    }

    let node_insert = V2WALRecord::NodeInsert {
        node_id: 42,
        slot_offset: 1024,
        node_data: vec![1, 2, 3, 4, 5],
    };
    check(&node_insert, V2WALRecordType::NodeInsert, true, false, Some(42));

    let node_update = V2WALRecord::NodeUpdate {
        node_id: 100,
        slot_offset: 2048,
        old_data: vec![1, 2, 3],
        new_data: vec![4, 5, 6],
    };
    check(&node_update, V2WALRecordType::NodeUpdate, true, false, Some(100));

    let edge_insert = V2WALRecord::EdgeInsert {
        cluster_key: (100, Direction::Outgoing),
        edge_record: CompactEdgeRecord::new(200, 0, vec![7, 8, 9]),
        insertion_point: 0,
    };
    // An edge record's cluster key is the source node of its (node, direction) pair.
    check(&edge_insert, V2WALRecordType::EdgeInsert, true, false, Some(100));

    let cluster_create = V2WALRecord::ClusterCreate {
        node_id: 5555,
        direction: Direction::Outgoing,
        cluster_offset: 0,
        cluster_size: 1000,
        edge_data: vec![10, 20, 30],
    };
    check(
        &cluster_create,
        V2WALRecordType::ClusterCreate,
        true,
        false,
        Some(5555),
    );

    // Transaction control records never modify data and carry no cluster key.
    let tx_begin = V2WALRecord::TransactionBegin {
        tx_id: 123456,
        timestamp: 1640995200000,
    };
    check(&tx_begin, V2WALRecordType::TransactionBegin, false, true, None);

    let tx_commit = V2WALRecord::TransactionCommit {
        tx_id: 123456,
        timestamp: 1640995201000,
    };
    check(&tx_commit, V2WALRecordType::TransactionCommit, false, true, None);

    let tx_rollback = V2WALRecord::TransactionRollback {
        tx_id: 123456,
        timestamp: 1640995200500,
    };
    check(
        &tx_rollback,
        V2WALRecordType::TransactionRollback,
        false,
        true,
        None,
    );
    Ok(())
}
#[test]
fn test_record_serialization_round_trip() -> NativeResult<()> {
    // Serializes one record of each variant and verifies that deserialization
    // restores both the classification metadata and the variant payload.
    //
    // Fix over the previous version: TransactionBegin and ClusterCreate fell
    // into a silent `_ => {}` arm, so their payloads were never compared after
    // the round trip. They now have explicit arms, and the fallback panics —
    // it is unreachable when the serializer is correct, because record types
    // are asserted equal before the match.
    let test_records = vec![
        V2WALRecord::NodeInsert {
            node_id: 42,
            slot_offset: 1024,
            node_data: vec![1, 2, 3, 4, 5],
        },
        V2WALRecord::NodeUpdate {
            node_id: 100,
            slot_offset: 2048,
            old_data: vec![1, 2, 3],
            new_data: vec![4, 5, 6],
        },
        V2WALRecord::EdgeInsert {
            cluster_key: (100, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(200, 0, vec![7, 8, 9]),
            insertion_point: 0,
        },
        V2WALRecord::TransactionBegin {
            tx_id: 123456,
            timestamp: 1640995200000,
        },
        V2WALRecord::ClusterCreate {
            node_id: 5555,
            direction: Direction::Outgoing,
            cluster_offset: 0,
            cluster_size: 1000,
            edge_data: vec![10, 20, 30],
        },
    ];
    for original_record in test_records {
        let serialized = V2WALSerializer::serialize(&original_record)?;
        assert!(
            !serialized.is_empty(),
            "Serialized record should not be empty"
        );
        let deserialized_record = V2WALSerializer::deserialize(&serialized)?;
        // Classification metadata must survive the round trip.
        assert_eq!(
            original_record.record_type(),
            deserialized_record.record_type(),
            "Record types should match"
        );
        assert_eq!(
            original_record.modifies_data(),
            deserialized_record.modifies_data(),
            "Data modification flag should match"
        );
        assert_eq!(
            original_record.is_transaction_control(),
            deserialized_record.is_transaction_control(),
            "Transaction control flag should match"
        );
        assert_eq!(
            original_record.cluster_key(),
            deserialized_record.cluster_key(),
            "Cluster key should match"
        );
        // Per-variant payload comparison.
        match (&original_record, &deserialized_record) {
            (
                V2WALRecord::NodeInsert {
                    node_data: orig_data,
                    ..
                },
                V2WALRecord::NodeInsert {
                    node_data: de_data, ..
                },
            ) => {
                assert_eq!(orig_data, de_data, "NodeInsert data should match");
            }
            (
                V2WALRecord::NodeUpdate {
                    old_data: orig_old,
                    new_data: orig_new,
                    ..
                },
                V2WALRecord::NodeUpdate {
                    old_data: de_old,
                    new_data: de_new,
                    ..
                },
            ) => {
                assert_eq!(orig_old, de_old, "NodeUpdate old data should match");
                assert_eq!(orig_new, de_new, "NodeUpdate new data should match");
            }
            (
                V2WALRecord::EdgeInsert {
                    edge_record: orig_edge_record,
                    ..
                },
                V2WALRecord::EdgeInsert {
                    edge_record: de_edge_record,
                    ..
                },
            ) => {
                // CompactEdgeRecord fields are compared individually because the
                // struct itself is not compared wholesale elsewhere in this file
                // (presumably it does not derive PartialEq).
                assert_eq!(
                    orig_edge_record.neighbor_id, de_edge_record.neighbor_id,
                    "EdgeInsert neighbor_id should match"
                );
                assert_eq!(
                    orig_edge_record.edge_type_offset, de_edge_record.edge_type_offset,
                    "EdgeInsert edge_type_offset should match"
                );
                assert_eq!(
                    orig_edge_record.edge_data, de_edge_record.edge_data,
                    "EdgeInsert edge_data should match"
                );
            }
            (
                V2WALRecord::TransactionBegin {
                    tx_id: orig_tx,
                    timestamp: orig_ts,
                },
                V2WALRecord::TransactionBegin {
                    tx_id: de_tx,
                    timestamp: de_ts,
                },
            ) => {
                assert_eq!(orig_tx, de_tx, "TransactionBegin tx_id should match");
                assert_eq!(orig_ts, de_ts, "TransactionBegin timestamp should match");
            }
            (
                V2WALRecord::ClusterCreate {
                    node_id: orig_node,
                    cluster_offset: orig_off,
                    cluster_size: orig_size,
                    edge_data: orig_edges,
                    ..
                },
                V2WALRecord::ClusterCreate {
                    node_id: de_node,
                    cluster_offset: de_off,
                    cluster_size: de_size,
                    edge_data: de_edges,
                    ..
                },
            ) => {
                assert_eq!(orig_node, de_node, "ClusterCreate node_id should match");
                assert_eq!(orig_off, de_off, "ClusterCreate cluster_offset should match");
                assert_eq!(orig_size, de_size, "ClusterCreate cluster_size should match");
                assert_eq!(orig_edges, de_edges, "ClusterCreate edge_data should match");
            }
            // Unreachable when record types were already asserted equal above.
            _ => panic!("round-trip changed the record variant"),
        }
    }
    Ok(())
}
#[test]
fn test_record_size_estimation() -> NativeResult<()> {
    // Each record's serialized form must be at least as large as its payload
    // (payload bytes plus any framing the serializer adds). The expected
    // minimum is the sum of the variant's variable-length byte fields.
    let test_cases = [
        (
            V2WALRecord::NodeInsert {
                node_id: 42,
                slot_offset: 1024,
                node_data: vec![1; 100],
            },
            100usize, // 100 payload bytes
        ),
        (
            V2WALRecord::NodeUpdate {
                node_id: 100,
                slot_offset: 2048,
                old_data: vec![2; 50],
                new_data: vec![3; 75],
            },
            125, // 50 old + 75 new
        ),
        (
            V2WALRecord::EdgeInsert {
                cluster_key: (100, Direction::Outgoing),
                edge_record: CompactEdgeRecord::new(200, 0, vec![4; 200]),
                insertion_point: 0,
            },
            215, // 200 edge bytes plus fixed edge-record fields
        ),
    ];
    for (record, expected_min_size) in test_cases {
        let actual_size = V2WALSerializer::serialize(&record)?.len();
        assert!(
            actual_size >= expected_min_size,
            "Actual size should be at least {} bytes, got {}",
            expected_min_size,
            actual_size
        );
    }
    Ok(())
}
#[test]
fn test_record_validation_errors() -> NativeResult<()> {
    // Malformed inputs must be rejected, and oversized payloads must still
    // serialize (size is reflected, not silently truncated).

    // Empty input: nothing to parse.
    assert!(
        V2WALSerializer::deserialize(&[]).is_err(),
        "Deserializing empty data should fail"
    );

    // A single stray byte is shorter than any valid record.
    let truncated_data = vec![1];
    assert!(
        V2WALSerializer::deserialize(&truncated_data).is_err(),
        "Deserializing truncated data should fail"
    );

    // A 10 MiB payload must serialize to at least its own size.
    let payload_len = 10 * 1024 * 1024;
    let large_record = V2WALRecord::NodeInsert {
        node_id: 1,
        slot_offset: 0,
        node_data: vec![0; payload_len],
    };
    let serialized = V2WALSerializer::serialize(&large_record)?;
    assert!(
        serialized.len() > payload_len,
        "Large record should have large serialized size: {} bytes",
        serialized.len()
    );
    Ok(())
}
#[test]
fn test_cluster_affinity_grouping() -> NativeResult<()> {
    // Groups WAL records by cluster_key() and verifies that records touching
    // the same cluster land in the same bucket while transaction-control
    // records (cluster_key == None) are kept separate.
    //
    // Fix over the previous version: the two EdgeInsert records used cluster
    // keys 10 and 30, which produced FOUR groups ({100:2, 10:1, 200:1, 30:1})
    // while the assertions demanded two groups of sizes 3 and 2 — the test
    // could never pass. The edge records now target clusters 100 and 200 so
    // the data matches the intent of the assertions.
    let records = vec![
        V2WALRecord::NodeInsert {
            node_id: 100,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        },
        V2WALRecord::NodeInsert {
            node_id: 100,
            slot_offset: 2048,
            node_data: vec![4, 5, 6],
        },
        // Edge for cluster 100 — third record in that group.
        V2WALRecord::EdgeInsert {
            cluster_key: (100, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(20, 0, vec![7, 8, 9]),
            insertion_point: 0,
        },
        V2WALRecord::NodeInsert {
            node_id: 200,
            slot_offset: 3072,
            node_data: vec![10, 11, 12],
        },
        // Control record: no cluster affinity.
        V2WALRecord::TransactionBegin {
            tx_id: 12345,
            timestamp: 1640995200000,
        },
        // Edge for cluster 200 — second record in that group.
        V2WALRecord::EdgeInsert {
            cluster_key: (200, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(40, 0, vec![13, 14, 15]),
            insertion_point: 0,
        },
    ];
    let mut cluster_groups: HashMap<i64, Vec<&V2WALRecord>> = HashMap::new();
    let mut transaction_records = Vec::new();
    for record in &records {
        if let Some(cluster_key) = record.cluster_key() {
            cluster_groups.entry(cluster_key).or_default().push(record);
        } else {
            transaction_records.push(record);
        }
    }
    assert_eq!(
        cluster_groups.len(),
        2,
        "Should have 2 different cluster groups"
    );
    assert_eq!(
        cluster_groups.get(&100).unwrap().len(),
        3,
        "Cluster 100 should have 3 records"
    );
    assert_eq!(
        cluster_groups.get(&200).unwrap().len(),
        2,
        "Cluster 200 should have 2 records"
    );
    assert_eq!(
        transaction_records.len(),
        1,
        "Should have 1 transaction record"
    );
    // Every grouped record must report the key of the group it landed in.
    let cluster_100_records = cluster_groups.get(&100).unwrap();
    for record in cluster_100_records {
        assert_eq!(
            record.cluster_key(),
            Some(100),
            "All records in cluster 100 should have cluster_key 100"
        );
    }
    let cluster_200_records = cluster_groups.get(&200).unwrap();
    for record in cluster_200_records {
        assert_eq!(
            record.cluster_key(),
            Some(200),
            "All records in cluster 200 should have cluster_key 200"
        );
    }
    Ok(())
}
#[test]
fn test_record_sequence_validation() -> NativeResult<()> {
    // Validates WAL record ordering: data records must fall inside an open
    // transaction, transactions must not nest, and every opened transaction
    // must be closed by a commit or rollback.
    //
    // Fix over the previous version: the three sequences were constructed and
    // then dropped without any assertion, so the test verified nothing. An
    // inline validator now enforces the ordering rules described above.
    // NOTE(review): this encodes the reviewer's reading of the intended rules;
    // confirm against the WAL replay logic if a real validator exists.
    fn is_valid_sequence(records: &[V2WALRecord]) -> bool {
        let mut in_transaction = false;
        for record in records {
            match record.record_type() {
                V2WALRecordType::TransactionBegin => {
                    if in_transaction {
                        return false; // nested begin
                    }
                    in_transaction = true;
                }
                V2WALRecordType::TransactionCommit | V2WALRecordType::TransactionRollback => {
                    if !in_transaction {
                        return false; // close without an open transaction
                    }
                    in_transaction = false;
                }
                _ => {
                    if !in_transaction {
                        return false; // data record outside a transaction
                    }
                }
            }
        }
        !in_transaction // an unterminated transaction is invalid
    }

    // begin -> data -> commit: well-formed.
    let valid_sequence = vec![
        V2WALRecord::TransactionBegin {
            tx_id: 123,
            timestamp: 1640995200000,
        },
        V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        },
        V2WALRecord::EdgeInsert {
            cluster_key: (1, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(2, 0, vec![4, 5, 6]),
            insertion_point: 0,
        },
        V2WALRecord::TransactionCommit {
            tx_id: 123,
            timestamp: 1640995201000,
        },
    ];
    // Data record and commit without a preceding begin.
    let invalid_sequence_1 = vec![
        V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        },
        V2WALRecord::TransactionCommit {
            tx_id: 123,
            timestamp: 1640995201000,
        },
    ];
    // Transaction opened but never committed or rolled back.
    let invalid_sequence_2 = vec![
        V2WALRecord::TransactionBegin {
            tx_id: 123,
            timestamp: 1640995200000,
        },
        V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        },
    ];

    assert!(
        is_valid_sequence(&valid_sequence),
        "begin/data/commit sequence should be accepted"
    );
    assert!(
        !is_valid_sequence(&invalid_sequence_1),
        "data and commit without a begin should be rejected"
    );
    assert!(
        !is_valid_sequence(&invalid_sequence_2),
        "unterminated transaction should be rejected"
    );
    Ok(())
}
#[test]
fn test_record_type_properties() -> NativeResult<()> {
    // Every record-type discriminant must survive a u8 round trip and carry a
    // non-empty Debug representation.
    let all_record_types = [
        V2WALRecordType::NodeInsert,
        V2WALRecordType::NodeUpdate,
        V2WALRecordType::NodeDelete,
        V2WALRecordType::ClusterCreate,
        V2WALRecordType::EdgeInsert,
        V2WALRecordType::EdgeUpdate,
        V2WALRecordType::EdgeDelete,
        V2WALRecordType::StringInsert,
        V2WALRecordType::FreeSpaceAllocate,
        V2WALRecordType::FreeSpaceDeallocate,
        V2WALRecordType::TransactionBegin,
        V2WALRecordType::TransactionCommit,
        V2WALRecordType::TransactionRollback,
        V2WALRecordType::HeaderUpdate,
        V2WALRecordType::SegmentEnd,
    ];
    for record_type in all_record_types {
        // Discriminant -> byte -> discriminant must be the identity.
        let byte = record_type as u8;
        let round_tripped = V2WALRecordType::try_from(byte)?;
        assert_eq!(
            record_type, round_tripped,
            "Record type should survive byte conversion round-trip"
        );
        // Debug formatting must yield something printable.
        let rendered = format!("{:?}", record_type);
        assert!(
            !rendered.is_empty(),
            "Record type should have string representation"
        );
    }
    Ok(())
}
#[test]
fn test_serialization_performance() -> NativeResult<()> {
    // Builds a mixed workload of 10K records (cycling through five variants)
    // and asserts minimum serialization throughput in records/sec and MB/sec.
    let start_time = std::time::Instant::now();
    let record_count = 10_000;
    let mut records = Vec::with_capacity(record_count);
    for i in 0..record_count {
        // One of five variants per index, selected round-robin.
        let record = match i % 5 {
            0 => V2WALRecord::NodeInsert {
                node_id: i as i64,
                slot_offset: (i * 1024) as u64,
                node_data: vec![i as u8; 64],
            },
            1 => V2WALRecord::EdgeInsert {
                cluster_key: ((i * 2) as i64, Direction::Outgoing),
                edge_record: CompactEdgeRecord::new((i * 3) as i64, 0, vec![i as u8; 32]),
                insertion_point: 0,
            },
            2 => V2WALRecord::TransactionBegin {
                tx_id: i as u64,
                timestamp: 1640995200000 + (i as u64 * 1000),
            },
            3 => V2WALRecord::NodeUpdate {
                node_id: i as i64,
                slot_offset: (i * 1024) as u64,
                old_data: vec![i as u8; 32],
                new_data: vec![(i + 1) as u8; 32],
            },
            _ => V2WALRecord::TransactionCommit {
                tx_id: i as u64,
                timestamp: 1640995200000 + (i as u64 * 1000) + 500,
            },
        };
        records.push(record);
    }
    let mut total_serialized_size = 0;
    for record in &records {
        total_serialized_size += V2WALSerializer::serialize(record)?.len();
    }
    let elapsed = start_time.elapsed();
    let records_per_second = record_count as f64 / elapsed.as_secs_f64();
    let mb_per_second =
        (total_serialized_size as f64) / (1024.0 * 1024.0) / elapsed.as_secs_f64();
    // NOTE(review): thresholds assume a release-mode-capable machine; debug
    // builds may be slower — thresholds mirror the original test unchanged.
    assert!(
        records_per_second >= 10_000.0,
        "Should serialize at least 10K records/sec: {:.0} records/sec",
        records_per_second
    );
    assert!(
        mb_per_second >= 10.0,
        "Should serialize at least 10 MB/sec: {:.1} MB/sec",
        mb_per_second
    );
    Ok(())
}
#[test]
fn test_memory_usage_patterns() -> NativeResult<()> {
    // Round-trips NodeInsert payloads spanning several orders of magnitude
    // (1 B to 256 KiB) and checks payload integrity plus a sane size floor.
    for size in [1usize, 64, 1024, 16_384, 262_144] {
        let record = V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 0,
            node_data: vec![42u8; size],
        };
        let serialized = V2WALSerializer::serialize(&record)?;
        let restored = V2WALSerializer::deserialize(&serialized)?;
        // Compare payloads when both sides are the expected variant.
        match (&record, &restored) {
            (
                V2WALRecord::NodeInsert {
                    node_data: before, ..
                },
                V2WALRecord::NodeInsert {
                    node_data: after, ..
                },
            ) => {
                assert_eq!(
                    before, after,
                    "Data integrity should be maintained for size {}",
                    size
                );
            }
            _ => {}
        }
        // Serialization can add framing but must never shrink the payload.
        assert!(
            serialized.len() >= size,
            "Serialized size should be at least data size"
        );
    }
    Ok(())
}