Skip to main content

oxios_kernel/
cron.rs

1//! Cron scheduler for time-based autonomous agent execution.
2//!
3//! Allows scheduling agents to run on cron-like schedules without user intervention.
4//! Supports 5-field (Linux cron) and 6-7 field expressions via the `cron` crate.
5
6use crate::config::CronConfig;
7use crate::git_layer::GitLayer;
8use crate::scheduler::Priority;
9use crate::state_store::StateStore;
10use anyhow::{bail, Result};
11use chrono::{DateTime, Utc};
12use cron::Schedule;
13use parking_lot::{Mutex, RwLock};
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19use uuid::Uuid;
20
21// ── Data types ─────────────────────────────────────────────
22
/// Source of a cron job.
///
/// Serialized in lowercase (`"config"` / `"api"`). The default variant is
/// [`JobSource::Api`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum JobSource {
    /// Defined in config.toml.
    Config,
    /// Created via API.
    #[default]
    Api,
}
33
/// A cron job definition.
///
/// Holds both the user-supplied definition (name, schedule, goal, ...) and the
/// bookkeeping the scheduler maintains (`last_run`, `next_run`, `run_count`, ...).
/// Fields marked `#[serde(default)]` may be omitted when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronJob {
    /// Unique job identifier.
    pub id: Uuid,
    /// Human-readable job name.
    pub name: String,
    /// Cron expression (e.g. "0 */6 * * *").
    ///
    /// Stored exactly as supplied; the scheduler prepends a seconds field to
    /// 5-field expressions before parsing.
    pub schedule: String,
    /// Goal description for the agent.
    pub goal: String,
    /// Constraints on agent behavior.
    #[serde(default)]
    pub constraints: Vec<String>,
    /// Criteria that must be met for success.
    #[serde(default)]
    pub acceptance_criteria: Vec<String>,
    /// Toolchain preset name (defaults to `"default"`).
    #[serde(default = "default_toolchain")]
    pub toolchain: String,
    /// Job priority.
    #[serde(default)]
    pub priority: Priority,
    /// Whether the job is active (defaults to `true`).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Timestamp of the last execution; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_run: Option<DateTime<Utc>>,
    /// Timestamp of the next scheduled execution; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub next_run: Option<DateTime<Utc>>,
    /// Number of times this job has run.
    #[serde(default)]
    pub run_count: u64,
    /// Summary of the last execution result; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_result: Option<String>,
    /// Whether the last execution succeeded; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_success: Option<bool>,
    /// How the job was created.
    #[serde(default)]
    pub source: JobSource,
}
79
/// Serde default for [`CronJob::toolchain`]: the `"default"` preset name.
fn default_toolchain() -> String {
    String::from("default")
}
83
/// Serde default helper that yields `true` (used for `CronJob::enabled`).
fn default_true() -> bool {
    true
}
87
88impl CronJob {
89    /// Create a new cron job with a parsed schedule.
90    pub fn new(name: String, schedule: String, goal: String) -> Self {
91        Self {
92            id: Uuid::new_v4(),
93            name,
94            schedule,
95            goal,
96            constraints: vec![],
97            acceptance_criteria: vec![],
98            toolchain: default_toolchain(),
99            priority: Priority::default(),
100            enabled: true,
101            last_run: None,
102            next_run: None,
103            run_count: 0,
104            last_result: None,
105            last_success: None,
106            source: JobSource::Api,
107        }
108    }
109}
110
/// Result of a single cron job execution.
///
/// Plain serializable record pairing a job's identity with the timing and
/// outcome of one run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronJobResult {
    /// ID of the executed job.
    pub job_id: Uuid,
    /// Name of the executed job.
    pub job_name: String,
    /// When execution started.
    pub started_at: DateTime<Utc>,
    /// When execution finished.
    pub finished_at: DateTime<Utc>,
    /// Whether the execution succeeded.
    pub success: bool,
    /// Human-readable result summary.
    pub summary: String,
}
127
/// Update fields for an existing cron job.
///
/// Every field is optional: `None` leaves the corresponding job field
/// untouched, `Some(value)` replaces it.
#[derive(Debug, Default, Deserialize)]
pub struct CronJobUpdate {
    /// New name.
    pub name: Option<String>,
    /// New cron expression.
    pub schedule: Option<String>,
    /// New goal description.
    pub goal: Option<String>,
    /// New constraints.
    pub constraints: Option<Vec<String>>,
    /// New acceptance criteria.
    pub acceptance_criteria: Option<Vec<String>>,
    /// New toolchain preset.
    pub toolchain: Option<String>,
    /// New priority.
    pub priority: Option<Priority>,
    /// Enable or disable.
    pub enabled: Option<bool>,
}
148
149// ── CronScheduler ───────────────────────────────────────────
150
151/// The cron scheduler for time-based autonomous agent execution.
152///
153/// Allows scheduling agents to run on cron-like schedules without user intervention.
154/// Supports 5-field (Linux cron) and 6-7 field expressions via the `cron` crate.
155pub struct CronScheduler {
156    jobs: Arc<RwLock<HashMap<Uuid, CronJob>>>,
157    schedules: Arc<Mutex<HashMap<Uuid, Schedule>>>,
158    running_jobs: Arc<Mutex<HashSet<Uuid>>>,
159    state_store: Arc<StateStore>,
160    cancel: Arc<AtomicBool>,
161    dirty: Arc<AtomicBool>,
162    tick_interval_secs: u64,
163    /// Optional git layer for version-controlled saves.
164    git_layer: Option<Arc<GitLayer>>,
165    /// Maximum concurrent cron job executions (default: 3).
166    max_concurrent_jobs: usize,
167    /// Timeout for individual cron job execution in seconds (default: 600 = 10 minutes).
168    job_timeout_secs: u64,
169}
170
171impl CronScheduler {
172    /// Create a new CronScheduler.
173    ///
174    /// # Arguments
175    /// * `state_store` - State store for persisting job definitions
176    /// * `tick_interval_secs` - How often to check for due jobs (in seconds)
177    pub fn new(state_store: Arc<StateStore>, tick_interval_secs: u64) -> Self {
178        Self {
179            jobs: Arc::new(RwLock::new(HashMap::new())),
180            schedules: Arc::new(Mutex::new(HashMap::new())),
181            running_jobs: Arc::new(Mutex::new(HashSet::new())),
182            state_store,
183            cancel: Arc::new(AtomicBool::new(false)),
184            dirty: Arc::new(AtomicBool::new(false)),
185            tick_interval_secs,
186            git_layer: None,
187            max_concurrent_jobs: 3,
188            job_timeout_secs: 600,
189        }
190    }
191
    /// Set the maximum concurrent cron job executions.
    ///
    /// The scheduler tick stops spawning once this many jobs are running, so a
    /// value of 0 prevents any scheduled job from being spawned.
    pub fn set_max_concurrent_jobs(&mut self, max: usize) {
        self.max_concurrent_jobs = max;
    }
196
    /// Set the timeout for individual cron job execution in seconds.
    ///
    /// A scheduled job that exceeds this is recorded as failed with a
    /// "Timed out" summary.
    pub fn set_job_timeout_secs(&mut self, secs: u64) {
        self.job_timeout_secs = secs;
    }
201
    /// Set the git layer for version-controlled saves.
    ///
    /// When set and enabled, a commit of `cron/jobs.json` is requested each
    /// time the job list is persisted.
    pub fn set_git_layer(&mut self, gl: Arc<GitLayer>) {
        self.git_layer = Some(gl);
    }
206
207    /// Normalize a cron expression: prepend seconds field if 5-field (Linux style).
208    fn normalize_expr(expr: &str) -> String {
209        let fields: Vec<&str> = expr.split_whitespace().collect();
210        match fields.len() {
211            5 => format!("0 {}", expr),
212            _ => expr.to_string(),
213        }
214    }
215
216    /// Parse a cron expression into a `Schedule`.
217    fn parse_schedule(&self, expr: &str) -> Result<Schedule> {
218        let normalized = Self::normalize_expr(expr);
219        Schedule::from_str(&normalized)
220            .map_err(|e| anyhow::anyhow!("Invalid cron expression '{}': {}", expr, e))
221    }
222
223    /// Compute the next fire time after `after`.
224    fn next_fire_time(&self, schedule: &Schedule, after: &DateTime<Utc>) -> Option<DateTime<Utc>> {
225        schedule.after(after).next()
226    }
227
228    /// Add a job. Parses schedule, computes next_run, stores.
229    pub async fn add_job(&self, job: CronJob) -> Result<Uuid> {
230        let schedule = self.parse_schedule(&job.schedule)?;
231        let next = self.next_fire_time(&schedule, &Utc::now());
232        let id = job.id;
233
234        self.schedules.lock().insert(id, schedule);
235        self.jobs.write().insert(
236            id,
237            CronJob {
238                next_run: next,
239                ..job
240            },
241        );
242        self.dirty.store(true, Ordering::Relaxed);
243        self.persist_jobs().await;
244
245        tracing::info!(
246            name = %self.jobs.read().get(&id).map(|j| j.name.as_str()).unwrap_or("?"),
247            %id,
248            "Cron job added"
249        );
250        Ok(id)
251    }
252
253    /// Remove a job by ID.
254    pub async fn remove_job(&self, id: Uuid) -> Result<()> {
255        self.schedules.lock().remove(&id);
256        self.jobs
257            .write()
258            .remove(&id)
259            .ok_or_else(|| anyhow::anyhow!("Job {} not found", id))?;
260        self.dirty.store(true, Ordering::Relaxed);
261        self.persist_jobs().await;
262        tracing::info!(%id, "Cron job removed");
263        Ok(())
264    }
265
266    /// Update a job's fields (enabled, schedule, goal, etc).
267    pub async fn update_job(&self, id: Uuid, update: CronJobUpdate) -> Result<()> {
268        // Separate the sync mutation from the async persist to avoid
269        // holding a !Send RwLockWriteGuard across an await point.
270        let should_persist = {
271            let mut jobs = self.jobs.write();
272            let job = jobs
273                .get_mut(&id)
274                .ok_or_else(|| anyhow::anyhow!("Job {} not found", id))?;
275
276            if let Some(name) = update.name {
277                job.name = name;
278            }
279            if let Some(schedule) = &update.schedule {
280                let parsed = self.parse_schedule(schedule)?;
281                self.schedules.lock().insert(id, parsed);
282                job.schedule = schedule.clone();
283                // Recompute next_run
284                let sched = self.schedules.lock().get(&id).cloned();
285                if let Some(s) = sched {
286                    job.next_run = self.next_fire_time(&s, &Utc::now());
287                }
288            }
289            if let Some(goal) = update.goal {
290                job.goal = goal;
291            }
292            if let Some(constraints) = update.constraints {
293                job.constraints = constraints;
294            }
295            if let Some(criteria) = update.acceptance_criteria {
296                job.acceptance_criteria = criteria;
297            }
298            if let Some(toolchain) = update.toolchain {
299                job.toolchain = toolchain;
300            }
301            if let Some(priority) = update.priority {
302                job.priority = priority;
303            }
304            if let Some(enabled) = update.enabled {
305                job.enabled = enabled;
306            }
307
308            self.dirty.store(true, Ordering::Relaxed);
309            true
310        }; // RwLockWriteGuard dropped here, before any .await
311
312        if should_persist {
313            self.persist_jobs().await;
314        }
315        Ok(())
316    }
317
318    /// Toggle a job enabled/disabled.
319    pub async fn toggle_job(&self, id: Uuid, enabled: bool) -> Result<()> {
320        self.update_job(
321            id,
322            CronJobUpdate {
323                enabled: Some(enabled),
324                ..Default::default()
325            },
326        )
327        .await
328    }
329
330    /// List all jobs.
331    pub fn list_jobs(&self) -> Vec<CronJob> {
332        self.jobs.read().values().cloned().collect()
333    }
334
335    /// Get a single job.
336    pub fn get_job(&self, id: Uuid) -> Option<CronJob> {
337        self.jobs.read().get(&id).cloned()
338    }
339
340    /// Check if a job is currently running.
341    pub fn is_running(&self, id: Uuid) -> bool {
342        self.running_jobs.lock().contains(&id)
343    }
344
345    /// Trigger a job immediately (manual execution, ignores schedule).
346    /// Returns the job goal as a string for the caller to execute.
347    /// The caller is responsible for calling `mark_job_completed` after execution.
348    pub fn trigger_job(&self, id: Uuid) -> Result<CronJob> {
349        let job = self
350            .jobs
351            .read()
352            .get(&id)
353            .cloned()
354            .ok_or_else(|| anyhow::anyhow!("Job {} not found", id))?;
355
356        if self.running_jobs.lock().contains(&id) {
357            bail!("Job '{}' is already running", job.name);
358        }
359
360        self.running_jobs.lock().insert(id);
361        Ok(job)
362    }
363
364    /// Mark a job execution as completed.
365    pub async fn mark_job_completed(&self, id: Uuid, success: bool, summary: String) {
366        self.running_jobs.lock().remove(&id);
367        let new_next_run = {
368            let mut jobs = self.jobs.write();
369            if let Some(job) = jobs.get_mut(&id) {
370                job.last_run = Some(Utc::now());
371                job.last_result = Some(summary);
372                job.last_success = Some(success);
373                job.run_count += 1;
374                // Recompute next_run
375                let sched = self.schedules.lock().get(&id).cloned();
376                sched.and_then(|s| self.next_fire_time(&s, &Utc::now()))
377            } else {
378                None
379            }
380        };
381        if let Some(next_run) = new_next_run {
382            let mut jobs = self.jobs.write();
383            if let Some(job) = jobs.get_mut(&id) {
384                job.next_run = Some(next_run);
385            }
386        }
387        self.dirty.store(true, Ordering::Relaxed);
388        self.persist_jobs().await;
389    }
390
391    /// Stop the scheduler loop.
392    pub fn stop(&self) {
393        self.cancel.store(true, Ordering::Relaxed);
394        tracing::info!("Cron scheduler stop requested");
395    }
396
397    /// Start the main loop. Must be called on an `Arc<Self>`.
398    ///
399    /// # Arguments
400    /// * `executor` - Async closure `(Uuid, String) -> Fut` where args are `(job_id, goal)`,
401    ///   returning `(success, summary)`.
402    ///
403    /// # Example
404    ///
405    /// ```ignore
406    /// let scheduler = Arc::new(CronScheduler::new(state_store, 60));
407    /// scheduler.clone().start(|id, goal| async move {
408    ///     // execute the agent...
409    ///     (true, "Done".to_string())
410    /// }).await;
411    /// ```
412    pub async fn start<F, Fut>(self: Arc<Self>, executor: F)
413    where
414        F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
415        Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
416    {
417        let executor = Arc::new(executor);
418        let mut interval =
419            tokio::time::interval(std::time::Duration::from_secs(self.tick_interval_secs));
420
421        tracing::info!(
422            interval_secs = self.tick_interval_secs,
423            "Cron scheduler started"
424        );
425
426        loop {
427            tokio::select! {
428                _ = interval.tick() => {
429                    if self.cancel.load(Ordering::Relaxed) {
430                        tracing::info!("Cron scheduler stopped");
431                        return;
432                    }
433                    self.tick_inner(&executor).await;
434                }
435            }
436        }
437    }
438
439    /// Single tick: find due jobs and spawn execution.
440    ///
441    /// Enforces `max_concurrent_jobs` limit and `job_timeout_secs` per job.
442    async fn tick_inner<F, Fut>(&self, executor: &Arc<F>)
443    where
444        F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
445        Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
446    {
447        let now = Utc::now();
448
449        // Check current concurrency before spawning more
450        let current_running = self.running_jobs.lock().len();
451        if current_running >= self.max_concurrent_jobs {
452            tracing::debug!(
453                running = current_running,
454                max = self.max_concurrent_jobs,
455                "Cron tick: max concurrent jobs reached, skipping"
456            );
457            return;
458        }
459
460        let due: Vec<(Uuid, String)> = {
461            let jobs = self.jobs.read();
462            jobs.iter()
463                .filter(|(_, job)| {
464                    job.enabled
465                        && job.next_run.is_some_and(|nr| nr <= now)
466                        && !self.running_jobs.lock().contains(&job.id)
467                })
468                .map(|(_, job)| (job.id, job.goal.clone()))
469                .collect()
470        };
471
472        let total_due = due.len();
473        for (spawned, (id, goal)) in due.into_iter().enumerate() {
474            // Check concurrency limit before each spawn
475            if self.running_jobs.lock().len() >= self.max_concurrent_jobs {
476                tracing::info!(
477                    spawned,
478                    remaining = total_due - spawned,
479                    "Cron tick: max concurrent jobs reached, deferring remaining"
480                );
481                break;
482            }
483
484            self.running_jobs.lock().insert(id);
485            let exec = executor.clone();
486            let me = self.clone();
487            let timeout_secs = self.job_timeout_secs;
488            tokio::spawn(async move {
489                tracing::info!(%id, "Cron job triggered");
490                let result = tokio::time::timeout(
491                    std::time::Duration::from_secs(timeout_secs),
492                    exec(id, goal),
493                )
494                .await;
495
496                let (success, summary) = match result {
497                    Ok((s, m)) => (s, m),
498                    Err(_) => {
499                        tracing::error!(%id, timeout_secs, "Cron job timed out");
500                        (false, format!("Timed out after {} seconds", timeout_secs))
501                    }
502                };
503                tracing::info!(%id, success, "Cron job completed");
504                me.mark_job_completed(id, success, summary).await;
505            });
506        }
507    }
508
509    /// Persist all jobs to disk.
510    async fn persist_jobs(&self) {
511        let job_list: Vec<CronJob> = {
512            let jobs = self.jobs.read();
513            jobs.values().cloned().collect()
514        };
515        if let Err(e) = self.state_store.save_json("cron", "jobs", &job_list).await {
516            tracing::error!("Failed to persist cron jobs: {}", e);
517        }
518        // Fire-and-forget git commit if git layer is configured
519        if let Some(ref gl) = self.git_layer {
520            if gl.is_enabled() {
521                let _ = gl.commit_file("cron/jobs.json", "cron: update jobs");
522            }
523        }
524    }
525
526    /// Restore jobs from disk on startup.
527    pub async fn restore_jobs(&self) {
528        match self
529            .state_store
530            .load_json::<Vec<CronJob>>("cron", "jobs")
531            .await
532        {
533            Ok(Some(job_list)) => {
534                for mut job in job_list {
535                    // Re-parse schedule and recompute next_run
536                    match self.parse_schedule(&job.schedule) {
537                        Ok(schedule) => {
538                            job.next_run = self.next_fire_time(&schedule, &Utc::now());
539                            self.schedules.lock().insert(job.id, schedule);
540                            self.jobs.write().insert(job.id, job);
541                        }
542                        Err(e) => {
543                            tracing::error!(job = %job.name, error = %e, "Skipping job with invalid schedule");
544                        }
545                    }
546                }
547                tracing::info!(count = self.jobs.read().len(), "Cron jobs restored");
548            }
549            Ok(None) => {
550                tracing::info!("No saved cron jobs found");
551            }
552            Err(e) => {
553                tracing::error!("Failed to restore cron jobs: {}", e);
554            }
555        }
556    }
557
558    /// Load jobs defined in config (called during startup).
559    /// Config-defined jobs are only added if they don't already exist (API wins on conflict).
560    pub async fn load_from_config(&self, config: &CronConfig) {
561        if !config.enabled {
562            tracing::info!("Cron scheduler is disabled in config");
563            return;
564        }
565
566        for (name, inline) in &config.jobs {
567            let schedule = inline.schedule.clone();
568            let goal = inline.goal.clone();
569
570            let job = CronJob {
571                id: Uuid::new_v4(),
572                name: name.clone(),
573                schedule: schedule.clone(),
574                goal,
575                constraints: inline.constraints.clone(),
576                acceptance_criteria: inline.acceptance_criteria.clone(),
577                toolchain: inline.toolchain.clone(),
578                priority: inline.priority,
579                enabled: inline.enabled,
580                last_run: None,
581                next_run: None,
582                run_count: 0,
583                last_result: None,
584                last_success: None,
585                source: JobSource::Config,
586            };
587
588            // Check if a job with this name already exists (from API)
589            {
590                let jobs = self.jobs.read();
591                if jobs.values().any(|j| j.name == *name) {
592                    tracing::debug!(name = %name, "Skipping config job — already exists via API");
593                    continue;
594                }
595            }
596
597            if let Err(e) = self.add_job(job).await {
598                tracing::error!(name = %name, error = %e, "Failed to load config job");
599            } else {
600                tracing::info!(name = %name, "Loaded cron job from config");
601            }
602        }
603    }
604}
605
606impl Clone for CronScheduler {
607    fn clone(&self) -> Self {
608        Self {
609            jobs: self.jobs.clone(),
610            schedules: self.schedules.clone(),
611            running_jobs: self.running_jobs.clone(),
612            state_store: self.state_store.clone(),
613            cancel: self.cancel.clone(),
614            dirty: self.dirty.clone(),
615            tick_interval_secs: self.tick_interval_secs,
616            git_layer: self.git_layer.clone(),
617            max_concurrent_jobs: self.max_concurrent_jobs,
618            job_timeout_secs: self.job_timeout_secs,
619        }
620    }
621}
622
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Create a state store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store because dropping it
    /// deletes the directory; callers bind it (e.g. `_dir`) so the directory
    /// outlives every persist call made during the test.
    fn test_store() -> (Arc<StateStore>, tempfile::TempDir) {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = Arc::new(StateStore::new(temp_dir.path().to_path_buf()).unwrap());
        (store, temp_dir)
    }

    /// Create a scheduler with a 60-second tick on top of a temporary store.
    fn test_scheduler() -> (CronScheduler, tempfile::TempDir) {
        let (store, dir) = test_store();
        (CronScheduler::new(store, 60), dir)
    }

    #[test]
    fn test_normalize_5field() {
        assert_eq!(CronScheduler::normalize_expr("0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_6field() {
        assert_eq!(CronScheduler::normalize_expr("0 0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_7field() {
        assert_eq!(
            CronScheduler::normalize_expr("0 0 9 * * * 2026"),
            "0 0 9 * * * 2026"
        );
    }

    #[test]
    fn test_parse_valid() {
        let (cs, _dir) = test_scheduler();
        assert!(cs.parse_schedule("0 9 * * *").is_ok());
    }

    #[test]
    fn test_parse_invalid() {
        let (cs, _dir) = test_scheduler();
        assert!(cs.parse_schedule("invalid").is_err());
    }

    #[test]
    fn test_next_fire_time_daily() {
        let (cs, _dir) = test_scheduler();
        let schedule = cs.parse_schedule("0 9 * * *").unwrap();
        let now = chrono::NaiveDate::from_ymd_opt(2026, 5, 6)
            .unwrap()
            .and_hms_opt(8, 0, 0)
            .unwrap();
        let now_utc = DateTime::<Utc>::from_naive_utc_and_offset(now, Utc);
        let next = cs.next_fire_time(&schedule, &now_utc);
        assert!(next.is_some());
        let next = next.unwrap();
        assert_eq!(next.hour(), 9);
    }

    #[test]
    fn test_next_fire_time_every_15min() {
        let (cs, _dir) = test_scheduler();
        let schedule = cs.parse_schedule("*/15 * * * *").unwrap();
        let now = chrono::NaiveDate::from_ymd_opt(2026, 5, 6)
            .unwrap()
            .and_hms_opt(10, 7, 0)
            .unwrap();
        let now_utc = DateTime::<Utc>::from_naive_utc_and_offset(now, Utc);
        let next = cs.next_fire_time(&schedule, &now_utc);
        assert!(next.is_some());
        let next = next.unwrap();
        assert_eq!(next.minute(), 15);
    }

    #[tokio::test]
    async fn test_add_job_computes_next_run() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "Test goal".into());
        assert!(job.next_run.is_none()); // not computed yet
        assert!(job.enabled);
        assert_eq!(job.run_count, 0);

        let before = Utc::now();
        let id = cs.add_job(job).await.unwrap();
        let next = cs.get_job(id).unwrap().next_run;
        // Adding the job fills in the first fire time, strictly in the future.
        assert!(next.is_some());
        assert!(next.unwrap() > before);
    }

    #[test]
    fn test_job_source_default() {
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "goal".into());
        assert_eq!(job.source, JobSource::Api);
    }

    #[tokio::test]
    async fn test_add_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("test-job".into(), "0 9 * * *".into(), "Run me".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).is_some());
        assert_eq!(cs.list_jobs().len(), 1);
    }

    #[tokio::test]
    async fn test_remove_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("remove-me".into(), "0 10 * * *".into(), "Gone".into());
        let id = cs.add_job(job).await.unwrap();
        cs.remove_job(id).await.unwrap();
        assert!(cs.get_job(id).is_none());
    }

    #[tokio::test]
    async fn test_trigger_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("trigger-me".into(), "0 11 * * *".into(), "Goal text".into());
        let id = cs.add_job(job).await.unwrap();

        let triggered = cs.trigger_job(id).unwrap();
        assert_eq!(triggered.goal, "Goal text");
        assert!(cs.is_running(id));

        cs.mark_job_completed(id, true, "ok".into()).await;
        assert!(!cs.is_running(id));
    }

    #[tokio::test]
    async fn test_trigger_already_running() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("running".into(), "0 12 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        cs.trigger_job(id).unwrap();
        let result = cs.trigger_job(id);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_update_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("old-name".into(), "0 9 * * *".into(), "old goal".into());
        let id = cs.add_job(job).await.unwrap();

        cs.update_job(
            id,
            CronJobUpdate {
                name: Some("new-name".into()),
                goal: Some("new goal".into()),
                enabled: Some(false),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let updated = cs.get_job(id).unwrap();
        assert_eq!(updated.name, "new-name");
        assert_eq!(updated.goal, "new goal");
        assert!(!updated.enabled);
    }

    #[tokio::test]
    async fn test_toggle_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("toggle".into(), "0 9 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, false).await.unwrap();
        assert!(!cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, true).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);
    }

    #[tokio::test]
    async fn test_mark_completed_updates_next_run() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("comp".into(), "*/5 * * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();

        let before = cs.get_job(id).unwrap().next_run;
        assert!(before.is_some());

        // Simulate time passing: set next_run to 5 minutes in the past
        let now = Utc::now();
        {
            let mut jobs = cs.jobs.write();
            if let Some(j) = jobs.get_mut(&id) {
                j.next_run = Some(now - chrono::Duration::minutes(5));
            }
        }

        cs.mark_job_completed(id, true, "ok".into()).await;
        let after = cs.get_job(id).unwrap().next_run;
        assert!(after.is_some());
        // After completion, next_run should be set to a future time (>= now)
        assert!(after.unwrap() >= now);
    }

    #[test]
    fn test_max_concurrent_enforced() {
        let (mut scheduler, _dir) = test_scheduler();
        scheduler.set_max_concurrent_jobs(2);
        assert_eq!(scheduler.max_concurrent_jobs, 2);
    }

    #[test]
    fn test_job_timeout_configurable() {
        let (mut scheduler, _dir) = test_scheduler();
        scheduler.set_job_timeout_secs(300);
        assert_eq!(scheduler.job_timeout_secs, 300);
    }
}