Skip to main content

clawft_types/
cron.rs

1//! Cron scheduling types.
2//!
3//! Defines the data model for scheduled jobs: [`CronJob`], its
4//! [`CronSchedule`], [`CronPayload`], and runtime [`CronJobState`].
5//! The [`CronStore`] is the top-level container persisted to disk.
6//!
7//! All timestamps use `DateTime<Utc>` for type safety. For backward
8//! compatibility, the serde layer accepts both RFC 3339 strings and
9//! millisecond-since-epoch integers via custom deserializers.
10
11use chrono::{DateTime, TimeZone, Utc};
12use serde::{Deserialize, Serialize};
13
14/// How a cron job is scheduled.
15#[non_exhaustive]
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum ScheduleKind {
19    /// Fire once at a specific timestamp.
20    At,
21    /// Fire repeatedly at a fixed interval.
22    Every,
23    /// Fire according to a cron expression.
24    Cron,
25}
26
27/// Schedule definition for a cron job.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct CronSchedule {
30    /// The type of schedule.
31    pub kind: ScheduleKind,
32
33    /// For [`ScheduleKind::At`]: timestamp in milliseconds since epoch.
34    #[serde(default, skip_serializing_if = "Option::is_none")]
35    pub at_ms: Option<i64>,
36
37    /// For [`ScheduleKind::Every`]: interval in milliseconds.
38    #[serde(default, skip_serializing_if = "Option::is_none")]
39    pub every_ms: Option<i64>,
40
41    /// For [`ScheduleKind::Cron`]: cron expression (e.g. `"0 9 * * *"`).
42    #[serde(default, skip_serializing_if = "Option::is_none")]
43    pub expr: Option<String>,
44
45    /// Timezone for cron expressions (e.g. `"UTC"`, `"Asia/Shanghai"`).
46    #[serde(default, skip_serializing_if = "Option::is_none")]
47    pub tz: Option<String>,
48}
49
50impl Default for CronSchedule {
51    fn default() -> Self {
52        Self {
53            kind: ScheduleKind::Every,
54            at_ms: None,
55            every_ms: None,
56            expr: None,
57            tz: None,
58        }
59    }
60}
61
62/// What action to perform when a cron job fires.
63#[non_exhaustive]
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "snake_case")]
66pub enum PayloadKind {
67    /// Emit a system-level event.
68    SystemEvent,
69    /// Trigger an agent turn with a message.
70    AgentTurn,
71}
72
73/// Payload executed when a cron job fires.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct CronPayload {
76    /// The type of payload.
77    #[serde(default = "default_payload_kind")]
78    pub kind: PayloadKind,
79
80    /// Message to deliver or use as agent prompt.
81    #[serde(default)]
82    pub message: String,
83
84    /// Whether to deliver the response to a channel.
85    #[serde(default)]
86    pub deliver: bool,
87
88    /// Target channel name (e.g. `"whatsapp"`).
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub channel: Option<String>,
91
92    /// Target recipient (e.g. phone number, user ID).
93    #[serde(default, skip_serializing_if = "Option::is_none")]
94    pub to: Option<String>,
95}
96
97fn default_payload_kind() -> PayloadKind {
98    PayloadKind::AgentTurn
99}
100
101impl Default for CronPayload {
102    fn default() -> Self {
103        Self {
104            kind: PayloadKind::AgentTurn,
105            message: String::new(),
106            deliver: false,
107            channel: None,
108            to: None,
109        }
110    }
111}
112
113/// Outcome of the last job execution.
114#[non_exhaustive]
115#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
116#[serde(rename_all = "snake_case")]
117pub enum JobStatus {
118    /// The job completed successfully.
119    Ok,
120    /// The job encountered an error.
121    Error,
122    /// The job was skipped (e.g. already running).
123    Skipped,
124}
125
126/// Runtime state of a cron job.
127#[derive(Debug, Clone, Serialize, Deserialize, Default)]
128pub struct CronJobState {
129    /// Next scheduled run time (UTC).
130    #[serde(
131        default,
132        skip_serializing_if = "Option::is_none",
133        deserialize_with = "deserialize_optional_datetime_or_ms"
134    )]
135    pub next_run_at: Option<DateTime<Utc>>,
136
137    /// Last actual run time (UTC).
138    #[serde(
139        default,
140        skip_serializing_if = "Option::is_none",
141        deserialize_with = "deserialize_optional_datetime_or_ms"
142    )]
143    pub last_run_at: Option<DateTime<Utc>>,
144
145    /// Outcome of the last run.
146    #[serde(default, skip_serializing_if = "Option::is_none")]
147    pub last_status: Option<JobStatus>,
148
149    /// Error message from the last failed run.
150    #[serde(default, skip_serializing_if = "Option::is_none")]
151    pub last_error: Option<String>,
152}
153
154/// A scheduled job.
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct CronJob {
157    /// Unique job identifier.
158    pub id: String,
159
160    /// Human-readable job name.
161    pub name: String,
162
163    /// Whether the job is active.
164    #[serde(default = "default_true")]
165    pub enabled: bool,
166
167    /// When and how often to run.
168    #[serde(default)]
169    pub schedule: CronSchedule,
170
171    /// What to do when the job fires.
172    #[serde(default)]
173    pub payload: CronPayload,
174
175    /// Runtime state (next run, last run, etc.).
176    #[serde(default)]
177    pub state: CronJobState,
178
179    /// Creation timestamp (UTC).
180    #[serde(default = "default_epoch", deserialize_with = "deserialize_datetime_or_ms")]
181    pub created_at: DateTime<Utc>,
182
183    /// Last update timestamp (UTC).
184    #[serde(default = "default_epoch", deserialize_with = "deserialize_datetime_or_ms")]
185    pub updated_at: DateTime<Utc>,
186
187    /// If true, delete the job after its next successful run.
188    #[serde(default)]
189    pub delete_after_run: bool,
190}
191
192/// Returns the Unix epoch as a default `DateTime<Utc>` (for `#[serde(default)]`).
193fn default_epoch() -> DateTime<Utc> {
194    DateTime::UNIX_EPOCH
195}
196
197/// Deserialize a `DateTime<Utc>` from either:
198/// - An RFC 3339 string (new format)
199/// - An integer (milliseconds since epoch, legacy format)
200fn deserialize_datetime_or_ms<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
201where
202    D: serde::Deserializer<'de>,
203{
204    use serde::de;
205
206    let value: serde_json::Value = Deserialize::deserialize(deserializer)?;
207    match &value {
208        serde_json::Value::String(s) => DateTime::parse_from_rfc3339(s)
209            .map(|dt| dt.with_timezone(&Utc))
210            .or_else(|_| s.parse::<DateTime<Utc>>())
211            .map_err(de::Error::custom),
212        serde_json::Value::Number(n) => {
213            let ms = n.as_i64().ok_or_else(|| de::Error::custom("expected i64"))?;
214            Utc.timestamp_millis_opt(ms)
215                .single()
216                .ok_or_else(|| de::Error::custom(format!("invalid ms timestamp: {ms}")))
217        }
218        serde_json::Value::Null => Ok(DateTime::UNIX_EPOCH),
219        _ => Err(de::Error::custom("expected string, integer, or null")),
220    }
221}
222
223/// Deserialize an `Option<DateTime<Utc>>` from either:
224/// - An RFC 3339 string (new format)
225/// - An integer (milliseconds since epoch, legacy format)
226/// - `null` / missing -> `None`
227fn deserialize_optional_datetime_or_ms<'de, D>(
228    deserializer: D,
229) -> Result<Option<DateTime<Utc>>, D::Error>
230where
231    D: serde::Deserializer<'de>,
232{
233    use serde::de;
234
235    let value: Option<serde_json::Value> = Option::deserialize(deserializer)?;
236    match value {
237        None | Some(serde_json::Value::Null) => Ok(None),
238        Some(serde_json::Value::String(s)) => DateTime::parse_from_rfc3339(&s)
239            .map(|dt| Some(dt.with_timezone(&Utc)))
240            .or_else(|_| s.parse::<DateTime<Utc>>().map(Some))
241            .map_err(de::Error::custom),
242        Some(serde_json::Value::Number(n)) => {
243            let ms = n.as_i64().ok_or_else(|| de::Error::custom("expected i64"))?;
244            Ok(Utc.timestamp_millis_opt(ms).single())
245        }
246        _ => Err(de::Error::custom("expected string, integer, or null")),
247    }
248}
249
250fn default_true() -> bool {
251    true
252}
253
254/// Persistent store for cron jobs.
255#[derive(Debug, Clone, Serialize, Deserialize)]
256pub struct CronStore {
257    /// Schema version for forward compatibility.
258    #[serde(default = "default_version")]
259    pub version: u32,
260
261    /// All registered cron jobs.
262    #[serde(default)]
263    pub jobs: Vec<CronJob>,
264}
265
266fn default_version() -> u32 {
267    1
268}
269
270impl Default for CronStore {
271    fn default() -> Self {
272        Self {
273            version: 1,
274            jobs: Vec::new(),
275        }
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282
283    #[test]
284    fn schedule_default() {
285        let s = CronSchedule::default();
286        assert_eq!(s.kind, ScheduleKind::Every);
287        assert!(s.at_ms.is_none());
288        assert!(s.every_ms.is_none());
289    }
290
291    #[test]
292    fn payload_default() {
293        let p = CronPayload::default();
294        assert_eq!(p.kind, PayloadKind::AgentTurn);
295        assert!(p.message.is_empty());
296        assert!(!p.deliver);
297    }
298
299    #[test]
300    fn cron_store_default() {
301        let store = CronStore::default();
302        assert_eq!(store.version, 1);
303        assert!(store.jobs.is_empty());
304    }
305
306    #[test]
307    fn cron_job_serde_roundtrip() {
308        let now = Utc::now();
309        let job = CronJob {
310            id: "job-1".into(),
311            name: "daily check".into(),
312            enabled: true,
313            schedule: CronSchedule {
314                kind: ScheduleKind::Cron,
315                at_ms: None,
316                every_ms: None,
317                expr: Some("0 9 * * *".into()),
318                tz: Some("UTC".into()),
319            },
320            payload: CronPayload {
321                kind: PayloadKind::AgentTurn,
322                message: "run daily report".into(),
323                deliver: true,
324                channel: Some("slack".into()),
325                to: Some("C123".into()),
326            },
327            state: CronJobState::default(),
328            created_at: now,
329            updated_at: now,
330            delete_after_run: false,
331        };
332        let json = serde_json::to_string(&job).unwrap();
333        let restored: CronJob = serde_json::from_str(&json).unwrap();
334        assert_eq!(restored.id, "job-1");
335        assert_eq!(restored.schedule.kind, ScheduleKind::Cron);
336        assert_eq!(restored.schedule.expr.as_deref(), Some("0 9 * * *"));
337        assert_eq!(restored.payload.channel.as_deref(), Some("slack"));
338    }
339
340    #[test]
341    fn cron_store_serde_roundtrip() {
342        let store = CronStore {
343            version: 1,
344            jobs: vec![CronJob {
345                id: "j1".into(),
346                name: "test".into(),
347                enabled: true,
348                schedule: CronSchedule::default(),
349                payload: CronPayload::default(),
350                state: CronJobState::default(),
351                created_at: DateTime::UNIX_EPOCH,
352                updated_at: DateTime::UNIX_EPOCH,
353                delete_after_run: true,
354            }],
355        };
356        let json = serde_json::to_string(&store).unwrap();
357        let restored: CronStore = serde_json::from_str(&json).unwrap();
358        assert_eq!(restored.version, 1);
359        assert_eq!(restored.jobs.len(), 1);
360        assert!(restored.jobs[0].delete_after_run);
361    }
362
363    #[test]
364    fn schedule_kind_serde() {
365        let kinds = [
366            (ScheduleKind::At, "\"at\""),
367            (ScheduleKind::Every, "\"every\""),
368            (ScheduleKind::Cron, "\"cron\""),
369        ];
370        for (kind, expected) in &kinds {
371            let json = serde_json::to_string(kind).unwrap();
372            assert_eq!(&json, expected);
373            let restored: ScheduleKind = serde_json::from_str(&json).unwrap();
374            assert_eq!(restored, *kind);
375        }
376    }
377
378    #[test]
379    fn job_status_serde() {
380        let statuses = [
381            (JobStatus::Ok, "\"ok\""),
382            (JobStatus::Error, "\"error\""),
383            (JobStatus::Skipped, "\"skipped\""),
384        ];
385        for (status, expected) in &statuses {
386            let json = serde_json::to_string(status).unwrap();
387            assert_eq!(&json, expected);
388        }
389    }
390
391    #[test]
392    fn cron_job_defaults_on_missing_fields() {
393        let json = r#"{"id": "j1", "name": "test"}"#;
394        let job: CronJob = serde_json::from_str(json).unwrap();
395        assert!(job.enabled); // default true
396        assert_eq!(job.schedule.kind, ScheduleKind::Every);
397        assert_eq!(job.payload.kind, PayloadKind::AgentTurn);
398        assert!(!job.delete_after_run);
399    }
400
401    #[test]
402    fn job_state_with_error() {
403        let now = Utc::now();
404        let state = CronJobState {
405            next_run_at: Some(now),
406            last_run_at: Some(now),
407            last_status: Some(JobStatus::Error),
408            last_error: Some("connection refused".into()),
409        };
410        let json = serde_json::to_string(&state).unwrap();
411        let restored: CronJobState = serde_json::from_str(&json).unwrap();
412        assert_eq!(restored.last_status, Some(JobStatus::Error));
413        assert_eq!(restored.last_error.as_deref(), Some("connection refused"));
414    }
415
416    #[test]
417    fn backward_compat_ms_timestamps() {
418        // Legacy format: millisecond integers.
419        let json = r#"{
420            "id": "legacy-1",
421            "name": "old-job",
422            "created_at": 1700000000000,
423            "updated_at": 1700000000000,
424            "state": {
425                "next_run_at": 1700000100000,
426                "last_run_at": 1700000000000
427            }
428        }"#;
429        let job: CronJob = serde_json::from_str(json).unwrap();
430        assert_eq!(job.id, "legacy-1");
431        assert_eq!(job.created_at.timestamp_millis(), 1_700_000_000_000);
432        assert!(job.state.next_run_at.is_some());
433        assert!(job.state.last_run_at.is_some());
434    }
435
436    #[test]
437    fn backward_compat_legacy_field_names() {
438        // Legacy JSONL may have old field names with _ms suffix.
439        // These will be ignored by the new struct (fields renamed).
440        // This test verifies the new fields parse from their new names.
441        let json = r#"{
442            "id": "j1",
443            "name": "test",
444            "created_at": "2026-01-01T00:00:00Z",
445            "updated_at": "2026-01-01T00:00:00Z"
446        }"#;
447        let job: CronJob = serde_json::from_str(json).unwrap();
448        assert_eq!(job.created_at, Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap());
449    }
450}