use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use chrono::{DateTime, Utc};
use forge_core::observability::{Metric, Span, SpanKind};
use tokio::sync::RwLock;
use uuid::Uuid;
use super::registry::CronRegistry;
use crate::observability::ObservabilityState;
use forge_core::cron::CronContext;
/// Lifecycle state of a single cron run.
///
/// The textual form of each variant is given by `CronStatus::as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CronStatus {
/// Not started yet; also the status a freshly built `CronRecord` carries.
Pending,
/// Currently executing (text form `"running"`).
Running,
/// Finished without error (text form `"completed"`).
Completed,
/// Finished with an error or timed out (text form `"failed"`).
Failed,
}
impl CronStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Pending => "pending",
Self::Running => "running",
Self::Completed => "completed",
Self::Failed => "failed",
}
}
}
impl FromStr for CronStatus {
type Err = std::convert::Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"pending" => Self::Pending,
"running" => Self::Running,
"completed" => Self::Completed,
"failed" => Self::Failed,
_ => Self::Pending,
})
}
}
/// In-memory description of one scheduled occurrence of a cron.
// NOTE(review): the field names mirror columns used in the `forge_cron_runs`
// queries in this file, but nothing visible here maps rows to this struct;
// confirm against the callers.
#[derive(Debug, Clone)]
pub struct CronRecord {
/// Identifier of this record.
pub id: Uuid,
/// Name of the cron this occurrence belongs to.
pub cron_name: String,
/// The instant (UTC) this occurrence was scheduled for.
pub scheduled_time: DateTime<Utc>,
/// Timezone name associated with the schedule, e.g. `"UTC"`.
pub timezone: String,
/// Current lifecycle state.
pub status: CronStatus,
/// Presumably the node that picked up the run, if any; verify against callers.
pub node_id: Option<Uuid>,
/// When execution started, if it has.
pub started_at: Option<DateTime<Utc>>,
/// When execution finished, if it has.
pub completed_at: Option<DateTime<Utc>>,
/// Error message, if any was recorded.
pub error: Option<String>,
}
impl CronRecord {
pub fn new(
cron_name: impl Into<String>,
scheduled_time: DateTime<Utc>,
timezone: impl Into<String>,
) -> Self {
Self {
id: Uuid::new_v4(),
cron_name: cron_name.into(),
scheduled_time,
timezone: timezone.into(),
status: CronStatus::Pending,
node_id: None,
started_at: None,
completed_at: None,
error: None,
}
}
}
/// Settings that control how a `CronRunner` polls and claims work.
#[derive(Debug, Clone)]
pub struct CronRunnerConfig {
/// Pause between scheduling passes; the runner also looks back twice this
/// long when searching for due occurrences.
pub poll_interval: Duration,
/// Identifier stored in the `node_id` column when this runner claims a run.
pub node_id: Uuid,
/// Scheduling passes are only performed while this is `true`.
pub is_leader: bool,
}
impl Default for CronRunnerConfig {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
node_id: Uuid::new_v4(),
is_leader: true,
}
}
}
/// Polls the cron registry, claims due occurrences in the database and runs
/// their handlers.
pub struct CronRunner {
// Source of the registered crons inspected on every scheduling pass.
registry: Arc<CronRegistry>,
// Database pool used for the `forge_cron_runs` queries; also cloned into
// each `CronContext`.
pool: sqlx::PgPool,
// HTTP client cloned into each `CronContext`.
http_client: reqwest::Client,
// Poll interval, node id and leader flag.
config: CronRunnerConfig,
// `true` while the `run` loop should keep going; cleared by `stop`.
is_running: Arc<RwLock<bool>>,
// When present, metrics and spans are recorded for every execution.
observability: Option<ObservabilityState>,
}
impl CronRunner {
pub fn new(
registry: Arc<CronRegistry>,
pool: sqlx::PgPool,
http_client: reqwest::Client,
config: CronRunnerConfig,
) -> Self {
Self {
registry,
pool,
http_client,
config,
is_running: Arc::new(RwLock::new(false)),
observability: None,
}
}
pub fn with_observability(
registry: Arc<CronRegistry>,
pool: sqlx::PgPool,
http_client: reqwest::Client,
config: CronRunnerConfig,
observability: ObservabilityState,
) -> Self {
Self {
registry,
pool,
http_client,
config,
is_running: Arc::new(RwLock::new(false)),
observability: Some(observability),
}
}
/// Runs the polling loop until `stop` is called.
///
/// Returns `Ok(())` immediately if the runner is already running. Each
/// iteration performs a scheduling pass (only while `is_leader` is set) and
/// then sleeps for `poll_interval`. A failed pass is logged and the loop
/// carries on, so a stop request is only noticed after the current sleep.
pub async fn run(&self) -> forge_core::Result<()> {
    // Set the flag and learn its previous value under a single write lock so
    // two concurrent callers cannot both start a loop.
    let already_running = std::mem::replace(&mut *self.is_running.write().await, true);
    if already_running {
        return Ok(());
    }

    tracing::info!("Cron runner starting");
    while *self.is_running.read().await {
        if self.config.is_leader {
            if let Err(e) = self.tick().await {
                tracing::error!(error = %e, "Cron tick failed");
            }
        }
        tokio::time::sleep(self.config.poll_interval).await;
    }
    tracing::info!("Cron runner stopped");

    Ok(())
}
/// Asks the polling loop to exit by clearing the running flag.
///
/// The loop checks the flag once per iteration, so it stops after its
/// current pass and sleep.
pub async fn stop(&self) {
    *self.is_running.write().await = false;
}
/// Performs one scheduling pass over every registered cron.
///
/// For each cron, the occurrences between `now - 2 * poll_interval` and `now`
/// are computed. Each one is claimed through `try_claim` and executed
/// straight away when the claim succeeds. Crons with `catch_up` enabled then
/// get a catch-up pass.
///
/// Claim and catch-up failures are logged per cron and do not abort the pass,
/// so this currently always returns `Ok(())`.
async fn tick(&self) -> forge_core::Result<()> {
    let now = Utc::now();
    // Look back two poll intervals so an occurrence that falls between two
    // passes is still seen; duplicates are rejected by `try_claim`.
    let window_start = now
        - chrono::Duration::from_std(self.config.poll_interval * 2)
            .unwrap_or(chrono::Duration::seconds(2));

    let cron_list = self.registry.list();
    if cron_list.is_empty() {
        tracing::debug!("Cron tick: no crons registered");
    } else {
        tracing::debug!(
            cron_count = cron_list.len(),
            "Cron tick checking {} registered crons",
            cron_list.len()
        );
    }

    for entry in cron_list {
        let info = &entry.info;
        let scheduled_times = info
            .schedule
            .between_in_tz(window_start, now, info.timezone);

        if scheduled_times.is_empty() {
            tracing::debug!(
                cron = info.name,
                schedule = info.schedule.expression(),
                "No scheduled runs in window"
            );
        } else {
            tracing::info!(
                cron = info.name,
                schedule = info.schedule.expression(),
                scheduled_count = scheduled_times.len(),
                "Found scheduled cron runs"
            );
        }

        for scheduled in scheduled_times {
            match self.try_claim(info.name, scheduled, info.timezone).await {
                Ok(true) => {
                    self.execute_cron(entry, scheduled, false).await;
                }
                Ok(false) => {
                    // The occurrence already has a row: nothing to do.
                }
                Err(e) => {
                    // Previously dropped silently; surface it so database
                    // problems while claiming are visible to operators.
                    tracing::warn!(
                        cron = info.name,
                        scheduled_time = %scheduled,
                        error = %e,
                        "Failed to claim cron run"
                    );
                }
            }
        }

        if info.catch_up {
            if let Err(e) = self.handle_catch_up(entry).await {
                tracing::warn!(
                    cron = info.name,
                    error = %e,
                    "Failed to process catch-up runs"
                );
            }
        }
    }

    Ok(())
}
/// Tries to claim the occurrence `(cron_name, scheduled_time)` for this node.
///
/// Inserts a `'running'` row into `forge_cron_runs` with this runner's node
/// id; the insert does nothing when a row for the same cron and time already
/// exists.
///
/// Returns `Ok(true)` when the row was inserted, `Ok(false)` when one was
/// already there, and `ForgeError::Database` when the query fails.
/// `_timezone` is accepted but not used.
// NOTE(review): the row id generated here is independent of the `run_id`
// that `execute_cron` creates for the same occurrence; confirm that is
// intended.
async fn try_claim(
    &self,
    cron_name: &str,
    scheduled_time: DateTime<Utc>,
    _timezone: &str,
) -> forge_core::Result<bool> {
    let row_id = Uuid::new_v4();
    let insert = sqlx::query(
        r#"
INSERT INTO forge_cron_runs (id, cron_name, scheduled_time, status, node_id, started_at)
VALUES ($1, $2, $3, 'running', $4, NOW())
ON CONFLICT (cron_name, scheduled_time) DO NOTHING
"#,
    )
    .bind(row_id)
    .bind(cron_name)
    .bind(scheduled_time)
    .bind(self.config.node_id);

    match insert.execute(&self.pool).await {
        // Zero affected rows means the conflict clause fired.
        Ok(done) => Ok(done.rows_affected() != 0),
        Err(db_err) => Err(forge_core::ForgeError::Database(db_err.to_string())),
    }
}
async fn execute_cron(
&self,
entry: &super::registry::CronEntry,
scheduled_time: DateTime<Utc>,
is_catch_up: bool,
) {
let info = &entry.info;
let run_id = Uuid::new_v4();
let start = Instant::now();
tracing::info!(
cron = info.name,
scheduled_time = %scheduled_time,
is_catch_up = is_catch_up,
"Executing cron"
);
if let Some(ref obs) = self.observability {
let mut metric = Metric::counter("cron_runs_total", 1.0);
metric
.labels
.insert("cron_name".to_string(), info.name.to_string());
metric
.labels
.insert("is_catch_up".to_string(), is_catch_up.to_string());
obs.record_metric(metric).await;
}
let ctx = CronContext::new(
run_id,
info.name.to_string(),
scheduled_time,
info.timezone.to_string(),
is_catch_up,
self.pool.clone(),
self.http_client.clone(),
);
let handler = entry.handler.clone();
let result = tokio::time::timeout(info.timeout, handler(&ctx)).await;
let duration = start.elapsed();
if let Some(ref obs) = self.observability {
let mut duration_metric =
Metric::gauge("cron_duration_seconds", duration.as_secs_f64());
duration_metric
.labels
.insert("cron_name".to_string(), info.name.to_string());
obs.record_metric(duration_metric).await;
}
if let Some(ref obs) = self.observability {
let mut span = Span::new(format!("cron.{}", info.name));
span.kind = SpanKind::Internal;
span.attributes.insert(
"cron.name".to_string(),
serde_json::Value::String(info.name.to_string()),
);
span.attributes.insert(
"cron.run_id".to_string(),
serde_json::Value::String(run_id.to_string()),
);
span.attributes.insert(
"cron.scheduled_time".to_string(),
serde_json::Value::String(scheduled_time.to_rfc3339()),
);
span.attributes.insert(
"cron.is_catch_up".to_string(),
serde_json::Value::Bool(is_catch_up),
);
span.attributes.insert(
"cron.duration_ms".to_string(),
serde_json::Value::Number(serde_json::Number::from(duration.as_millis() as u64)),
);
match &result {
Ok(Ok(())) => {
span.end_ok();
}
Ok(Err(e)) => {
span.end_error(e.to_string());
}
Err(_) => {
span.end_error("Cron timed out");
}
}
obs.record_span(span).await;
}
match result {
Ok(Ok(())) => {
tracing::info!(cron = info.name, "Cron completed successfully");
self.mark_completed(info.name, scheduled_time).await;
if let Some(ref obs) = self.observability {
let mut metric = Metric::counter("cron_success_total", 1.0);
metric
.labels
.insert("cron_name".to_string(), info.name.to_string());
obs.record_metric(metric).await;
}
}
Ok(Err(e)) => {
tracing::error!(cron = info.name, error = %e, "Cron failed");
self.mark_failed(info.name, scheduled_time, &e.to_string())
.await;
if let Some(ref obs) = self.observability {
let mut metric = Metric::counter("cron_failures_total", 1.0);
metric
.labels
.insert("cron_name".to_string(), info.name.to_string());
metric
.labels
.insert("reason".to_string(), "error".to_string());
obs.record_metric(metric).await;
}
}
Err(_) => {
tracing::error!(cron = info.name, "Cron timed out");
self.mark_failed(info.name, scheduled_time, "Execution timed out")
.await;
if let Some(ref obs) = self.observability {
let mut metric = Metric::counter("cron_failures_total", 1.0);
metric
.labels
.insert("cron_name".to_string(), info.name.to_string());
metric
.labels
.insert("reason".to_string(), "timeout".to_string());
obs.record_metric(metric).await;
}
}
}
}
async fn mark_completed(&self, cron_name: &str, scheduled_time: DateTime<Utc>) {
let _ = sqlx::query(
r#"
UPDATE forge_cron_runs
SET status = 'completed', completed_at = NOW()
WHERE cron_name = $1 AND scheduled_time = $2
"#,
)
.bind(cron_name)
.bind(scheduled_time)
.execute(&self.pool)
.await;
}
async fn mark_failed(&self, cron_name: &str, scheduled_time: DateTime<Utc>, error: &str) {
let _ = sqlx::query(
r#"
UPDATE forge_cron_runs
SET status = 'failed', completed_at = NOW(), error = $3
WHERE cron_name = $1 AND scheduled_time = $2
"#,
)
.bind(cron_name)
.bind(scheduled_time)
.bind(error)
.execute(&self.pool)
.await;
}
/// Claims and runs occurrences of `entry` that were missed.
///
/// The search starts at the `scheduled_time` of the most recent completed
/// run, or 24 hours ago when there is none, and ends at the current time. At
/// most `catch_up_limit` occurrences from the start of that range are
/// considered per call; each is executed only if `try_claim` succeeds.
///
/// Returns `ForgeError::Database` if the lookup or a claim fails; occurrences
/// after the failing one are not attempted in this call.
async fn handle_catch_up(&self, entry: &super::registry::CronEntry) -> forge_core::Result<()> {
    let info = &entry.info;
    let now = Utc::now();

    let last_completed: Option<(DateTime<Utc>,)> = sqlx::query_as(
        r#"
SELECT scheduled_time
FROM forge_cron_runs
WHERE cron_name = $1 AND status = 'completed'
ORDER BY scheduled_time DESC
LIMIT 1
"#,
    )
    .bind(info.name)
    .fetch_optional(&self.pool)
    .await
    .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    let start_time = match last_completed {
        Some((scheduled_time,)) => scheduled_time,
        // No completed run on record: look back one day.
        None => now - chrono::Duration::days(1),
    };

    let limit = info.catch_up_limit as usize;
    let missed_times = info.schedule.between_in_tz(start_time, now, info.timezone);
    for scheduled in missed_times.into_iter().take(limit) {
        let claimed = self.try_claim(info.name, scheduled, info.timezone).await?;
        if claimed {
            self.execute_cron(entry, scheduled, true).await;
        }
    }

    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status maps to its lowercase name and parses back from it.
    #[test]
    fn test_cron_status_conversion() {
        let cases = [
            (CronStatus::Pending, "pending"),
            (CronStatus::Running, "running"),
            (CronStatus::Completed, "completed"),
            (CronStatus::Failed, "failed"),
        ];
        for (status, text) in cases {
            assert_eq!(status.as_str(), text);
            assert_eq!(text.parse::<CronStatus>(), Ok(status));
        }
    }

    /// A new record keeps its inputs, starts pending and has no node yet.
    #[test]
    fn test_cron_record_creation() {
        let CronRecord {
            cron_name,
            timezone,
            status,
            node_id,
            ..
        } = CronRecord::new("daily_cleanup", Utc::now(), "UTC");

        assert_eq!(cron_name, "daily_cleanup");
        assert_eq!(timezone, "UTC");
        assert_eq!(status, CronStatus::Pending);
        assert!(node_id.is_none());
    }

    /// The default configuration polls every second as leader.
    #[test]
    fn test_cron_runner_config_default() {
        let CronRunnerConfig {
            poll_interval,
            is_leader,
            ..
        } = CronRunnerConfig::default();

        assert_eq!(poll_interval, Duration::from_secs(1));
        assert!(is_leader);
    }
}