Skip to main content

agent_code_lib/schedule/
executor.rs

//! Schedule execution engine.
//!
//! Runs the daemon loop that checks schedules every 30 seconds and
//! executes matching jobs. Also provides one-shot execution for
//! `agent schedule run <name>`.
6
7use std::sync::Arc;
8
9use chrono::{Timelike, Utc};
10use tracing::{error, info, warn};
11
12use crate::config::Config;
13use crate::llm::provider::Provider;
14use crate::permissions::PermissionChecker;
15use crate::query::{QueryEngine, QueryEngineConfig, StreamSink};
16use crate::state::AppState;
17use crate::tools::registry::ToolRegistry;
18
19use super::cron::CronExpr;
20use super::storage::{RunResult, Schedule, ScheduleStore};
21
/// Outcome of a single scheduled run.
///
/// Every field is a plain owned value, so the type derives `Clone` and
/// `PartialEq` in addition to `Debug`; this lets callers keep a copy of an
/// outcome and compare outcomes directly in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct JobOutcome {
    /// Name of the schedule that was executed.
    pub schedule_name: String,
    /// `true` when the agent turn finished without returning an error.
    pub success: bool,
    /// Number of turns the session used.
    pub turns: usize,
    /// Total cost of the session, in US dollars.
    pub cost_usd: f64,
    /// Short summary of the run: the final assistant text, or an
    /// `Error: ...` message when the turn failed.
    pub response_summary: String,
    /// Identifier of the session the run was recorded under.
    pub session_id: String,
}
32
33/// Executes scheduled agent jobs.
34pub struct ScheduleExecutor {
35    llm: Arc<dyn Provider>,
36    config: Config,
37}
38
39impl ScheduleExecutor {
40    pub fn new(llm: Arc<dyn Provider>, config: Config) -> Self {
41        Self { llm, config }
42    }
43
44    /// Run a single schedule by name (for `agent schedule run`).
45    pub async fn run_once(
46        &self,
47        schedule: &Schedule,
48        sink: &dyn StreamSink,
49    ) -> Result<JobOutcome, String> {
50        info!("Running schedule '{}': {}", schedule.name, schedule.prompt);
51
52        let mut config = self.config.clone();
53        if let Some(ref model) = schedule.model {
54            config.api.model = model.clone();
55        }
56        if let Some(ref perm) = schedule.permission_mode {
57            config.permissions.default_mode = match perm.as_str() {
58                "allow" => crate::config::PermissionMode::Allow,
59                "deny" => crate::config::PermissionMode::Deny,
60                "plan" => crate::config::PermissionMode::Plan,
61                _ => crate::config::PermissionMode::Allow, // schedules default to allow
62            };
63        } else {
64            // Schedules run non-interactively — default to allow.
65            config.permissions.default_mode = crate::config::PermissionMode::Allow;
66        }
67        if let Some(max_cost) = schedule.max_cost_usd {
68            config.api.max_cost_usd = Some(max_cost);
69        }
70
71        // Set cwd for the session.
72        let prev_dir = std::env::current_dir().ok();
73        if std::path::Path::new(&schedule.cwd).is_dir() {
74            let _ = std::env::set_current_dir(&schedule.cwd);
75        }
76
77        let tool_registry = ToolRegistry::default_tools();
78        let permission_checker = PermissionChecker::from_config(&config.permissions);
79        let app_state = AppState::new(config.clone());
80        let session_id = app_state.session_id.clone();
81
82        let mut engine = QueryEngine::new(
83            self.llm.clone(),
84            tool_registry,
85            permission_checker,
86            app_state,
87            QueryEngineConfig {
88                max_turns: schedule.max_turns.or(Some(25)),
89                verbose: false,
90                unattended: true,
91            },
92        );
93
94        engine.load_hooks(&config.hooks);
95
96        let result = engine.run_turn_with_sink(&schedule.prompt, sink).await;
97
98        // Restore cwd.
99        if let Some(prev) = prev_dir {
100            let _ = std::env::set_current_dir(prev);
101        }
102
103        let state = engine.state();
104        let success = result.is_ok();
105        let response = if let Err(ref e) = result {
106            format!("Error: {e}")
107        } else {
108            // Extract last assistant text from messages.
109            state
110                .messages
111                .iter()
112                .rev()
113                .find_map(|m| match m {
114                    crate::llm::message::Message::Assistant(a) => {
115                        let text: String = a
116                            .content
117                            .iter()
118                            .filter_map(|b| {
119                                if let crate::llm::message::ContentBlock::Text { text } = b {
120                                    Some(text.as_str())
121                                } else {
122                                    None
123                                }
124                            })
125                            .collect::<Vec<_>>()
126                            .join("");
127                        if text.is_empty() { None } else { Some(text) }
128                    }
129                    _ => None,
130                })
131                .unwrap_or_default()
132        };
133
134        // Save session for later /resume.
135        let _ = crate::services::session::save_session_full(
136            &session_id,
137            &state.messages,
138            &state.cwd,
139            &state.config.api.model,
140            state.turn_count,
141            state.total_cost_usd,
142            state.total_usage.input_tokens,
143            state.total_usage.output_tokens,
144            false,
145        );
146
147        // Truncate summary.
148        let summary = if response.len() > 500 {
149            format!("{}...", &response[..497])
150        } else {
151            response
152        };
153
154        Ok(JobOutcome {
155            schedule_name: schedule.name.clone(),
156            success,
157            turns: state.turn_count,
158            cost_usd: state.total_cost_usd,
159            response_summary: summary,
160            session_id,
161        })
162    }
163
164    /// Check all schedules and run any that are due.
165    pub async fn check_and_run(&self, store: &ScheduleStore) {
166        let now = Utc::now().naive_utc();
167        let schedules = store.list();
168
169        for schedule in schedules {
170            if !schedule.enabled {
171                continue;
172            }
173
174            let cron = match CronExpr::parse(&schedule.cron) {
175                Ok(c) => c,
176                Err(e) => {
177                    warn!(
178                        "Schedule '{}': invalid cron '{}': {e}",
179                        schedule.name, schedule.cron
180                    );
181                    continue;
182                }
183            };
184
185            // Skip if not matching current minute.
186            if !cron.matches(&now) {
187                continue;
188            }
189
190            // Skip if already ran this minute.
191            if let Some(ref last) = schedule.last_run_at {
192                let last_naive = last.naive_utc();
193                if last_naive.date() == now.date()
194                    && last_naive.hour() == now.hour()
195                    && last_naive.minute() == now.minute()
196                {
197                    continue;
198                }
199            }
200
201            info!("Schedule '{}' triggered at {}", schedule.name, now);
202
203            let outcome = self.run_once(&schedule, &crate::query::NullSink).await;
204
205            // Update last_run state.
206            let mut updated = schedule.clone();
207            updated.last_run_at = Some(Utc::now());
208            match outcome {
209                Ok(ref o) => {
210                    updated.last_result = Some(RunResult {
211                        started_at: Utc::now() - chrono::Duration::seconds(1),
212                        finished_at: Utc::now(),
213                        success: o.success,
214                        turns: o.turns,
215                        cost_usd: o.cost_usd,
216                        summary: o.response_summary.clone(),
217                        session_id: o.session_id.clone(),
218                    });
219                    info!(
220                        "Schedule '{}' completed: success={}, turns={}, cost=${:.4}",
221                        updated.name, o.success, o.turns, o.cost_usd
222                    );
223                }
224                Err(ref e) => {
225                    updated.last_result = Some(RunResult {
226                        started_at: Utc::now(),
227                        finished_at: Utc::now(),
228                        success: false,
229                        turns: 0,
230                        cost_usd: 0.0,
231                        summary: e.clone(),
232                        session_id: String::new(),
233                    });
234                    error!("Schedule '{}' failed: {e}", updated.name);
235                }
236            }
237
238            if let Err(e) = store.save(&updated) {
239                error!("Failed to save schedule state for '{}': {e}", updated.name);
240            }
241        }
242    }
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// A hand-built `JobOutcome` reports back the `success` flag and turn
    /// count it was constructed with.
    #[test]
    fn test_job_outcome_fields() {
        let sample = JobOutcome {
            session_id: String::from("abc"),
            response_summary: String::from("done"),
            cost_usd: 0.05,
            turns: 3,
            success: true,
            schedule_name: String::from("test"),
        };

        assert!(sample.success);
        assert_eq!(sample.turns, 3);
    }
}