Skip to main content

forge_runtime/cron/
scheduler.rs

1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use chrono::{DateTime, Utc};
6use tokio::sync::RwLock;
7use tracing::{Instrument, Span, field};
8use uuid::Uuid;
9
10use super::registry::CronRegistry;
11use crate::cluster::LeaderElection;
12use forge_core::CircuitBreakerClient;
13use forge_core::cron::CronContext;
14
/// Lifecycle state of a single cron run.
///
/// Stored in the database using the lowercase spelling returned by
/// [`CronStatus::as_str`] and read back through its [`FromStr`] impl.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CronStatus {
    /// Waiting to be executed.
    Pending,
    /// Execution is in progress.
    Running,
    /// Execution finished without error.
    Completed,
    /// Execution finished with an error.
    Failed,
}

impl CronStatus {
    /// Lowercase database spelling of this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            CronStatus::Pending => "pending",
            CronStatus::Running => "running",
            CronStatus::Completed => "completed",
            CronStatus::Failed => "failed",
        }
    }
}

/// Error produced when a string is not one of the spellings emitted by
/// [`CronStatus::as_str`]. Carries the rejected input verbatim.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseCronStatusError(pub String);

impl std::fmt::Display for ParseCronStatusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("invalid cron status: '{}'", self.0))
    }
}

impl std::error::Error for ParseCronStatusError {}

impl FromStr for CronStatus {
    type Err = ParseCronStatusError;

