Skip to main content

oxios_kernel/
cron.rs

1//! Cron scheduler for time-based autonomous agent execution.
2//!
3//! Allows scheduling agents to run on cron-like schedules without user intervention.
4//! Supports 5-field (Linux cron) and 6-7 field expressions via the `cron` crate.
5
6use crate::config::CronConfig;
7use crate::git_layer::GitLayer;
8use crate::scheduler::Priority;
9use crate::state_store::StateStore;
10use anyhow::{Result, bail};
11use chrono::{DateTime, Utc};
12use cron::Schedule;
13use parking_lot::{Mutex, RwLock};
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19use uuid::Uuid;
20
21// ── Data types ─────────────────────────────────────────────
22
23/// Source of a cron job.
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
25#[serde(rename_all = "lowercase")]
26pub enum JobSource {
27    /// Defined in config.toml.
28    Config,
29    /// Created via API.
30    #[default]
31    Api,
32}
33
34/// A cron job definition.
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct CronJob {
37    /// Unique job identifier.
38    pub id: Uuid,
39    /// Human-readable job name.
40    pub name: String,
41    /// Cron expression (e.g. "0 */6 * * *").
42    pub schedule: String,
43    /// Goal description for the agent.
44    pub goal: String,
45    /// Constraints on agent behavior.
46    #[serde(default)]
47    pub constraints: Vec<String>,
48    /// Criteria that must be met for success.
49    #[serde(default)]
50    pub acceptance_criteria: Vec<String>,
51    /// Toolchain preset name.
52    #[serde(default = "default_toolchain")]
53    pub toolchain: String,
54    /// Job priority.
55    #[serde(default)]
56    pub priority: Priority,
57    /// Whether the job is active.
58    #[serde(default = "default_true")]
59    pub enabled: bool,
60    /// Timestamp of the last execution.
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub last_run: Option<DateTime<Utc>>,
63    /// Timestamp of the next scheduled execution.
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub next_run: Option<DateTime<Utc>>,
66    /// Number of times this job has run.
67    #[serde(default)]
68    pub run_count: u64,
69    /// Summary of the last execution result.
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub last_result: Option<String>,
72    /// Whether the last execution succeeded.
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub last_success: Option<bool>,
75    /// How the job was created.
76    #[serde(default)]
77    pub source: JobSource,
78}
79
/// Serde default for `CronJob::toolchain`: the preset named `"default"`.
fn default_toolchain() -> String {
    String::from("default")
}
83
/// Serde default for `CronJob::enabled`: jobs are active unless stated otherwise.
fn default_true() -> bool {
    true
}
87
88impl CronJob {
89    /// Create a new cron job with a parsed schedule.
90    pub fn new(name: String, schedule: String, goal: String) -> Self {
91        Self {
92            id: Uuid::new_v4(),
93            name,
94            schedule,
95            goal,
96            constraints: vec![],
97            acceptance_criteria: vec![],
98            toolchain: default_toolchain(),
99            priority: Priority::default(),
100            enabled: true,
101            last_run: None,
102            next_run: None,
103            run_count: 0,
104            last_result: None,
105            last_success: None,
106            source: JobSource::Api,
107        }
108    }
109}
110
111/// Result of a single cron job execution.
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct CronJobResult {
114    /// ID of the executed job.
115    pub job_id: Uuid,
116    /// Name of the executed job.
117    pub job_name: String,
118    /// When execution started.
119    pub started_at: DateTime<Utc>,
120    /// When execution finished.
121    pub finished_at: DateTime<Utc>,
122    /// Whether the execution succeeded.
123    pub success: bool,
124    /// Human-readable result summary.
125    pub summary: String,
126}
127
128/// Update fields for an existing cron job.
129#[derive(Debug, Default, Deserialize)]
130pub struct CronJobUpdate {
131    /// New name.
132    pub name: Option<String>,
133    /// New cron expression.
134    pub schedule: Option<String>,
135    /// New goal description.
136    pub goal: Option<String>,
137    /// New constraints.
138    pub constraints: Option<Vec<String>>,
139    /// New acceptance criteria.
140    pub acceptance_criteria: Option<Vec<String>>,
141    /// New toolchain preset.
142    pub toolchain: Option<String>,
143    /// New priority.
144    pub priority: Option<Priority>,
145    /// Enable or disable.
146    pub enabled: Option<bool>,
147}
148
149// ── CronScheduler ───────────────────────────────────────────
150
151/// The cron scheduler for time-based autonomous agent execution.
152///
153/// Allows scheduling agents to run on cron-like schedules without user intervention.
154/// Supports 5-field (Linux cron) and 6-7 field expressions via the `cron` crate.
155pub struct CronScheduler {
156    jobs: Arc<RwLock<HashMap<Uuid, CronJob>>>,
157    schedules: Arc<Mutex<HashMap<Uuid, Schedule>>>,
158    running_jobs: Arc<Mutex<HashSet<Uuid>>>,
159    state_store: Arc<StateStore>,
160    cancel: Arc<AtomicBool>,
161    dirty: Arc<AtomicBool>,
162    tick_interval_secs: u64,
163    /// Optional git layer for version-controlled saves.
164    git_layer: Option<Arc<GitLayer>>,
165    /// Maximum concurrent cron job executions (default: 3).
166    max_concurrent_jobs: usize,
167    /// Timeout for individual cron job execution in seconds (default: 600 = 10 minutes).
168    job_timeout_secs: u64,
169}
170
171impl CronScheduler {
172    /// Create a new CronScheduler.
173    ///
174    /// # Arguments
175    /// * `state_store` - State store for persisting job definitions
176    /// * `tick_interval_secs` - How often to check for due jobs (in seconds)
177    pub fn new(state_store: Arc<StateStore>, tick_interval_secs: u64) -> Self {
178        Self {
179            jobs: Arc::new(RwLock::new(HashMap::new())),
180            schedules: Arc::new(Mutex::new(HashMap::new())),
181            running_jobs: Arc::new(Mutex::new(HashSet::new())),
182            state_store,
183            cancel: Arc::new(AtomicBool::new(false)),
184            dirty: Arc::new(AtomicBool::new(false)),
185            tick_interval_secs,
186            git_layer: None,
187            max_concurrent_jobs: 3,
188            job_timeout_secs: 600,
189        }
190    }
191
192    /// Set the maximum concurrent cron job executions.
193    ///
194    /// A value of `0` is rejected — it would silently disable every cron tick
195    /// (`current_running(0) >= max(0)` is always true). We clamp to `1` and
196    /// warn so the caller can correct their configuration instead of losing
197    /// all scheduled jobs without any signal.
198    pub fn set_max_concurrent_jobs(&mut self, max: usize) {
199        if max == 0 {
200            tracing::warn!("set_max_concurrent_jobs(0) would disable all cron jobs; clamping to 1");
201            self.max_concurrent_jobs = 1;
202        } else {
203            self.max_concurrent_jobs = max;
204        }
205    }
206
207    /// Set the timeout for individual cron job execution in seconds.
208    pub fn set_job_timeout_secs(&mut self, secs: u64) {
209        self.job_timeout_secs = secs;
210    }
211
212    /// Set the git layer for version-controlled saves.
213    pub fn set_git_layer(&mut self, gl: Arc<GitLayer>) {
214        self.git_layer = Some(gl);
215    }
216
217    /// Normalize a cron expression: prepend seconds field if 5-field (Linux style).
218    fn normalize_expr(expr: &str) -> String {
219        let fields: Vec<&str> = expr.split_whitespace().collect();
220        match fields.len() {
221            5 => format!("0 {expr}"),
222            _ => expr.to_string(),
223        }
224    }
225
226    /// Parse a cron expression into a `Schedule`.
227    fn parse_schedule(&self, expr: &str) -> Result<Schedule> {
228        let normalized = Self::normalize_expr(expr);
229        Schedule::from_str(&normalized)
230            .map_err(|e| anyhow::anyhow!("Invalid cron expression '{expr}': {e}"))
231    }
232
233    /// Compute the next fire time after `after`.
234    fn next_fire_time(&self, schedule: &Schedule, after: &DateTime<Utc>) -> Option<DateTime<Utc>> {
235        schedule.after(after).next()
236    }
237
238    /// Add a job. Parses schedule, computes next_run, stores.
239    pub async fn add_job(&self, job: CronJob) -> Result<Uuid> {
240        let schedule = self.parse_schedule(&job.schedule)?;
241        let next = self.next_fire_time(&schedule, &Utc::now());
242        let id = job.id;
243
244        self.schedules.lock().insert(id, schedule);
245        self.jobs.write().insert(
246            id,
247            CronJob {
248                next_run: next,
249                ..job
250            },
251        );
252        self.dirty.store(true, Ordering::Relaxed);
253        self.persist_jobs().await;
254
255        tracing::info!(
256            name = %self.jobs.read().get(&id).map(|j| j.name.as_str()).unwrap_or("?"),
257            %id,
258            "Cron job added"
259        );
260        Ok(id)
261    }
262
263    /// Remove a job by ID.
264    pub async fn remove_job(&self, id: Uuid) -> Result<()> {
265        self.schedules.lock().remove(&id);
266        self.jobs
267            .write()
268            .remove(&id)
269            .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
270        self.dirty.store(true, Ordering::Relaxed);
271        self.persist_jobs().await;
272        tracing::info!(%id, "Cron job removed");
273        Ok(())
274    }
275
276    /// Update a job's fields (enabled, schedule, goal, etc).
277    pub async fn update_job(&self, id: Uuid, update: CronJobUpdate) -> Result<()> {
278        // Separate the sync mutation from the async persist to avoid
279        // holding a !Send RwLockWriteGuard across an await point.
280        let should_persist = {
281            let mut jobs = self.jobs.write();
282            let job = jobs
283                .get_mut(&id)
284                .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
285
286            if let Some(name) = update.name {
287                job.name = name;
288            }
289            if let Some(schedule) = &update.schedule {
290                let parsed = self.parse_schedule(schedule)?;
291                self.schedules.lock().insert(id, parsed);
292                job.schedule = schedule.clone();
293                // Recompute next_run
294                let sched = self.schedules.lock().get(&id).cloned();
295                if let Some(s) = sched {
296                    job.next_run = self.next_fire_time(&s, &Utc::now());
297                }
298            }
299            if let Some(goal) = update.goal {
300                job.goal = goal;
301            }
302            if let Some(constraints) = update.constraints {
303                job.constraints = constraints;
304            }
305            if let Some(criteria) = update.acceptance_criteria {
306                job.acceptance_criteria = criteria;
307            }
308            if let Some(toolchain) = update.toolchain {
309                job.toolchain = toolchain;
310            }
311            if let Some(priority) = update.priority {
312                job.priority = priority;
313            }
314            if let Some(enabled) = update.enabled {
315                job.enabled = enabled;
316            }
317
318            self.dirty.store(true, Ordering::Relaxed);
319            true
320        }; // RwLockWriteGuard dropped here, before any .await
321
322        if should_persist {
323            self.persist_jobs().await;
324        }
325        Ok(())
326    }
327
328    /// Toggle a job enabled/disabled.
329    pub async fn toggle_job(&self, id: Uuid, enabled: bool) -> Result<()> {
330        self.update_job(
331            id,
332            CronJobUpdate {
333                enabled: Some(enabled),
334                ..Default::default()
335            },
336        )
337        .await
338    }
339
340    /// List all jobs.
341    pub fn list_jobs(&self) -> Vec<CronJob> {
342        self.jobs.read().values().cloned().collect()
343    }
344
345    /// Get a single job.
346    pub fn get_job(&self, id: Uuid) -> Option<CronJob> {
347        self.jobs.read().get(&id).cloned()
348    }
349
350    /// Check if a job is currently running.
351    pub fn is_running(&self, id: Uuid) -> bool {
352        self.running_jobs.lock().contains(&id)
353    }
354
355    /// Trigger a job immediately (manual execution, ignores schedule).
356    /// Returns the job goal as a string for the caller to execute.
357    /// The caller is responsible for calling `mark_job_completed` after execution.
358    pub fn trigger_job(&self, id: Uuid) -> Result<CronJob> {
359        let job = self
360            .jobs
361            .read()
362            .get(&id)
363            .cloned()
364            .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
365
366        if self.running_jobs.lock().contains(&id) {
367            bail!("Job '{}' is already running", job.name);
368        }
369
370        self.running_jobs.lock().insert(id);
371        Ok(job)
372    }
373
374    /// Mark a job execution as completed.
375    pub async fn mark_job_completed(&self, id: Uuid, success: bool, summary: String) {
376        self.running_jobs.lock().remove(&id);
377        let new_next_run = {
378            let mut jobs = self.jobs.write();
379            if let Some(job) = jobs.get_mut(&id) {
380                job.last_run = Some(Utc::now());
381                job.last_result = Some(summary);
382                job.last_success = Some(success);
383                job.run_count += 1;
384                // Recompute next_run
385                let sched = self.schedules.lock().get(&id).cloned();
386                sched.and_then(|s| self.next_fire_time(&s, &Utc::now()))
387            } else {
388                None
389            }
390        };
391        if let Some(next_run) = new_next_run {
392            let mut jobs = self.jobs.write();
393            if let Some(job) = jobs.get_mut(&id) {
394                job.next_run = Some(next_run);
395            }
396        }
397        self.dirty.store(true, Ordering::Relaxed);
398        self.persist_jobs().await;
399    }
400
401    /// Stop the scheduler loop.
402    pub fn stop(&self) {
403        self.cancel.store(true, Ordering::Relaxed);
404        tracing::info!("Cron scheduler stop requested");
405    }
406
407    /// Start the main loop. Must be called on an `Arc<Self>`.
408    ///
409    /// # Arguments
410    /// * `executor` - Async closure `(Uuid, String) -> Fut` where args are `(job_id, goal)`,
411    ///   returning `(success, summary)`.
412    ///
413    /// # Example
414    ///
415    /// ```no_run
416    /// use std::sync::Arc;
417    /// use oxios_kernel::state_store::StateStore;
418    /// use oxios_kernel::cron::CronScheduler;
419    ///
420    /// # async fn example() {
421    /// let state_store = Arc::new(StateStore::new("/tmp/state".into()).unwrap());
422    /// let scheduler = Arc::new(CronScheduler::new(state_store, 60));
423    /// scheduler.clone().start(|id, goal| async move {
424    ///     // execute the agent...
425    ///     (true, "Done".to_string())
426    /// }).await;
427    /// # }
428    /// ```
429    pub async fn start<F, Fut>(self: Arc<Self>, executor: F)
430    where
431        F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
432        Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
433    {
434        let executor = Arc::new(executor);
435        let mut interval =
436            tokio::time::interval(std::time::Duration::from_secs(self.tick_interval_secs));
437
438        tracing::info!(
439            interval_secs = self.tick_interval_secs,
440            "Cron scheduler started"
441        );
442
443        loop {
444            tokio::select! {
445                _ = interval.tick() => {
446                    if self.cancel.load(Ordering::Relaxed) {
447                        tracing::info!("Cron scheduler stopped");
448                        return;
449                    }
450                    self.tick_inner(&executor).await;
451                }
452            }
453        }
454    }
455
456    /// Single tick: find due jobs and spawn execution.
457    ///
458    /// Enforces `max_concurrent_jobs` limit and `job_timeout_secs` per job.
459    async fn tick_inner<F, Fut>(&self, executor: &Arc<F>)
460    where
461        F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
462        Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
463    {
464        let now = Utc::now();
465
466        // Check current concurrency before spawning more
467        let current_running = self.running_jobs.lock().len();
468        if current_running >= self.max_concurrent_jobs {
469            tracing::debug!(
470                running = current_running,
471                max = self.max_concurrent_jobs,
472                "Cron tick: max concurrent jobs reached, skipping"
473            );
474            return;
475        }
476
477        let due: Vec<(Uuid, String)> = {
478            let jobs = self.jobs.read();
479            jobs.iter()
480                .filter(|(_, job)| {
481                    job.enabled
482                        && job.next_run.is_some_and(|nr| nr <= now)
483                        && !self.running_jobs.lock().contains(&job.id)
484                })
485                .map(|(_, job)| (job.id, job.goal.clone()))
486                .collect()
487        };
488
489        let total_due = due.len();
490        for (spawned, (id, goal)) in due.into_iter().enumerate() {
491            // Check concurrency limit before each spawn
492            if self.running_jobs.lock().len() >= self.max_concurrent_jobs {
493                tracing::info!(
494                    spawned,
495                    remaining = total_due - spawned,
496                    "Cron tick: max concurrent jobs reached, deferring remaining"
497                );
498                break;
499            }
500
501            self.running_jobs.lock().insert(id);
502            let exec = executor.clone();
503            let me = self.clone();
504            let timeout_secs = self.job_timeout_secs;
505            tokio::spawn(async move {
506                tracing::info!(%id, "Cron job triggered");
507                let result = tokio::time::timeout(
508                    std::time::Duration::from_secs(timeout_secs),
509                    exec(id, goal),
510                )
511                .await;
512
513                let (success, summary) = match result {
514                    Ok((s, m)) => (s, m),
515                    Err(_) => {
516                        tracing::error!(%id, timeout_secs, "Cron job timed out");
517                        (false, format!("Timed out after {timeout_secs} seconds"))
518                    }
519                };
520                tracing::info!(%id, success, "Cron job completed");
521                me.mark_job_completed(id, success, summary).await;
522            });
523        }
524    }
525
526    /// Persist all jobs to disk.
527    async fn persist_jobs(&self) {
528        let job_list: Vec<CronJob> = {
529            let jobs = self.jobs.read();
530            jobs.values().cloned().collect()
531        };
532        if let Err(e) = self.state_store.save_json("cron", "jobs", &job_list).await {
533            tracing::error!("Failed to persist cron jobs: {}", e);
534        }
535        // Fire-and-forget git commit if git layer is configured
536        if let Some(ref gl) = self.git_layer
537            && gl.is_enabled()
538        {
539            let _ = gl.commit_file("cron/jobs.json", "cron: update jobs");
540        }
541    }
542
543    /// Restore jobs from disk on startup.
544    pub async fn restore_jobs(&self) {
545        match self
546            .state_store
547            .load_json::<Vec<CronJob>>("cron", "jobs")
548            .await
549        {
550            Ok(Some(job_list)) => {
551                for mut job in job_list {
552                    // Re-parse schedule and recompute next_run
553                    match self.parse_schedule(&job.schedule) {
554                        Ok(schedule) => {
555                            job.next_run = self.next_fire_time(&schedule, &Utc::now());
556                            self.schedules.lock().insert(job.id, schedule);
557                            self.jobs.write().insert(job.id, job);
558                        }
559                        Err(e) => {
560                            tracing::error!(job = %job.name, error = %e, "Skipping job with invalid schedule");
561                        }
562                    }
563                }
564                tracing::info!(count = self.jobs.read().len(), "Cron jobs restored");
565            }
566            Ok(None) => {
567                tracing::info!("No saved cron jobs found");
568            }
569            Err(e) => {
570                tracing::error!("Failed to restore cron jobs: {}", e);
571            }
572        }
573    }
574
575    /// Load jobs defined in config (called during startup).
576    /// Config-defined jobs are only added if they don't already exist (API wins on conflict).
577    pub async fn load_from_config(&self, config: &CronConfig) {
578        if !config.enabled {
579            tracing::info!("Cron scheduler is disabled in config");
580            return;
581        }
582
583        for (name, inline) in &config.jobs {
584            let schedule = inline.schedule.clone();
585            let goal = inline.goal.clone();
586
587            let job = CronJob {
588                id: Uuid::new_v4(),
589                name: name.clone(),
590                schedule: schedule.clone(),
591                goal,
592                constraints: inline.constraints.clone(),
593                acceptance_criteria: inline.acceptance_criteria.clone(),
594                toolchain: inline.toolchain.clone(),
595                priority: inline.priority,
596                enabled: inline.enabled,
597                last_run: None,
598                next_run: None,
599                run_count: 0,
600                last_result: None,
601                last_success: None,
602                source: JobSource::Config,
603            };
604
605            // Check if a job with this name already exists (from API)
606            {
607                let jobs = self.jobs.read();
608                if jobs.values().any(|j| j.name == *name) {
609                    tracing::debug!(name = %name, "Skipping config job — already exists via API");
610                    continue;
611                }
612            }
613
614            if let Err(e) = self.add_job(job).await {
615                tracing::error!(name = %name, error = %e, "Failed to load config job");
616            } else {
617                tracing::info!(name = %name, "Loaded cron job from config");
618            }
619        }
620    }
621}
622
623impl Clone for CronScheduler {
624    fn clone(&self) -> Self {
625        Self {
626            jobs: self.jobs.clone(),
627            schedules: self.schedules.clone(),
628            running_jobs: self.running_jobs.clone(),
629            state_store: self.state_store.clone(),
630            cancel: self.cancel.clone(),
631            dirty: self.dirty.clone(),
632            tick_interval_secs: self.tick_interval_secs,
633            git_layer: self.git_layer.clone(),
634            max_concurrent_jobs: self.max_concurrent_jobs,
635            job_timeout_secs: self.job_timeout_secs,
636        }
637    }
638}
639
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Create a state store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store: dropping it deletes the
    /// directory, so each test must keep it alive (`let (_dir, store) = ...`) for as
    /// long as the store is in use.
    fn test_store() -> (tempfile::TempDir, Arc<StateStore>) {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = Arc::new(StateStore::new(temp_dir.path().to_path_buf()).unwrap());
        (temp_dir, store)
    }

    /// Create a scheduler with a 60-second tick on top of a fresh temporary store.
    fn test_scheduler() -> (tempfile::TempDir, CronScheduler) {
        let (temp_dir, store) = test_store();
        (temp_dir, CronScheduler::new(store, 60))
    }

    /// Build a UTC timestamp on 2026-05-06 at the given hour and minute.
    fn utc_at(hour: u32, minute: u32) -> DateTime<Utc> {
        let naive = chrono::NaiveDate::from_ymd_opt(2026, 5, 6)
            .unwrap()
            .and_hms_opt(hour, minute, 0)
            .unwrap();
        DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc)
    }

    #[test]
    fn test_normalize_5field() {
        assert_eq!(CronScheduler::normalize_expr("0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_6field() {
        assert_eq!(CronScheduler::normalize_expr("0 0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_7field() {
        assert_eq!(
            CronScheduler::normalize_expr("0 0 9 * * * 2026"),
            "0 0 9 * * * 2026"
        );
    }

    #[test]
    fn test_parse_valid() {
        let (_dir, cs) = test_scheduler();
        assert!(cs.parse_schedule("0 9 * * *").is_ok());
    }

    #[test]
    fn test_parse_invalid() {
        let (_dir, cs) = test_scheduler();
        assert!(cs.parse_schedule("invalid").is_err());
    }

    #[test]
    fn test_next_fire_time_daily() {
        let (_dir, cs) = test_scheduler();
        let schedule = cs.parse_schedule("0 9 * * *").unwrap();
        let next = cs.next_fire_time(&schedule, &utc_at(8, 0));
        assert!(next.is_some());
        assert_eq!(next.unwrap().hour(), 9);
    }

    #[test]
    fn test_next_fire_time_every_15min() {
        let (_dir, cs) = test_scheduler();
        let schedule = cs.parse_schedule("*/15 * * * *").unwrap();
        let next = cs.next_fire_time(&schedule, &utc_at(10, 7));
        assert!(next.is_some());
        assert_eq!(next.unwrap().minute(), 15);
    }

    #[test]
    fn test_new_job_defaults() {
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "Test goal".into());
        assert!(job.next_run.is_none()); // not computed until the job is added
        assert!(job.enabled);
        assert_eq!(job.run_count, 0);
    }

    #[test]
    fn test_job_source_default() {
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "goal".into());
        assert_eq!(job.source, JobSource::Api);
    }

    #[tokio::test]
    async fn test_add_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("test-job".into(), "0 9 * * *".into(), "Run me".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).is_some());
        assert_eq!(cs.list_jobs().len(), 1);
    }

    #[tokio::test]
    async fn test_add_job_computes_next_run() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("next".into(), "0 9 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).unwrap().next_run.is_some());
    }

    #[tokio::test]
    async fn test_add_job_rejects_invalid_schedule() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("bad".into(), "invalid".into(), "goal".into());
        assert!(cs.add_job(job).await.is_err());
        assert!(cs.list_jobs().is_empty());
    }

    #[tokio::test]
    async fn test_remove_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("remove-me".into(), "0 10 * * *".into(), "Gone".into());
        let id = cs.add_job(job).await.unwrap();
        cs.remove_job(id).await.unwrap();
        assert!(cs.get_job(id).is_none());
    }

    #[tokio::test]
    async fn test_trigger_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("trigger-me".into(), "0 11 * * *".into(), "Goal text".into());
        let id = cs.add_job(job).await.unwrap();

        let triggered = cs.trigger_job(id).unwrap();
        assert_eq!(triggered.goal, "Goal text");
        assert!(cs.is_running(id));

        cs.mark_job_completed(id, true, "ok".into()).await;
        assert!(!cs.is_running(id));
    }

    #[tokio::test]
    async fn test_trigger_already_running() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("running".into(), "0 12 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        cs.trigger_job(id).unwrap();
        let result = cs.trigger_job(id);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_update_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("old-name".into(), "0 9 * * *".into(), "old goal".into());
        let id = cs.add_job(job).await.unwrap();

        cs.update_job(
            id,
            CronJobUpdate {
                name: Some("new-name".into()),
                goal: Some("new goal".into()),
                enabled: Some(false),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let updated = cs.get_job(id).unwrap();
        assert_eq!(updated.name, "new-name");
        assert_eq!(updated.goal, "new goal");
        assert!(!updated.enabled);
    }

    #[tokio::test]
    async fn test_toggle_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("toggle".into(), "0 9 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, false).await.unwrap();
        assert!(!cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, true).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);
    }

    #[tokio::test]
    async fn test_mark_completed_updates_next_run() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("comp".into(), "*/5 * * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();

        let before = cs.get_job(id).unwrap().next_run;
        assert!(before.is_some());

        // Simulate time passing: set next_run to 5 minutes in the past
        let now = Utc::now();
        {
            let mut jobs = cs.jobs.write();
            if let Some(j) = jobs.get_mut(&id) {
                j.next_run = Some(now - chrono::Duration::minutes(5));
            }
        }

        cs.mark_job_completed(id, true, "ok".into()).await;
        let after = cs.get_job(id).unwrap().next_run;
        assert!(after.is_some());
        // After completion, next_run should be set to a future time (>= now)
        assert!(after.unwrap() >= now);
    }

    #[test]
    fn test_max_concurrent_enforced() {
        let (_dir, mut scheduler) = test_scheduler();
        scheduler.set_max_concurrent_jobs(2);
        assert_eq!(scheduler.max_concurrent_jobs, 2);
    }

    #[test]
    fn test_max_concurrent_zero_is_clamped() {
        let (_dir, mut scheduler) = test_scheduler();
        scheduler.set_max_concurrent_jobs(0);
        assert_eq!(scheduler.max_concurrent_jobs, 1);
    }

    #[test]
    fn test_job_timeout_configurable() {
        let (_dir, mut scheduler) = test_scheduler();
        scheduler.set_job_timeout_secs(300);
        assert_eq!(scheduler.job_timeout_secs, 300);
    }
}