pub use self::core::{RecoveryOptions, RecoveryProgress, RecoveryResult, V2WALRecoveryEngine};
pub use self::states::{Authority, RecoveryContext, RecoveryState as ExplicitRecoveryState};
pub use self::coordinator::{
RecoveryCoordinator, RecoveryCoordinatorResult, RecoveryCoordinatorStats, RecoveryDecision,
};
pub use self::scanner::{TransactionScanner, WALScanResult, WALScanner};
pub use self::validator::{RecoveryValidator, TransactionValidator, ValidationResult};
pub use self::replayer::{
ReplayConfig, ReplayResult, ReplayStatistics, RollbackOperation, V2GraphFileReplayer,
};
pub mod coordinator;
pub mod core;
pub mod replayer;
pub mod scanner;
pub mod states;
pub mod validator;
pub mod constants;
pub mod errors;
pub mod store_helpers;
use crate::backend::native::v2::wal::V2WALConfig;
use std::path::{Path, PathBuf};
pub use self::errors::RecoveryError;
/// Stateless namespace type whose associated functions build
/// `V2WALRecoveryEngine` instances and provide path/prerequisite helpers.
///
/// The type carries no data, so it derives the common traits for free; this
/// keeps it usable in `Debug`-bounded contexts and as a struct field.
#[derive(Debug, Clone, Copy, Default)]
pub struct RecoveryFactory;
impl RecoveryFactory {
/// Builds a recovery engine configured with `RecoveryOptions::default()`.
///
/// # Errors
/// Any error produced by `V2WALRecoveryEngine::create` is passed through
/// `RecoveryError::from` and returned.
pub fn create_engine(
    config: V2WALConfig,
    database_path: PathBuf,
) -> RecoveryResult<V2WALRecoveryEngine> {
    match V2WALRecoveryEngine::create(config, database_path, RecoveryOptions::default()) {
        Ok(engine) => Ok(engine),
        Err(cause) => Err(RecoveryError::from(cause)),
    }
}
/// Builds a recovery engine using caller-supplied `options`.
///
/// # Errors
/// Any error produced by `V2WALRecoveryEngine::create` is passed through
/// `RecoveryError::from` and returned.
pub fn create_engine_with_options(
    config: V2WALConfig,
    database_path: PathBuf,
    options: RecoveryOptions,
) -> RecoveryResult<V2WALRecoveryEngine> {
    let creation = V2WALRecoveryEngine::create(config, database_path, options);
    creation.map_err(RecoveryError::from)
}
/// Builds a recovery engine with a fixed, thorough option set.
///
/// The profile enables consistency checks and backup creation, disables the
/// fast and forced modes, and uses a batch size of 500, a 600-second timeout,
/// up to 5 attempts and 4 parallel transactions.
///
/// # Errors
/// Any error produced by `V2WALRecoveryEngine::create` is passed through
/// `RecoveryError::from` and returned.
pub fn create_v2_optimized_engine(
    config: V2WALConfig,
    database_path: PathBuf,
) -> RecoveryResult<V2WALRecoveryEngine> {
    let recovery_timeout = std::time::Duration::from_secs(600);
    let thorough = RecoveryOptions {
        // Safety-related switches.
        perform_consistency_checks: true,
        create_backup: true,
        fast_recovery: false,
        force_recovery: false,
        // Sizing and limits.
        max_batch_size: 500,
        max_recovery_attempts: 5,
        max_parallel_transactions: 4,
        recovery_timeout,
    };
    let creation = V2WALRecoveryEngine::create(config, database_path, thorough);
    creation.map_err(RecoveryError::from)
}
/// Builds a recovery engine with a fixed, speed-oriented option set.
///
/// The profile enables the fast and forced modes, disables consistency checks
/// and backup creation, and uses a batch size of 2000, a 120-second timeout,
/// a single attempt and 4 parallel transactions.
///
/// # Errors
/// Any error produced by `V2WALRecoveryEngine::create` is passed through
/// `RecoveryError::from` and returned.
pub fn create_fast_recovery_engine(
    config: V2WALConfig,
    database_path: PathBuf,
) -> RecoveryResult<V2WALRecoveryEngine> {
    let recovery_timeout = std::time::Duration::from_secs(120);
    let speedy = RecoveryOptions {
        // Mode switches.
        fast_recovery: true,
        force_recovery: true,
        perform_consistency_checks: false,
        create_backup: false,
        // Sizing and limits.
        max_batch_size: 2000,
        max_recovery_attempts: 1,
        max_parallel_transactions: 4,
        recovery_timeout,
    };
    let creation = V2WALRecoveryEngine::create(config, database_path, speedy);
    creation.map_err(RecoveryError::from)
}
/// Verifies that the database path and `config.wal_path` both exist and are
/// regular files.
///
/// Checks run in a fixed order and stop at the first failure: database
/// exists, WAL exists, database is a file, WAL is a file.
///
/// # Errors
/// Returns an `Err` built with `RecoveryError::configuration` whose message
/// names the offending path.
pub fn validate_prerequisites(
    config: &V2WALConfig,
    database_path: &Path,
) -> RecoveryResult<()> {
    let wal_path = &config.wal_path;

    // The chain short-circuits, so later filesystem probes are skipped once a
    // problem has been found.
    let complaint = if !database_path.exists() {
        format!(
            "Database file does not exist: {}",
            database_path.display()
        )
    } else if !wal_path.exists() {
        format!("WAL file does not exist: {}", wal_path.display())
    } else if !database_path.is_file() {
        format!(
            "Database path is not a file: {}",
            database_path.display()
        )
    } else if !wal_path.is_file() {
        format!("WAL path is not a file: {}", wal_path.display())
    } else {
        return Ok(());
    };

    Err(RecoveryError::configuration(complaint))
}
/// Computes where a recovery backup of `database_path` should be stored.
///
/// The result is `<parent>/recovery_backups/<file name>.recovery_backup.<timestamp>`.
/// When the path has no file name (or the name is not valid UTF-8) the
/// literal `database` is used; when it has no parent, `.` is used.
pub fn create_backup_path(database_path: &Path, timestamp: u64) -> PathBuf {
    let file_label = match database_path.file_name().and_then(|raw| raw.to_str()) {
        Some(label) => label,
        None => "database",
    };
    let containing_dir = match database_path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };

