1use crate::config::CronConfig;
7use crate::git_layer::GitLayer;
8use crate::scheduler::Priority;
9use crate::state_store::StateStore;
10use anyhow::{Result, bail};
11use chrono::{DateTime, Utc};
12use cron::Schedule;
13use parking_lot::{Mutex, RwLock};
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19use uuid::Uuid;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
25#[serde(rename_all = "lowercase")]
26pub enum JobSource {
27 Config,
29 #[default]
31 Api,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct CronJob {
37 pub id: Uuid,
39 pub name: String,
41 pub schedule: String,
43 pub goal: String,
45 #[serde(default)]
47 pub constraints: Vec<String>,
48 #[serde(default)]
50 pub acceptance_criteria: Vec<String>,
51 #[serde(default = "default_toolchain")]
53 pub toolchain: String,
54 #[serde(default)]
56 pub priority: Priority,
57 #[serde(default = "default_true")]
59 pub enabled: bool,
60 #[serde(skip_serializing_if = "Option::is_none")]
62 pub last_run: Option<DateTime<Utc>>,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub next_run: Option<DateTime<Utc>>,
66 #[serde(default)]
68 pub run_count: u64,
69 #[serde(skip_serializing_if = "Option::is_none")]
71 pub last_result: Option<String>,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub last_success: Option<bool>,
75 #[serde(default)]
77 pub source: JobSource,
78}
79
/// Serde default for `CronJob::toolchain`.
fn default_toolchain() -> String {
    String::from("default")
}
83
/// Serde default for boolean fields that should be on unless stated otherwise.
fn default_true() -> bool {
    true
}
87
88impl CronJob {
89 pub fn new(name: String, schedule: String, goal: String) -> Self {
91 Self {
92 id: Uuid::new_v4(),
93 name,
94 schedule,
95 goal,
96 constraints: vec![],
97 acceptance_criteria: vec![],
98 toolchain: default_toolchain(),
99 priority: Priority::default(),
100 enabled: true,
101 last_run: None,
102 next_run: None,
103 run_count: 0,
104 last_result: None,
105 last_success: None,
106 source: JobSource::Api,
107 }
108 }
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct CronJobResult {
114 pub job_id: Uuid,
116 pub job_name: String,
118 pub started_at: DateTime<Utc>,
120 pub finished_at: DateTime<Utc>,
122 pub success: bool,
124 pub summary: String,
126}
127
128#[derive(Debug, Default, Deserialize)]
130pub struct CronJobUpdate {
131 pub name: Option<String>,
133 pub schedule: Option<String>,
135 pub goal: Option<String>,
137 pub constraints: Option<Vec<String>>,
139 pub acceptance_criteria: Option<Vec<String>>,
141 pub toolchain: Option<String>,
143 pub priority: Option<Priority>,
145 pub enabled: Option<bool>,
147}
148
149pub struct CronScheduler {
156 jobs: Arc<RwLock<HashMap<Uuid, CronJob>>>,
157 schedules: Arc<Mutex<HashMap<Uuid, Schedule>>>,
158 running_jobs: Arc<Mutex<HashSet<Uuid>>>,
159 state_store: Arc<StateStore>,
160 cancel: Arc<AtomicBool>,
161 dirty: Arc<AtomicBool>,
162 tick_interval_secs: u64,
163 git_layer: Option<Arc<GitLayer>>,
165 max_concurrent_jobs: usize,
167 job_timeout_secs: u64,
169}
170
171impl CronScheduler {
172 pub fn new(state_store: Arc<StateStore>, tick_interval_secs: u64) -> Self {
178 Self {
179 jobs: Arc::new(RwLock::new(HashMap::new())),
180 schedules: Arc::new(Mutex::new(HashMap::new())),
181 running_jobs: Arc::new(Mutex::new(HashSet::new())),
182 state_store,
183 cancel: Arc::new(AtomicBool::new(false)),
184 dirty: Arc::new(AtomicBool::new(false)),
185 tick_interval_secs,
186 git_layer: None,
187 max_concurrent_jobs: 3,
188 job_timeout_secs: 600,
189 }
190 }
191
192 pub fn set_max_concurrent_jobs(&mut self, max: usize) {
199 if max == 0 {
200 tracing::warn!("set_max_concurrent_jobs(0) would disable all cron jobs; clamping to 1");
201 self.max_concurrent_jobs = 1;
202 } else {
203 self.max_concurrent_jobs = max;
204 }
205 }
206
207 pub fn set_job_timeout_secs(&mut self, secs: u64) {
209 self.job_timeout_secs = secs;
210 }
211
212 pub fn set_git_layer(&mut self, gl: Arc<GitLayer>) {
214 self.git_layer = Some(gl);
215 }
216
217 fn normalize_expr(expr: &str) -> String {
219 let fields: Vec<&str> = expr.split_whitespace().collect();
220 match fields.len() {
221 5 => format!("0 {expr}"),
222 _ => expr.to_string(),
223 }
224 }
225
226 fn parse_schedule(&self, expr: &str) -> Result<Schedule> {
228 let normalized = Self::normalize_expr(expr);
229 Schedule::from_str(&normalized)
230 .map_err(|e| anyhow::anyhow!("Invalid cron expression '{expr}': {e}"))
231 }
232
233 fn next_fire_time(&self, schedule: &Schedule, after: &DateTime<Utc>) -> Option<DateTime<Utc>> {
235 schedule.after(after).next()
236 }
237
238 pub async fn add_job(&self, job: CronJob) -> Result<Uuid> {
240 let schedule = self.parse_schedule(&job.schedule)?;
241 let next = self.next_fire_time(&schedule, &Utc::now());
242 let id = job.id;
243
244 self.schedules.lock().insert(id, schedule);
245 self.jobs.write().insert(
246 id,
247 CronJob {
248 next_run: next,
249 ..job
250 },
251 );
252 self.dirty.store(true, Ordering::Relaxed);
253 self.persist_jobs().await;
254
255 tracing::info!(
256 name = %self.jobs.read().get(&id).map(|j| j.name.as_str()).unwrap_or("?"),
257 %id,
258 "Cron job added"
259 );
260 Ok(id)
261 }
262
263 pub async fn remove_job(&self, id: Uuid) -> Result<()> {
265 self.schedules.lock().remove(&id);
266 self.jobs
267 .write()
268 .remove(&id)
269 .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
270 self.dirty.store(true, Ordering::Relaxed);
271 self.persist_jobs().await;
272 tracing::info!(%id, "Cron job removed");
273 Ok(())
274 }
275
276 pub async fn update_job(&self, id: Uuid, update: CronJobUpdate) -> Result<()> {
278 let should_persist = {
281 let mut jobs = self.jobs.write();
282 let job = jobs
283 .get_mut(&id)
284 .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
285
286 if let Some(name) = update.name {
287 job.name = name;
288 }
289 if let Some(schedule) = &update.schedule {
290 let parsed = self.parse_schedule(schedule)?;
291 self.schedules.lock().insert(id, parsed);
292 job.schedule = schedule.clone();
293 let sched = self.schedules.lock().get(&id).cloned();
295 if let Some(s) = sched {
296 job.next_run = self.next_fire_time(&s, &Utc::now());
297 }
298 }
299 if let Some(goal) = update.goal {
300 job.goal = goal;
301 }
302 if let Some(constraints) = update.constraints {
303 job.constraints = constraints;
304 }
305 if let Some(criteria) = update.acceptance_criteria {
306 job.acceptance_criteria = criteria;
307 }
308 if let Some(toolchain) = update.toolchain {
309 job.toolchain = toolchain;
310 }
311 if let Some(priority) = update.priority {
312 job.priority = priority;
313 }
314 if let Some(enabled) = update.enabled {
315 job.enabled = enabled;
316 }
317
318 self.dirty.store(true, Ordering::Relaxed);
319 true
320 }; if should_persist {
323 self.persist_jobs().await;
324 }
325 Ok(())
326 }
327
328 pub async fn toggle_job(&self, id: Uuid, enabled: bool) -> Result<()> {
330 self.update_job(
331 id,
332 CronJobUpdate {
333 enabled: Some(enabled),
334 ..Default::default()
335 },
336 )
337 .await
338 }
339
340 pub fn list_jobs(&self) -> Vec<CronJob> {
342 self.jobs.read().values().cloned().collect()
343 }
344
345 pub fn get_job(&self, id: Uuid) -> Option<CronJob> {
347 self.jobs.read().get(&id).cloned()
348 }
349
350 pub fn is_running(&self, id: Uuid) -> bool {
352 self.running_jobs.lock().contains(&id)
353 }
354
355 pub fn trigger_job(&self, id: Uuid) -> Result<CronJob> {
359 let job = self
360 .jobs
361 .read()
362 .get(&id)
363 .cloned()
364 .ok_or_else(|| anyhow::anyhow!("Job {id} not found"))?;
365
366 if self.running_jobs.lock().contains(&id) {
367 bail!("Job '{}' is already running", job.name);
368 }
369
370 self.running_jobs.lock().insert(id);
371 Ok(job)
372 }
373
374 pub async fn mark_job_completed(&self, id: Uuid, success: bool, summary: String) {
376 self.running_jobs.lock().remove(&id);
377 let new_next_run = {
378 let mut jobs = self.jobs.write();
379 if let Some(job) = jobs.get_mut(&id) {
380 job.last_run = Some(Utc::now());
381 job.last_result = Some(summary);
382 job.last_success = Some(success);
383 job.run_count += 1;
384 let sched = self.schedules.lock().get(&id).cloned();
386 sched.and_then(|s| self.next_fire_time(&s, &Utc::now()))
387 } else {
388 None
389 }
390 };
391 if let Some(next_run) = new_next_run {
392 let mut jobs = self.jobs.write();
393 if let Some(job) = jobs.get_mut(&id) {
394 job.next_run = Some(next_run);
395 }
396 }
397 self.dirty.store(true, Ordering::Relaxed);
398 self.persist_jobs().await;
399 }
400
401 pub fn stop(&self) {
403 self.cancel.store(true, Ordering::Relaxed);
404 tracing::info!("Cron scheduler stop requested");
405 }
406
407 pub async fn start<F, Fut>(self: Arc<Self>, executor: F)
430 where
431 F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
432 Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
433 {
434 let executor = Arc::new(executor);
435 let mut interval =
436 tokio::time::interval(std::time::Duration::from_secs(self.tick_interval_secs));
437
438 tracing::info!(
439 interval_secs = self.tick_interval_secs,
440 "Cron scheduler started"
441 );
442
443 loop {
444 tokio::select! {
445 _ = interval.tick() => {
446 if self.cancel.load(Ordering::Relaxed) {
447 tracing::info!("Cron scheduler stopped");
448 return;
449 }
450 self.tick_inner(&executor).await;
451 }
452 }
453 }
454 }
455
456 async fn tick_inner<F, Fut>(&self, executor: &Arc<F>)
460 where
461 F: Fn(Uuid, String) -> Fut + Send + Sync + 'static,
462 Fut: std::future::Future<Output = (bool, String)> + Send + 'static,
463 {
464 let now = Utc::now();
465
466 let current_running = self.running_jobs.lock().len();
468 if current_running >= self.max_concurrent_jobs {
469 tracing::debug!(
470 running = current_running,
471 max = self.max_concurrent_jobs,
472 "Cron tick: max concurrent jobs reached, skipping"
473 );
474 return;
475 }
476
477 let due: Vec<(Uuid, String)> = {
478 let jobs = self.jobs.read();
479 jobs.iter()
480 .filter(|(_, job)| {
481 job.enabled
482 && job.next_run.is_some_and(|nr| nr <= now)
483 && !self.running_jobs.lock().contains(&job.id)
484 })
485 .map(|(_, job)| (job.id, job.goal.clone()))
486 .collect()
487 };
488
489 let total_due = due.len();
490 for (spawned, (id, goal)) in due.into_iter().enumerate() {
491 if self.running_jobs.lock().len() >= self.max_concurrent_jobs {
493 tracing::info!(
494 spawned,
495 remaining = total_due - spawned,
496 "Cron tick: max concurrent jobs reached, deferring remaining"
497 );
498 break;
499 }
500
501 self.running_jobs.lock().insert(id);
502 let exec = executor.clone();
503 let me = self.clone();
504 let timeout_secs = self.job_timeout_secs;
505 tokio::spawn(async move {
506 tracing::info!(%id, "Cron job triggered");
507 let result = tokio::time::timeout(
508 std::time::Duration::from_secs(timeout_secs),
509 exec(id, goal),
510 )
511 .await;
512
513 let (success, summary) = match result {
514 Ok((s, m)) => (s, m),
515 Err(_) => {
516 tracing::error!(%id, timeout_secs, "Cron job timed out");
517 (false, format!("Timed out after {timeout_secs} seconds"))
518 }
519 };
520 tracing::info!(%id, success, "Cron job completed");
521 me.mark_job_completed(id, success, summary).await;
522 });
523 }
524 }
525
526 async fn persist_jobs(&self) {
528 let job_list: Vec<CronJob> = {
529 let jobs = self.jobs.read();
530 jobs.values().cloned().collect()
531 };
532 if let Err(e) = self.state_store.save_json("cron", "jobs", &job_list).await {
533 tracing::error!("Failed to persist cron jobs: {}", e);
534 }
535 if let Some(ref gl) = self.git_layer
537 && gl.is_enabled()
538 {
539 let _ = gl.commit_file("cron/jobs.json", "cron: update jobs");
540 }
541 }
542
543 pub async fn restore_jobs(&self) {
545 match self
546 .state_store
547 .load_json::<Vec<CronJob>>("cron", "jobs")
548 .await
549 {
550 Ok(Some(job_list)) => {
551 for mut job in job_list {
552 match self.parse_schedule(&job.schedule) {
554 Ok(schedule) => {
555 job.next_run = self.next_fire_time(&schedule, &Utc::now());
556 self.schedules.lock().insert(job.id, schedule);
557 self.jobs.write().insert(job.id, job);
558 }
559 Err(e) => {
560 tracing::error!(job = %job.name, error = %e, "Skipping job with invalid schedule");
561 }
562 }
563 }
564 tracing::info!(count = self.jobs.read().len(), "Cron jobs restored");
565 }
566 Ok(None) => {
567 tracing::info!("No saved cron jobs found");
568 }
569 Err(e) => {
570 tracing::error!("Failed to restore cron jobs: {}", e);
571 }
572 }
573 }
574
575 pub async fn load_from_config(&self, config: &CronConfig) {
578 if !config.enabled {
579 tracing::info!("Cron scheduler is disabled in config");
580 return;
581 }
582
583 for (name, inline) in &config.jobs {
584 let schedule = inline.schedule.clone();
585 let goal = inline.goal.clone();
586
587 let job = CronJob {
588 id: Uuid::new_v4(),
589 name: name.clone(),
590 schedule: schedule.clone(),
591 goal,
592 constraints: inline.constraints.clone(),
593 acceptance_criteria: inline.acceptance_criteria.clone(),
594 toolchain: inline.toolchain.clone(),
595 priority: inline.priority,
596 enabled: inline.enabled,
597 last_run: None,
598 next_run: None,
599 run_count: 0,
600 last_result: None,
601 last_success: None,
602 source: JobSource::Config,
603 };
604
605 {
607 let jobs = self.jobs.read();
608 if jobs.values().any(|j| j.name == *name) {
609 tracing::debug!(name = %name, "Skipping config job — already exists via API");
610 continue;
611 }
612 }
613
614 if let Err(e) = self.add_job(job).await {
615 tracing::error!(name = %name, error = %e, "Failed to load config job");
616 } else {
617 tracing::info!(name = %name, "Loaded cron job from config");
618 }
619 }
620 }
621}
622
623impl Clone for CronScheduler {
624 fn clone(&self) -> Self {
625 Self {
626 jobs: self.jobs.clone(),
627 schedules: self.schedules.clone(),
628 running_jobs: self.running_jobs.clone(),
629 state_store: self.state_store.clone(),
630 cancel: self.cancel.clone(),
631 dirty: self.dirty.clone(),
632 tick_interval_secs: self.tick_interval_secs,
633 git_layer: self.git_layer.clone(),
634 max_concurrent_jobs: self.max_concurrent_jobs,
635 job_timeout_secs: self.job_timeout_secs,
636 }
637 }
638}
639
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Creates a state store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store: dropping it
    /// deletes the directory, so callers must keep it alive for as long as
    /// the store is used.
    fn test_store() -> (Arc<StateStore>, tempfile::TempDir) {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = Arc::new(StateStore::new(temp_dir.path().to_path_buf()).unwrap());
        (store, temp_dir)
    }

    /// Creates a scheduler with a 60-second tick on top of `test_store`.
    fn test_scheduler() -> (CronScheduler, tempfile::TempDir) {
        let (store, temp_dir) = test_store();
        (CronScheduler::new(store, 60), temp_dir)
    }

    /// Builds a UTC timestamp on 2026-05-06 at the given time of day.
    fn utc_on_test_day(hour: u32, minute: u32) -> DateTime<Utc> {
        let naive = chrono::NaiveDate::from_ymd_opt(2026, 5, 6)
            .unwrap()
            .and_hms_opt(hour, minute, 0)
            .unwrap();
        DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc)
    }

    #[test]
    fn test_normalize_5field() {
        assert_eq!(CronScheduler::normalize_expr("0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_6field() {
        assert_eq!(CronScheduler::normalize_expr("0 0 9 * * *"), "0 0 9 * * *");
    }

    #[test]
    fn test_normalize_7field() {
        assert_eq!(
            CronScheduler::normalize_expr("0 0 9 * * * 2026"),
            "0 0 9 * * * 2026"
        );
    }

    #[test]
    fn test_parse_valid() {
        let (cs, _dir) = test_scheduler();
        assert!(cs.parse_schedule("0 9 * * *").is_ok());
    }

    #[test]
    fn test_parse_invalid() {
        let (cs, _dir) = test_scheduler();
        assert!(cs.parse_schedule("invalid").is_err());
    }

    #[test]
    fn test_next_fire_time_daily() {
        let (cs, _dir) = test_scheduler();
        let schedule = cs.parse_schedule("0 9 * * *").unwrap();
        let next = cs.next_fire_time(&schedule, &utc_on_test_day(8, 0));
        assert!(next.is_some());
        assert_eq!(next.unwrap().hour(), 9);
    }

    #[test]
    fn test_next_fire_time_every_15min() {
        let (cs, _dir) = test_scheduler();
        let schedule = cs.parse_schedule("*/15 * * * *").unwrap();
        let next = cs.next_fire_time(&schedule, &utc_on_test_day(10, 7));
        assert!(next.is_some());
        assert_eq!(next.unwrap().minute(), 15);
    }

    #[tokio::test]
    async fn test_add_job_computes_next_run() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "Test goal".into());
        // A freshly constructed job has no next run yet.
        assert!(job.next_run.is_none());
        assert!(job.enabled);
        assert_eq!(job.run_count, 0);

        // Adding it to a scheduler fills in the next run.
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).unwrap().next_run.is_some());
    }

    #[test]
    fn test_job_source_default() {
        let job = CronJob::new("test".into(), "0 9 * * *".into(), "goal".into());
        assert_eq!(job.source, JobSource::Api);
    }

    #[tokio::test]
    async fn test_add_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("test-job".into(), "0 9 * * *".into(), "Run me".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).is_some());
        assert_eq!(cs.list_jobs().len(), 1);
    }

    #[tokio::test]
    async fn test_remove_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("remove-me".into(), "0 10 * * *".into(), "Gone".into());
        let id = cs.add_job(job).await.unwrap();
        cs.remove_job(id).await.unwrap();
        assert!(cs.get_job(id).is_none());
    }

    #[tokio::test]
    async fn test_trigger_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("trigger-me".into(), "0 11 * * *".into(), "Goal text".into());
        let id = cs.add_job(job).await.unwrap();

        let triggered = cs.trigger_job(id).unwrap();
        assert_eq!(triggered.goal, "Goal text");
        assert!(cs.is_running(id));

        cs.mark_job_completed(id, true, "ok".into()).await;
        assert!(!cs.is_running(id));
    }

    #[tokio::test]
    async fn test_trigger_already_running() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("running".into(), "0 12 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        cs.trigger_job(id).unwrap();
        let result = cs.trigger_job(id);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_update_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("old-name".into(), "0 9 * * *".into(), "old goal".into());
        let id = cs.add_job(job).await.unwrap();

        cs.update_job(
            id,
            CronJobUpdate {
                name: Some("new-name".into()),
                goal: Some("new goal".into()),
                enabled: Some(false),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let updated = cs.get_job(id).unwrap();
        assert_eq!(updated.name, "new-name");
        assert_eq!(updated.goal, "new goal");
        assert!(!updated.enabled);
    }

    #[tokio::test]
    async fn test_toggle_job() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("toggle".into(), "0 9 * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, false).await.unwrap();
        assert!(!cs.get_job(id).unwrap().enabled);

        cs.toggle_job(id, true).await.unwrap();
        assert!(cs.get_job(id).unwrap().enabled);
    }

    #[tokio::test]
    async fn test_mark_completed_updates_next_run() {
        let (cs, _dir) = test_scheduler();
        let job = CronJob::new("comp".into(), "*/5 * * * *".into(), "goal".into());
        let id = cs.add_job(job).await.unwrap();

        let before = cs.get_job(id).unwrap().next_run;
        assert!(before.is_some());

        let now = Utc::now();
        // Force the job to look overdue so that completion has to move
        // `next_run` forward.
        {
            let mut jobs = cs.jobs.write();
            if let Some(j) = jobs.get_mut(&id) {
                j.next_run = Some(now - chrono::Duration::minutes(5));
            }
        }

        cs.mark_job_completed(id, true, "ok".into()).await;
        let after = cs.get_job(id).unwrap().next_run;
        assert!(after.is_some());
        assert!(after.unwrap() >= now);
    }

    #[test]
    fn test_max_concurrent_enforced() {
        let (mut scheduler, _dir) = test_scheduler();
        scheduler.set_max_concurrent_jobs(2);
        assert_eq!(scheduler.max_concurrent_jobs, 2);

        // Zero would disable every job, so the setter clamps it to one.
        scheduler.set_max_concurrent_jobs(0);
        assert_eq!(scheduler.max_concurrent_jobs, 1);
    }

    #[test]
    fn test_job_timeout_configurable() {
        let (mut scheduler, _dir) = test_scheduler();
        scheduler.set_job_timeout_secs(300);
        assert_eq!(scheduler.job_timeout_secs, 300);
    }
}