use anyhow::Result;
use chrono::{DateTime, Utc, Datelike, Timelike};
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone)]
pub struct ScheduleTick {
pub rule_id: String,
pub target_agent_id: String,
pub triggered_at: DateTime<Utc>,
}
#[derive(Debug, Clone)]
pub enum ScheduleType {
Interval { seconds: u64 },
Cron { cron_expression: String },
}
#[derive(Debug, Clone)]
pub struct ScheduleRule {
pub id: String,
pub schedule_type: ScheduleType,
pub target_agent_id: String,
pub enabled: bool,
pub last_triggered: Option<DateTime<Utc>>,
pub tags: Vec<String>,
}
impl ScheduleRule {
pub fn new_interval(
id: impl Into<String>,
seconds: u64,
target_agent_id: impl Into<String>,
) -> Self {
Self {
id: id.into(),
schedule_type: ScheduleType::Interval { seconds },
target_agent_id: target_agent_id.into(),
enabled: true,
last_triggered: None,
tags: vec![],
}
}
pub fn new_cron(
id: impl Into<String>,
cron_expression: impl Into<String>,
target_agent_id: impl Into<String>,
) -> Self {
Self {
id: id.into(),
schedule_type: ScheduleType::Cron {
cron_expression: cron_expression.into(),
},
target_agent_id: target_agent_id.into(),
enabled: true,
last_triggered: None,
tags: vec![],
}
}
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags = tags;
self
}
pub fn should_trigger(&self, now: DateTime<Utc>) -> bool {
if !self.enabled {
return false;
}
match &self.schedule_type {
ScheduleType::Interval { seconds } => {
match self.last_triggered {
Some(last) => {
let elapsed = now.signed_duration_since(last).num_seconds() as u64;
elapsed >= *seconds
}
None => true, }
}
ScheduleType::Cron { cron_expression } => {
Self::matches_cron(now, cron_expression)
}
}
}
fn matches_cron(now: DateTime<Utc>, cron_expression: &str) -> bool {
let parts: Vec<&str> = cron_expression.split_whitespace().collect();
if parts.len() != 5 {
warn!("Invalid cron expression: {}", cron_expression);
return false;
}
let minute = now.minute();
let hour = now.hour();
let day = now.day();
let month = now.month();
let weekday = now.weekday().num_days_from_sunday();
let matches_field = |field: &str, value: u32| -> bool {
if field == "*" {
return true;
}
if field.contains(',') {
return field
.split(',')
.filter_map(|s| s.parse::<u32>().ok())
.any(|v| v == value);
}
if field.contains('-') {
if let Some((start, end)) = field.split_once('-') {
if let (Ok(start), Ok(end)) = (start.parse::<u32>(), end.parse::<u32>()) {
return value >= start && value <= end;
}
}
}
if field.starts_with("*/") {
if let Some(step) = field.strip_prefix("*/").and_then(|s| s.parse::<u32>().ok()) {
if step > 0 {
return value % step == 0;
}
}
}
field.parse::<u32>() == Ok(value)
};
matches_field(parts[0], minute)
&& matches_field(parts[1], hour)
&& matches_field(parts[2], day)
&& matches_field(parts[3], month)
&& matches_field(parts[4], weekday)
}
pub fn mark_triggered(&mut self) {
self.last_triggered = Some(Utc::now());
}
}
pub struct ScheduleManager {
rules: DashMap<String, ScheduleRule>,
tx: broadcast::Sender<ScheduleTick>,
running: Arc<RwLock<bool>>,
}
impl ScheduleManager {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(100);
Self {
rules: DashMap::new(),
tx,
running: Arc::new(RwLock::new(false)),
}
}
pub fn register_rule(&self, rule: ScheduleRule) -> Result<()> {
let rule_id = rule.id.clone();
self.rules.insert(rule.id.clone(), rule);
info!("Registered schedule rule: {}", rule_id);
Ok(())
}
pub fn remove_rule(&self, rule_id: &str) -> Option<ScheduleRule> {
self.rules.remove(rule_id).map(|(_, rule)| rule)
}
pub fn get_rule(&self, rule_id: &str) -> Option<ScheduleRule> {
self.rules.get(rule_id).map(|r| r.clone())
}
pub fn has_rule(&self, rule_id: &str) -> bool {
self.rules.contains_key(rule_id)
}
pub fn list_rules(&self) -> Vec<ScheduleRule> {
self.rules.iter().map(|r| r.clone()).collect()
}
pub fn set_rule_enabled(&self, rule_id: &str, enabled: bool) -> bool {
if let Some(mut rule) = self.rules.get_mut(rule_id) {
rule.enabled = enabled;
info!(
"Schedule rule {} is now {}",
rule_id,
if enabled { "enabled" } else { "disabled" }
);
true
} else {
false
}
}
pub fn subscribe(&self) -> broadcast::Receiver<ScheduleTick> {
self.tx.subscribe()
}
pub fn start(&self) {
let rules = self.rules.clone();
let tx = self.tx.clone();
let running = self.running.clone();
tokio::spawn(async move {
*running.write().await = true;
info!("Schedule manager started");
let mut check_interval = interval(Duration::from_secs(1));
loop {
check_interval.tick().await;
if !*running.read().await {
break;
}
let now = Utc::now();
let mut triggered_count = 0;
for mut rule_ref in rules.iter_mut() {
let rule = rule_ref.value_mut();
if rule.should_trigger(now) {
let tick = ScheduleTick {
rule_id: rule.id.clone(),
target_agent_id: rule.target_agent_id.clone(),
triggered_at: now,
};
if let Err(e) = tx.send(tick) {
error!("Failed to send schedule tick: {}", e);
} else {
debug!("Schedule rule triggered: {}", rule.id);
rule.mark_triggered();
triggered_count += 1;
}
}
}
if triggered_count > 0 {
debug!("Triggered {} schedule rules", triggered_count);
}
}
info!("Schedule manager stopped");
});
}
pub async fn stop(&self) {
*self.running.write().await = false;
info!("Schedule manager stopping...");
}
pub async fn is_running(&self) -> bool {
*self.running.read().await
}
}
impl Default for ScheduleManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_interval_rule_should_trigger() {
let mut rule = ScheduleRule::new_interval("test", 60, "agent1");
assert!(rule.should_trigger(Utc::now()));
rule.last_triggered = Some(Utc::now());
assert!(!rule.should_trigger(Utc::now()));
rule.enabled = false;
assert!(!rule.should_trigger(Utc::now()));
}
#[test]
fn test_cron_expression_matching() {
let now = Utc::now();
assert!(ScheduleRule::matches_cron(now, "* * * * *"));
let cron_hourly = format!("{} * * * *", now.minute());
assert!(ScheduleRule::matches_cron(now, &cron_hourly));
let cron_exact = format!("{} {} * * *", now.minute(), now.hour());
assert!(ScheduleRule::matches_cron(now, &cron_exact));
}
#[tokio::test]
async fn test_schedule_manager() {
let manager = ScheduleManager::new();
let rule = ScheduleRule::new_interval("test_interval", 1, "test_agent");
manager.register_rule(rule).unwrap();
let mut rx = manager.subscribe();
manager.start();
let tick = tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.expect("Timeout waiting for schedule tick")
.expect("Channel closed");
assert_eq!(tick.rule_id, "test_interval");
assert_eq!(tick.target_agent_id, "test_agent");
manager.stop().await;
}
}