use crate::error::{FusekiError, FusekiResult};
use crate::store::Store;
use crate::store_health::{HealthMonitorConfig, HealthStatus, StoreHealthMonitor};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::time;
use tracing::{debug, error, info, warn};
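/// Supervises a [`Store`], running periodic health checks and applying
/// escalating recovery actions when a check fails.
///
/// Usage sketch (the `Store::open` call below is hypothetical; substitute
/// however this crate actually obtains an `Arc<Store>`):
///
/// ```ignore
/// let store = Arc::new(Store::open("/data/db")?); // hypothetical constructor
/// let manager = RecoveryManager::new(store, RecoveryConfig::default());
/// tokio::spawn(async move { manager.start().await });
/// ```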
pub struct RecoveryManager {
store: Arc<Store>,
health_monitor: Option<Arc<StoreHealthMonitor>>,
state: Arc<RwLock<RecoveryState>>,
config: RecoveryConfig,
}
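/// Tunables for the recovery loop: how often to check, how many consecutive
/// recovery attempts to make before giving up, the backoff multiplier applied
/// between attempts, and the resident-memory ceiling in megabytes.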
#[derive(Debug, Clone)]
pub struct RecoveryConfig {
pub enabled: bool,
pub health_check_interval: Duration,
pub max_restart_attempts: u32,
pub restart_backoff_multiplier: f64,
pub memory_threshold_mb: u64,
pub connection_pool_recovery: bool,
}
impl Default for RecoveryConfig {
fn default() -> Self {
Self {
enabled: true,
health_check_interval: Duration::from_secs(30),
max_restart_attempts: 3,
restart_backoff_multiplier: 2.0,
memory_threshold_mb: 4096,
connection_pool_recovery: true,
}
}
}
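/// Mutable bookkeeping shared behind an `RwLock`: the consecutive attempt
/// count, timestamps of the last restart and last health check, and whether
/// the most recent check passed.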
#[derive(Debug, Clone)]
struct RecoveryState {
restart_attempts: u32,
last_restart: Option<Instant>,
total_recoveries: u64,
last_health_check: Option<Instant>,
healthy: bool,
}
impl Default for RecoveryState {
fn default() -> Self {
Self {
restart_attempts: 0,
last_restart: None,
total_recoveries: 0,
last_health_check: None,
healthy: true,
}
}
}
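/// Recovery actions, ordered roughly by increasing cost. The automatic
/// escalation ladder uses `ClearCaches`, `RebuildIndexes`, and `Restart`;
/// the remaining variants are reachable via
/// [`RecoveryManager::force_recovery`].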
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryAction {
Restart,
ClearCaches,
RebuildIndexes,
ResetConnections,
CompactDatabase,
ForceGC,
}
impl RecoveryManager {
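    /// Creates a manager without a dedicated health monitor; health checks
    /// fall back to `Store::is_ready`.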
pub fn new(store: Arc<Store>, config: RecoveryConfig) -> Self {
Self {
store,
health_monitor: None,
state: Arc::new(RwLock::new(RecoveryState::default())),
config,
}
}
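    /// Creates a manager backed by a [`StoreHealthMonitor`], deriving the
    /// monitor's thresholds from this config (the critical memory threshold
    /// is twice the warning threshold) and starting it immediately.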
pub fn with_health_monitoring(store: Arc<Store>, config: RecoveryConfig) -> Self {
let health_config = HealthMonitorConfig {
check_interval: config.health_check_interval,
memory_warning_threshold: config.memory_threshold_mb * 1024 * 1024,
memory_critical_threshold: config.memory_threshold_mb * 2 * 1024 * 1024,
..Default::default()
};
let health_monitor = Arc::new(StoreHealthMonitor::with_config(
Arc::clone(&store),
health_config,
));
Arc::clone(&health_monitor).start_monitoring();
Self {
store,
health_monitor: Some(health_monitor),
state: Arc::new(RwLock::new(RecoveryState::default())),
config,
}
}
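    /// Returns the health monitor, if this manager was built with one.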
pub fn health_monitor(&self) -> Option<&Arc<StoreHealthMonitor>> {
self.health_monitor.as_ref()
}
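    /// Runs the health-check loop. Unless recovery is disabled, this only
    /// returns when a recovery attempt fails or the attempt budget is
    /// exhausted, so callers typically spawn it on a dedicated task.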
pub async fn start(&self) -> FusekiResult<()> {
if !self.config.enabled {
info!("Automatic recovery disabled");
return Ok(());
}
info!("Starting automatic recovery monitoring");
loop {
if let Err(e) = self.perform_health_check().await {
error!("Health check failed: {}", e);
self.attempt_recovery().await?;
}
time::sleep(self.config.health_check_interval).await;
}
}
    async fn perform_health_check(&self) -> FusekiResult<()> {
        debug!("Performing health check");
        // Record the check time in a short scope so the write lock is not
        // held across the awaits below.
        {
            let mut state = self.state.write().await;
            state.last_health_check = Some(Instant::now());
        }
        self.check_store_health().await?;
        self.check_memory_usage().await?;
        if self.config.connection_pool_recovery {
            self.check_connection_pool().await?;
        }
        // A passing check restores the healthy flag and resets the
        // consecutive-failure counter that drives escalation and give-up.
        let mut state = self.state.write().await;
        state.healthy = true;
        state.restart_attempts = 0;
        debug!("Health check passed");
        Ok(())
    }
async fn check_store_health(&self) -> FusekiResult<()> {
debug!("Checking store health");
if let Some(health_monitor) = &self.health_monitor {
let health = health_monitor.check_health().await?;
match health.status {
HealthStatus::Healthy => {
debug!("Store health check passed (score: {})", health.health_score);
Ok(())
}
HealthStatus::Degraded => {
warn!(
"Store health degraded (score: {}). Components: {:?}",
health.health_score,
health
.components
.iter()
.map(|c| &c.name)
.collect::<Vec<_>>()
);
if health.health_score < 60 {
warn!("Health score below 60, considering recovery actions");
}
Ok(())
}
HealthStatus::Unhealthy => {
error!(
"Store is unhealthy (score: {}). Issues detected: {}",
health.health_score,
health
.components
.iter()
.filter(|c| c.status == HealthStatus::Unhealthy)
.map(|c| c.message.as_deref().unwrap_or("Unknown"))
.collect::<Vec<_>>()
.join(", ")
);
Err(FusekiError::internal(format!(
"Store unhealthy (score: {})",
health.health_score
)))
}
HealthStatus::Down => {
error!("Store is down (score: {})", health.health_score);
Err(FusekiError::internal("Store is down".to_string()))
}
}
} else {
if !self.store.is_ready() {
warn!("Store is not ready");
return Err(FusekiError::internal("Store is not ready".to_string()));
}
debug!("Store health check passed (basic)");
Ok(())
}
}
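    /// Compares resident set size (`VmRSS` from `/proc/self/status`) against
    /// the configured threshold. Linux-only; a no-op on other platforms.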
async fn check_memory_usage(&self) -> FusekiResult<()> {
#[cfg(target_os = "linux")]
{
use std::fs;
let status = fs::read_to_string("/proc/self/status").map_err(|e| {
FusekiError::internal(format!("Failed to read process status: {}", e))
})?;
for line in status.lines() {
if line.starts_with("VmRSS:") {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
if let Ok(rss_kb) = parts[1].parse::<u64>() {
let rss_mb = rss_kb / 1024;
debug!("Current memory usage: {} MB", rss_mb);
if rss_mb > self.config.memory_threshold_mb {
warn!(
"Memory usage ({} MB) exceeds threshold ({} MB)",
rss_mb, self.config.memory_threshold_mb
);
return Err(FusekiError::internal(
"Memory threshold exceeded".to_string(),
));
}
}
}
break;
}
}
}
#[cfg(not(target_os = "linux"))]
{
debug!("Memory usage check not implemented for this platform");
}
Ok(())
}
async fn check_connection_pool(&self) -> FusekiResult<()> {
debug!("Checking connection pool health");
if !self.store.is_ready() {
warn!("Connection pool health check failed: Store not ready");
return Err(FusekiError::internal(
"Connection pool not ready".to_string(),
));
}
debug!("Connection pool health check passed");
Ok(())
}
async fn attempt_recovery(&self) -> FusekiResult<()> {
let mut state = self.state.write().await;
if state.restart_attempts >= self.config.max_restart_attempts {
error!(
"Maximum restart attempts ({}) reached, giving up",
self.config.max_restart_attempts
);
return Err(FusekiError::internal(
"Maximum recovery attempts exceeded".to_string(),
));
}
state.restart_attempts += 1;
state.healthy = false;
info!(
"Attempting recovery (attempt {}/{})",
state.restart_attempts, self.config.max_restart_attempts
);
drop(state);
let action = self.determine_recovery_action().await;
info!("Executing recovery action: {:?}", action);
self.execute_recovery_action(action).await?;
        let mut state = self.state.write().await;
        state.total_recoveries += 1;
        state.last_restart = Some(Instant::now());
        // `execute_recovery_action` verified store readiness, but the attempt
        // counter is only reset by the next passing health check, so a
        // recurring fault keeps escalating through the action ladder.
        info!("Recovery action completed; awaiting a passing health check");
        Ok(())
}
async fn determine_recovery_action(&self) -> RecoveryAction {
let state = self.state.read().await;
match state.restart_attempts {
1 => RecoveryAction::ClearCaches,
2 => RecoveryAction::RebuildIndexes,
_ => RecoveryAction::Restart,
}
}
async fn execute_recovery_action(&self, action: RecoveryAction) -> FusekiResult<()> {
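        // Each arm is currently a placeholder: it backs off, then verifies
        // that the store is ready. The action-specific work (cache eviction,
        // index rebuild, compaction, ...) is not yet wired in.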
match action {
RecoveryAction::ClearCaches => {
info!("Clearing caches...");
self.sleep_with_backoff().await;
if !self.store.is_ready() {
return Err(FusekiError::internal(
"Store not ready after cache clearing".to_string(),
));
}
info!("Cache clearing completed");
Ok(())
}
RecoveryAction::ResetConnections => {
info!("Resetting connections...");
self.sleep_with_backoff().await;
if !self.store.is_ready() {
warn!("Connection reset verification failed");
return Err(FusekiError::internal("Connection reset failed".to_string()));
}
info!("Connection reset completed successfully");
Ok(())
}
RecoveryAction::RebuildIndexes => {
info!("Rebuilding indexes...");
self.sleep_with_backoff().await;
if !self.store.is_ready() {
warn!("Index verification failed");
return Err(FusekiError::internal(
"Index rebuild verification failed".to_string(),
));
}
info!("Index verification completed");
Ok(())
}
RecoveryAction::CompactDatabase => {
info!("Compacting database...");
self.sleep_with_backoff().await;
if !self.store.is_ready() {
return Err(FusekiError::internal(
"Store not ready for compaction".to_string(),
));
}
info!("Database verification completed");
Ok(())
}
RecoveryAction::ForceGC => {
info!("Forcing garbage collection...");
self.sleep_with_backoff().await;
if !self.store.is_ready() {
return Err(FusekiError::internal(
"Store not ready after GC".to_string(),
));
}
info!("Garbage collection cycle completed");
Ok(())
}
RecoveryAction::Restart => {
warn!("Restart action requested but not implemented");
self.sleep_with_backoff().await;
if !self.store.is_ready() {
return Err(FusekiError::internal("Store not ready".to_string()));
}
Ok(())
}
}
}
    async fn sleep_with_backoff(&self) {
        // Exponential backoff: multiplier^attempt seconds (2s, 4s, 8s with
        // the default multiplier of 2.0). The state lock is released before
        // sleeping.
        let attempts = self.state.read().await.restart_attempts;
        let backoff_secs = self.config.restart_backoff_multiplier.powi(attempts as i32);
        let backoff_duration = Duration::from_secs_f64(backoff_secs);
        info!("Waiting {:?} before continuing...", backoff_duration);
        time::sleep(backoff_duration).await;
    }
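    /// Returns a point-in-time snapshot of the recovery bookkeeping.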
pub async fn get_statistics(&self) -> RecoveryStatistics {
let state = self.state.read().await;
RecoveryStatistics {
total_recoveries: state.total_recoveries,
current_restart_attempts: state.restart_attempts,
healthy: state.healthy,
last_health_check: state.last_health_check,
last_restart: state.last_restart,
}
}
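    /// Executes a recovery action immediately, bypassing the health-check
    /// loop and the attempt budget.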
pub async fn force_recovery(&self, action: RecoveryAction) -> FusekiResult<()> {
info!("Forcing recovery with action: {:?}", action);
self.execute_recovery_action(action).await
}
}
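/// Snapshot of [`RecoveryManager`] state, as returned by
/// [`RecoveryManager::get_statistics`].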
#[derive(Debug, Clone)]
pub struct RecoveryStatistics {
pub total_recoveries: u64,
pub current_restart_attempts: u32,
pub healthy: bool,
pub last_health_check: Option<Instant>,
pub last_restart: Option<Instant>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_recovery_config_default() {
let config = RecoveryConfig::default();
assert_eq!(config.max_restart_attempts, 3);
assert_eq!(config.restart_backoff_multiplier, 2.0);
assert!(config.enabled);
}
#[tokio::test]
async fn test_recovery_state() {
let state = RecoveryState::default();
assert_eq!(state.restart_attempts, 0);
assert!(state.healthy);
assert_eq!(state.total_recoveries, 0);
}
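    // Sketch: struct-update syntax lets callers override a subset of the
    // defaults while the remaining fields keep their `Default` values.
    #[test]
    fn test_recovery_config_override() {
        let config = RecoveryConfig {
            memory_threshold_mb: 2048,
            health_check_interval: Duration::from_secs(10),
            ..Default::default()
        };
        assert_eq!(config.memory_threshold_mb, 2048);
        assert_eq!(config.health_check_interval, Duration::from_secs(10));
        assert!(config.connection_pool_recovery);
    }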
}