Skip to main content

rustyclaw_core/
cron.rs

1//! Cron job scheduling for RustyClaw.
2//!
3//! Provides a simple job scheduler that persists jobs to disk and can
4//! trigger agent turns or system events on schedule.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12/// Unique identifier for a cron job.
13pub type JobId = String;
14
15/// Generate a unique job ID.
16fn generate_job_id() -> JobId {
17    let timestamp = SystemTime::now()
18        .duration_since(UNIX_EPOCH)
19        .unwrap_or_default()
20        .as_millis();
21    format!("job-{:x}", timestamp)
22}
23
24/// Schedule kinds for cron jobs.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(tag = "kind", rename_all = "camelCase")]
27pub enum Schedule {
28    /// One-shot at an absolute time (ISO 8601).
29    At { at: String },
30    /// Recurring interval in milliseconds.
31    Every {
32        every_ms: u64,
33        #[serde(skip_serializing_if = "Option::is_none")]
34        anchor_ms: Option<u64>,
35    },
36    /// Cron expression (5-field).
37    Cron {
38        expr: String,
39        #[serde(skip_serializing_if = "Option::is_none")]
40        tz: Option<String>,
41    },
42}
43
44/// Payload kinds for cron jobs.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46#[serde(tag = "kind", rename_all = "camelCase")]
47pub enum Payload {
48    /// System event injected into main session.
49    SystemEvent { text: String },
50    /// Agent turn in an isolated session.
51    AgentTurn {
52        message: String,
53        #[serde(skip_serializing_if = "Option::is_none")]
54        model: Option<String>,
55        #[serde(skip_serializing_if = "Option::is_none")]
56        thinking: Option<String>,
57        #[serde(skip_serializing_if = "Option::is_none")]
58        timeout_seconds: Option<u64>,
59    },
60}
61
62/// Delivery configuration for isolated jobs.
63#[derive(Debug, Clone, Serialize, Deserialize, Default)]
64#[serde(rename_all = "camelCase")]
65pub struct Delivery {
66    #[serde(default)]
67    pub mode: DeliveryMode,
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub channel: Option<String>,
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub to: Option<String>,
72    #[serde(default)]
73    pub best_effort: bool,
74}
75
76/// Delivery mode.
77#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
78#[serde(rename_all = "camelCase")]
79pub enum DeliveryMode {
80    #[default]
81    Announce,
82    None,
83}
84
85/// Session target for job execution.
86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
87#[serde(rename_all = "camelCase")]
88pub enum SessionTarget {
89    Main,
90    Isolated,
91}
92
93/// A cron job definition.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95#[serde(rename_all = "camelCase")]
96pub struct CronJob {
97    pub job_id: JobId,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub name: Option<String>,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub description: Option<String>,
102    pub schedule: Schedule,
103    pub session_target: SessionTarget,
104    pub payload: Payload,
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub delivery: Option<Delivery>,
107    #[serde(default = "default_true")]
108    pub enabled: bool,
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub agent_id: Option<String>,
111    /// For one-shot jobs, delete after successful run.
112    #[serde(default)]
113    pub delete_after_run: bool,
114    /// Last run timestamp (ms since epoch).
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub last_run_ms: Option<u64>,
117    /// Next scheduled run timestamp (ms since epoch).
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub next_run_ms: Option<u64>,
120    /// Created timestamp (ms since epoch).
121    pub created_ms: u64,
122}
123
124fn default_true() -> bool {
125    true
126}
127
128impl CronJob {
129    /// Create a new job with the given parameters.
130    pub fn new(
131        name: Option<String>,
132        schedule: Schedule,
133        session_target: SessionTarget,
134        payload: Payload,
135    ) -> Self {
136        let now_ms = SystemTime::now()
137            .duration_since(UNIX_EPOCH)
138            .unwrap_or_default()
139            .as_millis() as u64;
140
141        let delete_after_run = matches!(schedule, Schedule::At { .. });
142
143        Self {
144            job_id: generate_job_id(),
145            name,
146            description: None,
147            schedule,
148            session_target,
149            payload,
150            delivery: None,
151            enabled: true,
152            agent_id: None,
153            delete_after_run,
154            last_run_ms: None,
155            next_run_ms: None,
156            created_ms: now_ms,
157        }
158    }
159}
160
161/// Run history entry.
162#[derive(Debug, Clone, Serialize, Deserialize)]
163#[serde(rename_all = "camelCase")]
164pub struct RunEntry {
165    pub job_id: JobId,
166    pub run_id: String,
167    pub started_ms: u64,
168    pub finished_ms: Option<u64>,
169    pub status: RunStatus,
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub error: Option<String>,
172}
173
174/// Run status.
175#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176#[serde(rename_all = "camelCase")]
177pub enum RunStatus {
178    Running,
179    Ok,
180    Error,
181    Timeout,
182    Skipped,
183}
184
185/// Cron job store that persists jobs to disk.
186pub struct CronStore {
187    /// Path to the jobs file.
188    jobs_path: PathBuf,
189    /// Path to the runs directory.
190    runs_dir: PathBuf,
191    /// In-memory job cache.
192    jobs: HashMap<JobId, CronJob>,
193}
194
195impl CronStore {
196    /// Create or load a cron store from the given directory.
197    pub fn new(cron_dir: &Path) -> Result<Self, String> {
198        let jobs_path = cron_dir.join("jobs.json");
199        let runs_dir = cron_dir.join("runs");
200
201        // Ensure directories exist
202        fs::create_dir_all(cron_dir)
203            .map_err(|e| format!("Failed to create cron directory: {}", e))?;
204        fs::create_dir_all(&runs_dir)
205            .map_err(|e| format!("Failed to create runs directory: {}", e))?;
206
207        // Load existing jobs
208        let jobs = if jobs_path.exists() {
209            let content = fs::read_to_string(&jobs_path)
210                .map_err(|e| format!("Failed to read jobs file: {}", e))?;
211            serde_json::from_str(&content)
212                .map_err(|e| format!("Failed to parse jobs file: {}", e))?
213        } else {
214            HashMap::new()
215        };
216
217        Ok(Self {
218            jobs_path,
219            runs_dir,
220            jobs,
221        })
222    }
223
224    /// Save jobs to disk.
225    fn save(&self) -> Result<(), String> {
226        let content = serde_json::to_string_pretty(&self.jobs)
227            .map_err(|e| format!("Failed to serialize jobs: {}", e))?;
228        fs::write(&self.jobs_path, content)
229            .map_err(|e| format!("Failed to write jobs file: {}", e))?;
230        Ok(())
231    }
232
233    /// Add a new job.
234    pub fn add(&mut self, job: CronJob) -> Result<JobId, String> {
235        let id = job.job_id.clone();
236        self.jobs.insert(id.clone(), job);
237        self.save()?;
238        Ok(id)
239    }
240
241    /// Get a job by ID.
242    pub fn get(&self, job_id: &str) -> Option<&CronJob> {
243        self.jobs.get(job_id)
244    }
245
246    /// List all jobs.
247    pub fn list(&self, include_disabled: bool) -> Vec<&CronJob> {
248        self.jobs
249            .values()
250            .filter(|j| include_disabled || j.enabled)
251            .collect()
252    }
253
254    /// Update a job with a patch.
255    pub fn update(&mut self, job_id: &str, patch: CronJobPatch) -> Result<(), String> {
256        let job = self
257            .jobs
258            .get_mut(job_id)
259            .ok_or_else(|| format!("Job not found: {}", job_id))?;
260
261        if let Some(name) = patch.name {
262            job.name = Some(name);
263        }
264        if let Some(enabled) = patch.enabled {
265            job.enabled = enabled;
266        }
267        if let Some(schedule) = patch.schedule {
268            job.schedule = schedule;
269        }
270        if let Some(payload) = patch.payload {
271            job.payload = payload;
272        }
273        if let Some(delivery) = patch.delivery {
274            job.delivery = Some(delivery);
275        }
276
277        self.save()
278    }
279
280    /// Remove a job.
281    pub fn remove(&mut self, job_id: &str) -> Result<CronJob, String> {
282        let job = self
283            .jobs
284            .remove(job_id)
285            .ok_or_else(|| format!("Job not found: {}", job_id))?;
286        self.save()?;
287        Ok(job)
288    }
289
290    /// Get run history for a job.
291    pub fn get_runs(&self, job_id: &str, limit: usize) -> Result<Vec<RunEntry>, String> {
292        let runs_file = self.runs_dir.join(format!("{}.jsonl", job_id));
293        if !runs_file.exists() {
294            return Ok(Vec::new());
295        }
296
297        let content = fs::read_to_string(&runs_file)
298            .map_err(|e| format!("Failed to read runs file: {}", e))?;
299
300        let runs: Vec<RunEntry> = content
301            .lines()
302            .filter_map(|line| serde_json::from_str(line).ok())
303            .collect();
304
305        // Return last N runs
306        Ok(runs.into_iter().rev().take(limit).collect())
307    }
308
309    /// Record a run.
310    pub fn record_run(&self, entry: &RunEntry) -> Result<(), String> {
311        let runs_file = self.runs_dir.join(format!("{}.jsonl", entry.job_id));
312        let line = serde_json::to_string(entry)
313            .map_err(|e| format!("Failed to serialize run entry: {}", e))?;
314
315        use std::io::Write;
316        let mut file = fs::OpenOptions::new()
317            .create(true)
318            .append(true)
319            .open(&runs_file)
320            .map_err(|e| format!("Failed to open runs file: {}", e))?;
321
322        writeln!(file, "{}", line)
323            .map_err(|e| format!("Failed to write run entry: {}", e))?;
324
325        Ok(())
326    }
327}
328
329/// Patch for updating a cron job.
330#[derive(Debug, Clone, Default, Serialize, Deserialize)]
331#[serde(rename_all = "camelCase")]
332pub struct CronJobPatch {
333    #[serde(skip_serializing_if = "Option::is_none")]
334    pub name: Option<String>,
335    #[serde(skip_serializing_if = "Option::is_none")]
336    pub enabled: Option<bool>,
337    #[serde(skip_serializing_if = "Option::is_none")]
338    pub schedule: Option<Schedule>,
339    #[serde(skip_serializing_if = "Option::is_none")]
340    pub payload: Option<Payload>,
341    #[serde(skip_serializing_if = "Option::is_none")]
342    pub delivery: Option<Delivery>,
343}
344
345#[cfg(test)]
346mod tests {
347    use super::*;
348    use tempfile::TempDir;
349
350    #[test]
351    fn test_create_job() {
352        let job = CronJob::new(
353            Some("Test Job".to_string()),
354            Schedule::At {
355                at: "2026-02-12T18:00:00Z".to_string(),
356            },
357            SessionTarget::Main,
358            Payload::SystemEvent {
359                text: "Test reminder".to_string(),
360            },
361        );
362
363        assert!(job.job_id.starts_with("job-"));
364        assert_eq!(job.name, Some("Test Job".to_string()));
365        assert!(job.enabled);
366        assert!(job.delete_after_run); // One-shot jobs auto-delete
367    }
368
369    #[test]
370    fn test_cron_store_add_list() {
371        let dir = TempDir::new().unwrap();
372        let mut store = CronStore::new(dir.path()).unwrap();
373
374        let job = CronJob::new(
375            Some("Test".to_string()),
376            Schedule::Every {
377                every_ms: 60000,
378                anchor_ms: None,
379            },
380            SessionTarget::Isolated,
381            Payload::AgentTurn {
382                message: "Do something".to_string(),
383                model: None,
384                thinking: None,
385                timeout_seconds: None,
386            },
387        );
388
389        let id = store.add(job).unwrap();
390        let jobs = store.list(false);
391        assert_eq!(jobs.len(), 1);
392        assert_eq!(jobs[0].job_id, id);
393    }
394
395    #[test]
396    fn test_cron_store_persistence() {
397        let dir = TempDir::new().unwrap();
398
399        // Create and add job
400        {
401            let mut store = CronStore::new(dir.path()).unwrap();
402            let job = CronJob::new(
403                Some("Persistent".to_string()),
404                Schedule::Cron {
405                    expr: "0 * * * *".to_string(),
406                    tz: None,
407                },
408                SessionTarget::Main,
409                Payload::SystemEvent {
410                    text: "Hourly check".to_string(),
411                },
412            );
413            store.add(job).unwrap();
414        }
415
416        // Reload and verify
417        {
418            let store = CronStore::new(dir.path()).unwrap();
419            let jobs = store.list(false);
420            assert_eq!(jobs.len(), 1);
421            assert_eq!(jobs[0].name, Some("Persistent".to_string()));
422        }
423    }
424}