use std::str::FromStr;
use adk_core::{AdkError, Result};
use async_trait::async_trait;
use cron::Schedule;
use futures::stream::BoxStream;
use tokio::time::sleep;
use super::event_source::{EventSource, TriggerEvent};
/// An event source driven by a cron expression.
///
/// Built via [`CronTrigger::new`], which parses the expression up front so an
/// invalid expression is rejected at construction time.
pub struct CronTrigger {
/// The cron expression text exactly as it was passed to the constructor.
expression: String,
/// The parsed form of `expression`, used to compute upcoming tick times.
schedule: Schedule,
/// Source name reported for this trigger, formatted as `cron:<expression>`.
name: String,
}
impl CronTrigger {
pub fn new(expression: &str) -> Result<Self> {
let schedule = Schedule::from_str(expression)
.map_err(|e| AdkError::agent(format!("invalid cron expression: {e}")))?;
Ok(Self {
expression: expression.to_string(),
schedule,
name: format!("cron:{expression}"),
})
}
}
#[async_trait]
impl EventSource for CronTrigger {
    /// Returns the source name, formatted as `cron:<expression>`.
    fn name(&self) -> &str {
        &self.name
    }

    /// Produces a stream that emits one [`TriggerEvent`] per scheduled tick.
    ///
    /// The stream owns clones of the schedule, name and expression, so it does
    /// not borrow from `self`. Each event's payload contains the cron
    /// `expression` and a `tick` field holding the RFC 3339 timestamp taken at
    /// the moment the event is emitted. The stream ends if the schedule has no
    /// further occurrences.
    ///
    /// A given scheduled tick is emitted at most once: the search for the next
    /// occurrence always starts strictly after the previously emitted tick,
    /// even if the wall clock still reads a time before it when the timer
    /// wakes up.
    async fn subscribe(&self) -> Result<BoxStream<'static, TriggerEvent>> {
        let schedule = self.schedule.clone();
        let source_name = self.name.clone();
        let expression = self.expression.clone();

        let stream = async_stream::stream! {
            // Scheduled time of the most recently emitted tick, if any.
            let mut last_tick: Option<chrono::DateTime<chrono::Utc>> = None;

            loop {
                let now = chrono::Utc::now();

                // Search from the later of `now` and the last emitted tick.
                // The timer is monotonic while `now` is wall-clock time, so
                // after a wake-up `now` can still be slightly before the tick
                // we just fired (clock skew or a backwards adjustment).
                // Searching from `now` alone would then return that same tick
                // again and emit a duplicate event. Using `now` when it is
                // later keeps the original behavior of skipping ticks that
                // were missed while the consumer was busy.
                let search_from = match last_tick {
                    Some(previous) if previous > now => previous,
                    _ => now,
                };

                let Some(next_tick) = schedule.after(&search_from).next() else {
                    // No further occurrences: end the stream.
                    break;
                };

                // A negative difference cannot be converted to a std duration;
                // fall back to zero so the tick fires immediately.
                let duration = (next_tick - now).to_std().unwrap_or_default();
                sleep(duration).await;
                last_tick = Some(next_tick);

                yield TriggerEvent {
                    source: source_name.clone(),
                    payload: serde_json::json!({
                        "expression": expression,
                        "tick": chrono::Utc::now().to_rfc3339(),
                    }),
                };
            }
        };

        Ok(Box::pin(stream))
    }
}
impl std::fmt::Debug for CronTrigger {
    /// Formats the trigger as `CronTrigger { expression: .. }`.
    ///
    /// Only the expression text is included; the parsed schedule and the
    /// derived name are omitted from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("CronTrigger");
        builder.field("expression", &self.expression);
        builder.finish()
    }
}