use super::{constants::*, core::TransactionState, errors::RecoveryError};
use crate::backend::native::v2::wal::{V2WALReader, V2WALRecord, V2WALRecordType};
use crate::debug::{debug_log, info_log, warn_log};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct ScannerConfig {
pub max_buffer_size: usize,
pub validate_records: bool,
pub progress_interval: usize,
pub max_incomplete_transactions: usize,
}
impl Default for ScannerConfig {
fn default() -> Self {
Self {
max_buffer_size: scanning::MAX_READ_BUFFER_SIZE,
validate_records: true,
progress_interval: RECOVERY_PROGRESS_INTERVAL,
max_incomplete_transactions: MAX_INCOMPLETE_TRANSACTIONS,
}
}
}
#[derive(Debug, Clone)]
pub struct WALScanResult {
pub transactions: Vec<TransactionState>,
pub warnings: Vec<String>,
pub statistics: ScanStatistics,
}
#[derive(Debug, Clone, Default)]
pub struct ScanStatistics {
pub total_records: u64,
pub total_bytes: u64,
pub transactions_found: u64,
pub committed_transactions: u64,
pub rolled_back_transactions: u64,
pub incomplete_transactions: u64,
pub corrupted_records: u64,
pub scan_duration_ms: u64,
}
pub struct WALScanner {
active_transactions: Arc<Mutex<HashMap<u64, TransactionState>>>,
config: ScannerConfig,
}
pub struct TransactionScanner {
reader: V2WALReader,
active_transactions: Arc<Mutex<HashMap<u64, TransactionState>>>,
statistics: ScanStatistics,
config: ScannerConfig,
}
impl WALScanner {
pub fn new() -> Self {
Self::with_config(ScannerConfig::default())
}
pub fn with_config(config: ScannerConfig) -> Self {
Self {
active_transactions: Arc::new(Mutex::new(HashMap::new())),
config,
}
}
pub async fn scan_wal_file(
&self,
wal_path: &std::path::Path,
) -> Result<WALScanResult, RecoveryError> {
let start_time = std::time::Instant::now();
info_log!("Starting WAL scan: {:?}", wal_path);
if !wal_path.exists() {
return Err(RecoveryError::configuration(format!(
"WAL file does not exist: {:?}",
wal_path
)));
}
if !wal_path.is_file() {
return Err(RecoveryError::configuration(format!(
"WAL path is not a file: {:?}",
wal_path
)));
}
let mut scanner = TransactionScanner::new(wal_path, self.config.clone())?;
let result = scanner.scan().await?;
let _duration = start_time.elapsed();
info_log!(
"WAL scan completed: {} transactions, {} records in {:?}",
result.transactions.len(),
result.statistics.total_records,
duration
);
Ok(result)
}
pub fn active_transactions_count(&self) -> usize {
self.active_transactions.lock().len()
}
pub fn clear_active_transactions(&self) {
self.active_transactions.lock().clear();
}
}
impl TransactionScanner {
fn new(wal_path: &std::path::Path, config: ScannerConfig) -> Result<Self, RecoveryError> {
let reader = V2WALReader::open(wal_path)
.map_err(|e| RecoveryError::io_error(format!("Failed to open WAL: {:?}", e)))?;
Ok(Self {
reader,
active_transactions: Arc::new(Mutex::new(HashMap::new())),
statistics: ScanStatistics::default(),
config,
})
}
async fn scan(&mut self) -> Result<WALScanResult, RecoveryError> {
let start_time = std::time::Instant::now();
let header = self.reader.header().clone();
info_log!("Scanning WAL from LSN 1 to {}", header.current_lsn);
self.active_transactions.lock().clear();
let mut transactions = Vec::new();
let mut warnings = Vec::new();
let mut record_count = 0;
while let Some((lsn, record)) = self.read_next_record()? {
record_count += 1;
if let Some((tx_state, record_warnings)) = self.process_record(lsn, record)? {
if tx_state.committed || tx_state.commit_lsn.is_some() {
transactions.push(tx_state);
}
warnings.extend(record_warnings);
}
if record_count % self.config.progress_interval == 0 {
self.report_progress(record_count, lsn, header.current_lsn);
}
if self.active_transactions.lock().len() > self.config.max_incomplete_transactions {
warn_log!("Too many active transactions, forcing completion");
self.force_complete_incomplete_transactions(&mut transactions, &mut warnings);
}
}
self.finalize_incomplete_transactions(&mut transactions, &mut warnings);
self.statistics.scan_duration_ms = start_time.elapsed().as_millis() as u64;
self.statistics.transactions_found = transactions.len() as u64;
self.statistics.committed_transactions =
transactions.iter().filter(|tx| tx.committed).count() as u64;
self.statistics.rolled_back_transactions = transactions
.iter()
.filter(|tx| !tx.committed && tx.commit_lsn.is_some())
.count() as u64;
self.statistics.incomplete_transactions = transactions
.iter()
.filter(|tx| tx.commit_lsn.is_none())
.count() as u64;
let result = WALScanResult {
transactions,
warnings,
statistics: self.statistics.clone(),
};
info_log!(
"WAL scan complete: {} total records, {} transactions",
self.statistics.total_records,
self.statistics.transactions_found
);
Ok(result)
}
fn read_next_record(&mut self) -> Result<Option<(u64, V2WALRecord)>, RecoveryError> {
match self.reader.read_next_record() {
Ok(result) => {
if let Some((lsn, record)) = result {
self.statistics.total_records += 1;
self.statistics.total_bytes += self.estimate_record_size(&record);
Ok(Some((lsn, record)))
} else {
Ok(None)
}
}
Err(e) => {
self.statistics.corrupted_records += 1;
Err(RecoveryError::corruption(format!(
"Failed to read WAL record: {}",
e
)))
}
}
}
fn process_record(
&mut self,
lsn: u64,
record: V2WALRecord,
) -> Result<Option<(TransactionState, Vec<String>)>, RecoveryError> {
let mut warnings = Vec::new();
match record.record_type() {
V2WALRecordType::TransactionBegin => {
if let V2WALRecord::TransactionBegin { tx_id, timestamp } = record {
Ok(Some(self.handle_transaction_begin(tx_id, lsn, timestamp)?))
} else {
warnings.push("Invalid TransactionBegin record format".to_string());
Ok(None)
}
}
V2WALRecordType::TransactionCommit => {
if let V2WALRecord::TransactionCommit { tx_id, timestamp } = record {
Ok(Some(self.handle_transaction_commit(
tx_id,
lsn,
timestamp,
&mut warnings,
)?))
} else {
warnings.push("Invalid TransactionCommit record format".to_string());
Ok(None)
}
}
V2WALRecordType::TransactionRollback => {
if let V2WALRecord::TransactionRollback { tx_id, timestamp } = record {
Ok(Some(self.handle_transaction_rollback(
tx_id,
lsn,
timestamp,
&mut warnings,
)?))
} else {
warnings.push("Invalid TransactionRollback record format".to_string());
Ok(None)
}
}
_ => {
if let Some(tx_id) = self.extract_transaction_id(&record) {
self.add_record_to_transaction(tx_id, record, lsn, &mut warnings)?;
}
Ok(None)
}
}
}
fn handle_transaction_begin(
&mut self,
tx_id: u64,
lsn: u64,
timestamp: u64,
) -> Result<(TransactionState, Vec<String>), RecoveryError> {
let mut warnings = Vec::new();
let tx_state = TransactionState {
tx_id,
start_lsn: lsn,
commit_lsn: None,
records: Vec::new(),
committed: false,
timestamp,
};
{
let mut active_tx = self.active_transactions.lock();
if active_tx.contains_key(&tx_id) {
warnings.push(format!("Duplicate transaction begin for TX {}", tx_id));
}
active_tx.insert(tx_id, tx_state);
}
Ok((
TransactionState {
tx_id,
start_lsn: lsn,
commit_lsn: None,
records: Vec::new(),
committed: false,
timestamp,
},
warnings,
))
}
fn handle_transaction_commit(
&mut self,
tx_id: u64,
lsn: u64,
timestamp: u64,
warnings: &mut Vec<String>,
) -> Result<(TransactionState, Vec<String>), RecoveryError> {
let mut active_tx = self.active_transactions.lock();
if let Some(mut tx_state) = active_tx.remove(&tx_id) {
tx_state.commit_lsn = Some(lsn);
tx_state.committed = true;
tx_state.timestamp = timestamp;
Ok((tx_state, warnings.clone()))
} else {
warnings.push(format!("Commit for unknown transaction TX {}", tx_id));
Ok((
TransactionState {
tx_id,
start_lsn: 0, commit_lsn: Some(lsn),
records: Vec::new(),
committed: true,
timestamp,
},
warnings.clone(),
))
}
}
fn handle_transaction_rollback(
&mut self,
tx_id: u64,
lsn: u64,
timestamp: u64,
warnings: &mut Vec<String>,
) -> Result<(TransactionState, Vec<String>), RecoveryError> {
let mut active_tx = self.active_transactions.lock();
if let Some(mut tx_state) = active_tx.remove(&tx_id) {
tx_state.committed = false;
tx_state.timestamp = timestamp;
Ok((tx_state, warnings.clone()))
} else {
warnings.push(format!("Rollback for unknown transaction TX {}", tx_id));
Ok((
TransactionState {
tx_id,
start_lsn: 0, commit_lsn: Some(lsn),
records: Vec::new(),
committed: false,
timestamp,
},
warnings.clone(),
))
}
}
fn add_record_to_transaction(
&mut self,
tx_id: u64,
record: V2WALRecord,
_lsn: u64,
warnings: &mut Vec<String>,
) -> Result<(), RecoveryError> {
let mut active_tx = self.active_transactions.lock();
if let Some(tx_state) = active_tx.get_mut(&tx_id) {
tx_state.records.push(record);
} else {
debug_log!("Record for inactive transaction TX {}", tx_id);
warnings.push(format!("Record for inactive transaction TX {}", tx_id));
}
Ok(())
}
fn extract_transaction_id(&self, record: &V2WALRecord) -> Option<u64> {
match record {
V2WALRecord::NodeInsert { node_id, .. } => {
Some((*node_id as u64).wrapping_add(1_000_000))
}
V2WALRecord::NodeUpdate { node_id, .. } => {
Some((*node_id as u64).wrapping_add(2_000_000))
}
V2WALRecord::ClusterCreate { node_id, .. } => {
Some((*node_id as u64).wrapping_add(3_000_000))
}
V2WALRecord::EdgeInsert { cluster_key, .. } => {
Some((cluster_key.0 as u64).wrapping_add(4_000_000))
}
V2WALRecord::EdgeUpdate { cluster_key, .. } => {
Some((cluster_key.0 as u64).wrapping_add(5_000_000))
}
V2WALRecord::EdgeDelete { cluster_key, .. } => {
Some((cluster_key.0 as u64).wrapping_add(6_000_000))
}
V2WALRecord::StringInsert { string_id, .. } => {
Some((*string_id as u64).wrapping_add(7_000_000))
}
V2WALRecord::FreeSpaceAllocate { block_offset, .. } => {
Some(block_offset.wrapping_add(8_000_000))
}
V2WALRecord::FreeSpaceDeallocate { block_offset, .. } => {
Some(block_offset.wrapping_add(9_000_000))
}
V2WALRecord::NodeDelete { node_id, .. } => {
Some((*node_id as u64).wrapping_add(10_000_000))
}
V2WALRecord::HeaderUpdate { header_offset, .. } => {
Some(header_offset.wrapping_add(11_000_000))
}
V2WALRecord::TransactionBegin { .. }
| V2WALRecord::TransactionCommit { .. }
| V2WALRecord::TransactionRollback { .. }
| V2WALRecord::TransactionPrepare { .. }
| V2WALRecord::TransactionAbort { .. }
| V2WALRecord::SavepointCreate { .. }
| V2WALRecord::SavepointRollback { .. }
| V2WALRecord::SavepointRelease { .. }
| V2WALRecord::BackupCreate { .. }
| V2WALRecord::BackupRestore { .. }
| V2WALRecord::LockAcquire { .. }
| V2WALRecord::LockRelease { .. }
| V2WALRecord::IndexUpdate { .. }
| V2WALRecord::StatisticsUpdate { .. }
| V2WALRecord::RollbackContiguous { .. }
| V2WALRecord::Checkpoint { .. }
| V2WALRecord::SegmentEnd { .. } => None,
V2WALRecord::AllocateContiguous { txn_id, .. } => Some(*txn_id),
V2WALRecord::CommitContiguous { txn_id, .. } => Some(*txn_id),
V2WALRecord::KvSet { version, .. } => Some(*version),
V2WALRecord::KvDelete { old_version, .. } => Some(*old_version),
}
}
fn force_complete_incomplete_transactions(
&mut self,
transactions: &mut Vec<TransactionState>,
warnings: &mut Vec<String>,
) {
let mut active_tx = self.active_transactions.lock();
let incomplete_count = active_tx.len();
if incomplete_count > 0 {
warn_log!(
"Forcing completion of {} incomplete transactions",
incomplete_count
);
for (_, mut tx_state) in active_tx.drain() {
tx_state.committed = false; let tx_id = tx_state.tx_id;
transactions.push(tx_state);
warnings.push(format!(
"Incomplete transaction TX {} forced to completion",
tx_id
));
}
}
}
fn finalize_incomplete_transactions(
&mut self,
transactions: &mut Vec<TransactionState>,
warnings: &mut Vec<String>,
) {
let mut active_tx = self.active_transactions.lock();
for (_, tx_state) in active_tx.drain() {
warnings.push(format!(
"Incomplete transaction TX {} recovered",
tx_state.tx_id
));
transactions.push(tx_state);
}
}
fn report_progress(&self, _record_count: usize, current_lsn: u64, total_lsn: u64) {
let _percentage = if total_lsn > 0 {
(current_lsn as f64 / total_lsn as f64) * 100.0
} else {
0.0
};
debug_log!(
"WAL scan progress: {} records, LSN {}/{}, {:.1}% complete",
record_count,
current_lsn,
total_lsn,
percentage
);
}
fn estimate_record_size(&self, record: &V2WALRecord) -> u64 {
let base_size = 16;
match record {
V2WALRecord::NodeInsert { node_data, .. } => base_size + node_data.len() as u64,
V2WALRecord::NodeUpdate { new_data, .. } => base_size + new_data.len() as u64,
V2WALRecord::ClusterCreate { edge_data, .. } => base_size + edge_data.len() as u64,
V2WALRecord::EdgeInsert { edge_record, .. } => {
base_size + edge_record.estimated_size() as u64
}
V2WALRecord::EdgeUpdate { new_edge, .. } => {
base_size + new_edge.estimated_size() as u64
}
V2WALRecord::StringInsert { string_value, .. } => base_size + string_value.len() as u64,
V2WALRecord::HeaderUpdate { new_data, .. } => base_size + new_data.len() as u64,
_ => base_size + 32, }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::native::v2::edge_cluster::{CompactEdgeRecord, Direction};
use std::path::PathBuf;
use tempfile::tempdir;
#[test]
fn test_scanner_config_default() {
let config = ScannerConfig::default();
assert!(config.validate_records);
assert_eq!(config.progress_interval, RECOVERY_PROGRESS_INTERVAL);
assert_eq!(
config.max_incomplete_transactions,
MAX_INCOMPLETE_TRANSACTIONS
);
}
#[test]
fn test_wal_scanner_creation() {
let scanner = WALScanner::new();
assert_eq!(scanner.active_transactions_count(), 0);
let config = ScannerConfig {
validate_records: false,
progress_interval: 500,
max_incomplete_transactions: 200,
max_buffer_size: 64 * 1024,
};
let custom_scanner = WALScanner::with_config(config);
assert_eq!(custom_scanner.active_transactions_count(), 0);
}
#[test]
fn test_scan_statistics_default() {
let stats = ScanStatistics::default();
assert_eq!(stats.total_records, 0);
assert_eq!(stats.transactions_found, 0);
assert_eq!(stats.committed_transactions, 0);
assert_eq!(stats.rolled_back_transactions, 0);
assert_eq!(stats.incomplete_transactions, 0);
assert_eq!(stats.corrupted_records, 0);
assert_eq!(stats.scan_duration_ms, 0);
}
#[test]
fn test_transaction_id_extraction() {
let node_insert = V2WALRecord::NodeInsert {
node_id: 42,
slot_offset: 100,
node_data: vec![1, 2, 3],
};
let node_tx_id = match &node_insert {
V2WALRecord::NodeInsert { node_id, .. } => {
Some((*node_id as u64).wrapping_add(1_000_000))
}
_ => None,
};
assert_eq!(node_tx_id, Some(1000042));
let edge_record = CompactEdgeRecord {
neighbor_id: 456,
edge_type_offset: 0,
edge_data: vec![],
};
let edge_insert = V2WALRecord::EdgeInsert {
cluster_key: (123, Direction::Outgoing),
edge_record,
insertion_point: 0,
};
let edge_tx_id = match &edge_insert {
V2WALRecord::EdgeInsert { cluster_key, .. } => {
Some((cluster_key.0 as u64).wrapping_add(4_000_000))
}
_ => None,
};
assert_eq!(edge_tx_id, Some(4000123));
let tx_begin = V2WALRecord::TransactionBegin {
tx_id: 1,
timestamp: 1234567890,
};
let control_tx_id: Option<u64> = match &tx_begin {
V2WALRecord::NodeInsert { .. }
| V2WALRecord::NodeUpdate { .. }
| V2WALRecord::ClusterCreate { .. }
| V2WALRecord::EdgeInsert { .. }
| V2WALRecord::EdgeUpdate { .. }
| V2WALRecord::EdgeDelete { .. } => None,
_ => None,
};
assert_eq!(control_tx_id, None);
}
#[test]
fn test_uncommitted_transactions_filtered() {
let transactions = vec![
TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: Some(10),
records: vec![V2WALRecord::NodeInsert {
node_id: 1,
slot_offset: 1000,
node_data: vec![1, 2, 3],
}],
committed: true,
timestamp: 0,
},
TransactionState {
tx_id: 2,
start_lsn: 11,
commit_lsn: None, records: vec![V2WALRecord::NodeInsert {
node_id: 2,
slot_offset: 2000,
node_data: vec![4, 5, 6],
}],
committed: false, timestamp: 0,
},
TransactionState {
tx_id: 3,
start_lsn: 21,
commit_lsn: Some(30),
records: vec![V2WALRecord::NodeInsert {
node_id: 3,
slot_offset: 3000,
node_data: vec![7, 8, 9],
}],
committed: false, timestamp: 0,
},
];
let committed_transactions: Vec<_> = transactions
.iter()
.filter(|tx| tx.committed && tx.commit_lsn.is_some())
.collect();
assert_eq!(
committed_transactions.len(),
1,
"Only committed transactions should be replayed"
);
assert_eq!(
committed_transactions[0].tx_id, 1,
"TX 1 should be included"
);
}
#[test]
fn test_transaction_state_initialization() {
let tx_state = TransactionState {
tx_id: 42,
start_lsn: 100,
commit_lsn: None,
records: vec![],
committed: false, timestamp: 1234567890,
};
assert_eq!(tx_state.tx_id, 42);
assert_eq!(tx_state.start_lsn, 100);
assert_eq!(tx_state.commit_lsn, None, "IN_PROGRESS has no commit LSN");
assert_eq!(tx_state.committed, false, "IN_PROGRESS is not committed");
assert_eq!(tx_state.records.len(), 0);
let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
assert!(
!should_replay,
"IN_PROGRESS transactions should not be replayed"
);
}
#[test]
fn test_committed_transaction_passes_filter() {
let tx_state = TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: Some(10), records: vec![],
committed: true, timestamp: 0,
};
assert_eq!(tx_state.commit_lsn, Some(10));
assert_eq!(tx_state.committed, true);
let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
assert!(should_replay, "Committed transactions should be replayed");
}
#[test]
fn test_multiple_in_progress_transactions_filtered() {
let transactions = vec![
TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: Some(10),
records: vec![],
committed: true,
timestamp: 0,
},
TransactionState {
tx_id: 2,
start_lsn: 11,
commit_lsn: None,
records: vec![],
committed: false,
timestamp: 0,
},
TransactionState {
tx_id: 3,
start_lsn: 21,
commit_lsn: None,
records: vec![],
committed: false,
timestamp: 0,
},
TransactionState {
tx_id: 4,
start_lsn: 31,
commit_lsn: Some(40),
records: vec![],
committed: true,
timestamp: 0,
},
];
let committed_transactions: Vec<_> = transactions
.iter()
.filter(|tx| tx.committed && tx.commit_lsn.is_some())
.collect();
assert_eq!(
committed_transactions.len(),
2,
"Only TX 1 and TX 4 should pass filter"
);
assert_eq!(committed_transactions[0].tx_id, 1);
assert_eq!(committed_transactions[1].tx_id, 4);
}
#[test]
fn test_committed_without_commit_lsn_filtered() {
let tx_state = TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: None, records: vec![],
committed: true, timestamp: 0,
};
let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
assert!(
!should_replay,
"Transactions without commit_lsn should not replay"
);
}
#[test]
fn test_rollback_transaction_state() {
let tx_state = TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: Some(10), records: vec![],
committed: false, timestamp: 0,
};
let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
assert!(
!should_replay,
"Rolled back transactions should not be replayed"
);
}
}