pylon_runtime/
scheduler.rs1use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11
12use serde::Serialize;
13
14use crate::cron::CronExpr;
15use crate::jobs::{JobHandler, JobQueue};
16
17#[allow(dead_code)]
23struct ScheduledTask {
24 name: String,
25 cron: CronExpr,
26 handler: JobHandler,
27 enabled: bool,
28 last_run: Option<u64>,
30}
31
32#[derive(Debug, Clone, Serialize)]
38pub struct TaskInfo {
39 pub name: String,
40 pub enabled: bool,
41 pub last_run: Option<u64>,
42}
43
44pub struct Scheduler {
50 tasks: Mutex<Vec<ScheduledTask>>,
51 job_queue: Arc<JobQueue>,
52 running: AtomicBool,
53}
54
55impl Scheduler {
56 pub fn new(job_queue: Arc<JobQueue>) -> Self {
57 Self {
58 tasks: Mutex::new(Vec::new()),
59 job_queue,
60 running: AtomicBool::new(true),
61 }
62 }
63
64 pub fn schedule(&self, name: &str, cron_expr: &str, handler: JobHandler) -> Result<(), String> {
67 let cron = CronExpr::parse(cron_expr)?;
68
69 self.job_queue.register(name, Arc::clone(&handler));
71
72 self.tasks.lock().unwrap().push(ScheduledTask {
73 name: name.to_string(),
74 cron,
75 handler,
76 enabled: true,
77 last_run: None,
78 });
79
80 Ok(())
81 }
82
83 pub fn start(self: Arc<Self>) -> SchedulerHandle {
85 let scheduler = Arc::clone(&self);
86 let handle = std::thread::spawn(move || {
87 while scheduler.running.load(Ordering::Relaxed) {
88 scheduler.tick();
89 for _ in 0..30 {
92 if !scheduler.running.load(Ordering::Relaxed) {
93 return;
94 }
95 std::thread::sleep(Duration::from_secs(1));
96 }
97 }
98 });
99 SchedulerHandle {
100 scheduler: self,
101 handle: Some(handle),
102 }
103 }
104
105 pub fn tick(&self) {
110 let now = std::time::SystemTime::now()
111 .duration_since(std::time::UNIX_EPOCH)
112 .unwrap_or_default()
113 .as_secs();
114 self.tick_at(now);
115 }
116
117 fn tick_at(&self, now: u64) {
119 let current_minute = now / 60;
120
121 let mut tasks = self.tasks.lock().unwrap();
122 for task in tasks.iter_mut() {
123 if !task.enabled {
124 continue;
125 }
126
127 let last_minute = task.last_run.map(|t| t / 60).unwrap_or(0);
128 if current_minute > last_minute && task.cron.matches(now) {
129 task.last_run = Some(now);
130 self.job_queue.enqueue(
131 &task.name,
132 serde_json::json!({
133 "scheduled": true,
134 "timestamp": now,
135 }),
136 );
137 }
138 }
139 }
140
141 pub fn list_tasks(&self) -> Vec<TaskInfo> {
143 self.tasks
144 .lock()
145 .unwrap()
146 .iter()
147 .map(|t| TaskInfo {
148 name: t.name.clone(),
149 enabled: t.enabled,
150 last_run: t.last_run,
151 })
152 .collect()
153 }
154
155 pub fn set_enabled(&self, name: &str, enabled: bool) -> bool {
157 let mut tasks = self.tasks.lock().unwrap();
158 if let Some(task) = tasks.iter_mut().find(|t| t.name == name) {
159 task.enabled = enabled;
160 true
161 } else {
162 false
163 }
164 }
165
166 pub fn trigger(&self, name: &str) -> bool {
169 let tasks = self.tasks.lock().unwrap();
170 if tasks.iter().any(|t| t.name == name) {
171 let now = std::time::SystemTime::now()
172 .duration_since(std::time::UNIX_EPOCH)
173 .unwrap_or_default()
174 .as_secs();
175 drop(tasks);
176 self.job_queue.enqueue(
177 name,
178 serde_json::json!({
179 "scheduled": true,
180 "manual_trigger": true,
181 "timestamp": now,
182 }),
183 );
184 true
185 } else {
186 false
187 }
188 }
189}
190
191pub struct SchedulerHandle {
197 scheduler: Arc<Scheduler>,
198 #[allow(dead_code)]
199 handle: Option<std::thread::JoinHandle<()>>,
200}
201
202impl SchedulerHandle {
203 pub fn stop(&self) {
205 self.scheduler.running.store(false, Ordering::Relaxed);
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use crate::jobs::JobResult;

    /// Base timestamp for the tick tests: 2024-01-15 10:30:00 UTC, which falls
    /// exactly on a minute boundary.
    const BASE_TS: u64 = 1705314600;

    /// Builds a fresh queue (capacity 100) and a scheduler bound to it.
    fn fixture() -> (Arc<JobQueue>, Scheduler) {
        let queue = Arc::new(JobQueue::new(100));
        let scheduler = Scheduler::new(Arc::clone(&queue));
        (queue, scheduler)
    }

    /// Schedules a task whose handler always reports success.
    fn schedule_noop(scheduler: &Scheduler, name: &str, cron: &str) -> Result<(), String> {
        scheduler.schedule(name, cron, Arc::new(|_| JobResult::Success))
    }

    #[test]
    fn schedule_registers_handler() {
        let (queue, scheduler) = fixture();

        schedule_noop(&scheduler, "cleanup", "*/5 * * * *").unwrap();

        let listed = scheduler.list_tasks();
        assert_eq!(listed.len(), 1);
        let first = &listed[0];
        assert_eq!(first.name, "cleanup");
        assert!(first.enabled);
        assert!(first.last_run.is_none());

        // The handler must also have been registered with the queue itself.
        let expected = "cleanup".to_string();
        let stats = queue.stats();
        assert!(stats.handlers.contains(&expected));
    }

    #[test]
    fn schedule_rejects_bad_cron() {
        let (_queue, scheduler) = fixture();

        let outcome = schedule_noop(&scheduler, "bad", "not a cron");
        assert!(outcome.is_err());
    }

    #[test]
    fn tick_enqueues_matching_tasks() {
        let (queue, scheduler) = fixture();

        schedule_noop(&scheduler, "every_min", "* * * * *").unwrap();

        scheduler.tick_at(BASE_TS);
        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn tick_deduplicates_within_same_minute() {
        let (queue, scheduler) = fixture();

        schedule_noop(&scheduler, "dedup", "* * * * *").unwrap();

        // Two ticks fifteen seconds apart land in the same minute.
        scheduler.tick_at(BASE_TS);
        scheduler.tick_at(BASE_TS + 15);

        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn tick_enqueues_again_next_minute() {
        let (queue, scheduler) = fixture();

        schedule_noop(&scheduler, "repeat", "* * * * *").unwrap();

        // The second tick is exactly one minute after the first.
        scheduler.tick_at(BASE_TS);
        scheduler.tick_at(BASE_TS + 60);

        assert_eq!(queue.pending_count(), 2);
    }

    #[test]
    fn tick_skips_disabled_tasks() {
        let (queue, scheduler) = fixture();

        schedule_noop(&scheduler, "disabled", "* * * * *").unwrap();
        scheduler.set_enabled("disabled", false);

        scheduler.tick_at(BASE_TS);
        assert_eq!(queue.pending_count(), 0);
    }

    #[test]
    fn set_enabled_returns_false_for_unknown() {
        let (_queue, scheduler) = fixture();

        let found = scheduler.set_enabled("nonexistent", false);
        assert!(!found);
    }

    #[test]
    fn trigger_enqueues_immediately() {
        let (queue, scheduler) = fixture();

        schedule_noop(&scheduler, "manual", "0 0 1 1 *").unwrap();

        assert!(scheduler.trigger("manual"));
        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn trigger_returns_false_for_unknown() {
        let (_queue, scheduler) = fixture();

        let found = scheduler.trigger("nonexistent");
        assert!(!found);
    }

    #[test]
    fn tick_does_not_match_wrong_schedule() {
        let (queue, scheduler) = fixture();

        schedule_noop(&scheduler, "yearly", "0 0 1 1 *").unwrap();

        // BASE_TS is in mid-January, so a "midnight on Jan 1" schedule is not due.
        scheduler.tick_at(BASE_TS);
        assert_eq!(queue.pending_count(), 0);
    }
}