pub use self::operations::*;
pub use self::rollback::*;
pub use self::types::*;
mod operations;
mod rollback;
mod types;
use super::{core::TransactionState, errors::RecoveryError};
use crate::backend::native::v2::StringTable;
use crate::backend::native::v2::wal::V2WALRecord;
use crate::backend::native::{EdgeStore, GraphFile, NodeStore};
use crate::debug::{debug_log, error_log, info_log, warn_log};
use rayon::prelude::*;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
/// Replays committed V2 WAL transactions against an open graph file during
/// crash recovery.
///
/// All mutable state is held behind `Arc` + lock types because
/// `replay_transactions` fans work out to rayon worker threads.
pub struct V2GraphFileReplayer {
// Tuning knobs for the replay run (parallelism, progress interval, ...).
config: ReplayConfig,
// Handle to the open graph file being recovered.
graph_file: Arc<RwLock<GraphFile>>,
// Starts as `None` in `create`; populated elsewhere — TODO confirm init site.
node_store: Arc<Mutex<Option<NodeStore<'static>>>>,
// Starts as `None` in `create`; populated elsewhere — TODO confirm init site.
edge_store: Arc<Mutex<Option<EdgeStore<'static>>>>,
// Shared string-interning table updated by string insert/rollback operations.
string_table: Arc<Mutex<StringTable>>,
// Dispatch target for all data-record replay handlers.
operations: DefaultReplayOperations,
// Undo log for the transaction currently being replayed.
rollback_system: Arc<Mutex<RollbackSystem>>,
// Atomic counters snapshotted into each ReplayResult.
statistics: Arc<ReplayStatistics>,
}
impl V2GraphFileReplayer {
/// Opens the graph database at `database_path` and wires up all shared state
/// needed for WAL replay (stores, string table, rollback system, statistics).
///
/// # Errors
/// Returns a configuration error when the path does not exist or is not a
/// regular file, and an I/O error when the graph file cannot be opened.
pub fn create(database_path: PathBuf, config: ReplayConfig) -> Result<Self, RecoveryError> {
    // Validate the target path up front so we fail with a clear message.
    if !database_path.exists() {
        return Err(RecoveryError::configuration(
            "Database file does not exist".to_string(),
        ));
    }
    if !database_path.is_file() {
        return Err(RecoveryError::configuration(
            "Database path is not a file".to_string(),
        ));
    }
    let opened = GraphFile::open(&database_path)
        .map_err(|e| RecoveryError::io_error(format!("Failed to open graph file: {}", e)))?;
    // Everything mutable lives behind Arc + lock types because replay runs
    // on rayon worker threads.
    let graph_file = Arc::new(RwLock::new(opened));
    let node_store: Arc<Mutex<Option<NodeStore<'static>>>> = Arc::new(Mutex::new(None));
    let edge_store: Arc<Mutex<Option<EdgeStore<'static>>>> = Arc::new(Mutex::new(None));
    let string_table = Arc::new(Mutex::new(StringTable::new()));
    let free_space_manager = Arc::new(Mutex::new(None));
    let kv_store = Arc::new(Mutex::new(crate::backend::native::v2::KvStore::new()));
    let statistics = Arc::new(ReplayStatistics::new());
    // The rollback system and the operation handlers share the same handles.
    let rollback_system = Arc::new(Mutex::new(RollbackSystem::new(
        Arc::clone(&graph_file),
        Arc::clone(&node_store),
        Arc::clone(&string_table),
        Arc::clone(&free_space_manager),
    )));
    let operations = DefaultReplayOperations::new(
        Arc::clone(&graph_file),
        Arc::clone(&node_store),
        Arc::clone(&edge_store),
        Arc::clone(&string_table),
        Arc::clone(&free_space_manager),
        Arc::clone(&kv_store),
        Arc::clone(&statistics),
    );
    Ok(Self {
        config,
        graph_file,
        node_store,
        edge_store,
        string_table,
        operations,
        rollback_system,
        statistics,
    })
}
/// Replays every committed transaction in `transactions` and aggregates the
/// per-transaction outcomes into a single `ReplayResult`.
///
/// Only transactions with `committed == true` and a commit LSN are replayed;
/// they are sorted by commit LSN before dispatch.
///
/// NOTE(review): `par_iter()` runs `replay_transaction` concurrently, yet each
/// call clears and repopulates the single shared `rollback_system`; concurrent
/// transactions can interleave or clobber each other's undo logs. The LSN sort
/// above also does not constrain *execution* order under `par_iter()`. Confirm
/// committed transactions are independent, or replay sequentially.
pub fn replay_transactions(
&self,
transactions: &[TransactionState],
) -> Result<ReplayResult, RecoveryError> {
let start_time = Instant::now();
// Shared counter so rayon workers can tally successes without a lock.
let successful_operations = AtomicUsize::new(0);
info_log!(
"Starting PARALLEL V2 transaction replay for {} transactions",
transactions.len()
);
// Keep only transactions that actually committed and carry a commit LSN.
let mut committed_transactions: Vec<_> = transactions
.iter()
.filter(|tx| tx.committed && tx.commit_lsn.is_some())
.collect();
// Sort by commit LSN so iteration order matches commit order.
committed_transactions
.sort_by(|a, b| a.commit_lsn.unwrap_or(0).cmp(&b.commit_lsn.unwrap_or(0)));
info_log!(
"Replaying {} committed transactions (parallelism: {})",
committed_transactions.len(),
self.config.max_parallel_transactions
);
let tx_results: Vec<_> = committed_transactions
.par_iter() .enumerate()
.map(|(tx_index, transaction)| {
debug_log!(
"Replaying transaction TX {} ({}/{}) with {} records",
transaction.tx_id,
tx_index + 1,
committed_transactions.len(),
transaction.records.len()
);
let result = self.replay_transaction(
transaction,
tx_index + 1,
committed_transactions.len(),
);
if let Ok(ref tx_result) = result {
successful_operations
.fetch_add(tx_result.successful_operations as usize, Ordering::Relaxed);
}
(tx_index, transaction.tx_id, result)
})
.collect();
// Merge per-transaction results sequentially; collect() preserved input order.
let mut failed_operations = Vec::new();
let mut warnings = Vec::new();
for (tx_index, tx_id, result) in tx_results {
match result {
Ok(tx_result) => {
debug_log!(
"Successfully replayed TX {} with {} operations",
tx_id,
tx_result.successful_operations
);
failed_operations.extend(tx_result.failed_operations);
warnings.extend(tx_result.warnings);
}
Err(e) => {
error_log!("Failed to replay TX {}: {}", tx_id, e);
// A whole-transaction failure has no single offending record, so a
// synthetic empty HeaderUpdate stands in as the placeholder entry.
failed_operations.push((
V2WALRecord::HeaderUpdate {
header_offset: 0,
new_data: vec![],
old_data: vec![],
},
e,
));
}
}
// NOTE(review): `% 0` panics in Rust — confirm progress_interval is
// validated to be non-zero before replay starts.
if (tx_index + 1) % self.config.progress_interval == 0 {
self.report_progress(tx_index + 1, committed_transactions.len());
}
}
let duration = start_time.elapsed();
self.statistics
.set_total_duration(duration.as_millis() as u64);
let total_successful = successful_operations.load(Ordering::Relaxed) as u64;
let result = ReplayResult {
successful_operations: total_successful,
failed_operations,
statistics: self.statistics.snapshot(),
warnings,
};
info_log!(
"PARALLEL V2 transaction replay completed: {} operations successful, {} failed, duration: {:?}",
result.successful_operations,
result.failed_operations.len(),
duration
);
Ok(result)
}
/// Replays a single committed transaction's WAL records in order.
///
/// Stops at the first failing record; the already-applied operations are then
/// undone through the shared rollback system. A rollback failure is reported
/// as a warning rather than an error.
///
/// # Errors
/// Propagates failures from `begin_transaction` / `commit_transaction`;
/// per-record failures are surfaced in the returned `ReplayResult` instead.
fn replay_transaction(
    &self,
    transaction: &TransactionState,
    _tx_index: usize,
    _total_txs: usize,
) -> Result<ReplayResult, RecoveryError> {
    let start_time = Instant::now();
    let mut successful_operations = 0;
    let mut failed_operations = Vec::new();
    let mut warnings = Vec::new();
    // Start from a clean undo log so a failure only rolls back this transaction.
    {
        let mut rollback_system = self.rollback_system.lock().unwrap();
        rollback_system.clear();
    }
    self.begin_transaction()?;
    for record in &transaction.records {
        let mut rollback_data = Vec::new();
        match self.replay_record(record, &mut rollback_data) {
            Ok(()) => {
                successful_operations += 1;
                // Record the undo steps produced while applying this record.
                {
                    let mut rollback_system = self.rollback_system.lock().unwrap();
                    for operation in rollback_data {
                        rollback_system.add_operation(operation);
                    }
                }
            }
            Err(e) => {
                error_log!("Failed to replay record: {}", e);
                failed_operations.push((record.clone(), e));
                // First failure aborts the rest; everything is rolled back below.
                break;
            }
        }
    }
    if failed_operations.is_empty() {
        self.commit_transaction()?;
    } else {
        warn_log!(
            "Rolling back transaction due to {} failed operations",
            failed_operations.len()
        );
        if let Err(e) = self.rollback_transaction() {
            error_log!("Failed to rollback transaction: {}", e);
            warnings.push(format!("Rollback failed: {}", e));
        }
    }
    // BUG FIX: this binding was named `_duration` while the debug_log! call
    // below referenced `duration`, which does not exist.
    let duration = start_time.elapsed();
    debug_log!(
        "Transaction TX {} replayed in {:?}: {} success, {} failed",
        transaction.tx_id,
        duration,
        successful_operations,
        failed_operations.len()
    );
    Ok(ReplayResult {
        successful_operations,
        failed_operations,
        statistics: StatisticsSnapshot::default(),
        warnings,
    })
}
/// Applies a single WAL record by dispatching to the matching handler on
/// `self.operations`, collecting any generated undo steps into `rollback_data`.
///
/// Data records (node, string, edge/cluster, free-space, header, KV) mutate
/// state; control records (transaction markers, checkpoints, savepoints,
/// locks, backups, metadata updates, contiguous allocation) are deliberate
/// no-ops here because the recovery coordinator processes them.
///
/// # Errors
/// Propagates any `RecoveryError` returned by the invoked operation handler.
fn replay_record(
&self,
record: &V2WALRecord,
rollback_data: &mut Vec<RollbackOperation>,
) -> Result<(), RecoveryError> {
match record {
// --- Node records ---
V2WALRecord::NodeInsert {
node_id,
slot_offset,
node_data,
} => self.operations.handle_node_insert(
*node_id as u64,
*slot_offset,
node_data,
rollback_data,
),
V2WALRecord::NodeUpdate {
node_id,
slot_offset,
new_data,
old_data,
} => self.operations.handle_node_update(
*node_id as u64,
*slot_offset,
&new_data,
Some(&old_data),
rollback_data,
),
// The node's edge lists are ignored here; presumably the delete handler
// (or separate edge records) covers them — TODO confirm.
V2WALRecord::NodeDelete {
node_id,
slot_offset,
old_data,
outgoing_edges: _,
incoming_edges: _,
} => self.operations.handle_node_delete(
*node_id as u64,
*slot_offset,
Some(&old_data),
rollback_data,
),
// --- String table ---
V2WALRecord::StringInsert {
string_id,
string_value,
} => {
self.operations
.handle_string_insert(*string_id as u64, string_value, rollback_data)
}
// --- Edge cluster records ---
V2WALRecord::ClusterCreate {
node_id,
direction,
cluster_offset,
cluster_size,
edge_data,
} => self.operations.handle_cluster_create(
*node_id as u64,
*direction,
*cluster_offset,
*cluster_size as u64,
&edge_data,
rollback_data,
),
V2WALRecord::EdgeInsert {
cluster_key,
edge_record,
insertion_point,
} => {
// The handler takes a numeric direction: Outgoing -> 0, Incoming -> 1.
let cluster_key_u64 = (
cluster_key.0 as u64,
match cluster_key.1 {
crate::backend::native::v2::edge_cluster::Direction::Outgoing => 0,
crate::backend::native::v2::edge_cluster::Direction::Incoming => 1,
},
);
self.operations.handle_edge_insert(
cluster_key_u64,
&edge_record,
*insertion_point,
rollback_data,
)
}
V2WALRecord::EdgeUpdate {
cluster_key,
new_edge,
position,
old_edge,
} => self.operations.handle_edge_update(
*cluster_key,
&new_edge,
*position,
&old_edge,
rollback_data,
),
V2WALRecord::EdgeDelete {
cluster_key,
position,
old_edge,
} => self.operations.handle_edge_delete(
*cluster_key,
*position,
&old_edge,
rollback_data,
),
// --- Free-space bookkeeping ---
V2WALRecord::FreeSpaceAllocate {
block_offset,
block_size,
block_type,
} => self.operations.handle_free_space_allocate(
*block_offset,
*block_size as u64,
*block_type,
rollback_data,
),
V2WALRecord::FreeSpaceDeallocate {
block_offset,
block_size,
block_type,
} => self.operations.handle_free_space_deallocate(
*block_offset,
*block_size as u64,
*block_type,
rollback_data,
),
// --- File header ---
V2WALRecord::HeaderUpdate {
header_offset,
new_data,
old_data,
} => {
let old_data_slice: Option<&[u8]> = Some(&old_data[..]);
self.operations.handle_header_update(
*header_offset,
new_data,
old_data_slice,
rollback_data,
)
}
// --- Control records: handled by the recovery coordinator, no-ops here ---
V2WALRecord::TransactionBegin { .. }
| V2WALRecord::TransactionCommit { .. }
| V2WALRecord::TransactionRollback { .. } => {
debug_log!(
"Transaction control record encountered during replay - handled by recovery coordinator"
);
Ok(())
}
V2WALRecord::Checkpoint { .. } => {
debug_log!(
"Checkpoint record encountered during replay - handled by recovery coordinator"
);
Ok(())
}
V2WALRecord::SegmentEnd { .. } => {
debug_log!(
"Segment end marker encountered during replay - handled by recovery coordinator"
);
Ok(())
}
V2WALRecord::TransactionPrepare { .. } | V2WALRecord::TransactionAbort { .. } => {
debug_log!(
"Two-phase commit transaction record encountered during replay - handled by recovery coordinator"
);
Ok(())
}
V2WALRecord::SavepointCreate { .. }
| V2WALRecord::SavepointRollback { .. }
| V2WALRecord::SavepointRelease { .. } => {
debug_log!(
"Savepoint record encountered during replay - handled by recovery coordinator"
);
Ok(())
}
V2WALRecord::BackupCreate { .. } | V2WALRecord::BackupRestore { .. } => {
debug_log!(
"Backup record encountered during replay - handled by recovery coordinator"
);
Ok(())
}
V2WALRecord::LockAcquire { .. } | V2WALRecord::LockRelease { .. } => {
debug_log!(
"Lock record encountered during replay - handled by recovery coordinator"
);
Ok(())
}
V2WALRecord::IndexUpdate { .. } | V2WALRecord::StatisticsUpdate { .. } => {
debug_log!(
"Metadata update record encountered during replay - handled by recovery coordinator"
);
Ok(())
}
V2WALRecord::AllocateContiguous { .. }
| V2WALRecord::CommitContiguous { .. }
| V2WALRecord::RollbackContiguous { .. } => {
debug_log!(
"Contiguous allocation record encountered during replay - handled by recovery coordinator"
);
Ok(())
}
// --- KV store records ---
V2WALRecord::KvSet {
key,
value_bytes,
value_type,
ttl_seconds,
version,
} => self.operations.handle_kv_set(
key.clone(),
value_bytes.clone(),
*value_type,
*ttl_seconds,
*version,
rollback_data,
),
V2WALRecord::KvDelete {
key,
old_value_bytes,
old_value_type,
old_version,
} => self.operations.handle_kv_delete(
key.clone(),
old_value_bytes.clone(),
*old_value_type,
*old_version,
rollback_data,
),
}
}
/// Marks the start of replay for one transaction (currently log-only).
fn begin_transaction(&self) -> Result<(), RecoveryError> {
    debug_log!("Beginning transaction for replay operations");
    Ok(())
}
/// Finalizes a fully-replayed transaction by discarding its undo log.
fn commit_transaction(&self) -> Result<(), RecoveryError> {
    debug_log!("Committing successful transaction replay");
    // Once the transaction succeeded its rollback entries are obsolete.
    self.rollback_system.lock().unwrap().clear();
    Ok(())
}
/// Undoes every operation recorded in the rollback system for the current
/// transaction, logging a summary first when there is anything to undo.
fn rollback_transaction(&self) -> Result<(), RecoveryError> {
    debug_log!("V2 transaction rollback initiated");
    let guard = self.rollback_system.lock().unwrap();
    let summary = guard.get_summary();
    if summary.total_operations > 0 {
        // NOTE(review): this subtraction assumes string inserts are included in
        // data_operations_count(); it would underflow otherwise — confirm.
        let node_ops = summary.data_operations_count() - summary.string_insert_count as usize;
        info_log!(
            "Rolling back {} operations ({} node, {} string)",
            summary.total_operations,
            node_ops,
            summary.string_insert_count
        );
    }
    guard.execute_rollback()
}
/// Logs replay progress as "completed/total transactions (xx.x%)".
fn report_progress(&self, completed: usize, total: usize) {
    // BUG FIX: the binding was named `_percentage` while the info_log! call
    // referenced `percentage`, which did not exist. Also avoid a NaN/inf
    // percentage when total is 0.
    let percentage = if total == 0 {
        100.0
    } else {
        (completed as f64 / total as f64) * 100.0
    };
    info_log!(
        "Replay progress: {}/{} transactions ({:.1}%)",
        completed,
        total,
        percentage
    );
}
/// Returns a point-in-time copy of the replay statistics counters.
pub fn get_statistics(&self) -> StatisticsSnapshot {
    self.statistics.snapshot()
}
/// No-op kept for API compatibility: the counters live in a shared
/// `Arc<ReplayStatistics>` and are not reset here.
pub fn reset_statistics(&self) {
    warn_log!(
        "reset_statistics called on Arc<ReplayStatistics> - this is a no-op, use snapshot() instead"
    );
}
/// Summarizes the operations currently recorded in the rollback system.
pub fn get_rollback_summary(&self) -> RollbackSummary {
    let guard = self.rollback_system.lock().unwrap();
    guard.get_summary()
}
/// Hands out a shared handle to the string table.
pub fn string_table(&self) -> Arc<Mutex<StringTable>> {
    Arc::clone(&self.string_table)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Sanity checks on the default replay configuration.
    #[test]
    fn test_replay_config_default() {
        let config = ReplayConfig::default();
        assert!(config.strict_validation);
        // FIX: `assert_eq!(x, false)` replaced with idiomatic `assert!(!x)`.
        assert!(!config.create_backup);
        assert!(config.max_batch_size > 0);
        assert!(config.operation_timeout_ms > 0);
    }

    /// `create` must reject missing paths and non-file paths with
    /// configuration errors carrying the expected messages.
    #[test]
    fn test_replayer_file_validation() {
        let temp_dir = tempdir().unwrap();
        let config = ReplayConfig::default();
        let non_existent_path = temp_dir.path().join("nonexistent.db");
        let result = V2GraphFileReplayer::create(non_existent_path, config.clone());
        assert!(result.is_err());
        if let Err(error) = result {
            assert!(matches!(
                error.kind,
                crate::backend::native::v2::wal::recovery::errors::RecoveryErrorKind::Configuration
            ));
            assert!(error.message.contains("Database file does not exist"));
        }
        // A directory exists but is not a regular file.
        let result = V2GraphFileReplayer::create(temp_dir.path().into(), config);
        assert!(result.is_err());
        if let Err(error) = result {
            assert!(matches!(
                error.kind,
                crate::backend::native::v2::wal::recovery::errors::RecoveryErrorKind::Configuration
            ));
            assert!(error.message.contains("Database path is not a file"));
        }
    }

    /// Fresh statistics start with every counter at zero.
    #[test]
    fn test_replay_statistics() {
        use std::sync::atomic::Ordering;
        let stats = ReplayStatistics::default();
        assert_eq!(stats.total_duration_ms.load(Ordering::Relaxed), 0);
        assert_eq!(stats.node_operations.load(Ordering::Relaxed), 0);
        assert_eq!(stats.edge_operations.load(Ordering::Relaxed), 0);
        assert_eq!(stats.string_operations.load(Ordering::Relaxed), 0);
        assert_eq!(stats.free_space_operations.load(Ordering::Relaxed), 0);
        assert_eq!(stats.bytes_written.load(Ordering::Relaxed), 0);
    }

    /// A StringInsert rollback operation round-trips its fields.
    #[test]
    fn test_rollback_operation_serialization() {
        let operation = RollbackOperation::StringInsert {
            string_id: 123,
            string_value: "test_rollback".to_string(),
        };
        match operation {
            RollbackOperation::StringInsert {
                string_id,
                string_value,
            } => {
                assert_eq!(string_id, 123);
                assert_eq!(string_value, "test_rollback");
            }
            _ => panic!("Expected StringInsert operation"),
        }
    }

    /// A replayer built over a freshly-created graph file starts empty.
    #[test]
    fn test_v2_graph_integrity() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_integrity.db");
        // FIX: drop the unused `graph_file` binding (consistent with
        // test_modular_integration below, which already discards the handle).
        GraphFile::create(&db_path).unwrap();
        let config = ReplayConfig::default();
        let replayer_result = V2GraphFileReplayer::create(db_path, config);
        assert!(
            replayer_result.is_ok(),
            "Should create replayer successfully with modular structure"
        );
        if let Ok(replayer) = replayer_result {
            let stats = replayer.get_statistics();
            assert_eq!(stats.total_operations(), 0);
            let rollback_summary = replayer.get_rollback_summary();
            assert_eq!(rollback_summary.total_operations, 0);
            let string_table = replayer.string_table();
            assert!(
                string_table.try_lock().is_ok(),
                "Should be able to lock string table"
            );
        }
    }

    /// Operations and the rollback system cooperate: a string insert produces
    /// exactly one rollback entry that the summary then reports.
    #[test]
    fn test_modular_integration() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_integration.db");
        GraphFile::create(&db_path).unwrap();
        let config = ReplayConfig::default();
        let replayer = V2GraphFileReplayer::create(db_path, config).unwrap();
        let mut rollback_data = Vec::new();
        let result =
            replayer
                .operations
                .handle_string_insert(100, "integration_test", &mut rollback_data);
        assert!(result.is_ok());
        assert_eq!(rollback_data.len(), 1);
        {
            let mut rollback_system = replayer.rollback_system.lock().unwrap();
            for operation in rollback_data {
                rollback_system.add_operation(operation);
            }
        }
        let summary = replayer.get_rollback_summary();
        assert_eq!(summary.total_operations, 1);
        assert_eq!(summary.string_insert_count, 1);
    }
}
}