use chrono::{DateTime, Utc};
use std::collections::HashMap;
use super::state::{OperationType, TransactionId};
use super::wal::{PersistentWAL, WALEntry, WALEntryType, WALError};
use crate::storage::StorageManager;
/// Rebuilds transaction state from the write-ahead log (WAL) and runs the
/// analysis, redo and undo recovery phases over it.
#[allow(dead_code)] pub struct RecoveryManager {
/// WAL handle whose files are read during the analysis phase.
wal: PersistentWAL,
/// Per-transaction state reconstructed from the WAL, keyed by transaction id.
recovered_transactions: HashMap<TransactionId, TransactionRecoveryState>,
/// Optional storage manager; the redo/undo phases only call the per-operation
/// hooks when this is `Some`.
storage_manager: Option<StorageManager>,
}
/// Everything the recovery manager learned about one transaction while scanning the WAL.
#[derive(Debug, Clone)]
#[allow(dead_code)] pub struct TransactionRecoveryState {
/// Identifier of the transaction this state belongs to.
pub transaction_id: TransactionId,
/// Outcome derived from the begin/commit/rollback entries seen so far.
pub status: RecoveryStatus,
/// Operation entries recorded for this transaction, in the order they were read.
pub operations: Vec<WALEntry>,
/// Timestamp of the transaction's begin entry.
pub start_time: DateTime<Utc>,
/// Timestamp of the commit or rollback entry; `None` while no such entry has been seen.
pub end_time: Option<DateTime<Utc>>,
}
/// Outcome of a transaction as reconstructed from the WAL.
///
/// The enum is fieldless, so it is `Copy` and `Eq` in addition to `Clone` and
/// `PartialEq`; callers can pass it by value and compare it without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub enum RecoveryStatus {
    /// A begin entry was seen but no commit or rollback entry followed.
    InProgress,
    /// A commit entry was seen for the transaction.
    Committed,
    /// A rollback entry was seen for the transaction.
    RolledBack,
    /// NOTE(review): never assigned in this file; presumably reserved for
    /// transactions that must be aborted explicitly — confirm with callers.
    NeedsAbort,
}
impl RecoveryManager {
/// Creates a recovery manager with no storage manager attached.
///
/// The `PersistentWAL` is constructed from `db_path`. Without a storage manager,
/// the redo and undo phases still count operations but never call the
/// per-operation apply/undo hooks.
///
/// # Panics
///
/// Panics if `PersistentWAL::new` returns an error.
#[allow(dead_code)]
pub fn new(db_path: std::path::PathBuf) -> Self {
    Self {
        wal: PersistentWAL::new(db_path).expect("Failed to initialize WAL for recovery"),
        recovered_transactions: HashMap::new(),
        storage_manager: None,
    }
}
/// Creates a recovery manager that has a storage manager attached.
///
/// The `PersistentWAL` is constructed from `db_path`; `storage_manager` is stored
/// so the redo and undo phases invoke the per-operation apply/undo hooks.
///
/// # Panics
///
/// Panics if `PersistentWAL::new` returns an error.
#[allow(dead_code)]
pub fn with_storage(db_path: std::path::PathBuf, storage_manager: StorageManager) -> Self {
    Self {
        wal: PersistentWAL::new(db_path).expect("Failed to initialize WAL for recovery"),
        recovered_transactions: HashMap::new(),
        storage_manager: Some(storage_manager),
    }
}
/// Runs the three recovery phases in order — analysis, redo, undo — and
/// returns a report describing what was found and processed.
///
/// The report's `recovery_time_ms` is filled with the wall-clock duration of
/// the three phases (previously it was left at its initial value of 0).
///
/// # Errors
///
/// Returns the first [`RecoveryError`] produced by any phase; later phases are
/// not run in that case.
#[allow(dead_code)]
pub fn recover(&mut self) -> Result<RecoveryReport, RecoveryError> {
    let started = std::time::Instant::now();
    let mut report = RecoveryReport::new();

    self.analysis_phase(&mut report)?;
    self.redo_phase(&mut report)?;
    self.undo_phase(&mut report)?;

    // `as_millis` yields a u128; saturate instead of silently truncating.
    report.recovery_time_ms =
        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
    Ok(report)
}
/// Scans the WAL files in sequence and classifies every transaction found.
///
/// Files are read starting at number 1; each entry is fed to
/// `process_entry_analysis`. The scan stops at the first file for which the WAL
/// returns an I/O error whose message contains "not found". Afterwards each
/// reconstructed transaction is sorted into the report's committed,
/// rolled-back or incomplete list according to its status.
///
/// The lists are then ordered by the timestamps recorded from the WAL
/// (commit/rollback time for finished transactions, begin time for incomplete
/// ones). Without this, the order would follow `HashMap` iteration, which
/// differs from run to run, and the redo/undo phases would process
/// transactions in an arbitrary order.
///
/// # Errors
///
/// Returns `RecoveryError::WALReadError` for any other WAL read failure, and
/// propagates errors from `process_entry_analysis`.
fn analysis_phase(&mut self, report: &mut RecoveryReport) -> Result<(), RecoveryError> {
    log::debug!("🔍 Starting WAL analysis phase...");

    let mut file_number = 1u64;
    loop {
        match self.wal.read_wal_file(file_number) {
            Ok(entries) => {
                report.total_wal_entries += entries.len();
                for entry in entries {
                    self.process_entry_analysis(entry)?;
                }
                file_number += 1;
            }
            // A missing file marks the end of the WAL sequence, not a failure.
            Err(WALError::IOError(msg)) if msg.contains("not found") => break,
            Err(e) => return Err(RecoveryError::WALReadError(e.to_string())),
        }
    }

    for (txn_id, state) in &self.recovered_transactions {
        match state.status {
            RecoveryStatus::InProgress => report.incomplete_transactions.push(*txn_id),
            RecoveryStatus::Committed => report.committed_transactions.push(*txn_id),
            RecoveryStatus::RolledBack => report.rolled_back_transactions.push(*txn_id),
            // Listed explicitly so adding a new status forces a decision here.
            RecoveryStatus::NeedsAbort => {}
        }
    }

    // Replace hash-map iteration order with timestamp order. Transactions with
    // identical timestamps keep whatever relative order the map produced.
    let states = &self.recovered_transactions;
    report
        .committed_transactions
        .sort_by_key(|id| states.get(id).and_then(|s| s.end_time));
    report
        .rolled_back_transactions
        .sort_by_key(|id| states.get(id).and_then(|s| s.end_time));
    report
        .incomplete_transactions
        .sort_by_key(|id| states.get(id).map(|s| s.start_time));

    log::debug!(
        "✅ Analysis phase complete: {} transactions to recover",
        report.incomplete_transactions.len() + report.committed_transactions.len()
    );
    Ok(())
}
fn process_entry_analysis(&mut self, entry: WALEntry) -> Result<(), RecoveryError> {
let txn_id = entry.transaction_id;
match entry.entry_type {
WALEntryType::TransactionBegin => {
let state = TransactionRecoveryState {
transaction_id: txn_id,
status: RecoveryStatus::InProgress,
operations: Vec::new(),
start_time: DateTime::<Utc>::from(entry.timestamp),
end_time: None,
};
self.recovered_transactions.insert(txn_id, state);
}
WALEntryType::TransactionCommit => {
if let Some(state) = self.recovered_transactions.get_mut(&txn_id) {
state.status = RecoveryStatus::Committed;
state.end_time = Some(DateTime::<Utc>::from(entry.timestamp));
}
}
WALEntryType::TransactionRollback => {
if let Some(state) = self.recovered_transactions.get_mut(&txn_id) {
state.status = RecoveryStatus::RolledBack;
state.end_time = Some(DateTime::<Utc>::from(entry.timestamp));
}
}
WALEntryType::TransactionOperation => {
if let Some(state) = self.recovered_transactions.get_mut(&txn_id) {
state.operations.push(entry);
}
}
}
Ok(())
}
/// Replays the operations of every transaction listed as committed in `report`.
///
/// Each operation is passed to `apply_operation` only when a storage manager
/// is attached, but `report.operations_replayed` is incremented for every
/// operation either way. Transaction ids without recorded state are skipped.
///
/// # Errors
///
/// Propagates the first error returned by `apply_operation`.
fn redo_phase(&mut self, report: &mut RecoveryReport) -> Result<(), RecoveryError> {
    log::debug!("🔄 Starting redo phase...");

    // The id list and the counter are separate fields of `report`, so the list
    // can be iterated in place while the counter is updated.
    for txn_id in report.committed_transactions.iter() {
        // The operations are copied out because `apply_operation` needs `&mut self`.
        let pending = match self.recovered_transactions.get(txn_id) {
            Some(state) => {
                log::debug!(" Replaying committed transaction: {}", txn_id.id());
                state.operations.clone()
            }
            None => continue,
        };

        for op in pending.iter() {
            if self.storage_manager.is_some() {
                self.apply_operation(op)?;
            }
            report.operations_replayed += 1;
        }
    }

    log::debug!(
        "✅ Redo phase complete: {} operations replayed",
        report.operations_replayed
    );
    Ok(())
}
/// Rolls back every transaction listed as incomplete in `report`.
///
/// A transaction's operations are visited newest-first. Each one is passed to
/// `undo_operation` only when a storage manager is attached, but
/// `report.operations_undone` is incremented for every operation either way.
/// Transaction ids without recorded state are skipped.
///
/// # Errors
///
/// Propagates the first error returned by `undo_operation`.
fn undo_phase(&mut self, report: &mut RecoveryReport) -> Result<(), RecoveryError> {
    log::debug!("↩️ Starting undo phase...");

    // The id list and the counter are separate fields of `report`, so the list
    // can be iterated in place while the counter is updated.
    for txn_id in report.incomplete_transactions.iter() {
        // The operations are copied out because `undo_operation` needs `&mut self`.
        let pending = match self.recovered_transactions.get(txn_id) {
            Some(state) => {
                log::debug!(" Rolling back incomplete transaction: {}", txn_id.id());
                state.operations.clone()
            }
            None => continue,
        };

        // Reverse order: the most recent operation is undone first.
        for op in pending.iter().rev() {
            if self.storage_manager.is_some() {
                self.undo_operation(op)?;
            }
            report.operations_undone += 1;
        }
    }

    log::debug!(
        "✅ Undo phase complete: {} operations undone",
        report.operations_undone
    );
    Ok(())
}
/// Redo hook for a single WAL entry.
///
/// At present this only emits a debug log line: one format for data
/// operations (insert/update/delete) and one for schema changes (create/drop
/// of tables and graphs). Entries with no operation type, or with any other
/// operation type, are ignored. Nothing is written to storage and the function
/// always returns `Ok(())`.
fn apply_operation(&mut self, entry: &WALEntry) -> Result<(), RecoveryError> {
    let op = match entry.operation_type.as_ref() {
        Some(op) => op,
        None => return Ok(()),
    };

    match op {
        OperationType::Insert | OperationType::Update | OperationType::Delete => {
            log::debug!(
                " Applying: {:?} - {}",
                entry.operation_type,
                entry.description
            );
        }
        OperationType::CreateTable
        | OperationType::CreateGraph
        | OperationType::DropTable
        | OperationType::DropGraph => {
            log::debug!(" Applying schema change: {:?}", entry.operation_type);
        }
        // Any other operation type has no redo handling here.
        _ => {}
    }
    Ok(())
}
/// Undo hook for a single WAL entry.
///
/// At present this only emits a debug log line for insert, delete and update
/// operations. Entries with no operation type, or with any other operation
/// type, are ignored. Nothing is written to storage and the function always
/// returns `Ok(())`.
fn undo_operation(&mut self, entry: &WALEntry) -> Result<(), RecoveryError> {
    let op = match entry.operation_type.as_ref() {
        Some(op) => op,
        None => return Ok(()),
    };

    match op {
        OperationType::Insert => log::debug!(" Undoing insert: {}", entry.description),
        OperationType::Delete => log::debug!(" Undoing delete: {}", entry.description),
        OperationType::Update => log::debug!(" Undoing update: {}", entry.description),
        // Any other operation type has no undo handling here.
        _ => {}
    }
    Ok(())
}
/// Returns `true` when recovery state has been recorded for `txn_id`.
#[allow(dead_code)]
pub fn was_transaction_recovered(&self, txn_id: &TransactionId) -> bool {
    self.get_transaction_state(txn_id).is_some()
}
/// Looks up the recovery state recorded for `txn_id`.
///
/// Returns `None` when no state has been recorded for that transaction.
#[allow(dead_code)]
pub fn get_transaction_state(&self, txn_id: &TransactionId) -> Option<&TransactionRecoveryState> {
    let states = &self.recovered_transactions;
    states.get(txn_id)
}
}
/// Summary of a recovery run: what the WAL contained and what was processed.
///
/// Derives `Default` (all counters zero, all lists empty) and `Clone` so the
/// report can be created and copied without hand-written boilerplate.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct RecoveryReport {
    /// Number of WAL entries read across all scanned WAL files.
    pub total_wal_entries: usize,
    /// Transactions whose status after analysis is committed.
    pub committed_transactions: Vec<TransactionId>,
    /// Transactions whose status after analysis is rolled back.
    pub rolled_back_transactions: Vec<TransactionId>,
    /// Transactions that were begun but neither committed nor rolled back.
    pub incomplete_transactions: Vec<TransactionId>,
    /// Operations counted by the redo phase.
    pub operations_replayed: usize,
    /// Operations counted by the undo phase.
    pub operations_undone: usize,
    /// Elapsed recovery time in milliseconds; starts at 0.
    pub recovery_time_ms: u64,
}

