Skip to main content

symbi_runtime/reasoning/
scheduler.rs

1//! Agent scheduler for durable cron-based execution
2//!
3//! Provides scheduled agent execution with cron expressions and
4//! durable sleep between runs.
5//!
6//! Feature-gated behind `cron`.
7
8use crate::types::AgentId;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15/// Configuration for a scheduled agent.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct ScheduleConfig {
18    /// Agent to execute.
19    pub agent_id: AgentId,
20    /// Cron expression (standard 5-field or extended 6-field).
21    pub cron_expr: String,
22    /// Initial observation/prompt for each run.
23    pub observation: String,
24    /// Whether the schedule is enabled.
25    pub enabled: bool,
26    /// When this schedule was created.
27    pub created_at: DateTime<Utc>,
28    /// When this schedule was last executed.
29    pub last_run: Option<DateTime<Utc>>,
30    /// Number of times this schedule has run.
31    pub run_count: u64,
32}
33
34/// Result of parsing and validating a cron expression.
35#[derive(Debug, Clone)]
36pub struct ParsedCron {
37    /// The original cron expression.
38    pub expression: String,
39    /// Next scheduled time from now.
40    pub next_fire: Option<DateTime<Utc>>,
41}
42
43/// Parse a cron expression and compute the next fire time.
44///
45/// Supports standard 5-field cron (minute, hour, day-of-month, month, day-of-week).
46/// Returns a `ParsedCron` with the expression and next fire time, or an error
47/// if the expression is invalid.
48pub fn parse_cron(expr: &str) -> Result<ParsedCron, SchedulerError> {
49    // Validate by checking field count (5 or 6 fields expected)
50    let fields: Vec<&str> = expr.split_whitespace().collect();
51    if fields.len() < 5 || fields.len() > 7 {
52        return Err(SchedulerError::InvalidCron {
53            expression: expr.to_string(),
54            message: format!("Expected 5-7 fields, got {}", fields.len()),
55        });
56    }
57
58    // Validate individual fields
59    for (i, field) in fields.iter().enumerate() {
60        validate_cron_field(field, i).map_err(|msg| SchedulerError::InvalidCron {
61            expression: expr.to_string(),
62            message: msg,
63        })?;
64    }
65
66    Ok(ParsedCron {
67        expression: expr.to_string(),
68        next_fire: None, // Full cron scheduling requires the `cron` crate at runtime
69    })
70}
71
72fn validate_cron_field(field: &str, index: usize) -> Result<(), String> {
73    let field_name = match index {
74        0 => "minute",
75        1 => "hour",
76        2 => "day-of-month",
77        3 => "month",
78        4 => "day-of-week",
79        5 => "year",
80        6 => "seconds",
81        _ => "unknown",
82    };
83
84    if field == "*" || field == "?" {
85        return Ok(());
86    }
87
88    // Handle ranges (e.g., "1-5"), lists (e.g., "1,3,5"), and steps (e.g., "*/5")
89    for part in field.split(',') {
90        let part = part.trim();
91        if part.contains('/') {
92            let parts: Vec<&str> = part.split('/').collect();
93            if parts.len() != 2 {
94                return Err(format!("Invalid step in {} field: {}", field_name, part));
95            }
96            if parts[0] != "*" {
97                parts[0].parse::<u32>().map_err(|_| {
98                    format!("Invalid base value in {} field: {}", field_name, parts[0])
99                })?;
100            }
101            parts[1]
102                .parse::<u32>()
103                .map_err(|_| format!("Invalid step value in {} field: {}", field_name, parts[1]))?;
104        } else if part.contains('-') {
105            let parts: Vec<&str> = part.split('-').collect();
106            if parts.len() != 2 {
107                return Err(format!("Invalid range in {} field: {}", field_name, part));
108            }
109            parts[0].parse::<u32>().map_err(|_| {
110                format!("Invalid range start in {} field: {}", field_name, parts[0])
111            })?;
112            parts[1]
113                .parse::<u32>()
114                .map_err(|_| format!("Invalid range end in {} field: {}", field_name, parts[1]))?;
115        } else {
116            part.parse::<u32>()
117                .map_err(|_| format!("Invalid value in {} field: {}", field_name, part))?;
118        }
119    }
120
121    Ok(())
122}
123
124/// Manages scheduled agent executions.
125pub struct AgentScheduler {
126    schedules: Arc<RwLock<HashMap<String, ScheduleConfig>>>,
127}
128
129impl Default for AgentScheduler {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl AgentScheduler {
136    /// Create a new scheduler.
137    pub fn new() -> Self {
138        Self {
139            schedules: Arc::new(RwLock::new(HashMap::new())),
140        }
141    }
142
143    /// Add a scheduled agent execution.
144    pub async fn schedule(
145        &self,
146        name: impl Into<String>,
147        agent_id: AgentId,
148        cron_expr: impl Into<String>,
149        observation: impl Into<String>,
150    ) -> Result<(), SchedulerError> {
151        let cron_expr = cron_expr.into();
152
153        // Validate the cron expression
154        parse_cron(&cron_expr)?;
155
156        let config = ScheduleConfig {
157            agent_id,
158            cron_expr,
159            observation: observation.into(),
160            enabled: true,
161            created_at: Utc::now(),
162            last_run: None,
163            run_count: 0,
164        };
165
166        self.schedules.write().await.insert(name.into(), config);
167        Ok(())
168    }
169
170    /// Get a schedule by name.
171    pub async fn get_schedule(&self, name: &str) -> Option<ScheduleConfig> {
172        self.schedules.read().await.get(name).cloned()
173    }
174
175    /// List all schedules.
176    pub async fn list_schedules(&self) -> Vec<(String, ScheduleConfig)> {
177        self.schedules
178            .read()
179            .await
180            .iter()
181            .map(|(k, v)| (k.clone(), v.clone()))
182            .collect()
183    }
184
185    /// Enable or disable a schedule.
186    pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
187        if let Some(config) = self.schedules.write().await.get_mut(name) {
188            config.enabled = enabled;
189            true
190        } else {
191            false
192        }
193    }
194
195    /// Remove a schedule.
196    pub async fn remove_schedule(&self, name: &str) -> bool {
197        self.schedules.write().await.remove(name).is_some()
198    }
199
200    /// Record that a schedule has been executed.
201    pub async fn record_execution(&self, name: &str) -> bool {
202        if let Some(config) = self.schedules.write().await.get_mut(name) {
203            config.last_run = Some(Utc::now());
204            config.run_count += 1;
205            true
206        } else {
207            false
208        }
209    }
210
211    /// Get schedules that are due for execution.
212    pub async fn due_schedules(&self) -> Vec<(String, ScheduleConfig)> {
213        self.schedules
214            .read()
215            .await
216            .iter()
217            .filter(|(_, config)| config.enabled)
218            .map(|(name, config)| (name.clone(), config.clone()))
219            .collect()
220    }
221}
222
223/// Errors from the scheduler.
224#[derive(Debug, thiserror::Error)]
225pub enum SchedulerError {
226    #[error("Invalid cron expression '{expression}': {message}")]
227    InvalidCron { expression: String, message: String },
228
229    #[error("Schedule '{name}' not found")]
230    NotFound { name: String },
231}
232
233#[cfg(test)]
234mod tests {
235    use super::*;
236
237    #[test]
238    fn test_parse_cron_valid_5_field() {
239        let result = parse_cron("0 * * * *");
240        assert!(result.is_ok());
241        assert_eq!(result.unwrap().expression, "0 * * * *");
242    }
243
244    #[test]
245    fn test_parse_cron_valid_with_ranges() {
246        assert!(parse_cron("0 9-17 * * 1-5").is_ok());
247    }
248
249    #[test]
250    fn test_parse_cron_valid_with_steps() {
251        assert!(parse_cron("*/15 * * * *").is_ok());
252    }
253
254    #[test]
255    fn test_parse_cron_valid_with_lists() {
256        assert!(parse_cron("0 0 1,15 * *").is_ok());
257    }
258
259    #[test]
260    fn test_parse_cron_invalid_too_few_fields() {
261        let result = parse_cron("0 *");
262        assert!(result.is_err());
263        assert!(result.unwrap_err().to_string().contains("5-7 fields"));
264    }
265
266    #[test]
267    fn test_parse_cron_invalid_field_value() {
268        let result = parse_cron("abc * * * *");
269        assert!(result.is_err());
270    }
271
272    #[test]
273    fn test_parse_cron_valid_6_field() {
274        assert!(parse_cron("0 30 9 * * 1-5").is_ok());
275    }
276
277    #[tokio::test]
278    async fn test_scheduler_add_and_get() {
279        let scheduler = AgentScheduler::new();
280        let agent_id = AgentId::new();
281
282        scheduler
283            .schedule("daily_check", agent_id, "0 9 * * *", "Run daily analysis")
284            .await
285            .unwrap();
286
287        let config = scheduler.get_schedule("daily_check").await.unwrap();
288        assert_eq!(config.agent_id, agent_id);
289        assert_eq!(config.cron_expr, "0 9 * * *");
290        assert!(config.enabled);
291        assert_eq!(config.run_count, 0);
292    }
293
294    #[tokio::test]
295    async fn test_scheduler_list() {
296        let scheduler = AgentScheduler::new();
297
298        scheduler
299            .schedule("a", AgentId::new(), "0 * * * *", "task a")
300            .await
301            .unwrap();
302        scheduler
303            .schedule("b", AgentId::new(), "*/5 * * * *", "task b")
304            .await
305            .unwrap();
306
307        let schedules = scheduler.list_schedules().await;
308        assert_eq!(schedules.len(), 2);
309    }
310
311    #[tokio::test]
312    async fn test_scheduler_enable_disable() {
313        let scheduler = AgentScheduler::new();
314
315        scheduler
316            .schedule("job", AgentId::new(), "0 * * * *", "task")
317            .await
318            .unwrap();
319
320        assert!(scheduler.set_enabled("job", false).await);
321        assert!(!scheduler.get_schedule("job").await.unwrap().enabled);
322
323        assert!(scheduler.set_enabled("job", true).await);
324        assert!(scheduler.get_schedule("job").await.unwrap().enabled);
325
326        assert!(!scheduler.set_enabled("nonexistent", false).await);
327    }
328
329    #[tokio::test]
330    async fn test_scheduler_remove() {
331        let scheduler = AgentScheduler::new();
332
333        scheduler
334            .schedule("temp", AgentId::new(), "0 * * * *", "task")
335            .await
336            .unwrap();
337
338        assert!(scheduler.remove_schedule("temp").await);
339        assert!(scheduler.get_schedule("temp").await.is_none());
340        assert!(!scheduler.remove_schedule("temp").await);
341    }
342
343    #[tokio::test]
344    async fn test_scheduler_record_execution() {
345        let scheduler = AgentScheduler::new();
346
347        scheduler
348            .schedule("job", AgentId::new(), "0 * * * *", "task")
349            .await
350            .unwrap();
351
352        assert!(scheduler.record_execution("job").await);
353        let config = scheduler.get_schedule("job").await.unwrap();
354        assert_eq!(config.run_count, 1);
355        assert!(config.last_run.is_some());
356
357        assert!(scheduler.record_execution("job").await);
358        let config = scheduler.get_schedule("job").await.unwrap();
359        assert_eq!(config.run_count, 2);
360
361        assert!(!scheduler.record_execution("nonexistent").await);
362    }
363
364    #[tokio::test]
365    async fn test_scheduler_due_schedules() {
366        let scheduler = AgentScheduler::new();
367
368        scheduler
369            .schedule("enabled", AgentId::new(), "0 * * * *", "task")
370            .await
371            .unwrap();
372        scheduler
373            .schedule("disabled", AgentId::new(), "0 * * * *", "task")
374            .await
375            .unwrap();
376        scheduler.set_enabled("disabled", false).await;
377
378        let due = scheduler.due_schedules().await;
379        assert_eq!(due.len(), 1);
380        assert_eq!(due[0].0, "enabled");
381    }
382
383    #[tokio::test]
384    async fn test_scheduler_invalid_cron_rejected() {
385        let scheduler = AgentScheduler::new();
386
387        let result = scheduler
388            .schedule("bad", AgentId::new(), "invalid cron", "task")
389            .await;
390        assert!(result.is_err());
391    }
392
393    #[test]
394    fn test_schedule_config_serialization() {
395        let config = ScheduleConfig {
396            agent_id: AgentId::new(),
397            cron_expr: "0 9 * * *".into(),
398            observation: "Run analysis".into(),
399            enabled: true,
400            created_at: Utc::now(),
401            last_run: None,
402            run_count: 0,
403        };
404
405        let json = serde_json::to_string(&config).unwrap();
406        let restored: ScheduleConfig = serde_json::from_str(&json).unwrap();
407        assert_eq!(restored.cron_expr, "0 9 * * *");
408        assert!(restored.enabled);
409    }
410}