    /// Inverse of [`CronStatus::as_str`]; matching is exact and case-sensitive.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        [
            CronStatus::Pending,
            CronStatus::Running,
            CronStatus::Completed,
            CronStatus::Failed,
        ]
        .iter()
        .copied()
        .find(|candidate| candidate.as_str() == s)
        .ok_or_else(|| ParseCronStatusError(s.to_string()))
    }
}
64
65/// A cron run record from the database.
66#[derive(Debug, Clone)]
67pub struct CronRecord {
68    /// Run ID.
69    pub id: Uuid,
70    /// Cron name.
71    pub cron_name: String,
72    /// Scheduled time.
73    pub scheduled_time: DateTime<Utc>,
74    /// Timezone.
75    pub timezone: String,
76    /// Current status.
77    pub status: CronStatus,
78    /// Node that executed the cron.
79    pub node_id: Option<Uuid>,
80    /// When execution started.
81    pub started_at: Option<DateTime<Utc>>,
82    /// When execution completed.
83    pub completed_at: Option<DateTime<Utc>>,
84    /// Error message if failed.
85    pub error: Option<String>,
86}
87
88impl CronRecord {
89    /// Create a new pending cron record.
90    pub fn new(
91        cron_name: impl Into<String>,
92        scheduled_time: DateTime<Utc>,
93        timezone: impl Into<String>,
94    ) -> Self {
95        Self {
96            id: Uuid::new_v4(),
97            cron_name: cron_name.into(),
98            scheduled_time,
99            timezone: timezone.into(),
100            status: CronStatus::Pending,
101            node_id: None,
102            started_at: None,
103            completed_at: None,
104            error: None,
105        }
106    }
107}
108
109/// Configuration for the cron runner.
110#[derive(Clone)]
111pub struct CronRunnerConfig {
112    /// How often to check for due crons.
113    pub poll_interval: Duration,
114    /// Node ID for this runner.
115    pub node_id: Uuid,
116    /// Static leadership fallback when no election handle is configured.
117    pub is_leader: bool,
118    /// Dynamic leader election handle.
119    pub leader_election: Option<Arc<LeaderElection>>,
120    /// Threshold after which a running cron slot is considered stale.
121    pub run_stale_threshold: Duration,
122}
123
124impl Default for CronRunnerConfig {
125    fn default() -> Self {
126        Self {
127            poll_interval: Duration::from_secs(1),
128            node_id: Uuid::new_v4(),
129            is_leader: true,
130            leader_election: None,
131            run_stale_threshold: Duration::from_secs(15 * 60),
132        }
133    }
134}
135
136/// Cron scheduler and executor.
137pub struct CronRunner {
138    registry: Arc<CronRegistry>,
139    pool: sqlx::PgPool,
140    http_client: CircuitBreakerClient,
141    config: CronRunnerConfig,
142    is_running: Arc<RwLock<bool>>,
143}
144
145impl CronRunner {
146    /// Create a new cron runner.
147    pub fn new(
148        registry: Arc<CronRegistry>,
149        pool: sqlx::PgPool,
150        http_client: CircuitBreakerClient,
151        config: CronRunnerConfig,
152    ) -> Self {
153        Self {
154            registry,
155            pool,
156            http_client,
157            config,
158            is_running: Arc::new(RwLock::new(false)),
159        }
160    }
161
162    /// Start the cron runner loop.
163    pub async fn run(&self) -> forge_core::Result<()> {
164        {
165            let mut running = self.is_running.write().await;
166            if *running {
167                return Ok(());
168            }
169            *running = true;
170        }
171
172        tracing::debug!("Cron runner starting");
173
174        loop {
175            if !*self.is_running.read().await {
176                break;
177            }
178
179            if self.is_leader()
180                && let Err(e) = self.tick().await
181            {
182                tracing::warn!(error = %e, "Cron tick failed");
183            }
184
185            tokio::time::sleep(self.config.poll_interval).await;
186        }
187
188        tracing::debug!("Cron runner stopped");
189        Ok(())
190    }
191
192    /// Stop the cron runner.
193    pub async fn stop(&self) {
194        let mut running = self.is_running.write().await;
195        *running = false;
196    }
197
198    fn is_leader(&self) -> bool {
199        self.config
200            .leader_election
201            .as_ref()
202            .map(|e| e.is_leader())
203            .unwrap_or(self.config.is_leader)
204    }
205
206    /// Execute one tick of the scheduler.
207    async fn tick(&self) -> forge_core::Result<()> {
208        let tick_span = tracing::info_span!(
209            "cron.tick",
210            cron.tick_id = %Uuid::new_v4(),
211            cron.jobs_checked = field::Empty,
212            cron.jobs_executed = field::Empty,
213        );
214
215        async {
216            let now = Utc::now();
217            // Look back 2x poll interval to catch any scheduled times we might have missed
218            let window_start = now
219                - chrono::Duration::from_std(self.config.poll_interval * 2)
220                    .unwrap_or(chrono::Duration::seconds(2));
221
222            let cron_list = self.registry.list();
223            let mut jobs_executed = 0u32;
224
225            Span::current().record("cron.jobs_checked", cron_list.len());
226
227            if cron_list.is_empty() {
228                tracing::trace!("Cron tick: no crons registered");
229            } else {
230                tracing::trace!(
231                    cron_count = cron_list.len(),
232                    "Cron tick checking {} registered crons",
233                    cron_list.len()
234                );
235            }
236
237            for entry in cron_list {
238                let info = &entry.info;
239
240                let scheduled_times = info
241                    .schedule
242                    .between_in_tz(window_start, now, info.timezone);
243
244                // Record missed runs that we found
245                if scheduled_times.len() > 1 {
246                    tracing::info!(
247                        cron.name = info.name,
248                        cron.missed_count = scheduled_times.len() - 1,
249                        "Detected missed cron runs"
250                    );
251                    Span::current().record("cron.missed_runs", scheduled_times.len() - 1);
252                }
253
254                if !scheduled_times.is_empty() {
255                    tracing::trace!(
256                        cron = info.name,
257                        schedule = info.schedule.expression(),
258                        scheduled_count = scheduled_times.len(),
259                        "Found scheduled cron runs"
260                    );
261                }
262
263                for scheduled in scheduled_times {
264                    // Try to claim this cron run; only claimed slots execute.
265                    if let Ok(Some(run_id)) =
266                        self.try_claim(info.name, scheduled, info.timezone).await
267                    {
268                        self.execute_cron(entry, run_id, scheduled, false).await;
269                        jobs_executed += 1;
270                    }
271                }
272
273                // Handle catch-up if enabled
274                if info.catch_up
275                    && let Err(e) = self.handle_catch_up(entry).await
276                {
277                    tracing::warn!(
278                        cron = info.name,
279                        error = %e,
280                        "Failed to process catch-up runs"
281                    );
282                }
283            }
284
285            Span::current().record("cron.jobs_executed", jobs_executed);
286            Ok(())
287        }
288        .instrument(tick_span)
289        .await
290    }
291
292    /// Try to claim a cron run.
293    ///
294    /// Returns the run ID if claimed (or stale-reclaimed), otherwise None.
295    async fn try_claim(
296        &self,
297        cron_name: &str,
298        scheduled_time: DateTime<Utc>,
299        _timezone: &str,
300    ) -> forge_core::Result<Option<Uuid>> {
301        let claim_id = Uuid::new_v4();
302        let stale_threshold = chrono::Duration::from_std(self.config.run_stale_threshold)
303            .unwrap_or(chrono::Duration::minutes(15));
304
305        // Insert new run, or reclaim stale running row if previous node crashed.
306        let result = sqlx::query(
307            r#"
308            INSERT INTO forge_cron_runs (id, cron_name, scheduled_time, status, node_id, started_at)
309            VALUES ($1, $2, $3, 'running', $4, NOW())
310            ON CONFLICT (cron_name, scheduled_time) DO UPDATE
311            SET
312                id = EXCLUDED.id,
313                status = 'running',
314                node_id = EXCLUDED.node_id,
315                started_at = NOW(),
316                completed_at = NULL,
317                error = NULL
318            WHERE forge_cron_runs.status = 'running'
319              AND forge_cron_runs.started_at < NOW() - $5
320            "#,
321        )
322        .bind(claim_id)
323        .bind(cron_name)
324        .bind(scheduled_time)
325        .bind(self.config.node_id)
326        .bind(stale_threshold)
327        .execute(&self.pool)
328        .await
329        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
330
331        if result.rows_affected() > 0 {
332            Ok(Some(claim_id))
333        } else {
334            Ok(None)
335        }
336    }
337
338    /// Execute a cron job.
339    async fn execute_cron(
340        &self,
341        entry: &super::registry::CronEntry,
342        run_id: Uuid,
343        scheduled_time: DateTime<Utc>,
344        is_catch_up: bool,
345    ) {
346        let info = &entry.info;
347        let start_time = Instant::now();
348
349        let exec_span = tracing::info_span!(
350            "cron.execute",
351            cron.name = info.name,
352            cron.run_id = %run_id,
353            cron.schedule = info.schedule.expression(),
354            cron.timezone = info.timezone,
355            cron.scheduled_time = %scheduled_time,
356            cron.is_catch_up = is_catch_up,
357            cron.duration_ms = field::Empty,
358            cron.status = field::Empty,
359            otel.name = %format!("cron {}", info.name),
360        );
361
362        async {
363            tracing::trace!("Executing cron");
364
365            if is_catch_up {
366                tracing::info!(
367                    cron.name = info.name,
368                    cron.scheduled_time = %scheduled_time,
369                    "Executing catch-up run"
370                );
371            }
372
373            let ctx = CronContext::new(
374                run_id,
375                info.name.to_string(),
376                scheduled_time,
377                info.timezone.to_string(),
378                is_catch_up,
379                self.pool.clone(),
380                self.http_client.inner().clone(),
381            );
382
383            // Execute with timeout
384            let handler = entry.handler.clone();
385            let result = tokio::time::timeout(info.timeout, handler(&ctx)).await;
386
387            let duration_ms = start_time.elapsed().as_millis() as u64;
388            Span::current().record("cron.duration_ms", duration_ms);
389
390            match result {
391                Ok(Ok(())) => {
392                    Span::current().record("cron.status", "completed");
393                    tracing::debug!(cron.duration_ms = duration_ms, "Cron executed");
394                    self.mark_completed(run_id, info.name).await;
395                }
396                Ok(Err(e)) => {
397                    Span::current().record("cron.status", "failed");
398                    tracing::warn!(
399                        cron.duration_ms = duration_ms,
400                        error = %e,
401                        "Cron failed"
402                    );
403                    self.mark_failed(run_id, info.name, &e.to_string()).await;
404                }
405                Err(_) => {
406                    Span::current().record("cron.status", "timeout");
407                    tracing::warn!(
408                        cron.duration_ms = duration_ms,
409                        cron.timeout_ms = info.timeout.as_millis() as u64,
410                        "Cron timed out"
411                    );
412                    self.mark_failed(run_id, info.name, "Execution timed out")
413                        .await;
414                }
415            }
416        }
417        .instrument(exec_span)
418        .await
419    }
420
421    /// Mark a cron run as completed.
422    async fn mark_completed(&self, run_id: Uuid, cron_name: &str) {
423        if let Err(e) = sqlx::query(
424            r#"
425            UPDATE forge_cron_runs
426            SET status = 'completed', completed_at = NOW()
427            WHERE id = $1 AND node_id = $2
428            "#,
429        )
430        .bind(run_id)
431        .bind(self.config.node_id)
432        .execute(&self.pool)
433        .await
434        {
435            tracing::error!(cron = cron_name, error = %e, "Failed to mark cron completed");
436        }
437    }
438
439    /// Mark a cron run as failed.
440    async fn mark_failed(&self, run_id: Uuid, cron_name: &str, error: &str) {
441        if let Err(e) = sqlx::query(
442            r#"
443            UPDATE forge_cron_runs
444            SET status = 'failed', completed_at = NOW(), error = $3
445            WHERE id = $1 AND node_id = $2
446            "#,
447        )
448        .bind(run_id)
449        .bind(self.config.node_id)
450        .bind(error)
451        .execute(&self.pool)
452        .await
453        {
454            tracing::error!(cron = cron_name, error = %e, "Failed to mark cron failed");
455        }
456    }
457
458    /// Handle catch-up for missed runs.
459    async fn handle_catch_up(&self, entry: &super::registry::CronEntry) -> forge_core::Result<()> {
460        let info = &entry.info;
461        let now = Utc::now();
462
463        let catch_up_span = tracing::info_span!(
464            "cron.catch_up",
465            cron.name = info.name,
466            cron.missed_count = field::Empty,
467            cron.executed_count = field::Empty,
468        );
469
470        async {
471            // Find the last completed run
472            let last_run: Option<(DateTime<Utc>,)> = sqlx::query_as(
473                r#"
474                SELECT scheduled_time
475                FROM forge_cron_runs
476                WHERE cron_name = $1 AND status = 'completed'
477                ORDER BY scheduled_time DESC
478                LIMIT 1
479                "#,
480            )
481            .bind(info.name)
482            .fetch_optional(&self.pool)
483            .await
484            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
485
486            let start_time = last_run
487                .map(|(t,)| t)
488                .unwrap_or(now - chrono::Duration::days(1));
489
490            // Get all scheduled times between last run and now
491            let missed_times = info.schedule.between_in_tz(start_time, now, info.timezone);
492
493            // Limit catch-up runs
494            let to_catch_up: Vec<_> = missed_times
495                .into_iter()
496                .take(info.catch_up_limit as usize)
497                .collect();
498
499            Span::current().record("cron.missed_count", to_catch_up.len());
500
501            if !to_catch_up.is_empty() {
502                tracing::info!(
503                    cron.name = info.name,
504                    cron.catch_up_count = to_catch_up.len(),
505                    cron.catch_up_limit = info.catch_up_limit,
506                    "Processing catch-up runs"
507                );
508            }
509
510            let mut executed_count = 0u32;
511            for scheduled in to_catch_up {
512                // Try to claim and execute
513                if let Some(run_id) = self.try_claim(info.name, scheduled, info.timezone).await? {
514                    self.execute_cron(entry, run_id, scheduled, true).await;
515                    executed_count += 1;
516                }
517            }
518
519            Span::current().record("cron.executed_count", executed_count);
520            Ok(())
521        }
522        .instrument(catch_up_span)
523        .await
524    }
525}
526
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cron_status_conversion() {
        // Every status paired with its database spelling.
        let table = [
            (CronStatus::Pending, "pending"),
            (CronStatus::Running, "running"),
            (CronStatus::Completed, "completed"),
            (CronStatus::Failed, "failed"),
        ];

        for (status, text) in table {
            assert_eq!(status.as_str(), text);
        }
        for (status, text) in table {
            assert_eq!(text.parse::<CronStatus>(), Ok(status));
        }
        assert!("invalid".parse::<CronStatus>().is_err());
    }

    #[test]
    fn test_cron_record_creation() {
        let scheduled = Utc::now();
        let record = CronRecord::new("daily_cleanup", scheduled, "UTC");

        assert_eq!(record.cron_name, "daily_cleanup");
        assert_eq!(record.timezone, "UTC");
        assert_eq!(record.status, CronStatus::Pending);
        assert!(record.node_id.is_none());
    }

    #[test]
    fn test_cron_runner_config_default() {
        let CronRunnerConfig {
            poll_interval,
            is_leader,
            ..
        } = CronRunnerConfig::default();

        assert_eq!(poll_interval, Duration::from_secs(1));
        assert!(is_leader);
    }
}