impl RecoveryReport {
    /// Creates an empty report: every counter is 0 and every list is empty.
    fn new() -> Self {
        Self::default()
    }
}
/// Errors that recovery can report.
#[derive(Debug)]
pub enum RecoveryError {
/// A WAL file could not be read; carries the underlying error rendered as text.
WALReadError(String),
/// Replaying an operation failed. Not constructed anywhere in this file.
#[allow(dead_code)]
OperationReplayError(String),
/// The storage layer reported a failure. Not constructed anywhere in this file.
#[allow(dead_code)]
StorageError(String),
}
impl std::fmt::Display for RecoveryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RecoveryError::WALReadError(msg) => write!(f, "WAL Read Error: {}", msg),
RecoveryError::OperationReplayError(msg) => {
write!(f, "Operation Replay Error: {}", msg)
}
RecoveryError::StorageError(msg) => write!(f, "Storage Error: {}", msg),
}
}
}
// Marks `RecoveryError` as a standard error type. No methods are overridden,
// so `source()` keeps its default and reports no underlying cause.
impl std::error::Error for RecoveryError {}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the manager so the directory
    /// is not removed while the test is still using it.
    fn manager_in_temp_dir() -> (TempDir, RecoveryManager) {
        let dir = TempDir::new().unwrap();
        let manager = RecoveryManager::new(dir.path().to_path_buf());
        (dir, manager)
    }

    /// A freshly created manager has no recovered transactions.
    #[test]
    fn test_recovery_manager_creation() {
        let (_dir, manager) = manager_in_temp_dir();
        assert!(manager.recovered_transactions.is_empty());
    }

    /// Recovering from an empty directory yields an empty report.
    #[test]
    fn test_empty_recovery() {
        let (_dir, mut manager) = manager_in_temp_dir();
        let report = manager.recover().unwrap();
        assert_eq!(report.total_wal_entries, 0);
        assert!(report.committed_transactions.is_empty());
        assert!(report.incomplete_transactions.is_empty());
    }
}