// a3s_cron/scheduler.rs
//! Cron task scheduler
//!
//! Provides background task scheduling and execution management.

use crate::parser::CronExpression;
use crate::store::{CronStore, FileCronStore};
use crate::telemetry;
use crate::types::{
    AgentExecutor, AgentJobConfig, CronError, CronJob, JobExecution, JobStatus, JobType, Result,
};
use chrono::Utc;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tokio::process::Command;
use tokio::sync::{broadcast, RwLock};
use tokio::time::{interval, Duration};
use tracing::Instrument;

/// Notifications published by [`CronManager`] on its broadcast channel.
///
/// Obtain a receiver with [`CronManager::subscribe`]. Job-related variants
/// carry the id of the job and the id of the individual execution they refer to.
#[derive(Clone, Debug)]
pub enum SchedulerEvent {
    /// The background scheduler loop has been started.
    Started,
    /// The background scheduler loop has exited.
    Stopped,
    /// A job execution has begun.
    JobStarted {
        job_id: String,
        execution_id: String,
    },
    /// A job execution finished with a successful status.
    JobCompleted {
        job_id: String,
        execution_id: String,
    },
    /// A job execution finished with a non-successful status; `error` holds
    /// the recorded error text (empty when none was recorded).
    JobFailed {
        job_id: String,
        execution_id: String,
        error: String,
    },
    /// A job execution exceeded its configured timeout.
    JobTimeout {
        job_id: String,
        execution_id: String,
    },
}

