1use crate::config::CronConfig;
7use crate::git_layer::GitLayer;
8use crate::scheduler::Priority;
9use crate::state_store::StateStore;
10use anyhow::{bail, Result};
11use chrono::{DateTime, Utc};
12use cron::Schedule;
13use parking_lot::{Mutex, RwLock};
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19use uuid::Uuid;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
25#[serde(rename_all = "lowercase")]
26pub enum JobSource {
27 Config,
29 #[default]
31 Api,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct CronJob {
37 pub id: Uuid,
39 pub name: String,
41 pub schedule: String,
43 pub goal: String,
45 #[serde(default)]
47 pub constraints: Vec<String>,
48 #[serde(default)]
50 pub acceptance_criteria: Vec<String>,
51 #[serde(default = "default_toolchain")]
53 pub toolchain: String,
54 #[serde(default)]
56 pub priority: Priority,
57 #[serde(default = "default_true")]
59 pub enabled: bool,
60 #[serde(skip_serializing_if = "Option::is_none")]
62 pub last_run: Option<DateTime<Utc>>,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub next_run: Option<DateTime<Utc>>,
66 #[serde(default)]
68 pub run_count: u64,
69 #[serde(skip_serializing_if = "Option::is_none")]
71 pub last_result: Option<String>,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub last_success: Option<bool>,
75 #[serde(default)]
77 pub source: JobSource,
78}
79
/// Serde default for [`CronJob::toolchain`]: the literal name `"default"`.
fn default_toolchain() -> String {
    String::from("default")
}
83
/// Serde default helper yielding `true` (used for [`CronJob::enabled`], so
/// jobs missing the field deserialize as enabled).
fn default_true() -> bool {
    true
}
87
88impl CronJob {
89 pub fn new(name: String, schedule: String, goal: String) -> Self {
91 Self {
92 id: Uuid::new_v4(),
93 name,
94 schedule,
95 goal,
96 constraints: vec![],
97 acceptance_criteria: vec![],
98 toolchain: default_toolchain(),
99 priority: Priority::default(),
100 enabled: true,
101 last_run: None,
102 next_run: None,
103 run_count: 0,
104 last_result: None,
105 last_success: None,
106 source: JobSource::Api,
107 }
108 }
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct CronJobResult {
114 pub job_id: Uuid,
116 pub job_name: String,
118 pub started_at: DateTime<Utc>,
120 pub finished_at: DateTime<Utc>,
122 pub success: bool,
124 pub summary: String,
126}
127
128#[derive(Debug, Default, Deserialize)]
130pub struct CronJobUpdate {
131 pub name: Option<String>,
133 pub schedule: Option<String>,
135 pub goal: Option<String>,
137 pub constraints: Option<Vec<String>>,
139 pub acceptance_criteria: Option<Vec<String>>,
141 pub toolchain: Option<String>,
143 pub priority: Option<Priority>,
145 pub enabled: Option<bool>,
147}
148
149pub struct CronScheduler {
156 jobs: Arc<RwLock<HashMap<Uuid, CronJob>>>,
157 schedules: Arc<Mutex<HashMap<Uuid, Schedule>>>,
158 running_jobs: Arc<Mutex<HashSet<Uuid>>>,
159 state_store: Arc<StateStore>,
160 cancel: Arc<AtomicBool>,
161 dirty: Arc<AtomicBool>,
162 tick_interval_secs: u64,
163 git_layer: Option<Arc<GitLayer>>,
165 max_concurrent_jobs: usize,
167 job_timeout_secs: u64,
169}
170
171impl CronScheduler {
172 pub fn new(state_store: Arc<StateStore>, tick_interval_secs: u64) -> Self {
178 Self {
179 jobs: Arc::new(RwLock::new(HashMap::new())),
180 schedules: Arc::new(Mutex::new(HashMap::new())),
181 running_jobs: Arc::new(Mutex::new(HashSet::new())),
182 state_store,
183 cancel: Arc::new(AtomicBool::new(false)),
184 dirty: Arc::new(AtomicBool::new(false)),
185 tick_interval_secs,
186 git_layer: None,
187 max_concurrent_jobs: 3,
188 job_timeout_secs: 600,
189 }
190 }
191
192 pub fn set_max_concurrent_jobs(&mut self, max: usize) {
194 self.max_concurrent_jobs = max;
195 }
196
197 pub fn set_job_timeout_secs(&mut self, secs: u64) {
199 self.job_timeout_secs = secs;
200 }
201
202 pub fn set_git_layer(&mut self, gl: Arc<GitLayer>) {
204 self.git_layer = Some(gl);
205 }
206
207 fn normalize_expr(expr: &str) -> String {
209 let fields: Vec<&str> = expr.split_whitespace().collect();
210 match fields.len() {
211 5 => format!("0 {}", expr),
212 _ => expr.to_string(),
213 }
214 }
215
216 fn parse_schedule(&self, expr: &str) -> Result<Schedule> {
218 let normalized = Self::normalize_expr(expr);
219 Schedule::from_str(&normalized)
220 .map_err(|e| anyhow::anyhow!("Invalid cron expression '{}': {}", expr, e))
221 }
222
223 fn next_fire_time(&self, schedule: &Schedule, after: &DateTime<Utc>) -> Option<DateTime<Utc>> {
225 schedule.after(after).next()
226 }
227
228 pub async fn add_job(&self, job: CronJob) -> Result<Uuid> {
230 let schedule = self.parse_schedule(&job.schedule)?;
231 let next = self.next_fire_time(&schedule, &Utc::now());
232 let id = job.id;
233
234 self.schedules.lock().insert(id, schedule);
235 self.jobs.write().insert(
236 id,
237 CronJob {
238 next_run: next,
239 ..job
240 },
241 );
242 self.dirty.store(true, Ordering::Relaxed);
243 self.persist_jobs().await;
244
245 tracing::info!(
246 name = %self.jobs.read().get(&id).map(|j| j.name.as_str()).unwrap_or("?"),
247 %id,
248 "Cron job added"
249 );
250 Ok(id)
251 }
252
253 pub async fn remove_job(&self, id: Uuid) -> Result<()> {
255 self.schedules.lock().remove(&id);
256 self.jobs
257 .write()
258 .remove(&id)
259 .ok_or_else(|| anyhow::anyhow!("Job {} not found", id))?;
260 self.dirty.store(true, Ordering::Relaxed);
261 self.persist_jobs().await;
262 tracing::info!(%id, "Cron job removed");
263 Ok(())
264 }
265
266 pub async fn update_job(&self, id: Uuid, update: CronJobUpdate) -> Result<()> {
268 let should_persist = {
271 let mut jobs = self.jobs.write();
272 let job = jobs
273 .get_mut(&id)
274 .ok_or_else(|| anyhow::anyhow!("Job {} not found", id))?;
275
276 if let Some(name) = update.name {
277 job.name = name;
278 }
279 if let Some(schedule) = &update.schedule {
280 let parsed = self.parse_schedule(schedule)?;
281 self.schedules.lock().insert(id, parsed);
282 job.schedule = schedule.clone();
283 let sched = self.schedules.lock().get(&id).cloned();
285 if let Some(s) = sched {
286 job.next_run = self.next_fire_time(&s, &Utc::now());
287 }
288 }
289 if let Some(goal) = update.goal {
290 job.goal = goal;
291 }
292 if let Some(constraints) = update.constraints {
293 job.constraints = constraints;
294 }
295 if let Some(criteria) = update.acceptance_criteria {
296 job.acceptance_criteria = criteria;
297 }
298 if let Some(toolchain) = update.toolchain {
299 job.toolchain = toolchain;
300 }
301 if let Some(priority) = update.priority {
302 job.priority = priority;
303 }
304 if let Some(enabled) = update.enabled {
305 job.enabled = enabled;
306 }
307
308 self.dirty.store(true, Ordering::Relaxed);
309 true
310 }; if should_persist {
313 self.persist_jobs().await;
314 }
315 Ok(())
316 }
317
318 pub async fn toggle_job(&self, id: Uuid, enabled: bool) -> Result<()> {
320 self.update_job(
321 id,
322 CronJobUpdate {
323 enabled: Some(enabled),
324 ..Default::default()
325 },
326 )
327 .await
328 }
329
330 pub fn list_jobs(&self) -> Vec<CronJob> {
332 self.jobs.read().values().cloned().collect()
333 }
334
335 pub fn get_job(&self, id: Uuid) -> Option<CronJob> {
337 self.jobs.read().get(&id).cloned()
338 }
339
340 pub fn is_running(&self, id: Uuid) -> bool {
342 self.running_jobs.lock().contains(&id)
343 }
344
345 pub fn trigger_job(&self, id: Uuid) -> Result<CronJob> {
349 let job = self
350 .jobs
351 .read()
352 .get(&id)
353 .cloned()
354 .ok_or_else(|| anyhow::anyhow!("Job {} not found", id))?;
355
356 if self.running_jobs.lock().contains(&id) {
357 bail!("Job '{}' is already running", job.name);
358 }
359
360 self.running_jobs.lock().insert(id);
361 Ok(job)
362 }
363
364 pub async fn mark_job_completed(&self, id: Uuid, success: bool, summary: String) {
366 self.running_jobs.lock().remove(&id);
367 let new_next_run = {
368 let mut jobs = self.jobs.write();
369 if let Some(job) = jobs.get_mut(&id) {
370 job.last_run = Some(Utc::now());
371 job.last_result = Some(summary);
372 job.last_success = Some(success);
373 job.run_count += 1;
374 let sched = self.schedules.lock().get(&id).cloned();
376 sched.and_then(|s| self.next_fire_time(&s, &Utc::now()))
377 } else {
378 None
379 }
380 };
381 if let Some(next_run) = new_next_run {
382 let mut jobs = self.jobs.write();
383 if let Some(job) = jobs.get_mut(&id) {
384 job.next_run = Some(next_run);
385 }
386 }
387 self.dirty.store(true, Ordering::Relaxed);
388 self.persist_jobs().await;
389 }
390
391 pub fn stop(&self) {
393 self.cancel.store(true, Ordering::Relaxed);
394 tracing::info!("Cron scheduler stop requested");
395 }
396
397 pub async fn start<F, Fut>(self: Arc<Self>, executor: F)
413 where
414 F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
415 Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
416 {
417 let executor = Arc::new(executor);
418 let mut interval =
419 tokio::time::interval(std::time::Duration::from_secs(self.tick_interval_secs));
420
421 tracing::info!(
422 interval_secs = self.tick_interval_secs,
423 "Cron scheduler started"
424 );
425
426 loop {
427 tokio::select! {
428 _ = interval.tick() => {
429 if self.cancel.load(Ordering::Relaxed) {
430 tracing::info!("Cron scheduler stopped");
431 return;
432 }
433 self.tick_inner(&executor).await;
434 }
435 }
436 }
437 }
438
439 async fn tick_inner<F, Fut>(&self, executor: &Arc<F>)
443 where
444 F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
445 Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
446 {
447 let now = Utc::now();
448
449 let current_running = self.running_jobs.lock().len();
451 if current_running >= self.max_concurrent_jobs {
452 tracing::debug!(
453 running = current_running,
454 max = self.max_concurrent_jobs,
455 "Cron tick: max concurrent jobs reached, skipping"
456 );
457 return;
458 }
459
460 let due: Vec<(Uuid, String)> = {
461 let jobs = self.jobs.read();
462 jobs.iter()
463 .filter(|(_, job)| {
464 job.enabled
465 && job.next_run.is_some_and(|nr| nr <= now)
466 && !self.running_jobs.lock().contains(&job.id)
467 })
468 .map(|(_, job)| (job.id, job.goal.clone()))
469 .collect()
470 };
471
472 let total_due = due.len();
473 for (spawned, (id, goal)) in due.into_iter().enumerate() {
474 if self.running_jobs.lock().len() >= self.max_concurrent_jobs {
476 tracing::info!(
477 spawned,
478 remaining = total_due - spawned,
479 "Cron tick: max concurrent jobs reached, deferring remaining"
480 );
481 break;
482 }
483
484 self.running_jobs.lock().insert(id);
485 let exec = executor.clone();
486 let me = self.clone();
487 let timeout_secs = self.job_timeout_secs;
488 tokio::spawn(async move {
489 tracing::info!(%id, "Cron job triggered");
490 let result = tokio::time::timeout(
491 std::time::Duration::from_secs(timeout_secs),
492 exec(id, goal),
493 )
494 .await;
495
496 let (success, summary) = match result {
497 Ok((s, m)) => (s, m),
498 Err(_) => {
499 tracing::error!(%id, timeout_secs, "Cron job timed out");
500 (false, format!("Timed out after {} seconds", timeout_secs))
501 }
502 };
503 tracing::info!(%id, success, "Cron job completed");
504 me.mark_job_completed(id, success, summary).await;
505 });
506 }
507 }
508
509 async fn persist_jobs(&self) {
511 let job_list: Vec<CronJob> = {
512 let jobs = self.jobs.read();
513 jobs.values().cloned().collect()
514 };
515 if let Err(e) = self.state_store.save_json("cron", "jobs", &job_list).await {
516 tracing::error!("Failed to persist cron jobs: {}", e);
517 }
518 if let Some(ref gl) = self.git_layer {
520 if gl.is_enabled() {
521 let _ = gl.commit_file("cron/jobs.json", "cron: update jobs");
522 }
523 }
524 }
525
526 pub async fn restore_jobs(&self) {
528 match self
529 .state_store
530 .load_json::<Vec<CronJob>>("cron", "jobs")
531 .await
532 {
533 Ok(Some(job_list)) => {
534 for mut job in job_list {
535 match self.parse_schedule(&job.schedule) {
537 Ok(schedule) => {
538 job.next_run = self.next_fire_time(&schedule, &Utc::now());
539 self.schedules.lock().insert(job.id, schedule);
540 self.jobs.write().insert(job.id, job);
541 }
542 Err(e) => {
543 tracing::error!(job = %job.name, error = %e, "Skipping job with invalid schedule");
544 }
545 }
546 }
547 tracing::info!(count = self.jobs.read().len(), "Cron jobs restored");
548 }
549 Ok(None) => {
550 tracing::info!("No saved cron jobs found");
551 }
552 Err(e) => {
553 tracing::error!("Failed to restore cron jobs: {}", e);
554 }
555 }
556 }
557
558 pub async fn load_from_config(&self, config: &CronConfig) {
561 if !config.enabled {
562 tracing::info!("Cron scheduler is disabled in config");
563 return;
564 }
565
566 for (name, inline) in &config.jobs {
567 let schedule = inline.schedule.clone();
568 let goal = inline.goal.clone();
569
570 let job = CronJob {
571 id: Uuid::new_v4(),
572 name: name.clone(),
573 schedule: schedule.clone(),
574 goal,
575 constraints: inline.constraints.clone(),
576 acceptance_criteria: inline.acceptance_criteria.clone(),
577 toolchain: inline.toolchain.clone(),
578 priority: inline.priority,
579 enabled: inline.enabled,
580 last_run: None,
581 next_run: None,
582 run_count: 0,
583 last_result: None,
584 last_success: None,
585 source: JobSource::Config,
586 };
587
588 {
590 let jobs = self.jobs.read();
591 if jobs.values().any(|j| j.name == *name) {
592 tracing::debug!(name = %name, "Skipping config job — already exists via API");
593 continue;
594 }
595 }
596
597 if let Err(e) = self.add_job(job).await {
598 tracing::error!(name = %name, error = %e, "Failed to load config job");
599 } else {
600 tracing::info!(name = %name, "Loaded cron job from config");
601 }
602 }
603 }
604}
605
606impl Clone for CronScheduler {
607 fn clone(&self) -> Self {
608 Self {
609 jobs: self.jobs.clone(),
610 schedules: self.schedules.clone(),
611 running_jobs: self.running_jobs.clone(),
612 state_store: self.state_store.clone(),
613 cancel: self.cancel.clone(),
614 dirty: self.dirty.clone(),
615 tick_interval_secs: self.tick_interval_secs,
616 git_layer: self.git_layer.clone(),
617 max_concurrent_jobs: self.max_concurrent_jobs,
618 job_timeout_secs: self.job_timeout_secs,
619 }
620 }
621}
622
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Builds a `StateStore` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard deletes the directory when dropped, so it is
    /// returned with the store. Callers must keep it alive for the whole test
    /// (bind it to `_dir`, never to `_`).
    fn test_store() -> (tempfile::TempDir, Arc<StateStore>) {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = Arc::new(StateStore::new(temp_dir.path().to_path_buf()).unwrap());
        (temp_dir, store)
    }

    /// A scheduler with a 60-second tick over a temporary store.
    fn test_scheduler() -> (tempfile::TempDir, CronScheduler) {
        let (temp_dir, store) = test_store();
        (temp_dir, CronScheduler::new(store, 60))
    }

    /// A fixed UTC instant on 2026-05-06 at `hour:minute:00`.
    fn utc_at(hour: u32, minute: u32) -> DateTime<Utc> {
        let naive = chrono::NaiveDate::from_ymd_opt(2026, 5, 6)
            .unwrap()
            .and_hms_opt(hour, minute, 0)
            .unwrap();
        DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc)
    }

    #[test]
    fn test_normalize_5field() {
        assert_eq!(CronScheduler::normalize_expr("0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_6field() {
        assert_eq!(CronScheduler::normalize_expr("0 0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_7field() {
        assert_eq!(
            CronScheduler::normalize_expr("0 0 9 * * * 2026"),
            "0 0 9 * * * 2026"
        );
    }

    #[test]
    fn test_parse_valid() {
        let (_dir, cs) = test_scheduler();
        assert!(cs.parse_schedule("0 9 * * *").is_ok());
    }

    #[test]
    fn test_parse_invalid() {
        let (_dir, cs) = test_scheduler();
        assert!(cs.parse_schedule("invalid").is_err());
    }

    #[test]
    fn test_next_fire_time_daily() {
        let (_dir, cs) = test_scheduler();
        let schedule = cs.parse_schedule("0 9 * * *").unwrap();
        let next = cs.next_fire_time(&schedule, &utc_at(8, 0));
        assert!(next.is_some());
        assert_eq!(next.unwrap().hour(), 9);
    }

    #[test]
    fn test_next_fire_time_every_15min() {
        let (_dir, cs) = test_scheduler();
        let schedule = cs.parse_schedule("*/15 * * * *").unwrap();
        let next = cs.next_fire_time(&schedule, &utc_at(10, 7));
        assert!(next.is_some());
        assert_eq!(next.unwrap().minute(), 15);
    }

    /// `CronJob::new` leaves `next_run` unset; only the scheduler computes it.
    #[test]
    fn test_new_job_defaults() {
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "Test goal".into());
        assert!(job.next_run.is_none());
        assert!(job.enabled);
        assert_eq!(job.run_count, 0);
    }

    #[test]
    fn test_job_source_default() {
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "goal".into());
        assert_eq!(job.source, JobSource::Api);
    }

    #[tokio::test]
    async fn test_add_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("test-job".into(), "0 9 * * *".into(), "Run me".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).is_some());
        assert!(cs.get_job(id).unwrap().next_run.is_some());
        assert_eq!(cs.list_jobs().len(), 1);
    }

    #[tokio::test]
    async fn test_add_job_invalid_schedule() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("bad".into(), "invalid".into(), "goal".into());
        assert!(cs.add_job(job).await.is_err());
        assert!(cs.list_jobs().is_empty());
    }

    #[tokio::test]
    async fn test_remove_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("remove-me".into(), "0 10 * * *".into(), "Gone".into());
        let id = cs.add_job(job).await.unwrap();
        cs.remove_job(id).await.unwrap();
        assert!(cs.get_job(id).is_none());
    }

    #[tokio::test]
    async fn test_remove_missing_job_errors() {
        let (_dir, cs) = test_scheduler();
        assert!(cs.remove_job(Uuid::new_v4()).await.is_err());
    }

    #[tokio::test]
    async fn test_trigger_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("trigger-me".into(), "0 11 * * *".into(), "Goal text".into());
        let id = cs.add_job(job).await.unwrap();

        let triggered = cs.trigger_job(id).unwrap();
        assert_eq!(triggered.goal, "Goal text");
        assert!(cs.is_running(id));

        cs.mark_job_completed(id, true, "ok".into()).await;
        assert!(!cs.is_running(id));
    }

    #[tokio::test]
    async fn test_trigger_already_running() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("running".into(), "0 12 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        cs.trigger_job(id).unwrap();
        assert!(cs.trigger_job(id).is_err());
    }

    #[tokio::test]
    async fn test_trigger_missing_job_errors() {
        let (_dir, cs) = test_scheduler();
        assert!(cs.trigger_job(Uuid::new_v4()).is_err());
    }

    #[tokio::test]
    async fn test_update_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("old-name".into(), "0 9 * * *".into(), "old goal".into());
        let id = cs.add_job(job).await.unwrap();

        cs.update_job(
            id,
            CronJobUpdate {
                name: Some("new-name".into()),
                goal: Some("new goal".into()),
                enabled: Some(false),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let updated = cs.get_job(id).unwrap();
        assert_eq!(updated.name, "new-name");
        assert_eq!(updated.goal, "new goal");
        assert!(!updated.enabled);
    }

    #[tokio::test]
    async fn test_update_job_rejects_invalid_schedule() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("keep".into(), "0 9 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();

        let result = cs
            .update_job(
                id,
                CronJobUpdate {
                    schedule: Some("invalid".into()),
                    ..Default::default()
                },
            )
            .await;
        assert!(result.is_err());
        assert_eq!(cs.get_job(id).unwrap().schedule, "0 9 * * *");
    }

    #[tokio::test]
    async fn test_toggle_job() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("toggle".into(), "0 9 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, false).await.unwrap();
        assert!(!cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, true).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);
    }

    #[tokio::test]
    async fn test_mark_completed_updates_next_run() {
        let (_dir, cs) = test_scheduler();
        let job = CronJob::new("comp".into(), "*/5 * * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();

        let before = cs.get_job(id).unwrap().next_run;
        assert!(before.is_some());

        // Simulate a job whose scheduled time has already passed.
        let now = Utc::now();
        {
            let mut jobs = cs.jobs.write();
            if let Some(j) = jobs.get_mut(&id) {
                j.next_run = Some(now - chrono::Duration::minutes(5));
            }
        }

        cs.mark_job_completed(id, true, "ok".into()).await;
        let after = cs.get_job(id).unwrap().next_run;
        assert!(after.is_some());
        assert!(after.unwrap() >= now);
    }

    /// Only checks the setter; enforcement happens in the tick loop.
    #[test]
    fn test_max_concurrent_configurable() {
        let (_dir, mut scheduler) = test_scheduler();
        scheduler.set_max_concurrent_jobs(2);
        assert_eq!(scheduler.max_concurrent_jobs, 2);
    }

    #[test]
    fn test_job_timeout_configurable() {
        let (_dir, mut scheduler) = test_scheduler();
        scheduler.set_job_timeout_secs(300);
        assert_eq!(scheduler.job_timeout_secs, 300);
    }
}
836}