Skip to main content

clawft_kernel/
cron.rs

1//! Cron scheduling service for WeftOS kernel.
2//!
3//! Provides interval-based job scheduling with per-agent targeting.
4//! Jobs fire on a regular interval and dispatch IPC messages to
5//! target agents via the A2ARouter.
6
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Mutex;
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use tracing::debug;
15
16use crate::health::HealthStatus;
17use crate::process::Pid;
18use crate::service::{ServiceType, SystemService};
19
20/// A scheduled cron job.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct CronJob {
23    /// Unique job identifier.
24    pub id: String,
25    /// Human-readable name.
26    pub name: String,
27    /// Fire every N seconds.
28    pub interval_secs: u64,
29    /// Command payload to send.
30    pub command: String,
31    /// Target agent PID (None = kernel).
32    pub target_pid: Option<Pid>,
33    /// Whether the job is active.
34    pub enabled: bool,
35    /// When the job was created.
36    pub created_at: DateTime<Utc>,
37    /// Last time the job fired.
38    pub last_fired: Option<DateTime<Utc>>,
39    /// Number of times the job has fired.
40    pub fire_count: u64,
41}
42
43/// Result of a tick: which jobs fired.
44#[derive(Debug)]
45pub struct TickResult {
46    /// Job IDs that fired during this tick.
47    pub fired: Vec<String>,
48}
49
50/// Cron scheduling service.
51///
52/// Maintains a registry of interval-based jobs. The daemon calls
53/// `tick()` periodically (e.g., every second) and the service fires
54/// any overdue jobs by sending messages through the A2ARouter.
55pub struct CronService {
56    started: AtomicBool,
57    jobs: Mutex<HashMap<String, CronJob>>,
58}
59
60impl CronService {
61    pub fn new() -> Self {
62        Self {
63            started: AtomicBool::new(false),
64            jobs: Mutex::new(HashMap::new()),
65        }
66    }
67
68    /// Add a new cron job. Returns the created job.
69    pub fn add_job(
70        &self,
71        name: String,
72        interval_secs: u64,
73        command: String,
74        target_pid: Option<Pid>,
75    ) -> CronJob {
76        let job = CronJob {
77            id: uuid::Uuid::new_v4().to_string(),
78            name,
79            interval_secs,
80            command,
81            target_pid,
82            enabled: true,
83            created_at: Utc::now(),
84            last_fired: None,
85            fire_count: 0,
86        };
87
88        let mut jobs = self.jobs.lock().unwrap();
89        jobs.insert(job.id.clone(), job.clone());
90        debug!(job_id = %job.id, name = %job.name, interval = job.interval_secs, "cron job added");
91        job
92    }
93
94    /// Remove a job by ID. Returns the removed job if it existed.
95    pub fn remove_job(&self, id: &str) -> Option<CronJob> {
96        let mut jobs = self.jobs.lock().unwrap();
97        let removed = jobs.remove(id);
98        if let Some(ref j) = removed {
99            debug!(job_id = %j.id, name = %j.name, "cron job removed");
100        }
101        removed
102    }
103
104    /// List all registered jobs.
105    pub fn list_jobs(&self) -> Vec<CronJob> {
106        let jobs = self.jobs.lock().unwrap();
107        let mut list: Vec<CronJob> = jobs.values().cloned().collect();
108        list.sort_by(|a, b| a.created_at.cmp(&b.created_at));
109        list
110    }
111
112    /// Look up a single job by ID.
113    pub fn get_job(&self, id: &str) -> Option<CronJob> {
114        let jobs = self.jobs.lock().unwrap();
115        jobs.get(id).cloned()
116    }
117
118    /// Tick the scheduler — check all jobs and collect those that are overdue.
119    ///
120    /// Returns the list of job IDs that should fire. The caller is responsible
121    /// for actually dispatching the messages (to keep CronService decoupled
122    /// from async A2ARouter).
123    pub fn tick(&self) -> TickResult {
124        let now = Utc::now();
125        let mut fired = Vec::new();
126        let mut jobs = self.jobs.lock().unwrap();
127
128        for job in jobs.values_mut() {
129            if !job.enabled {
130                continue;
131            }
132
133            let should_fire = match job.last_fired {
134                None => true, // Never fired — fire immediately
135                Some(last) => {
136                    let elapsed = (now - last).num_seconds();
137                    elapsed >= job.interval_secs as i64
138                }
139            };
140
141            if should_fire {
142                job.last_fired = Some(now);
143                job.fire_count += 1;
144                fired.push(job.id.clone());
145                debug!(
146                    job_id = %job.id,
147                    name = %job.name,
148                    fire_count = job.fire_count,
149                    "cron job fired"
150                );
151            }
152        }
153
154        TickResult { fired }
155    }
156
157    /// Get a snapshot of a job's current state (for dispatching after tick).
158    pub fn job_snapshot(&self, id: &str) -> Option<CronJob> {
159        self.get_job(id)
160    }
161
162    /// Number of registered jobs.
163    pub fn job_count(&self) -> usize {
164        self.jobs.lock().unwrap().len()
165    }
166}
167
168impl Default for CronService {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174#[async_trait]
175impl SystemService for CronService {
176    fn name(&self) -> &str {
177        "cron"
178    }
179
180    fn service_type(&self) -> ServiceType {
181        ServiceType::Cron
182    }
183
184    async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
185        self.started.store(true, Ordering::Relaxed);
186        tracing::info!("cron service started");
187        Ok(())
188    }
189
190    async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
191        self.started.store(false, Ordering::Relaxed);
192        tracing::info!("cron service stopped");
193        Ok(())
194    }
195
196    async fn health_check(&self) -> HealthStatus {
197        if self.started.load(Ordering::Relaxed) {
198            HealthStatus::Healthy
199        } else {
200            HealthStatus::Degraded("not started".into())
201        }
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208
209    #[test]
210    fn add_and_list_jobs() {
211        let svc = CronService::new();
212        let j1 = svc.add_job("heartbeat".into(), 10, "ping".into(), Some(1));
213        let j2 = svc.add_job("cleanup".into(), 60, "gc".into(), None);
214
215        let jobs = svc.list_jobs();
216        assert_eq!(jobs.len(), 2);
217        assert_eq!(svc.job_count(), 2);
218
219        let fetched = svc.get_job(&j1.id).unwrap();
220        assert_eq!(fetched.name, "heartbeat");
221        assert_eq!(fetched.interval_secs, 10);
222        assert_eq!(fetched.target_pid, Some(1));
223
224        let fetched2 = svc.get_job(&j2.id).unwrap();
225        assert_eq!(fetched2.name, "cleanup");
226    }
227
228    #[test]
229    fn remove_job() {
230        let svc = CronService::new();
231        let job = svc.add_job("temp".into(), 5, "check".into(), None);
232
233        let removed = svc.remove_job(&job.id);
234        assert!(removed.is_some());
235        assert_eq!(removed.unwrap().name, "temp");
236        assert_eq!(svc.job_count(), 0);
237
238        // Remove again returns None
239        assert!(svc.remove_job(&job.id).is_none());
240    }
241
242    #[test]
243    fn tick_fires_new_jobs_immediately() {
244        let svc = CronService::new();
245        svc.add_job("fast".into(), 1, "ping".into(), None);
246
247        let result = svc.tick();
248        assert_eq!(result.fired.len(), 1);
249
250        // Verify fire_count incremented
251        let jobs = svc.list_jobs();
252        assert_eq!(jobs[0].fire_count, 1);
253        assert!(jobs[0].last_fired.is_some());
254    }
255
256    #[test]
257    fn tick_respects_interval() {
258        let svc = CronService::new();
259        let _job = svc.add_job("slow".into(), 3600, "check".into(), None);
260
261        // First tick fires (never fired before)
262        let result = svc.tick();
263        assert_eq!(result.fired.len(), 1);
264
265        // Second tick should NOT fire (interval not elapsed)
266        let result2 = svc.tick();
267        assert_eq!(result2.fired.len(), 0);
268    }
269
270    #[test]
271    fn tick_skips_disabled_jobs() {
272        let svc = CronService::new();
273        let job = svc.add_job("disabled".into(), 1, "noop".into(), None);
274
275        // Disable the job
276        {
277            let mut jobs = svc.jobs.lock().unwrap();
278            jobs.get_mut(&job.id).unwrap().enabled = false;
279        }
280
281        let result = svc.tick();
282        assert_eq!(result.fired.len(), 0);
283    }
284
285    #[test]
286    fn empty_tick() {
287        let svc = CronService::new();
288        let result = svc.tick();
289        assert!(result.fired.is_empty());
290    }
291}