1use crate::parser::CronExpression;
6use crate::store::{CronStore, FileCronStore};
7use crate::telemetry;
8use crate::types::{
9 AgentExecutor, AgentJobConfig, CronError, CronJob, JobExecution, JobStatus, JobType, Result,
10};
11use chrono::Utc;
12use std::path::Path;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::process::Command;
16use tokio::sync::{broadcast, RwLock};
17use tokio::time::{interval, Duration};
18
/// Events broadcast by the scheduler to every receiver obtained from
/// `CronManager::subscribe`.
///
/// `PartialEq`/`Eq` are derived so that consumers and tests can compare
/// events directly instead of destructuring them with `match`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchedulerEvent {
    /// The background scheduler loop was started.
    Started,
    /// The background scheduler loop has exited.
    Stopped,
    /// An execution of a job has begun.
    JobStarted {
        /// Identifier of the job being executed.
        job_id: String,
        /// Identifier of the execution record created for this run.
        execution_id: String,
    },
    /// An execution finished with a successful status.
    JobCompleted {
        /// Identifier of the job that was executed.
        job_id: String,
        /// Identifier of the execution record for this run.
        execution_id: String,
    },
    /// An execution finished with a non-successful status.
    JobFailed {
        /// Identifier of the job that was executed.
        job_id: String,
        /// Identifier of the execution record for this run.
        execution_id: String,
        /// Error text taken from the execution record (empty when none was recorded).
        error: String,
    },
    /// An execution exceeded the job's timeout.
    JobTimeout {
        /// Identifier of the job that was executed.
        job_id: String,
        /// Identifier of the execution record for this run.
        execution_id: String,
    },
}
48
49pub struct CronManager {
51 store: Arc<dyn CronStore>,
53 event_tx: broadcast::Sender<SchedulerEvent>,
55 running: Arc<RwLock<bool>>,
57 workspace: String,
59 agent_executor: Option<Arc<dyn AgentExecutor>>,
61}
62
63impl CronManager {
64 pub async fn new<P: AsRef<Path>>(workspace: P) -> Result<Self> {
66 let workspace_str = workspace.as_ref().to_string_lossy().to_string();
67 let store = Arc::new(FileCronStore::new(&workspace_str).await?);
68 let (event_tx, _) = broadcast::channel(100);
69
70 Ok(Self {
71 store,
72 event_tx,
73 running: Arc::new(RwLock::new(false)),
74 workspace: workspace_str,
75 agent_executor: None,
76 })
77 }
78
79 pub fn with_store(store: Arc<dyn CronStore>, workspace: String) -> Self {
81 let (event_tx, _) = broadcast::channel(100);
82 Self {
83 store,
84 event_tx,
85 running: Arc::new(RwLock::new(false)),
86 workspace,
87 agent_executor: None,
88 }
89 }
90
91 pub fn set_agent_executor(&mut self, executor: Arc<dyn AgentExecutor>) {
93 self.agent_executor = Some(executor);
94 }
95
96 pub fn subscribe(&self) -> broadcast::Receiver<SchedulerEvent> {
98 self.event_tx.subscribe()
99 }
100
101 pub async fn add_job(&self, name: &str, schedule: &str, command: &str) -> Result<CronJob> {
103 let expr = CronExpression::parse(schedule)?;
105
106 if self.store.find_job_by_name(name).await?.is_some() {
108 return Err(CronError::JobExists(name.to_string()));
109 }
110
111 let mut job = CronJob::new(name, schedule, command);
113 job.next_run = expr.next_after(Utc::now());
114 job.working_dir = Some(self.workspace.clone());
115
116 self.store.save_job(&job).await?;
118
119 tracing::info!("Added cron job: {} ({})", job.name, job.id);
120 Ok(job)
121 }
122
123 pub async fn add_agent_job(
128 &self,
129 name: &str,
130 schedule: &str,
131 prompt: &str,
132 config: AgentJobConfig,
133 ) -> Result<CronJob> {
134 let expr = CronExpression::parse(schedule)?;
135
136 if self.store.find_job_by_name(name).await?.is_some() {
137 return Err(CronError::JobExists(name.to_string()));
138 }
139
140 let mut job = CronJob::new(name, schedule, prompt);
141 job.job_type = JobType::Agent;
142 job.agent_config = Some(config);
143 job.next_run = expr.next_after(Utc::now());
144 job.working_dir = Some(self.workspace.clone());
145
146 self.store.save_job(&job).await?;
147
148 tracing::info!("Added agent cron job: {} ({})", job.name, job.id);
149 Ok(job)
150 }
151
152 pub async fn get_job(&self, id: &str) -> Result<Option<CronJob>> {
154 self.store.load_job(id).await
155 }
156
157 pub async fn get_job_by_name(&self, name: &str) -> Result<Option<CronJob>> {
159 self.store.find_job_by_name(name).await
160 }
161
162 pub async fn list_jobs(&self) -> Result<Vec<CronJob>> {
164 self.store.list_jobs().await
165 }
166
167 pub async fn update_job(
169 &self,
170 id: &str,
171 schedule: Option<&str>,
172 command: Option<&str>,
173 timeout_ms: Option<u64>,
174 ) -> Result<CronJob> {
175 let mut job = self
176 .store
177 .load_job(id)
178 .await?
179 .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
180
181 if let Some(schedule) = schedule {
182 let expr = CronExpression::parse(schedule)?;
183 job.schedule = schedule.to_string();
184 job.next_run = expr.next_after(Utc::now());
185 }
186
187 if let Some(command) = command {
188 job.command = command.to_string();
189 }
190
191 if let Some(timeout) = timeout_ms {
192 job.timeout_ms = timeout;
193 }
194
195 job.updated_at = Utc::now();
196 self.store.save_job(&job).await?;
197
198 tracing::info!("Updated cron job: {} ({})", job.name, job.id);
199 Ok(job)
200 }
201
202 pub async fn pause_job(&self, id: &str) -> Result<CronJob> {
204 let mut job = self
205 .store
206 .load_job(id)
207 .await?
208 .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
209
210 job.status = JobStatus::Paused;
211 job.updated_at = Utc::now();
212 self.store.save_job(&job).await?;
213
214 tracing::info!("Paused cron job: {} ({})", job.name, job.id);
215 Ok(job)
216 }
217
218 pub async fn resume_job(&self, id: &str) -> Result<CronJob> {
220 let mut job = self
221 .store
222 .load_job(id)
223 .await?
224 .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
225
226 job.status = JobStatus::Active;
227 job.updated_at = Utc::now();
228
229 if let Ok(expr) = CronExpression::parse(&job.schedule) {
231 job.next_run = expr.next_after(Utc::now());
232 }
233
234 self.store.save_job(&job).await?;
235
236 tracing::info!("Resumed cron job: {} ({})", job.name, job.id);
237 Ok(job)
238 }
239
240 pub async fn remove_job(&self, id: &str) -> Result<()> {
242 let job = self
243 .store
244 .load_job(id)
245 .await?
246 .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
247
248 self.store.delete_job(id).await?;
249
250 tracing::info!("Removed cron job: {} ({})", job.name, job.id);
251 Ok(())
252 }
253
254 pub async fn get_history(&self, job_id: &str, limit: usize) -> Result<Vec<JobExecution>> {
256 self.store.load_executions(job_id, limit).await
257 }
258
259 pub async fn run_job(&self, id: &str) -> Result<JobExecution> {
261 let job = self
262 .store
263 .load_job(id)
264 .await?
265 .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
266
267 self.execute_job(&job).await
268 }
269
270 async fn execute_job(&self, job: &CronJob) -> Result<JobExecution> {
272 let span = tracing::info_span!(
273 "a3s.cron.execute_job",
274 a3s.cron.job_id = %job.id,
275 a3s.cron.job_name = %job.name,
276 a3s.cron.job_status = tracing::field::Empty,
277 a3s.cron.job_duration_ms = tracing::field::Empty,
278 );
279 let _guard = span.enter();
280 let exec_start = Instant::now();
281
282 let mut execution = JobExecution::new(&job.id);
283
284 let _ = self.event_tx.send(SchedulerEvent::JobStarted {
286 job_id: job.id.clone(),
287 execution_id: execution.id.clone(),
288 });
289
290 let mut running_job = job.clone();
292 running_job.status = JobStatus::Running;
293 self.store.save_job(&running_job).await?;
294
295 let timeout = Duration::from_millis(job.timeout_ms);
297 let working_dir = job.working_dir.as_deref().unwrap_or(&self.workspace);
298
299 let result: std::result::Result<
301 std::result::Result<(i32, String, String), std::io::Error>,
302 tokio::time::error::Elapsed,
303 > = match job.job_type {
304 JobType::Agent => {
305 let agent_executor = self.agent_executor.clone();
306 let agent_config = job.agent_config.clone();
307 let prompt = job.command.clone();
308 let wd = working_dir.to_string();
309
310 tokio::time::timeout(timeout, async move {
311 let executor = agent_executor.ok_or_else(|| {
312 std::io::Error::new(
313 std::io::ErrorKind::Other,
314 "No agent executor configured for agent-mode cron job",
315 )
316 })?;
317 let config = agent_config.ok_or_else(|| {
318 std::io::Error::new(
319 std::io::ErrorKind::Other,
320 "Agent job missing agent_config",
321 )
322 })?;
323 match executor.execute(&config, &prompt, &wd).await {
324 Ok(text) => Ok((0, text, String::new())),
325 Err(e) => Ok((1, String::new(), e)),
326 }
327 })
328 .await
329 }
330 JobType::Shell => {
331 tokio::time::timeout(timeout, async {
332 let output = Command::new("sh")
333 .arg("-c")
334 .arg(&job.command)
335 .current_dir(working_dir)
336 .envs(job.env.iter().map(|(k, v)| (k.as_str(), v.as_str())))
337 .output()
338 .await?;
339 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
340 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
341 let exit_code = output.status.code().unwrap_or(-1);
342 Ok((exit_code, stdout, stderr))
343 })
344 .await
345 }
346 };
347
348 execution = match result {
350 Ok(Ok((exit_code, stdout, stderr))) => execution.complete(exit_code, stdout, stderr),
351 Ok(Err(e)) => execution.fail(format!("Failed to execute command: {}", e)),
352 Err(_) => {
353 let _ = self.event_tx.send(SchedulerEvent::JobTimeout {
354 job_id: job.id.clone(),
355 execution_id: execution.id.clone(),
356 });
357 execution.timeout()
358 }
359 };
360
361 self.store.save_execution(&execution).await?;
363
364 let mut updated_job = job.clone();
366 updated_job.status = JobStatus::Active;
367 updated_job.last_run = Some(execution.started_at);
368 updated_job.updated_at = Utc::now();
369
370 if execution.status == crate::types::ExecutionStatus::Success {
371 updated_job.run_count += 1;
372 let _ = self.event_tx.send(SchedulerEvent::JobCompleted {
373 job_id: job.id.clone(),
374 execution_id: execution.id.clone(),
375 });
376 } else {
377 updated_job.fail_count += 1;
378 let _ = self.event_tx.send(SchedulerEvent::JobFailed {
379 job_id: job.id.clone(),
380 execution_id: execution.id.clone(),
381 error: execution.error.clone().unwrap_or_default(),
382 });
383 }
384
385 if let Ok(expr) = CronExpression::parse(&updated_job.schedule) {
387 updated_job.next_run = expr.next_after(Utc::now());
388 }
389
390 self.store.save_job(&updated_job).await?;
391
392 let duration = exec_start.elapsed();
394 let status_str = if execution.status == crate::types::ExecutionStatus::Success {
395 "success"
396 } else if execution.status == crate::types::ExecutionStatus::Timeout {
397 "timeout"
398 } else {
399 "failed"
400 };
401 span.record(telemetry::ATTR_JOB_STATUS, status_str);
402 span.record(telemetry::ATTR_JOB_DURATION_MS, duration.as_millis() as i64);
403 telemetry::record_job_execution(&job.name, status_str, duration.as_secs_f64());
404
405 Ok(execution)
406 }
407
408 pub async fn start(&self) -> Result<()> {
410 let mut running = self.running.write().await;
411 if *running {
412 return Ok(());
413 }
414 *running = true;
415 drop(running);
416
417 let _ = self.event_tx.send(SchedulerEvent::Started);
418 tracing::info!("Cron scheduler started");
419
420 let store = self.store.clone();
421 let event_tx = self.event_tx.clone();
422 let running = self.running.clone();
423 let workspace = self.workspace.clone();
424 let agent_executor = self.agent_executor.clone();
425
426 tokio::spawn(async move {
427 let mut ticker = interval(Duration::from_secs(60));
428
429 loop {
430 ticker.tick().await;
431 telemetry::record_scheduler_tick();
432
433 if !*running.read().await {
435 break;
436 }
437
438 let jobs = match store.list_jobs().await {
440 Ok(jobs) => jobs,
441 Err(e) => {
442 tracing::error!("Failed to list jobs: {}", e);
443 continue;
444 }
445 };
446
447 let now = Utc::now();
448
449 for job in jobs {
450 if job.status != JobStatus::Active {
452 continue;
453 }
454
455 if let Some(next_run) = job.next_run {
457 if next_run <= now {
458 let manager = CronManager {
460 store: store.clone(),
461 event_tx: event_tx.clone(),
462 running: running.clone(),
463 workspace: workspace.clone(),
464 agent_executor: agent_executor.clone(),
465 };
466
467 if let Err(e) = manager.execute_job(&job).await {
468 tracing::error!("Failed to execute job {}: {}", job.id, e);
469 }
470 }
471 }
472 }
473 }
474
475 let _ = event_tx.send(SchedulerEvent::Stopped);
476 tracing::info!("Cron scheduler stopped");
477 });
478
479 Ok(())
480 }
481
482 pub async fn stop(&self) {
484 let mut running = self.running.write().await;
485 *running = false;
486 }
487
488 pub async fn is_running(&self) -> bool {
490 *self.running.read().await
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryCronStore;

    /// Builds a manager over an in-memory store with `/tmp` as workspace.
    fn create_test_manager() -> CronManager {
        CronManager::with_store(Arc::new(MemoryCronStore::new()), "/tmp".to_string())
    }

    // --- Shell job management -------------------------------------------

    #[tokio::test]
    async fn test_add_job() {
        let manager = create_test_manager();

        let added = manager.add_job("test-job", "*/5 * * * *", "echo hello").await;
        let job = added.unwrap();

        assert_eq!(job.name, "test-job");
        assert_eq!(job.schedule, "*/5 * * * *");
        assert_eq!(job.command, "echo hello");
        assert!(job.next_run.is_some());
    }

    #[tokio::test]
    async fn test_add_duplicate_name() {
        let manager = create_test_manager();

        let first = manager.add_job("unique", "* * * * *", "echo 1").await;
        first.unwrap();

        // A second job with the same name must be rejected.
        let second = manager.add_job("unique", "* * * * *", "echo 2").await;
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn test_add_invalid_schedule() {
        let manager = create_test_manager();

        let outcome = manager.add_job("bad", "invalid", "echo").await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_get_job() {
        let manager = create_test_manager();

        let job = manager.add_job("findme", "* * * * *", "echo").await.unwrap();

        let looked_up = manager.get_job(&job.id).await.unwrap();
        assert!(looked_up.is_some());
        assert_eq!(looked_up.unwrap().name, "findme");
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let manager = create_test_manager();

        for index in 1..=3 {
            let name = format!("job-{}", index);
            manager.add_job(&name, "* * * * *", "echo").await.unwrap();
        }

        let all = manager.list_jobs().await.unwrap();
        assert_eq!(all.len(), 3);
    }

    #[tokio::test]
    async fn test_update_job() {
        let manager = create_test_manager();

        let job = manager
            .add_job("updatable", "* * * * *", "echo v1")
            .await
            .unwrap();

        let changed = manager
            .update_job(&job.id, Some("0 * * * *"), Some("echo v2"), Some(30000))
            .await
            .unwrap();

        assert_eq!(changed.schedule, "0 * * * *");
        assert_eq!(changed.command, "echo v2");
        assert_eq!(changed.timeout_ms, 30000);
    }

    #[tokio::test]
    async fn test_pause_resume() {
        let manager = create_test_manager();

        let job = manager.add_job("pausable", "* * * * *", "echo").await.unwrap();

        let after_pause = manager.pause_job(&job.id).await.unwrap();
        assert_eq!(after_pause.status, JobStatus::Paused);

        let after_resume = manager.resume_job(&job.id).await.unwrap();
        assert_eq!(after_resume.status, JobStatus::Active);
    }

    #[tokio::test]
    async fn test_remove_job() {
        let manager = create_test_manager();

        let job = manager.add_job("removable", "* * * * *", "echo").await.unwrap();
        manager.remove_job(&job.id).await.unwrap();

        let looked_up = manager.get_job(&job.id).await.unwrap();
        assert!(looked_up.is_none());
    }

    // --- Shell job execution --------------------------------------------

    #[tokio::test]
    async fn test_run_job() {
        let manager = create_test_manager();

        let job = manager
            .add_job("runnable", "* * * * *", "echo hello")
            .await
            .unwrap();

        let run = manager.run_job(&job.id).await.unwrap();
        assert!(run.stdout.contains("hello"));
    }

    #[tokio::test]
    async fn test_run_job_failure() {
        let manager = create_test_manager();

        let job = manager.add_job("failing", "* * * * *", "exit 1").await.unwrap();

        let run = manager.run_job(&job.id).await.unwrap();
        assert_eq!(run.status, crate::types::ExecutionStatus::Failed);
    }

    #[tokio::test]
    async fn test_get_history() {
        let manager = create_test_manager();

        let job = manager
            .add_job("historical", "* * * * *", "echo test")
            .await
            .unwrap();

        // Three runs should produce three history entries.
        for _ in 0..3 {
            manager.run_job(&job.id).await.unwrap();
        }

        let entries = manager.get_history(&job.id, 10).await.unwrap();
        assert_eq!(entries.len(), 3);
    }

    #[tokio::test]
    async fn test_event_subscription() {
        let manager = create_test_manager();
        let mut rx = manager.subscribe();

        let job = manager
            .add_job("evented", "* * * * *", "echo test")
            .await
            .unwrap();

        manager.run_job(&job.id).await.unwrap();

        // The first event of a run must be `JobStarted` for this job.
        let first = rx.try_recv().unwrap();
        match first {
            SchedulerEvent::JobStarted { job_id, .. } => assert_eq!(job_id, job.id),
            _ => panic!("Expected JobStarted event"),
        }
    }

    // --- Agent jobs -----------------------------------------------------

    /// Agent executor double that either returns a canned response or a
    /// fixed error.
    struct MockAgentExecutor {
        response: String,
        should_fail: bool,
    }

    #[async_trait::async_trait]
    impl AgentExecutor for MockAgentExecutor {
        async fn execute(
            &self,
            _config: &AgentJobConfig,
            _prompt: &str,
            _working_dir: &str,
        ) -> std::result::Result<String, String> {
            if !self.should_fail {
                return Ok(self.response.clone());
            }
            Err("Mock agent error".to_string())
        }
    }

    /// Minimal agent configuration used by the agent-job tests.
    fn create_agent_config() -> AgentJobConfig {
        AgentJobConfig {
            model: "test-model".to_string(),
            api_key: "test-key".to_string(),
            workspace: None,
            system_prompt: None,
            base_url: None,
        }
    }

    /// Builds a manager over an in-memory store with a mock agent executor
    /// installed.
    fn create_agent_manager(response: &str, should_fail: bool) -> CronManager {
        let store = Arc::new(MemoryCronStore::new());
        let mut manager = CronManager::with_store(store, "/tmp".to_string());
        manager.set_agent_executor(Arc::new(MockAgentExecutor {
            response: response.to_string(),
            should_fail,
        }));
        manager
    }

    #[tokio::test]
    async fn test_add_agent_job() {
        let manager = create_test_manager();

        let job = manager
            .add_agent_job(
                "agent-task",
                "*/5 * * * *",
                "Refactor auth module",
                create_agent_config(),
            )
            .await
            .unwrap();

        assert_eq!(job.name, "agent-task");
        assert_eq!(job.job_type, JobType::Agent);
        assert_eq!(job.command, "Refactor auth module");
        assert!(job.agent_config.is_some());
        assert!(job.next_run.is_some());
    }

    #[tokio::test]
    async fn test_add_agent_job_duplicate_name() {
        let manager = create_test_manager();

        manager
            .add_agent_job("unique-agent", "* * * * *", "prompt", create_agent_config())
            .await
            .unwrap();

        let second = manager
            .add_agent_job("unique-agent", "* * * * *", "prompt2", create_agent_config())
            .await;
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn test_run_agent_job_success() {
        let manager = create_agent_manager("Refactored 3 files", false);

        let job = manager
            .add_agent_job("agent-run", "* * * * *", "Refactor auth", create_agent_config())
            .await
            .unwrap();

        let run = manager.run_job(&job.id).await.unwrap();
        assert_eq!(run.status, crate::types::ExecutionStatus::Success);
        assert!(run.stdout.contains("Refactored 3 files"));
    }

    #[tokio::test]
    async fn test_run_agent_job_failure() {
        let manager = create_agent_manager("", true);

        let job = manager
            .add_agent_job("agent-fail", "* * * * *", "Bad prompt", create_agent_config())
            .await
            .unwrap();

        let run = manager.run_job(&job.id).await.unwrap();
        assert_eq!(run.status, crate::types::ExecutionStatus::Failed);
    }

    #[tokio::test]
    async fn test_run_agent_job_no_executor() {
        // No executor is installed on the plain test manager.
        let manager = create_test_manager();

        let job = manager
            .add_agent_job("no-executor", "* * * * *", "prompt", create_agent_config())
            .await
            .unwrap();

        let run = manager.run_job(&job.id).await.unwrap();
        assert_eq!(run.status, crate::types::ExecutionStatus::Failed);

        let message = run.error.as_deref().unwrap_or("");
        assert!(message.contains("No agent executor"));
    }

    #[tokio::test]
    async fn test_shell_job_type_default() {
        let manager = create_test_manager();

        let job = manager
            .add_job("shell-default", "* * * * *", "echo hello")
            .await
            .unwrap();

        assert_eq!(job.job_type, JobType::Shell);
        assert!(job.agent_config.is_none());
    }
}