1use crate::types::AgentId;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct ScheduleConfig {
18 pub agent_id: AgentId,
20 pub cron_expr: String,
22 pub observation: String,
24 pub enabled: bool,
26 pub created_at: DateTime<Utc>,
28 pub last_run: Option<DateTime<Utc>>,
30 pub run_count: u64,
32}
33
34#[derive(Debug, Clone)]
36pub struct ParsedCron {
37 pub expression: String,
39 pub next_fire: Option<DateTime<Utc>>,
41}
42
43pub fn parse_cron(expr: &str) -> Result<ParsedCron, SchedulerError> {
49 let fields: Vec<&str> = expr.split_whitespace().collect();
51 if fields.len() < 5 || fields.len() > 7 {
52 return Err(SchedulerError::InvalidCron {
53 expression: expr.to_string(),
54 message: format!("Expected 5-7 fields, got {}", fields.len()),
55 });
56 }
57
58 for (i, field) in fields.iter().enumerate() {
60 validate_cron_field(field, i).map_err(|msg| SchedulerError::InvalidCron {
61 expression: expr.to_string(),
62 message: msg,
63 })?;
64 }
65
66 Ok(ParsedCron {
67 expression: expr.to_string(),
68 next_fire: None, })
70}
71
72fn validate_cron_field(field: &str, index: usize) -> Result<(), String> {
73 let field_name = match index {
74 0 => "minute",
75 1 => "hour",
76 2 => "day-of-month",
77 3 => "month",
78 4 => "day-of-week",
79 5 => "year",
80 6 => "seconds",
81 _ => "unknown",
82 };
83
84 if field == "*" || field == "?" {
85 return Ok(());
86 }
87
88 for part in field.split(',') {
90 let part = part.trim();
91 if part.contains('/') {
92 let parts: Vec<&str> = part.split('/').collect();
93 if parts.len() != 2 {
94 return Err(format!("Invalid step in {} field: {}", field_name, part));
95 }
96 if parts[0] != "*" {
97 parts[0].parse::<u32>().map_err(|_| {
98 format!("Invalid base value in {} field: {}", field_name, parts[0])
99 })?;
100 }
101 parts[1]
102 .parse::<u32>()
103 .map_err(|_| format!("Invalid step value in {} field: {}", field_name, parts[1]))?;
104 } else if part.contains('-') {
105 let parts: Vec<&str> = part.split('-').collect();
106 if parts.len() != 2 {
107 return Err(format!("Invalid range in {} field: {}", field_name, part));
108 }
109 parts[0].parse::<u32>().map_err(|_| {
110 format!("Invalid range start in {} field: {}", field_name, parts[0])
111 })?;
112 parts[1]
113 .parse::<u32>()
114 .map_err(|_| format!("Invalid range end in {} field: {}", field_name, parts[1]))?;
115 } else {
116 part.parse::<u32>()
117 .map_err(|_| format!("Invalid value in {} field: {}", field_name, part))?;
118 }
119 }
120
121 Ok(())
122}
123
124pub struct AgentScheduler {
126 schedules: Arc<RwLock<HashMap<String, ScheduleConfig>>>,
127}
128
129impl Default for AgentScheduler {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl AgentScheduler {
136 pub fn new() -> Self {
138 Self {
139 schedules: Arc::new(RwLock::new(HashMap::new())),
140 }
141 }
142
143 pub async fn schedule(
145 &self,
146 name: impl Into<String>,
147 agent_id: AgentId,
148 cron_expr: impl Into<String>,
149 observation: impl Into<String>,
150 ) -> Result<(), SchedulerError> {
151 let cron_expr = cron_expr.into();
152
153 parse_cron(&cron_expr)?;
155
156 let config = ScheduleConfig {
157 agent_id,
158 cron_expr,
159 observation: observation.into(),
160 enabled: true,
161 created_at: Utc::now(),
162 last_run: None,
163 run_count: 0,
164 };
165
166 self.schedules.write().await.insert(name.into(), config);
167 Ok(())
168 }
169
170 pub async fn get_schedule(&self, name: &str) -> Option<ScheduleConfig> {
172 self.schedules.read().await.get(name).cloned()
173 }
174
175 pub async fn list_schedules(&self) -> Vec<(String, ScheduleConfig)> {
177 self.schedules
178 .read()
179 .await
180 .iter()
181 .map(|(k, v)| (k.clone(), v.clone()))
182 .collect()
183 }
184
185 pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
187 if let Some(config) = self.schedules.write().await.get_mut(name) {
188 config.enabled = enabled;
189 true
190 } else {
191 false
192 }
193 }
194
195 pub async fn remove_schedule(&self, name: &str) -> bool {
197 self.schedules.write().await.remove(name).is_some()
198 }
199
200 pub async fn record_execution(&self, name: &str) -> bool {
202 if let Some(config) = self.schedules.write().await.get_mut(name) {
203 config.last_run = Some(Utc::now());
204 config.run_count += 1;
205 true
206 } else {
207 false
208 }
209 }
210
211 pub async fn due_schedules(&self) -> Vec<(String, ScheduleConfig)> {
213 self.schedules
214 .read()
215 .await
216 .iter()
217 .filter(|(_, config)| config.enabled)
218 .map(|(name, config)| (name.clone(), config.clone()))
219 .collect()
220 }
221}
222
223#[derive(Debug, thiserror::Error)]
225pub enum SchedulerError {
226 #[error("Invalid cron expression '{expression}': {message}")]
227 InvalidCron { expression: String, message: String },
228
229 #[error("Schedule '{name}' not found")]
230 NotFound { name: String },
231}
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236
237 #[test]
238 fn test_parse_cron_valid_5_field() {
239 let result = parse_cron("0 * * * *");
240 assert!(result.is_ok());
241 assert_eq!(result.unwrap().expression, "0 * * * *");
242 }
243
244 #[test]
245 fn test_parse_cron_valid_with_ranges() {
246 assert!(parse_cron("0 9-17 * * 1-5").is_ok());
247 }
248
249 #[test]
250 fn test_parse_cron_valid_with_steps() {
251 assert!(parse_cron("*/15 * * * *").is_ok());
252 }
253
254 #[test]
255 fn test_parse_cron_valid_with_lists() {
256 assert!(parse_cron("0 0 1,15 * *").is_ok());
257 }
258
259 #[test]
260 fn test_parse_cron_invalid_too_few_fields() {
261 let result = parse_cron("0 *");
262 assert!(result.is_err());
263 assert!(result.unwrap_err().to_string().contains("5-7 fields"));
264 }
265
266 #[test]
267 fn test_parse_cron_invalid_field_value() {
268 let result = parse_cron("abc * * * *");
269 assert!(result.is_err());
270 }
271
272 #[test]
273 fn test_parse_cron_valid_6_field() {
274 assert!(parse_cron("0 30 9 * * 1-5").is_ok());
275 }
276
277 #[tokio::test]
278 async fn test_scheduler_add_and_get() {
279 let scheduler = AgentScheduler::new();
280 let agent_id = AgentId::new();
281
282 scheduler
283 .schedule("daily_check", agent_id, "0 9 * * *", "Run daily analysis")
284 .await
285 .unwrap();
286
287 let config = scheduler.get_schedule("daily_check").await.unwrap();
288 assert_eq!(config.agent_id, agent_id);
289 assert_eq!(config.cron_expr, "0 9 * * *");
290 assert!(config.enabled);
291 assert_eq!(config.run_count, 0);
292 }
293
294 #[tokio::test]
295 async fn test_scheduler_list() {
296 let scheduler = AgentScheduler::new();
297
298 scheduler
299 .schedule("a", AgentId::new(), "0 * * * *", "task a")
300 .await
301 .unwrap();
302 scheduler
303 .schedule("b", AgentId::new(), "*/5 * * * *", "task b")
304 .await
305 .unwrap();
306
307 let schedules = scheduler.list_schedules().await;
308 assert_eq!(schedules.len(), 2);
309 }
310
311 #[tokio::test]
312 async fn test_scheduler_enable_disable() {
313 let scheduler = AgentScheduler::new();
314
315 scheduler
316 .schedule("job", AgentId::new(), "0 * * * *", "task")
317 .await
318 .unwrap();
319
320 assert!(scheduler.set_enabled("job", false).await);
321 assert!(!scheduler.get_schedule("job").await.unwrap().enabled);
322
323 assert!(scheduler.set_enabled("job", true).await);
324 assert!(scheduler.get_schedule("job").await.unwrap().enabled);
325
326 assert!(!scheduler.set_enabled("nonexistent", false).await);
327 }
328
329 #[tokio::test]
330 async fn test_scheduler_remove() {
331 let scheduler = AgentScheduler::new();
332
333 scheduler
334 .schedule("temp", AgentId::new(), "0 * * * *", "task")
335 .await
336 .unwrap();
337
338 assert!(scheduler.remove_schedule("temp").await);
339 assert!(scheduler.get_schedule("temp").await.is_none());
340 assert!(!scheduler.remove_schedule("temp").await);
341 }
342
343 #[tokio::test]
344 async fn test_scheduler_record_execution() {
345 let scheduler = AgentScheduler::new();
346
347 scheduler
348 .schedule("job", AgentId::new(), "0 * * * *", "task")
349 .await
350 .unwrap();
351
352 assert!(scheduler.record_execution("job").await);
353 let config = scheduler.get_schedule("job").await.unwrap();
354 assert_eq!(config.run_count, 1);
355 assert!(config.last_run.is_some());
356
357 assert!(scheduler.record_execution("job").await);
358 let config = scheduler.get_schedule("job").await.unwrap();
359 assert_eq!(config.run_count, 2);
360
361 assert!(!scheduler.record_execution("nonexistent").await);
362 }
363
364 #[tokio::test]
365 async fn test_scheduler_due_schedules() {
366 let scheduler = AgentScheduler::new();
367
368 scheduler
369 .schedule("enabled", AgentId::new(), "0 * * * *", "task")
370 .await
371 .unwrap();
372 scheduler
373 .schedule("disabled", AgentId::new(), "0 * * * *", "task")
374 .await
375 .unwrap();
376 scheduler.set_enabled("disabled", false).await;
377
378 let due = scheduler.due_schedules().await;
379 assert_eq!(due.len(), 1);
380 assert_eq!(due[0].0, "enabled");
381 }
382
383 #[tokio::test]
384 async fn test_scheduler_invalid_cron_rejected() {
385 let scheduler = AgentScheduler::new();
386
387 let result = scheduler
388 .schedule("bad", AgentId::new(), "invalid cron", "task")
389 .await;
390 assert!(result.is_err());
391 }
392
393 #[test]
394 fn test_schedule_config_serialization() {
395 let config = ScheduleConfig {
396 agent_id: AgentId::new(),
397 cron_expr: "0 9 * * *".into(),
398 observation: "Run analysis".into(),
399 enabled: true,
400 created_at: Utc::now(),
401 last_run: None,
402 run_count: 0,
403 };
404
405 let json = serde_json::to_string(&config).unwrap();
406 let restored: ScheduleConfig = serde_json::from_str(&json).unwrap();
407 assert_eq!(restored.cron_expr, "0 9 * * *");
408 assert!(restored.enabled);
409 }
410}