1use crate::config::CronConfig;
7use crate::git_layer::GitLayer;
8use crate::scheduler::Priority;
9use crate::state_store::StateStore;
10use anyhow::{bail, Result};
11use chrono::{DateTime, Utc};
12use cron::Schedule;
13use parking_lot::{Mutex, RwLock};
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19use uuid::Uuid;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
25#[serde(rename_all = "lowercase")]
26pub enum JobSource {
27 Config,
29 #[default]
31 Api,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct CronJob {
37 pub id: Uuid,
39 pub name: String,
41 pub schedule: String,
43 pub goal: String,
45 #[serde(default)]
47 pub constraints: Vec<String>,
48 #[serde(default)]
50 pub acceptance_criteria: Vec<String>,
51 #[serde(default = "default_toolchain")]
53 pub toolchain: String,
54 #[serde(default)]
56 pub priority: Priority,
57 #[serde(default = "default_true")]
59 pub enabled: bool,
60 #[serde(skip_serializing_if = "Option::is_none")]
62 pub last_run: Option<DateTime<Utc>>,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub next_run: Option<DateTime<Utc>>,
66 #[serde(default)]
68 pub run_count: u64,
69 #[serde(skip_serializing_if = "Option::is_none")]
71 pub last_result: Option<String>,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub last_success: Option<bool>,
75 #[serde(default)]
77 pub source: JobSource,
78}
79
/// Serde default for [`CronJob::toolchain`]: the literal toolchain name `"default"`.
fn default_toolchain() -> String {
    String::from("default")
}
83
/// Serde default used for [`CronJob::enabled`]: jobs are enabled unless stated otherwise.
fn default_true() -> bool {
    let enabled_by_default = true;
    enabled_by_default
}
87
88impl CronJob {
89 pub fn new(name: String, schedule: String, goal: String) -> Self {
91 Self {
92 id: Uuid::new_v4(),
93 name,
94 schedule,
95 goal,
96 constraints: vec![],
97 acceptance_criteria: vec![],
98 toolchain: default_toolchain(),
99 priority: Priority::default(),
100 enabled: true,
101 last_run: None,
102 next_run: None,
103 run_count: 0,
104 last_result: None,
105 last_success: None,
106 source: JobSource::Api,
107 }
108 }
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct CronJobResult {
114 pub job_id: Uuid,
116 pub job_name: String,
118 pub started_at: DateTime<Utc>,
120 pub finished_at: DateTime<Utc>,
122 pub success: bool,
124 pub summary: String,
126}
127
128#[derive(Debug, Default, Deserialize)]
130pub struct CronJobUpdate {
131 pub name: Option<String>,
133 pub schedule: Option<String>,
135 pub goal: Option<String>,
137 pub constraints: Option<Vec<String>>,
139 pub acceptance_criteria: Option<Vec<String>>,
141 pub toolchain: Option<String>,
143 pub priority: Option<Priority>,
145 pub enabled: Option<bool>,
147}
148
149pub struct CronScheduler {
156 jobs: Arc<RwLock<HashMap<Uuid, CronJob>>>,
157 schedules: Arc<Mutex<HashMap<Uuid, Schedule>>>,
158 running_jobs: Arc<Mutex<HashSet<Uuid>>>,
159 state_store: Arc<StateStore>,
160 cancel: Arc<AtomicBool>,
161 dirty: Arc<AtomicBool>,
162 tick_interval_secs: u64,
163 git_layer: Option<Arc<GitLayer>>,
165 max_concurrent_jobs: usize,
167 job_timeout_secs: u64,
169}
170
171impl CronScheduler {
172 pub fn new(state_store: Arc<StateStore>, tick_interval_secs: u64) -> Self {
178 Self {
179 jobs: Arc::new(RwLock::new(HashMap::new())),
180 schedules: Arc::new(Mutex::new(HashMap::new())),
181 running_jobs: Arc::new(Mutex::new(HashSet::new())),
182 state_store,
183 cancel: Arc::new(AtomicBool::new(false)),
184 dirty: Arc::new(AtomicBool::new(false)),
185 tick_interval_secs,
186 git_layer: None,
187 max_concurrent_jobs: 3,
188 job_timeout_secs: 600,
189 }
190 }
191
192 pub fn set_max_concurrent_jobs(&mut self, max: usize) {
194 self.max_concurrent_jobs = max;
195 }
196
197 pub fn set_job_timeout_secs(&mut self, secs: u64) {
199 self.job_timeout_secs = secs;
200 }
201
202 pub fn set_git_layer(&mut self, gl: Arc<GitLayer>) {
204 self.git_layer = Some(gl);
205 }
206
207 fn normalize_expr(expr: &str) -> String {
209 let fields: Vec<&str> = expr.split_whitespace().collect();
210 match fields.len() {
211 5 => format!("0 {expr}"),
212 _ => expr.to_string(),
213 }
214 }
215
216 fn parse_schedule(&self, expr: &str) -> Result<Schedule> {
218 let normalized = Self::normalize_expr(expr);
219 Schedule::from_str(&normalized)
220 .map_err(|e| anyhow::anyhow!("Invalid cron expression '{expr}': {e}"))
221 }
222
223 fn next_fire_time(&self, schedule: &Schedule, after: &DateTime<Utc>) -> Option<DateTime<Utc>> {
225 schedule.after(after).next()
226 }
227
228 pub async fn add_job(&self, job: CronJob) -> Result<Uuid> {
230 let schedule = self.parse_schedule(&job.schedule)?;
231 let next = self.next_fire_time(&schedule, &Utc::now());
232 let id = job.id;
233
234 self.schedules.lock().insert(id, schedule);
235 self.jobs.write().insert(
236 id,
237 CronJob {
238 next_run: next,
239 ..job
240 },
241 );
242 self.dirty.store(true, Ordering::Relaxed);
243 self.persist_jobs().await;
244
245 tracing::info!(
246 name = %self.jobs.read().get(&id).map(|j| j.name.as_str()).unwrap_or("?"),
247 %id,
248 "Cron job added"
249 );
250 Ok(id)
251 }
252
253 pub async fn remove_job(&self, id: Uuid) -> Result<()> {
255 self.schedules.lock().remove(&id);
256 self.jobs
257 .write()
258 .remove(&id)
259 .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
260 self.dirty.store(true, Ordering::Relaxed);
261 self.persist_jobs().await;
262 tracing::info!(%id, "Cron job removed");
263 Ok(())
264 }
265
266 pub async fn update_job(&self, id: Uuid, update: CronJobUpdate) -> Result<()> {
268 let should_persist = {
271 let mut jobs = self.jobs.write();
272 let job = jobs
273 .get_mut(&id)
274 .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
275
276 if let Some(name) = update.name {
277 job.name = name;
278 }
279 if let Some(schedule) = &update.schedule {
280 let parsed = self.parse_schedule(schedule)?;
281 self.schedules.lock().insert(id, parsed);
282 job.schedule = schedule.clone();
283 let sched = self.schedules.lock().get(&id).cloned();
285 if let Some(s) = sched {
286 job.next_run = self.next_fire_time(&s, &Utc::now());
287 }
288 }
289 if let Some(goal) = update.goal {
290 job.goal = goal;
291 }
292 if let Some(constraints) = update.constraints {
293 job.constraints = constraints;
294 }
295 if let Some(criteria) = update.acceptance_criteria {
296 job.acceptance_criteria = criteria;
297 }
298 if let Some(toolchain) = update.toolchain {
299 job.toolchain = toolchain;
300 }
301 if let Some(priority) = update.priority {
302 job.priority = priority;
303 }
304 if let Some(enabled) = update.enabled {
305 job.enabled = enabled;
306 }
307
308 self.dirty.store(true, Ordering::Relaxed);
309 true
310 }; if should_persist {
313 self.persist_jobs().await;
314 }
315 Ok(())
316 }
317
318 pub async fn toggle_job(&self, id: Uuid, enabled: bool) -> Result<()> {
320 self.update_job(
321 id,
322 CronJobUpdate {
323 enabled: Some(enabled),
324 ..Default::default()
325 },
326 )
327 .await
328 }
329
330 pub fn list_jobs(&self) -> Vec<CronJob> {
332 self.jobs.read().values().cloned().collect()
333 }
334
335 pub fn get_job(&self, id: Uuid) -> Option<CronJob> {
337 self.jobs.read().get(&id).cloned()
338 }
339
340 pub fn is_running(&self, id: Uuid) -> bool {
342 self.running_jobs.lock().contains(&id)
343 }
344
345 pub fn trigger_job(&self, id: Uuid) -> Result<CronJob> {
349 let job = self
350 .jobs
351 .read()
352 .get(&id)
353 .cloned()
354 .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
355
356 if self.running_jobs.lock().contains(&id) {
357 bail!("Job '{}' is already running", job.name);
358 }
359
360 self.running_jobs.lock().insert(id);
361 Ok(job)
362 }
363
364 pub async fn mark_job_completed(&self, id: Uuid, success: bool, summary: String) {
366 self.running_jobs.lock().remove(&id);
367 let new_next_run = {
368 let mut jobs = self.jobs.write();
369 if let Some(job) = jobs.get_mut(&id) {
370 job.last_run = Some(Utc::now());
371 job.last_result = Some(summary);
372 job.last_success = Some(success);
373 job.run_count += 1;
374 let sched = self.schedules.lock().get(&id).cloned();
376 sched.and_then(|s| self.next_fire_time(&s, &Utc::now()))
377 } else {
378 None
379 }
380 };
381 if let Some(next_run) = new_next_run {
382 let mut jobs = self.jobs.write();
383 if let Some(job) = jobs.get_mut(&id) {
384 job.next_run = Some(next_run);
385 }
386 }
387 self.dirty.store(true, Ordering::Relaxed);
388 self.persist_jobs().await;
389 }
390
391 pub fn stop(&self) {
393 self.cancel.store(true, Ordering::Relaxed);
394 tracing::info!("Cron scheduler stop requested");
395 }
396
397 pub async fn start<F, Fut>(self: Arc<Self>, executor: F)
420 where
421 F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
422 Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
423 {
424 let executor = Arc::new(executor);
425 let mut interval =
426 tokio::time::interval(std::time::Duration::from_secs(self.tick_interval_secs));
427
428 tracing::info!(
429 interval_secs = self.tick_interval_secs,
430 "Cron scheduler started"
431 );
432
433 loop {
434 tokio::select! {
435 _ = interval.tick() => {
436 if self.cancel.load(Ordering::Relaxed) {
437 tracing::info!("Cron scheduler stopped");
438 return;
439 }
440 self.tick_inner(&executor).await;
441 }
442 }
443 }
444 }
445
446 async fn tick_inner<F, Fut>(&self, executor: &Arc<F>)
450 where
451 F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
452 Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
453 {
454 let now = Utc::now();
455
456 let current_running = self.running_jobs.lock().len();
458 if current_running >= self.max_concurrent_jobs {
459 tracing::debug!(
460 running = current_running,
461 max = self.max_concurrent_jobs,
462 "Cron tick: max concurrent jobs reached, skipping"
463 );
464 return;
465 }
466
467 let due: Vec<(Uuid, String)> = {
468 let jobs = self.jobs.read();
469 jobs.iter()
470 .filter(|(_, job)| {
471 job.enabled
472 && job.next_run.is_some_and(|nr| nr <= now)
473 && !self.running_jobs.lock().contains(&job.id)
474 })
475 .map(|(_, job)| (job.id, job.goal.clone()))
476 .collect()
477 };
478
479 let total_due = due.len();
480 for (spawned, (id, goal)) in due.into_iter().enumerate() {
481 if self.running_jobs.lock().len() >= self.max_concurrent_jobs {
483 tracing::info!(
484 spawned,
485 remaining = total_due - spawned,
486 "Cron tick: max concurrent jobs reached, deferring remaining"
487 );
488 break;
489 }
490
491 self.running_jobs.lock().insert(id);
492 let exec = executor.clone();
493 let me = self.clone();
494 let timeout_secs = self.job_timeout_secs;
495 tokio::spawn(async move {
496 tracing::info!(%id, "Cron job triggered");
497 let result = tokio::time::timeout(
498 std::time::Duration::from_secs(timeout_secs),
499 exec(id, goal),
500 )
501 .await;
502
503 let (success, summary) = match result {
504 Ok((s, m)) => (s, m),
505 Err(_) => {
506 tracing::error!(%id, timeout_secs, "Cron job timed out");
507 (false, format!("Timed out after {timeout_secs} seconds"))
508 }
509 };
510 tracing::info!(%id, success, "Cron job completed");
511 me.mark_job_completed(id, success, summary).await;
512 });
513 }
514 }
515
516 async fn persist_jobs(&self) {
518 let job_list: Vec<CronJob> = {
519 let jobs = self.jobs.read();
520 jobs.values().cloned().collect()
521 };
522 if let Err(e) = self.state_store.save_json("cron", "jobs", &job_list).await {
523 tracing::error!("Failed to persist cron jobs: {}", e);
524 }
525 if let Some(ref gl) = self.git_layer {
527 if gl.is_enabled() {
528 let _ = gl.commit_file("cron/jobs.json", "cron: update jobs");
529 }
530 }
531 }
532
533 pub async fn restore_jobs(&self) {
535 match self
536 .state_store
537 .load_json::<Vec<CronJob>>("cron", "jobs")
538 .await
539 {
540 Ok(Some(job_list)) => {
541 for mut job in job_list {
542 match self.parse_schedule(&job.schedule) {
544 Ok(schedule) => {
545 job.next_run = self.next_fire_time(&schedule, &Utc::now());
546 self.schedules.lock().insert(job.id, schedule);
547 self.jobs.write().insert(job.id, job);
548 }
549 Err(e) => {
550 tracing::error!(job = %job.name, error = %e, "Skipping job with invalid schedule");
551 }
552 }
553 }
554 tracing::info!(count = self.jobs.read().len(), "Cron jobs restored");
555 }
556 Ok(None) => {
557 tracing::info!("No saved cron jobs found");
558 }
559 Err(e) => {
560 tracing::error!("Failed to restore cron jobs: {}", e);
561 }
562 }
563 }
564
565 pub async fn load_from_config(&self, config: &CronConfig) {
568 if !config.enabled {
569 tracing::info!("Cron scheduler is disabled in config");
570 return;
571 }
572
573 for (name, inline) in &config.jobs {
574 let schedule = inline.schedule.clone();
575 let goal = inline.goal.clone();
576
577 let job = CronJob {
578 id: Uuid::new_v4(),
579 name: name.clone(),
580 schedule: schedule.clone(),
581 goal,
582 constraints: inline.constraints.clone(),
583 acceptance_criteria: inline.acceptance_criteria.clone(),
584 toolchain: inline.toolchain.clone(),
585 priority: inline.priority,
586 enabled: inline.enabled,
587 last_run: None,
588 next_run: None,
589 run_count: 0,
590 last_result: None,
591 last_success: None,
592 source: JobSource::Config,
593 };
594
595 {
597 let jobs = self.jobs.read();
598 if jobs.values().any(|j| j.name == *name) {
599 tracing::debug!(name = %name, "Skipping config job — already exists via API");
600 continue;
601 }
602 }
603
604 if let Err(e) = self.add_job(job).await {
605 tracing::error!(name = %name, error = %e, "Failed to load config job");
606 } else {
607 tracing::info!(name = %name, "Loaded cron job from config");
608 }
609 }
610 }
611}
612
613impl Clone for CronScheduler {
614 fn clone(&self) -> Self {
615 Self {
616 jobs: self.jobs.clone(),
617 schedules: self.schedules.clone(),
618 running_jobs: self.running_jobs.clone(),
619 state_store: self.state_store.clone(),
620 cancel: self.cancel.clone(),
621 dirty: self.dirty.clone(),
622 tick_interval_secs: self.tick_interval_secs,
623 git_layer: self.git_layer.clone(),
624 max_concurrent_jobs: self.max_concurrent_jobs,
625 job_timeout_secs: self.job_timeout_secs,
626 }
627 }
628}
629
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Creates a [`StateStore`] rooted in a fresh temporary directory.
    ///
    /// The returned [`tempfile::TempDir`] deletes the directory when dropped, so callers must
    /// keep it bound (`let (_dir, ..) = ...`, not `let (_, ..) = ...`) while the store is used.
    fn test_store() -> (tempfile::TempDir, Arc<StateStore>) {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = Arc::new(StateStore::new(temp_dir.path().to_path_buf()).unwrap());
        (temp_dir, store)
    }

    /// Creates a scheduler with a 60-second tick over a temporary store; keep the guard alive.
    fn test_scheduler() -> (tempfile::TempDir, CronScheduler) {
        let (temp_dir, store) = test_store();
        (temp_dir, CronScheduler::new(store, 60))
    }

    /// 2026-05-06 at `hour:minute:00` UTC.
    fn utc_at(hour: u32, minute: u32) -> DateTime<Utc> {
        let naive = chrono::NaiveDate::from_ymd_opt(2026, 5, 6)
            .unwrap()
            .and_hms_opt(hour, minute, 0)
            .unwrap();
        DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc)
    }

    #[test]
    fn test_normalize_5field() {
        assert_eq!(CronScheduler::normalize_expr("0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_6field() {
        assert_eq!(CronScheduler::normalize_expr("0 0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_7field() {
        assert_eq!(
            CronScheduler::normalize_expr("0 0 9 * * * 2026"),
            "0 0 9 * * * 2026"
        );
    }

    #[test]
    fn test_parse_valid() {
        let (_dir, cs) = test_scheduler();
        assert!(cs.parse_schedule("0 9 * * *").is_ok());
    }

    #[test]
    fn test_parse_invalid() {
        let (_dir, cs) = test_scheduler();
        assert!(cs.parse_schedule("invalid").is_err());
    }

    #[test]
    fn test_next_fire_time_daily() {
        let (_dir, cs) = test_scheduler();
        let schedule = cs.parse_schedule("0 9 * * *").unwrap();
        let next = cs.next_fire_time(&schedule, &utc_at(8, 0));
        assert!(next.is_some());
        assert_eq!(next.unwrap().hour(), 9);
    }

    #[test]
    fn test_next_fire_time_every_15min() {
        let (_dir, cs) = test_scheduler();
        let schedule = cs.parse_schedule("*/15 * * * *").unwrap();
        let next = cs.next_fire_time(&schedule, &utc_at(10, 7));
        assert!(next.is_some());
        assert_eq!(next.unwrap().minute(), 15);
    }

    #[test]
    fn test_new_job_defaults() {
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "Test goal".into());
        assert!(job.next_run.is_none());
        assert!(job.enabled);
        assert_eq!(job.run_count, 0);
    }

    #[tokio::test]
    async fn test_add_job_computes_next_run() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "Test goal".into());
        let id = cs.add_job(job).await.unwrap();
        let next = cs.get_job(id).unwrap().next_run.expect("add_job sets next_run");
        assert_eq!(next.hour(), 9);
        assert_eq!(next.minute(), 0);
    }

    #[test]
    fn test_job_source_default() {
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "goal".into());
        assert_eq!(job.source, JobSource::Api);
    }

    #[tokio::test]
    async fn test_add_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("test-job".into(), "0 9 * * *".into(), "Run me".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).is_some());
        assert_eq!(cs.list_jobs().len(), 1);
    }

    #[tokio::test]
    async fn test_remove_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("remove-me".into(), "0 10 * * *".into(), "Gone".into());
        let id = cs.add_job(job).await.unwrap();
        cs.remove_job(id).await.unwrap();
        assert!(cs.get_job(id).is_none());
    }

    #[tokio::test]
    async fn test_trigger_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("trigger-me".into(), "0 11 * * *".into(), "Goal text".into());
        let id = cs.add_job(job).await.unwrap();

        let triggered = cs.trigger_job(id).unwrap();
        assert_eq!(triggered.goal, "Goal text");
        assert!(cs.is_running(id));

        cs.mark_job_completed(id, true, "ok".into()).await;
        assert!(!cs.is_running(id));
        assert_eq!(cs.get_job(id).unwrap().run_count, 1);
    }

    #[tokio::test]
    async fn test_trigger_already_running() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("running".into(), "0 12 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        cs.trigger_job(id).unwrap();
        assert!(cs.trigger_job(id).is_err());
    }

    #[tokio::test]
    async fn test_update_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("old-name".into(), "0 9 * * *".into(), "old goal".into());
        let id = cs.add_job(job).await.unwrap();

        cs.update_job(
            id,
            CronJobUpdate {
                name: Some("new-name".into()),
                goal: Some("new goal".into()),
                enabled: Some(false),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let updated = cs.get_job(id).unwrap();
        assert_eq!(updated.name, "new-name");
        assert_eq!(updated.goal, "new goal");
        assert!(!updated.enabled);
    }

    #[tokio::test]
    async fn test_toggle_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("toggle".into(), "0 9 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, false).await.unwrap();
        assert!(!cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, true).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);
    }

    #[tokio::test]
    async fn test_mark_completed_updates_next_run() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("comp".into(), "*/5 * * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).unwrap().next_run.is_some());

        // Pretend the job was due five minutes ago.
        let now = Utc::now();
        if let Some(j) = cs.jobs.write().get_mut(&id) {
            j.next_run = Some(now - chrono::Duration::minutes(5));
        }

        cs.mark_job_completed(id, true, "ok".into()).await;
        let after = cs.get_job(id).unwrap().next_run;
        assert!(after.is_some());
        assert!(after.unwrap() >= now);
    }

    #[test]
    fn test_max_concurrent_configurable() {
        let (_dir, mut scheduler) = test_scheduler();
        scheduler.set_max_concurrent_jobs(2);
        assert_eq!(scheduler.max_concurrent_jobs, 2);
    }

    #[test]
    fn test_job_timeout_configurable() {
        let (_dir, mut scheduler) = test_scheduler();
        scheduler.set_job_timeout_secs(300);
        assert_eq!(scheduler.job_timeout_secs, 300);
    }
}