// forge_runtime/cron/scheduler.rs

1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use super::registry::CronRegistry;
10use forge_core::CircuitBreakerClient;
11use forge_core::cron::CronContext;
12
/// Lifecycle status of a single cron run, as persisted in the database.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CronStatus {
    /// Not started yet.
    Pending,
    /// Execution is in progress.
    Running,
    /// Finished without an error.
    Completed,
    /// Finished with an error (a timeout also counts as a failure).
    Failed,
}
25
26impl CronStatus {
27    /// Convert to string for database storage.
28    pub fn as_str(&self) -> &'static str {
29        match self {
30            Self::Pending => "pending",
31            Self::Running => "running",
32            Self::Completed => "completed",
33            Self::Failed => "failed",
34        }
35    }
36}
37
38impl FromStr for CronStatus {
39    type Err = std::convert::Infallible;
40
41    fn from_str(s: &str) -> Result<Self, Self::Err> {
42        Ok(match s {
43            "pending" => Self::Pending,
44            "running" => Self::Running,
45            "completed" => Self::Completed,
46            "failed" => Self::Failed,
47            _ => Self::Pending,
48        })
49    }
50}
51
52/// A cron run record from the database.
53#[derive(Debug, Clone)]
54pub struct CronRecord {
55    /// Run ID.
56    pub id: Uuid,
57    /// Cron name.
58    pub cron_name: String,
59    /// Scheduled time.
60    pub scheduled_time: DateTime<Utc>,
61    /// Timezone.
62    pub timezone: String,
63    /// Current status.
64    pub status: CronStatus,
65    /// Node that executed the cron.
66    pub node_id: Option<Uuid>,
67    /// When execution started.
68    pub started_at: Option<DateTime<Utc>>,
69    /// When execution completed.
70    pub completed_at: Option<DateTime<Utc>>,
71    /// Error message if failed.
72    pub error: Option<String>,
73}
74
75impl CronRecord {
76    /// Create a new pending cron record.
77    pub fn new(
78        cron_name: impl Into<String>,
79        scheduled_time: DateTime<Utc>,
80        timezone: impl Into<String>,
81    ) -> Self {
82        Self {
83            id: Uuid::new_v4(),
84            cron_name: cron_name.into(),
85            scheduled_time,
86            timezone: timezone.into(),
87            status: CronStatus::Pending,
88            node_id: None,
89            started_at: None,
90            completed_at: None,
91            error: None,
92        }
93    }
94}
95
96/// Configuration for the cron runner.
97#[derive(Debug, Clone)]
98pub struct CronRunnerConfig {
99    /// How often to check for due crons.
100    pub poll_interval: Duration,
101    /// Node ID for this runner.
102    pub node_id: Uuid,
103    /// Whether this node is the leader (only leaders run crons).
104    pub is_leader: bool,
105}
106
107impl Default for CronRunnerConfig {
108    fn default() -> Self {
109        Self {
110            poll_interval: Duration::from_secs(1),
111            node_id: Uuid::new_v4(),
112            is_leader: true,
113        }
114    }
115}
116
117/// Cron scheduler and executor.
118pub struct CronRunner {
119    registry: Arc<CronRegistry>,
120    pool: sqlx::PgPool,
121    http_client: CircuitBreakerClient,
122    config: CronRunnerConfig,
123    is_running: Arc<RwLock<bool>>,
124}
125
126impl CronRunner {
127    /// Create a new cron runner.
128    pub fn new(
129        registry: Arc<CronRegistry>,
130        pool: sqlx::PgPool,
131        http_client: CircuitBreakerClient,
132        config: CronRunnerConfig,
133    ) -> Self {
134        Self {
135            registry,
136            pool,
137            http_client,
138            config,
139            is_running: Arc::new(RwLock::new(false)),
140        }
141    }
142
143    /// Start the cron runner loop.
144    pub async fn run(&self) -> forge_core::Result<()> {
145        {
146            let mut running = self.is_running.write().await;
147            if *running {
148                return Ok(());
149            }
150            *running = true;
151        }
152
153        tracing::info!("Cron runner starting");
154
155        loop {
156            if !*self.is_running.read().await {
157                break;
158            }
159
160            if self.config.is_leader {
161                if let Err(e) = self.tick().await {
162                    tracing::error!(error = %e, "Cron tick failed");
163                }
164            }
165
166            tokio::time::sleep(self.config.poll_interval).await;
167        }
168
169        tracing::info!("Cron runner stopped");
170        Ok(())
171    }
172
173    /// Stop the cron runner.
174    pub async fn stop(&self) {
175        let mut running = self.is_running.write().await;
176        *running = false;
177    }
178
179    /// Execute one tick of the scheduler.
180    async fn tick(&self) -> forge_core::Result<()> {
181        let now = Utc::now();
182        // Look back 2x poll interval to catch any scheduled times we might have missed
183        let window_start = now
184            - chrono::Duration::from_std(self.config.poll_interval * 2)
185                .unwrap_or(chrono::Duration::seconds(2));
186
187        let cron_list = self.registry.list();
188
189        if cron_list.is_empty() {
190            tracing::trace!("Cron tick: no crons registered");
191        } else {
192            tracing::trace!(
193                cron_count = cron_list.len(),
194                "Cron tick checking {} registered crons",
195                cron_list.len()
196            );
197        }
198
199        for entry in cron_list {
200            let info = &entry.info;
201
202            let scheduled_times = info
203                .schedule
204                .between_in_tz(window_start, now, info.timezone);
205            if !scheduled_times.is_empty() {
206                tracing::trace!(
207                    cron = info.name,
208                    schedule = info.schedule.expression(),
209                    scheduled_count = scheduled_times.len(),
210                    "Found scheduled cron runs"
211                );
212            }
213
214            for scheduled in scheduled_times {
215                // Try to claim this cron run (database ensures exactly-once execution)
216                if let Ok(claimed) = self.try_claim(info.name, scheduled, info.timezone).await {
217                    if claimed {
218                        // Execute the cron
219                        self.execute_cron(entry, scheduled, false).await;
220                    }
221                }
222            }
223
224            // Handle catch-up if enabled
225            if info.catch_up {
226                if let Err(e) = self.handle_catch_up(entry).await {
227                    tracing::warn!(
228                        cron = info.name,
229                        error = %e,
230                        "Failed to process catch-up runs"
231                    );
232                }
233            }
234        }
235
236        Ok(())
237    }
238
239    /// Try to claim a cron run (returns true if claimed successfully).
240    async fn try_claim(
241        &self,
242        cron_name: &str,
243        scheduled_time: DateTime<Utc>,
244        _timezone: &str,
245    ) -> forge_core::Result<bool> {
246        // Insert with ON CONFLICT DO NOTHING to ensure exactly-once execution
247        let result = sqlx::query(
248            r#"
249            INSERT INTO forge_cron_runs (id, cron_name, scheduled_time, status, node_id, started_at)
250            VALUES ($1, $2, $3, 'running', $4, NOW())
251            ON CONFLICT (cron_name, scheduled_time) DO NOTHING
252            "#,
253        )
254        .bind(Uuid::new_v4())
255        .bind(cron_name)
256        .bind(scheduled_time)
257        .bind(self.config.node_id)
258        .execute(&self.pool)
259        .await
260        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
261
262        Ok(result.rows_affected() > 0)
263    }
264
265    /// Execute a cron job.
266    async fn execute_cron(
267        &self,
268        entry: &super::registry::CronEntry,
269        scheduled_time: DateTime<Utc>,
270        is_catch_up: bool,
271    ) {
272        let info = &entry.info;
273        let run_id = Uuid::new_v4();
274
275        tracing::debug!(
276            cron = info.name,
277            scheduled_time = %scheduled_time,
278            is_catch_up = is_catch_up,
279            "Executing cron"
280        );
281
282        let ctx = CronContext::new(
283            run_id,
284            info.name.to_string(),
285            scheduled_time,
286            info.timezone.to_string(),
287            is_catch_up,
288            self.pool.clone(),
289            self.http_client.inner().clone(),
290        );
291
292        // Execute with timeout
293        let handler = entry.handler.clone();
294        let result = tokio::time::timeout(info.timeout, handler(&ctx)).await;
295
296        match result {
297            Ok(Ok(())) => {
298                tracing::info!(
299                    cron = info.name,
300                    scheduled_time = %scheduled_time,
301                    "Cron executed"
302                );
303                self.mark_completed(info.name, scheduled_time).await;
304            }
305            Ok(Err(e)) => {
306                tracing::error!(cron = info.name, error = %e, "Cron failed");
307                self.mark_failed(info.name, scheduled_time, &e.to_string())
308                    .await;
309            }
310            Err(_) => {
311                tracing::error!(cron = info.name, "Cron timed out");
312                self.mark_failed(info.name, scheduled_time, "Execution timed out")
313                    .await;
314            }
315        }
316    }
317
318    /// Mark a cron run as completed.
319    async fn mark_completed(&self, cron_name: &str, scheduled_time: DateTime<Utc>) {
320        let _ = sqlx::query(
321            r#"
322            UPDATE forge_cron_runs
323            SET status = 'completed', completed_at = NOW()
324            WHERE cron_name = $1 AND scheduled_time = $2
325            "#,
326        )
327        .bind(cron_name)
328        .bind(scheduled_time)
329        .execute(&self.pool)
330        .await;
331    }
332
333    /// Mark a cron run as failed.
334    async fn mark_failed(&self, cron_name: &str, scheduled_time: DateTime<Utc>, error: &str) {
335        let _ = sqlx::query(
336            r#"
337            UPDATE forge_cron_runs
338            SET status = 'failed', completed_at = NOW(), error = $3
339            WHERE cron_name = $1 AND scheduled_time = $2
340            "#,
341        )
342        .bind(cron_name)
343        .bind(scheduled_time)
344        .bind(error)
345        .execute(&self.pool)
346        .await;
347    }
348
349    /// Handle catch-up for missed runs.
350    async fn handle_catch_up(&self, entry: &super::registry::CronEntry) -> forge_core::Result<()> {
351        let info = &entry.info;
352        let now = Utc::now();
353
354        // Find the last completed run
355        let last_run: Option<(DateTime<Utc>,)> = sqlx::query_as(
356            r#"
357            SELECT scheduled_time
358            FROM forge_cron_runs
359            WHERE cron_name = $1 AND status = 'completed'
360            ORDER BY scheduled_time DESC
361            LIMIT 1
362            "#,
363        )
364        .bind(info.name)
365        .fetch_optional(&self.pool)
366        .await
367        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
368
369        let start_time = last_run
370            .map(|(t,)| t)
371            .unwrap_or(now - chrono::Duration::days(1));
372
373        // Get all scheduled times between last run and now
374        let missed_times = info.schedule.between_in_tz(start_time, now, info.timezone);
375
376        // Limit catch-up runs
377        let to_catch_up: Vec<_> = missed_times
378            .into_iter()
379            .take(info.catch_up_limit as usize)
380            .collect();
381
382        for scheduled in to_catch_up {
383            // Try to claim and execute
384            if self.try_claim(info.name, scheduled, info.timezone).await? {
385                self.execute_cron(entry, scheduled, true).await;
386            }
387        }
388
389        Ok(())
390    }
391}
392
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cron_status_conversion() {
        assert_eq!(CronStatus::Pending.as_str(), "pending");
        assert_eq!(CronStatus::Running.as_str(), "running");
        assert_eq!(CronStatus::Completed.as_str(), "completed");
        assert_eq!(CronStatus::Failed.as_str(), "failed");

        assert_eq!("pending".parse::<CronStatus>(), Ok(CronStatus::Pending));
        assert_eq!("running".parse::<CronStatus>(), Ok(CronStatus::Running));
        assert_eq!("completed".parse::<CronStatus>(), Ok(CronStatus::Completed));
        assert_eq!("failed".parse::<CronStatus>(), Ok(CronStatus::Failed));
    }

    #[test]
    fn test_cron_status_round_trip() {
        // `as_str` and `FromStr` must agree for every variant.
        for status in [
            CronStatus::Pending,
            CronStatus::Running,
            CronStatus::Completed,
            CronStatus::Failed,
        ] {
            assert_eq!(status.as_str().parse::<CronStatus>(), Ok(status));
        }
    }

    #[test]
    fn test_cron_status_unknown_falls_back_to_pending() {
        // Parsing is infallible: unrecognised labels map to `Pending`.
        assert_eq!("".parse::<CronStatus>(), Ok(CronStatus::Pending));
        assert_eq!("bogus".parse::<CronStatus>(), Ok(CronStatus::Pending));
        assert_eq!("RUNNING".parse::<CronStatus>(), Ok(CronStatus::Pending));
    }

    #[test]
    fn test_cron_record_creation() {
        let scheduled = Utc::now();
        let record = CronRecord::new("daily_cleanup", scheduled, "UTC");
        assert_eq!(record.cron_name, "daily_cleanup");
        assert_eq!(record.scheduled_time, scheduled);
        assert_eq!(record.timezone, "UTC");
        assert_eq!(record.status, CronStatus::Pending);
        assert!(record.node_id.is_none());
        assert!(record.started_at.is_none());
        assert!(record.completed_at.is_none());
        assert!(record.error.is_none());

        // Each record gets its own id.
        let other = CronRecord::new("daily_cleanup", scheduled, "UTC");
        assert_ne!(record.id, other.id);
    }

    #[test]
    fn test_cron_runner_config_default() {
        let config = CronRunnerConfig::default();
        assert_eq!(config.poll_interval, Duration::from_secs(1));
        assert!(config.is_leader);

        // Every default config gets a fresh node id.
        assert_ne!(config.node_id, CronRunnerConfig::default().node_id);
    }
}