    let mut backup = containing_dir.join("recovery_backups");
    backup.push(format!("{}.recovery_backup.{}", file_label, timestamp));
    backup
}
}
pub mod utils {
use super::*;
/// Produces a rough estimate of how long a recovery run will take.
///
/// The heuristic charges 50 ms per whole MiB of combined database and WAL
/// size, then adjusts in this order:
/// * halves the estimate when `options.fast_recovery` is set,
/// * doubles it when `options.perform_consistency_checks` is set,
/// * adds a flat 10 seconds when `options.create_backup` is set.
///
/// The two sizes are combined with saturating addition so that extreme
/// inputs cannot overflow `u64` (which would panic in debug builds and wrap
/// to a misleadingly small estimate in release builds).
pub fn estimate_recovery_duration(
    database_size_bytes: u64,
    wal_size_bytes: u64,
    options: &RecoveryOptions,
) -> std::time::Duration {
    const BYTES_PER_MIB: u64 = 1024 * 1024;
    const MILLIS_PER_MIB: u64 = 50;

    let total_mib = database_size_bytes.saturating_add(wal_size_bytes) / BYTES_PER_MIB;
    // `total_mib` is at most `u64::MAX >> 20`, so the multiplication cannot overflow.
    let mut duration = std::time::Duration::from_millis(total_mib * MILLIS_PER_MIB);

    if options.fast_recovery {
        duration /= 2;
    }
    if options.perform_consistency_checks {
        duration *= 2;
    }
    if options.create_backup {
        duration += std::time::Duration::from_secs(10);
    }
    duration
}
/// Chooses a batch size from the database size, measured in whole MiB.
///
/// | size (MiB) | batch size |
/// |------------|------------|
/// | 0..=100    | 100        |
/// | 101..=500  | 500        |
/// | 501..=1000 | 1000       |
/// | above 1000 | 2000       |
pub fn calculate_optimal_batch_size(database_size_bytes: u64) -> usize {
    const BYTES_PER_MIB: u64 = 1024 * 1024;
    // Integer division: a partial MiB is discarded.
    let whole_mib = database_size_bytes / BYTES_PER_MIB;

    if whole_mib <= 100 {
        100
    } else if whole_mib <= 500 {
        500
    } else if whole_mib <= 1000 {
        1000
    } else {
        2000
    }
}
/// Rejects option sets whose limits fall outside the accepted ranges.
///
/// Accepted ranges: `max_batch_size` in 1..=10000, `recovery_timeout` of
/// 1..=3600 whole seconds, `max_recovery_attempts` in 1..=10. Rules are
/// evaluated in that order and the first violated rule determines the error.
///
/// The timeout is compared via `Duration::as_secs`, i.e. at whole-second
/// granularity, so a non-zero timeout shorter than one second is reported
/// with the "cannot be zero" message.
///
/// # Errors
/// Returns an `Err` built with `RecoveryError::configuration` describing the
/// first violated rule.
pub fn validate_recovery_options(options: &RecoveryOptions) -> RecoveryResult<()> {
    let timeout_secs = options.recovery_timeout.as_secs();

    // Each entry pairs "is this rule violated?" with the message to report.
    let rules = [
        (options.max_batch_size == 0, "Max batch size cannot be zero"),
        (
            options.max_batch_size > 10000,
            "Max batch size too large (>10000)",
        ),
        (timeout_secs == 0, "Recovery timeout cannot be zero"),
        (timeout_secs > 3600, "Recovery timeout too large (>1 hour)"),
        (
            options.max_recovery_attempts == 0,
            "Max recovery attempts cannot be zero",
        ),
        (
            options.max_recovery_attempts > 10,
            "Max recovery attempts too many (>10)",
        ),
    ];

    match rules.iter().find(|rule| rule.0) {
        Some(&(_, message)) => Err(RecoveryError::configuration(message.to_string())),
        None => Ok(()),
    }
}
/// Classifies a recovery scenario by severity.
///
/// Database corruption always yields `Critical`; otherwise WAL corruption
/// yields `High`. With neither, the transaction count decides: more than
/// 1000 is `Medium`, more than 100 is `Low`, anything else is `Minimal`.
pub fn get_recovery_severity(
    database_corrupted: bool,
    wal_corrupted: bool,
    transaction_count: u64,
) -> RecoverySeverity {
    match (database_corrupted, wal_corrupted) {
        (true, _) => RecoverySeverity::Critical,
        (false, true) => RecoverySeverity::High,
        (false, false) => match transaction_count {
            0..=100 => RecoverySeverity::Minimal,
            101..=1000 => RecoverySeverity::Low,
            _ => RecoverySeverity::Medium,
        },
    }
}
}
/// How serious a recovery scenario is.
///
/// Variants are declared from least to most severe, so the derived ordering
/// satisfies `Minimal < Low < Medium < High < Critical`. The enum is a plain
/// tag, hence `Copy` and `Hash` are derived alongside the comparison traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RecoverySeverity {
    Minimal,
    Low,
    Medium,
    High,
    Critical,
}

