Skip to main content

aster/
scheduler.rs

1// 调度器增强子模块
2#[path = "scheduler/types.rs"]
3pub mod types;
4
5#[path = "scheduler/migration.rs"]
6pub mod migration;
7
8#[path = "scheduler/executor.rs"]
9pub mod executor;
10
11#[path = "scheduler/delivery.rs"]
12pub mod delivery;
13
14use std::collections::HashMap;
15use std::fs;
16use std::io;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19
20use anyhow::{anyhow, Result};
21use async_trait::async_trait;
22use chrono::{DateTime, Local, Utc};
23use serde::{Deserialize, Serialize};
24use tokio::sync::Mutex;
25use tokio_cron_scheduler::{job::JobId, Job, JobScheduler as TokioJobScheduler};
26use tokio_util::sync::CancellationToken;
27
28use crate::agents::AgentEvent;
29use crate::agents::{Agent, SessionConfig};
30use crate::config::paths::Paths;
31use crate::config::Config;
32use crate::conversation::message::Message;
33use crate::conversation::Conversation;
34use crate::posthog;
35use crate::providers::create;
36use crate::recipe::Recipe;
37use crate::scheduler_trait::SchedulerTrait;
38use crate::session::session_manager::SessionType;
39use crate::session::{Session, SessionManager};
40
41type RunningTasksMap = HashMap<String, CancellationToken>;
42type JobsMap = HashMap<String, (JobId, ScheduledJob)>;
43
44pub fn get_default_scheduler_storage_path() -> Result<PathBuf, io::Error> {
45    let data_dir = Paths::data_dir();
46    fs::create_dir_all(&data_dir)?;
47    Ok(data_dir.join("schedules.json"))
48}
49
50pub fn get_default_scheduled_recipes_dir() -> Result<PathBuf, SchedulerError> {
51    let data_dir = Paths::data_dir();
52    let recipes_dir = data_dir.join("scheduled_recipes");
53    fs::create_dir_all(&recipes_dir).map_err(SchedulerError::StorageError)?;
54    Ok(recipes_dir)
55}
56
57#[derive(Debug)]
58pub enum SchedulerError {
59    JobIdExists(String),
60    JobNotFound(String),
61    StorageError(io::Error),
62    RecipeLoadError(String),
63    AgentSetupError(String),
64    PersistError(String),
65    CronParseError(String),
66    SchedulerInternalError(String),
67    AnyhowError(anyhow::Error),
68}
69
70impl std::fmt::Display for SchedulerError {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        match self {
73            SchedulerError::JobIdExists(id) => write!(f, "Job ID '{}' already exists.", id),
74            SchedulerError::JobNotFound(id) => write!(f, "Job ID '{}' not found.", id),
75            SchedulerError::StorageError(e) => write!(f, "Storage error: {}", e),
76            SchedulerError::RecipeLoadError(e) => write!(f, "Recipe load error: {}", e),
77            SchedulerError::AgentSetupError(e) => write!(f, "Agent setup error: {}", e),
78            SchedulerError::PersistError(e) => write!(f, "Failed to persist schedules: {}", e),
79            SchedulerError::CronParseError(e) => write!(f, "Invalid cron string: {}", e),
80            SchedulerError::SchedulerInternalError(e) => {
81                write!(f, "Scheduler internal error: {}", e)
82            }
83            SchedulerError::AnyhowError(e) => write!(f, "Scheduler operation failed: {}", e),
84        }
85    }
86}
87
88impl std::error::Error for SchedulerError {
89    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
90        match self {
91            SchedulerError::StorageError(e) => Some(e),
92            SchedulerError::AnyhowError(e) => Some(e.as_ref()),
93            _ => None,
94        }
95    }
96}
97
98impl From<io::Error> for SchedulerError {
99    fn from(err: io::Error) -> Self {
100        SchedulerError::StorageError(err)
101    }
102}
103
104impl From<serde_json::Error> for SchedulerError {
105    fn from(err: serde_json::Error) -> Self {
106        SchedulerError::PersistError(err.to_string())
107    }
108}
109
110impl From<anyhow::Error> for SchedulerError {
111    fn from(err: anyhow::Error) -> Self {
112        SchedulerError::AnyhowError(err)
113    }
114}
115
116#[derive(Clone, Serialize, Deserialize, Debug, utoipa::ToSchema)]
117pub struct ScheduledJob {
118    pub id: String,
119    pub source: String,
120    pub cron: String,
121    pub last_run: Option<DateTime<Utc>>,
122    #[serde(default)]
123    pub currently_running: bool,
124    #[serde(default)]
125    pub paused: bool,
126    #[serde(default)]
127    pub current_session_id: Option<String>,
128    #[serde(default)]
129    pub process_start_time: Option<DateTime<Utc>>,
130}
131
132async fn persist_jobs(
133    storage_path: &Path,
134    jobs: &Arc<Mutex<JobsMap>>,
135) -> Result<(), SchedulerError> {
136    let jobs_guard = jobs.lock().await;
137    let list: Vec<ScheduledJob> = jobs_guard.values().map(|(_, j)| j.clone()).collect();
138    if let Some(parent) = storage_path.parent() {
139        fs::create_dir_all(parent)?;
140    }
141    let data = serde_json::to_string_pretty(&list)?;
142    fs::write(storage_path, data)?;
143    Ok(())
144}
145
146pub struct Scheduler {
147    tokio_scheduler: TokioJobScheduler,
148    jobs: Arc<Mutex<JobsMap>>,
149    storage_path: PathBuf,
150    running_tasks: Arc<Mutex<RunningTasksMap>>,
151}
152
153impl Scheduler {
154    pub async fn new(storage_path: PathBuf) -> Result<Arc<Self>, SchedulerError> {
155        let internal_scheduler = TokioJobScheduler::new()
156            .await
157            .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
158
159        let jobs = Arc::new(Mutex::new(HashMap::new()));
160        let running_tasks = Arc::new(Mutex::new(HashMap::new()));
161
162        let arc_self = Arc::new(Self {
163            tokio_scheduler: internal_scheduler,
164            jobs,
165            storage_path,
166            running_tasks,
167        });
168
169        arc_self.load_jobs_from_storage().await;
170        arc_self
171            .tokio_scheduler
172            .start()
173            .await
174            .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
175
176        Ok(arc_self)
177    }
178
179    fn create_cron_task(&self, job: ScheduledJob) -> Result<Job, SchedulerError> {
180        let job_for_task = job.clone();
181        let jobs_arc = self.jobs.clone();
182        let storage_path = self.storage_path.clone();
183        let running_tasks_arc = self.running_tasks.clone();
184
185        let cron_parts: Vec<&str> = job.cron.split_whitespace().collect();
186        let cron = match cron_parts.len() {
187            5 => {
188                tracing::warn!(
189                    "Job '{}' has legacy 5-field cron '{}', converting to 6-field",
190                    job.id,
191                    job.cron
192                );
193                format!("0 {}", job.cron)
194            }
195            6 => job.cron.clone(),
196            _ => {
197                return Err(SchedulerError::CronParseError(format!(
198                    "Invalid cron expression '{}': expected 5 or 6 fields, got {}",
199                    job.cron,
200                    cron_parts.len()
201                )))
202            }
203        };
204
205        let local_tz = Local::now().timezone();
206
207        Job::new_async_tz(&cron, local_tz, move |_uuid, _l| {
208            tracing::info!("Cron task triggered for job '{}'", job_for_task.id);
209            let task_job_id = job_for_task.id.clone();
210            let current_jobs_arc = jobs_arc.clone();
211            let local_storage_path = storage_path.clone();
212            let job_to_execute = job_for_task.clone();
213            let running_tasks = running_tasks_arc.clone();
214
215            Box::pin(async move {
216                let should_execute = {
217                    let jobs_guard = current_jobs_arc.lock().await;
218                    jobs_guard
219                        .get(&task_job_id)
220                        .map(|(_, j)| !j.paused)
221                        .unwrap_or(false)
222                };
223
224                if !should_execute {
225                    return;
226                }
227
228                let current_time = Utc::now();
229                {
230                    let mut jobs_guard = current_jobs_arc.lock().await;
231                    if let Some((_, job)) = jobs_guard.get_mut(&task_job_id) {
232                        job.last_run = Some(current_time);
233                        job.currently_running = true;
234                        job.process_start_time = Some(current_time);
235                    }
236                }
237
238                if let Err(e) = persist_jobs(&local_storage_path, &current_jobs_arc).await {
239                    tracing::error!("Failed to persist job status: {}", e);
240                }
241
242                let cancel_token = CancellationToken::new();
243                {
244                    let mut tasks = running_tasks.lock().await;
245                    tasks.insert(task_job_id.clone(), cancel_token.clone());
246                }
247
248                let result = execute_job(
249                    job_to_execute,
250                    current_jobs_arc.clone(),
251                    task_job_id.clone(),
252                    cancel_token.clone(),
253                )
254                .await;
255
256                {
257                    let mut tasks = running_tasks.lock().await;
258                    tasks.remove(&task_job_id);
259                }
260
261                {
262                    let mut jobs_guard = current_jobs_arc.lock().await;
263                    if let Some((_, job)) = jobs_guard.get_mut(&task_job_id) {
264                        job.currently_running = false;
265                        job.current_session_id = None;
266                        job.process_start_time = None;
267                    }
268                }
269
270                if let Err(e) = persist_jobs(&local_storage_path, &current_jobs_arc).await {
271                    tracing::error!("Failed to persist job completion: {}", e);
272                }
273
274                match result {
275                    Ok(_) => tracing::info!("Job '{}' completed", task_job_id),
276                    Err(ref e) => {
277                        tracing::error!("Job '{}' failed: {}", task_job_id, e);
278                        crate::posthog::emit_error("scheduler_job_failed", &e.to_string());
279                    }
280                }
281            })
282        })
283        .map_err(|e| SchedulerError::CronParseError(e.to_string()))
284    }
285
286    pub async fn add_scheduled_job(
287        &self,
288        original_job_spec: ScheduledJob,
289        make_copy: bool,
290    ) -> Result<(), SchedulerError> {
291        {
292            let jobs_guard = self.jobs.lock().await;
293            if jobs_guard.contains_key(&original_job_spec.id) {
294                return Err(SchedulerError::JobIdExists(original_job_spec.id.clone()));
295            }
296        }
297
298        let mut stored_job = original_job_spec;
299        if make_copy {
300            let original_recipe_path = Path::new(&stored_job.source);
301            if !original_recipe_path.is_file() {
302                return Err(SchedulerError::RecipeLoadError(format!(
303                    "Recipe file not found: {}",
304                    stored_job.source
305                )));
306            }
307
308            let scheduled_recipes_dir = get_default_scheduled_recipes_dir()?;
309            let original_extension = original_recipe_path
310                .extension()
311                .and_then(|ext| ext.to_str())
312                .unwrap_or("yaml");
313
314            let destination_filename = format!("{}.{}", stored_job.id, original_extension);
315            let destination_recipe_path = scheduled_recipes_dir.join(destination_filename);
316
317            fs::copy(original_recipe_path, &destination_recipe_path)?;
318            stored_job.source = destination_recipe_path.to_string_lossy().into_owned();
319            stored_job.current_session_id = None;
320            stored_job.process_start_time = None;
321        }
322
323        let cron_task = self.create_cron_task(stored_job.clone())?;
324
325        let job_uuid = self
326            .tokio_scheduler
327            .add(cron_task)
328            .await
329            .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
330
331        {
332            let mut jobs_guard = self.jobs.lock().await;
333            jobs_guard.insert(stored_job.id.clone(), (job_uuid, stored_job));
334        }
335
336        persist_jobs(&self.storage_path, &self.jobs).await?;
337        Ok(())
338    }
339
340    pub async fn schedule_recipe(
341        &self,
342        recipe_path: PathBuf,
343        cron_schedule: Option<String>,
344    ) -> Result<(), SchedulerError> {
345        let recipe_path_str = recipe_path.to_string_lossy().to_string();
346
347        let existing_job_id = {
348            let jobs_guard = self.jobs.lock().await;
349            jobs_guard
350                .iter()
351                .find(|(_, (_, job))| job.source == recipe_path_str)
352                .map(|(id, _)| id.clone())
353        };
354
355        match cron_schedule {
356            Some(cron) => {
357                if let Some(job_id) = existing_job_id {
358                    self.update_schedule(&job_id, cron).await
359                } else {
360                    let job_id = self.generate_unique_job_id(&recipe_path).await;
361                    let job = ScheduledJob {
362                        id: job_id,
363                        source: recipe_path_str,
364                        cron,
365                        last_run: None,
366                        currently_running: false,
367                        paused: false,
368                        current_session_id: None,
369                        process_start_time: None,
370                    };
371                    self.add_scheduled_job(job, false).await
372                }
373            }
374            None => {
375                if let Some(job_id) = existing_job_id {
376                    self.remove_scheduled_job(&job_id, false).await
377                } else {
378                    Ok(())
379                }
380            }
381        }
382    }
383
384    async fn generate_unique_job_id(&self, path: &Path) -> String {
385        let base_id = path
386            .file_stem()
387            .and_then(|s| s.to_str())
388            .unwrap_or("unnamed")
389            .to_string();
390
391        let jobs_guard = self.jobs.lock().await;
392        let mut id = base_id.clone();
393        let mut counter = 1;
394
395        while jobs_guard.contains_key(&id) {
396            id = format!("{}_{}", base_id, counter);
397            counter += 1;
398        }
399
400        id
401    }
402
403    async fn load_jobs_from_storage(self: &Arc<Self>) {
404        if !self.storage_path.exists() {
405            return;
406        }
407        let data = match fs::read_to_string(&self.storage_path) {
408            Ok(data) => data,
409            Err(e) => {
410                tracing::error!(
411                    "Failed to read schedules.json: {}. Starting with empty schedule list.",
412                    e
413                );
414                return;
415            }
416        };
417        if data.trim().is_empty() {
418            return;
419        }
420
421        let list: Vec<ScheduledJob> = match serde_json::from_str(&data) {
422            Ok(jobs) => jobs,
423            Err(e) => {
424                tracing::error!(
425                    "Failed to parse schedules.json: {}. Starting with empty schedule list.",
426                    e
427                );
428                return;
429            }
430        };
431
432        for job_to_load in list {
433            if !Path::new(&job_to_load.source).exists() {
434                tracing::warn!(
435                    "Recipe file {} not found, skipping job '{}'",
436                    job_to_load.source,
437                    job_to_load.id
438                );
439                continue;
440            }
441
442            let cron_task = match self.create_cron_task(job_to_load.clone()) {
443                Ok(task) => task,
444                Err(e) => {
445                    tracing::error!(
446                        "Failed to create cron task for job '{}': {}. Skipping.",
447                        job_to_load.id,
448                        e
449                    );
450                    continue;
451                }
452            };
453
454            let job_uuid = match self.tokio_scheduler.add(cron_task).await {
455                Ok(uuid) => uuid,
456                Err(e) => {
457                    tracing::error!(
458                        "Failed to add job '{}' to scheduler: {}. Skipping.",
459                        job_to_load.id,
460                        e
461                    );
462                    continue;
463                }
464            };
465
466            let mut jobs_guard = self.jobs.lock().await;
467            jobs_guard.insert(job_to_load.id.clone(), (job_uuid, job_to_load));
468        }
469    }
470
471    pub async fn list_scheduled_jobs(&self) -> Vec<ScheduledJob> {
472        self.jobs
473            .lock()
474            .await
475            .values()
476            .map(|(_, j)| j.clone())
477            .collect()
478    }
479
480    pub async fn remove_scheduled_job(
481        &self,
482        id: &str,
483        remove_recipe: bool,
484    ) -> Result<(), SchedulerError> {
485        let (job_uuid, recipe_path) = {
486            let mut jobs_guard = self.jobs.lock().await;
487            match jobs_guard.remove(id) {
488                Some((uuid, job)) => (uuid, job.source.clone()),
489                None => return Err(SchedulerError::JobNotFound(id.to_string())),
490            }
491        };
492
493        self.tokio_scheduler
494            .remove(&job_uuid)
495            .await
496            .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
497
498        if remove_recipe {
499            let path = Path::new(&recipe_path);
500            if path.exists() {
501                fs::remove_file(path)?;
502            }
503        }
504
505        persist_jobs(&self.storage_path, &self.jobs).await?;
506        Ok(())
507    }
508
509    pub async fn sessions(
510        &self,
511        sched_id: &str,
512        limit: usize,
513    ) -> Result<Vec<(String, Session)>, SchedulerError> {
514        let all_sessions = SessionManager::list_sessions()
515            .await
516            .map_err(|e| SchedulerError::StorageError(io::Error::other(e)))?;
517
518        let mut schedule_sessions: Vec<(String, Session)> = all_sessions
519            .into_iter()
520            .filter(|s| s.schedule_id.as_deref() == Some(sched_id))
521            .map(|s| (s.id.clone(), s))
522            .collect();
523
524        schedule_sessions.sort_by(|a, b| b.1.created_at.cmp(&a.1.created_at));
525        schedule_sessions.truncate(limit);
526
527        Ok(schedule_sessions)
528    }
529
530    pub async fn run_now(&self, sched_id: &str) -> Result<String, SchedulerError> {
531        let job_to_run = {
532            let mut jobs_guard = self.jobs.lock().await;
533            match jobs_guard.get_mut(sched_id) {
534                Some((_, job)) => {
535                    if job.currently_running {
536                        return Err(SchedulerError::AnyhowError(anyhow!(
537                            "Job '{}' is already running",
538                            sched_id
539                        )));
540                    }
541                    job.currently_running = true;
542                    job.process_start_time = Some(Utc::now());
543                    job.clone()
544                }
545                None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
546            }
547        };
548
549        persist_jobs(&self.storage_path, &self.jobs).await?;
550
551        let cancel_token = CancellationToken::new();
552        {
553            let mut tasks = self.running_tasks.lock().await;
554            tasks.insert(sched_id.to_string(), cancel_token.clone());
555        }
556
557        let result = execute_job(
558            job_to_run,
559            self.jobs.clone(),
560            sched_id.to_string(),
561            cancel_token.clone(),
562        )
563        .await;
564
565        {
566            let mut tasks = self.running_tasks.lock().await;
567            tasks.remove(sched_id);
568        }
569
570        {
571            let mut jobs_guard = self.jobs.lock().await;
572            if let Some((_, job)) = jobs_guard.get_mut(sched_id) {
573                job.currently_running = false;
574                job.current_session_id = None;
575                job.process_start_time = None;
576                job.last_run = Some(Utc::now());
577            }
578        }
579
580        persist_jobs(&self.storage_path, &self.jobs).await?;
581
582        match result {
583            Ok(session_id) => Ok(session_id),
584            Err(e) => Err(SchedulerError::AnyhowError(anyhow!(
585                "Job '{}' failed: {}",
586                sched_id,
587                e
588            ))),
589        }
590    }
591
592    pub async fn pause_schedule(&self, sched_id: &str) -> Result<(), SchedulerError> {
593        {
594            let mut jobs_guard = self.jobs.lock().await;
595            match jobs_guard.get_mut(sched_id) {
596                Some((_, job)) => {
597                    if job.currently_running {
598                        return Err(SchedulerError::AnyhowError(anyhow!(
599                            "Cannot pause running schedule '{}'",
600                            sched_id
601                        )));
602                    }
603                    job.paused = true;
604                }
605                None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
606            }
607        }
608
609        persist_jobs(&self.storage_path, &self.jobs).await
610    }
611
612    pub async fn unpause_schedule(&self, sched_id: &str) -> Result<(), SchedulerError> {
613        {
614            let mut jobs_guard = self.jobs.lock().await;
615            match jobs_guard.get_mut(sched_id) {
616                Some((_, job)) => job.paused = false,
617                None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
618            }
619        }
620
621        persist_jobs(&self.storage_path, &self.jobs).await
622    }
623
624    pub async fn update_schedule(
625        &self,
626        sched_id: &str,
627        new_cron: String,
628    ) -> Result<(), SchedulerError> {
629        let (old_uuid, updated_job) = {
630            let mut jobs_guard = self.jobs.lock().await;
631            match jobs_guard.get_mut(sched_id) {
632                Some((uuid, job)) => {
633                    if job.currently_running {
634                        return Err(SchedulerError::AnyhowError(anyhow!(
635                            "Cannot update running schedule '{}'",
636                            sched_id
637                        )));
638                    }
639                    if new_cron == job.cron {
640                        return Ok(());
641                    }
642                    job.cron = new_cron.clone();
643                    (*uuid, job.clone())
644                }
645                None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
646            }
647        };
648
649        self.tokio_scheduler
650            .remove(&old_uuid)
651            .await
652            .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
653
654        let cron_task = self.create_cron_task(updated_job)?;
655        let new_uuid = self
656            .tokio_scheduler
657            .add(cron_task)
658            .await
659            .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
660
661        {
662            let mut jobs_guard = self.jobs.lock().await;
663            if let Some((uuid, _)) = jobs_guard.get_mut(sched_id) {
664                *uuid = new_uuid;
665            }
666        }
667
668        persist_jobs(&self.storage_path, &self.jobs).await
669    }
670
671    pub async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError> {
672        {
673            let jobs_guard = self.jobs.lock().await;
674            match jobs_guard.get(sched_id) {
675                Some((_, job)) if !job.currently_running => {
676                    return Err(SchedulerError::AnyhowError(anyhow!(
677                        "Schedule '{}' is not running",
678                        sched_id
679                    )));
680                }
681                None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
682                _ => {}
683            }
684        }
685
686        {
687            let tasks = self.running_tasks.lock().await;
688            if let Some(token) = tasks.get(sched_id) {
689                token.cancel();
690            }
691        }
692
693        Ok(())
694    }
695
696    pub async fn get_running_job_info(
697        &self,
698        sched_id: &str,
699    ) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
700        let jobs_guard = self.jobs.lock().await;
701        match jobs_guard.get(sched_id) {
702            Some((_, job)) if job.currently_running => {
703                match (&job.current_session_id, &job.process_start_time) {
704                    (Some(sid), Some(start)) => Ok(Some((sid.clone(), *start))),
705                    _ => Ok(None),
706                }
707            }
708            Some(_) => Ok(None),
709            None => Err(SchedulerError::JobNotFound(sched_id.to_string())),
710        }
711    }
712}
713
714#[allow(clippy::too_many_lines)]
715async fn execute_job(
716    job: ScheduledJob,
717    jobs: Arc<Mutex<JobsMap>>,
718    job_id: String,
719    cancel_token: CancellationToken,
720) -> Result<String> {
721    if job.source.is_empty() {
722        return Ok(job.id.to_string());
723    }
724
725    let recipe_path = Path::new(&job.source);
726    let recipe_content = fs::read_to_string(recipe_path)?;
727
728    let recipe: Recipe = {
729        let extension = recipe_path
730            .extension()
731            .and_then(|s| s.to_str())
732            .unwrap_or("yaml")
733            .to_lowercase();
734
735        match extension.as_str() {
736            "json" | "jsonl" => serde_json::from_str(&recipe_content)?,
737            _ => serde_yaml::from_str(&recipe_content)?,
738        }
739    };
740
741    let agent = Agent::new();
742
743    let config = Config::global();
744    let provider_name = config.get_aster_provider()?;
745    let model_name = config.get_aster_model()?;
746    let model_config = crate::model::ModelConfig::new(&model_name)?;
747
748    let agent_provider = create(&provider_name, model_config).await?;
749
750    if let Some(ref extensions) = recipe.extensions {
751        for ext in extensions {
752            agent.add_extension(ext.clone()).await?;
753        }
754    }
755
756    let session = SessionManager::create_session(
757        std::env::current_dir()?,
758        format!("Scheduled job: {}", job.id),
759        SessionType::Scheduled,
760    )
761    .await?;
762
763    agent.update_provider(agent_provider, &session.id).await?;
764
765    let mut jobs_guard = jobs.lock().await;
766    if let Some((_, job_def)) = jobs_guard.get_mut(job_id.as_str()) {
767        job_def.current_session_id = Some(session.id.clone());
768    }
769    drop(jobs_guard);
770
771    let start_time = std::time::Instant::now();
772    tokio::spawn(async move {
773        let mut props = HashMap::new();
774        props.insert(
775            "trigger".to_string(),
776            serde_json::Value::String("automated".to_string()),
777        );
778        if let Err(e) = posthog::emit_event("schedule_job_started", props).await {
779            tracing::debug!("Failed to send schedule telemetry: {}", e);
780        }
781    });
782
783    let prompt_text = recipe
784        .prompt
785        .as_ref()
786        .or(recipe.instructions.as_ref())
787        .unwrap();
788
789    let user_message = Message::user().with_text(prompt_text);
790    let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]);
791
792    let session_config = SessionConfig {
793        id: session.id.clone(),
794        schedule_id: Some(job.id.clone()),
795        max_turns: None,
796        retry_config: None,
797        system_prompt: None,
798    };
799
800    let session_id = session_config.id.clone();
801    let stream = crate::session_context::with_session_id(Some(session_id.clone()), async {
802        agent
803            .reply(user_message, session_config, Some(cancel_token))
804            .await
805    })
806    .await?;
807
808    use futures::StreamExt;
809    let mut stream = std::pin::pin!(stream);
810
811    while let Some(message_result) = stream.next().await {
812        tokio::task::yield_now().await;
813
814        match message_result {
815            Ok(AgentEvent::Message(msg)) => {
816                conversation.push(msg);
817            }
818            Ok(AgentEvent::HistoryReplaced(updated)) => {
819                conversation = updated;
820            }
821            Ok(_) => {}
822            Err(e) => {
823                tracing::error!("Error in agent stream: {}", e);
824                break;
825            }
826        }
827    }
828
829    SessionManager::update_session(&session.id)
830        .schedule_id(Some(job.id.clone()))
831        .recipe(Some(recipe))
832        .apply()
833        .await?;
834
835    let duration_secs = start_time.elapsed().as_secs();
836    tokio::spawn(async move {
837        let mut props = HashMap::new();
838        props.insert(
839            "trigger".to_string(),
840            serde_json::Value::String("automated".to_string()),
841        );
842        props.insert(
843            "status".to_string(),
844            serde_json::Value::String("completed".to_string()),
845        );
846        props.insert(
847            "duration_seconds".to_string(),
848            serde_json::Value::Number(serde_json::Number::from(duration_secs)),
849        );
850        if let Err(e) = posthog::emit_event("schedule_job_completed", props).await {
851            tracing::debug!("Failed to send schedule telemetry: {}", e);
852        }
853    });
854
855    Ok(session.id)
856}
857
858#[async_trait]
859impl SchedulerTrait for Scheduler {
860    async fn add_scheduled_job(
861        &self,
862        job: ScheduledJob,
863        make_copy: bool,
864    ) -> Result<(), SchedulerError> {
865        self.add_scheduled_job(job, make_copy).await
866    }
867
868    async fn schedule_recipe(
869        &self,
870        recipe_path: PathBuf,
871        cron_schedule: Option<String>,
872    ) -> Result<(), SchedulerError> {
873        self.schedule_recipe(recipe_path, cron_schedule).await
874    }
875
876    async fn list_scheduled_jobs(&self) -> Vec<ScheduledJob> {
877        self.list_scheduled_jobs().await
878    }
879
880    async fn remove_scheduled_job(
881        &self,
882        id: &str,
883        remove_recipe: bool,
884    ) -> Result<(), SchedulerError> {
885        self.remove_scheduled_job(id, remove_recipe).await
886    }
887
888    async fn pause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
889        self.pause_schedule(id).await
890    }
891
892    async fn unpause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
893        self.unpause_schedule(id).await
894    }
895
896    async fn run_now(&self, id: &str) -> Result<String, SchedulerError> {
897        self.run_now(id).await
898    }
899
900    async fn sessions(
901        &self,
902        sched_id: &str,
903        limit: usize,
904    ) -> Result<Vec<(String, Session)>, SchedulerError> {
905        self.sessions(sched_id, limit).await
906    }
907
908    async fn update_schedule(
909        &self,
910        sched_id: &str,
911        new_cron: String,
912    ) -> Result<(), SchedulerError> {
913        self.update_schedule(sched_id, new_cron).await
914    }
915
916    async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError> {
917        self.kill_running_job(sched_id).await
918    }
919
920    async fn get_running_job_info(
921        &self,
922        sched_id: &str,
923    ) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
924        self.get_running_job_info(sched_id).await
925    }
926}
927
928#[cfg(test)]
929mod tests {
930    use super::*;
931    use tempfile::tempdir;
932    use tokio::time::{sleep, Duration};
933
934    fn create_test_recipe(dir: &Path, name: &str) -> PathBuf {
935        let recipe_path = dir.join(format!("{}.yaml", name));
936        fs::write(&recipe_path, "prompt: test\n").unwrap();
937        recipe_path
938    }
939
940    #[tokio::test]
941    async fn test_job_runs_on_schedule() {
942        let temp_dir = tempdir().unwrap();
943        let storage_path = temp_dir.path().join("schedules.json");
944        let recipe_path = create_test_recipe(temp_dir.path(), "scheduled_job");
945        let scheduler = Scheduler::new(storage_path).await.unwrap();
946
947        let job = ScheduledJob {
948            id: "scheduled_job".to_string(),
949            source: recipe_path.to_string_lossy().to_string(),
950            cron: "* * * * * *".to_string(),
951            last_run: None,
952            currently_running: false,
953            paused: false,
954            current_session_id: None,
955            process_start_time: None,
956        };
957
958        scheduler.add_scheduled_job(job, true).await.unwrap();
959        sleep(Duration::from_millis(1500)).await;
960
961        let jobs = scheduler.list_scheduled_jobs().await;
962        assert!(jobs[0].last_run.is_some(), "Job should have run");
963    }
964
965    #[tokio::test]
966    async fn test_paused_job_does_not_run() {
967        let temp_dir = tempdir().unwrap();
968        let storage_path = temp_dir.path().join("schedules.json");
969        let recipe_path = create_test_recipe(temp_dir.path(), "paused_job");
970        let scheduler = Scheduler::new(storage_path).await.unwrap();
971
972        let job = ScheduledJob {
973            id: "paused_job".to_string(),
974            source: recipe_path.to_string_lossy().to_string(),
975            cron: "* * * * * *".to_string(),
976            last_run: None,
977            currently_running: false,
978            paused: false,
979            current_session_id: None,
980            process_start_time: None,
981        };
982
983        scheduler.add_scheduled_job(job, true).await.unwrap();
984        scheduler.pause_schedule("paused_job").await.unwrap();
985        sleep(Duration::from_millis(1500)).await;
986
987        let jobs = scheduler.list_scheduled_jobs().await;
988        assert!(jobs[0].last_run.is_none(), "Paused job should not run");
989    }
990}