use crate::backup::BackupManager;
use crate::error::{FusekiError, FusekiResult};
use crate::store::Store;
use crate::store_health::{HealthMonitorConfig, HealthStatus, StoreHealthMonitor};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{debug, error, info, warn};
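/// Coordinates disaster recovery for the store: periodic health checks with
/// optional automated failover, recovery point creation through the backup
/// manager, and scheduled recovery tests measured against the configured RTO.
///
/// # Example
///
/// A minimal sketch; construction of `store` and `backup_manager` is elided
/// because their constructors live outside this module, so the example is not
/// compiled as a doctest.
///
/// ```ignore
/// let config = DisasterRecoveryConfig {
///     enabled: true,
///     ..Default::default()
/// };
/// let dr = DisasterRecoveryManager::new(store, backup_manager, config);
/// let status = dr.get_status().await;
/// assert!(status.enabled);
/// ```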
pub struct DisasterRecoveryManager {
store: Arc<Store>,
backup_manager: Arc<BackupManager>,
health_monitor: Option<Arc<StoreHealthMonitor>>,
config: DisasterRecoveryConfig,
state: Arc<tokio::sync::RwLock<RecoveryState>>,
}
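/// Configuration for disaster recovery.
///
/// `rpo_minutes` is the recovery point objective: the maximum tolerated age of
/// the most recent backup. `rto_minutes` is the recovery time objective: the
/// duration budget that recovery tests are measured against.
///
/// ```ignore
/// // Override selected fields; the rest keep their defaults.
/// let config = DisasterRecoveryConfig {
///     rpo_minutes: 15,
///     auto_failover: true,
///     ..Default::default()
/// };
/// ```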
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DisasterRecoveryConfig {
pub enabled: bool,
pub rpo_minutes: u64,
pub rto_minutes: u64,
pub auto_failover: bool,
pub replication_targets: Vec<ReplicationTarget>,
pub health_check_interval_secs: u64,
pub enable_recovery_testing: bool,
pub recovery_test_interval_days: u64,
}
impl Default for DisasterRecoveryConfig {
fn default() -> Self {
Self {
enabled: false,
            rpo_minutes: 60,
            rto_minutes: 30,
            auto_failover: false,
replication_targets: Vec::new(),
health_check_interval_secs: 30,
enable_recovery_testing: true,
recovery_test_interval_days: 7,
}
}
}
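/// A remote endpoint that can take over during failover. Targets are tried in
/// ascending `priority` order (lower numbers first); disabled targets are skipped.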
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationTarget {
pub name: String,
pub endpoint: String,
pub region: String,
    pub priority: u32,
    pub enabled: bool,
}
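/// Mutable runtime state tracked by the manager, guarded by an async `RwLock`.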
#[derive(Debug, Clone)]
struct RecoveryState {
healthy: bool,
last_health_check: Option<DateTime<Utc>>,
last_backup: Option<DateTime<Utc>>,
last_recovery_test: Option<DateTime<Utc>>,
failover_count: u64,
}
impl Default for RecoveryState {
fn default() -> Self {
Self {
healthy: true,
last_health_check: None,
last_backup: None,
last_recovery_test: None,
failover_count: 0,
}
}
}
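/// A point-in-time snapshot backed by a completed backup, suitable as a
/// restore target.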
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryPoint {
pub id: String,
pub timestamp: DateTime<Utc>,
pub backup_id: String,
pub description: String,
pub size_bytes: u64,
pub verified: bool,
}
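/// Outcome of a failover attempt, including the chosen target and how long the
/// switchover took.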
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailoverResult {
pub success: bool,
pub target: String,
pub duration_secs: u64,
pub data_loss_minutes: u64,
pub timestamp: DateTime<Utc>,
}
impl DisasterRecoveryManager {
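    /// Creates a manager without store health monitoring; health checks fall
    /// back to `Store::is_ready`.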
pub fn new(
store: Arc<Store>,
backup_manager: Arc<BackupManager>,
config: DisasterRecoveryConfig,
) -> Self {
Self {
store,
backup_manager,
health_monitor: None,
config,
state: Arc::new(tokio::sync::RwLock::new(RecoveryState::default())),
}
}
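    /// Creates a manager with an attached `StoreHealthMonitor`, which starts
    /// background monitoring immediately and drives the richer health checks.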
pub fn with_health_monitoring(
store: Arc<Store>,
backup_manager: Arc<BackupManager>,
config: DisasterRecoveryConfig,
) -> Self {
let health_config = HealthMonitorConfig {
check_interval: Duration::from_secs(config.health_check_interval_secs),
max_history: 100,
performance_window: Duration::from_secs(600),
error_window: Duration::from_secs(3600),
            memory_warning_threshold: 3 * 1024 * 1024 * 1024, // 3 GiB
            memory_critical_threshold: 7 * 1024 * 1024 * 1024, // 7 GiB
            max_connections: 1000,
        };
let health_monitor = Arc::new(StoreHealthMonitor::with_config(
Arc::clone(&store),
health_config,
));
Arc::clone(&health_monitor).start_monitoring();
Self {
store,
backup_manager,
health_monitor: Some(health_monitor),
config,
state: Arc::new(tokio::sync::RwLock::new(RecoveryState::default())),
}
}
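    /// Returns the attached health monitor, if one was configured.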
pub fn health_monitor(&self) -> Option<&Arc<StoreHealthMonitor>> {
self.health_monitor.as_ref()
}
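    /// Runs the disaster recovery loop: health checks, optional automated
    /// failover, and scheduled recovery tests. Returns immediately if disaster
    /// recovery is disabled; otherwise it loops until the task is cancelled.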
pub async fn start(&self) -> FusekiResult<()> {
if !self.config.enabled {
info!("Disaster recovery disabled");
return Ok(());
}
info!(
"Starting disaster recovery (RPO: {}min, RTO: {}min)",
self.config.rpo_minutes, self.config.rto_minutes
);
loop {
if let Err(e) = self.health_check().await {
error!("Health check failed: {}", e);
if self.config.auto_failover {
warn!("Initiating automated failover");
if let Err(e) = self.perform_failover().await {
error!("Automated failover failed: {}", e);
}
}
}
if self.config.enable_recovery_testing {
if let Err(e) = self.check_recovery_test_schedule().await {
error!("Recovery test check failed: {}", e);
}
}
time::sleep(Duration::from_secs(self.config.health_check_interval_secs)).await;
}
}
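    /// Checks store health (via the health monitor when available, otherwise
    /// `Store::is_ready`) and verifies that backup age stays within the RPO.
    /// Returns an error when the state is bad enough to warrant failover.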
async fn health_check(&self) -> FusekiResult<()> {
debug!("Performing disaster recovery health check");
let mut state = self.state.write().await;
state.last_health_check = Some(Utc::now());
if let Some(health_monitor) = &self.health_monitor {
let health = health_monitor.check_health().await?;
match health.status {
HealthStatus::Healthy => {
debug!("Store health: HEALTHY (score: {})", health.health_score);
state.healthy = true;
}
HealthStatus::Degraded => {
warn!(
"Store health: DEGRADED (score: {}). Monitoring closely.",
health.health_score
);
if health.health_score < 50 {
warn!(
"Health score critically low ({}). Preparing for failover.",
health.health_score
);
state.healthy = false;
return Err(FusekiError::internal(format!(
"Store health degraded below threshold (score: {})",
health.health_score
)));
}
                    state.healthy = true;
                }
HealthStatus::Unhealthy | HealthStatus::Down => {
error!(
"Store health: {:?} (score: {}). Failover required!",
health.status, health.health_score
);
for component in &health.components {
if component.status == HealthStatus::Unhealthy
|| component.status == HealthStatus::Down
{
error!(
"Component {} is {:?}: {}",
component.name,
component.status,
component.message.as_deref().unwrap_or("Unknown issue")
);
}
}
state.healthy = false;
return Err(FusekiError::internal(format!(
"Store is {:?} (score: {})",
health.status, health.health_score
)));
}
}
if health.performance.avg_query_latency_ms > 5000.0 {
warn!(
"Average query latency is very high: {:.2}ms",
health.performance.avg_query_latency_ms
);
}
if health.resources.memory_usage_percent > 90.0 {
warn!(
"Memory usage critical: {:.1}%",
health.resources.memory_usage_percent
);
}
if health.errors.errors_last_hour > 100 {
warn!(
"High error rate: {} errors in last hour",
health.errors.errors_last_hour
);
}
} else {
debug!("Using basic health check (no health monitor available)");
if !self.store.is_ready() {
state.healthy = false;
return Err(FusekiError::internal("Store is not ready".to_string()));
}
}
if let Some(last_backup) = state.last_backup {
let age_minutes = (Utc::now() - last_backup).num_minutes() as u64;
if age_minutes > self.config.rpo_minutes {
warn!(
"Backup age ({} min) exceeds RPO ({} min)",
age_minutes, self.config.rpo_minutes
);
if age_minutes > self.config.rpo_minutes * 2 {
state.healthy = false;
return Err(FusekiError::internal(format!(
"Critical RPO violation: {} minutes since last backup",
age_minutes
)));
}
}
} else {
debug!("No backup history available yet");
}
state.healthy = true;
Ok(())
}
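    /// Attempts failover to the enabled replication targets in priority order,
    /// returning the first successful result.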
async fn perform_failover(&self) -> FusekiResult<FailoverResult> {
info!("Starting failover procedure");
let start_time = Utc::now();
let mut targets = self.config.replication_targets.clone();
targets.sort_by_key(|t| t.priority);
for target in targets.iter().filter(|t| t.enabled) {
info!("Attempting failover to: {}", target.name);
match self.failover_to_target(target).await {
Ok(_) => {
let duration = (Utc::now() - start_time).num_seconds() as u64;
let mut state = self.state.write().await;
state.failover_count += 1;
let result = FailoverResult {
success: true,
target: target.name.clone(),
duration_secs: duration,
                        // Data loss is not measured during failover; reported as zero.
                        data_loss_minutes: 0,
                        timestamp: Utc::now(),
};
info!("Failover successful to {} in {}s", target.name, duration);
return Ok(result);
}
Err(e) => {
warn!("Failover to {} failed: {}", target.name, e);
continue;
}
}
}
Err(FusekiError::internal(
"All failover targets unavailable".to_string(),
))
}
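    /// Placeholder for target-specific failover logic (verification and
    /// switchover against the replication endpoint).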
async fn failover_to_target(&self, _target: &ReplicationTarget) -> FusekiResult<()> {
info!("Failover target verification would happen here");
Ok(())
}
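    /// Creates a recovery point by taking a full backup through the backup
    /// manager and recording the backup time for RPO tracking.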
pub async fn create_recovery_point(&self, description: String) -> FusekiResult<RecoveryPoint> {
info!("Creating recovery point: {}", description);
let backup_meta = self.backup_manager.perform_backup().await?;
let recovery_point = RecoveryPoint {
id: format!("rp-{}", Utc::now().format("%Y%m%d-%H%M%S")),
timestamp: Utc::now(),
backup_id: backup_meta.id.clone(),
description,
size_bytes: backup_meta.size_bytes,
verified: false,
};
let mut state = self.state.write().await;
state.last_backup = Some(Utc::now());
info!("Recovery point created: {}", recovery_point.id);
Ok(recovery_point)
}
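    /// Restores the store from the backup whose id contains the given recovery
    /// point id.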
pub async fn restore_to_point(&self, recovery_point_id: &str) -> FusekiResult<()> {
info!("Restoring to recovery point: {}", recovery_point_id);
let backups = self.backup_manager.list_backups().await?;
let backup = backups
.iter()
.find(|b| b.id.contains(recovery_point_id))
.ok_or_else(|| {
FusekiError::internal(format!("Recovery point not found: {}", recovery_point_id))
})?;
self.backup_manager.restore_backup(&backup.id).await?;
info!("Restore completed");
Ok(())
}
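    /// Runs a recovery test: performs a backup and checks that it completes
    /// within the RTO target. Data integrity is currently reported as verified
    /// without an explicit check.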
pub async fn test_recovery(&self) -> FusekiResult<RecoveryTestReport> {
info!("Starting recovery test");
let start_time = Utc::now();
let backup = self.backup_manager.perform_backup().await?;
let duration = (Utc::now() - start_time).num_seconds() as u64;
let rto_met = duration < (self.config.rto_minutes * 60);
let mut state = self.state.write().await;
state.last_recovery_test = Some(Utc::now());
let report = RecoveryTestReport {
test_time: Utc::now(),
backup_id: backup.id,
success: rto_met,
duration_secs: duration,
rto_target_secs: self.config.rto_minutes * 60,
rto_met,
data_integrity_verified: true,
notes: if rto_met {
"Recovery test passed".to_string()
} else {
format!(
"RTO not met: {}s actual vs {}s target",
duration,
self.config.rto_minutes * 60
)
},
};
info!(
"Recovery test completed: {} ({}s)",
if report.success { "PASS" } else { "FAIL" },
duration
);
Ok(report)
}
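    /// Triggers a recovery test if none has run yet or the configured test
    /// interval has elapsed.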
async fn check_recovery_test_schedule(&self) -> FusekiResult<()> {
let state = self.state.read().await;
if let Some(last_test) = state.last_recovery_test {
let days_since = (Utc::now() - last_test).num_days() as u64;
if days_since >= self.config.recovery_test_interval_days {
                drop(state);
                info!("Recovery test is due");
self.test_recovery().await?;
}
} else {
drop(state);
info!("Running initial recovery test");
self.test_recovery().await?;
}
Ok(())
}
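    /// Returns a snapshot of the current disaster recovery status.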
pub async fn get_status(&self) -> DisasterRecoveryStatus {
let state = self.state.read().await;
DisasterRecoveryStatus {
enabled: self.config.enabled,
healthy: state.healthy,
rpo_minutes: self.config.rpo_minutes,
rto_minutes: self.config.rto_minutes,
last_health_check: state.last_health_check,
last_backup: state.last_backup,
last_recovery_test: state.last_recovery_test,
failover_count: state.failover_count,
replication_targets: self.config.replication_targets.len(),
}
}
}
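/// Result of a recovery test, including whether the RTO target was met.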
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryTestReport {
pub test_time: DateTime<Utc>,
pub backup_id: String,
pub success: bool,
pub duration_secs: u64,
pub rto_target_secs: u64,
pub rto_met: bool,
pub data_integrity_verified: bool,
pub notes: String,
}
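/// Point-in-time snapshot of disaster recovery configuration and runtime state.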
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DisasterRecoveryStatus {
pub enabled: bool,
pub healthy: bool,
pub rpo_minutes: u64,
pub rto_minutes: u64,
pub last_health_check: Option<DateTime<Utc>>,
pub last_backup: Option<DateTime<Utc>>,
pub last_recovery_test: Option<DateTime<Utc>>,
pub failover_count: u64,
pub replication_targets: usize,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dr_config_default() {
let config = DisasterRecoveryConfig::default();
assert!(!config.enabled);
assert_eq!(config.rpo_minutes, 60);
assert_eq!(config.rto_minutes, 30);
}
#[test]
fn test_replication_target_priority() {
        #[allow(clippy::useless_vec)]
        let mut targets = vec![
ReplicationTarget {
name: "backup".to_string(),
endpoint: "backup.example.com".to_string(),
region: "us-west-2".to_string(),
priority: 2,
enabled: true,
},
ReplicationTarget {
name: "primary".to_string(),
endpoint: "primary.example.com".to_string(),
region: "us-east-1".to_string(),
priority: 1,
enabled: true,
},
];
targets.sort_by_key(|t| t.priority);
assert_eq!(targets[0].name, "primary");
assert_eq!(targets[1].name, "backup");
}
}