impl std::fmt::Display for RecoverySeverity {
    /// Writes the variant name.
    ///
    /// `Formatter::pad` is used instead of `write!` so that width, fill and
    /// alignment flags (e.g. `{:>10}`) are honoured; plain `{}` output is
    /// unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            RecoverySeverity::Minimal => "Minimal",
            RecoverySeverity::Low => "Low",
            RecoverySeverity::Medium => "Medium",
            RecoverySeverity::High => "High",
            RecoverySeverity::Critical => "Critical",
        };
        f.pad(label)
    }
}
/// Counters describing past recovery runs.
///
/// Only `total_attempts` and `successful_recoveries` are read by the methods
/// below; the remaining fields are plain storage whose meaning is taken from
/// their names.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStatistics {
    /// Number of recovery attempts; the denominator of `success_rate`.
    pub total_attempts: u64,
    /// Number of attempts that succeeded; the numerator of `success_rate`.
    pub successful_recoveries: u64,
    /// Number of attempts that failed (not consulted by the methods here).
    pub failed_recoveries: u64,
    /// Presumably the mean run duration in milliseconds, per the name.
    pub avg_duration_ms: u64,
    /// Amount of data recovered; the unit is not established in this file.
    pub total_data_recovered: u64,
    /// Number of transactions recovered.
    pub total_transactions_recovered: u64,
    /// Time of the most recent recovery, if any has been recorded.
    pub last_recovery_timestamp: Option<std::time::SystemTime>,
}

