Skip to main content

oxios_kernel/
cron.rs

1//! Cron scheduler for time-based autonomous agent execution.
2//!
3//! Allows scheduling agents to run on cron-like schedules without user intervention.
4//! Supports 5-field (Linux cron) and 6-7 field expressions via the `cron` crate.
5
6use crate::config::CronConfig;
7use crate::git_layer::GitLayer;
8use crate::scheduler::Priority;
9use crate::state_store::StateStore;
10use anyhow::{bail, Result};
11use chrono::{DateTime, Utc};
12use cron::Schedule;
13use parking_lot::{Mutex, RwLock};
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19use uuid::Uuid;
20
21// ── Data types ─────────────────────────────────────────────
22
23/// Source of a cron job.
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
25#[serde(rename_all = "lowercase")]
26pub enum JobSource {
27    /// Defined in config.toml.
28    Config,
29    /// Created via API.
30    #[default]
31    Api,
32}
33
34/// A cron job definition.
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct CronJob {
37    /// Unique job identifier.
38    pub id: Uuid,
39    /// Human-readable job name.
40    pub name: String,
41    /// Cron expression (e.g. "0 */6 * * *").
42    pub schedule: String,
43    /// Goal description for the agent.
44    pub goal: String,
45    /// Constraints on agent behavior.
46    #[serde(default)]
47    pub constraints: Vec<String>,
48    /// Criteria that must be met for success.
49    #[serde(default)]
50    pub acceptance_criteria: Vec<String>,
51    /// Toolchain preset name.
52    #[serde(default = "default_toolchain")]
53    pub toolchain: String,
54    /// Job priority.
55    #[serde(default)]
56    pub priority: Priority,
57    /// Whether the job is active.
58    #[serde(default = "default_true")]
59    pub enabled: bool,
60    /// Timestamp of the last execution.
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub last_run: Option<DateTime<Utc>>,
63    /// Timestamp of the next scheduled execution.
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub next_run: Option<DateTime<Utc>>,
66    /// Number of times this job has run.
67    #[serde(default)]
68    pub run_count: u64,
69    /// Summary of the last execution result.
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub last_result: Option<String>,
72    /// Whether the last execution succeeded.
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub last_success: Option<bool>,
75    /// How the job was created.
76    #[serde(default)]
77    pub source: JobSource,
78}
79
80fn default_toolchain() -> String {
81    "default".into()
82}
83
84fn default_true() -> bool {
85    true
86}
87
88impl CronJob {
89    /// Create a new cron job with a parsed schedule.
90    pub fn new(name: String, schedule: String, goal: String) -> Self {
91        Self {
92            id: Uuid::new_v4(),
93            name,
94            schedule,
95            goal,
96            constraints: vec![],
97            acceptance_criteria: vec![],
98            toolchain: default_toolchain(),
99            priority: Priority::default(),
100            enabled: true,
101            last_run: None,
102            next_run: None,
103            run_count: 0,
104            last_result: None,
105            last_success: None,
106            source: JobSource::Api,
107        }
108    }
109}
110
111/// Result of a single cron job execution.
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct CronJobResult {
114    /// ID of the executed job.
115    pub job_id: Uuid,
116    /// Name of the executed job.
117    pub job_name: String,
118    /// When execution started.
119    pub started_at: DateTime<Utc>,
120    /// When execution finished.
121    pub finished_at: DateTime<Utc>,
122    /// Whether the execution succeeded.
123    pub success: bool,
124    /// Human-readable result summary.
125    pub summary: String,
126}
127
128/// Update fields for an existing cron job.
129#[derive(Debug, Default, Deserialize)]
130pub struct CronJobUpdate {
131    /// New name.
132    pub name: Option<String>,
133    /// New cron expression.
134    pub schedule: Option<String>,
135    /// New goal description.
136    pub goal: Option<String>,
137    /// New constraints.
138    pub constraints: Option<Vec<String>>,
139    /// New acceptance criteria.
140    pub acceptance_criteria: Option<Vec<String>>,
141    /// New toolchain preset.
142    pub toolchain: Option<String>,
143    /// New priority.
144    pub priority: Option<Priority>,
145    /// Enable or disable.
146    pub enabled: Option<bool>,
147}
148
149// ── CronScheduler ───────────────────────────────────────────
150
151/// The cron scheduler for time-based autonomous agent execution.
152///
153/// Allows scheduling agents to run on cron-like schedules without user intervention.
154/// Supports 5-field (Linux cron) and 6-7 field expressions via the `cron` crate.
155pub struct CronScheduler {
156    jobs: Arc<RwLock<HashMap<Uuid, CronJob>>>,
157    schedules: Arc<Mutex<HashMap<Uuid, Schedule>>>,
158    running_jobs: Arc<Mutex<HashSet<Uuid>>>,
159    state_store: Arc<StateStore>,
160    cancel: Arc<AtomicBool>,
161    dirty: Arc<AtomicBool>,
162    tick_interval_secs: u64,
163    /// Optional git layer for version-controlled saves.
164    git_layer: Option<Arc<GitLayer>>,
165    /// Maximum concurrent cron job executions (default: 3).
166    max_concurrent_jobs: usize,
167    /// Timeout for individual cron job execution in seconds (default: 600 = 10 minutes).
168    job_timeout_secs: u64,
169}
170
171impl CronScheduler {
172    /// Create a new CronScheduler.
173    ///
174    /// # Arguments
175    /// * `state_store` - State store for persisting job definitions
176    /// * `tick_interval_secs` - How often to check for due jobs (in seconds)
177    pub fn new(state_store: Arc<StateStore>, tick_interval_secs: u64) -> Self {
178        Self {
179            jobs: Arc::new(RwLock::new(HashMap::new())),
180            schedules: Arc::new(Mutex::new(HashMap::new())),
181            running_jobs: Arc::new(Mutex::new(HashSet::new())),
182            state_store,
183            cancel: Arc::new(AtomicBool::new(false)),
184            dirty: Arc::new(AtomicBool::new(false)),
185            tick_interval_secs,
186            git_layer: None,
187            max_concurrent_jobs: 3,
188            job_timeout_secs: 600,
189        }
190    }
191
192    /// Set the maximum concurrent cron job executions.
193    pub fn set_max_concurrent_jobs(&mut self, max: usize) {
194        self.max_concurrent_jobs = max;
195    }
196
197    /// Set the timeout for individual cron job execution in seconds.
198    pub fn set_job_timeout_secs(&mut self, secs: u64) {
199        self.job_timeout_secs = secs;
200    }
201
202    /// Set the git layer for version-controlled saves.
203    pub fn set_git_layer(&mut self, gl: Arc<GitLayer>) {
204        self.git_layer = Some(gl);
205    }
206
207    /// Normalize a cron expression: prepend seconds field if 5-field (Linux style).
208    fn normalize_expr(expr: &str) -> String {
209        let fields: Vec<&str> = expr.split_whitespace().collect();
210        match fields.len() {
211            5 => format!("0 {expr}"),
212            _ => expr.to_string(),
213        }
214    }
215
216    /// Parse a cron expression into a `Schedule`.
217    fn parse_schedule(&self, expr: &str) -> Result<Schedule> {
218        let normalized = Self::normalize_expr(expr);
219        Schedule::from_str(&normalized)
220            .map_err(|e| anyhow::anyhow!("Invalid cron expression '{expr}': {e}"))
221    }
222
223    /// Compute the next fire time after `after`.
224    fn next_fire_time(&self, schedule: &Schedule, after: &DateTime<Utc>) -> Option<DateTime<Utc>> {
225        schedule.after(after).next()
226    }
227
228    /// Add a job. Parses schedule, computes next_run, stores.
229    pub async fn add_job(&self, job: CronJob) -> Result<Uuid> {
230        let schedule = self.parse_schedule(&job.schedule)?;
231        let next = self.next_fire_time(&schedule, &Utc::now());
232        let id = job.id;
233
234        self.schedules.lock().insert(id, schedule);
235        self.jobs.write().insert(
236            id,
237            CronJob {
238                next_run: next,
239                ..job
240            },
241        );
242        self.dirty.store(true, Ordering::Relaxed);
243        self.persist_jobs().await;
244
245        tracing::info!(
246            name = %self.jobs.read().get(&id).map(|j| j.name.as_str()).unwrap_or("?"),
247            %id,
248            "Cron job added"
249        );
250        Ok(id)
251    }
252
253    /// Remove a job by ID.
254    pub async fn remove_job(&self, id: Uuid) -> Result<()> {
255        self.schedules.lock().remove(&id);
256        self.jobs
257            .write()
258            .remove(&id)
259            .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
260        self.dirty.store(true, Ordering::Relaxed);
261        self.persist_jobs().await;
262        tracing::info!(%id, "Cron job removed");
263        Ok(())
264    }
265
266    /// Update a job's fields (enabled, schedule, goal, etc).
267    pub async fn update_job(&self, id: Uuid, update: CronJobUpdate) -> Result<()> {
268        // Separate the sync mutation from the async persist to avoid
269        // holding a !Send RwLockWriteGuard across an await point.
270        let should_persist = {
271            let mut jobs = self.jobs.write();
272            let job = jobs
273                .get_mut(&id)
274                .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
275
276            if let Some(name) = update.name {
277                job.name = name;
278            }
279            if let Some(schedule) = &update.schedule {
280                let parsed = self.parse_schedule(schedule)?;
281                self.schedules.lock().insert(id, parsed);
282                job.schedule = schedule.clone();
283                // Recompute next_run
284                let sched = self.schedules.lock().get(&id).cloned();
285                if let Some(s) = sched {
286                    job.next_run = self.next_fire_time(&s, &Utc::now());
287                }
288            }
289            if let Some(goal) = update.goal {
290                job.goal = goal;
291            }
292            if let Some(constraints) = update.constraints {
293                job.constraints = constraints;
294            }
295            if let Some(criteria) = update.acceptance_criteria {
296                job.acceptance_criteria = criteria;
297            }
298            if let Some(toolchain) = update.toolchain {
299                job.toolchain = toolchain;
300            }
301            if let Some(priority) = update.priority {
302                job.priority = priority;
303            }
304            if let Some(enabled) = update.enabled {
305                job.enabled = enabled;
306            }
307
308            self.dirty.store(true, Ordering::Relaxed);
309            true
310        }; // RwLockWriteGuard dropped here, before any .await
311
312        if should_persist {
313            self.persist_jobs().await;
314        }
315        Ok(())
316    }
317
318    /// Toggle a job enabled/disabled.
319    pub async fn toggle_job(&self, id: Uuid, enabled: bool) -> Result<()> {
320        self.update_job(
321            id,
322            CronJobUpdate {
323                enabled: Some(enabled),
324                ..Default::default()
325            },
326        )
327        .await
328    }
329
330    /// List all jobs.
331    pub fn list_jobs(&self) -> Vec<CronJob> {
332        self.jobs.read().values().cloned().collect()
333    }
334
335    /// Get a single job.
336    pub fn get_job(&self, id: Uuid) -> Option<CronJob> {
337        self.jobs.read().get(&id).cloned()
338    }
339
340    /// Check if a job is currently running.
341    pub fn is_running(&self, id: Uuid) -> bool {
342        self.running_jobs.lock().contains(&id)
343    }
344
345    /// Trigger a job immediately (manual execution, ignores schedule).
346    /// Returns the job goal as a string for the caller to execute.
347    /// The caller is responsible for calling `mark_job_completed` after execution.
348    pub fn trigger_job(&self, id: Uuid) -> Result<CronJob> {
349        let job = self
350            .jobs
351            .read()
352            .get(&id)
353            .cloned()
354            .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
355
356        if self.running_jobs.lock().contains(&id) {
357            bail!("Job '{}' is already running", job.name);
358        }
359
360        self.running_jobs.lock().insert(id);
361        Ok(job)
362    }
363
364    /// Mark a job execution as completed.
365    pub async fn mark_job_completed(&self, id: Uuid, success: bool, summary: String) {
366        self.running_jobs.lock().remove(&id);
367        let new_next_run = {
368            let mut jobs = self.jobs.write();
369            if let Some(job) = jobs.get_mut(&id) {
370                job.last_run = Some(Utc::now());
371                job.last_result = Some(summary);
372                job.last_success = Some(success);
373                job.run_count += 1;
374                // Recompute next_run
375                let sched = self.schedules.lock().get(&id).cloned();
376                sched.and_then(|s| self.next_fire_time(&s, &Utc::now()))
377            } else {
378                None
379            }
380        };
381        if let Some(next_run) = new_next_run {
382            let mut jobs = self.jobs.write();
383            if let Some(job) = jobs.get_mut(&id) {
384                job.next_run = Some(next_run);
385            }
386        }
387        self.dirty.store(true, Ordering::Relaxed);
388        self.persist_jobs().await;
389    }
390
391    /// Stop the scheduler loop.
392    pub fn stop(&self) {
393        self.cancel.store(true, Ordering::Relaxed);
394        tracing::info!("Cron scheduler stop requested");
395    }
396
397    /// Start the main loop. Must be called on an `Arc<Self>`.
398    ///
399    /// # Arguments
400    /// * `executor` - Async closure `(Uuid, String) -> Fut` where args are `(job_id, goal)`,
401    ///   returning `(success, summary)`.
402    ///
403    /// # Example
404    ///
405    /// ```no_run
406    /// use std::sync::Arc;
407    /// use oxios_kernel::state_store::StateStore;
408    /// use oxios_kernel::cron::CronScheduler;
409    ///
410    /// # async fn example() {
411    /// let state_store = Arc::new(StateStore::new("/tmp/state".into()).unwrap());
412    /// let scheduler = Arc::new(CronScheduler::new(state_store, 60));
413    /// scheduler.clone().start(|id, goal| async move {
414    ///     // execute the agent...
415    ///     (true, "Done".to_string())
416    /// }).await;
417    /// # }
418    /// ```
419    pub async fn start<F, Fut>(self: Arc<Self>, executor: F)
420    where
421        F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
422        Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
423    {
424        let executor = Arc::new(executor);
425        let mut interval =
426            tokio::time::interval(std::time::Duration::from_secs(self.tick_interval_secs));
427
428        tracing::info!(
429            interval_secs = self.tick_interval_secs,
430            "Cron scheduler started"
431        );
432
433        loop {
434            tokio::select! {
435                _ = interval.tick() => {
436                    if self.cancel.load(Ordering::Relaxed) {
437                        tracing::info!("Cron scheduler stopped");
438                        return;
439                    }
440                    self.tick_inner(&executor).await;
441                }
442            }
443        }
444    }
445
446    /// Single tick: find due jobs and spawn execution.
447    ///
448    /// Enforces `max_concurrent_jobs` limit and `job_timeout_secs` per job.
449    async fn tick_inner<F, Fut>(&self, executor: &Arc<F>)
450    where
451        F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
452        Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
453    {
454        let now = Utc::now();
455
456        // Check current concurrency before spawning more
457        let current_running = self.running_jobs.lock().len();
458        if current_running >= self.max_concurrent_jobs {
459            tracing::debug!(
460                running = current_running,
461                max = self.max_concurrent_jobs,
462                "Cron tick: max concurrent jobs reached, skipping"
463            );
464            return;
465        }
466
467        let due: Vec<(Uuid, String)> = {
468            let jobs = self.jobs.read();
469            jobs.iter()
470                .filter(|(_, job)| {
471                    job.enabled
472                        && job.next_run.is_some_and(|nr| nr <= now)
473                        && !self.running_jobs.lock().contains(&job.id)
474                })
475                .map(|(_, job)| (job.id, job.goal.clone()))
476                .collect()
477        };
478
479        let total_due = due.len();
480        for (spawned, (id, goal)) in due.into_iter().enumerate() {
481            // Check concurrency limit before each spawn
482            if self.running_jobs.lock().len() >= self.max_concurrent_jobs {
483                tracing::info!(
484                    spawned,
485                    remaining = total_due - spawned,
486                    "Cron tick: max concurrent jobs reached, deferring remaining"
487                );
488                break;
489            }
490
491            self.running_jobs.lock().insert(id);
492            let exec = executor.clone();
493            let me = self.clone();
494            let timeout_secs = self.job_timeout_secs;
495            tokio::spawn(async move {
496                tracing::info!(%id, "Cron job triggered");
497                let result = tokio::time::timeout(
498                    std::time::Duration::from_secs(timeout_secs),
499                    exec(id, goal),
500                )
501                .await;
502
503                let (success, summary) = match result {
504                    Ok((s, m)) => (s, m),
505                    Err(_) => {
506                        tracing::error!(%id, timeout_secs, "Cron job timed out");
507                        (false, format!("Timed out after {timeout_secs} seconds"))
508                    }
509                };
510                tracing::info!(%id, success, "Cron job completed");
511                me.mark_job_completed(id, success, summary).await;
512            });
513        }
514    }
515
516    /// Persist all jobs to disk.
517    async fn persist_jobs(&self) {
518        let job_list: Vec<CronJob> = {
519            let jobs = self.jobs.read();
520            jobs.values().cloned().collect()
521        };
522        if let Err(e) = self.state_store.save_json("cron", "jobs", &job_list).await {
523            tracing::error!("Failed to persist cron jobs: {}", e);
524        }
525        // Fire-and-forget git commit if git layer is configured
526        if let Some(ref gl) = self.git_layer {
527            if gl.is_enabled() {
528                let _ = gl.commit_file("cron/jobs.json", "cron: update jobs");
529            }
530        }
531    }
532
533    /// Restore jobs from disk on startup.
534    pub async fn restore_jobs(&self) {
535        match self
536            .state_store
537            .load_json::<Vec<CronJob>>("cron", "jobs")
538            .await
539        {
540            Ok(Some(job_list)) => {
541                for mut job in job_list {
542                    // Re-parse schedule and recompute next_run
543                    match self.parse_schedule(&job.schedule) {
544                        Ok(schedule) => {
545                            job.next_run = self.next_fire_time(&schedule, &Utc::now());
546                            self.schedules.lock().insert(job.id, schedule);
547                            self.jobs.write().insert(job.id, job);
548                        }
549                        Err(e) => {
550                            tracing::error!(job = %job.name, error = %e, "Skipping job with invalid schedule");
551                        }
552                    }
553                }
554                tracing::info!(count = self.jobs.read().len(), "Cron jobs restored");
555            }
556            Ok(None) => {
557                tracing::info!("No saved cron jobs found");
558            }
559            Err(e) => {
560                tracing::error!("Failed to restore cron jobs: {}", e);
561            }
562        }
563    }
564
565    /// Load jobs defined in config (called during startup).
566    /// Config-defined jobs are only added if they don't already exist (API wins on conflict).
567    pub async fn load_from_config(&self, config: &CronConfig) {
568        if !config.enabled {
569            tracing::info!("Cron scheduler is disabled in config");
570            return;
571        }
572
573        for (name, inline) in &config.jobs {
574            let schedule = inline.schedule.clone();
575            let goal = inline.goal.clone();
576
577            let job = CronJob {
578                id: Uuid::new_v4(),
579                name: name.clone(),
580                schedule: schedule.clone(),
581                goal,
582                constraints: inline.constraints.clone(),
583                acceptance_criteria: inline.acceptance_criteria.clone(),
584                toolchain: inline.toolchain.clone(),
585                priority: inline.priority,
586                enabled: inline.enabled,
587                last_run: None,
588                next_run: None,
589                run_count: 0,
590                last_result: None,
591                last_success: None,
592                source: JobSource::Config,
593            };
594
595            // Check if a job with this name already exists (from API)
596            {
597                let jobs = self.jobs.read();
598                if jobs.values().any(|j| j.name == *name) {
599                    tracing::debug!(name = %name, "Skipping config job — already exists via API");
600                    continue;
601                }
602            }
603
604            if let Err(e) = self.add_job(job).await {
605                tracing::error!(name = %name, error = %e, "Failed to load config job");
606            } else {
607                tracing::info!(name = %name, "Loaded cron job from config");
608            }
609        }
610    }
611}
612
613impl Clone for CronScheduler {
614    fn clone(&self) -> Self {
615        Self {
616            jobs: self.jobs.clone(),
617            schedules: self.schedules.clone(),
618            running_jobs: self.running_jobs.clone(),
619            state_store: self.state_store.clone(),
620            cancel: self.cancel.clone(),
621            dirty: self.dirty.clone(),
622            tick_interval_secs: self.tick_interval_secs,
623            git_layer: self.git_layer.clone(),
624            max_concurrent_jobs: self.max_concurrent_jobs,
625            job_timeout_secs: self.job_timeout_secs,
626        }
627    }
628}
629
630#[cfg(test)]
631mod tests {
632    use super::*;
633    use chrono::Timelike;
634
635    fn test_store() -> Arc<StateStore> {
636        let temp_dir = tempfile::tempdir().unwrap();
637        Arc::new(StateStore::new(temp_dir.path().to_path_buf()).unwrap())
638    }
639
640    #[test]
641    fn test_normalize_5field() {
642        assert_eq!(CronScheduler::normalize_expr("0 9 * * *"), "0 0 9 * * *");
643    }
644
645    #[test]
646    fn test_normalize_6field() {
647        assert_eq!(CronScheduler::normalize_expr("0 0 9 * * *"), "0 0 9 * * *");
648    }
649
650    #[test]
651    fn test_normalize_7field() {
652        assert_eq!(
653            CronScheduler::normalize_expr("0 0 9 * * * 2026"),
654            "0 0 9 * * * 2026"
655        );
656    }
657
658    #[test]
659    fn test_parse_valid() {
660        let cs = CronScheduler::new(test_store(), 60);
661        assert!(cs.parse_schedule("0 9 * * *").is_ok());
662    }
663
664    #[test]
665    fn test_parse_invalid() {
666        let cs = CronScheduler::new(test_store(), 60);
667        assert!(cs.parse_schedule("invalid").is_err());
668    }
669
670    #[test]
671    fn test_next_fire_time_daily() {
672        let cs = CronScheduler::new(test_store(), 60);
673        let schedule = cs.parse_schedule("0 9 * * *").unwrap();
674        let now = chrono::NaiveDate::from_ymd_opt(2026, 5, 6)
675            .unwrap()
676            .and_hms_opt(8, 0, 0)
677            .unwrap();
678        let now_utc = DateTime::<Utc>::from_naive_utc_and_offset(now, Utc);
679        let next = cs.next_fire_time(&schedule, &now_utc);
680        assert!(next.is_some());
681        let next = next.unwrap();
682        assert_eq!(next.hour(), 9);
683    }
684
685    #[test]
686    fn test_next_fire_time_every_15min() {
687        let cs = CronScheduler::new(test_store(), 60);
688        let schedule = cs.parse_schedule("*/15 * * * *").unwrap();
689        let now = chrono::NaiveDate::from_ymd_opt(2026, 5, 6)
690            .unwrap()
691            .and_hms_opt(10, 7, 0)
692            .unwrap();
693        let now_utc = DateTime::<Utc>::from_naive_utc_and_offset(now, Utc);
694        let next = cs.next_fire_time(&schedule, &now_utc);
695        assert!(next.is_some());
696        let next = next.unwrap();
697        assert_eq!(next.minute(), 15);
698    }
699
700    #[test]
701    fn test_add_job_computes_next_run() {
702        let job = CronJob::new("test".into(), "0 9 * * *".into(), "Test goal".into());
703        assert!(job.next_run.is_none()); // not computed yet
704        assert!(job.enabled);
705        assert_eq!(job.run_count, 0);
706    }
707
708    #[test]
709    fn test_job_source_default() {
710        let job = CronJob::new("test".into(), "0 9 * * *".into(), "goal".into());
711        assert_eq!(job.source, JobSource::Api);
712    }
713
714    #[tokio::test]
715    async fn test_add_job() {
716        let store = test_store();
717        let cs = CronScheduler::new(store, 60);
718        let job = CronJob::new("test-job".into(), "0 9 * * *".into(), "Run me".into());
719        let id = cs.add_job(job).await.unwrap();
720        assert!(cs.get_job(id).is_some());
721        assert_eq!(cs.list_jobs().len(), 1);
722    }
723
724    #[tokio::test]
725    async fn test_remove_job() {
726        let store = test_store();
727        let cs = CronScheduler::new(store, 60);
728        let job = CronJob::new("remove-me".into(), "0 10 * * *".into(), "Gone".into());
729        let id = cs.add_job(job).await.unwrap();
730        cs.remove_job(id).await.unwrap();
731        assert!(cs.get_job(id).is_none());
732    }
733
734    #[tokio::test]
735    async fn test_trigger_job() {
736        let store = test_store();
737        let cs = CronScheduler::new(store, 60);
738        let job = CronJob::new("trigger-me".into(), "0 11 * * *".into(), "Goal text".into());
739        let id = cs.add_job(job).await.unwrap();
740
741        let triggered = cs.trigger_job(id).unwrap();
742        assert_eq!(triggered.goal, "Goal text");
743        assert!(cs.is_running(id));
744
745        cs.mark_job_completed(id, true, "ok".into()).await;
746        assert!(!cs.is_running(id));
747    }
748
749    #[tokio::test]
750    async fn test_trigger_already_running() {
751        let store = test_store();
752        let cs = CronScheduler::new(store, 60);
753        let job = CronJob::new("running".into(), "0 12 * * *".into(), "goal".into());
754        let id = cs.add_job(job).await.unwrap();
755        cs.trigger_job(id).unwrap();
756        let result = cs.trigger_job(id);
757        assert!(result.is_err());
758    }
759
760    #[tokio::test]
761    async fn test_update_job() {
762        let store = test_store();
763        let cs = CronScheduler::new(store, 60);
764        let job = CronJob::new("old-name".into(), "0 9 * * *".into(), "old goal".into());
765        let id = cs.add_job(job).await.unwrap();
766
767        cs.update_job(
768            id,
769            CronJobUpdate {
770                name: Some("new-name".into()),
771                goal: Some("new goal".into()),
772                enabled: Some(false),
773                ..Default::default()
774            },
775        )
776        .await
777        .unwrap();
778
779        let updated = cs.get_job(id).unwrap();
780        assert_eq!(updated.name, "new-name");
781        assert_eq!(updated.goal, "new goal");
782        assert!(!updated.enabled);
783    }
784
785    #[tokio::test]
786    async fn test_toggle_job() {
787        let store = test_store();
788        let cs = CronScheduler::new(store, 60);
789        let job = CronJob::new("toggle".into(), "0 9 * * *".into(), "goal".into());
790        let id = cs.add_job(job).await.unwrap();
791        assert!(cs.get_job(id).unwrap().enabled);
792
793        cs.toggle_job(id, false).await.unwrap();
794        assert!(!cs.get_job(id).unwrap().enabled);
795
796        cs.toggle_job(id, true).await.unwrap();
797        assert!(cs.get_job(id).unwrap().enabled);
798    }
799
800    #[tokio::test]
801    async fn test_mark_completed_updates_next_run() {
802        let store = test_store();
803        let cs = CronScheduler::new(store, 60);
804        let job = CronJob::new("comp".into(), "*/5 * * * *".into(), "goal".into());
805        let id = cs.add_job(job).await.unwrap();
806
807        let before = cs.get_job(id).unwrap().next_run;
808        assert!(before.is_some());
809
810        // Simulate time passing: set next_run to 5 minutes in the past
811        let now = Utc::now();
812        {
813            let mut jobs = cs.jobs.write();
814            if let Some(j) = jobs.get_mut(&id) {
815                j.next_run = Some(now - chrono::Duration::minutes(5));
816            }
817        }
818
819        cs.mark_job_completed(id, true, "ok".into()).await;
820        let after = cs.get_job(id).unwrap().next_run;
821        assert!(after.is_some());
822        // After completion, next_run should be set to a future time (>= now)
823        assert!(after.unwrap() >= now);
824    }
825
826    #[test]
827    fn test_max_concurrent_enforced() {
828        let temp_dir = tempfile::tempdir().unwrap();
829        let store = Arc::new(StateStore::new(temp_dir.path().to_path_buf()).unwrap());
830        let mut scheduler = CronScheduler::new(store, 60);
831        scheduler.set_max_concurrent_jobs(2);
832        assert_eq!(scheduler.max_concurrent_jobs, 2);
833    }
834
835    #[test]
836    fn test_job_timeout_configurable() {
837        let temp_dir = tempfile::tempdir().unwrap();
838        let store = Arc::new(StateStore::new(temp_dir.path().to_path_buf()).unwrap());
839        let mut scheduler = CronScheduler::new(store, 60);
840        scheduler.set_job_timeout_secs(300);
841        assert_eq!(scheduler.job_timeout_secs, 300);
842    }
843}