1#[path = "scheduler/types.rs"]
3pub mod types;
4
5#[path = "scheduler/migration.rs"]
6pub mod migration;
7
8#[path = "scheduler/executor.rs"]
9pub mod executor;
10
11#[path = "scheduler/delivery.rs"]
12pub mod delivery;
13
14use std::collections::HashMap;
15use std::fs;
16use std::io;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19
20use anyhow::{anyhow, Result};
21use async_trait::async_trait;
22use chrono::{DateTime, Local, Utc};
23use serde::{Deserialize, Serialize};
24use tokio::sync::Mutex;
25use tokio_cron_scheduler::{job::JobId, Job, JobScheduler as TokioJobScheduler};
26use tokio_util::sync::CancellationToken;
27
28use crate::agents::AgentEvent;
29use crate::agents::{Agent, SessionConfig};
30use crate::config::paths::Paths;
31use crate::config::Config;
32use crate::conversation::message::Message;
33use crate::conversation::Conversation;
34use crate::posthog;
35use crate::providers::create;
36use crate::recipe::Recipe;
37use crate::scheduler_trait::SchedulerTrait;
38use crate::session::session_manager::SessionType;
39use crate::session::{Session, SessionManager};
40
41type RunningTasksMap = HashMap<String, CancellationToken>;
42type JobsMap = HashMap<String, (JobId, ScheduledJob)>;
43
44pub fn get_default_scheduler_storage_path() -> Result<PathBuf, io::Error> {
45 let data_dir = Paths::data_dir();
46 fs::create_dir_all(&data_dir)?;
47 Ok(data_dir.join("schedules.json"))
48}
49
50pub fn get_default_scheduled_recipes_dir() -> Result<PathBuf, SchedulerError> {
51 let data_dir = Paths::data_dir();
52 let recipes_dir = data_dir.join("scheduled_recipes");
53 fs::create_dir_all(&recipes_dir).map_err(SchedulerError::StorageError)?;
54 Ok(recipes_dir)
55}
56
57#[derive(Debug)]
58pub enum SchedulerError {
59 JobIdExists(String),
60 JobNotFound(String),
61 StorageError(io::Error),
62 RecipeLoadError(String),
63 AgentSetupError(String),
64 PersistError(String),
65 CronParseError(String),
66 SchedulerInternalError(String),
67 AnyhowError(anyhow::Error),
68}
69
70impl std::fmt::Display for SchedulerError {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 SchedulerError::JobIdExists(id) => write!(f, "Job ID '{}' already exists.", id),
74 SchedulerError::JobNotFound(id) => write!(f, "Job ID '{}' not found.", id),
75 SchedulerError::StorageError(e) => write!(f, "Storage error: {}", e),
76 SchedulerError::RecipeLoadError(e) => write!(f, "Recipe load error: {}", e),
77 SchedulerError::AgentSetupError(e) => write!(f, "Agent setup error: {}", e),
78 SchedulerError::PersistError(e) => write!(f, "Failed to persist schedules: {}", e),
79 SchedulerError::CronParseError(e) => write!(f, "Invalid cron string: {}", e),
80 SchedulerError::SchedulerInternalError(e) => {
81 write!(f, "Scheduler internal error: {}", e)
82 }
83 SchedulerError::AnyhowError(e) => write!(f, "Scheduler operation failed: {}", e),
84 }
85 }
86}
87
88impl std::error::Error for SchedulerError {
89 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
90 match self {
91 SchedulerError::StorageError(e) => Some(e),
92 SchedulerError::AnyhowError(e) => Some(e.as_ref()),
93 _ => None,
94 }
95 }
96}
97
98impl From<io::Error> for SchedulerError {
99 fn from(err: io::Error) -> Self {
100 SchedulerError::StorageError(err)
101 }
102}
103
104impl From<serde_json::Error> for SchedulerError {
105 fn from(err: serde_json::Error) -> Self {
106 SchedulerError::PersistError(err.to_string())
107 }
108}
109
110impl From<anyhow::Error> for SchedulerError {
111 fn from(err: anyhow::Error) -> Self {
112 SchedulerError::AnyhowError(err)
113 }
114}
115
116#[derive(Clone, Serialize, Deserialize, Debug, utoipa::ToSchema)]
117pub struct ScheduledJob {
118 pub id: String,
119 pub source: String,
120 pub cron: String,
121 pub last_run: Option<DateTime<Utc>>,
122 #[serde(default)]
123 pub currently_running: bool,
124 #[serde(default)]
125 pub paused: bool,
126 #[serde(default)]
127 pub current_session_id: Option<String>,
128 #[serde(default)]
129 pub process_start_time: Option<DateTime<Utc>>,
130}
131
132async fn persist_jobs(
133 storage_path: &Path,
134 jobs: &Arc<Mutex<JobsMap>>,
135) -> Result<(), SchedulerError> {
136 let jobs_guard = jobs.lock().await;
137 let list: Vec<ScheduledJob> = jobs_guard.values().map(|(_, j)| j.clone()).collect();
138 if let Some(parent) = storage_path.parent() {
139 fs::create_dir_all(parent)?;
140 }
141 let data = serde_json::to_string_pretty(&list)?;
142 fs::write(storage_path, data)?;
143 Ok(())
144}
145
146pub struct Scheduler {
147 tokio_scheduler: TokioJobScheduler,
148 jobs: Arc<Mutex<JobsMap>>,
149 storage_path: PathBuf,
150 running_tasks: Arc<Mutex<RunningTasksMap>>,
151}
152
153impl Scheduler {
154 pub async fn new(storage_path: PathBuf) -> Result<Arc<Self>, SchedulerError> {
155 let internal_scheduler = TokioJobScheduler::new()
156 .await
157 .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
158
159 let jobs = Arc::new(Mutex::new(HashMap::new()));
160 let running_tasks = Arc::new(Mutex::new(HashMap::new()));
161
162 let arc_self = Arc::new(Self {
163 tokio_scheduler: internal_scheduler,
164 jobs,
165 storage_path,
166 running_tasks,
167 });
168
169 arc_self.load_jobs_from_storage().await;
170 arc_self
171 .tokio_scheduler
172 .start()
173 .await
174 .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
175
176 Ok(arc_self)
177 }
178
179 fn create_cron_task(&self, job: ScheduledJob) -> Result<Job, SchedulerError> {
180 let job_for_task = job.clone();
181 let jobs_arc = self.jobs.clone();
182 let storage_path = self.storage_path.clone();
183 let running_tasks_arc = self.running_tasks.clone();
184
185 let cron_parts: Vec<&str> = job.cron.split_whitespace().collect();
186 let cron = match cron_parts.len() {
187 5 => {
188 tracing::warn!(
189 "Job '{}' has legacy 5-field cron '{}', converting to 6-field",
190 job.id,
191 job.cron
192 );
193 format!("0 {}", job.cron)
194 }
195 6 => job.cron.clone(),
196 _ => {
197 return Err(SchedulerError::CronParseError(format!(
198 "Invalid cron expression '{}': expected 5 or 6 fields, got {}",
199 job.cron,
200 cron_parts.len()
201 )))
202 }
203 };
204
205 let local_tz = Local::now().timezone();
206
207 Job::new_async_tz(&cron, local_tz, move |_uuid, _l| {
208 tracing::info!("Cron task triggered for job '{}'", job_for_task.id);
209 let task_job_id = job_for_task.id.clone();
210 let current_jobs_arc = jobs_arc.clone();
211 let local_storage_path = storage_path.clone();
212 let job_to_execute = job_for_task.clone();
213 let running_tasks = running_tasks_arc.clone();
214
215 Box::pin(async move {
216 let should_execute = {
217 let jobs_guard = current_jobs_arc.lock().await;
218 jobs_guard
219 .get(&task_job_id)
220 .map(|(_, j)| !j.paused)
221 .unwrap_or(false)
222 };
223
224 if !should_execute {
225 return;
226 }
227
228 let current_time = Utc::now();
229 {
230 let mut jobs_guard = current_jobs_arc.lock().await;
231 if let Some((_, job)) = jobs_guard.get_mut(&task_job_id) {
232 job.last_run = Some(current_time);
233 job.currently_running = true;
234 job.process_start_time = Some(current_time);
235 }
236 }
237
238 if let Err(e) = persist_jobs(&local_storage_path, ¤t_jobs_arc).await {
239 tracing::error!("Failed to persist job status: {}", e);
240 }
241
242 let cancel_token = CancellationToken::new();
243 {
244 let mut tasks = running_tasks.lock().await;
245 tasks.insert(task_job_id.clone(), cancel_token.clone());
246 }
247
248 let result = execute_job(
249 job_to_execute,
250 current_jobs_arc.clone(),
251 task_job_id.clone(),
252 cancel_token.clone(),
253 )
254 .await;
255
256 {
257 let mut tasks = running_tasks.lock().await;
258 tasks.remove(&task_job_id);
259 }
260
261 {
262 let mut jobs_guard = current_jobs_arc.lock().await;
263 if let Some((_, job)) = jobs_guard.get_mut(&task_job_id) {
264 job.currently_running = false;
265 job.current_session_id = None;
266 job.process_start_time = None;
267 }
268 }
269
270 if let Err(e) = persist_jobs(&local_storage_path, ¤t_jobs_arc).await {
271 tracing::error!("Failed to persist job completion: {}", e);
272 }
273
274 match result {
275 Ok(_) => tracing::info!("Job '{}' completed", task_job_id),
276 Err(ref e) => {
277 tracing::error!("Job '{}' failed: {}", task_job_id, e);
278 crate::posthog::emit_error("scheduler_job_failed", &e.to_string());
279 }
280 }
281 })
282 })
283 .map_err(|e| SchedulerError::CronParseError(e.to_string()))
284 }
285
286 pub async fn add_scheduled_job(
287 &self,
288 original_job_spec: ScheduledJob,
289 make_copy: bool,
290 ) -> Result<(), SchedulerError> {
291 {
292 let jobs_guard = self.jobs.lock().await;
293 if jobs_guard.contains_key(&original_job_spec.id) {
294 return Err(SchedulerError::JobIdExists(original_job_spec.id.clone()));
295 }
296 }
297
298 let mut stored_job = original_job_spec;
299 if make_copy {
300 let original_recipe_path = Path::new(&stored_job.source);
301 if !original_recipe_path.is_file() {
302 return Err(SchedulerError::RecipeLoadError(format!(
303 "Recipe file not found: {}",
304 stored_job.source
305 )));
306 }
307
308 let scheduled_recipes_dir = get_default_scheduled_recipes_dir()?;
309 let original_extension = original_recipe_path
310 .extension()
311 .and_then(|ext| ext.to_str())
312 .unwrap_or("yaml");
313
314 let destination_filename = format!("{}.{}", stored_job.id, original_extension);
315 let destination_recipe_path = scheduled_recipes_dir.join(destination_filename);
316
317 fs::copy(original_recipe_path, &destination_recipe_path)?;
318 stored_job.source = destination_recipe_path.to_string_lossy().into_owned();
319 stored_job.current_session_id = None;
320 stored_job.process_start_time = None;
321 }
322
323 let cron_task = self.create_cron_task(stored_job.clone())?;
324
325 let job_uuid = self
326 .tokio_scheduler
327 .add(cron_task)
328 .await
329 .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
330
331 {
332 let mut jobs_guard = self.jobs.lock().await;
333 jobs_guard.insert(stored_job.id.clone(), (job_uuid, stored_job));
334 }
335
336 persist_jobs(&self.storage_path, &self.jobs).await?;
337 Ok(())
338 }
339
340 pub async fn schedule_recipe(
341 &self,
342 recipe_path: PathBuf,
343 cron_schedule: Option<String>,
344 ) -> Result<(), SchedulerError> {
345 let recipe_path_str = recipe_path.to_string_lossy().to_string();
346
347 let existing_job_id = {
348 let jobs_guard = self.jobs.lock().await;
349 jobs_guard
350 .iter()
351 .find(|(_, (_, job))| job.source == recipe_path_str)
352 .map(|(id, _)| id.clone())
353 };
354
355 match cron_schedule {
356 Some(cron) => {
357 if let Some(job_id) = existing_job_id {
358 self.update_schedule(&job_id, cron).await
359 } else {
360 let job_id = self.generate_unique_job_id(&recipe_path).await;
361 let job = ScheduledJob {
362 id: job_id,
363 source: recipe_path_str,
364 cron,
365 last_run: None,
366 currently_running: false,
367 paused: false,
368 current_session_id: None,
369 process_start_time: None,
370 };
371 self.add_scheduled_job(job, false).await
372 }
373 }
374 None => {
375 if let Some(job_id) = existing_job_id {
376 self.remove_scheduled_job(&job_id, false).await
377 } else {
378 Ok(())
379 }
380 }
381 }
382 }
383
384 async fn generate_unique_job_id(&self, path: &Path) -> String {
385 let base_id = path
386 .file_stem()
387 .and_then(|s| s.to_str())
388 .unwrap_or("unnamed")
389 .to_string();
390
391 let jobs_guard = self.jobs.lock().await;
392 let mut id = base_id.clone();
393 let mut counter = 1;
394
395 while jobs_guard.contains_key(&id) {
396 id = format!("{}_{}", base_id, counter);
397 counter += 1;
398 }
399
400 id
401 }
402
403 async fn load_jobs_from_storage(self: &Arc<Self>) {
404 if !self.storage_path.exists() {
405 return;
406 }
407 let data = match fs::read_to_string(&self.storage_path) {
408 Ok(data) => data,
409 Err(e) => {
410 tracing::error!(
411 "Failed to read schedules.json: {}. Starting with empty schedule list.",
412 e
413 );
414 return;
415 }
416 };
417 if data.trim().is_empty() {
418 return;
419 }
420
421 let list: Vec<ScheduledJob> = match serde_json::from_str(&data) {
422 Ok(jobs) => jobs,
423 Err(e) => {
424 tracing::error!(
425 "Failed to parse schedules.json: {}. Starting with empty schedule list.",
426 e
427 );
428 return;
429 }
430 };
431
432 for job_to_load in list {
433 if !Path::new(&job_to_load.source).exists() {
434 tracing::warn!(
435 "Recipe file {} not found, skipping job '{}'",
436 job_to_load.source,
437 job_to_load.id
438 );
439 continue;
440 }
441
442 let cron_task = match self.create_cron_task(job_to_load.clone()) {
443 Ok(task) => task,
444 Err(e) => {
445 tracing::error!(
446 "Failed to create cron task for job '{}': {}. Skipping.",
447 job_to_load.id,
448 e
449 );
450 continue;
451 }
452 };
453
454 let job_uuid = match self.tokio_scheduler.add(cron_task).await {
455 Ok(uuid) => uuid,
456 Err(e) => {
457 tracing::error!(
458 "Failed to add job '{}' to scheduler: {}. Skipping.",
459 job_to_load.id,
460 e
461 );
462 continue;
463 }
464 };
465
466 let mut jobs_guard = self.jobs.lock().await;
467 jobs_guard.insert(job_to_load.id.clone(), (job_uuid, job_to_load));
468 }
469 }
470
471 pub async fn list_scheduled_jobs(&self) -> Vec<ScheduledJob> {
472 self.jobs
473 .lock()
474 .await
475 .values()
476 .map(|(_, j)| j.clone())
477 .collect()
478 }
479
480 pub async fn remove_scheduled_job(
481 &self,
482 id: &str,
483 remove_recipe: bool,
484 ) -> Result<(), SchedulerError> {
485 let (job_uuid, recipe_path) = {
486 let mut jobs_guard = self.jobs.lock().await;
487 match jobs_guard.remove(id) {
488 Some((uuid, job)) => (uuid, job.source.clone()),
489 None => return Err(SchedulerError::JobNotFound(id.to_string())),
490 }
491 };
492
493 self.tokio_scheduler
494 .remove(&job_uuid)
495 .await
496 .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
497
498 if remove_recipe {
499 let path = Path::new(&recipe_path);
500 if path.exists() {
501 fs::remove_file(path)?;
502 }
503 }
504
505 persist_jobs(&self.storage_path, &self.jobs).await?;
506 Ok(())
507 }
508
509 pub async fn sessions(
510 &self,
511 sched_id: &str,
512 limit: usize,
513 ) -> Result<Vec<(String, Session)>, SchedulerError> {
514 let all_sessions = SessionManager::list_sessions()
515 .await
516 .map_err(|e| SchedulerError::StorageError(io::Error::other(e)))?;
517
518 let mut schedule_sessions: Vec<(String, Session)> = all_sessions
519 .into_iter()
520 .filter(|s| s.schedule_id.as_deref() == Some(sched_id))
521 .map(|s| (s.id.clone(), s))
522 .collect();
523
524 schedule_sessions.sort_by(|a, b| b.1.created_at.cmp(&a.1.created_at));
525 schedule_sessions.truncate(limit);
526
527 Ok(schedule_sessions)
528 }
529
530 pub async fn run_now(&self, sched_id: &str) -> Result<String, SchedulerError> {
531 let job_to_run = {
532 let mut jobs_guard = self.jobs.lock().await;
533 match jobs_guard.get_mut(sched_id) {
534 Some((_, job)) => {
535 if job.currently_running {
536 return Err(SchedulerError::AnyhowError(anyhow!(
537 "Job '{}' is already running",
538 sched_id
539 )));
540 }
541 job.currently_running = true;
542 job.process_start_time = Some(Utc::now());
543 job.clone()
544 }
545 None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
546 }
547 };
548
549 persist_jobs(&self.storage_path, &self.jobs).await?;
550
551 let cancel_token = CancellationToken::new();
552 {
553 let mut tasks = self.running_tasks.lock().await;
554 tasks.insert(sched_id.to_string(), cancel_token.clone());
555 }
556
557 let result = execute_job(
558 job_to_run,
559 self.jobs.clone(),
560 sched_id.to_string(),
561 cancel_token.clone(),
562 )
563 .await;
564
565 {
566 let mut tasks = self.running_tasks.lock().await;
567 tasks.remove(sched_id);
568 }
569
570 {
571 let mut jobs_guard = self.jobs.lock().await;
572 if let Some((_, job)) = jobs_guard.get_mut(sched_id) {
573 job.currently_running = false;
574 job.current_session_id = None;
575 job.process_start_time = None;
576 job.last_run = Some(Utc::now());
577 }
578 }
579
580 persist_jobs(&self.storage_path, &self.jobs).await?;
581
582 match result {
583 Ok(session_id) => Ok(session_id),
584 Err(e) => Err(SchedulerError::AnyhowError(anyhow!(
585 "Job '{}' failed: {}",
586 sched_id,
587 e
588 ))),
589 }
590 }
591
592 pub async fn pause_schedule(&self, sched_id: &str) -> Result<(), SchedulerError> {
593 {
594 let mut jobs_guard = self.jobs.lock().await;
595 match jobs_guard.get_mut(sched_id) {
596 Some((_, job)) => {
597 if job.currently_running {
598 return Err(SchedulerError::AnyhowError(anyhow!(
599 "Cannot pause running schedule '{}'",
600 sched_id
601 )));
602 }
603 job.paused = true;
604 }
605 None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
606 }
607 }
608
609 persist_jobs(&self.storage_path, &self.jobs).await
610 }
611
612 pub async fn unpause_schedule(&self, sched_id: &str) -> Result<(), SchedulerError> {
613 {
614 let mut jobs_guard = self.jobs.lock().await;
615 match jobs_guard.get_mut(sched_id) {
616 Some((_, job)) => job.paused = false,
617 None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
618 }
619 }
620
621 persist_jobs(&self.storage_path, &self.jobs).await
622 }
623
624 pub async fn update_schedule(
625 &self,
626 sched_id: &str,
627 new_cron: String,
628 ) -> Result<(), SchedulerError> {
629 let (old_uuid, updated_job) = {
630 let mut jobs_guard = self.jobs.lock().await;
631 match jobs_guard.get_mut(sched_id) {
632 Some((uuid, job)) => {
633 if job.currently_running {
634 return Err(SchedulerError::AnyhowError(anyhow!(
635 "Cannot update running schedule '{}'",
636 sched_id
637 )));
638 }
639 if new_cron == job.cron {
640 return Ok(());
641 }
642 job.cron = new_cron.clone();
643 (*uuid, job.clone())
644 }
645 None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
646 }
647 };
648
649 self.tokio_scheduler
650 .remove(&old_uuid)
651 .await
652 .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
653
654 let cron_task = self.create_cron_task(updated_job)?;
655 let new_uuid = self
656 .tokio_scheduler
657 .add(cron_task)
658 .await
659 .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?;
660
661 {
662 let mut jobs_guard = self.jobs.lock().await;
663 if let Some((uuid, _)) = jobs_guard.get_mut(sched_id) {
664 *uuid = new_uuid;
665 }
666 }
667
668 persist_jobs(&self.storage_path, &self.jobs).await
669 }
670
671 pub async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError> {
672 {
673 let jobs_guard = self.jobs.lock().await;
674 match jobs_guard.get(sched_id) {
675 Some((_, job)) if !job.currently_running => {
676 return Err(SchedulerError::AnyhowError(anyhow!(
677 "Schedule '{}' is not running",
678 sched_id
679 )));
680 }
681 None => return Err(SchedulerError::JobNotFound(sched_id.to_string())),
682 _ => {}
683 }
684 }
685
686 {
687 let tasks = self.running_tasks.lock().await;
688 if let Some(token) = tasks.get(sched_id) {
689 token.cancel();
690 }
691 }
692
693 Ok(())
694 }
695
696 pub async fn get_running_job_info(
697 &self,
698 sched_id: &str,
699 ) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
700 let jobs_guard = self.jobs.lock().await;
701 match jobs_guard.get(sched_id) {
702 Some((_, job)) if job.currently_running => {
703 match (&job.current_session_id, &job.process_start_time) {
704 (Some(sid), Some(start)) => Ok(Some((sid.clone(), *start))),
705 _ => Ok(None),
706 }
707 }
708 Some(_) => Ok(None),
709 None => Err(SchedulerError::JobNotFound(sched_id.to_string())),
710 }
711 }
712}
713
714#[allow(clippy::too_many_lines)]
715async fn execute_job(
716 job: ScheduledJob,
717 jobs: Arc<Mutex<JobsMap>>,
718 job_id: String,
719 cancel_token: CancellationToken,
720) -> Result<String> {
721 if job.source.is_empty() {
722 return Ok(job.id.to_string());
723 }
724
725 let recipe_path = Path::new(&job.source);
726 let recipe_content = fs::read_to_string(recipe_path)?;
727
728 let recipe: Recipe = {
729 let extension = recipe_path
730 .extension()
731 .and_then(|s| s.to_str())
732 .unwrap_or("yaml")
733 .to_lowercase();
734
735 match extension.as_str() {
736 "json" | "jsonl" => serde_json::from_str(&recipe_content)?,
737 _ => serde_yaml::from_str(&recipe_content)?,
738 }
739 };
740
741 let agent = Agent::new();
742
743 let config = Config::global();
744 let provider_name = config.get_aster_provider()?;
745 let model_name = config.get_aster_model()?;
746 let model_config = crate::model::ModelConfig::new(&model_name)?;
747
748 let agent_provider = create(&provider_name, model_config).await?;
749
750 if let Some(ref extensions) = recipe.extensions {
751 for ext in extensions {
752 agent.add_extension(ext.clone()).await?;
753 }
754 }
755
756 let session = SessionManager::create_session(
757 std::env::current_dir()?,
758 format!("Scheduled job: {}", job.id),
759 SessionType::Scheduled,
760 )
761 .await?;
762
763 agent.update_provider(agent_provider, &session.id).await?;
764
765 let mut jobs_guard = jobs.lock().await;
766 if let Some((_, job_def)) = jobs_guard.get_mut(job_id.as_str()) {
767 job_def.current_session_id = Some(session.id.clone());
768 }
769 drop(jobs_guard);
770
771 let start_time = std::time::Instant::now();
772 tokio::spawn(async move {
773 let mut props = HashMap::new();
774 props.insert(
775 "trigger".to_string(),
776 serde_json::Value::String("automated".to_string()),
777 );
778 if let Err(e) = posthog::emit_event("schedule_job_started", props).await {
779 tracing::debug!("Failed to send schedule telemetry: {}", e);
780 }
781 });
782
783 let prompt_text = recipe
784 .prompt
785 .as_ref()
786 .or(recipe.instructions.as_ref())
787 .unwrap();
788
789 let user_message = Message::user().with_text(prompt_text);
790 let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]);
791
792 let session_config = SessionConfig {
793 id: session.id.clone(),
794 schedule_id: Some(job.id.clone()),
795 max_turns: None,
796 retry_config: None,
797 system_prompt: None,
798 };
799
800 let session_id = session_config.id.clone();
801 let stream = crate::session_context::with_session_id(Some(session_id.clone()), async {
802 agent
803 .reply(user_message, session_config, Some(cancel_token))
804 .await
805 })
806 .await?;
807
808 use futures::StreamExt;
809 let mut stream = std::pin::pin!(stream);
810
811 while let Some(message_result) = stream.next().await {
812 tokio::task::yield_now().await;
813
814 match message_result {
815 Ok(AgentEvent::Message(msg)) => {
816 conversation.push(msg);
817 }
818 Ok(AgentEvent::HistoryReplaced(updated)) => {
819 conversation = updated;
820 }
821 Ok(_) => {}
822 Err(e) => {
823 tracing::error!("Error in agent stream: {}", e);
824 break;
825 }
826 }
827 }
828
829 SessionManager::update_session(&session.id)
830 .schedule_id(Some(job.id.clone()))
831 .recipe(Some(recipe))
832 .apply()
833 .await?;
834
835 let duration_secs = start_time.elapsed().as_secs();
836 tokio::spawn(async move {
837 let mut props = HashMap::new();
838 props.insert(
839 "trigger".to_string(),
840 serde_json::Value::String("automated".to_string()),
841 );
842 props.insert(
843 "status".to_string(),
844 serde_json::Value::String("completed".to_string()),
845 );
846 props.insert(
847 "duration_seconds".to_string(),
848 serde_json::Value::Number(serde_json::Number::from(duration_secs)),
849 );
850 if let Err(e) = posthog::emit_event("schedule_job_completed", props).await {
851 tracing::debug!("Failed to send schedule telemetry: {}", e);
852 }
853 });
854
855 Ok(session.id)
856}
857
858#[async_trait]
859impl SchedulerTrait for Scheduler {
860 async fn add_scheduled_job(
861 &self,
862 job: ScheduledJob,
863 make_copy: bool,
864 ) -> Result<(), SchedulerError> {
865 self.add_scheduled_job(job, make_copy).await
866 }
867
868 async fn schedule_recipe(
869 &self,
870 recipe_path: PathBuf,
871 cron_schedule: Option<String>,
872 ) -> Result<(), SchedulerError> {
873 self.schedule_recipe(recipe_path, cron_schedule).await
874 }
875
876 async fn list_scheduled_jobs(&self) -> Vec<ScheduledJob> {
877 self.list_scheduled_jobs().await
878 }
879
880 async fn remove_scheduled_job(
881 &self,
882 id: &str,
883 remove_recipe: bool,
884 ) -> Result<(), SchedulerError> {
885 self.remove_scheduled_job(id, remove_recipe).await
886 }
887
888 async fn pause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
889 self.pause_schedule(id).await
890 }
891
892 async fn unpause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
893 self.unpause_schedule(id).await
894 }
895
896 async fn run_now(&self, id: &str) -> Result<String, SchedulerError> {
897 self.run_now(id).await
898 }
899
900 async fn sessions(
901 &self,
902 sched_id: &str,
903 limit: usize,
904 ) -> Result<Vec<(String, Session)>, SchedulerError> {
905 self.sessions(sched_id, limit).await
906 }
907
908 async fn update_schedule(
909 &self,
910 sched_id: &str,
911 new_cron: String,
912 ) -> Result<(), SchedulerError> {
913 self.update_schedule(sched_id, new_cron).await
914 }
915
916 async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError> {
917 self.kill_running_job(sched_id).await
918 }
919
920 async fn get_running_job_info(
921 &self,
922 sched_id: &str,
923 ) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
924 self.get_running_job_info(sched_id).await
925 }
926}
927
928#[cfg(test)]
929mod tests {
930 use super::*;
931 use tempfile::tempdir;
932 use tokio::time::{sleep, Duration};
933
934 fn create_test_recipe(dir: &Path, name: &str) -> PathBuf {
935 let recipe_path = dir.join(format!("{}.yaml", name));
936 fs::write(&recipe_path, "prompt: test\n").unwrap();
937 recipe_path
938 }
939
940 #[tokio::test]
941 async fn test_job_runs_on_schedule() {
942 let temp_dir = tempdir().unwrap();
943 let storage_path = temp_dir.path().join("schedules.json");
944 let recipe_path = create_test_recipe(temp_dir.path(), "scheduled_job");
945 let scheduler = Scheduler::new(storage_path).await.unwrap();
946
947 let job = ScheduledJob {
948 id: "scheduled_job".to_string(),
949 source: recipe_path.to_string_lossy().to_string(),
950 cron: "* * * * * *".to_string(),
951 last_run: None,
952 currently_running: false,
953 paused: false,
954 current_session_id: None,
955 process_start_time: None,
956 };
957
958 scheduler.add_scheduled_job(job, true).await.unwrap();
959 sleep(Duration::from_millis(1500)).await;
960
961 let jobs = scheduler.list_scheduled_jobs().await;
962 assert!(jobs[0].last_run.is_some(), "Job should have run");
963 }
964
965 #[tokio::test]
966 async fn test_paused_job_does_not_run() {
967 let temp_dir = tempdir().unwrap();
968 let storage_path = temp_dir.path().join("schedules.json");
969 let recipe_path = create_test_recipe(temp_dir.path(), "paused_job");
970 let scheduler = Scheduler::new(storage_path).await.unwrap();
971
972 let job = ScheduledJob {
973 id: "paused_job".to_string(),
974 source: recipe_path.to_string_lossy().to_string(),
975 cron: "* * * * * *".to_string(),
976 last_run: None,
977 currently_running: false,
978 paused: false,
979 current_session_id: None,
980 process_start_time: None,
981 };
982
983 scheduler.add_scheduled_job(job, true).await.unwrap();
984 scheduler.pause_schedule("paused_job").await.unwrap();
985 sleep(Duration::from_millis(1500)).await;
986
987 let jobs = scheduler.list_scheduled_jobs().await;
988 assert!(jobs[0].last_run.is_none(), "Paused job should not run");
989 }
990}