impl RecoveryStatistics {
    /// Percentage of attempts that succeeded, or `0.0` when nothing has been
    /// attempted yet.
    pub fn success_rate(&self) -> f64 {
        match self.total_attempts {
            0 => 0.0,
            attempts => {
                let ratio = self.successful_recoveries as f64 / attempts as f64;
                ratio * 100.0
            }
        }
    }

    /// One-line human-readable summary of the success rate.
    pub fn status_description(&self) -> String {
        if self.total_attempts == 0 {
            return String::from("No recovery attempts recorded");
        }
        let rate = self.success_rate();
        format!(
            "Recovery success rate: {:.1}% ({} of {} attempts)",
            rate, self.successful_recoveries, self.total_attempts
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::tempdir;

    /// Builds a WAL config whose WAL and checkpoint paths live inside `dir`.
    fn wal_config_in(dir: &Path) -> V2WALConfig {
        V2WALConfig {
            wal_path: dir.join("test.wal"),
            checkpoint_path: dir.join("test.checkpoint"),
            ..Default::default()
        }
    }

    /// Fully specified options so tests do not depend on `Default` values.
    /// The numeric limits sit inside the ranges accepted by
    /// `utils::validate_recovery_options`.
    fn explicit_options(
        fast_recovery: bool,
        perform_consistency_checks: bool,
        create_backup: bool,
    ) -> RecoveryOptions {
        RecoveryOptions {
            fast_recovery,
            max_batch_size: 500,
            recovery_timeout: Duration::from_secs(600),
            perform_consistency_checks,
            create_backup,
            max_recovery_attempts: 5,
            force_recovery: false,
            max_parallel_transactions: 4,
        }
    }

    #[test]
    fn test_recovery_factory_create_engine() {
        let temp_dir = tempdir().unwrap();
        let config = wal_config_in(temp_dir.path());
        let database_path = temp_dir.path().join("test.db");
        std::fs::File::create(&config.wal_path).unwrap();
        std::fs::File::create(&database_path).unwrap();

        // `database_path` is not used afterwards, so it is moved, not cloned.
        let result = RecoveryFactory::create_engine(config, database_path);
        assert!(result.is_ok(), "Recovery engine creation should succeed");
    }

    #[test]
    fn test_recovery_factory_validate_prerequisites() {
        let temp_dir = tempdir().unwrap();
        let config = wal_config_in(temp_dir.path());
        let database_path = temp_dir.path().join("test.db");

        let result = RecoveryFactory::validate_prerequisites(&config, &database_path);
        assert!(result.is_err(), "Should fail with missing files");

        std::fs::File::create(&config.wal_path).unwrap();
        std::fs::File::create(&database_path).unwrap();
        let result = RecoveryFactory::validate_prerequisites(&config, &database_path);
        assert!(result.is_ok(), "Should succeed with existing files");

        // An existing directory passes the existence check but is not a file.
        let result = RecoveryFactory::validate_prerequisites(&config, temp_dir.path());
        assert!(result.is_err(), "Should fail when the database path is a directory");
    }

    #[test]
    fn test_create_backup_path() {
        let backup = RecoveryFactory::create_backup_path(Path::new("/data/graph.db"), 42);
        assert_eq!(
            backup,
            PathBuf::from("/data/recovery_backups/graph.db.recovery_backup.42")
        );
    }

    #[test]
    fn test_recovery_estimation() {
        let database_size = 100 * 1024 * 1024;
        let wal_size = 50 * 1024 * 1024;
        let options = RecoveryOptions::default();
        let duration = utils::estimate_recovery_duration(database_size, wal_size, &options);
        assert!(
            duration.as_secs() >= 5,
            "Duration should be at least 5 seconds"
        );
        assert!(
            duration.as_secs() <= 30,
            "Duration should be at most 30 seconds"
        );
    }

    #[test]
    fn test_recovery_estimation_adjustments() {
        let database_size = 100 * 1024 * 1024;
        let wal_size = 50 * 1024 * 1024;
        let estimate = |options: &RecoveryOptions| {
            utils::estimate_recovery_duration(database_size, wal_size, options)
        };

        // 150 MiB at 50 ms per MiB.
        assert_eq!(
            estimate(&explicit_options(false, false, false)),
            Duration::from_millis(7_500)
        );
        // Fast recovery halves the estimate.
        assert_eq!(
            estimate(&explicit_options(true, false, false)),
            Duration::from_millis(3_750)
        );
        // Consistency checks double it and a backup adds ten seconds.
        assert_eq!(
            estimate(&explicit_options(false, true, true)),
            Duration::from_secs(25)
        );
    }

    #[test]
    fn test_optimal_batch_size() {
        const MIB: u64 = 1024 * 1024;
        assert_eq!(utils::calculate_optimal_batch_size(50 * MIB), 100);
        assert_eq!(utils::calculate_optimal_batch_size(250 * MIB), 500);
        assert_eq!(utils::calculate_optimal_batch_size(750 * MIB), 1000);
        assert_eq!(utils::calculate_optimal_batch_size(2 * 1024 * MIB), 2000);

        // Bucket edges.
        assert_eq!(utils::calculate_optimal_batch_size(0), 100);
        assert_eq!(utils::calculate_optimal_batch_size(100 * MIB), 100);
        assert_eq!(utils::calculate_optimal_batch_size(101 * MIB), 500);
        assert_eq!(utils::calculate_optimal_batch_size(500 * MIB), 500);
        assert_eq!(utils::calculate_optimal_batch_size(501 * MIB), 1000);
        assert_eq!(utils::calculate_optimal_batch_size(1000 * MIB), 1000);
        assert_eq!(utils::calculate_optimal_batch_size(1001 * MIB), 2000);
    }

    #[test]
    fn test_validate_recovery_options() {
        let valid = explicit_options(false, true, true);
        assert!(utils::validate_recovery_options(&valid).is_ok());

        let rejected = [
            RecoveryOptions {
                max_batch_size: 0,
                ..explicit_options(false, true, true)
            },
            RecoveryOptions {
                max_batch_size: 10_001,
                ..explicit_options(false, true, true)
            },
            RecoveryOptions {
                recovery_timeout: Duration::from_secs(0),
                ..explicit_options(false, true, true)
            },
            RecoveryOptions {
                recovery_timeout: Duration::from_secs(3_601),
                ..explicit_options(false, true, true)
            },
            RecoveryOptions {
                max_recovery_attempts: 0,
                ..explicit_options(false, true, true)
            },
            RecoveryOptions {
                max_recovery_attempts: 11,
                ..explicit_options(false, true, true)
            },
        ];
        for options in rejected.iter() {
            assert!(
                utils::validate_recovery_options(options).is_err(),
                "Out-of-range options should be rejected"
            );
        }
    }

    #[test]
    fn test_recovery_severity() {
        let severity = utils::get_recovery_severity(false, false, 10);
        assert_eq!(severity, RecoverySeverity::Minimal);
        let severity = utils::get_recovery_severity(false, false, 1500);
        assert_eq!(severity, RecoverySeverity::Medium);
        let severity = utils::get_recovery_severity(true, false, 10);
        assert_eq!(severity, RecoverySeverity::Critical);
        let severity = utils::get_recovery_severity(false, true, 10);
        assert_eq!(severity, RecoverySeverity::High);

        // Thresholds are exclusive: 100 is still Minimal, 1000 is still Low.
        assert_eq!(
            utils::get_recovery_severity(false, false, 100),
            RecoverySeverity::Minimal
        );
        assert_eq!(
            utils::get_recovery_severity(false, false, 101),
            RecoverySeverity::Low
        );
        assert_eq!(
            utils::get_recovery_severity(false, false, 1000),
            RecoverySeverity::Low
        );
        assert_eq!(
            utils::get_recovery_severity(false, false, 1001),
            RecoverySeverity::Medium
        );

        // Database corruption outranks WAL corruption.
        assert_eq!(
            utils::get_recovery_severity(true, true, 5000),
            RecoverySeverity::Critical
        );
    }

    #[test]
    fn test_recovery_statistics() {
        let mut stats = RecoveryStatistics::default();
        assert_eq!(stats.success_rate(), 0.0);
        assert!(stats.status_description().contains("No recovery attempts"));

        stats.total_attempts = 5;
        stats.successful_recoveries = 4;
        stats.failed_recoveries = 1;
        assert_eq!(stats.success_rate(), 80.0);
        assert!(stats.status_description().contains("80.0%"));
    }
}