Skip to main content

pylon_runtime/
scheduler.rs

1//! Cron-based scheduler that enqueues jobs on a recurring schedule.
2//!
3//! The scheduler maintains a list of named tasks, each with a cron expression
4//! and an associated job handler. A background thread wakes every 30 seconds
5//! and enqueues jobs for any tasks whose cron expression matches the current
6//! minute (deduplicating within the same minute).
7
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11
12use serde::Serialize;
13
14use crate::cron::CronExpr;
15use crate::jobs::{JobHandler, JobQueue};
16
17// ---------------------------------------------------------------------------
18// Scheduled task
19// ---------------------------------------------------------------------------
20
21/// Internal representation of a recurring scheduled task.
22#[allow(dead_code)]
23struct ScheduledTask {
24    name: String,
25    cron: CronExpr,
26    handler: JobHandler,
27    enabled: bool,
28    /// Unix timestamp of the last time this task was enqueued.
29    last_run: Option<u64>,
30}
31
32// ---------------------------------------------------------------------------
33// Public task info
34// ---------------------------------------------------------------------------
35
36/// Read-only information about a scheduled task, returned by `list_tasks()`.
37#[derive(Debug, Clone, Serialize)]
38pub struct TaskInfo {
39    pub name: String,
40    pub enabled: bool,
41    pub last_run: Option<u64>,
42}
43
44// ---------------------------------------------------------------------------
45// Scheduler
46// ---------------------------------------------------------------------------
47
48/// The cron scheduler. Runs registered tasks on their configured schedules.
49pub struct Scheduler {
50    tasks: Mutex<Vec<ScheduledTask>>,
51    job_queue: Arc<JobQueue>,
52    running: AtomicBool,
53}
54
55impl Scheduler {
56    pub fn new(job_queue: Arc<JobQueue>) -> Self {
57        Self {
58            tasks: Mutex::new(Vec::new()),
59            job_queue,
60            running: AtomicBool::new(true),
61        }
62    }
63
64    /// Register a cron task. The handler will also be registered with the job
65    /// queue so that workers can execute it.
66    pub fn schedule(&self, name: &str, cron_expr: &str, handler: JobHandler) -> Result<(), String> {
67        let cron = CronExpr::parse(cron_expr)?;
68
69        // Register handler with job queue so workers can pick it up.
70        self.job_queue.register(name, Arc::clone(&handler));
71
72        self.tasks.lock().unwrap().push(ScheduledTask {
73            name: name.to_string(),
74            cron,
75            handler,
76            enabled: true,
77            last_run: None,
78        });
79
80        Ok(())
81    }
82
83    /// Start the scheduler loop in a background thread.
84    pub fn start(self: Arc<Self>) -> SchedulerHandle {
85        let scheduler = Arc::clone(&self);
86        let handle = std::thread::spawn(move || {
87            while scheduler.running.load(Ordering::Relaxed) {
88                scheduler.tick();
89                // Sleep in short intervals so we can observe the shutdown flag
90                // without waiting a full 30 seconds.
91                for _ in 0..30 {
92                    if !scheduler.running.load(Ordering::Relaxed) {
93                        return;
94                    }
95                    std::thread::sleep(Duration::from_secs(1));
96                }
97            }
98        });
99        SchedulerHandle {
100            scheduler: self,
101            handle: Some(handle),
102        }
103    }
104
105    /// Check all tasks and enqueue any that match the current time.
106    ///
107    /// This is also useful for testing: call `tick()` directly to simulate
108    /// the scheduler loop without waiting for the background thread.
109    pub fn tick(&self) {
110        let now = std::time::SystemTime::now()
111            .duration_since(std::time::UNIX_EPOCH)
112            .unwrap_or_default()
113            .as_secs();
114        self.tick_at(now);
115    }
116
117    /// Internal tick with an explicit timestamp (for testability).
118    fn tick_at(&self, now: u64) {
119        let current_minute = now / 60;
120
121        let mut tasks = self.tasks.lock().unwrap();
122        for task in tasks.iter_mut() {
123            if !task.enabled {
124                continue;
125            }
126
127            let last_minute = task.last_run.map(|t| t / 60).unwrap_or(0);
128            if current_minute > last_minute && task.cron.matches(now) {
129                task.last_run = Some(now);
130                self.job_queue.enqueue(
131                    &task.name,
132                    serde_json::json!({
133                        "scheduled": true,
134                        "timestamp": now,
135                    }),
136                );
137            }
138        }
139    }
140
141    /// List all scheduled tasks.
142    pub fn list_tasks(&self) -> Vec<TaskInfo> {
143        self.tasks
144            .lock()
145            .unwrap()
146            .iter()
147            .map(|t| TaskInfo {
148                name: t.name.clone(),
149                enabled: t.enabled,
150                last_run: t.last_run,
151            })
152            .collect()
153    }
154
155    /// Enable or disable a task by name. Returns true if the task was found.
156    pub fn set_enabled(&self, name: &str, enabled: bool) -> bool {
157        let mut tasks = self.tasks.lock().unwrap();
158        if let Some(task) = tasks.iter_mut().find(|t| t.name == name) {
159            task.enabled = enabled;
160            true
161        } else {
162            false
163        }
164    }
165
166    /// Manually trigger a scheduled task by enqueueing it immediately.
167    /// Returns true if the task was found and enqueued.
168    pub fn trigger(&self, name: &str) -> bool {
169        let tasks = self.tasks.lock().unwrap();
170        if tasks.iter().any(|t| t.name == name) {
171            let now = std::time::SystemTime::now()
172                .duration_since(std::time::UNIX_EPOCH)
173                .unwrap_or_default()
174                .as_secs();
175            drop(tasks);
176            self.job_queue.enqueue(
177                name,
178                serde_json::json!({
179                    "scheduled": true,
180                    "manual_trigger": true,
181                    "timestamp": now,
182                }),
183            );
184            true
185        } else {
186            false
187        }
188    }
189}
190
191// ---------------------------------------------------------------------------
192// SchedulerHandle
193// ---------------------------------------------------------------------------
194
195/// Handle returned by `Scheduler::start()` to stop the background thread.
196pub struct SchedulerHandle {
197    scheduler: Arc<Scheduler>,
198    #[allow(dead_code)]
199    handle: Option<std::thread::JoinHandle<()>>,
200}
201
202impl SchedulerHandle {
203    /// Signal the scheduler to stop after its current sleep cycle.
204    pub fn stop(&self) {
205        self.scheduler.running.store(false, Ordering::Relaxed);
206    }
207}
208
209// ---------------------------------------------------------------------------
210// Tests
211// ---------------------------------------------------------------------------
212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::jobs::JobResult;

    /// 2024-01-15 10:30:00 UTC, the base timestamp used by the tick tests.
    const BASE_TS: u64 = 1705314600;

    /// Build an empty scheduler together with the queue it enqueues into.
    fn new_scheduler() -> (Arc<JobQueue>, Scheduler) {
        let queue = Arc::new(JobQueue::new(100));
        let scheduler = Scheduler::new(Arc::clone(&queue));
        (queue, scheduler)
    }

    /// Build a scheduler holding a single always-succeeding task.
    fn scheduler_with_task(name: &str, cron: &str) -> (Arc<JobQueue>, Scheduler) {
        let (queue, scheduler) = new_scheduler();
        scheduler
            .schedule(name, cron, Arc::new(|_| JobResult::Success))
            .unwrap();
        (queue, scheduler)
    }

    #[test]
    fn schedule_registers_handler() {
        let (queue, scheduler) = scheduler_with_task("cleanup", "*/5 * * * *");

        let listed = scheduler.list_tasks();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name, "cleanup");
        assert!(listed[0].enabled);
        assert!(listed[0].last_run.is_none());

        // The queue must know the handler under the task's name.
        let stats = queue.stats();
        assert!(stats.handlers.contains(&"cleanup".to_string()));
    }

    #[test]
    fn schedule_rejects_bad_cron() {
        let (_queue, scheduler) = new_scheduler();

        let outcome = scheduler.schedule("bad", "not a cron", Arc::new(|_| JobResult::Success));
        assert!(outcome.is_err());
    }

    #[test]
    fn tick_enqueues_matching_tasks() {
        // "* * * * *" fires every minute, so any timestamp matches.
        let (queue, scheduler) = scheduler_with_task("every_min", "* * * * *");

        scheduler.tick_at(BASE_TS);

        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn tick_deduplicates_within_same_minute() {
        let (queue, scheduler) = scheduler_with_task("dedup", "* * * * *");

        // First tick at 10:30:00 ...
        scheduler.tick_at(BASE_TS);
        // ... second tick at 10:30:15, still inside the same minute.
        scheduler.tick_at(BASE_TS + 15);

        // Only the first tick may enqueue.
        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn tick_enqueues_again_next_minute() {
        let (queue, scheduler) = scheduler_with_task("repeat", "* * * * *");

        scheduler.tick_at(BASE_TS);
        // Exactly one minute later.
        scheduler.tick_at(BASE_TS + 60);

        assert_eq!(queue.pending_count(), 2);
    }

    #[test]
    fn tick_skips_disabled_tasks() {
        let (queue, scheduler) = scheduler_with_task("disabled", "* * * * *");
        scheduler.set_enabled("disabled", false);

        scheduler.tick_at(BASE_TS);

        assert_eq!(queue.pending_count(), 0);
    }

    #[test]
    fn set_enabled_returns_false_for_unknown() {
        let (_queue, scheduler) = new_scheduler();

        assert!(!scheduler.set_enabled("nonexistent", false));
    }

    #[test]
    fn trigger_enqueues_immediately() {
        let (queue, scheduler) = scheduler_with_task("manual", "0 0 1 1 *");

        assert!(scheduler.trigger("manual"));
        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn trigger_returns_false_for_unknown() {
        let (_queue, scheduler) = new_scheduler();

        assert!(!scheduler.trigger("nonexistent"));
    }

    #[test]
    fn tick_does_not_match_wrong_schedule() {
        // Due only at midnight on January 1st.
        let (queue, scheduler) = scheduler_with_task("yearly", "0 0 1 1 *");

        // 2024-01-15 10:30 is not January 1st at midnight, so nothing is due.
        scheduler.tick_at(BASE_TS);

        assert_eq!(queue.pending_count(), 0);
    }
}
351}