use crate::context::Context;
use crate::dal::DAL;
use crate::executor::{WorkflowExecutionError, WorkflowExecutor};
use crate::models::schedule::ScheduleExecution;
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
/// Tunable settings for the cron recovery service.
#[derive(Debug, Clone)]
pub struct CronRecoveryConfig {
/// Period between recovery passes; used as the tick interval of the recovery loop.
pub check_interval: Duration,
/// Threshold, in minutes, handed to the lost-execution lookup.
pub lost_threshold_minutes: i32,
/// Executions whose scheduled time (or creation time, when no scheduled time is
/// recorded) is further in the past than this are abandoned instead of recovered.
pub max_recovery_age: Duration,
/// Number of recovery attempts allowed per execution before it is abandoned.
pub max_recovery_attempts: usize,
/// When `false`, executions whose schedule is not enabled are skipped.
pub recover_disabled_schedules: bool,
}
impl Default for CronRecoveryConfig {
fn default() -> Self {
Self {
check_interval: Duration::from_secs(300), lost_threshold_minutes: 10,
max_recovery_age: Duration::from_secs(86400), max_recovery_attempts: 3,
recover_disabled_schedules: false,
}
}
}
/// Background service that periodically looks for lost cron executions and
/// re-runs their workflows.
#[derive(Clone)]
pub struct CronRecoveryService {
/// Data access layer used to find lost executions, load schedules and update
/// execution records.
dal: Arc<DAL>,
/// Executor used to run the workflow of an execution being recovered.
executor: Arc<dyn WorkflowExecutor>,
/// Recovery settings.
config: CronRecoveryConfig,
/// Shutdown signal; the recovery loop exits once it observes `true`.
shutdown: watch::Receiver<bool>,
/// Recovery attempt counters keyed by execution id. Held behind an `Arc`, so
/// clones of the service share the same map.
recovery_attempts: Arc<tokio::sync::Mutex<HashMap<crate::database::UniversalUuid, usize>>>,
}
impl CronRecoveryService {
pub fn new(
dal: Arc<DAL>,
executor: Arc<dyn WorkflowExecutor>,
config: CronRecoveryConfig,
shutdown: watch::Receiver<bool>,
) -> Self {
Self {
dal,
executor,
config,
shutdown,
recovery_attempts: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
}
}
/// Creates a recovery service that uses `CronRecoveryConfig::default()`.
pub fn with_defaults(
    dal: Arc<DAL>,
    executor: Arc<dyn WorkflowExecutor>,
    shutdown: watch::Receiver<bool>,
) -> Self {
    let config = CronRecoveryConfig::default();
    Self::new(dal, executor, config, shutdown)
}
pub async fn run_recovery_loop(&mut self) -> Result<(), WorkflowExecutionError> {
info!(
"Starting cron recovery service (interval: {:?}, threshold: {} minutes)",
self.config.check_interval, self.config.lost_threshold_minutes
);
let mut interval = tokio::time::interval(self.config.check_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(e) = self.check_and_recover_lost_executions().await {
error!("Error in cron recovery service: {}", e);
}
}
_ = self.shutdown.changed() => {
if *self.shutdown.borrow() {
info!("Cron recovery service received shutdown signal");
break;
}
}
}
}
info!("Cron recovery service stopped");
Ok(())
}
/// Performs one recovery pass: looks up lost executions and tries to recover
/// each of them in turn.
///
/// A failure to recover an individual execution is logged and does not stop
/// the pass.
///
/// # Errors
///
/// Returns `WorkflowExecutionError::ExecutionFailed` when the lookup of lost
/// executions fails.
async fn check_and_recover_lost_executions(&self) -> Result<(), WorkflowExecutionError> {
    debug!("Checking for lost cron executions");
    let threshold = self.config.lost_threshold_minutes;
    let lookup = self
        .dal
        .schedule_execution()
        .find_lost_executions(threshold)
        .await;
    let lost = match lookup {
        Ok(found) => found,
        Err(e) => {
            return Err(WorkflowExecutionError::ExecutionFailed {
                message: format!("Failed to find lost executions: {}", e),
            });
        }
    };
    if lost.is_empty() {
        debug!("No lost executions found");
        return Ok(());
    }
    info!("Found {} lost cron execution(s)", lost.len());
    for execution in lost {
        match self.recover_execution(&execution).await {
            Ok(()) => {}
            Err(e) => {
                error!(
                    "Failed to recover execution {} for schedule {}: {}",
                    execution.id, execution.schedule_id, e
                );
            }
        }
    }
    Ok(())
}
/// Attempts to re-run the workflow behind a single lost schedule execution.
///
/// Steps, in order:
/// 1. Abandon the execution if its scheduled time (or creation time when no
///    scheduled time is recorded) is older than `max_recovery_age`.
/// 2. Count the attempt and abandon the execution once the count exceeds
///    `max_recovery_attempts`.
/// 3. Load the schedule; skip if it cannot be loaded, or if it is not enabled
///    and `recover_disabled_schedules` is `false`.
/// 4. Execute the schedule's workflow with a context describing the recovery,
///    then record the new workflow execution id and reset the attempt counter.
///
/// Abandoned and skipped executions return `Ok(())`.
///
/// The attempt-counter lock is held only while the counter is read or
/// modified, never across the schedule lookup or the workflow execution, so
/// `get_recovery_attempts` and `clear_recovery_attempts` are not blocked for
/// the duration of a workflow run.
///
/// # Errors
///
/// Returns `WorkflowExecutionError::ExecutionFailed` when a value cannot be
/// inserted into the context, and propagates the executor's error when the
/// workflow execution fails.
async fn recover_execution(
    &self,
    execution: &ScheduleExecution,
) -> Result<(), WorkflowExecutionError> {
    let scheduled_time = execution
        .scheduled_time
        .as_ref()
        .map(|t| t.0)
        .unwrap_or(execution.created_at.0);
    let execution_age = Utc::now() - scheduled_time;
    // A configured age too large for chrono to represent is treated as
    // "no age limit" rather than a reason to panic.
    let too_old = chrono::Duration::from_std(self.config.max_recovery_age)
        .map(|max_age| execution_age > max_age)
        .unwrap_or(false);
    if too_old {
        warn!(
            "Execution {} is too old to recover (age: {:?}), abandoning",
            execution.id, execution_age
        );
        return Ok(());
    }

    // Bump the counter inside a short scope so the guard is released before
    // any further awaits.
    let attempt_count = {
        let mut attempts = self.recovery_attempts.lock().await;
        let count = attempts.entry(execution.id).or_insert(0);
        *count += 1;
        *count
    };
    if attempt_count > self.config.max_recovery_attempts {
        error!(
            "Execution {} has exceeded max recovery attempts ({}), abandoning",
            execution.id, self.config.max_recovery_attempts
        );
        return Ok(());
    }
    info!(
        "Attempting recovery of execution {} (schedule: {}, attempt: {}/{})",
        execution.id, execution.schedule_id, attempt_count, self.config.max_recovery_attempts
    );

    let schedule = match self.dal.schedule().get_by_id(execution.schedule_id).await {
        Ok(sched) => sched,
        Err(e) => {
            warn!(
                "Schedule {} not found for execution {}, skipping recovery: {}",
                execution.schedule_id, execution.id, e
            );
            return Ok(());
        }
    };
    if !self.config.recover_disabled_schedules && !schedule.enabled.is_true() {
        info!(
            "Schedule {} is disabled, skipping recovery of execution {}",
            schedule.id, execution.id
        );
        return Ok(());
    }

    // Recovery metadata handed to the workflow, inserted in this order.
    let entries = [
        ("is_recovery", serde_json::json!(true)),
        ("recovery_attempt", serde_json::json!(attempt_count)),
        (
            "original_execution_id",
            serde_json::json!(execution.id.to_string()),
        ),
        (
            "scheduled_time",
            serde_json::json!(scheduled_time.to_rfc3339()),
        ),
        ("schedule_id", serde_json::json!(schedule.id.to_string())),
        (
            "schedule_timezone",
            serde_json::json!(schedule.timezone.as_deref().unwrap_or("UTC")),
        ),
        (
            "schedule_expression",
            serde_json::json!(schedule.cron_expression.as_deref().unwrap_or("")),
        ),
    ];
    let mut context = Context::new();
    for (key, value) in entries {
        context
            .insert(key, value)
            .map_err(|e| WorkflowExecutionError::ExecutionFailed {
                message: format!("Context error: {}", e),
            })?;
    }

    info!(
        "Executing recovery for workflow '{}' (execution: {}, schedule: {})",
        schedule.workflow_name, execution.id, schedule.id
    );
    match self
        .executor
        .execute(&schedule.workflow_name, context)
        .await
    {
        Ok(workflow_result) => {
            // A failed audit update is logged only: the workflow itself ran.
            if let Err(e) = self
                .dal
                .schedule_execution()
                .update_workflow_execution_id(
                    execution.id,
                    crate::database::UniversalUuid(workflow_result.execution_id),
                )
                .await
            {
                error!(
                    "Failed to update audit record for recovered execution {}: {}",
                    execution.id, e
                );
            }
            info!(
                "Successfully recovered execution {} (new workflow execution: {})",
                execution.id, workflow_result.execution_id
            );
            self.recovery_attempts.lock().await.remove(&execution.id);
            Ok(())
        }
        Err(e) => {
            error!(
                "Failed to recover execution {} for workflow '{}': {}",
                execution.id, schedule.workflow_name, e
            );
            Err(e)
        }
    }
}
/// Drops every tracked recovery attempt counter and logs that the cache was
/// cleared.
pub async fn clear_recovery_attempts(&self) {
    self.recovery_attempts.lock().await.clear();
    info!("Cleared recovery attempts cache");
}
/// Returns the number of recovery attempts currently recorded for
/// `execution_id`, or `0` when the execution is not tracked.
pub async fn get_recovery_attempts(
    &self,
    execution_id: crate::database::UniversalUuid,
) -> usize {
    let guard = self.recovery_attempts.lock().await;
    guard.get(&execution_id).map_or(0, |count| *count)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration carries the documented stock values.
    #[test]
    fn test_recovery_config_default() {
        let CronRecoveryConfig {
            check_interval,
            lost_threshold_minutes,
            max_recovery_age,
            max_recovery_attempts,
            recover_disabled_schedules,
        } = CronRecoveryConfig::default();

        assert_eq!(check_interval, Duration::from_secs(300));
        assert_eq!(lost_threshold_minutes, 10);
        assert_eq!(max_recovery_age, Duration::from_secs(86400));
        assert_eq!(max_recovery_attempts, 3);
        assert!(!recover_disabled_schedules);
    }

    /// Explicitly supplied values are stored unchanged.
    #[test]
    fn test_recovery_config_custom() {
        let one_minute = Duration::from_secs(60);
        let one_hour = Duration::from_secs(3600);
        let custom = CronRecoveryConfig {
            check_interval: one_minute,
            lost_threshold_minutes: 5,
            max_recovery_age: one_hour,
            max_recovery_attempts: 5,
            recover_disabled_schedules: true,
        };

        assert_eq!(custom.check_interval, one_minute);
        assert_eq!(custom.lost_threshold_minutes, 5);
        assert_eq!(custom.max_recovery_age, one_hour);
        assert_eq!(custom.max_recovery_attempts, 5);
        assert!(custom.recover_disabled_schedules);
    }

    /// A clone agrees with its source on the compared fields.
    #[test]
    fn test_recovery_config_clone() {
        let original = CronRecoveryConfig::default();
        let copy = original.clone();

        assert_eq!(original.check_interval, copy.check_interval);
        assert_eq!(original.lost_threshold_minutes, copy.lost_threshold_minutes);
        assert_eq!(original.max_recovery_attempts, copy.max_recovery_attempts);
    }

    /// The default recovery window is one day, checked every five minutes.
    #[test]
    fn test_recovery_config_default_recovery_window() {
        let defaults = CronRecoveryConfig::default();

        assert_eq!(defaults.max_recovery_age.as_secs(), 86400);
        assert_eq!(defaults.check_interval.as_secs(), 300);
    }
}