use super::validator::{RecoveryValidator, ValidationResult};
use super::{constants::*, errors::RecoveryError};
use crate::backend::native::v2::wal::{V2WALConfig, V2WALRecord};
use crate::backend::native::{GraphFile, NativeBackendError, NativeResult};
use crate::debug::{debug_log, error_log, info_log, warn_log};
use parking_lot::{Condvar, Mutex};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
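/// Phases of the recovery state machine. Legal transitions are enforced by
/// `is_valid_state_transition`; any state may fall through to `Failed`.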
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryState {
Idle,
Initializing,
Scanning,
Validating,
Replaying,
Finalizing,
Complete,
Failed,
}
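/// Tunable knobs for a recovery run. `fast_recovery` skips the pre-replay
/// validation pass, and `force_recovery` downgrades the final failed attempt
/// to a best-effort success.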
#[derive(Debug, Clone)]
pub struct RecoveryOptions {
pub fast_recovery: bool,
pub max_batch_size: usize,
pub recovery_timeout: Duration,
pub perform_consistency_checks: bool,
pub create_backup: bool,
pub max_recovery_attempts: u32,
pub force_recovery: bool,
pub max_parallel_transactions: usize,
}
impl Default for RecoveryOptions {
fn default() -> Self {
Self {
fast_recovery: false,
max_batch_size: DEFAULT_BATCH_SIZE,
recovery_timeout: Duration::from_secs(DEFAULT_RECOVERY_TIMEOUT_SECONDS),
perform_consistency_checks: true,
create_backup: true,
max_recovery_attempts: MAX_RECOVERY_ATTEMPTS,
force_recovery: false,
max_parallel_transactions: 4,
}
}
}
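/// Point-in-time snapshot of recovery progress, suitable for polling from
/// another thread via `get_progress`.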
#[derive(Debug, Clone)]
pub struct RecoveryProgress {
pub state: RecoveryState,
pub current_lsn: u64,
pub total_lsns: u64,
pub transactions_processed: u64,
pub total_transactions: u64,
pub completion_percentage: f64,
pub start_time: Instant,
pub estimated_time_remaining: Duration,
}
pub type RecoveryResult<T = RecoverySuccess> = Result<T, RecoveryError>;
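/// Outcome of a successful recovery run: final metrics, accumulated
/// warnings, and wall-clock duration.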
#[derive(Debug, Clone)]
pub struct RecoverySuccess {
pub metrics: RecoveryMetrics,
pub warnings: Vec<String>,
pub duration: Duration,
}
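/// Counters gathered across a recovery run; sizes are in bytes and
/// timestamps are seconds since the Unix epoch.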
#[derive(Debug, Clone, Default)]
pub struct RecoveryMetrics {
pub total_duration_ms: u64,
pub transactions_scanned: u64,
pub committed_transactions_replayed: u64,
pub rolled_back_transactions: u64,
pub records_processed: u64,
pub corrupted_records: u64,
pub checkpoint_inconsistencies: u64,
pub database_size_before_recovery: u64,
pub database_size_after_recovery: u64,
pub recovery_start_timestamp: u64,
pub recovery_end_timestamp: u64,
}
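/// Coordinates WAL recovery for a single database file: scans the WAL for
/// transactions, validates them, replays committed work, and verifies the
/// resulting graph file. State and metrics live behind mutexes so progress
/// can be observed (and recovery cancelled) from other threads.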
pub struct V2WALRecoveryEngine {
config: V2WALConfig,
database_path: PathBuf,
state: Arc<Mutex<RecoveryState>>,
options: RecoveryOptions,
metrics: Arc<Mutex<RecoveryMetrics>>,
active_transactions: Arc<Mutex<HashMap<u64, TransactionState>>>,
recovery_cv: Arc<Condvar>,
backup_path: Option<PathBuf>,
}
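/// A transaction reconstructed from the WAL: its records plus the LSNs that
/// bound it. `committed` indicates whether a commit was observed for it.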
#[derive(Debug, Clone)]
pub struct TransactionState {
pub tx_id: u64,
pub start_lsn: u64,
pub commit_lsn: Option<u64>,
pub records: Vec<V2WALRecord>,
pub committed: bool,
pub timestamp: u64,
}
impl V2WALRecoveryEngine {
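/// Builds a recovery engine, validating the configuration up front and
/// optionally snapshotting the database before anything is mutated.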
pub fn create(
config: V2WALConfig,
database_path: PathBuf,
options: RecoveryOptions,
) -> NativeResult<Self> {
Self::validate_configuration(&config, &database_path)?;
let backup_path = if options.create_backup {
Some(Self::create_database_backup(&database_path)?)
} else {
None
};
Ok(Self {
config,
database_path,
state: Arc::new(Mutex::new(RecoveryState::Idle)),
options,
metrics: Arc::new(Mutex::new(RecoveryMetrics::default())),
active_transactions: Arc::new(Mutex::new(HashMap::new())),
recovery_cv: Arc::new(Condvar::new()),
backup_path,
})
}
fn validate_configuration(config: &V2WALConfig, database_path: &Path) -> NativeResult<()> {
config.validate()?;
if !database_path.exists() || !database_path.is_file() {
return Err(NativeBackendError::InvalidParameter {
context: "Database file does not exist or is not a file".to_string(),
source: None,
});
}
if !config.wal_path.exists() || !config.wal_path.is_file() {
return Err(NativeBackendError::InvalidParameter {
context: "WAL file does not exist or is not a file".to_string(),
source: None,
});
}
Ok(())
}
fn create_database_backup(database_path: &Path) -> NativeResult<PathBuf> {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(NativeBackendError::from)?
.as_secs();
let database_name = database_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("database");
let backup_filename = format!("{}.recovery_backup.{}", database_name, timestamp);
let backup_path = database_path
.parent()
.unwrap_or_else(|| Path::new("."))
.join("recovery_backups")
.join(backup_filename);
if let Some(parent) = backup_path.parent() {
std::fs::create_dir_all(parent).map_err(NativeBackendError::from)?;
}
std::fs::copy(database_path, &backup_path).map_err(NativeBackendError::from)?;
Ok(backup_path)
}
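/// Runs recovery end to end, retrying up to `max_recovery_attempts` times
/// with exponential backoff between attempts. With `force_recovery`, the
/// final failure is downgraded to a best-effort success.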
pub fn recover(&self) -> RecoveryResult {
let start_time = Instant::now();
let start_timestamp = self.get_current_timestamp()?;
info_log!("Starting V2 WAL recovery");
self.set_recovery_state(RecoveryState::Initializing)?;
let mut warnings = Vec::new();
for attempt in 1..=self.options.max_recovery_attempts {
debug_log!(
"Recovery attempt {}/{}",
attempt,
self.options.max_recovery_attempts
);
match self.attempt_recovery(attempt) {
Ok(mut attempt_warnings) => {
warnings.append(&mut attempt_warnings);
return self.finalize_successful_recovery(
start_time,
start_timestamp,
warnings,
);
}
Err(e) => {
error_log!("Recovery attempt {} failed: {}", attempt, e);
if attempt == self.options.max_recovery_attempts {
if self.options.force_recovery {
warn_log!("Force recovery enabled");
return self.finalize_successful_recovery(
start_time,
start_timestamp,
warnings,
);
}
return self.finalize_failed_recovery(start_time, start_timestamp, e);
}
let backoff = Duration::from_millis(
(RECOVERY_RETRY_BACKOFF_MULTIPLIER.powi(attempt as i32) * 1000.0) as u64,
);
std::thread::sleep(backoff.min(Duration::from_secs(MAX_RETRY_DELAY_SECONDS)));
}
}
}
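// Only reachable if `max_recovery_attempts` is zero.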
Err(RecoveryError::configuration(
"Unexpected recovery completion".to_string(),
))
}
fn get_current_timestamp(&self) -> Result<u64, RecoveryError> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.map_err(|e| RecoveryError::configuration(format!("Failed to get timestamp: {}", e)))
}
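/// A single recovery pass: scan, optionally validate, replay, post-validate,
/// finalize. Warnings from every phase are accumulated and returned.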
fn attempt_recovery(&self, _attempt: u32) -> Result<Vec<String>, RecoveryError> {
let mut warnings = Vec::new();
self.set_recovery_state(RecoveryState::Initializing)?;
self.set_recovery_state(RecoveryState::Scanning)?;
let (transactions, scan_warnings) = self.scan_wal_for_transactions()?;
warnings.extend(scan_warnings);
if !self.options.fast_recovery {
self.set_recovery_state(RecoveryState::Validating)?;
let validation_warnings = self.validate_transactions(&transactions)?;
warnings.extend(validation_warnings);
}
self.set_recovery_state(RecoveryState::Replaying)?;
let replay_warnings = self.replay_transactions(&transactions)?;
warnings.extend(replay_warnings);
let post_validation_warnings = self.validate_post_recovery(&transactions)?;
warnings.extend(post_validation_warnings);
self.set_recovery_state(RecoveryState::Finalizing)?;
let final_warnings = self.finalize_recovery()?;
warnings.extend(final_warnings);
self.update_transaction_metrics(&transactions);
Ok(warnings)
}
fn finalize_successful_recovery(
&self,
start_time: Instant,
start_timestamp: u64,
warnings: Vec<String>,
) -> RecoveryResult {
let duration = start_time.elapsed();
let end_timestamp = start_timestamp + duration.as_secs();
{
let mut metrics = self.metrics.lock();
metrics.total_duration_ms = duration.as_millis() as u64;
metrics.recovery_start_timestamp = start_timestamp;
metrics.recovery_end_timestamp = end_timestamp;
if let Ok(metadata) = std::fs::metadata(&self.database_path) {
metrics.database_size_after_recovery = metadata.len();
}
}
self.set_recovery_state(RecoveryState::Complete)?;
self.recovery_cv.notify_all();
info_log!("Recovery completed successfully in {:?}", duration);
Ok(RecoverySuccess {
metrics: self.metrics.lock().clone(),
warnings,
duration,
})
}
fn finalize_failed_recovery(
&self,
start_time: Instant,
start_timestamp: u64,
error: RecoveryError,
) -> RecoveryResult {
let duration = start_time.elapsed();
{
let mut metrics = self.metrics.lock();
metrics.total_duration_ms = duration.as_millis() as u64;
metrics.recovery_start_timestamp = start_timestamp;
metrics.recovery_end_timestamp = start_timestamp + duration.as_secs();
}
self.set_recovery_state(RecoveryState::Failed)?;
self.recovery_cv.notify_all();
error_log!("Recovery failed after {:?}: {}", duration, error);
Err(error)
}
fn set_recovery_state(&self, new_state: RecoveryState) -> Result<(), RecoveryError> {
let mut state = self.state.lock();
if !self.is_valid_state_transition(*state, new_state) {
return Err(RecoveryError::state_transition(format!(
"Invalid transition from {:?} to {:?}",
*state, new_state
)));
}
*state = new_state;
debug_log!("Recovery state: {:?}", new_state);
Ok(())
}
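/// Encodes the legal edges of the state machine; `Failed` is reachable from
/// any state so errors (and cancellation) can always be surfaced.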
fn is_valid_state_transition(&self, from: RecoveryState, to: RecoveryState) -> bool {
use RecoveryState::*;
match (from, to) {
(Idle, Initializing) => true,
(Initializing, Scanning) => true,
(Scanning, Validating) => true,
(Scanning, Replaying) => true,
(Validating, Replaying) => true,
(Replaying, Finalizing) => true,
(Finalizing, Complete) => true,
(_, Failed) => true,
_ => false,
}
}
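/// Recomputes the scan/replay counters from the final transaction set. Note
/// that this overwrites `committed_transactions_replayed` as recorded during
/// replay with the count of committed transactions.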
fn update_transaction_metrics(&self, transactions: &[TransactionState]) {
let mut metrics = self.metrics.lock();
metrics.transactions_scanned = transactions.len() as u64;
metrics.committed_transactions_replayed =
transactions.iter().filter(|tx| tx.committed).count() as u64;
metrics.rolled_back_transactions =
transactions.iter().filter(|tx| !tx.committed).count() as u64;
metrics.records_processed = transactions.iter().map(|tx| tx.records.len() as u64).sum();
}
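/// Returns a progress snapshot. The LSN and transaction counters are
/// currently placeholders (always zero); only `state` is live.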
pub fn get_progress(&self) -> RecoveryProgress {
let state = *self.state.lock();
RecoveryProgress {
state,
current_lsn: 0,
total_lsns: 0,
transactions_processed: 0,
total_transactions: 0,
completion_percentage: 0.0,
start_time: Instant::now(),
estimated_time_remaining: Duration::from_secs(0),
}
}
pub fn get_state(&self) -> RecoveryState {
*self.state.lock()
}
pub fn get_metrics(&self) -> RecoveryMetrics {
self.metrics.lock().clone()
}
/// Blocks until recovery reaches a terminal state (`Complete` or `Failed`)
/// or the timeout elapses. Loops on the condition variable so spurious
/// wakeups and non-terminal notifications are tolerated.
pub fn wait_for_completion(&self, timeout: Duration) -> Result<RecoveryState, RecoveryError> {
let deadline = Instant::now() + timeout;
let mut state = self.state.lock();
while !matches!(*state, RecoveryState::Complete | RecoveryState::Failed) {
if self.recovery_cv.wait_until(&mut state, deadline).timed_out() {
return Err(RecoveryError::timeout(
"Recovery completion timeout".to_string(),
));
}
}
Ok(*state)
}
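/// Cooperative cancellation: flips the state to `Failed` and wakes waiters.
/// In-flight work is not interrupted directly; it fails at its next state
/// transition, since no transition out of `Failed` is legal.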
pub fn cancel_recovery(&self) -> Result<(), RecoveryError> {
let mut state = self.state.lock();
match *state {
RecoveryState::Idle => Err(RecoveryError::configuration(
"No recovery in progress".to_string(),
)),
RecoveryState::Complete | RecoveryState::Failed => Err(RecoveryError::configuration(
"Recovery already completed".to_string(),
)),
_ => {
*state = RecoveryState::Failed;
self.recovery_cv.notify_all();
info_log!("Recovery cancelled");
Ok(())
}
}
}
fn scan_wal_for_transactions(
&self,
) -> Result<(Vec<TransactionState>, Vec<String>), RecoveryError> {
use super::scanner::{ScanStatistics, WALScanResult, WALScanner};
// Placeholder until the WAL scanner is wired in: reset the transaction
// tracking map and return an empty result with an explanatory warning.
let _scanner = WALScanner::new();
self.active_transactions.lock().clear();
let scan_result = WALScanResult {
transactions: Vec::new(),
warnings: vec!["Async scanning not yet implemented".to_string()],
statistics: ScanStatistics::default(),
};
Ok((scan_result.transactions, scan_result.warnings))
}
fn validate_transactions(
&self,
transactions: &[TransactionState],
) -> Result<Vec<String>, RecoveryError> {
self.validate_transactions_basic(transactions)
}
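/// Replays committed transactions through the graph-file replayer and folds
/// the replay results into the shared metrics.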
fn replay_transactions(
&self,
transactions: &[TransactionState],
) -> Result<Vec<String>, RecoveryError> {
use super::replayer::{ReplayConfig, V2GraphFileReplayer};
let config = ReplayConfig {
strict_validation: self.options.perform_consistency_checks,
max_batch_size: self.options.max_batch_size,
operation_timeout_ms: validation::CONSISTENCY_CHECK_TIMEOUT_MS,
create_backup: false,
progress_interval: RECOVERY_PROGRESS_INTERVAL,
max_parallel_transactions: self.options.max_parallel_transactions,
};
let replayer =
V2GraphFileReplayer::create(self.database_path.clone(), config).map_err(|e| {
RecoveryError::configuration(format!("Failed to create replayer: {}", e))
})?;
let replay_result = replayer.replay_transactions(transactions).map_err(|e| {
RecoveryError::replay_failure(format!("Transaction replay failed: {}", e))
})?;
{
let mut metrics = self.metrics.lock();
metrics.committed_transactions_replayed = replay_result.successful_operations;
metrics.corrupted_records += replay_result.failed_operations.len() as u64;
metrics.records_processed += transactions
.iter()
.map(|tx| tx.records.len() as u64)
.sum::<u64>();
}
Ok(replay_result.warnings)
}
fn validate_transactions_basic(
&self,
transactions: &[TransactionState],
) -> Result<Vec<String>, RecoveryError> {
let mut warnings = Vec::new();
let mut tx_ids = std::collections::HashSet::new();
for tx in transactions {
// `insert` returns false when the ID was already present.
if !tx_ids.insert(tx.tx_id) {
warnings.push(format!("Duplicate transaction ID: {}", tx.tx_id));
}
if tx.start_lsn == 0 {
warnings.push(format!("Transaction {} has invalid start LSN", tx.tx_id));
}
if tx.committed && tx.commit_lsn.is_none() {
warnings.push(format!(
"Committed transaction {} missing commit LSN",
tx.tx_id
));
}
if !tx.committed && tx.commit_lsn.is_some() {
warnings.push(format!(
"Uncommitted transaction {} has commit LSN",
tx.tx_id
));
}
if let Some(commit_lsn) = tx.commit_lsn {
if commit_lsn < tx.start_lsn {
warnings.push(format!(
"Transaction {} commit LSN precedes its start LSN",
tx.tx_id
));
}
}
if tx.records.is_empty() {
warnings.push(format!("Transaction {} has no records", tx.tx_id));
}
}
debug_log!(
"Transaction validation complete, {} warnings",
warnings.len()
);
Ok(warnings)
}
fn finalize_recovery(&self) -> Result<Vec<String>, RecoveryError> {
let mut warnings = Vec::new();
self.active_transactions.lock().clear();
if let Err(e) = std::fs::metadata(&self.database_path) {
warnings.push(format!("Database validation issue: {:?}", e));
}
Ok(warnings)
}
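/// Post-recovery sanity pass: runs the transaction sequence through the
/// recovery validator and, when consistency checks are enabled, verifies
/// graph-file integrity. Warnings are accumulated; hard failures are errors.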
fn validate_post_recovery(
&self,
transactions: &[TransactionState],
) -> Result<Vec<String>, RecoveryError> {
debug_log!(
"Starting post-recovery validation for {} transactions",
transactions.len()
);
let mut all_warnings = Vec::new();
let mut validator = RecoveryValidator::new(self.database_path.clone()).map_err(|e| {
RecoveryError::validation(format!("Failed to create recovery validator: {}", e))
})?;
let (_stats, tx_warnings) =
validator
.validate_recovery_sequence(transactions)
.map_err(|e| {
error_log!("Post-recovery transaction validation failed: {}", e);
e
})?;
all_warnings.extend(tx_warnings);
if self.options.perform_consistency_checks {
debug_log!("Performing database integrity checks");
let transactions_replayed =
transactions.iter().filter(|tx| tx.committed).count() as u64;
let graph_integrity_result = self.validate_graph_file_integrity(transactions_replayed);
match graph_integrity_result {
Ok(integrity_warnings) => {
all_warnings.extend(integrity_warnings);
}
Err(e) => {
error_log!("Database integrity check failed: {}", e);
return Err(e);
}
}
let integrity_result = validator.validate_database_integrity().map_err(|e| {
error_log!("Database integrity validation failed: {}", e);
e
})?;
match integrity_result {
ValidationResult::Valid => {
debug_log!("Database integrity validation passed");
}
ValidationResult::Recoverable { issues, .. } => {
warn_log!(
"Database integrity validation passed with {} warnings",
issues.len()
);
all_warnings.extend(issues);
}
ValidationResult::Invalid {
errors,
critical_error,
} => {
error_log!("Database integrity validation failed: {}", critical_error);
for error in &errors {
debug_log!("Integrity error: {}", error);
}
return Err(RecoveryError::validation(format!(
"Database integrity check failed: {}",
critical_error
)));
}
}
}
if all_warnings.is_empty() {
info_log!("Post-recovery validation passed with no warnings");
} else {
warn_log!(
"Post-recovery validation passed with {} warnings",
all_warnings.len()
);
for warning in &all_warnings {
debug_log!("Validation warning: {}", warning);
}
}
Ok(all_warnings)
}
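/// Structural checks on the recovered graph file: header validity, a minimum
/// file size, section offsets within bounds, and a node-count cross-check
/// against the number of replayed transactions.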
fn validate_graph_file_integrity(
&self,
transactions_replayed: u64,
) -> Result<Vec<String>, RecoveryError> {
let mut warnings = Vec::new();
let graph_file = GraphFile::open(&self.database_path).map_err(|e| {
RecoveryError::validation(format!("Cannot open graph file for integrity check: {}", e))
})?;
let header = graph_file.persistent_header();
if transactions_replayed > 0 && header.node_count == 0 {
warnings.push(
"Transactions were replayed but node_count is 0 - possible data loss".to_string(),
);
}
let file_size = graph_file
.file_size()
.map_err(|e| RecoveryError::validation(format!("Cannot get file size: {}", e)))?;
let min_expected_size = crate::backend::native::constants::HEADER_SIZE as u64;
if file_size < min_expected_size {
return Err(RecoveryError::validation(format!(
"Graph file appears truncated: size {} bytes is less than minimum {} bytes",
file_size, min_expected_size
)));
}
let max_offset = header
.free_space_offset
.max(header.incoming_cluster_offset)
.max(header.outgoing_cluster_offset)
.max(header.edge_data_offset)
.max(header.node_data_offset);
if file_size < max_offset {
warnings.push(format!(
"File size {} bytes is less than expected max offset {} bytes - possible truncation",
file_size, max_offset
));
}
header.validate().map_err(|e| {
RecoveryError::validation(format!("Persistent header validation failed: {}", e))
})?;
debug_log!(
"Graph file integrity check passed: node_count={}, edge_count={}, file_size={}",
header.node_count,
header.edge_count,
file_size
);
Ok(warnings)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[test]
fn test_recovery_state_transitions() {
assert!(is_valid_transition_static(
RecoveryState::Idle,
RecoveryState::Initializing
));
assert!(is_valid_transition_static(
RecoveryState::Initializing,
RecoveryState::Scanning
));
assert!(is_valid_transition_static(
RecoveryState::Scanning,
RecoveryState::Validating
));
assert!(is_valid_transition_static(
RecoveryState::Validating,
RecoveryState::Replaying
));
assert!(is_valid_transition_static(
RecoveryState::Replaying,
RecoveryState::Finalizing
));
assert!(is_valid_transition_static(
RecoveryState::Finalizing,
RecoveryState::Complete
));
assert!(!is_valid_transition_static(
RecoveryState::Complete,
RecoveryState::Scanning
));
assert!(!is_valid_transition_static(
RecoveryState::Idle,
RecoveryState::Replaying
));
}
#[test]
fn test_recovery_options_default() {
let options = RecoveryOptions::default();
assert!(!options.fast_recovery);
assert_eq!(options.max_batch_size, DEFAULT_BATCH_SIZE);
assert_eq!(
options.recovery_timeout,
Duration::from_secs(DEFAULT_RECOVERY_TIMEOUT_SECONDS)
);
assert!(options.perform_consistency_checks);
assert!(options.create_backup);
assert_eq!(options.max_recovery_attempts, MAX_RECOVERY_ATTEMPTS);
assert!(!options.force_recovery);
assert_eq!(options.max_parallel_transactions, 4);
}
fn is_valid_transition_static(from: RecoveryState, to: RecoveryState) -> bool {
use RecoveryState::*;
match (from, to) {
(Idle, Initializing) => true,
(Initializing, Scanning) => true,
(Scanning, Validating) => true,
(Scanning, Replaying) => true,
(Validating, Replaying) => true,
(Replaying, Finalizing) => true,
(Finalizing, Complete) => true,
(_, Failed) => true,
_ => false,
}
}
#[test]
fn test_post_recovery_hook_with_empty_transactions() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
let _graph_file = crate::backend::native::GraphFile::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
perform_consistency_checks: false,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let transactions: Vec<TransactionState> = Vec::new();
let result = engine.validate_post_recovery(&transactions);
assert!(
result.is_ok(),
"validate_post_recovery should succeed with empty transactions: {:?}",
result.err()
);
let warnings = result.unwrap();
assert!(
warnings.is_empty(),
"Expected no warnings for empty transactions with consistency checks disabled"
);
}
#[test]
fn test_post_recovery_returns_warnings() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
let _graph_file = crate::backend::native::GraphFile::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let transactions = vec![TransactionState {
tx_id: 1,
start_lsn: 100,
commit_lsn: Some(200),
records: vec![],
committed: true,
timestamp: 12345,
}];
let result = engine.validate_post_recovery(&transactions);
assert!(
result.is_ok(),
"validate_post_recovery should succeed: {:?}",
result.err()
);
let warnings = result.unwrap();
assert!(
!warnings.is_empty(),
"Expected at least one warning for a committed transaction replayed into an empty database"
);
}
#[test]
fn test_post_recovery_hook_is_called_in_recovery_flow() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
std::fs::File::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
fast_recovery: true,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let transactions: Vec<TransactionState> = Vec::new();
// The database file here is an empty placeholder rather than a valid graph
// file, so validation may legitimately fail; this test only checks that the
// hook can be invoked without panicking.
let _ = engine.validate_post_recovery(&transactions);
}
#[test]
fn test_post_recovery_with_empty_wal() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
let _graph_file = crate::backend::native::GraphFile::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
perform_consistency_checks: false,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let transactions: Vec<TransactionState> = Vec::new();
let result = engine.validate_post_recovery(&transactions);
assert!(
result.is_ok(),
"validate_post_recovery should succeed with empty WAL: {:?}",
result.err()
);
let warnings = result.unwrap();
assert!(
warnings.is_empty(),
"Expected no warnings for empty WAL and database with consistency checks disabled"
);
}
#[test]
fn test_post_recovery_detects_truncated_file() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
let _graph_file = crate::backend::native::GraphFile::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
// Mirror the engine's minimum-size check rather than hardcoding a value.
const HEADER_SIZE: u64 = crate::backend::native::constants::HEADER_SIZE as u64;
std::fs::File::options()
.write(true)
.open(&db_path)
.unwrap()
.set_len(HEADER_SIZE - 10)
.unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
perform_consistency_checks: true,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let transactions: Vec<TransactionState> = Vec::new();
let result = engine.validate_post_recovery(&transactions);
assert!(
result.is_err(),
"validate_post_recovery should fail with truncated file"
);
let err_msg = format!("{:?}", result.unwrap_err());
assert!(
err_msg.contains("truncated")
|| err_msg.contains("less than minimum")
|| err_msg.contains("Cannot open"),
"Error should mention file size issue, got: {}",
err_msg
);
}
#[test]
fn test_post_recovery_validates_node_count() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
let _graph_file = crate::backend::native::GraphFile::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
perform_consistency_checks: true,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let transactions = vec![TransactionState {
tx_id: 1,
start_lsn: 100,
commit_lsn: Some(200),
records: vec![],
committed: true,
timestamp: 12345,
}];
let result = engine.validate_post_recovery(&transactions);
assert!(
result.is_ok(),
"validate_post_recovery should succeed: {:?}",
result.err()
);
let warnings = result.unwrap();
let has_expected_warning = warnings
.iter()
.any(|w| w.contains("node_count") || w.contains("Transactions were replayed"));
assert!(
has_expected_warning,
"Expected warning about node_count inconsistency when transactions were replayed but database is empty. Got warnings: {:?}",
warnings
);
}
#[test]
fn test_post_recovery_validates_free_space() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
let _graph_file = crate::backend::native::GraphFile::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
perform_consistency_checks: false,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let transactions: Vec<TransactionState> = Vec::new();
let result = engine.validate_post_recovery(&transactions);
assert!(
result.is_ok(),
"validate_post_recovery should succeed: {:?}",
result.err()
);
let warnings = result.unwrap();
assert!(
warnings.is_empty(),
"Expected no warnings with consistency checks disabled, got: {:?}",
warnings
);
}
#[test]
fn test_graph_file_integrity_validates_node_count() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
let _graph_file = crate::backend::native::GraphFile::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let result = engine.validate_graph_file_integrity(1);
assert!(
result.is_ok(),
"validate_graph_file_integrity should succeed"
);
let warnings = result.unwrap();
assert!(
!warnings.is_empty(),
"Expected warning when node_count=0 but transactions_replayed>0"
);
assert!(
warnings[0].contains("node_count")
|| warnings[0].contains("Transactions were replayed"),
"Warning should mention node_count or transactions replayed: {}",
warnings[0]
);
}
#[test]
fn test_graph_file_integrity_passes_with_no_transactions() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let wal_path = temp_dir.path().join("test.wal");
let _graph_file = crate::backend::native::GraphFile::create(&db_path).unwrap();
std::fs::File::create(&wal_path).unwrap();
let config = V2WALConfig {
wal_path,
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let options = RecoveryOptions {
create_backup: false,
..Default::default()
};
let engine = V2WALRecoveryEngine::create(config, db_path, options).unwrap();
let result = engine.validate_graph_file_integrity(0);
assert!(
result.is_ok(),
"validate_graph_file_integrity should succeed with no transactions"
);
let warnings = result.unwrap();
let has_node_count_warning = warnings
.iter()
.any(|w| w.contains("node_count") || w.contains("Transactions were replayed"));
assert!(
!has_node_count_warning,
"Should not have node_count warning with no transactions, got: {:?}",
warnings
);
}
}