use super::super::RollbackSystem;
use crate::backend::native::NativeNodeId;
use crate::backend::native::v2::edge_cluster::CompactEdgeRecord;
use crate::backend::native::v2::wal::recovery::errors::RecoveryError;
use crate::backend::native::v2::wal::recovery::store_helpers;
use crate::backend::native::v2::{Direction, EdgeCluster};
use crate::debug::debug_log;
/// Rolls back a WAL-logged edge insert by returning the cluster that the
/// insert allocated to the free-space manager and clearing the owning node's
/// reference to it.
///
/// `cluster_key` is `(node_id, direction)` with the raw WAL direction
/// encoding: 0 = outgoing, 1 = incoming. `_insertion_point` and `_edge_record`
/// travel with the WAL entry but are unused here because the entire cluster is
/// released rather than a single edge being removed.
///
/// # Errors
/// Returns `RecoveryError` when a lock is poisoned, the direction encoding is
/// invalid, or the node record cannot be written back. A node that no longer
/// exists is not an error: the node-record cleanup is simply skipped.
pub fn rollback_edge_insert(
    system: &RollbackSystem,
    cluster_key: (u64, u64),
    _insertion_point: u32,
    _edge_record: &[u8],
    cluster_offset: u64,
    cluster_size: u32,
) -> Result<(), RecoveryError> {
    let (node_id, direction) = cluster_key;
    debug_log!(
        "Rolling back edge insert: node_id={}, direction={}, cluster_offset={}, cluster_size={}",
        node_id,
        direction,
        cluster_offset,
        cluster_size
    );

    // Return the cluster's disk region to the free-space manager first so the
    // space is reclaimed even if the node-record cleanup below is skipped
    // (e.g. because the node no longer exists).
    {
        let mut free_space_guard = system.free_space_manager().lock().map_err(|e| {
            RecoveryError::replay_failure(format!("Failed to lock free space manager: {}", e))
        })?;
        let free_space_manager = free_space_guard.as_mut().ok_or_else(|| {
            RecoveryError::replay_failure("Free space manager not initialized".to_string())
        })?;
        free_space_manager.add_free_block(cluster_offset, cluster_size);
        debug_log!(
            "Deallocated cluster: offset={}, size={}",
            cluster_offset,
            cluster_size
        );
    }

    // Decode the raw WAL direction value before touching the node store so a
    // corrupt entry fails fast.
    let direction_enum = match direction {
        0 => Direction::Outgoing,
        1 => Direction::Incoming,
        _ => {
            return Err(RecoveryError::validation(format!(
                "Invalid direction value: {}, expected 0 (Outgoing) or 1 (Incoming)",
                direction
            )));
        }
    };

    {
        // Hold the node-store lock across both lazy initialization and the
        // record update. The previous version released and reacquired the
        // lock between the two steps; a single critical section matches
        // rollback_edge_update / rollback_edge_delete and removes the window
        // between initializing the store and using it.
        let mut node_store_guard = system.node_store().lock().map_err(|e| {
            RecoveryError::replay_failure(format!("Failed to lock node store: {}", e))
        })?;
        if node_store_guard.is_none() {
            let mut graph_file = system.graph_file().write().map_err(|e| {
                RecoveryError::io_error(format!("Failed to lock graph file: {}", e))
            })?;
            // SAFETY: create_node_store is unsafe per its own contract; it is
            // invoked here exactly as the sibling rollback paths invoke it.
            *node_store_guard =
                Some(unsafe { store_helpers::create_node_store(&mut *graph_file) });
        }
        let node_store = node_store_guard.as_mut().ok_or_else(|| {
            RecoveryError::replay_failure("NodeStore initialization failed".to_string())
        })?;

        let mut node_record = match node_store.read_node_v2(node_id as NativeNodeId) {
            Ok(record) => record,
            Err(_) => {
                // The node itself is gone, so there is no stale cluster
                // reference left to clear; the deallocation above still holds.
                debug_log!(
                    "Node {} doesn't exist, skipping NodeRecordV2 cluster cleanup for direction={:?}",
                    node_id,
                    direction_enum
                );
                return Ok(());
            }
        };

        // Zero out the cluster reference and edge count for the affected
        // direction; offset 0 is the "no cluster" sentinel.
        match direction_enum {
            Direction::Outgoing => {
                debug_log!(
                    "Clearing outgoing cluster reference: node_id={}, old_offset={}, old_size={}",
                    node_id,
                    node_record.outgoing_cluster_offset,
                    node_record.outgoing_cluster_size
                );
                node_record.outgoing_cluster_offset = 0;
                node_record.outgoing_cluster_size = 0;
                node_record.outgoing_edge_count = 0;
            }
            Direction::Incoming => {
                debug_log!(
                    "Clearing incoming cluster reference: node_id={}, old_offset={}, old_size={}",
                    node_id,
                    node_record.incoming_cluster_offset,
                    node_record.incoming_cluster_size
                );
                node_record.incoming_cluster_offset = 0;
                node_record.incoming_cluster_size = 0;
                node_record.incoming_edge_count = 0;
            }
        }

        node_store.write_node_v2(&node_record).map_err(|e| {
            RecoveryError::io_error(format!(
                "Failed to update node {} after cluster cleanup: {}",
                node_id, e
            ))
        })?;
        debug_log!(
            "Successfully cleared cluster reference from node_id={}, direction={:?}",
            node_id,
            direction_enum
        );
    }

    debug_log!(
        "Successfully completed edge insert rollback: node_id={}, direction={:?}, deallocated_offset={}",
        node_id,
        direction_enum,
        cluster_offset
    );
    Ok(())
}
/// Rolls back a WAL-logged in-place edge update by writing the pre-update
/// edge record back into its cluster slot and persisting the rebuilt cluster.
///
/// `cluster_key` is `(node_id, direction)`, `position` is the slot index of
/// the updated edge within the cluster, and `old_edge` is the serialized
/// pre-update `CompactEdgeRecord` captured in the WAL entry.
///
/// # Errors
/// Returns `RecoveryError` on lock poisoning, a missing cluster for the
/// node/direction, an out-of-bounds `position`, or any
/// read/deserialize/serialize/write failure. A node that no longer exists is
/// a no-op (the edge would otherwise be restored onto a non-existent node).
pub fn rollback_edge_update(
    system: &RollbackSystem,
    cluster_key: (i64, Direction),
    position: u32,
    old_edge: &[u8],
) -> Result<(), RecoveryError> {
    let (node_id, direction) = cluster_key;
    debug_log!(
        "Rolling back edge update: node_id={}, direction={:?}, position={}, old_edge_size={}",
        node_id,
        direction,
        position,
        old_edge.len()
    );

    // Locate the cluster for this node/direction from the node record,
    // lazily initializing the node store if recovery has not opened it yet.
    let (cluster_offset, cluster_size) = {
        let mut node_store_guard = system.node_store().lock().map_err(|e| {
            RecoveryError::replay_failure(format!("Failed to lock node store: {}", e))
        })?;
        if node_store_guard.is_none() {
            let mut graph_file = system.graph_file().write().map_err(|e| {
                RecoveryError::replay_failure(format!("Failed to lock graph file: {}", e))
            })?;
            // SAFETY: create_node_store is unsafe per its own contract; it is
            // invoked here exactly as the sibling rollback paths invoke it.
            *node_store_guard =
                Some(unsafe { store_helpers::create_node_store(&mut *graph_file) });
        }
        let node_store = node_store_guard.as_mut().ok_or_else(|| {
            RecoveryError::replay_failure("NodeStore initialization failed".to_string())
        })?;
        let node_record = match node_store.read_node_v2(node_id as NativeNodeId) {
            Ok(record) => record,
            Err(_) => {
                debug_log!(
                    "Node {} doesn't exist, skipping edge update rollback (edge would be restored to non-existent node)",
                    node_id
                );
                return Ok(());
            }
        };
        // Offset 0 is the sentinel for "no cluster allocated" in this store.
        let (cluster_offset, cluster_size) = match direction {
            Direction::Outgoing => {
                if node_record.outgoing_cluster_offset == 0 {
                    return Err(RecoveryError::validation(format!(
                        "Node {} has no outgoing cluster to restore edge to",
                        node_id
                    )));
                }
                (
                    node_record.outgoing_cluster_offset,
                    node_record.outgoing_cluster_size,
                )
            }
            Direction::Incoming => {
                if node_record.incoming_cluster_offset == 0 {
                    return Err(RecoveryError::validation(format!(
                        "Node {} has no incoming cluster to restore edge to",
                        node_id
                    )));
                }
                (
                    node_record.incoming_cluster_offset,
                    node_record.incoming_cluster_size,
                )
            }
        };
        debug_log!(
            "Found cluster at offset {} with size {} for node {} direction {:?}",
            cluster_offset,
            cluster_size,
            node_id,
            direction
        );
        (cluster_offset, cluster_size)
    };

    // Read the current on-disk cluster, verify its layout, and take ownership
    // of its edge list for in-memory modification.
    let mut existing_edges = {
        let mut graph_file = system.graph_file().write().map_err(|e| {
            RecoveryError::replay_failure(format!(
                "Failed to lock graph file for cluster read: {}",
                e
            ))
        })?;
        let mut cluster_buffer = vec![0u8; cluster_size as usize];
        graph_file
            .read_bytes(cluster_offset, &mut cluster_buffer)
            .map_err(|e| {
                RecoveryError::replay_failure(format!(
                    "Failed to read cluster data at offset {}: {:?}",
                    cluster_offset, e
                ))
            })?;
        EdgeCluster::verify_serialized_layout(&cluster_buffer).map_err(|e| {
            RecoveryError::replay_failure(format!("Cluster layout verification failed: {:?}", e))
        })?;
        let edge_cluster = EdgeCluster::deserialize(&cluster_buffer).map_err(|e| {
            RecoveryError::replay_failure(format!("Failed to deserialize cluster: {:?}", e))
        })?;
        edge_cluster.edges().to_vec()
    };

    // An update replaces in place, so `position` must index an existing edge
    // (strict bound — contrast with the delete rollback, which may re-insert
    // at the end and therefore allows position == len).
    if position >= existing_edges.len() as u32 {
        return Err(RecoveryError::validation(format!(
            "Position {} out of bounds for cluster with {} edges (restoring old edge)",
            position,
            existing_edges.len()
        )));
    }

    let old_edge_record = CompactEdgeRecord::deserialize(old_edge).map_err(|e| {
        RecoveryError::replay_failure(format!("Failed to deserialize old_edge data: {:?}", e))
    })?;
    existing_edges[position as usize] = old_edge_record;
    // Replacement keeps the edge count unchanged; capture it now so the
    // vector can be moved (not cloned) into the rebuilt cluster below.
    let restored_edge_count = existing_edges.len();
    debug_log!(
        "Restored old edge at position {} in cluster for node {} direction {:?}",
        position,
        node_id,
        direction
    );

    // Rebuild the cluster and serialize it with the on-disk header layout:
    // node_id (i64 LE), direction (u32 LE), edge count, then edge records.
    let restored_cluster_data = {
        let restored_cluster =
            EdgeCluster::create_from_compact_edges(existing_edges, node_id, direction).map_err(
                |e| {
                    RecoveryError::replay_failure(format!(
                        "Failed to create restored cluster after edge restoration: {:?}",
                        e
                    ))
                },
            )?;
        let mut cluster_bytes = Vec::new();
        cluster_bytes.extend_from_slice(&(node_id as i64).to_le_bytes());
        let direction_u32: u32 = match direction {
            Direction::Outgoing => 0,
            Direction::Incoming => 1,
        };
        cluster_bytes.extend_from_slice(&direction_u32.to_le_bytes());
        let edge_count = restored_cluster.edge_count();
        cluster_bytes.extend_from_slice(&edge_count.to_le_bytes());
        for edge in restored_cluster.edges() {
            let edge_bytes = edge.serialize();
            cluster_bytes.extend_from_slice(&edge_bytes);
        }
        cluster_bytes
    };

    // Write the rebuilt cluster back over the original region. The edge count
    // is unchanged, so the serialized size matches the existing allocation.
    {
        let mut graph_file = system.graph_file().write().map_err(|e| {
            RecoveryError::replay_failure(format!(
                "Failed to lock graph file for cluster write: {}",
                e
            ))
        })?;
        graph_file
            .write_bytes(cluster_offset, &restored_cluster_data)
            .map_err(|e| {
                RecoveryError::io_error(format!(
                    "Failed to write restored cluster at offset {}: {:?}",
                    cluster_offset, e
                ))
            })?;
        debug_log!(
            "Successfully restored cluster at offset {} ({} bytes) with old edge at position {}",
            cluster_offset,
            restored_cluster_data.len(),
            position
        );
    }

    debug_log!(
        "Edge update rollback completed: node_id={}, direction={:?}, position={}, edges_restored={}",
        node_id,
        direction,
        position,
        restored_edge_count
    );
    Ok(())
}
/// Rolls back a WAL-logged edge delete by re-inserting the deleted edge
/// record at its original slot and persisting the rebuilt cluster.
///
/// `cluster_key` is `(node_id, direction)`, `position` is the slot index the
/// edge occupied before deletion, and `old_edge` is the serialized
/// `CompactEdgeRecord` captured in the WAL entry.
///
/// # Errors
/// Returns `RecoveryError` on lock poisoning, a missing cluster for the
/// node/direction, an out-of-bounds `position`, or any
/// read/deserialize/serialize/write failure. A node that no longer exists is
/// a no-op (the edge would otherwise be restored onto a non-existent node).
pub fn rollback_edge_delete(
    system: &RollbackSystem,
    cluster_key: (i64, Direction),
    position: u32,
    old_edge: &[u8],
) -> Result<(), RecoveryError> {
    let (node_id, direction) = cluster_key;
    debug_log!(
        "Rolling back edge delete: node_id={}, direction={:?}, position={}, old_edge_size={}",
        node_id,
        direction,
        position,
        old_edge.len()
    );
    // Locate the cluster for this node/direction from the node record,
    // lazily initializing the node store if recovery has not opened it yet.
    let (cluster_offset, cluster_size) = {
        let mut node_store_guard = system.node_store().lock().map_err(|e| {
            RecoveryError::replay_failure(format!("Failed to lock node store: {}", e))
        })?;
        if node_store_guard.is_none() {
            let mut graph_file = system.graph_file().write().map_err(|e| {
                RecoveryError::replay_failure(format!("Failed to lock graph file: {}", e))
            })?;
            // SAFETY: create_node_store is unsafe per its own contract; it is
            // invoked here exactly as the sibling rollback paths invoke it.
            *node_store_guard = Some(unsafe { store_helpers::create_node_store(&mut *graph_file) });
        }
        let node_store = node_store_guard.as_mut().ok_or_else(|| {
            RecoveryError::replay_failure("NodeStore initialization failed".to_string())
        })?;
        let node_record = match node_store.read_node_v2(node_id as NativeNodeId) {
            Ok(record) => record,
            Err(_) => {
                debug_log!(
                    "Node {} doesn't exist, skipping edge delete rollback (edge would be restored to non-existent node)",
                    node_id
                );
                return Ok(());
            }
        };
        // Offset 0 is the sentinel for "no cluster allocated" in this store.
        let (cluster_offset, cluster_size) = match direction {
            Direction::Outgoing => {
                if node_record.outgoing_cluster_offset == 0 {
                    return Err(RecoveryError::validation(format!(
                        "Node {} has no outgoing cluster to restore edge to",
                        node_id
                    )));
                }
                (
                    node_record.outgoing_cluster_offset,
                    node_record.outgoing_cluster_size,
                )
            }
            Direction::Incoming => {
                if node_record.incoming_cluster_offset == 0 {
                    return Err(RecoveryError::validation(format!(
                        "Node {} has no incoming cluster to restore edge to",
                        node_id
                    )));
                }
                (
                    node_record.incoming_cluster_offset,
                    node_record.incoming_cluster_size,
                )
            }
        };
        debug_log!(
            "Found cluster at offset {} with size {} for node {} direction {:?}",
            cluster_offset,
            cluster_size,
            node_id,
            direction
        );
        (cluster_offset, cluster_size)
    };
    // Read the current on-disk cluster, verify its layout, and take ownership
    // of its edge list for in-memory modification.
    let mut existing_edges = {
        let mut graph_file = system.graph_file().write().map_err(|e| {
            RecoveryError::replay_failure(format!(
                "Failed to lock graph file for cluster read: {}",
                e
            ))
        })?;
        let mut cluster_buffer = vec![0u8; cluster_size as usize];
        graph_file
            .read_bytes(cluster_offset, &mut cluster_buffer)
            .map_err(|e| {
                RecoveryError::replay_failure(format!(
                    "Failed to read cluster data at offset {}: {:?}",
                    cluster_offset, e
                ))
            })?;
        EdgeCluster::verify_serialized_layout(&cluster_buffer).map_err(|e| {
            RecoveryError::replay_failure(format!("Cluster layout verification failed: {:?}", e))
        })?;
        let edge_cluster = EdgeCluster::deserialize(&cluster_buffer).map_err(|e| {
            RecoveryError::replay_failure(format!("Failed to deserialize cluster: {:?}", e))
        })?;
        edge_cluster.edges().to_vec()
    };
    // Re-insertion allows position == len (the deleted edge was the last
    // one), so the bound here is `>`, unlike the update rollback's `>=`.
    if position > existing_edges.len() as u32 {
        return Err(RecoveryError::validation(format!(
            "Position {} out of bounds for cluster with {} edges (restoring deleted edge)",
            position,
            existing_edges.len()
        )));
    }
    let old_edge_record = CompactEdgeRecord::deserialize(old_edge).map_err(|e| {
        RecoveryError::replay_failure(format!("Failed to deserialize old_edge data: {:?}", e))
    })?;
    existing_edges.insert(position as usize, old_edge_record);
    // Capture the count now so `existing_edges` can be moved (not cloned)
    // into the rebuilt cluster below while the final log still reports it.
    let restored_edge_count = existing_edges.len();
    debug_log!(
        "Inserted deleted edge at position {} in cluster for node {} direction {:?} - {} edges total",
        position,
        node_id,
        direction,
        restored_edge_count
    );
    // Rebuild the cluster and serialize it with the on-disk header layout:
    // node_id (i64 LE), direction (u32 LE), edge count, then edge records.
    let restored_cluster_data = {
        let restored_cluster =
            EdgeCluster::create_from_compact_edges(existing_edges, node_id, direction).map_err(
                |e| {
                    RecoveryError::replay_failure(format!(
                        "Failed to create restored cluster after edge reinsertion: {:?}",
                        e
                    ))
                },
            )?;
        let mut cluster_bytes = Vec::new();
        cluster_bytes.extend_from_slice(&(node_id as i64).to_le_bytes());
        let direction_u32: u32 = match direction {
            Direction::Outgoing => 0,
            Direction::Incoming => 1,
        };
        cluster_bytes.extend_from_slice(&direction_u32.to_le_bytes());
        let edge_count = restored_cluster.edge_count();
        cluster_bytes.extend_from_slice(&edge_count.to_le_bytes());
        for edge in restored_cluster.edges() {
            let edge_bytes = edge.serialize();
            cluster_bytes.extend_from_slice(&edge_bytes);
        }
        cluster_bytes
    };
    // Write the rebuilt cluster back over the original region.
    // NOTE(review): the restored data is one edge larger than `cluster_size`
    // read from the node record — this assumes either that the allocation at
    // `cluster_offset` has slack for the pre-delete size or that write_bytes
    // may extend the region safely; TODO confirm against the allocator.
    {
        let mut graph_file = system.graph_file().write().map_err(|e| {
            RecoveryError::replay_failure(format!(
                "Failed to lock graph file for cluster write: {}",
                e
            ))
        })?;
        graph_file
            .write_bytes(cluster_offset, &restored_cluster_data)
            .map_err(|e| {
                RecoveryError::io_error(format!(
                    "Failed to write restored cluster at offset {}: {:?}",
                    cluster_offset, e
                ))
            })?;
        debug_log!(
            "Successfully restored cluster at offset {} ({} bytes) with deleted edge at position {}",
            cluster_offset,
            restored_cluster_data.len(),
            position
        );
    }
    debug_log!(
        "Edge delete rollback completed: node_id={}, direction={:?}, position={}, edges_restored={}",
        node_id,
        direction,
        position,
        restored_edge_count
    );
    Ok(())
}