1use crate::error::{AgentError, Result};
7use crate::job::{JobExecutionId, JobExecutor, JobTrigger};
8use chrono::{DateTime, Utc};
9use cron::Schedule;
10use std::collections::HashMap;
11use std::str::FromStr;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::RwLock;
16use tracing::{debug, error, info, warn};
17use zlayer_spec::ServiceSpec;
18
/// Internal bookkeeping for one registered cron job.
struct CronJob {
    /// Name the job was registered under (also its key in the scheduler's map).
    name: String,
    /// Service spec handed to the job executor on every trigger; its
    /// `schedule` field holds the original cron expression.
    spec: ServiceSpec,
    /// Parsed form of `spec.schedule`, used to compute upcoming run times.
    schedule: Schedule,
    /// Monotonic time of the last successful trigger by the scheduler loop,
    /// or `None` if the loop has not triggered this job yet.
    last_run: Option<Instant>,
    /// Next UTC time at which the job is due; `None` if the schedule yields
    /// no upcoming occurrence.
    next_run: Option<DateTime<Utc>>,
    /// When `false`, the scheduler loop skips this job.
    enabled: bool,
}
34
35#[derive(Debug, Clone)]
37pub struct CronJobInfo {
38 pub name: String,
40 pub schedule_expr: String,
42 pub last_run: Option<DateTime<Utc>>,
44 pub next_run: Option<DateTime<Utc>>,
46 pub enabled: bool,
48}
49
/// Keeps a table of cron jobs and triggers each one through a [`JobExecutor`]
/// once its next scheduled time has passed.
pub struct CronScheduler {
    /// Registered jobs, keyed by job name.
    jobs: RwLock<HashMap<String, CronJob>>,
    /// Executor used for both scheduled and manual (`trigger_now`) runs.
    job_executor: Arc<JobExecutor>,
    /// `true` while `run_loop` is active; used to reject a second concurrent loop.
    running: AtomicBool,
    /// Signalled by `shutdown()` to make `run_loop` exit.
    shutdown: Arc<tokio::sync::Notify>,
}
61
62impl CronScheduler {
63 pub fn new(job_executor: Arc<JobExecutor>) -> Self {
68 Self {
69 jobs: RwLock::new(HashMap::new()),
70 job_executor,
71 running: AtomicBool::new(false),
72 shutdown: Arc::new(tokio::sync::Notify::new()),
73 }
74 }
75
76 pub async fn register(&self, name: &str, spec: &ServiceSpec) -> Result<()> {
85 let schedule_str = spec.schedule.as_ref().ok_or_else(|| {
86 AgentError::InvalidSpec("Cron job missing schedule field".to_string())
87 })?;
88
89 let schedule = Schedule::from_str(schedule_str).map_err(|e| {
90 AgentError::InvalidSpec(format!("Invalid cron schedule '{schedule_str}': {e}"))
91 })?;
92
93 let next_run = schedule.upcoming(Utc).next();
94
95 let job = CronJob {
96 name: name.to_string(),
97 spec: spec.clone(),
98 schedule,
99 last_run: None,
100 next_run,
101 enabled: true,
102 };
103
104 {
105 let mut jobs = self.jobs.write().await;
106 jobs.insert(name.to_string(), job);
107 }
108
109 info!(
110 job = %name,
111 schedule = %schedule_str,
112 next_run = ?next_run,
113 "Registered cron job"
114 );
115
116 Ok(())
117 }
118
119 pub async fn unregister(&self, name: &str) {
124 let mut jobs = self.jobs.write().await;
125 if jobs.remove(name).is_some() {
126 info!(job = %name, "Unregistered cron job");
127 } else {
128 warn!(job = %name, "Attempted to unregister non-existent cron job");
129 }
130 }
131
132 pub async fn set_enabled(&self, name: &str, enabled: bool) {
136 let mut jobs = self.jobs.write().await;
137 if let Some(job) = jobs.get_mut(name) {
138 job.enabled = enabled;
139 if enabled {
140 job.next_run = job.schedule.upcoming(Utc).next();
142 }
143 info!(
144 job = %name,
145 enabled = enabled,
146 next_run = ?job.next_run,
147 "Updated cron job enabled state"
148 );
149 }
150 }
151
152 pub async fn get_job_info(&self, name: &str) -> Option<CronJobInfo> {
154 let jobs = self.jobs.read().await;
155 jobs.get(name).map(|j| CronJobInfo {
156 name: j.name.clone(),
157 schedule_expr: j.spec.schedule.clone().unwrap_or_default(),
158 last_run: j.last_run.map(|_| {
159 Utc::now()
163 }),
164 next_run: j.next_run,
165 enabled: j.enabled,
166 })
167 }
168
169 pub async fn list_jobs(&self) -> Vec<CronJobInfo> {
171 let jobs = self.jobs.read().await;
172 jobs.values()
173 .map(|j| CronJobInfo {
174 name: j.name.clone(),
175 schedule_expr: j.spec.schedule.clone().unwrap_or_default(),
176 last_run: j.last_run.map(|_| Utc::now()), next_run: j.next_run,
178 enabled: j.enabled,
179 })
180 .collect()
181 }
182
183 pub async fn run_loop(&self) {
188 if self
189 .running
190 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
191 .is_err()
192 {
193 warn!("Cron scheduler is already running");
194 return;
195 }
196
197 let check_interval = Duration::from_secs(1);
198 let mut interval = tokio::time::interval(check_interval);
199
200 info!("Cron scheduler started");
201
202 loop {
203 tokio::select! {
204 _ = interval.tick() => {
205 self.check_and_trigger().await;
206 }
207 () = self.shutdown.notified() => {
208 info!("Cron scheduler received shutdown signal");
209 break;
210 }
211 }
212 }
213
214 self.running.store(false, Ordering::SeqCst);
215 info!("Cron scheduler stopped");
216 }
217
218 async fn check_and_trigger(&self) {
220 let now = Utc::now();
221 let mut jobs_to_trigger: Vec<(String, ServiceSpec)> = Vec::new();
222
223 {
225 let jobs = self.jobs.read().await;
226 for (name, job) in jobs.iter() {
227 if !job.enabled {
228 continue;
229 }
230
231 if let Some(next_run) = job.next_run {
232 if next_run <= now {
233 debug!(
234 job = %name,
235 scheduled_time = %next_run,
236 current_time = %now,
237 "Job is due for execution"
238 );
239 jobs_to_trigger.push((name.clone(), job.spec.clone()));
240 }
241 }
242 }
243 }
244
245 for (name, spec) in jobs_to_trigger {
247 match self
248 .job_executor
249 .trigger(&name, &spec, JobTrigger::Scheduler)
250 .await
251 {
252 Ok(exec_id) => {
253 info!(
254 job = %name,
255 execution_id = %exec_id,
256 "Cron job triggered"
257 );
258
259 let mut jobs = self.jobs.write().await;
261 if let Some(job) = jobs.get_mut(&name) {
262 job.last_run = Some(Instant::now());
263 job.next_run = job.schedule.upcoming(Utc).next();
264 debug!(
265 job = %name,
266 next_run = ?job.next_run,
267 "Updated cron job next run time"
268 );
269 }
270 }
271 Err(e) => {
272 error!(
273 job = %name,
274 error = %e,
275 "Failed to trigger cron job"
276 );
277 }
278 }
279 }
280 }
281
282 pub async fn trigger_now(&self, name: &str) -> Result<JobExecutionId> {
293 let jobs = self.jobs.read().await;
294 let job = jobs.get(name).ok_or_else(|| AgentError::NotFound {
295 container: name.to_string(),
296 reason: "cron job not found".to_string(),
297 })?;
298
299 info!(job = %name, "Manually triggering cron job");
300
301 self.job_executor
302 .trigger(name, &job.spec, JobTrigger::Cli)
303 .await
304 }
305
306 pub fn shutdown(&self) {
308 info!("Signaling cron scheduler shutdown");
309 self.shutdown.notify_one();
310 }
311
312 pub fn is_running(&self) -> bool {
314 self.running.load(Ordering::Relaxed)
315 }
316
317 pub async fn job_count(&self) -> usize {
319 let jobs = self.jobs.read().await;
320 jobs.len()
321 }
322
323 pub async fn enabled_job_count(&self) -> usize {
325 let jobs = self.jobs.read().await;
326 jobs.values().filter(|j| j.enabled).count()
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::{MockRuntime, Runtime};
    use zlayer_spec::DeploymentSpec;

    /// Builds a cron `ServiceSpec` named `cleanup` with the given schedule by
    /// parsing a minimal deployment YAML.
    fn mock_cron_spec(schedule: &str) -> ServiceSpec {
        let yaml = format!(
            r#"
version: v1
deployment: test
services:
  cleanup:
    rtype: cron
    schedule: "{schedule}"
    image:
      name: cleanup:latest
"#
        );

        serde_yaml::from_str::<DeploymentSpec>(&yaml)
            .expect("test deployment YAML should parse")
            .services
            .remove("cleanup")
            .expect("test deployment should contain the `cleanup` service")
    }

    /// Creates a scheduler backed by a mock runtime.
    fn new_scheduler() -> CronScheduler {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = Arc::new(JobExecutor::new(runtime));
        CronScheduler::new(executor)
    }

    #[tokio::test]
    async fn test_cron_scheduler_register() {
        let scheduler = new_scheduler();

        // 7-field expression: sec min hour day-of-month month day-of-week year.
        let spec = mock_cron_spec("0 * * * * * *");
        scheduler.register("cleanup", &spec).await.unwrap();

        assert_eq!(scheduler.job_count().await, 1);

        let info = scheduler.get_job_info("cleanup").await;
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "cleanup");
        assert!(info.enabled);
        assert!(info.next_run.is_some());
    }

    #[tokio::test]
    async fn test_cron_scheduler_new_job_has_no_last_run() {
        let scheduler = new_scheduler();

        let spec = mock_cron_spec("0 * * * * * *");
        scheduler.register("cleanup", &spec).await.unwrap();

        let info = scheduler.get_job_info("cleanup").await.unwrap();
        assert!(info.last_run.is_none());
    }

    #[tokio::test]
    async fn test_cron_scheduler_register_replaces_existing() {
        let scheduler = new_scheduler();

        scheduler
            .register("cleanup", &mock_cron_spec("0 * * * * * *"))
            .await
            .unwrap();
        scheduler
            .register("cleanup", &mock_cron_spec("0 0 * * * * *"))
            .await
            .unwrap();

        assert_eq!(scheduler.job_count().await, 1);
        let info = scheduler.get_job_info("cleanup").await.unwrap();
        assert_eq!(info.schedule_expr, "0 0 * * * * *");
    }

    #[tokio::test]
    async fn test_cron_scheduler_invalid_schedule() {
        let scheduler = new_scheduler();

        let mut spec = mock_cron_spec("0 * * * * * *");
        // Override with an unparseable expression after building a valid spec.
        spec.schedule = Some("not a valid cron".to_string());

        let result = scheduler.register("bad", &spec).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cron_scheduler_missing_schedule() {
        let scheduler = new_scheduler();

        let mut spec = mock_cron_spec("0 * * * * * *");
        spec.schedule = None;

        let result = scheduler.register("missing", &spec).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cron_scheduler_unregister() {
        let scheduler = new_scheduler();

        let spec = mock_cron_spec("0 * * * * * *");
        scheduler.register("cleanup", &spec).await.unwrap();
        assert_eq!(scheduler.job_count().await, 1);

        scheduler.unregister("cleanup").await;
        assert_eq!(scheduler.job_count().await, 0);
    }

    #[tokio::test]
    async fn test_cron_scheduler_enable_disable() {
        let scheduler = new_scheduler();

        let spec = mock_cron_spec("0 * * * * * *");
        scheduler.register("cleanup", &spec).await.unwrap();

        assert_eq!(scheduler.enabled_job_count().await, 1);

        scheduler.set_enabled("cleanup", false).await;
        assert_eq!(scheduler.enabled_job_count().await, 0);

        let info = scheduler.get_job_info("cleanup").await.unwrap();
        assert!(!info.enabled);

        scheduler.set_enabled("cleanup", true).await;
        assert_eq!(scheduler.enabled_job_count().await, 1);

        // Re-enabling recomputes the next run time.
        let info = scheduler.get_job_info("cleanup").await.unwrap();
        assert!(info.enabled);
        assert!(info.next_run.is_some());
    }

    #[tokio::test]
    async fn test_cron_scheduler_set_enabled_unknown_job_is_noop() {
        let scheduler = new_scheduler();

        scheduler.set_enabled("missing", true).await;

        assert_eq!(scheduler.job_count().await, 0);
        assert!(scheduler.get_job_info("missing").await.is_none());
    }

    #[tokio::test]
    async fn test_cron_scheduler_list_jobs() {
        let scheduler = new_scheduler();

        let spec1 = mock_cron_spec("0 * * * * * *");
        let spec2 = mock_cron_spec("0 0 * * * * *");

        scheduler.register("job1", &spec1).await.unwrap();
        scheduler.register("job2", &spec2).await.unwrap();

        let jobs = scheduler.list_jobs().await;
        assert_eq!(jobs.len(), 2);

        let names: Vec<_> = jobs.iter().map(|j| j.name.as_str()).collect();
        assert!(names.contains(&"job1"));
        assert!(names.contains(&"job2"));
    }

    #[tokio::test]
    async fn test_cron_scheduler_trigger_now() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = Arc::new(JobExecutor::new(runtime));
        let scheduler = CronScheduler::new(executor.clone());

        let spec = mock_cron_spec("0 * * * * * *");
        scheduler.register("cleanup", &spec).await.unwrap();

        let exec_id = scheduler.trigger_now("cleanup").await.unwrap();
        assert!(!exec_id.0.is_empty());

        // Give the executor a moment to record the execution.
        tokio::time::sleep(Duration::from_millis(50)).await;
        let execution = executor.get_execution(&exec_id).await;
        assert!(execution.is_some());
    }

    #[tokio::test]
    async fn test_cron_scheduler_trigger_now_not_found() {
        let scheduler = new_scheduler();

        let result = scheduler.trigger_now("nonexistent").await;
        assert!(result.is_err());
    }

    // Pure data construction: no async runtime needed.
    #[test]
    fn test_cron_job_info() {
        let info = CronJobInfo {
            name: "test".to_string(),
            schedule_expr: "0 * * * * * *".to_string(),
            last_run: Some(Utc::now()),
            next_run: Some(Utc::now()),
            enabled: true,
        };

        assert_eq!(info.name, "test");
        assert!(info.enabled);
    }

    #[tokio::test]
    async fn test_cron_scheduler_shutdown() {
        let scheduler = Arc::new(new_scheduler());

        assert!(!scheduler.is_running());

        let scheduler_clone = scheduler.clone();
        let handle = tokio::spawn(async move {
            scheduler_clone.run_loop().await;
        });

        // Let the spawned loop start and set the running flag.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(scheduler.is_running());

        scheduler.shutdown();

        tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("Scheduler should stop within timeout")
            .expect("Scheduler task should complete without error");

        assert!(!scheduler.is_running());
    }
}
532}