Skip to main content

rustyclaw_core/
cron.rs

//! Cron job scheduling for RustyClaw.
//!
//! Provides a simple job scheduler that persists jobs to disk and can
//! trigger agent turns or system events on schedule.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10use std::time::{SystemTime, UNIX_EPOCH};
11
/// Unique identifier for a cron job.
pub type JobId = String;

/// Generate a unique job ID of the form `job-<millis-hex>-<seq-hex>`.
///
/// The first component is the current wall-clock time in milliseconds since the
/// Unix epoch (0 if the clock reports a time before the epoch). The timestamp alone
/// is not unique: two jobs created within the same millisecond would get the same
/// ID, and the later one would silently replace the earlier one when stored in a
/// map keyed by ID. A process-wide counter is therefore appended so that every
/// call within one process yields a distinct ID.
fn generate_job_id() -> JobId {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Monotonic per-process sequence; only uniqueness matters, so `Relaxed` suffices.
    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);

    format!("job-{:x}-{:x}", timestamp, seq)
}
23
24/// Schedule kinds for cron jobs.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(tag = "kind", rename_all = "camelCase")]
27pub enum Schedule {
28    /// One-shot at an absolute time (ISO 8601).
29    At { at: String },
30    /// Recurring interval in milliseconds.
31    Every {
32        every_ms: u64,
33        #[serde(skip_serializing_if = "Option::is_none")]
34        anchor_ms: Option<u64>,
35    },
36    /// Cron expression (5-field).
37    Cron {
38        expr: String,
39        #[serde(skip_serializing_if = "Option::is_none")]
40        tz: Option<String>,
41    },
42}
43
44/// Payload kinds for cron jobs.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46#[serde(tag = "kind", rename_all = "camelCase")]
47pub enum Payload {
48    /// System event injected into main session.
49    SystemEvent { text: String },
50    /// Agent turn in an isolated session.
51    AgentTurn {
52        message: String,
53        #[serde(skip_serializing_if = "Option::is_none")]
54        model: Option<String>,
55        #[serde(skip_serializing_if = "Option::is_none")]
56        thinking: Option<String>,
57        #[serde(skip_serializing_if = "Option::is_none")]
58        timeout_seconds: Option<u64>,
59    },
60}
61
62/// Delivery configuration for isolated jobs.
63#[derive(Debug, Clone, Serialize, Deserialize, Default)]
64#[serde(rename_all = "camelCase")]
65pub struct Delivery {
66    #[serde(default)]
67    pub mode: DeliveryMode,
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub channel: Option<String>,
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub to: Option<String>,
72    #[serde(default)]
73    pub best_effort: bool,
74}
75
76/// Delivery mode.
77#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
78#[serde(rename_all = "camelCase")]
79pub enum DeliveryMode {
80    #[default]
81    Announce,
82    None,
83}
84
85/// Session target for job execution.
86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
87#[serde(rename_all = "camelCase")]
88pub enum SessionTarget {
89    Main,
90    Isolated,
91}
92
93/// A cron job definition.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95#[serde(rename_all = "camelCase")]
96pub struct CronJob {
97    pub job_id: JobId,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub name: Option<String>,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub description: Option<String>,
102    pub schedule: Schedule,
103    pub session_target: SessionTarget,
104    pub payload: Payload,
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub delivery: Option<Delivery>,
107    #[serde(default = "default_true")]
108    pub enabled: bool,
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub agent_id: Option<String>,
111    /// For one-shot jobs, delete after successful run.
112    #[serde(default)]
113    pub delete_after_run: bool,
114    /// Last run timestamp (ms since epoch).
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub last_run_ms: Option<u64>,
117    /// Next scheduled run timestamp (ms since epoch).
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub next_run_ms: Option<u64>,
120    /// Created timestamp (ms since epoch).
121    pub created_ms: u64,
122}
123
/// Serde default provider that always yields `true`.
///
/// Referenced by `#[serde(default = "default_true")]` on `CronJob::enabled`, so a job
/// whose input omits `enabled` is deserialized as enabled.
fn default_true() -> bool {
    true
}
127
128impl CronJob {
129    /// Create a new job with the given parameters.
130    pub fn new(
131        name: Option<String>,
132        schedule: Schedule,
133        session_target: SessionTarget,
134        payload: Payload,
135    ) -> Self {
136        let now_ms = SystemTime::now()
137            .duration_since(UNIX_EPOCH)
138            .unwrap_or_default()
139            .as_millis() as u64;
140
141        let delete_after_run = matches!(schedule, Schedule::At { .. });
142
143        Self {
144            job_id: generate_job_id(),
145            name,
146            description: None,
147            schedule,
148            session_target,
149            payload,
150            delivery: None,
151            enabled: true,
152            agent_id: None,
153            delete_after_run,
154            last_run_ms: None,
155            next_run_ms: None,
156            created_ms: now_ms,
157        }
158    }
159}
160
161/// Run history entry.
162#[derive(Debug, Clone, Serialize, Deserialize)]
163#[serde(rename_all = "camelCase")]
164pub struct RunEntry {
165    pub job_id: JobId,
166    pub run_id: String,
167    pub started_ms: u64,
168    pub finished_ms: Option<u64>,
169    pub status: RunStatus,
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub error: Option<String>,
172}
173
174/// Run status.
175#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176#[serde(rename_all = "camelCase")]
177pub enum RunStatus {
178    Running,
179    Ok,
180    Error,
181    Timeout,
182    Skipped,
183}
184
185/// Cron job store that persists jobs to disk.
186pub struct CronStore {
187    /// Path to the jobs file.
188    jobs_path: PathBuf,
189    /// Path to the runs directory.
190    runs_dir: PathBuf,
191    /// In-memory job cache.
192    jobs: HashMap<JobId, CronJob>,
193}
194
195impl CronStore {
196    /// Create or load a cron store from the given directory.
197    pub fn new(cron_dir: &Path) -> Result<Self, String> {
198        let jobs_path = cron_dir.join("jobs.json");
199        let runs_dir = cron_dir.join("runs");
200
201        // Ensure directories exist
202        fs::create_dir_all(cron_dir)
203            .map_err(|e| format!("Failed to create cron directory: {}", e))?;
204        fs::create_dir_all(&runs_dir)
205            .map_err(|e| format!("Failed to create runs directory: {}", e))?;
206
207        // Load existing jobs
208        let jobs = if jobs_path.exists() {
209            let content = fs::read_to_string(&jobs_path)
210                .map_err(|e| format!("Failed to read jobs file: {}", e))?;
211            serde_json::from_str(&content)
212                .map_err(|e| format!("Failed to parse jobs file: {}", e))?
213        } else {
214            HashMap::new()
215        };
216
217        Ok(Self {
218            jobs_path,
219            runs_dir,
220            jobs,
221        })
222    }
223
224    /// Save jobs to disk.
225    fn save(&self) -> Result<(), String> {
226        let content = serde_json::to_string_pretty(&self.jobs)
227            .map_err(|e| format!("Failed to serialize jobs: {}", e))?;
228        fs::write(&self.jobs_path, content)
229            .map_err(|e| format!("Failed to write jobs file: {}", e))?;
230        Ok(())
231    }
232
233    /// Add a new job.
234    pub fn add(&mut self, job: CronJob) -> Result<JobId, String> {
235        let id = job.job_id.clone();
236        self.jobs.insert(id.clone(), job);
237        self.save()?;
238        Ok(id)
239    }
240
241    /// Get a job by ID.
242    pub fn get(&self, job_id: &str) -> Option<&CronJob> {
243        self.jobs.get(job_id)
244    }
245
246    /// List all jobs.
247    pub fn list(&self, include_disabled: bool) -> Vec<&CronJob> {
248        self.jobs
249            .values()
250            .filter(|j| include_disabled || j.enabled)
251            .collect()
252    }
253
254    /// Update a job with a patch.
255    pub fn update(&mut self, job_id: &str, patch: CronJobPatch) -> Result<(), String> {
256        let job = self
257            .jobs
258            .get_mut(job_id)
259            .ok_or_else(|| format!("Job not found: {}", job_id))?;
260
261        if let Some(name) = patch.name {
262            job.name = Some(name);
263        }
264        if let Some(enabled) = patch.enabled {
265            job.enabled = enabled;
266        }
267        if let Some(schedule) = patch.schedule {
268            job.schedule = schedule;
269        }
270        if let Some(payload) = patch.payload {
271            job.payload = payload;
272        }
273        if let Some(delivery) = patch.delivery {
274            job.delivery = Some(delivery);
275        }
276
277        self.save()
278    }
279
280    /// Remove a job.
281    pub fn remove(&mut self, job_id: &str) -> Result<CronJob, String> {
282        let job = self
283            .jobs
284            .remove(job_id)
285            .ok_or_else(|| format!("Job not found: {}", job_id))?;
286        self.save()?;
287        Ok(job)
288    }
289
290    /// Get run history for a job.
291    pub fn get_runs(&self, job_id: &str, limit: usize) -> Result<Vec<RunEntry>, String> {
292        let runs_file = self.runs_dir.join(format!("{}.jsonl", job_id));
293        if !runs_file.exists() {
294            return Ok(Vec::new());
295        }
296
297        let content = fs::read_to_string(&runs_file)
298            .map_err(|e| format!("Failed to read runs file: {}", e))?;
299
300        let runs: Vec<RunEntry> = content
301            .lines()
302            .filter_map(|line| serde_json::from_str(line).ok())
303            .collect();
304
305        // Return last N runs
306        Ok(runs.into_iter().rev().take(limit).collect())
307    }
308
309    /// Record a run.
310    pub fn record_run(&self, entry: &RunEntry) -> Result<(), String> {
311        let runs_file = self.runs_dir.join(format!("{}.jsonl", entry.job_id));
312        let line = serde_json::to_string(entry)
313            .map_err(|e| format!("Failed to serialize run entry: {}", e))?;
314
315        use std::io::Write;
316        let mut file = fs::OpenOptions::new()
317            .create(true)
318            .append(true)
319            .open(&runs_file)
320            .map_err(|e| format!("Failed to open runs file: {}", e))?;
321
322        writeln!(file, "{}", line).map_err(|e| format!("Failed to write run entry: {}", e))?;
323
324        Ok(())
325    }
326}
327
328/// Patch for updating a cron job.
329#[derive(Debug, Clone, Default, Serialize, Deserialize)]
330#[serde(rename_all = "camelCase")]
331pub struct CronJobPatch {
332    #[serde(skip_serializing_if = "Option::is_none")]
333    pub name: Option<String>,
334    #[serde(skip_serializing_if = "Option::is_none")]
335    pub enabled: Option<bool>,
336    #[serde(skip_serializing_if = "Option::is_none")]
337    pub schedule: Option<Schedule>,
338    #[serde(skip_serializing_if = "Option::is_none")]
339    pub payload: Option<Payload>,
340    #[serde(skip_serializing_if = "Option::is_none")]
341    pub delivery: Option<Delivery>,
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Shorthand for a system-event payload carrying `text`.
    fn system_event(text: &str) -> Payload {
        Payload::SystemEvent {
            text: text.to_string(),
        }
    }

    /// A freshly constructed one-shot job is enabled, named, and flagged for deletion.
    #[test]
    fn test_create_job() {
        let schedule = Schedule::At {
            at: "2026-02-12T18:00:00Z".to_string(),
        };
        let job = CronJob::new(
            Some("Test Job".to_string()),
            schedule,
            SessionTarget::Main,
            system_event("Test reminder"),
        );

        assert!(job.job_id.starts_with("job-"));
        assert_eq!(job.name, Some("Test Job".to_string()));
        assert!(job.enabled);
        // `At` schedules are one-shot, so the job is marked for deletion after it runs.
        assert!(job.delete_after_run);
    }

    /// A job added to the store shows up in the listing under the returned ID.
    #[test]
    fn test_cron_store_add_list() {
        let dir = TempDir::new().unwrap();
        let mut store = CronStore::new(dir.path()).unwrap();

        let schedule = Schedule::Every {
            every_ms: 60000,
            anchor_ms: None,
        };
        let payload = Payload::AgentTurn {
            message: "Do something".to_string(),
            model: None,
            thinking: None,
            timeout_seconds: None,
        };
        let job = CronJob::new(
            Some("Test".to_string()),
            schedule,
            SessionTarget::Isolated,
            payload,
        );

        let id = store.add(job).unwrap();

        let jobs = store.list(false);
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].job_id, id);
    }

    /// Jobs written by one store instance are visible to a later instance on the same directory.
    #[test]
    fn test_cron_store_persistence() {
        let dir = TempDir::new().unwrap();

        // First store: add a job, then let the store go out of scope.
        {
            let mut store = CronStore::new(dir.path()).unwrap();
            let schedule = Schedule::Cron {
                expr: "0 * * * *".to_string(),
                tz: None,
            };
            let job = CronJob::new(
                Some("Persistent".to_string()),
                schedule,
                SessionTarget::Main,
                system_event("Hourly check"),
            );
            store.add(job).unwrap();
        }

        // Second store on the same directory: the job must have been loaded from disk.
        {
            let store = CronStore::new(dir.path()).unwrap();
            let jobs = store.list(false);
            assert_eq!(jobs.len(), 1);
            assert_eq!(jobs[0].name, Some("Persistent".to_string()));
        }
    }
}
423}