use crate::ast::ast::RollbackStatement;
use crate::exec::write_stmt::{ExecutionContext, StatementExecutor, TransactionStatementExecutor};
use crate::exec::{ExecutionError, QueryResult, Row};
use crate::storage::value::Value;
use crate::storage::{GraphCache, StorageManager};
use crate::txn::log::{TransactionLog, UndoOperation};
use crate::txn::state::OperationType;
use std::collections::HashMap;
use std::sync::RwLock;
pub struct RollbackExecutor {
}
impl RollbackExecutor {
pub fn new(_statement: RollbackStatement) -> Self {
Self {}
}
}
impl StatementExecutor for RollbackExecutor {
fn operation_type(&self) -> OperationType {
OperationType::Rollback
}
fn operation_description(&self, _context: &ExecutionContext) -> String {
"ROLLBACK".to_string()
}
fn requires_write_permission(&self) -> bool {
false }
}
impl TransactionStatementExecutor for RollbackExecutor {
fn execute_transaction_operation(
&self,
context: &ExecutionContext,
) -> Result<QueryResult, ExecutionError> {
let transaction_state = context.transaction_state().ok_or_else(|| {
ExecutionError::RuntimeError("No transaction state available".to_string())
})?;
let storage_manager = context.storage_manager.as_ref().ok_or_else(|| {
ExecutionError::RuntimeError("No storage manager available".to_string())
})?;
transaction_state.rollback_transaction_with_storage(Some(storage_manager))?;
let message = "Transaction rolled back successfully";
Ok(QueryResult {
rows: vec![Row::from_values(HashMap::from([(
"status".to_string(),
Value::String(message.to_string()),
)]))],
variables: vec!["status".to_string()],
execution_time_ms: 0,
rows_affected: 0,
session_result: None,
warnings: Vec::new(),
})
}
}
impl RollbackExecutor {
#[allow(dead_code)] pub fn execute_rollback(
_statement: &RollbackStatement,
transaction_manager: &crate::txn::TransactionManager,
current_transaction: &RwLock<Option<crate::txn::TransactionId>>,
transaction_logs: &RwLock<HashMap<crate::txn::TransactionId, TransactionLog>>,
storage: &StorageManager,
) -> Result<QueryResult, ExecutionError> {
let current_txn = current_transaction.read().map_err(|_| {
ExecutionError::RuntimeError("Failed to acquire transaction lock".to_string())
})?;
let txn_id = match *current_txn {
Some(id) => id,
None => {
drop(current_txn);
return Err(ExecutionError::RuntimeError(
"No transaction in progress".to_string(),
));
}
};
drop(current_txn);
{
let mut logs = transaction_logs.write().map_err(|_| {
ExecutionError::RuntimeError("Failed to acquire transaction logs lock".to_string())
})?;
if let Some(log) = logs.get(&txn_id) {
log::info!(
"ROLLBACK: Found transaction log with {} operations",
log.operation_count
);
for (i, undo_op) in log.get_rollback_operations().enumerate() {
log::info!("ROLLBACK: Applying undo operation {}: {:?}", i, undo_op);
Self::apply_undo_operation(undo_op, storage)?;
}
log::info!("ROLLBACK: Applied {} undo operations", log.operation_count);
} else {
drop(logs);
return Err(ExecutionError::RuntimeError(
"No transaction log available".to_string(),
));
}
logs.remove(&txn_id);
}
transaction_manager.rollback_transaction(txn_id)?;
{
let mut current_txn = current_transaction.write().map_err(|_| {
ExecutionError::RuntimeError("Failed to acquire transaction lock".to_string())
})?;
*current_txn = None;
}
let message = "Transaction rolled back successfully";
Ok(QueryResult {
rows: vec![Row::from_values(HashMap::from([(
"status".to_string(),
Value::String(message.to_string()),
)]))],
variables: vec!["status".to_string()],
execution_time_ms: 0,
rows_affected: 0,
session_result: None,
warnings: Vec::new(),
})
}
#[allow(dead_code)] fn apply_undo_operation(
undo_op: &UndoOperation,
unified_storage: &StorageManager,
) -> Result<(), ExecutionError> {
if let UndoOperation::Batch { operations } = undo_op {
for op in operations {
Self::apply_undo_operation(op, unified_storage)?;
}
return Ok(());
}
let graph_name = match undo_op {
UndoOperation::InsertNode { graph_path, .. }
| UndoOperation::UpdateNode { graph_path, .. }
| UndoOperation::DeleteNode { graph_path, .. }
| UndoOperation::InsertEdge { graph_path, .. }
| UndoOperation::UpdateEdge { graph_path, .. }
| UndoOperation::DeleteEdge { graph_path, .. } => graph_path,
UndoOperation::Batch { .. } => unreachable!("Batch handled above"),
};
let mut graph = unified_storage
.get_graph(graph_name)
.map_err(|e| {
ExecutionError::StorageError(format!("Failed to get graph during rollback: {}", e))
})?
.ok_or_else(|| {
ExecutionError::StorageError(format!(
"Graph not found during rollback: {}",
graph_name
))
})?;
Self::apply_undo_to_graph(&mut graph, undo_op)?;
unified_storage.save_graph(graph_name, graph).map_err(|e| {
ExecutionError::StorageError(format!(
"Failed to save rollback changes to storage: {}",
e
))
})?;
log::debug!(
"ROLLBACK: Applied undo operation for graph '{}'",
graph_name
);
Ok(())
}
#[allow(dead_code)] fn apply_undo_to_graph(
graph: &mut GraphCache,
undo_op: &UndoOperation,
) -> Result<(), ExecutionError> {
if let UndoOperation::Batch { operations } = undo_op {
for op in operations {
Self::apply_undo_to_graph(graph, op)?;
}
return Ok(());
}
match undo_op {
UndoOperation::InsertNode { node_id, .. } => {
graph.remove_node(node_id).map_err(|e| {
ExecutionError::StorageError(format!(
"Failed to remove node during rollback: {}",
e
))
})?;
log::debug!("ROLLBACK: Removed node {} (undo insert)", node_id);
}
UndoOperation::InsertEdge { edge_id, .. } => {
graph.remove_edge(edge_id).map_err(|e| {
ExecutionError::StorageError(format!(
"Failed to remove edge during rollback: {}",
e
))
})?;
log::debug!("ROLLBACK: Removed edge {} (undo insert)", edge_id);
}
UndoOperation::UpdateNode {
node_id,
old_properties,
old_labels,
..
} => {
if let Some(node) = graph.get_node_mut(node_id) {
node.properties = old_properties.clone();
node.labels = old_labels.clone();
log::debug!(
"ROLLBACK: Restored node {} properties and labels (undo update)",
node_id
);
} else {
log::warn!(
"ROLLBACK: Node {} not found for property restoration",
node_id
);
}
}
UndoOperation::UpdateEdge {
edge_id,
old_properties,
old_label,
..
} => {
if let Some(edge) = graph.get_edge_mut(edge_id) {
edge.properties = old_properties.clone();
edge.label = old_label.clone();
log::debug!(
"ROLLBACK: Restored edge {} properties and label (undo update)",
edge_id
);
} else {
log::warn!(
"ROLLBACK: Edge {} not found for property restoration",
edge_id
);
}
}
UndoOperation::DeleteNode {
node_id,
deleted_node,
..
} => {
graph.add_node(deleted_node.clone()).map_err(|e| {
ExecutionError::StorageError(format!(
"Failed to restore node during rollback: {}",
e
))
})?;
log::debug!("ROLLBACK: Restored node {} (undo delete)", node_id);
}
UndoOperation::DeleteEdge {
edge_id,
deleted_edge,
..
} => {
graph.add_edge(deleted_edge.clone()).map_err(|e| {
ExecutionError::StorageError(format!(
"Failed to restore edge during rollback: {}",
e
))
})?;
log::debug!("ROLLBACK: Restored edge {} (undo delete)", edge_id);
}
UndoOperation::Batch { .. } => {
unreachable!("Batch operations should be handled before this match statement");
}
}
Ok(())
}
}