1use std::sync::Arc;
8
9use chrono::{Timelike, Utc};
10use tracing::{error, info, warn};
11
12use crate::config::Config;
13use crate::llm::provider::Provider;
14use crate::permissions::PermissionChecker;
15use crate::query::{QueryEngine, QueryEngineConfig, StreamSink};
16use crate::state::AppState;
17use crate::tools::registry::ToolRegistry;
18
19use super::cron::CronExpr;
20use super::storage::{RunResult, Schedule, ScheduleStore};
21
/// Result of executing a single scheduled job.
///
/// Plain data: it can be cloned and compared so callers may keep a copy for
/// reporting while persisting another.
#[derive(Debug, Clone, PartialEq)]
pub struct JobOutcome {
    /// Name of the schedule that produced this outcome.
    pub schedule_name: String,
    /// Whether the engine turn finished without returning an error.
    pub success: bool,
    /// Turn count reported by the engine state after the run.
    pub turns: usize,
    /// Total cost in USD reported by the engine state after the run.
    pub cost_usd: f64,
    /// Shortened final assistant text, or an `Error: ...` message on failure.
    pub response_summary: String,
    /// Identifier of the session created for this run.
    pub session_id: String,
}
32
33pub struct ScheduleExecutor {
35 llm: Arc<dyn Provider>,
36 config: Config,
37}
38
39impl ScheduleExecutor {
40 pub fn new(llm: Arc<dyn Provider>, config: Config) -> Self {
41 Self { llm, config }
42 }
43
44 pub async fn run_once(
46 &self,
47 schedule: &Schedule,
48 sink: &dyn StreamSink,
49 ) -> Result<JobOutcome, String> {
50 info!("Running schedule '{}': {}", schedule.name, schedule.prompt);
51
52 let mut config = self.config.clone();
53 if let Some(ref model) = schedule.model {
54 config.api.model = model.clone();
55 }
56 if let Some(ref perm) = schedule.permission_mode {
57 config.permissions.default_mode = match perm.as_str() {
58 "allow" => crate::config::PermissionMode::Allow,
59 "deny" => crate::config::PermissionMode::Deny,
60 "plan" => crate::config::PermissionMode::Plan,
61 _ => crate::config::PermissionMode::Allow, };
63 } else {
64 config.permissions.default_mode = crate::config::PermissionMode::Allow;
66 }
67 if let Some(max_cost) = schedule.max_cost_usd {
68 config.api.max_cost_usd = Some(max_cost);
69 }
70
71 let prev_dir = std::env::current_dir().ok();
73 if std::path::Path::new(&schedule.cwd).is_dir() {
74 let _ = std::env::set_current_dir(&schedule.cwd);
75 }
76
77 let tool_registry = ToolRegistry::default_tools();
78 let permission_checker = PermissionChecker::from_config(&config.permissions);
79 let app_state = AppState::new(config.clone());
80 let session_id = app_state.session_id.clone();
81
82 let mut engine = QueryEngine::new(
83 self.llm.clone(),
84 tool_registry,
85 permission_checker,
86 app_state,
87 QueryEngineConfig {
88 max_turns: schedule.max_turns.or(Some(25)),
89 verbose: false,
90 unattended: true,
91 },
92 );
93
94 engine.load_hooks(&config.hooks);
95
96 let result = engine.run_turn_with_sink(&schedule.prompt, sink).await;
97
98 if let Some(prev) = prev_dir {
100 let _ = std::env::set_current_dir(prev);
101 }
102
103 let state = engine.state();
104 let success = result.is_ok();
105 let response = if let Err(ref e) = result {
106 format!("Error: {e}")
107 } else {
108 state
110 .messages
111 .iter()
112 .rev()
113 .find_map(|m| match m {
114 crate::llm::message::Message::Assistant(a) => {
115 let text: String = a
116 .content
117 .iter()
118 .filter_map(|b| {
119 if let crate::llm::message::ContentBlock::Text { text } = b {
120 Some(text.as_str())
121 } else {
122 None
123 }
124 })
125 .collect::<Vec<_>>()
126 .join("");
127 if text.is_empty() { None } else { Some(text) }
128 }
129 _ => None,
130 })
131 .unwrap_or_default()
132 };
133
134 let _ = crate::services::session::save_session_full(
136 &session_id,
137 &state.messages,
138 &state.cwd,
139 &state.config.api.model,
140 state.turn_count,
141 state.total_cost_usd,
142 state.total_usage.input_tokens,
143 state.total_usage.output_tokens,
144 false,
145 );
146
147 let summary = if response.len() > 500 {
149 format!("{}...", &response[..497])
150 } else {
151 response
152 };
153
154 Ok(JobOutcome {
155 schedule_name: schedule.name.clone(),
156 success,
157 turns: state.turn_count,
158 cost_usd: state.total_cost_usd,
159 response_summary: summary,
160 session_id,
161 })
162 }
163
164 pub async fn check_and_run(&self, store: &ScheduleStore) {
166 let now = Utc::now().naive_utc();
167 let schedules = store.list();
168
169 for schedule in schedules {
170 if !schedule.enabled {
171 continue;
172 }
173
174 let cron = match CronExpr::parse(&schedule.cron) {
175 Ok(c) => c,
176 Err(e) => {
177 warn!(
178 "Schedule '{}': invalid cron '{}': {e}",
179 schedule.name, schedule.cron
180 );
181 continue;
182 }
183 };
184
185 if !cron.matches(&now) {
187 continue;
188 }
189
190 if let Some(ref last) = schedule.last_run_at {
192 let last_naive = last.naive_utc();
193 if last_naive.date() == now.date()
194 && last_naive.hour() == now.hour()
195 && last_naive.minute() == now.minute()
196 {
197 continue;
198 }
199 }
200
201 info!("Schedule '{}' triggered at {}", schedule.name, now);
202
203 let outcome = self.run_once(&schedule, &crate::query::NullSink).await;
204
205 let mut updated = schedule.clone();
207 updated.last_run_at = Some(Utc::now());
208 match outcome {
209 Ok(ref o) => {
210 updated.last_result = Some(RunResult {
211 started_at: Utc::now() - chrono::Duration::seconds(1),
212 finished_at: Utc::now(),
213 success: o.success,
214 turns: o.turns,
215 cost_usd: o.cost_usd,
216 summary: o.response_summary.clone(),
217 session_id: o.session_id.clone(),
218 });
219 info!(
220 "Schedule '{}' completed: success={}, turns={}, cost=${:.4}",
221 updated.name, o.success, o.turns, o.cost_usd
222 );
223 }
224 Err(ref e) => {
225 updated.last_result = Some(RunResult {
226 started_at: Utc::now(),
227 finished_at: Utc::now(),
228 success: false,
229 turns: 0,
230 cost_usd: 0.0,
231 summary: e.clone(),
232 session_id: String::new(),
233 });
234 error!("Schedule '{}' failed: {e}", updated.name);
235 }
236 }
237
238 if let Err(e) = store.save(&updated) {
239 error!("Failed to save schedule state for '{}': {e}", updated.name);
240 }
241 }
242 }
243}
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248
249 #[test]
250 fn test_job_outcome_fields() {
251 let outcome = JobOutcome {
252 schedule_name: "test".to_string(),
253 success: true,
254 turns: 3,
255 cost_usd: 0.05,
256 response_summary: "done".to_string(),
257 session_id: "abc".to_string(),
258 };
259 assert!(outcome.success);
260 assert_eq!(outcome.turns, 3);
261 }
262}