use crate::Session;
use anyhow::Result;
use chrono::Utc;
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use vibesql_parser::Parser;
use super::storage::{ExecutionHistoryRecord, ScheduleRecord, ScheduleStatus};
#[derive(Debug, Clone)]
pub struct ScheduleExecutorConfig {
pub max_retries: u32,
pub initial_backoff: StdDuration,
pub max_backoff: StdDuration,
pub backoff_multiplier: f64,
}
impl Default for ScheduleExecutorConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_backoff: StdDuration::from_secs(5),
max_backoff: StdDuration::from_secs(300),
backoff_multiplier: 2.0,
}
}
}
pub struct ScheduleExecutor {
config: ScheduleExecutorConfig,
}
impl ScheduleExecutor {
pub fn new(config: ScheduleExecutorConfig) -> Self {
Self { config }
}
pub async fn execute_schedule(
&self,
schedule: &ScheduleRecord,
session: Arc<Mutex<Session>>,
) -> Result<ExecutionHistoryRecord> {
let started_at = Utc::now();
let statement = match Parser::parse_sql(&schedule.sql) {
Ok(stmt) => stmt,
Err(e) => {
let error_msg = e.to_string();
return Ok(ExecutionHistoryRecord {
id: None,
schedule_id: Some(schedule.id.clone()),
cron_name: None,
started_at,
completed_at: Some(Utc::now()),
status: ScheduleStatus::Failed,
error: Some(error_msg),
rows_affected: None,
});
}
};
#[allow(unused_assignments)] let mut last_error: Option<String> = None;
let mut attempt = 0;
loop {
attempt += 1;
let backoff = self.calculate_backoff(attempt - 1);
match self.execute_statement(&statement, &session).await {
Ok(rows_affected) => {
info!(
schedule_id = %schedule.id,
rows_affected = rows_affected,
attempts = attempt,
"Schedule executed successfully"
);
return Ok(ExecutionHistoryRecord {
id: None,
schedule_id: Some(schedule.id.clone()),
cron_name: None,
started_at,
completed_at: Some(Utc::now()),
status: ScheduleStatus::Completed,
error: None,
rows_affected: Some(rows_affected as i64),
});
}
Err(e) => {
last_error = Some(e.to_string());
warn!(
schedule_id = %schedule.id,
attempt = attempt,
error = %e,
"Schedule execution failed, will retry"
);
if attempt >= self.config.max_retries {
error!(
schedule_id = %schedule.id,
attempts = attempt,
error = %last_error.as_ref().unwrap(),
"Schedule execution failed after all retries"
);
return Ok(ExecutionHistoryRecord {
id: None,
schedule_id: Some(schedule.id.clone()),
cron_name: None,
started_at,
completed_at: Some(Utc::now()),
status: ScheduleStatus::Failed,
error: last_error,
rows_affected: None,
});
}
tokio::time::sleep(backoff).await;
}
}
}
}
fn calculate_backoff(&self, attempt: u32) -> StdDuration {
let backoff_secs = self.config.initial_backoff.as_secs_f64()
* self.config.backoff_multiplier.powi(attempt as i32);
let max_secs = self.config.max_backoff.as_secs_f64();
let capped_secs = backoff_secs.min(max_secs);
StdDuration::from_secs_f64(capped_secs)
}
async fn execute_statement(
&self,
_statement: &vibesql_ast::Statement,
_session: &Arc<Mutex<Session>>,
) -> Result<usize> {
Ok(0)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backoff_calculation() {
let config = ScheduleExecutorConfig {
initial_backoff: StdDuration::from_secs(5),
max_backoff: StdDuration::from_secs(300),
backoff_multiplier: 2.0,
..Default::default()
};
let executor = ScheduleExecutor::new(config);
assert_eq!(executor.calculate_backoff(0), StdDuration::from_secs(5));
assert_eq!(executor.calculate_backoff(1), StdDuration::from_secs(10));
assert_eq!(executor.calculate_backoff(2), StdDuration::from_secs(20));
let very_high = executor.calculate_backoff(100);
assert_eq!(very_high, StdDuration::from_secs(300));
}
}