use crate::backend::native::v2::edge_cluster::CompactEdgeRecord;
use crate::backend::native::v2::pubsub::PubSubEvent;
use crate::backend::native::v2::wal::V2WALRecord;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
/// Converts the WAL records of one commit into the pub/sub events to publish.
///
/// Records are visited in slice order and each data-changing record yields
/// exactly one event:
///
/// * `NodeInsert` / `NodeUpdate` -> `PubSubEvent::NodeChanged`
/// * `EdgeInsert` / `EdgeUpdate` / `EdgeDelete` -> `PubSubEvent::EdgeChanged`
/// * `KvSet` / `KvDelete` -> `PubSubEvent::KVChanged`
///
/// Every other record variant is skipped. A single
/// `PubSubEvent::SnapshotCommitted` is always appended last, so the returned
/// vector is never empty (an empty `records` slice yields just that event).
///
/// `commit_lsn` is stamped on every emitted event as its `snapshot_id`.
pub fn records_to_events(records: &[V2WALRecord], commit_lsn: u64) -> Vec<PubSubEvent> {
    let change_events = records.iter().filter_map(|record| match record {
        V2WALRecord::NodeInsert { node_id, .. } | V2WALRecord::NodeUpdate { node_id, .. } => {
            Some(PubSubEvent::NodeChanged {
                node_id: *node_id,
                snapshot_id: commit_lsn,
            })
        }
        V2WALRecord::EdgeInsert { edge_record, .. } => Some(PubSubEvent::EdgeChanged {
            edge_id: edge_id_from_record(edge_record),
            snapshot_id: commit_lsn,
        }),
        V2WALRecord::EdgeUpdate { new_edge, .. } => Some(PubSubEvent::EdgeChanged {
            edge_id: edge_id_from_record(new_edge),
            snapshot_id: commit_lsn,
        }),
        // No field of the delete record is read, so the event carries the
        // fixed id 0 rather than an id derived from the removed edge.
        // NOTE(review): subscribers cannot tell which edge was deleted from
        // this event — confirm whether the record exposes enough to do better.
        V2WALRecord::EdgeDelete { .. } => Some(PubSubEvent::EdgeChanged {
            edge_id: 0,
            snapshot_id: commit_lsn,
        }),
        V2WALRecord::KvSet { key, .. } | V2WALRecord::KvDelete { key, .. } => {
            Some(PubSubEvent::KVChanged {
                key_hash: key_hash_bytes(key),
                snapshot_id: commit_lsn,
            })
        }
        _ => None,
    });

    // The commit marker always closes the batch, after all per-record events.
    let commit_marker = std::iter::once(PubSubEvent::SnapshotCommitted {
        snapshot_id: commit_lsn,
    });

    change_events.chain(commit_marker).collect()
}
/// Derives the `edge_id` reported in `PubSubEvent::EdgeChanged` from an edge record.
///
/// The id is a bit-packed value built from two fields of `edge` only:
///
/// * bits 0..48  — `neighbor_id`, cast to `u64` and truncated to 48 bits
/// * bits 48..64 — `edge_type_offset`, cast to `u64` and shifted left by 48
///
/// The packed `u64` is then reinterpreted as `i64`.
fn edge_id_from_record(edge: &CompactEdgeRecord) -> i64 {
    /// Number of low bits reserved for the neighbor id.
    const NEIGHBOR_BITS: u32 = 48;
    /// Mask selecting exactly those low 48 bits.
    const NEIGHBOR_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;

    let high_bits = (edge.edge_type_offset as u64) << NEIGHBOR_BITS;
    let low_bits = (edge.neighbor_id as u64) & NEIGHBOR_MASK;

    (low_bits | high_bits) as i64
}
/// Hashes a KV key into the `key_hash` carried by `PubSubEvent::KVChanged`.
///
/// The key is fed through the slice `Hash` implementation into a freshly
/// constructed `DefaultHasher`, so equal keys always produce equal hashes
/// within the same build.
///
/// NOTE(review): the standard library does not specify `DefaultHasher`'s
/// algorithm across Rust releases, so these values are presumably only meant
/// for in-process matching, not for persistence — confirm with subscribers.
fn key_hash_bytes(key: &[u8]) -> u64 {
    let mut state = DefaultHasher::new();
    // Go through `Hash for [u8]` (not `Hasher::write`) so the fed bytes,
    // including whatever framing the slice impl adds, stay the same.
    <[u8] as Hash>::hash(key, &mut state);
    state.finish()
}
/// Reports whether `record` is one of the data-changing WAL record kinds.
///
/// Returns `true` for `NodeInsert`, `NodeUpdate`, `EdgeInsert`, `EdgeUpdate`,
/// `EdgeDelete`, `KvSet` and `KvDelete`; every other variant returns `false`.
/// These are the same seven variants that `records_to_events` turns into
/// per-record events.
pub fn should_emit_event(record: &V2WALRecord) -> bool {
    let is_node_change = matches!(
        record,
        V2WALRecord::NodeInsert { .. } | V2WALRecord::NodeUpdate { .. }
    );
    let is_edge_change = matches!(
        record,
        V2WALRecord::EdgeInsert { .. }
            | V2WALRecord::EdgeUpdate { .. }
            | V2WALRecord::EdgeDelete { .. }
    );
    let is_kv_change = matches!(
        record,
        V2WALRecord::KvSet { .. } | V2WALRecord::KvDelete { .. }
    );

    is_node_change || is_edge_change || is_kv_change
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::native::v2::edge_cluster::cluster_trace::Direction;

    /// A single node insert yields `NodeChanged` followed by the commit marker.
    #[test]
    fn test_node_insert_emits_node_changed() {
        let records = vec![V2WALRecord::NodeInsert {
            node_id: 42,
            slot_offset: 0,
            node_data: vec![1, 2, 3],
        }];

        let events = records_to_events(&records, 100);

        assert_eq!(events.len(), 2);
        assert!(events[0].is_node_event());
        match &events[0] {
            PubSubEvent::NodeChanged {
                node_id,
                snapshot_id,
            } => {
                assert_eq!(*node_id, 42);
                assert_eq!(*snapshot_id, 100);
            }
            _ => panic!("Expected NodeChanged event"),
        }
        assert!(events[1].is_commit_event());
    }

    /// An edge insert yields `EdgeChanged` carrying the id packed from the
    /// inserted record, followed by the commit marker.
    #[test]
    fn test_edge_insert_emits_edge_changed() {
        let edge_record = CompactEdgeRecord::new(123, 45, vec![6, 7, 8]);
        // Computed before the record is moved into the WAL record below.
        let expected_edge_id = edge_id_from_record(&edge_record);
        let records = vec![V2WALRecord::EdgeInsert {
            cluster_key: (42, Direction::Outgoing),
            edge_record,
            insertion_point: 0,
        }];

        let events = records_to_events(&records, 200);

        assert_eq!(events.len(), 2);
        assert!(events[0].is_edge_event());
        match &events[0] {
            PubSubEvent::EdgeChanged {
                edge_id,
                snapshot_id,
            } => {
                assert_eq!(*snapshot_id, 200);
                assert_ne!(*edge_id, 0);
                assert_eq!(*edge_id, expected_edge_id);
            }
            _ => panic!("Expected EdgeChanged event"),
        }
        assert!(events[1].is_commit_event());
    }

    /// The commit marker is appended once, after all per-record events.
    #[test]
    fn test_snapshot_committed_always_emitted() {
        let records = vec![
            V2WALRecord::NodeInsert {
                node_id: 1,
                slot_offset: 0,
                node_data: vec![1, 2, 3],
            },
            V2WALRecord::NodeUpdate {
                node_id: 2,
                slot_offset: 0,
                old_data: vec![4, 5],
                new_data: vec![6, 7],
            },
        ];

        let events = records_to_events(&records, 300);

        assert_eq!(events.len(), 3);
        assert!(events[0].is_node_event());
        assert!(events[1].is_node_event());
        assert!(events[2].is_commit_event());
        match &events[2] {
            PubSubEvent::SnapshotCommitted { snapshot_id } => {
                assert_eq!(*snapshot_id, 300);
            }
            _ => panic!("Expected SnapshotCommitted event"),
        }
    }

    /// Transaction control records produce no per-record events.
    #[test]
    fn test_transaction_records_ignored() {
        let records = vec![
            V2WALRecord::TransactionBegin {
                tx_id: 1,
                timestamp: 0,
            },
            V2WALRecord::TransactionCommit {
                tx_id: 1,
                timestamp: 0,
            },
        ];

        let events = records_to_events(&records, 400);

        assert_eq!(events.len(), 1);
        assert!(events[0].is_commit_event());
    }

    /// A KV set yields `KVChanged` with the hash of the written key, followed
    /// by the commit marker.
    #[test]
    fn test_kv_set_emits_kv_changed() {
        let records = vec![V2WALRecord::KvSet {
            key: b"test_key".to_vec(),
            value_bytes: b"test_value".to_vec(),
            value_type: 1,
            ttl_seconds: Some(3600),
            version: 123,
        }];

        let events = records_to_events(&records, 500);

        assert_eq!(events.len(), 2);
        assert!(events[0].is_kv_event());
        match &events[0] {
            PubSubEvent::KVChanged {
                key_hash,
                snapshot_id,
            } => {
                assert_eq!(*snapshot_id, 500);
                assert_ne!(*key_hash, 0);
                assert_eq!(*key_hash, key_hash_bytes(b"test_key"));
            }
            _ => panic!("Expected KVChanged event"),
        }
        assert!(events[1].is_commit_event());
    }

    /// A KV delete yields `KVChanged` with the hash of the deleted key,
    /// followed by the commit marker.
    #[test]
    fn test_kv_delete_emits_kv_changed() {
        let records = vec![V2WALRecord::KvDelete {
            key: b"deleted_key".to_vec(),
            old_value_bytes: Some(b"old_value".to_vec()),
            old_value_type: 1,
            old_version: 122,
        }];

        let events = records_to_events(&records, 600);

        assert_eq!(events.len(), 2);
        assert!(events[0].is_kv_event());
        match &events[0] {
            PubSubEvent::KVChanged {
                key_hash,
                snapshot_id,
            } => {
                assert_eq!(*snapshot_id, 600);
                assert_eq!(*key_hash, key_hash_bytes(b"deleted_key"));
            }
            _ => panic!("Expected KVChanged event"),
        }
        assert!(events[1].is_commit_event());
    }

    /// Node deletes currently produce no per-record event, only the commit marker.
    #[test]
    fn test_node_delete_emits_no_event() {
        let records = vec![V2WALRecord::NodeDelete {
            node_id: 99,
            slot_offset: 0,
            old_data: vec![1, 2, 3],
            outgoing_edges: vec![],
            incoming_edges: vec![],
        }];

        let events = records_to_events(&records, 700);

        assert_eq!(events.len(), 1);
        assert!(events[0].is_commit_event());
    }

    /// The packed edge id keeps the neighbor id in the low 48 bits and the
    /// edge type in the bits above.
    #[test]
    fn test_edge_id_from_record() {
        let edge = CompactEdgeRecord::new(12345, 67, vec![]);

        let edge_id = edge_id_from_record(&edge);

        assert_ne!(edge_id, 0);
        let neighbor_part = (edge_id as u64) & 0xFFFFFFFFFFFF;
        assert_eq!(neighbor_part as i64, 12345);
        let type_part = (edge_id as u64) >> 48;
        assert_eq!(type_part as u16, 67);
    }

    /// Equal keys hash equally; different keys hash differently.
    #[test]
    fn test_key_hash_consistency() {
        let key1 = b"test_key";
        let key2 = b"test_key";
        let key3 = b"other_key";

        let hash1 = key_hash_bytes(key1);
        let hash2 = key_hash_bytes(key2);
        let hash3 = key_hash_bytes(key3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }

    /// `should_emit_event` accepts data-changing records and rejects the rest.
    ///
    /// `EdgeUpdate` and `EdgeDelete` are not asserted here because their full
    /// field lists are not constructed anywhere in this file.
    #[test]
    fn test_should_emit_event() {
        assert!(should_emit_event(&V2WALRecord::NodeInsert {
            node_id: 1,
            slot_offset: 0,
            node_data: vec![],
        }));
        assert!(should_emit_event(&V2WALRecord::NodeUpdate {
            node_id: 1,
            slot_offset: 0,
            old_data: vec![],
            new_data: vec![],
        }));
        assert!(should_emit_event(&V2WALRecord::EdgeInsert {
            cluster_key: (1, Direction::Outgoing),
            edge_record: CompactEdgeRecord::new(2, 3, vec![]),
            insertion_point: 0,
        }));
        assert!(should_emit_event(&V2WALRecord::KvSet {
            key: vec![],
            value_bytes: vec![],
            value_type: 0,
            ttl_seconds: None,
            version: 0,
        }));
        assert!(should_emit_event(&V2WALRecord::KvDelete {
            key: vec![],
            old_value_bytes: None,
            old_value_type: 0,
            old_version: 0,
        }));

        assert!(!should_emit_event(&V2WALRecord::NodeDelete {
            node_id: 1,
            slot_offset: 0,
            old_data: vec![],
            outgoing_edges: vec![],
            incoming_edges: vec![],
        }));
        assert!(!should_emit_event(&V2WALRecord::TransactionBegin {
            tx_id: 1,
            timestamp: 0,
        }));
        assert!(!should_emit_event(&V2WALRecord::TransactionCommit {
            tx_id: 1,
            timestamp: 0,
        }));
        assert!(!should_emit_event(&V2WALRecord::Checkpoint {
            checkpointed_lsn: 0,
            timestamp: 0,
        }));
    }

    /// With no records at all, the commit marker is still emitted.
    #[test]
    fn test_empty_records_emits_only_snapshot_committed() {
        let records: Vec<V2WALRecord> = vec![];

        let events = records_to_events(&records, 800);

        assert_eq!(events.len(), 1);
        assert!(events[0].is_commit_event());
        match &events[0] {
            PubSubEvent::SnapshotCommitted { snapshot_id } => {
                assert_eq!(*snapshot_id, 800);
            }
            _ => panic!("Expected SnapshotCommitted event"),
        }
    }

    /// Mixed records keep their order in the event stream, commit marker last.
    #[test]
    fn test_multiple_operations_same_transaction() {
        let records = vec![
            V2WALRecord::NodeInsert {
                node_id: 1,
                slot_offset: 0,
                node_data: vec![1, 2, 3],
            },
            V2WALRecord::NodeInsert {
                node_id: 2,
                slot_offset: 0,
                node_data: vec![4, 5, 6],
            },
            V2WALRecord::KvSet {
                key: b"key1".to_vec(),
                value_bytes: b"value1".to_vec(),
                value_type: 0,
                ttl_seconds: None,
                version: 1,
            },
            V2WALRecord::EdgeInsert {
                cluster_key: (1, Direction::Outgoing),
                edge_record: CompactEdgeRecord::new(2, 1, vec![]),
                insertion_point: 0,
            },
        ];

        let events = records_to_events(&records, 900);

        assert_eq!(events.len(), 5);
        assert!(events[0].is_node_event());
        assert!(events[1].is_node_event());
        assert!(events[2].is_kv_event());
        assert!(events[3].is_edge_event());
        assert!(events[4].is_commit_event());
    }
}