use super::super::RollbackSystem;
use crate::backend::native::NativeNodeId;
use crate::backend::native::v2::Direction;
use crate::backend::native::v2::wal::recovery::errors::RecoveryError;
use crate::backend::native::v2::wal::recovery::store_helpers;
use crate::debug::debug_log;
/// Rolls back a cluster-create operation for one direction of a single node.
///
/// The rollback runs in three steps:
///
/// 1. The range `[cluster_offset, cluster_offset + cluster_size)` is handed
///    back to the free space manager via `add_free_block`.
/// 2. If the shared node store slot is still empty, a node store is created
///    from the graph file.
/// 3. The node record for `node_id` is read, the cluster offset, cluster size
///    and edge count for `direction` are reset to zero, and the record is
///    written back.
///
/// Step 1 happens before the node lookup, so the space is released even when
/// the node record cannot be read. A failed read is treated as "node does not
/// exist": step 3 is skipped and `Ok(())` is returned.
///
/// `_cluster_data` is accepted for signature compatibility and is not used.
///
/// # Errors
///
/// Returns a [`RecoveryError`] when:
/// - `cluster_size` does not fit in the `u32` block size passed to the free
///   space manager (checked up front, before any state is modified),
/// - the free space manager, node store or graph file lock cannot be acquired,
/// - the free space manager or node store is not initialized,
/// - writing the updated node record fails.
pub fn rollback_cluster_create(
    system: &RollbackSystem,
    node_id: u64,
    direction: Direction,
    cluster_offset: u64,
    cluster_size: u64,
    _cluster_data: Vec<u8>,
) -> Result<(), RecoveryError> {
    debug_log!(
        "Rolling back cluster create: node_id={}, direction={:?}, cluster_offset={}, cluster_size={}",
        node_id,
        direction,
        cluster_offset,
        cluster_size
    );

    // A plain `as u32` cast would silently truncate an oversized cluster and
    // release a wrong-sized block. Reject such sizes before touching any state.
    // The trait path is spelled out so the call resolves regardless of whether
    // `TryFrom` is in the edition prelude.
    let free_block_size =
        <u32 as std::convert::TryFrom<u64>>::try_from(cluster_size).map_err(|_| {
            RecoveryError::replay_failure(format!(
                "Cluster size {} at offset {} does not fit in a u32 free block size",
                cluster_size, cluster_offset
            ))
        })?;

    // Step 1: return the cluster's range to the free space manager. The guard
    // is scoped so the lock is released before the node store is touched.
    {
        let mut free_space_guard = system.free_space_manager().lock().map_err(|e| {
            RecoveryError::replay_failure(format!("Failed to lock free space manager: {}", e))
        })?;
        let free_space_manager = free_space_guard.as_mut().ok_or_else(|| {
            RecoveryError::replay_failure("Free space manager not initialized".to_string())
        })?;
        free_space_manager.add_free_block(cluster_offset, free_block_size);
        debug_log!(
            "Deallocated cluster: offset={}, size={}",
            cluster_offset,
            cluster_size
        );
    }

    // Step 2: lazily create the node store if the shared slot is still empty.
    {
        let mut node_store_guard = system.node_store().lock().map_err(|e| {
            RecoveryError::replay_failure(format!("Failed to lock node store: {}", e))
        })?;
        if node_store_guard.is_none() {
            let mut graph_file = system.graph_file().write().map_err(|e| {
                RecoveryError::io_error(format!("Failed to lock graph file: {}", e))
            })?;
            // SAFETY: NOTE(review): the contract of `create_node_store` is not
            // visible from this file. Presumably the graph file must outlive
            // the created store — confirm against `store_helpers`.
            *node_store_guard =
                Some(unsafe { store_helpers::create_node_store(&mut *graph_file) });
        }
    }

    // Step 3: clear the cluster reference stored on the node record.
    {
        let mut node_store_guard = system.node_store().lock().map_err(|e| {
            RecoveryError::replay_failure(format!(
                "Failed to lock node store for node update: {}",
                e
            ))
        })?;
        let node_store = node_store_guard.as_mut().ok_or_else(|| {
            RecoveryError::replay_failure("NodeStore initialization failed".to_string())
        })?;

        // Best effort: any read failure is treated as "node does not exist",
        // in which case there is no record to clean up.
        let mut node_record = match node_store.read_node_v2(node_id as NativeNodeId) {
            Ok(record) => record,
            Err(_) => {
                debug_log!(
                    "Node {} doesn't exist, skipping NodeRecordV2 cluster cleanup for direction={:?}",
                    node_id,
                    direction
                );
                return Ok(());
            }
        };

        match direction {
            Direction::Outgoing => {
                debug_log!(
                    "Clearing outgoing cluster reference: node_id={}, old_offset={}, old_size={}",
                    node_id,
                    node_record.outgoing_cluster_offset,
                    node_record.outgoing_cluster_size
                );
                node_record.outgoing_cluster_offset = 0;
                node_record.outgoing_cluster_size = 0;
                node_record.outgoing_edge_count = 0;
            }
            Direction::Incoming => {
                debug_log!(
                    "Clearing incoming cluster reference: node_id={}, old_offset={}, old_size={}",
                    node_id,
                    node_record.incoming_cluster_offset,
                    node_record.incoming_cluster_size
                );
                node_record.incoming_cluster_offset = 0;
                node_record.incoming_cluster_size = 0;
                node_record.incoming_edge_count = 0;
            }
        }

        node_store.write_node_v2(&node_record).map_err(|e| {
            RecoveryError::io_error(format!(
                "Failed to update node {} after cluster cleanup: {}",
                node_id, e
            ))
        })?;
        debug_log!(
            "Successfully cleared cluster reference from node_id={}, direction={:?}",
            node_id,
            direction
        );
    }

    debug_log!(
        "Successfully completed cluster create rollback: node_id={}, direction={:?}, deallocated_offset={}",
        node_id,
        direction,
        cluster_offset
    );
    Ok(())
}