1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4
5use crate::engine::{Engine, Task};
6use crate::event::AutonomyLevel;
7use crate::memory::Memory;
8use crate::runtime::recorder::{FsRecorder, Recorder, RunInputs};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct Job {
14 pub id: String,
15 pub task: String,
16 pub cron: String,
17 pub autonomy: AutonomyLevel,
18 pub sandbox: String,
19 pub enabled: bool,
20 pub last_run: Option<String>,
21 pub next_run: Option<String>,
22 pub created_at: String,
23}
24
25impl Job {
26 pub fn new(task: String, cron: String) -> Self {
27 Self {
28 id: uuid::Uuid::new_v4().to_string(),
29 task,
30 cron,
31 autonomy: AutonomyLevel::Supervised,
32 sandbox: "local-hardened".into(),
33 enabled: true,
34 last_run: None,
35 next_run: None,
36 created_at: Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
37 }
38 }
39
40 pub fn next_schedule(&self) -> Option<DateTime<Utc>> {
42 use cron::Schedule;
43 use std::str::FromStr;
44 Schedule::from_str(&self.cron)
45 .ok()
46 .and_then(|s| s.upcoming(Utc).next())
47 }
48}
49
/// Translates a natural-language schedule phrase into a five-field cron
/// expression (`minute hour day-of-month month day-of-week`).
///
/// Recognized forms (case-insensitive):
/// - `hourly`, `daily`, `weekly`, `monthly`, `every hour|day|week|month`,
///   `every minute`
/// - `every N minutes` (1..=59) and `every N hours` (1..=24)
/// - a weekday name or abbreviation, optionally with ` at <time>`, `noon`
///   or `midnight` (e.g. `every monday at 3 pm`, `fridays at noon`)
/// - phrases containing `midnight` or `noon`
/// - `every day` / `daily` with ` at <time>`, `every hour at :MM`
///
/// `<time>` is `H`, `H:MM`, with an optional `am`/`pm`; when it is missing or
/// unparseable the time defaults to 09:00. Weekdays use 0 = Sunday.
///
/// Returns `None` when `input` already looks like a five-field cron
/// expression (nothing to translate) or when the phrase is not recognized.
pub fn parse_nl_cron(input: &str) -> Option<String> {
    const DAY_NAMES: [&str; 7] = [
        "sunday",
        "monday",
        "tuesday",
        "wednesday",
        "thursday",
        "friday",
        "saturday",
    ];
    // Alphabetic tokens that may legitimately appear inside a cron field.
    const CRON_NAMES: &[&str] = &[
        "sun", "mon", "tue", "wed", "thu", "fri", "sat", "jan", "feb", "mar", "apr", "may",
        "jun", "jul", "aug", "sep", "oct", "nov", "dec", "l", "w", "lw",
    ];

    // True when `field` is built only from cron syntax: digits, `* / , - ? #`
    // and month/day names. English words such as "every" or "am" fail this.
    fn looks_like_cron_field(field: &str) -> bool {
        !field.is_empty()
            && field
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || "*/,-?#".contains(c))
            && field
                .split(|c: char| !c.is_ascii_alphabetic())
                .filter(|run| !run.is_empty())
                .all(|run| CRON_NAMES.contains(&run.to_ascii_lowercase().as_str()))
    }

    // Finds the weekday mentioned in `text` (0 = Sunday), matching whole words
    // so that e.g. "month" is not mistaken for "mon". Full names (and their
    // plurals) take precedence over abbreviations, in Sunday..Saturday order.
    fn find_weekday(text: &str) -> Option<usize> {
        let words: Vec<&str> = text
            .split(|c: char| !c.is_ascii_alphabetic())
            .filter(|w| !w.is_empty())
            .collect();

        let by_full_name = DAY_NAMES.iter().position(|day| {
            words
                .iter()
                .any(|w| *w == *day || w.strip_suffix('s') == Some(*day))
        });
        by_full_name.or_else(|| {
            DAY_NAMES.iter().position(|day| {
                words.iter().any(|w| {
                    (w.len() >= 3 && day.starts_with(*w)) || (*w == "weds" && *day == "wednesday")
                })
            })
        })
    }

    // Parses `H`, `H:MM`, optionally with am/pm, into a 24-hour (hour, minute)
    // pair. Out-of-range values are clamped to 23 / 59.
    fn parse_hour_minute(at_str: &str) -> Option<(u32, u32)> {
        let is_am = at_str.contains("am");
        let is_pm = at_str.contains("pm");
        let cleaned = at_str
            .trim()
            .trim_start_matches("at ")
            .replace("am", "")
            .replace("pm", "");

        let (hour_text, minute_text) = match cleaned.split_once(':') {
            Some((h, m)) => (h, Some(m)),
            None => (cleaned.as_str(), None),
        };
        let hour: u32 = hour_text.trim().parse().ok()?;
        let minute: u32 = match minute_text {
            Some(m) => m.trim().parse().ok()?,
            None => 0,
        };

        let hour = if is_pm && hour < 12 {
            hour + 12
        } else if is_am && !is_pm && hour == 12 {
            // Only an explicit "12am" means midnight; a bare "12" or "12:30"
            // is 24-hour notation and stays at noon.
            0
        } else {
            hour
        };
        Some((hour.min(23), minute.min(59)))
    }

    // Resolves the time of day in `text`: an explicit " at <time>" wins, then
    // the words "midnight" / "noon", then the 09:00 default.
    fn time_of_day(text: &str) -> (u32, u32) {
        text.find(" at ")
            .and_then(|idx| parse_hour_minute(&text[idx + 4..]))
            .unwrap_or_else(|| {
                if text.contains("midnight") {
                    (0, 0)
                } else if text.contains("noon") {
                    (12, 0)
                } else {
                    (9, 0)
                }
            })
    }

    // Already a cron expression: nothing to translate. Checking the shape of
    // each field keeps five-word phrases ("every day at 6 pm") parseable.
    let fields: Vec<&str> = input.split_whitespace().collect();
    if fields.len() == 5 && fields.iter().all(|f| looks_like_cron_field(f)) {
        return None;
    }

    let lower = input.to_lowercase();
    let lower = lower.trim();

    match lower {
        "hourly" | "every hour" => return Some("0 * * * *".into()),
        "daily" | "every day" => return Some("0 9 * * *".into()),
        "weekly" | "every week" => return Some("0 9 * * 1".into()),
        "monthly" | "every month" => return Some("0 9 1 * *".into()),
        "every minute" => return Some("* * * * *".into()),
        _ => {}
    }

    if let Some(rest) = lower.strip_prefix("every ") {
        let minutes = rest
            .strip_suffix(" minutes")
            .and_then(|n| n.trim().parse::<u32>().ok());
        if let Some(n) = minutes {
            if (1..60).contains(&n) {
                return Some(format!("*/{} * * * *", n));
            }
        }
        let hours = rest
            .strip_suffix(" hours")
            .and_then(|n| n.trim().parse::<u32>().ok());
        if let Some(n) = hours {
            if (1..=24).contains(&n) {
                return Some(format!("0 */{} * * *", n));
            }
        }
    }

    // Weekday detection runs before the generic noon/midnight shortcuts so
    // that "every monday at noon" stays a weekly schedule.
    if let Some(dow) = find_weekday(lower) {
        let (h, m) = time_of_day(lower);
        return Some(format!("{} {} * * {}", m, h, dow));
    }

    if lower.contains("midnight") {
        return Some("0 0 * * *".into());
    }
    if lower.contains("noon") {
        return Some("0 12 * * *".into());
    }

    if lower.contains("every day") || lower.contains("daily") {
        let (h, m) = time_of_day(lower);
        return Some(format!("{} {} * * *", m, h));
    }

    if lower.contains("every hour") {
        if let Some(idx) = lower.find(" at ") {
            let time_part = lower[idx + 4..].trim();
            let m: u32 = time_part.trim_start_matches(':').parse().unwrap_or(0);
            return Some(format!("{} * * * *", m.min(59)));
        }
        return Some("0 * * * *".into());
    }

    None
}
205
206#[async_trait::async_trait]
209pub trait Scheduler: Send + Sync {
210 fn schedule(&self, job: Job) -> anyhow::Result<String>;
211 fn list(&self) -> Vec<Job>;
212 fn cancel(&self, id: &str) -> anyhow::Result<()>;
213 fn get(&self, id: &str) -> Option<Job>;
214 async fn tick(&self) -> Vec<Job>;
216}
217
218pub struct MemoryScheduler {
221 jobs: std::sync::Mutex<Vec<Job>>,
222 memory: Option<Arc<dyn Memory>>,
223 cron_abort: std::sync::Mutex<Option<tokio::task::AbortHandle>>,
227}
228
229impl MemoryScheduler {
230 pub fn new() -> Self {
231 Self {
232 jobs: std::sync::Mutex::new(Vec::new()),
233 memory: None,
234 cron_abort: std::sync::Mutex::new(None),
235 }
236 }
237
238 pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
239 self.memory = Some(memory);
240 self
241 }
242
243 fn persist_jobs_sync(&self) {
244 if let Some(mem) = &self.memory {
245 let jobs = self.jobs.lock().unwrap();
246 if let Ok(json) = serde_json::to_string_pretty(&*jobs) {
247 let _ = mem.upsert_doc(crate::memory::WorkingDoc {
248 id: "scheduler-jobs".into(),
249 title: "Scheduled Jobs".into(),
250 content: json,
251 updated_at: Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
252 });
253 }
254 }
255 }
256
257 pub fn restore_sync(&self) {
258 if let Some(mem) = &self.memory {
259 let docs = mem.shared_docs();
260 if let Some(doc) = docs.iter().find(|d| d.id == "scheduler-jobs") {
261 if let Ok(jobs) = serde_json::from_str::<Vec<Job>>(&doc.content) {
262 let mut guard = self.jobs.lock().unwrap();
263 *guard = jobs;
264 }
265 }
266 }
267 }
268
269 pub fn stop_cron_loop(&self) {
271 if let Some(abort) = self.cron_abort.lock().unwrap().take() {
272 abort.abort();
273 }
274 }
275
276 pub fn start_cron_loop(
277 self: &Arc<Self>,
278 engine: Arc<Engine>,
279 recorder: Arc<FsRecorder>,
280 ) -> tokio::task::JoinHandle<()> {
281 if let Some(prev) = self.cron_abort.lock().unwrap().take() {
285 prev.abort();
286 }
287 let scheduler = self.clone();
288 let handle = tokio::spawn(async move {
289 loop {
290 tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
291 let due_jobs = scheduler.tick().await;
292 for job in due_jobs {
293 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
294 let task = Task {
295 description: job.task.clone(),
296 context: vec![],
297 };
298 let run_id = uuid::Uuid::new_v4().to_string();
299 recorder.start_run(
300 run_id.clone(),
301 RunInputs {
302 task: job.task.clone(),
303 config_snapshot: serde_json::json!({}),
304 model_id: "scheduled".into(),
305 repo_head: None,
306 timestamp: Utc::now().to_rfc3339(),
307 agent: "scheduler".into(),
308 },
309 );
310
311 let engine_clone = engine.clone();
312 let recorder_clone = recorder.clone();
313 tokio::spawn(async move {
314 let engine_run_id = run_id.clone();
315 let engine_handle = tokio::spawn(async move {
316 engine_clone
317 .drive_with_run_id(task, tx, crate::event::RunId(engine_run_id))
318 .await
319 });
320 while let Some(event) = rx.recv().await {
321 recorder_clone.record(&event);
322 }
323 if let Err(err) = engine_handle.await {
324 tracing::error!("scheduled engine task failed: {}", err);
325 }
326 let _ = recorder_clone.finalize(&run_id);
327 });
328 }
329 }
330 });
331 *self.cron_abort.lock().unwrap() = Some(handle.abort_handle());
335 handle
336 }
337}
338
339#[async_trait::async_trait]
340impl Scheduler for MemoryScheduler {
341 fn schedule(&self, job: Job) -> anyhow::Result<String> {
342 let id = job.id.clone();
343 let mut jobs = self.jobs.lock().unwrap();
344 jobs.push(job);
345 drop(jobs);
346 self.persist_jobs_sync();
347 Ok(id)
348 }
349
350 fn list(&self) -> Vec<Job> {
351 self.jobs.lock().unwrap().clone()
352 }
353
354 fn cancel(&self, id: &str) -> anyhow::Result<()> {
355 let mut jobs = self.jobs.lock().unwrap();
356 jobs.retain(|j| j.id != id);
357 drop(jobs);
358 self.persist_jobs_sync();
359 Ok(())
360 }
361
362 fn get(&self, id: &str) -> Option<Job> {
363 self.jobs
364 .lock()
365 .unwrap()
366 .iter()
367 .find(|j| j.id == id)
368 .cloned()
369 }
370
371 async fn tick(&self) -> Vec<Job> {
372 let now = Utc::now();
373 let mut due = Vec::new();
374 let mut jobs = self.jobs.lock().unwrap();
375
376 for job in jobs.iter_mut() {
377 if !job.enabled {
378 continue;
379 }
380 if let Some(next) = &job.next_run {
381 if let Ok(next_dt) = DateTime::parse_from_rfc3339(next) {
382 if next_dt <= now {
383 due.push(job.clone());
384 job.last_run = Some(now.to_rfc3339());
385 job.next_run = job.next_schedule().map(|dt| dt.to_rfc3339());
386 }
387 }
388 } else {
389 job.next_run = job.next_schedule().map(|dt| dt.to_rfc3339());
390 }
391 }
392
393 drop(jobs);
394 self.persist_jobs_sync();
395 due
396 }
397}
398
399impl Default for MemoryScheduler {
400 fn default() -> Self {
401 Self::new()
402 }
403}