use crate::backend::native::v2::wal::recovery::errors::RecoveryError;
use crate::backend::native::v2::wal::recovery::replayer::types::RollbackOperation;
use crate::backend::native::v2::{
EdgeCluster, FreeSpaceManager, NodeRecordV2, free_space::AllocationStrategy,
};
use crate::backend::native::{EdgeStore, NativeNodeId, NodeStore, adjacency::Direction};
use crate::debug::{debug_log, warn_log};
impl super::DefaultReplayOperations {
    /// Replays a WAL "node insert" record against the graph file.
    ///
    /// Deserializes `node_data` into a `NodeRecordV2`, validates that the id
    /// embedded in the payload matches the WAL header's `node_id`, records an
    /// undo entry in `rollback_data`, then writes the record through a fresh
    /// `NodeStore` view over the write-locked graph file.
    ///
    /// # Errors
    /// * `RecoveryError::io_error` — payload deserialization, lock acquisition,
    ///   or `write_node_v2` failure.
    /// * `RecoveryError::validation` — id in the payload disagrees with `node_id`.
    pub fn handle_node_insert(
        &self,
        node_id: u64,
        slot_offset: u64,
        node_data: &[u8],
        rollback_data: &mut Vec<RollbackOperation>,
    ) -> Result<(), RecoveryError> {
        // NOTE(review): `slot_offset` is only logged here; `write_node_v2`
        // apparently chooses placement itself — confirm that replay does not
        // need to honor the original slot.
        debug_log!(
            "Replaying node insert: node_id={}, slot_offset={}, data_size={}",
            node_id,
            slot_offset,
            node_data.len()
        );
        // A corrupt payload aborts replay rather than writing garbage.
        let node_record =
            crate::backend::native::v2::node_record_v2::NodeRecordV2::deserialize(node_data)
                .map_err(|e| {
                    RecoveryError::io_error(format!("Failed to deserialize node data: {}", e))
                })?;
        // Cross-check the id inside the serialized record against the WAL entry.
        if node_record.id != node_id as crate::backend::native::NativeNodeId {
            return Err(RecoveryError::validation(format!(
                "Node ID mismatch: expected {}, got {}",
                node_id, node_record.id
            )));
        }
        // Record the undo entry *before* touching the file, so a failed write
        // still leaves a rollback record for the caller to act on.
        rollback_data.push(RollbackOperation::NodeInsert {
            node_id: node_id as crate::backend::native::NativeNodeId,
            node_data: node_data.to_vec(),
        });
        // Scope the write lock so it is released before the statistics update.
        {
            let mut graph_file = self.graph_file.write().map_err(|e| {
                RecoveryError::io_error(format!("Failed to lock graph file: {}", e))
            })?;
            let mut node_store = NodeStore::new(&mut *graph_file);
            node_store
                .write_node_v2(&node_record)
                .map_err(|e| RecoveryError::io_error(format!("Failed to write node: {}", e)))?;
        }
        self.statistics.record_node_operation();
        self.statistics.record_bytes_written(node_data.len() as u64);
        debug_log!("Successfully replayed node insert: node_id={}", node_id);
        Ok(())
    }
    /// Replays a WAL "node update" record.
    ///
    /// Same validation path as [`handle_node_insert`]: deserialize `new_data`,
    /// check the embedded id, then overwrite via `write_node_v2`. A rollback
    /// entry is recorded only when the caller supplies `old_data` — without the
    /// prior image there is nothing to restore on undo.
    ///
    /// # Errors
    /// * `RecoveryError::validation` — empty `new_data` or id mismatch.
    /// * `RecoveryError::io_error` — deserialization, lock, or write failure.
    pub fn handle_node_update(
        &self,
        node_id: u64,
        _slot_offset: u64,
        new_data: &[u8],
        old_data: Option<&Vec<u8>>,
        rollback_data: &mut Vec<RollbackOperation>,
    ) -> Result<(), RecoveryError> {
        debug_log!(
            "Replaying node update: node_id={}, data_size={}",
            node_id,
            new_data.len()
        );
        // An empty update payload is malformed WAL data, not a delete.
        if new_data.is_empty() {
            return Err(RecoveryError::validation(
                "Node update data cannot be empty".to_string(),
            ));
        }
        let node_record =
            crate::backend::native::v2::node_record_v2::NodeRecordV2::deserialize(new_data)
                .map_err(|e| {
                    RecoveryError::io_error(format!("Failed to deserialize node data: {}", e))
                })?;
        if node_record.id != node_id as crate::backend::native::NativeNodeId {
            return Err(RecoveryError::validation(format!(
                "Node ID mismatch: expected {}, got {}",
                node_id, node_record.id
            )));
        }
        // NOTE(review): when `old_data` is None the update is replayed but is
        // not undoable — confirm callers always provide the prior image for
        // updates that may need rollback.
        if let Some(old_data_vec) = old_data {
            rollback_data.push(RollbackOperation::NodeUpdate {
                node_id: node_id as crate::backend::native::NativeNodeId,
                old_data: old_data_vec.clone(),
            });
        }
        // Write lock scoped to the file mutation only.
        {
            let mut graph_file = self.graph_file.write().map_err(|e| {
                RecoveryError::io_error(format!("Failed to lock graph file: {}", e))
            })?;
            let mut node_store = NodeStore::new(&mut *graph_file);
            node_store.write_node_v2(&node_record).map_err(|e| {
                RecoveryError::io_error(format!("Failed to write node update: {}", e))
            })?;
        }
        self.statistics.record_node_operation();
        self.statistics.record_bytes_written(new_data.len() as u64);
        debug_log!("Successfully replayed node update: node_id={}", node_id);
        Ok(())
    }
    /// Replays a WAL "node delete" record, including edge-cascade cleanup.
    ///
    /// Phases, all under a single graph-file write lock:
    /// 1. Reconstruct the victim record from `old_data` (or synthesize a
    ///    minimal placeholder when absent) so rollback can restore it.
    /// 2. Capture the node's outgoing/incoming edge clusters *before* any
    ///    mutation, so the rollback entry can recreate the edges.
    /// 3. Cascade-delete every edge touching the node (failures are logged
    ///    and skipped, not fatal — best-effort cleanup during recovery).
    /// 4. Return the cluster blocks and node slot to free-space accounting.
    /// 5. Delete the node record itself.
    /// The rollback entry is pushed only after the file mutations succeed.
    ///
    /// # Errors
    /// * `RecoveryError::replay_failure` — `old_data` present but undecodable.
    /// * `RecoveryError::io_error` — lock, cluster read/decode, or node-store
    ///   delete failure.
    pub fn handle_node_delete(
        &self,
        node_id: u64,
        slot_offset: u64,
        old_data: Option<&Vec<u8>>,
        rollback_data: &mut Vec<RollbackOperation>,
    ) -> Result<(), RecoveryError> {
        debug_log!(
            "Replaying node delete: node_id={}, slot_offset={}",
            node_id,
            slot_offset
        );
        // node_id 0 is treated as invalid; deleting it is a deliberate no-op
        // (no rollback entry is recorded either — see tests).
        if node_id == 0 {
            warn_log!("Invalid node_id=0 for node deletion - treating as no-op");
            return Ok(());
        }
        // Phase 1: rebuild the record we are about to destroy. Without
        // `old_data`, fall back to a placeholder so rollback still has
        // *something* to restore (structure only, original content is lost).
        let node_record = if let Some(data) = old_data {
            NodeRecordV2::deserialize(data).map_err(|e| {
                RecoveryError::replay_failure(format!(
                    "Failed to deserialize NodeRecordV2 data: {}",
                    e
                ))
            })?
        } else {
            warn_log!("No old_data provided for node delete - creating minimal rollback record");
            NodeRecordV2::new(
                node_id as i64,
                "Unknown".to_string(),
                "deleted_node".to_string(),
                serde_json::Value::Null,
            )
        };
        // Re-serialize so the rollback entry always holds a canonical image
        // (this intentionally shadows the `old_data` parameter).
        let old_data = node_record.serialize();
        let old_data_len = old_data.len();
        let mut captured_outgoing_edges = Vec::new();
        let mut captured_incoming_edges = Vec::new();
        // All file mutations happen inside this write-lock scope.
        {
            let mut graph_file = self.graph_file.write().map_err(|e| {
                RecoveryError::io_error(format!("Failed to lock graph file: {}", e))
            })?;
            // Phase 2a: snapshot the outgoing edge cluster before deletion.
            if node_record.outgoing_edge_count > 0 {
                debug_log!(
                    "Capturing {} outgoing edges before deletion",
                    node_record.outgoing_edge_count
                );
                // Offset 0 is the "no cluster" sentinel; also skip empty sizes.
                if node_record.outgoing_cluster_offset != 0 && node_record.outgoing_cluster_size > 0
                {
                    let mut cluster_buffer = vec![0u8; node_record.outgoing_cluster_size as usize];
                    graph_file
                        .read_bytes(node_record.outgoing_cluster_offset, &mut cluster_buffer)
                        .map_err(|e| {
                            RecoveryError::io_error(format!(
                                "Failed to read outgoing cluster: {}",
                                e
                            ))
                        })?;
                    let cluster = EdgeCluster::deserialize(&cluster_buffer).map_err(|e| {
                        RecoveryError::io_error(format!(
                            "Failed to deserialize outgoing cluster: {}",
                            e
                        ))
                    })?;
                    captured_outgoing_edges = cluster.edges().to_vec();
                    debug_log!(
                        "Captured {} outgoing edge records",
                        captured_outgoing_edges.len()
                    );
                }
            }
            // Phase 2b: snapshot the incoming edge cluster (mirror of 2a).
            if node_record.incoming_edge_count > 0 {
                debug_log!(
                    "Capturing {} incoming edges before deletion",
                    node_record.incoming_edge_count
                );
                if node_record.incoming_cluster_offset != 0 && node_record.incoming_cluster_size > 0
                {
                    let mut cluster_buffer = vec![0u8; node_record.incoming_cluster_size as usize];
                    graph_file
                        .read_bytes(node_record.incoming_cluster_offset, &mut cluster_buffer)
                        .map_err(|e| {
                            RecoveryError::io_error(format!(
                                "Failed to read incoming cluster: {}",
                                e
                            ))
                        })?;
                    let cluster = EdgeCluster::deserialize(&cluster_buffer).map_err(|e| {
                        RecoveryError::io_error(format!(
                            "Failed to deserialize incoming cluster: {}",
                            e
                        ))
                    })?;
                    captured_incoming_edges = cluster.edges().to_vec();
                    debug_log!(
                        "Captured {} incoming edge records",
                        captured_incoming_edges.len()
                    );
                }
            }
            // Phase 3: cascade-delete every edge touching this node.
            if node_record.outgoing_edge_count > 0 || node_record.incoming_edge_count > 0 {
                debug_log!(
                    "Node {} has edges - performing cascade cleanup: outgoing={}, incoming={}",
                    node_id,
                    node_record.outgoing_edge_count,
                    node_record.incoming_edge_count
                );
                let mut edge_store = EdgeStore::new(&mut *graph_file);
                if node_record.outgoing_edge_count > 0 {
                    // Collect first: we cannot delete while the iterator
                    // borrows the edge store.
                    let outgoing_edges: Vec<(NativeNodeId, NativeNodeId)> = edge_store
                        .iter_edges_with_ids(node_id as NativeNodeId, Direction::Outgoing)
                        .collect();
                    let outgoing_count = outgoing_edges.len();
                    for (edge_id, neighbor_id) in outgoing_edges {
                        // Per-edge failures are logged and skipped so one bad
                        // edge does not abort the whole recovery.
                        if let Err(e) = edge_store.delete_edge(edge_id) {
                            warn_log!(
                                "Failed to delete outgoing edge {} for node {} -> neighbor {}: {:?}",
                                edge_id,
                                node_id,
                                neighbor_id,
                                e
                            );
                        } else {
                            debug_log!(
                                "Deleted outgoing edge {} for node {} -> neighbor {}",
                                edge_id,
                                node_id,
                                neighbor_id
                            );
                        }
                    }
                    debug_log!(
                        "Deleted {} outgoing edges for node {}",
                        outgoing_count,
                        node_id
                    );
                }
                if node_record.incoming_edge_count > 0 {
                    let incoming_edges: Vec<(NativeNodeId, NativeNodeId)> = edge_store
                        .iter_edges_with_ids(node_id as NativeNodeId, Direction::Incoming)
                        .collect();
                    let incoming_count = incoming_edges.len();
                    for (edge_id, neighbor_id) in incoming_edges {
                        if let Err(e) = edge_store.delete_edge(edge_id) {
                            warn_log!(
                                "Failed to delete incoming edge {} for node {} <- neighbor {}: {:?}",
                                edge_id,
                                node_id,
                                neighbor_id,
                                e
                            );
                        } else {
                            debug_log!(
                                "Deleted incoming edge {} for node {} <- neighbor {}",
                                edge_id,
                                node_id,
                                neighbor_id
                            );
                        }
                    }
                    debug_log!(
                        "Deleted {} incoming edges for node {}",
                        incoming_count,
                        node_id
                    );
                }
                debug_log!(
                    "Successfully completed edge cascade cleanup for node {}",
                    node_id
                );
            }
            // Phase 4: free-space bookkeeping.
            // NOTE(review): this `FreeSpaceManager` is a brand-new local
            // instance that is dropped at the end of this scope — the blocks
            // "deallocated" below are never recorded in the shared manager
            // this struct is constructed with (see `DefaultReplayOperations::
            // new`). Confirm whether that is intentional (e.g. free space is
            // rebuilt after recovery) or a lost-deallocation bug.
            let mut node_store = NodeStore::new(&mut *graph_file);
            let mut free_space_manager = FreeSpaceManager::new(AllocationStrategy::FirstFit);
            if node_record.outgoing_cluster_offset != 0 || node_record.incoming_cluster_offset != 0
            {
                debug_log!(
                    "Cleaning up cluster references for node {}: outgoing_offset={}, incoming_offset={}",
                    node_id,
                    node_record.outgoing_cluster_offset,
                    node_record.incoming_cluster_offset
                );
                if node_record.outgoing_cluster_offset != 0 && node_record.outgoing_cluster_size > 0
                {
                    free_space_manager.add_free_block(
                        node_record.outgoing_cluster_offset,
                        node_record.outgoing_cluster_size,
                    );
                    debug_log!(
                        "Deallocated outgoing cluster: node_id={}, offset={}, size={}",
                        node_id,
                        node_record.outgoing_cluster_offset,
                        node_record.outgoing_cluster_size
                    );
                }
                if node_record.incoming_cluster_offset != 0 && node_record.incoming_cluster_size > 0
                {
                    free_space_manager.add_free_block(
                        node_record.incoming_cluster_offset,
                        node_record.incoming_cluster_size,
                    );
                    debug_log!(
                        "Deallocated incoming cluster: node_id={}, offset={}, size={}",
                        node_id,
                        node_record.incoming_cluster_offset,
                        node_record.incoming_cluster_size
                    );
                }
                debug_log!(
                    "Successfully cleaned up cluster references for node {}",
                    node_id
                );
            }
            if slot_offset != 0 {
                // NOTE(review): size_of::<NodeRecordV2>() is the *in-memory*
                // struct size, not the serialized on-disk record size — the
                // freed block size is only an estimate. Confirm the on-disk
                // slot layout before relying on this for reallocation.
                let estimated_node_size = std::mem::size_of::<NodeRecordV2>() as u32;
                free_space_manager.add_free_block(slot_offset, estimated_node_size);
                debug_log!(
                    "Deallocated node slot: offset={}, size={}",
                    slot_offset,
                    estimated_node_size
                );
            }
            // Phase 5: remove the node record itself. `Some(slot_offset)` is
            // passed even when slot_offset is 0 — presumably the store treats
            // 0 as "unknown slot"; verify.
            node_store
                .delete_node_with_offset(node_id as NativeNodeId, Some(slot_offset))
                .map_err(|e| {
                    RecoveryError::io_error(format!(
                        "Failed to delete node {} from NodeStore: {}",
                        node_id, e
                    ))
                })?;
        }
        // Only after every file mutation succeeded do we record the undo
        // entry, complete with the captured edge snapshots.
        rollback_data.push(RollbackOperation::NodeDelete {
            node_id: node_id as NativeNodeId,
            slot_offset,
            old_data,
            outgoing_edges: captured_outgoing_edges,
            incoming_edges: captured_incoming_edges,
        });
        self.statistics.record_node_operation();
        self.statistics.record_bytes_written(old_data_len as u64);
        debug_log!(
            "Successfully completed node delete: node_id={}, rollback_data_count={}",
            node_id,
            rollback_data.len()
        );
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::native::v2::wal::recovery::replayer::types::RollbackOperation;
    use tempfile::tempdir;
    /// Convenience wrapper around the project-provided test constructor.
    fn create_test_operations() -> super::super::DefaultReplayOperations {
        super::super::DefaultReplayOperations::create_test_operations()
    }
    /// Delete with no prior image: succeeds and records a NodeDelete rollback
    /// entry preserving the id and slot offset.
    #[test]
    fn test_handle_node_delete_basic() {
        let operations = create_test_operations();
        let mut rollback_data = Vec::new();
        let result = operations.handle_node_delete(42, 4096, None, &mut rollback_data);
        assert!(result.is_ok(), "Basic node delete should succeed");
        assert_eq!(rollback_data.len(), 1, "Should record rollback operation");
        if let Some(RollbackOperation::NodeDelete {
            node_id,
            slot_offset,
            ..
        }) = rollback_data.first()
        {
            assert_eq!(*node_id, 42, "Rollback should preserve node ID");
            assert_eq!(*slot_offset, 4096, "Rollback should preserve slot offset");
        } else {
            panic!("Expected NodeDelete rollback operation");
        }
    }
    /// Delete with a valid prior image: the rollback entry's `old_data` must
    /// round-trip back to the original record.
    #[test]
    fn test_handle_node_delete_with_old_data() {
        let operations = create_test_operations();
        let mut rollback_data = Vec::new();
        let test_node = NodeRecordV2::new(
            123,
            "Document".to_string(),
            "test_doc".to_string(),
            serde_json::json!({"content": "test data", "version": 1}),
        );
        let serialized_data = test_node.serialize();
        let result =
            operations.handle_node_delete(123, 8192, Some(&serialized_data), &mut rollback_data);
        assert!(result.is_ok(), "Node delete with old data should succeed");
        assert_eq!(rollback_data.len(), 1, "Should record rollback operation");
        if let Some(RollbackOperation::NodeDelete {
            node_id,
            slot_offset,
            old_data,
            ..
        }) = rollback_data.first()
        {
            assert_eq!(*node_id, 123);
            assert_eq!(*slot_offset, 8192);
            let restored_node = NodeRecordV2::deserialize(old_data);
            assert!(
                restored_node.is_ok(),
                "Old data should be valid binary serialization"
            );
            if let Ok(node) = restored_node {
                assert_eq!(node.id, 123);
                assert_eq!(node.kind, "Document");
                assert_eq!(node.name, "test_doc");
            }
        } else {
            panic!("Expected NodeDelete rollback operation");
        }
    }
    /// Deleting a node that was never written is still Ok and still records a
    /// rollback entry (delete replay is idempotent at this layer).
    #[test]
    fn test_handle_node_delete_nonexistent_node() {
        let operations = create_test_operations();
        let mut rollback_data = Vec::new();
        let result = operations.handle_node_delete(
            999999, 4096,
            None,
            &mut rollback_data,
        );
        assert!(result.is_ok(), "Should handle non-existent node gracefully");
        assert_eq!(
            rollback_data.len(),
            1,
            "Should record rollback operation even for non-existent node"
        );
    }
    /// A record claiming cluster offsets that don't exist in the test file may
    /// fail on cluster read — both outcomes are tolerated here.
    #[test]
    fn test_handle_node_delete_with_cluster_references() {
        let operations = create_test_operations();
        let mut rollback_data = Vec::new();
        let mut test_node = NodeRecordV2::new(
            456,
            "Function".to_string(),
            "complex_func".to_string(),
            serde_json::json!({"complex": "node"}),
        );
        test_node.outgoing_cluster_offset = 1024;
        test_node.outgoing_cluster_size = 256;
        test_node.outgoing_edge_count = 5;
        test_node.incoming_cluster_offset = 2048;
        test_node.incoming_cluster_size = 128;
        test_node.incoming_edge_count = 3;
        let serialized_data = test_node.serialize();
        let result =
            operations.handle_node_delete(456, 4096, Some(&serialized_data), &mut rollback_data);
        match &result {
            Ok(()) => {
                assert_eq!(rollback_data.len(), 1, "Should record rollback operation");
            }
            Err(e) => {
                println!(
                    "Node delete with cluster references failed (expected for test): {}",
                    e.message
                );
            }
        }
    }
    /// Undecodable `old_data` must error out *before* any rollback entry is
    /// recorded (exercises the replay_failure path).
    #[test]
    fn test_handle_node_delete_malformed_old_data() {
        let operations = create_test_operations();
        let mut rollback_data = Vec::new();
        let malformed_data = vec![1, 2, 3];
        let result =
            operations.handle_node_delete(42, 4096, Some(&malformed_data), &mut rollback_data);
        assert!(result.is_err(), "Malformed data should cause error");
        assert_eq!(
            rollback_data.len(),
            0,
            "Should not record rollback operation for malformed data"
        );
    }
    /// node_id == 0 is the documented no-op: Ok, and no rollback entry.
    #[test]
    fn test_handle_node_delete_zero_node_id() {
        let operations = create_test_operations();
        let mut rollback_data = Vec::new();
        let result = operations.handle_node_delete(
            0, 4096,
            None,
            &mut rollback_data,
        );
        assert!(result.is_ok(), "Should handle invalid node ID gracefully");
        assert_eq!(
            rollback_data.len(),
            0,
            "Should not record rollback operation for node_id=0 (no-op)"
        );
    }
    /// The exact slot offset passed in must survive into the rollback entry
    /// so restoration can reuse the original slot.
    #[test]
    fn test_handle_node_delete_rollback_operation_preserves_slot_offset() {
        let operations = create_test_operations();
        let mut rollback_data = Vec::new();
        let test_slot_offset = 16384;
        let result = operations.handle_node_delete(789, test_slot_offset, None, &mut rollback_data);
        assert!(result.is_ok(), "Node delete should succeed");
        assert_eq!(rollback_data.len(), 1, "Should record rollback operation");
        if let Some(RollbackOperation::NodeDelete {
            node_id,
            slot_offset,
            ..
        }) = rollback_data.first()
        {
            assert_eq!(*node_id, 789, "Should preserve correct node ID");
            assert_eq!(
                *slot_offset, test_slot_offset,
                "Should preserve exact slot offset for restoration"
            );
        } else {
            panic!("Expected NodeDelete rollback operation");
        }
    }
    /// Delete of a node whose record claims live edges triggers the cascade
    /// cleanup path; failure is tolerated since the clusters are fabricated.
    #[test]
    fn test_handle_node_delete_edge_cleanup_required() {
        let operations = create_test_operations();
        let mut rollback_data = Vec::new();
        let mut node_with_edges = NodeRecordV2::new(
            555,
            "Module".to_string(),
            "test_module".to_string(),
            serde_json::json!({"has_edges": true}),
        );
        node_with_edges.outgoing_cluster_offset = 512;
        node_with_edges.outgoing_cluster_size = 64;
        node_with_edges.outgoing_edge_count = 2;
        node_with_edges.incoming_cluster_offset = 1024;
        node_with_edges.incoming_cluster_size = 32;
        node_with_edges.incoming_edge_count = 1;
        let serialized_data = node_with_edges.serialize();
        let result =
            operations.handle_node_delete(555, 4096, Some(&serialized_data), &mut rollback_data);
        match &result {
            Ok(()) => {
                assert_eq!(rollback_data.len(), 1, "Should record rollback operation");
            }
            Err(e) => {
                println!(
                    "Node delete with edges failed (expected for test): {}",
                    e.message
                );
            }
        }
    }
    /// End-to-end: write a node, replay its delete, then run the rollback
    /// system's restore and verify the node comes back intact.
    #[test]
    fn test_full_node_delete_and_restore_cycle() {
        use crate::backend::native::GraphFile;
        use crate::backend::native::v2::StringTable;
        use crate::backend::native::v2::free_space::AllocationStrategy;
        use crate::backend::native::v2::wal::recovery::replayer::rollback::RollbackSystem;
        use std::sync::{Arc, Mutex, RwLock};
        let temp_file = tempfile::NamedTempFile::new().expect("Failed to create temp file");
        let graph_file = GraphFile::create(temp_file.path()).expect("Failed to create GraphFile");
        let graph_file = Arc::new(RwLock::new(graph_file));
        let node_store: Arc<Mutex<Option<NodeStore<'static>>>> = Arc::new(Mutex::new(None));
        let edge_store: Arc<Mutex<Option<EdgeStore<'static>>>> = Arc::new(Mutex::new(None));
        let string_table = Arc::new(Mutex::new(StringTable::new()));
        // Pre-seed allocatable space so writes past the header have room.
        let mut free_space_mgr = FreeSpaceManager::new(AllocationStrategy::FirstFit);
        free_space_mgr.add_free_block(2048, 1024 * 1024);
        let free_space_manager = Arc::new(Mutex::new(Some(free_space_mgr)));
        let statistics = Arc::new(
            crate::backend::native::v2::wal::recovery::replayer::types::ReplayStatistics::new(),
        );
        let kv_store = Arc::new(Mutex::new(crate::backend::native::v2::KvStore::new()));
        let operations = super::super::DefaultReplayOperations::new(
            graph_file.clone(),
            node_store.clone(),
            edge_store,
            string_table,
            free_space_manager,
            kv_store,
            statistics,
        );
        let original_node = NodeRecordV2::new(
            1001,
            "TestClass".to_string(),
            "test_method".to_string(),
            serde_json::json!({"version": 1, "state": "initial"}),
        );
        // Write the node directly through a NodeStore view.
        {
            let mut graph_file_lock = graph_file.write().unwrap();
            let mut node_store = NodeStore::new(&mut *graph_file_lock);
            node_store
                .write_node_v2(&original_node)
                .expect("Failed to write initial node");
        }
        // Sanity-check it is readable before deleting.
        {
            let mut graph_file_lock = graph_file.write().unwrap();
            let mut node_store = NodeStore::new(&mut *graph_file_lock);
            let read_result = node_store.read_node_v2(1001);
            assert!(read_result.is_ok(), "Node should exist after creation");
            let node = read_result.unwrap();
            assert_eq!(node.id, 1001);
            assert_eq!(node.name, "test_method");
        }
        let mut rollback_data = Vec::new();
        let serialized_old_data = original_node.serialize();
        let delete_result = operations.handle_node_delete(
            1001,
            4096,
            Some(&serialized_old_data),
            &mut rollback_data,
        );
        assert!(delete_result.is_ok(), "Node delete should succeed");
        assert_eq!(rollback_data.len(), 1, "Should record rollback operation");
        // Pull the rollback payload apart so it can be fed to the rollback op.
        let (node_id, slot_offset, old_data, outgoing_edges, incoming_edges) =
            match rollback_data.first() {
                Some(RollbackOperation::NodeDelete {
                    node_id,
                    slot_offset,
                    old_data,
                    outgoing_edges,
                    incoming_edges,
                }) => (
                    *node_id,
                    *slot_offset,
                    old_data.clone(),
                    outgoing_edges.clone(),
                    incoming_edges.clone(),
                ),
                _ => panic!("Expected NodeDelete rollback operation"),
            };
        let rollback_system = RollbackSystem::new(
            graph_file.clone(),
            node_store.clone(),
            Arc::new(Mutex::new(StringTable::new())),
            Arc::new(Mutex::new(Some(FreeSpaceManager::new(
                AllocationStrategy::FirstFit,
            )))),
        );
        let rollback_result = crate::backend::native::v2::wal::recovery::replayer::rollback::node_ops::rollback_node_delete(
            &rollback_system,
            node_id,
            slot_offset,
            old_data,
            outgoing_edges,
            incoming_edges,
        );
        assert!(rollback_result.is_ok(), "Rollback should succeed");
        // The restored node must match the original image field-for-field.
        {
            let mut graph_file_lock = graph_file.write().unwrap();
            let mut node_store = NodeStore::new(&mut *graph_file_lock);
            let read_result = node_store.read_node_v2(1001);
            assert!(read_result.is_ok(), "Node should exist after rollback");
            let restored_node = read_result.unwrap();
            assert_eq!(restored_node.id, 1001, "Node ID should be restored");
            assert_eq!(
                restored_node.kind, "TestClass",
                "Node kind should be restored"
            );
            assert_eq!(
                restored_node.name, "test_method",
                "Node name should be restored"
            );
            assert_eq!(
                restored_node.data.get("version"),
                Some(&serde_json::json!(1)),
                "Node data should be preserved"
            );
        }
    }
}