1use std::collections::HashMap;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Mutex;
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use tracing::debug;
15
16use crate::health::HealthStatus;
17use crate::process::Pid;
18use crate::service::{ServiceType, SystemService};
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct CronJob {
23 pub id: String,
25 pub name: String,
27 pub interval_secs: u64,
29 pub command: String,
31 pub target_pid: Option<Pid>,
33 pub enabled: bool,
35 pub created_at: DateTime<Utc>,
37 pub last_fired: Option<DateTime<Utc>>,
39 pub fire_count: u64,
41}
42
/// Outcome of a single `CronService::tick` call.
///
/// Derives `Clone`, `Default`, `PartialEq` and `Eq` in addition to `Debug` so
/// callers can construct an empty result and compare results directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TickResult {
    /// Ids of the jobs that fired during the tick. The order follows the
    /// iteration order of the underlying hash map and is therefore unspecified.
    pub fired: Vec<String>,
}
49
50pub struct CronService {
56 started: AtomicBool,
57 jobs: Mutex<HashMap<String, CronJob>>,
58}
59
60impl CronService {
61 pub fn new() -> Self {
62 Self {
63 started: AtomicBool::new(false),
64 jobs: Mutex::new(HashMap::new()),
65 }
66 }
67
68 pub fn add_job(
70 &self,
71 name: String,
72 interval_secs: u64,
73 command: String,
74 target_pid: Option<Pid>,
75 ) -> CronJob {
76 let job = CronJob {
77 id: uuid::Uuid::new_v4().to_string(),
78 name,
79 interval_secs,
80 command,
81 target_pid,
82 enabled: true,
83 created_at: Utc::now(),
84 last_fired: None,
85 fire_count: 0,
86 };
87
88 let mut jobs = self.jobs.lock().unwrap();
89 jobs.insert(job.id.clone(), job.clone());
90 debug!(job_id = %job.id, name = %job.name, interval = job.interval_secs, "cron job added");
91 job
92 }
93
94 pub fn remove_job(&self, id: &str) -> Option<CronJob> {
96 let mut jobs = self.jobs.lock().unwrap();
97 let removed = jobs.remove(id);
98 if let Some(ref j) = removed {
99 debug!(job_id = %j.id, name = %j.name, "cron job removed");
100 }
101 removed
102 }
103
104 pub fn list_jobs(&self) -> Vec<CronJob> {
106 let jobs = self.jobs.lock().unwrap();
107 let mut list: Vec<CronJob> = jobs.values().cloned().collect();
108 list.sort_by(|a, b| a.created_at.cmp(&b.created_at));
109 list
110 }
111
112 pub fn get_job(&self, id: &str) -> Option<CronJob> {
114 let jobs = self.jobs.lock().unwrap();
115 jobs.get(id).cloned()
116 }
117
118 pub fn tick(&self) -> TickResult {
124 let now = Utc::now();
125 let mut fired = Vec::new();
126 let mut jobs = self.jobs.lock().unwrap();
127
128 for job in jobs.values_mut() {
129 if !job.enabled {
130 continue;
131 }
132
133 let should_fire = match job.last_fired {
134 None => true, Some(last) => {
136 let elapsed = (now - last).num_seconds();
137 elapsed >= job.interval_secs as i64
138 }
139 };
140
141 if should_fire {
142 job.last_fired = Some(now);
143 job.fire_count += 1;
144 fired.push(job.id.clone());
145 debug!(
146 job_id = %job.id,
147 name = %job.name,
148 fire_count = job.fire_count,
149 "cron job fired"
150 );
151 }
152 }
153
154 TickResult { fired }
155 }
156
157 pub fn job_snapshot(&self, id: &str) -> Option<CronJob> {
159 self.get_job(id)
160 }
161
162 pub fn job_count(&self) -> usize {
164 self.jobs.lock().unwrap().len()
165 }
166}
167
168impl Default for CronService {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174#[async_trait]
175impl SystemService for CronService {
176 fn name(&self) -> &str {
177 "cron"
178 }
179
180 fn service_type(&self) -> ServiceType {
181 ServiceType::Cron
182 }
183
184 async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
185 self.started.store(true, Ordering::Relaxed);
186 tracing::info!("cron service started");
187 Ok(())
188 }
189
190 async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
191 self.started.store(false, Ordering::Relaxed);
192 tracing::info!("cron service stopped");
193 Ok(())
194 }
195
196 async fn health_check(&self) -> HealthStatus {
197 if self.started.load(Ordering::Relaxed) {
198 HealthStatus::Healthy
199 } else {
200 HealthStatus::Degraded("not started".into())
201 }
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Added jobs are listed, counted, and retrievable by id with their fields intact.
    #[test]
    fn add_and_list_jobs() {
        let service = CronService::new();
        let heartbeat =
            service.add_job(String::from("heartbeat"), 10, String::from("ping"), Some(1));
        let cleanup = service.add_job(String::from("cleanup"), 60, String::from("gc"), None);

        assert_eq!(service.list_jobs().len(), 2);
        assert_eq!(service.job_count(), 2);

        let stored_heartbeat = service.get_job(&heartbeat.id).unwrap();
        assert_eq!(stored_heartbeat.name, "heartbeat");
        assert_eq!(stored_heartbeat.interval_secs, 10);
        assert_eq!(stored_heartbeat.target_pid, Some(1));

        let stored_cleanup = service.get_job(&cleanup.id).unwrap();
        assert_eq!(stored_cleanup.name, "cleanup");
    }

    /// Removing a job returns it once; a second removal of the same id yields `None`.
    #[test]
    fn remove_job() {
        let service = CronService::new();
        let temp = service.add_job(String::from("temp"), 5, String::from("check"), None);

        let removed_name = service.remove_job(&temp.id).map(|job| job.name);
        assert_eq!(removed_name.as_deref(), Some("temp"));
        assert_eq!(service.job_count(), 0);

        assert!(service.remove_job(&temp.id).is_none());
    }

    /// A job that has never fired is due on the very first tick.
    #[test]
    fn tick_fires_new_jobs_immediately() {
        let service = CronService::new();
        service.add_job(String::from("fast"), 1, String::from("ping"), None);

        let outcome = service.tick();
        assert_eq!(outcome.fired.len(), 1);

        let listed = service.list_jobs();
        let only_job = &listed[0];
        assert_eq!(only_job.fire_count, 1);
        assert!(only_job.last_fired.is_some());
    }

    /// After the initial firing, a long-interval job is not due on the next tick.
    #[test]
    fn tick_respects_interval() {
        let service = CronService::new();
        let _slow = service.add_job(String::from("slow"), 3600, String::from("check"), None);

        let first = service.tick();
        assert_eq!(first.fired.len(), 1);

        let second = service.tick();
        assert!(second.fired.is_empty());
    }

    /// Jobs whose `enabled` flag is cleared never fire.
    #[test]
    fn tick_skips_disabled_jobs() {
        let service = CronService::new();
        let job = service.add_job(String::from("disabled"), 1, String::from("noop"), None);

        {
            // Scope the guard so it is released before `tick` takes the lock.
            let mut table = service.jobs.lock().unwrap();
            let stored = table.get_mut(&job.id).unwrap();
            stored.enabled = false;
        }

        let outcome = service.tick();
        assert!(outcome.fired.is_empty());
    }

    /// Ticking a service with no jobs fires nothing.
    #[test]
    fn empty_tick() {
        let service = CronService::new();
        let outcome = service.tick();
        assert!(outcome.fired.is_empty());
    }
}