Skip to main content

zlayer_agent/
cron_scheduler.rs

1//! Cron scheduler - triggers jobs on time-based schedules
2//!
3//! This module provides the `CronScheduler` which manages scheduled job executions.
4//! Jobs are triggered based on cron expressions (e.g., "0 0 * * * * *" for hourly).
5
6use crate::error::{AgentError, Result};
7use crate::job::{JobExecutionId, JobExecutor, JobTrigger};
8use chrono::{DateTime, Utc};
9use cron::Schedule;
10use std::collections::HashMap;
11use std::str::FromStr;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::RwLock;
16use tracing::{debug, error, info, warn};
17use zlayer_spec::ServiceSpec;
18
19/// A registered cron job
20struct CronJob {
21    /// Job name
22    name: String,
23    /// Service specification for the job
24    spec: ServiceSpec,
25    /// Parsed cron schedule
26    schedule: Schedule,
27    /// Last time this job was run
28    last_run: Option<Instant>,
29    /// Next scheduled run time
30    next_run: Option<DateTime<Utc>>,
31    /// Whether this job is enabled
32    enabled: bool,
33}
34
35/// Public info about a cron job (for external visibility)
36#[derive(Debug, Clone)]
37pub struct CronJobInfo {
38    /// Job name
39    pub name: String,
40    /// Cron schedule expression
41    pub schedule_expr: String,
42    /// Last run time (as UTC datetime)
43    pub last_run: Option<DateTime<Utc>>,
44    /// Next scheduled run time
45    pub next_run: Option<DateTime<Utc>>,
46    /// Whether this job is enabled
47    pub enabled: bool,
48}
49
50/// Cron scheduler manages time-based job triggers
51pub struct CronScheduler {
52    /// Registered cron jobs
53    jobs: RwLock<HashMap<String, CronJob>>,
54    /// Job executor for running jobs
55    job_executor: Arc<JobExecutor>,
56    /// Running state flag
57    running: AtomicBool,
58    /// Shutdown signal
59    shutdown: Arc<tokio::sync::Notify>,
60}
61
62impl CronScheduler {
63    /// Create a new cron scheduler
64    ///
65    /// # Arguments
66    /// * `job_executor` - The job executor to use for running triggered jobs
67    pub fn new(job_executor: Arc<JobExecutor>) -> Self {
68        Self {
69            jobs: RwLock::new(HashMap::new()),
70            job_executor,
71            running: AtomicBool::new(false),
72            shutdown: Arc::new(tokio::sync::Notify::new()),
73        }
74    }
75
76    /// Register a cron job
77    ///
78    /// # Arguments
79    /// * `name` - Unique name for this cron job
80    /// * `spec` - Service specification (must have rtype: cron and schedule field)
81    ///
82    /// # Errors
83    /// Returns error if spec has no schedule or if schedule is invalid
84    pub async fn register(&self, name: &str, spec: &ServiceSpec) -> Result<()> {
85        let schedule_str = spec.schedule.as_ref().ok_or_else(|| {
86            AgentError::InvalidSpec("Cron job missing schedule field".to_string())
87        })?;
88
89        let schedule = Schedule::from_str(schedule_str).map_err(|e| {
90            AgentError::InvalidSpec(format!("Invalid cron schedule '{schedule_str}': {e}"))
91        })?;
92
93        let next_run = schedule.upcoming(Utc).next();
94
95        let job = CronJob {
96            name: name.to_string(),
97            spec: spec.clone(),
98            schedule,
99            last_run: None,
100            next_run,
101            enabled: true,
102        };
103
104        {
105            let mut jobs = self.jobs.write().await;
106            jobs.insert(name.to_string(), job);
107        }
108
109        info!(
110            job = %name,
111            schedule = %schedule_str,
112            next_run = ?next_run,
113            "Registered cron job"
114        );
115
116        Ok(())
117    }
118
119    /// Unregister a cron job
120    ///
121    /// # Arguments
122    /// * `name` - Name of the cron job to unregister
123    pub async fn unregister(&self, name: &str) {
124        let mut jobs = self.jobs.write().await;
125        if jobs.remove(name).is_some() {
126            info!(job = %name, "Unregistered cron job");
127        } else {
128            warn!(job = %name, "Attempted to unregister non-existent cron job");
129        }
130    }
131
132    /// Enable or disable a cron job
133    ///
134    /// When enabled, recalculates the next run time.
135    pub async fn set_enabled(&self, name: &str, enabled: bool) {
136        let mut jobs = self.jobs.write().await;
137        if let Some(job) = jobs.get_mut(name) {
138            job.enabled = enabled;
139            if enabled {
140                // Recalculate next run when re-enabled
141                job.next_run = job.schedule.upcoming(Utc).next();
142            }
143            info!(
144                job = %name,
145                enabled = enabled,
146                next_run = ?job.next_run,
147                "Updated cron job enabled state"
148            );
149        }
150    }
151
152    /// Get info about a specific cron job
153    pub async fn get_job_info(&self, name: &str) -> Option<CronJobInfo> {
154        let jobs = self.jobs.read().await;
155        jobs.get(name).map(|j| CronJobInfo {
156            name: j.name.clone(),
157            schedule_expr: j.spec.schedule.clone().unwrap_or_default(),
158            last_run: j.last_run.map(|_| {
159                // Convert Instant to approximate DateTime
160                // Note: Instant doesn't have a direct conversion, so we approximate
161                // based on current time minus elapsed duration
162                Utc::now()
163            }),
164            next_run: j.next_run,
165            enabled: j.enabled,
166        })
167    }
168
169    /// List all registered cron jobs
170    pub async fn list_jobs(&self) -> Vec<CronJobInfo> {
171        let jobs = self.jobs.read().await;
172        jobs.values()
173            .map(|j| CronJobInfo {
174                name: j.name.clone(),
175                schedule_expr: j.spec.schedule.clone().unwrap_or_default(),
176                last_run: j.last_run.map(|_| Utc::now()), // Approximate
177                next_run: j.next_run,
178                enabled: j.enabled,
179            })
180            .collect()
181    }
182
183    /// Run the scheduler loop
184    ///
185    /// This method runs forever, checking every second for jobs that need to be triggered.
186    /// Use `shutdown()` to stop the loop gracefully.
187    pub async fn run_loop(&self) {
188        if self
189            .running
190            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
191            .is_err()
192        {
193            warn!("Cron scheduler is already running");
194            return;
195        }
196
197        let check_interval = Duration::from_secs(1);
198        let mut interval = tokio::time::interval(check_interval);
199
200        info!("Cron scheduler started");
201
202        loop {
203            tokio::select! {
204                _ = interval.tick() => {
205                    self.check_and_trigger().await;
206                }
207                () = self.shutdown.notified() => {
208                    info!("Cron scheduler received shutdown signal");
209                    break;
210                }
211            }
212        }
213
214        self.running.store(false, Ordering::SeqCst);
215        info!("Cron scheduler stopped");
216    }
217
218    /// Check all jobs and trigger those that are due
219    async fn check_and_trigger(&self) {
220        let now = Utc::now();
221        let mut jobs_to_trigger: Vec<(String, ServiceSpec)> = Vec::new();
222
223        // First pass: find jobs that need to be triggered
224        {
225            let jobs = self.jobs.read().await;
226            for (name, job) in jobs.iter() {
227                if !job.enabled {
228                    continue;
229                }
230
231                if let Some(next_run) = job.next_run {
232                    if next_run <= now {
233                        debug!(
234                            job = %name,
235                            scheduled_time = %next_run,
236                            current_time = %now,
237                            "Job is due for execution"
238                        );
239                        jobs_to_trigger.push((name.clone(), job.spec.clone()));
240                    }
241                }
242            }
243        }
244
245        // Second pass: trigger jobs and update their state
246        for (name, spec) in jobs_to_trigger {
247            match self
248                .job_executor
249                .trigger(&name, &spec, JobTrigger::Scheduler)
250                .await
251            {
252                Ok(exec_id) => {
253                    info!(
254                        job = %name,
255                        execution_id = %exec_id,
256                        "Cron job triggered"
257                    );
258
259                    // Update job state
260                    let mut jobs = self.jobs.write().await;
261                    if let Some(job) = jobs.get_mut(&name) {
262                        job.last_run = Some(Instant::now());
263                        job.next_run = job.schedule.upcoming(Utc).next();
264                        debug!(
265                            job = %name,
266                            next_run = ?job.next_run,
267                            "Updated cron job next run time"
268                        );
269                    }
270                }
271                Err(e) => {
272                    error!(
273                        job = %name,
274                        error = %e,
275                        "Failed to trigger cron job"
276                    );
277                }
278            }
279        }
280    }
281
282    /// Manually trigger a cron job (outside of its schedule)
283    ///
284    /// # Arguments
285    /// * `name` - Name of the cron job to trigger
286    ///
287    /// # Returns
288    /// The execution ID of the triggered job
289    ///
290    /// # Errors
291    /// Returns error if the job is not found
292    pub async fn trigger_now(&self, name: &str) -> Result<JobExecutionId> {
293        let jobs = self.jobs.read().await;
294        let job = jobs.get(name).ok_or_else(|| AgentError::NotFound {
295            container: name.to_string(),
296            reason: "cron job not found".to_string(),
297        })?;
298
299        info!(job = %name, "Manually triggering cron job");
300
301        self.job_executor
302            .trigger(name, &job.spec, JobTrigger::Cli)
303            .await
304    }
305
306    /// Signal the scheduler to shut down
307    pub fn shutdown(&self) {
308        info!("Signaling cron scheduler shutdown");
309        self.shutdown.notify_one();
310    }
311
312    /// Check if the scheduler is currently running
313    pub fn is_running(&self) -> bool {
314        self.running.load(Ordering::Relaxed)
315    }
316
317    /// Get the number of registered jobs
318    pub async fn job_count(&self) -> usize {
319        let jobs = self.jobs.read().await;
320        jobs.len()
321    }
322
323    /// Get the number of enabled jobs
324    pub async fn enabled_job_count(&self) -> usize {
325        let jobs = self.jobs.read().await;
326        jobs.values().filter(|j| j.enabled).count()
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::{MockRuntime, Runtime};
    use zlayer_spec::DeploymentSpec;

    /// Cron expression firing at second 0 of every minute
    const EVERY_MINUTE: &str = "0 * * * * * *";
    /// Cron expression firing at the top of every hour
    const EVERY_HOUR: &str = "0 0 * * * * *";

    /// Build a cron-type `ServiceSpec` named `cleanup` with the given schedule
    /// by parsing a minimal deployment document.
    fn mock_cron_spec(schedule: &str) -> ServiceSpec {
        let yaml = format!(
            r#"
version: v1
deployment: test
services:
  cleanup:
    rtype: cron
    schedule: "{schedule}"
    image:
      name: cleanup:latest
"#
        );

        serde_yaml::from_str::<DeploymentSpec>(&yaml)
            .unwrap()
            .services
            .remove("cleanup")
            .unwrap()
    }

    /// Build a scheduler backed by a `MockRuntime`, also returning the executor
    /// so tests can inspect the executions it created.
    fn mock_scheduler() -> (Arc<JobExecutor>, CronScheduler) {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = Arc::new(JobExecutor::new(runtime));
        let scheduler = CronScheduler::new(Arc::clone(&executor));
        (executor, scheduler)
    }

    #[tokio::test]
    async fn test_cron_scheduler_register() {
        let (_executor, scheduler) = mock_scheduler();

        // Valid cron expression (every minute)
        let spec = mock_cron_spec(EVERY_MINUTE);
        scheduler.register("cleanup", &spec).await.unwrap();

        assert_eq!(scheduler.job_count().await, 1);

        let info = scheduler
            .get_job_info("cleanup")
            .await
            .expect("registered job should be visible");
        assert_eq!(info.name, "cleanup");
        assert!(info.enabled);
        assert!(info.next_run.is_some());
    }

    #[tokio::test]
    async fn test_cron_scheduler_invalid_schedule() {
        let (_executor, scheduler) = mock_scheduler();

        // Start from a valid spec, then corrupt the schedule
        let mut spec = mock_cron_spec(EVERY_MINUTE);
        spec.schedule = Some("not a valid cron".to_string());

        let result = scheduler.register("bad", &spec).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cron_scheduler_missing_schedule() {
        let (_executor, scheduler) = mock_scheduler();

        // Start from a valid spec, then drop the schedule
        let mut spec = mock_cron_spec(EVERY_MINUTE);
        spec.schedule = None;

        let result = scheduler.register("missing", &spec).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cron_scheduler_unregister() {
        let (_executor, scheduler) = mock_scheduler();

        let spec = mock_cron_spec(EVERY_MINUTE);
        scheduler.register("cleanup", &spec).await.unwrap();
        assert_eq!(scheduler.job_count().await, 1);

        scheduler.unregister("cleanup").await;
        assert_eq!(scheduler.job_count().await, 0);
    }

    #[tokio::test]
    async fn test_cron_scheduler_enable_disable() {
        let (_executor, scheduler) = mock_scheduler();

        let spec = mock_cron_spec(EVERY_MINUTE);
        scheduler.register("cleanup", &spec).await.unwrap();

        assert_eq!(scheduler.enabled_job_count().await, 1);

        scheduler.set_enabled("cleanup", false).await;
        assert_eq!(scheduler.enabled_job_count().await, 0);

        let info = scheduler.get_job_info("cleanup").await.unwrap();
        assert!(!info.enabled);

        scheduler.set_enabled("cleanup", true).await;
        assert_eq!(scheduler.enabled_job_count().await, 1);
    }

    #[tokio::test]
    async fn test_cron_scheduler_list_jobs() {
        let (_executor, scheduler) = mock_scheduler();

        let spec1 = mock_cron_spec(EVERY_MINUTE);
        let spec2 = mock_cron_spec(EVERY_HOUR);

        scheduler.register("job1", &spec1).await.unwrap();
        scheduler.register("job2", &spec2).await.unwrap();

        let jobs = scheduler.list_jobs().await;
        assert_eq!(jobs.len(), 2);

        let names: Vec<_> = jobs.iter().map(|j| j.name.as_str()).collect();
        assert!(names.contains(&"job1"));
        assert!(names.contains(&"job2"));
    }

    #[tokio::test]
    async fn test_cron_scheduler_trigger_now() {
        let (executor, scheduler) = mock_scheduler();

        let spec = mock_cron_spec(EVERY_MINUTE);
        scheduler.register("cleanup", &spec).await.unwrap();

        // Manually trigger
        let exec_id = scheduler.trigger_now("cleanup").await.unwrap();
        assert!(!exec_id.0.is_empty());

        // Verify execution was created
        tokio::time::sleep(Duration::from_millis(50)).await;
        let execution = executor.get_execution(&exec_id).await;
        assert!(execution.is_some());
    }

    #[tokio::test]
    async fn test_cron_scheduler_trigger_now_not_found() {
        let (_executor, scheduler) = mock_scheduler();

        let result = scheduler.trigger_now("nonexistent").await;
        assert!(result.is_err());
    }

    // Purely synchronous: only constructs the struct and reads its fields.
    #[test]
    fn test_cron_job_info() {
        let info = CronJobInfo {
            name: "test".to_string(),
            schedule_expr: EVERY_MINUTE.to_string(),
            last_run: Some(Utc::now()),
            next_run: Some(Utc::now()),
            enabled: true,
        };

        assert_eq!(info.name, "test");
        assert!(info.enabled);
    }

    #[tokio::test]
    async fn test_cron_scheduler_shutdown() {
        let (_executor, scheduler) = mock_scheduler();
        let scheduler = Arc::new(scheduler);

        assert!(!scheduler.is_running());

        // Start scheduler in background
        let scheduler_clone = Arc::clone(&scheduler);
        let handle = tokio::spawn(async move {
            scheduler_clone.run_loop().await;
        });

        // Give it time to start
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(scheduler.is_running());

        // Shutdown
        scheduler.shutdown();

        // Wait for it to stop
        tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("Scheduler should stop within timeout")
            .expect("Scheduler task should complete without error");

        assert!(!scheduler.is_running());
    }
}