Skip to main content

sparrow/runtime/
scheduler.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4
5use crate::engine::{Engine, Task};
6use crate::event::AutonomyLevel;
7use crate::memory::Memory;
8use crate::runtime::recorder::{FsRecorder, Recorder, RunInputs};
9
10// ─── Job ────────────────────────────────────────────────────────────────────────
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct Job {
14    pub id: String,
15    pub task: String,
16    pub cron: String,
17    pub autonomy: AutonomyLevel,
18    pub sandbox: String,
19    pub enabled: bool,
20    pub last_run: Option<String>,
21    pub next_run: Option<String>,
22    pub created_at: String,
23}
24
25impl Job {
26    pub fn new(task: String, cron: String) -> Self {
27        Self {
28            id: uuid::Uuid::new_v4().to_string(),
29            task,
30            cron,
31            autonomy: AutonomyLevel::Supervised,
32            sandbox: "local-hardened".into(),
33            enabled: true,
34            last_run: None,
35            next_run: None,
36            created_at: Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
37        }
38    }
39
40    /// Uses the `cron` crate for full cron expression support.
41    pub fn next_schedule(&self) -> Option<DateTime<Utc>> {
42        use cron::Schedule;
43        use std::str::FromStr;
44        Schedule::from_str(&self.cron)
45            .ok()
46            .and_then(|s| s.upcoming(Utc).next())
47    }
48}
49
50// ─── Natural-language → cron parser ────────────────────────────────────────────
51
52/// Try to convert a natural-language schedule expression into a cron string.
53/// Examples:
54///   "every minute"            → "* * * * *"
55///   "every hour" / "hourly"   → "0 * * * *"
56///   "every day at 2am"        → "0 2 * * *"
57///   "daily at 9:30"           → "30 9 * * *"
58///   "every monday at 8"       → "0 8 * * 1"
59///   "every 15 minutes"        → "*/15 * * * *"
60///   "midnight"                → "0 0 * * *"
61///   "noon"                    → "0 12 * * *"
62///   "weekly"                  → "0 9 * * 1"
63///   "monthly"                 → "0 9 1 * *"
64///
65/// Returns `None` if the input already looks like a cron expression or cannot be parsed.
66pub fn parse_nl_cron(input: &str) -> Option<String> {
67    // Already a cron expression (5 whitespace-separated tokens, first can be */N or digits)
68    let parts: Vec<&str> = input.split_whitespace().collect();
69    if parts.len() == 5 {
70        return None; // looks like raw cron, leave as-is
71    }
72
73    let lower = input.to_lowercase();
74    let lower = lower.trim();
75
76    // Named shortcuts
77    if lower == "hourly" || lower == "every hour" {
78        return Some("0 * * * *".into());
79    }
80    if lower == "daily" || lower == "every day" {
81        return Some("0 9 * * *".into());
82    }
83    if lower == "weekly" || lower == "every week" {
84        return Some("0 9 * * 1".into());
85    }
86    if lower == "monthly" || lower == "every month" {
87        return Some("0 9 1 * *".into());
88    }
89    if lower.contains("midnight") {
90        return Some("0 0 * * *".into());
91    }
92    if lower.contains("noon") {
93        return Some("0 12 * * *".into());
94    }
95    if lower == "every minute" {
96        return Some("* * * * *".into());
97    }
98
99    // "every N minutes"
100    if let Some(n_str) = lower
101        .strip_prefix("every ")
102        .and_then(|s| s.strip_suffix(" minutes"))
103    {
104        if let Ok(n) = n_str.trim().parse::<u32>() {
105            if n > 0 && n < 60 {
106                return Some(format!("*/{} * * * *", n));
107            }
108        }
109    }
110    if let Some(n_str) = lower
111        .strip_prefix("every ")
112        .and_then(|s| s.strip_suffix(" hours"))
113    {
114        if let Ok(n) = n_str.trim().parse::<u32>() {
115            if n > 0 && n <= 24 {
116                return Some(format!("0 */{} * * *", n));
117            }
118        }
119    }
120
121    // Day-of-week mapping
122    let dow_map = [
123        ("sunday", 0),
124        ("monday", 1),
125        ("tuesday", 2),
126        ("wednesday", 3),
127        ("thursday", 4),
128        ("friday", 5),
129        ("saturday", 6),
130        ("sun", 0),
131        ("mon", 1),
132        ("tue", 2),
133        ("wed", 3),
134        ("thu", 4),
135        ("fri", 5),
136        ("sat", 6),
137    ];
138
139    // Parse optional hour from "at H", "at H:MM", "at Ham/pm"
140    fn parse_hour_minute(at_str: &str) -> Option<(u32, u32)> {
141        let s = at_str
142            .trim()
143            .trim_start_matches("at ")
144            .replace("am", "")
145            .replace("pm", "");
146        let is_pm = at_str.contains("pm");
147        if let Some((h, m)) = s.split_once(':') {
148            let h: u32 = h.trim().parse().ok()?;
149            let m: u32 = m.trim().parse().ok()?;
150            let h = if is_pm && h < 12 {
151                h + 12
152            } else if !is_pm && h == 12 {
153                0
154            } else {
155                h
156            };
157            Some((h.min(23), m.min(59)))
158        } else {
159            let h: u32 = s.trim().parse().ok()?;
160            let h = if is_pm && h < 12 {
161                h + 12
162            } else if !is_pm && h == 12 {
163                0
164            } else {
165                h
166            };
167            Some((h.min(23), 0))
168        }
169    }
170
171    // "every <day> at <time>"
172    for (day_name, dow) in &dow_map {
173        if lower.contains(day_name) {
174            let at_idx = lower.find(" at ");
175            let (h, m) = if let Some(idx) = at_idx {
176                parse_hour_minute(&lower[idx + 4..]).unwrap_or((9, 0))
177            } else {
178                (9, 0)
179            };
180            return Some(format!("{} {} * * {}", m, h, dow));
181        }
182    }
183
184    // "every day at <time>" / "daily at <time>"
185    if lower.contains("every day") || lower.contains("daily") {
186        if let Some(idx) = lower.find(" at ") {
187            let (h, m) = parse_hour_minute(&lower[idx + 4..]).unwrap_or((9, 0));
188            return Some(format!("{} {} * * *", m, h));
189        }
190        return Some("0 9 * * *".into());
191    }
192
193    // "every hour at :MM" / "every hour at MM"
194    if lower.contains("every hour") {
195        if let Some(idx) = lower.find(" at ") {
196            let time_part = lower[idx + 4..].trim();
197            let m: u32 = time_part.trim_start_matches(':').parse().unwrap_or(0);
198            return Some(format!("{} * * * *", m.min(59)));
199        }
200        return Some("0 * * * *".into());
201    }
202
203    None
204}
205
206// ─── THE SCHEDULER TRAIT ────────────────────────────────────────────────────────
207
208#[async_trait::async_trait]
209pub trait Scheduler: Send + Sync {
210    fn schedule(&self, job: Job) -> anyhow::Result<String>;
211    fn list(&self) -> Vec<Job>;
212    fn cancel(&self, id: &str) -> anyhow::Result<()>;
213    fn get(&self, id: &str) -> Option<Job>;
214    /// Run all due jobs
215    async fn tick(&self) -> Vec<Job>;
216}
217
218// ─── In-memory scheduler (M4) — persists to memory via WorkingDocs ──────────────
219
220pub struct MemoryScheduler {
221    jobs: std::sync::Mutex<Vec<Job>>,
222    memory: Option<Arc<dyn Memory>>,
223    /// Abort handle for the running cron loop, if any. Repeated
224    /// `start_cron_loop()` calls cancel the previous loop instead of leaking
225    /// parallel copies (each loop would otherwise re-fire every due job).
226    cron_abort: std::sync::Mutex<Option<tokio::task::AbortHandle>>,
227}
228
229impl MemoryScheduler {
230    pub fn new() -> Self {
231        Self {
232            jobs: std::sync::Mutex::new(Vec::new()),
233            memory: None,
234            cron_abort: std::sync::Mutex::new(None),
235        }
236    }
237
238    pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
239        self.memory = Some(memory);
240        self
241    }
242
243    fn persist_jobs_sync(&self) {
244        if let Some(mem) = &self.memory {
245            let jobs = self.jobs.lock().unwrap();
246            if let Ok(json) = serde_json::to_string_pretty(&*jobs) {
247                let _ = mem.upsert_doc(crate::memory::WorkingDoc {
248                    id: "scheduler-jobs".into(),
249                    title: "Scheduled Jobs".into(),
250                    content: json,
251                    updated_at: Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
252                });
253            }
254        }
255    }
256
257    pub fn restore_sync(&self) {
258        if let Some(mem) = &self.memory {
259            let docs = mem.shared_docs();
260            if let Some(doc) = docs.iter().find(|d| d.id == "scheduler-jobs") {
261                if let Ok(jobs) = serde_json::from_str::<Vec<Job>>(&doc.content) {
262                    let mut guard = self.jobs.lock().unwrap();
263                    *guard = jobs;
264                }
265            }
266        }
267    }
268
269    /// Stop the running cron loop if any. Idempotent.
270    pub fn stop_cron_loop(&self) {
271        if let Some(abort) = self.cron_abort.lock().unwrap().take() {
272            abort.abort();
273        }
274    }
275
276    pub fn start_cron_loop(
277        self: &Arc<Self>,
278        engine: Arc<Engine>,
279        recorder: Arc<FsRecorder>,
280    ) -> tokio::task::JoinHandle<()> {
281        // Cancel any previously-running loop so repeated start() calls don't
282        // leak parallel ticking loops (each tick spawns engine work — multiple
283        // loops would fire the same job multiple times).
284        if let Some(prev) = self.cron_abort.lock().unwrap().take() {
285            prev.abort();
286        }
287        let scheduler = self.clone();
288        let handle = tokio::spawn(async move {
289            loop {
290                tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
291                let due_jobs = scheduler.tick().await;
292                for job in due_jobs {
293                    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
294                    let task = Task {
295                        description: job.task.clone(),
296                        context: vec![],
297                    };
298                    let run_id = uuid::Uuid::new_v4().to_string();
299                    recorder.start_run(
300                        run_id.clone(),
301                        RunInputs {
302                            task: job.task.clone(),
303                            config_snapshot: serde_json::json!({}),
304                            model_id: "scheduled".into(),
305                            repo_head: None,
306                            timestamp: Utc::now().to_rfc3339(),
307                            agent: "scheduler".into(),
308                        },
309                    );
310
311                    let engine_clone = engine.clone();
312                    let recorder_clone = recorder.clone();
313                    tokio::spawn(async move {
314                        let engine_run_id = run_id.clone();
315                        let engine_handle = tokio::spawn(async move {
316                            engine_clone
317                                .drive_with_run_id(task, tx, crate::event::RunId(engine_run_id))
318                                .await
319                        });
320                        while let Some(event) = rx.recv().await {
321                            recorder_clone.record(&event);
322                        }
323                        if let Err(err) = engine_handle.await {
324                            tracing::error!("scheduled engine task failed: {}", err);
325                        }
326                        let _ = recorder_clone.finalize(&run_id);
327                    });
328                }
329            }
330        });
331        // Store an AbortHandle so the next start() or an explicit stop() can
332        // cancel this loop. The original JoinHandle is still returned to the
333        // caller for backward compatibility.
334        *self.cron_abort.lock().unwrap() = Some(handle.abort_handle());
335        handle
336    }
337}
338
339#[async_trait::async_trait]
340impl Scheduler for MemoryScheduler {
341    fn schedule(&self, job: Job) -> anyhow::Result<String> {
342        let id = job.id.clone();
343        let mut jobs = self.jobs.lock().unwrap();
344        jobs.push(job);
345        drop(jobs);
346        self.persist_jobs_sync();
347        Ok(id)
348    }
349
350    fn list(&self) -> Vec<Job> {
351        self.jobs.lock().unwrap().clone()
352    }
353
354    fn cancel(&self, id: &str) -> anyhow::Result<()> {
355        let mut jobs = self.jobs.lock().unwrap();
356        jobs.retain(|j| j.id != id);
357        drop(jobs);
358        self.persist_jobs_sync();
359        Ok(())
360    }
361
362    fn get(&self, id: &str) -> Option<Job> {
363        self.jobs
364            .lock()
365            .unwrap()
366            .iter()
367            .find(|j| j.id == id)
368            .cloned()
369    }
370
371    async fn tick(&self) -> Vec<Job> {
372        let now = Utc::now();
373        let mut due = Vec::new();
374        let mut jobs = self.jobs.lock().unwrap();
375
376        for job in jobs.iter_mut() {
377            if !job.enabled {
378                continue;
379            }
380            if let Some(next) = &job.next_run {
381                if let Ok(next_dt) = DateTime::parse_from_rfc3339(next) {
382                    if next_dt <= now {
383                        due.push(job.clone());
384                        job.last_run = Some(now.to_rfc3339());
385                        job.next_run = job.next_schedule().map(|dt| dt.to_rfc3339());
386                    }
387                }
388            } else {
389                job.next_run = job.next_schedule().map(|dt| dt.to_rfc3339());
390            }
391        }
392
393        drop(jobs);
394        self.persist_jobs_sync();
395        due
396    }
397}
398
399impl Default for MemoryScheduler {
400    fn default() -> Self {
401        Self::new()
402    }
403}