49/// Cron manager for job scheduling and execution
50pub struct CronManager {
51    /// Storage backend
52    store: Arc<dyn CronStore>,
53    /// Event broadcaster
54    event_tx: broadcast::Sender<SchedulerEvent>,
55    /// Scheduler running flag
56    running: Arc<RwLock<bool>>,
57    /// Workspace directory
58    workspace: String,
59    /// Optional agent executor for agent-mode jobs
60    agent_executor: Option<Arc<dyn AgentExecutor>>,
61}
62
63impl CronManager {
64    /// Create a new cron manager with file-based storage
65    pub async fn new<P: AsRef<Path>>(workspace: P) -> Result<Self> {
66        let workspace_str = workspace.as_ref().to_string_lossy().to_string();
67        let store = Arc::new(FileCronStore::new(&workspace_str).await?);
68        let (event_tx, _) = broadcast::channel(100);
69
70        Ok(Self {
71            store,
72            event_tx,
73            running: Arc::new(RwLock::new(false)),
74            workspace: workspace_str,
75            agent_executor: None,
76        })
77    }
78
79    /// Create a cron manager with a custom store
80    pub fn with_store(store: Arc<dyn CronStore>, workspace: String) -> Self {
81        let (event_tx, _) = broadcast::channel(100);
82        Self {
83            store,
84            event_tx,
85            running: Arc::new(RwLock::new(false)),
86            workspace,
87            agent_executor: None,
88        }
89    }
90
91    /// Set the agent executor for agent-mode cron jobs.
92    pub fn set_agent_executor(&mut self, executor: Arc<dyn AgentExecutor>) {
93        self.agent_executor = Some(executor);
94    }
95
96    /// Subscribe to scheduler events
97    pub fn subscribe(&self) -> broadcast::Receiver<SchedulerEvent> {
98        self.event_tx.subscribe()
99    }
100
101    /// Add a new cron job
102    pub async fn add_job(&self, name: &str, schedule: &str, command: &str) -> Result<CronJob> {
103        // Validate schedule
104        let expr = CronExpression::parse(schedule)?;
105
106        // Check for duplicate name
107        if self.store.find_job_by_name(name).await?.is_some() {
108            return Err(CronError::JobExists(name.to_string()));
109        }
110
111        // Create job
112        let mut job = CronJob::new(name, schedule, command);
113        job.next_run = expr.next_after(Utc::now());
114        job.working_dir = Some(self.workspace.clone());
115
116        // Save
117        self.store.save_job(&job).await?;
118
119        tracing::info!("Added cron job: {} ({})", job.name, job.id);
120        Ok(job)
121    }
122
123    /// Add a new agent-mode cron job.
124    ///
125    /// The `command` field is used as the agent prompt. When the job fires,
126    /// it creates a temporary Agent with the given config and sends the prompt.
127    pub async fn add_agent_job(
128        &self,
129        name: &str,
130        schedule: &str,
131        prompt: &str,
132        config: AgentJobConfig,
133    ) -> Result<CronJob> {
134        let expr = CronExpression::parse(schedule)?;
135
136        if self.store.find_job_by_name(name).await?.is_some() {
137            return Err(CronError::JobExists(name.to_string()));
138        }
139
140        let mut job = CronJob::new(name, schedule, prompt);
141        job.job_type = JobType::Agent;
142        job.agent_config = Some(config);
143        job.next_run = expr.next_after(Utc::now());
144        job.working_dir = Some(self.workspace.clone());
145
146        self.store.save_job(&job).await?;
147
148        tracing::info!("Added agent cron job: {} ({})", job.name, job.id);
149        Ok(job)
150    }
151
152    /// Get a job by ID
153    pub async fn get_job(&self, id: &str) -> Result<Option<CronJob>> {
154        self.store.load_job(id).await
155    }
156
157    /// Get a job by name
158    pub async fn get_job_by_name(&self, name: &str) -> Result<Option<CronJob>> {
159        self.store.find_job_by_name(name).await
160    }
161
162    /// List all jobs
163    pub async fn list_jobs(&self) -> Result<Vec<CronJob>> {
164        self.store.list_jobs().await
165    }
166
167    /// Update a job
168    pub async fn update_job(
169        &self,
170        id: &str,
171        schedule: Option<&str>,
172        command: Option<&str>,
173        timeout_ms: Option<u64>,
174    ) -> Result<CronJob> {
175        let mut job = self
176            .store
177            .load_job(id)
178            .await?
179            .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
180
181        if let Some(schedule) = schedule {
182            let expr = CronExpression::parse(schedule)?;
183            job.schedule = schedule.to_string();
184            job.next_run = expr.next_after(Utc::now());
185        }
186
187        if let Some(command) = command {
188            job.command = command.to_string();
189        }
190
191        if let Some(timeout) = timeout_ms {
192            job.timeout_ms = timeout;
193        }
194
195        job.updated_at = Utc::now();
196        self.store.save_job(&job).await?;
197
198        tracing::info!("Updated cron job: {} ({})", job.name, job.id);
199        Ok(job)
200    }
201
202    /// Pause a job
203    pub async fn pause_job(&self, id: &str) -> Result<CronJob> {
204        let mut job = self
205            .store
206            .load_job(id)
207            .await?
208            .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
209
210        job.status = JobStatus::Paused;
211        job.updated_at = Utc::now();
212        self.store.save_job(&job).await?;
213
214        tracing::info!("Paused cron job: {} ({})", job.name, job.id);
215        Ok(job)
216    }
217
218    /// Resume a paused job
219    pub async fn resume_job(&self, id: &str) -> Result<CronJob> {
220        let mut job = self
221            .store
222            .load_job(id)
223            .await?
224            .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
225
226        job.status = JobStatus::Active;
227        job.updated_at = Utc::now();
228
229        // Recalculate next run
230        if let Ok(expr) = CronExpression::parse(&job.schedule) {
231            job.next_run = expr.next_after(Utc::now());
232        }
233
234        self.store.save_job(&job).await?;
235
236        tracing::info!("Resumed cron job: {} ({})", job.name, job.id);
237        Ok(job)
238    }
239
240    /// Remove a job
241    pub async fn remove_job(&self, id: &str) -> Result<()> {
242        let job = self
243            .store
244            .load_job(id)
245            .await?
246            .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
247
248        self.store.delete_job(id).await?;
249
250        tracing::info!("Removed cron job: {} ({})", job.name, job.id);
251        Ok(())
252    }
253
254    /// Get execution history for a job
255    pub async fn get_history(&self, job_id: &str, limit: usize) -> Result<Vec<JobExecution>> {
256        self.store.load_executions(job_id, limit).await
257    }
258
259    /// Manually run a job
260    pub async fn run_job(&self, id: &str) -> Result<JobExecution> {
261        let job = self
262            .store
263            .load_job(id)
264            .await?
265            .ok_or_else(|| CronError::JobNotFound(id.to_string()))?;
266
267        self.execute_job(&job).await
268    }
269
270    /// Execute a job
271    async fn execute_job(&self, job: &CronJob) -> Result<JobExecution> {
272        let span = tracing::info_span!(
273            "a3s.cron.execute_job",
274            a3s.cron.job_id = %job.id,
275            a3s.cron.job_name = %job.name,
276            a3s.cron.job_status = tracing::field::Empty,
277            a3s.cron.job_duration_ms = tracing::field::Empty,
278        );
279        let _guard = span.enter();
280        let exec_start = Instant::now();
281
282        let mut execution = JobExecution::new(&job.id);
283
284        // Emit start event
285        let _ = self.event_tx.send(SchedulerEvent::JobStarted {
286            job_id: job.id.clone(),
287            execution_id: execution.id.clone(),
288        });
289
290        // Update job status to running
291        let mut running_job = job.clone();
292        running_job.status = JobStatus::Running;
293        self.store.save_job(&running_job).await?;
294
295        // Execute command with timeout
296        let timeout = Duration::from_millis(job.timeout_ms);
297        let working_dir = job.working_dir.as_deref().unwrap_or(&self.workspace);
298
299        // Result type: Ok(Ok((exit_code, stdout, stderr))) or Ok(Err(io_err)) or Err(timeout)
300        let result: std::result::Result<
301            std::result::Result<(i32, String, String), std::io::Error>,
302            tokio::time::error::Elapsed,
303        > = match job.job_type {
304            JobType::Agent => {
305                let agent_executor = self.agent_executor.clone();
306                let agent_config = job.agent_config.clone();
307                let prompt = job.command.clone();
308                let wd = working_dir.to_string();
309
310                tokio::time::timeout(timeout, async move {
311                    let executor = agent_executor.ok_or_else(|| {
312                        std::io::Error::new(
313                            std::io::ErrorKind::Other,
314                            "No agent executor configured for agent-mode cron job",
315                        )
316                    })?;
317                    let config = agent_config.ok_or_else(|| {
318                        std::io::Error::new(
319                            std::io::ErrorKind::Other,
320                            "Agent job missing agent_config",
321                        )
322                    })?;
323                    match executor.execute(&config, &prompt, &wd).await {
324                        Ok(text) => Ok((0, text, String::new())),
325                        Err(e) => Ok((1, String::new(), e)),
326                    }
327                })
328                .await
329            }
330            JobType::Shell => {
331                tokio::time::timeout(timeout, async {
332                    let output = Command::new("sh")
333                        .arg("-c")
334                        .arg(&job.command)
335                        .current_dir(working_dir)
336                        .envs(job.env.iter().map(|(k, v)| (k.as_str(), v.as_str())))
337                        .output()
338                        .await?;
339                    let stdout = String::from_utf8_lossy(&output.stdout).to_string();
340                    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
341                    let exit_code = output.status.code().unwrap_or(-1);
342                    Ok((exit_code, stdout, stderr))
343                })
344                .await
345            }
346        };
347
348        // Process result
349        execution = match result {
350            Ok(Ok((exit_code, stdout, stderr))) => execution.complete(exit_code, stdout, stderr),
351            Ok(Err(e)) => execution.fail(format!("Failed to execute command: {}", e)),
352            Err(_) => {
353                let _ = self.event_tx.send(SchedulerEvent::JobTimeout {
354                    job_id: job.id.clone(),
355                    execution_id: execution.id.clone(),
356                });
357                execution.timeout()
358            }
359        };
360
361        // Save execution
362        self.store.save_execution(&execution).await?;
363
364        // Update job statistics
365        let mut updated_job = job.clone();
366        updated_job.status = JobStatus::Active;
367        updated_job.last_run = Some(execution.started_at);
368        updated_job.updated_at = Utc::now();
369
370        if execution.status == crate::types::ExecutionStatus::Success {
371            updated_job.run_count += 1;
372            let _ = self.event_tx.send(SchedulerEvent::JobCompleted {
373                job_id: job.id.clone(),
374                execution_id: execution.id.clone(),
375            });
376        } else {
377            updated_job.fail_count += 1;
378            let _ = self.event_tx.send(SchedulerEvent::JobFailed {
379                job_id: job.id.clone(),
380                execution_id: execution.id.clone(),
381                error: execution.error.clone().unwrap_or_default(),
382            });
383        }
384
385        // Calculate next run
386        if let Ok(expr) = CronExpression::parse(&updated_job.schedule) {
387            updated_job.next_run = expr.next_after(Utc::now());
388        }
389
390        self.store.save_job(&updated_job).await?;
391
392        // Record telemetry
393        let duration = exec_start.elapsed();
394        let status_str = if execution.status == crate::types::ExecutionStatus::Success {
395            "success"
396        } else if execution.status == crate::types::ExecutionStatus::Timeout {
397            "timeout"
398        } else {
399            "failed"
400        };
401        span.record(telemetry::ATTR_JOB_STATUS, status_str);
402        span.record(telemetry::ATTR_JOB_DURATION_MS, duration.as_millis() as i64);
403        telemetry::record_job_execution(&job.name, status_str, duration.as_secs_f64());
404
405        Ok(execution)
406    }
407
408    /// Start the scheduler background task
409    pub async fn start(&self) -> Result<()> {
410        let mut running = self.running.write().await;
411        if *running {
412            return Ok(());
413        }
414        *running = true;
415        drop(running);
416
417        let _ = self.event_tx.send(SchedulerEvent::Started);
418        tracing::info!("Cron scheduler started");
419
420        let store = self.store.clone();
421        let event_tx = self.event_tx.clone();
422        let running = self.running.clone();
423        let workspace = self.workspace.clone();
424        let agent_executor = self.agent_executor.clone();
425
426        tokio::spawn(async move {
427            let mut ticker = interval(Duration::from_secs(60));
428
429            loop {
430                ticker.tick().await;
431                telemetry::record_scheduler_tick();
432
433                // Check if still running
434                if !*running.read().await {
435                    break;
436                }
437
438                // Get all active jobs
439                let jobs = match store.list_jobs().await {
440                    Ok(jobs) => jobs,
441                    Err(e) => {
442                        tracing::error!("Failed to list jobs: {}", e);
443                        continue;
444                    }
445                };
446
447                let now = Utc::now();
448
449                for job in jobs {
450                    // Skip non-active jobs
451                    if job.status != JobStatus::Active {
452                        continue;
453                    }
454
455                    // Check if job should run
456                    if let Some(next_run) = job.next_run {
457                        if next_run <= now {
458                            // Create a temporary manager for execution
459                            let manager = CronManager {
460                                store: store.clone(),
461                                event_tx: event_tx.clone(),
462                                running: running.clone(),
463                                workspace: workspace.clone(),
464                                agent_executor: agent_executor.clone(),
465                            };
466
467                            if let Err(e) = manager.execute_job(&job).await {
468                                tracing::error!("Failed to execute job {}: {}", job.id, e);
469                            }
470                        }
471                    }
472                }
473            }
474
475            let _ = event_tx.send(SchedulerEvent::Stopped);
476            tracing::info!("Cron scheduler stopped");
477        });
478
479        Ok(())
480    }
481
482    /// Stop the scheduler
483    pub async fn stop(&self) {
484        let mut running = self.running.write().await;
485        *running = false;
486    }
487
488    /// Check if scheduler is running
489    pub async fn is_running(&self) -> bool {
490        *self.running.read().await
491    }
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryCronStore;
    use crate::types::ExecutionStatus;

    /// Schedule used by tests that do not care about the exact expression.
    const EVERY_MINUTE: &str = "* * * * *";

    /// Manager backed by an in-memory store, with `/tmp` as workspace.
    fn create_test_manager() -> CronManager {
        CronManager::with_store(Arc::new(MemoryCronStore::new()), "/tmp".to_string())
    }

    #[tokio::test]
    async fn test_add_job() {
        let manager = create_test_manager();

        let added = manager
            .add_job("test-job", "*/5 * * * *", "echo hello")
            .await
            .unwrap();

        assert_eq!(added.name, "test-job");
        assert_eq!(added.schedule, "*/5 * * * *");
        assert_eq!(added.command, "echo hello");
        assert!(added.next_run.is_some());
    }

    #[tokio::test]
    async fn test_add_duplicate_name() {
        let manager = create_test_manager();

        manager
            .add_job("unique", EVERY_MINUTE, "echo 1")
            .await
            .unwrap();

        let second = manager.add_job("unique", EVERY_MINUTE, "echo 2").await;
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn test_add_invalid_schedule() {
        let manager = create_test_manager();

        let outcome = manager.add_job("bad", "invalid", "echo").await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_get_job() {
        let manager = create_test_manager();

        let added = manager
            .add_job("findme", EVERY_MINUTE, "echo")
            .await
            .unwrap();

        let looked_up = manager.get_job(&added.id).await.unwrap();
        assert!(looked_up.is_some());
        assert_eq!(looked_up.unwrap().name, "findme");
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let manager = create_test_manager();

        for index in 1..=3 {
            let name = format!("job-{}", index);
            manager.add_job(&name, EVERY_MINUTE, "echo").await.unwrap();
        }

        let all = manager.list_jobs().await.unwrap();
        assert_eq!(all.len(), 3);
    }

    #[tokio::test]
    async fn test_update_job() {
        let manager = create_test_manager();

        let original = manager
            .add_job("updatable", EVERY_MINUTE, "echo v1")
            .await
            .unwrap();

        let changed = manager
            .update_job(&original.id, Some("0 * * * *"), Some("echo v2"), Some(30000))
            .await
            .unwrap();

        assert_eq!(changed.schedule, "0 * * * *");
        assert_eq!(changed.command, "echo v2");
        assert_eq!(changed.timeout_ms, 30000);
    }

    #[tokio::test]
    async fn test_pause_resume() {
        let manager = create_test_manager();

        let added = manager
            .add_job("pausable", EVERY_MINUTE, "echo")
            .await
            .unwrap();

        // Pausing flips the status to Paused ...
        let after_pause = manager.pause_job(&added.id).await.unwrap();
        assert_eq!(after_pause.status, JobStatus::Paused);

        // ... and resuming brings it back to Active.
        let after_resume = manager.resume_job(&added.id).await.unwrap();
        assert_eq!(after_resume.status, JobStatus::Active);
    }

    #[tokio::test]
    async fn test_remove_job() {
        let manager = create_test_manager();

        let added = manager
            .add_job("removable", EVERY_MINUTE, "echo")
            .await
            .unwrap();

        manager.remove_job(&added.id).await.unwrap();

        let looked_up = manager.get_job(&added.id).await.unwrap();
        assert!(looked_up.is_none());
    }

    #[tokio::test]
    async fn test_run_job() {
        let manager = create_test_manager();

        let added = manager
            .add_job("runnable", EVERY_MINUTE, "echo hello")
            .await
            .unwrap();

        let run = manager.run_job(&added.id).await.unwrap();
        assert!(run.stdout.contains("hello"));
    }

    #[tokio::test]
    async fn test_run_job_failure() {
        let manager = create_test_manager();

        let added = manager
            .add_job("failing", EVERY_MINUTE, "exit 1")
            .await
            .unwrap();

        let run = manager.run_job(&added.id).await.unwrap();
        assert_eq!(run.status, ExecutionStatus::Failed);
    }

    #[tokio::test]
    async fn test_get_history() {
        let manager = create_test_manager();

        let added = manager
            .add_job("historical", EVERY_MINUTE, "echo test")
            .await
            .unwrap();

        // Three runs must yield three history entries.
        for _ in 0..3 {
            manager.run_job(&added.id).await.unwrap();
        }

        let entries = manager.get_history(&added.id, 10).await.unwrap();
        assert_eq!(entries.len(), 3);
    }

    #[tokio::test]
    async fn test_event_subscription() {
        let manager = create_test_manager();
        let mut events = manager.subscribe();

        let added = manager
            .add_job("evented", EVERY_MINUTE, "echo test")
            .await
            .unwrap();

        manager.run_job(&added.id).await.unwrap();

        // The first event of a run announces that the job started.
        let first = events.try_recv().unwrap();
        if let SchedulerEvent::JobStarted { job_id, .. } = first {
            assert_eq!(job_id, added.id);
        } else {
            panic!("Expected JobStarted event");
        }
    }

    // --- Agent-mode tests ---

    /// Agent executor stub returning a canned response or a canned error.
    struct MockAgentExecutor {
        response: String,
        should_fail: bool,
    }

    #[async_trait::async_trait]
    impl AgentExecutor for MockAgentExecutor {
        async fn execute(
            &self,
            _config: &AgentJobConfig,
            _prompt: &str,
            _working_dir: &str,
        ) -> std::result::Result<String, String> {
            if !self.should_fail {
                return Ok(self.response.clone());
            }
            Err("Mock agent error".to_string())
        }
    }

    /// Minimal agent configuration for tests.
    fn create_agent_config() -> AgentJobConfig {
        AgentJobConfig {
            model: "test-model".to_string(),
            api_key: "test-key".to_string(),
            workspace: None,
            system_prompt: None,
            base_url: None,
        }
    }

    /// Test manager with a [`MockAgentExecutor`] installed.
    fn create_agent_manager(response: &str, should_fail: bool) -> CronManager {
        let mut manager = create_test_manager();
        manager.set_agent_executor(Arc::new(MockAgentExecutor {
            response: response.to_string(),
            should_fail,
        }));
        manager
    }

    #[tokio::test]
    async fn test_add_agent_job() {
        let manager = create_test_manager();

        let added = manager
            .add_agent_job(
                "agent-task",
                "*/5 * * * *",
                "Refactor auth module",
                create_agent_config(),
            )
            .await
            .unwrap();

        assert_eq!(added.name, "agent-task");
        assert_eq!(added.job_type, JobType::Agent);
        assert_eq!(added.command, "Refactor auth module");
        assert!(added.agent_config.is_some());
        assert!(added.next_run.is_some());
    }

    #[tokio::test]
    async fn test_add_agent_job_duplicate_name() {
        let manager = create_test_manager();

        manager
            .add_agent_job("unique-agent", EVERY_MINUTE, "prompt", create_agent_config())
            .await
            .unwrap();

        let second = manager
            .add_agent_job("unique-agent", EVERY_MINUTE, "prompt2", create_agent_config())
            .await;
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn test_run_agent_job_success() {
        let manager = create_agent_manager("Refactored 3 files", false);

        let added = manager
            .add_agent_job(
                "agent-run",
                EVERY_MINUTE,
                "Refactor auth",
                create_agent_config(),
            )
            .await
            .unwrap();

        let run = manager.run_job(&added.id).await.unwrap();
        assert_eq!(run.status, ExecutionStatus::Success);
        assert!(run.stdout.contains("Refactored 3 files"));
    }

    #[tokio::test]
    async fn test_run_agent_job_failure() {
        let manager = create_agent_manager("", true);

        let added = manager
            .add_agent_job(
                "agent-fail",
                EVERY_MINUTE,
                "Bad prompt",
                create_agent_config(),
            )
            .await
            .unwrap();

        let run = manager.run_job(&added.id).await.unwrap();
        assert_eq!(run.status, ExecutionStatus::Failed);
    }

    #[tokio::test]
    async fn test_run_agent_job_no_executor() {
        let manager = create_test_manager();

        let added = manager
            .add_agent_job("no-executor", EVERY_MINUTE, "prompt", create_agent_config())
            .await
            .unwrap();

        let run = manager.run_job(&added.id).await.unwrap();
        assert_eq!(run.status, ExecutionStatus::Failed);

        let message = run.error.as_deref().unwrap_or("");
        assert!(message.contains("No agent executor"));
    }

    #[tokio::test]
    async fn test_shell_job_type_default() {
        let manager = create_test_manager();

        let added = manager
            .add_job("shell-default", EVERY_MINUTE, "echo hello")
            .await
            .unwrap();

        assert_eq!(added.job_type, JobType::Shell);
        assert!(added.agent_config.is_none());
    }
}