use chrono::Utc;
use compact_str::CompactString;
use cron::Schedule;
use protocol::{
api::{Client, Server},
message::SendRequest,
};
use std::{str::FromStr, sync::Arc};
use tokio::{
sync::{RwLock, broadcast, mpsc},
task::JoinHandle,
time,
};
mod client;
pub mod hook;
pub mod parser;
#[derive(Debug, Clone)]
pub struct CronJob {
pub name: CompactString,
pub schedule: Schedule,
pub agent: CompactString,
pub message: String,
}
impl CronJob {
pub fn new(
name: CompactString,
schedule_expr: &str,
agent: CompactString,
message: String,
) -> anyhow::Result<Self> {
let schedule = Schedule::from_str(schedule_expr)
.map_err(|e| anyhow::anyhow!("invalid cron expression '{schedule_expr}': {e}"))?;
Ok(Self {
name,
schedule,
agent,
message,
})
}
}
pub struct CronHandler {
jobs: Arc<RwLock<Vec<CronJob>>>,
on_create: Arc<dyn Fn(CronJob) + Send + Sync>,
}
impl CronHandler {
pub fn new<F: Fn(CronJob) + Send + Sync + 'static>(jobs: Vec<CronJob>, on_create: F) -> Self {
Self {
jobs: Arc::new(RwLock::new(jobs)),
on_create: Arc::new(on_create),
}
}
pub fn jobs_arc(&self) -> Arc<RwLock<Vec<CronJob>>> {
Arc::clone(&self.jobs)
}
pub async fn jobs(&self) -> Vec<CronJob> {
self.jobs.read().await.clone()
}
}
impl wcore::Hook for CronHandler {
fn on_register_tools(
&self,
registry: &mut wcore::ToolRegistry,
) -> impl std::future::Future<Output = ()> + Send {
let (tool, handler) = hook::create_cron_handler_with_notify(Arc::clone(&self.jobs), {
let cb = Arc::clone(&self.on_create);
move |job| cb(job)
});
registry.insert(tool, handler);
async {}
}
}
struct CronScheduler {
jobs: Vec<CronJob>,
}
impl CronScheduler {
fn new(jobs: Vec<CronJob>) -> Self {
Self { jobs }
}
fn start<F, Fut>(
mut self,
on_fire: F,
mut add_rx: mpsc::UnboundedReceiver<CronJob>,
mut shutdown: broadcast::Receiver<()>,
) -> JoinHandle<()>
where
F: Fn(CronJob) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
tokio::spawn(async move {
tracing::info!("cron scheduler started with {} job(s)", self.jobs.len());
loop {
while let Ok(job) = add_rx.try_recv() {
tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
self.jobs.push(job);
}
if self.jobs.is_empty() {
tokio::select! {
Some(job) = add_rx.recv() => {
tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
self.jobs.push(job);
continue;
}
_ = shutdown.recv() => {
tracing::info!("cron scheduler shutting down");
return;
}
}
}
let now = Utc::now();
let mut due_jobs: Vec<usize> = Vec::new();
let mut soonest = None::<chrono::DateTime<Utc>>;
for (i, job) in self.jobs.iter().enumerate() {
if let Some(next) = job.schedule.upcoming(Utc).next() {
match soonest {
None => {
soonest = Some(next);
due_jobs.clear();
due_jobs.push(i);
}
Some(s) if next < s => {
soonest = Some(next);
due_jobs.clear();
due_jobs.push(i);
}
Some(s) if (next - s).num_seconds().abs() <= 0 => {
due_jobs.push(i);
}
_ => {}
}
}
}
let Some(soonest_time) = soonest else {
tracing::warn!("no upcoming cron fires, scheduler stopping");
return;
};
let wait = (soonest_time - now).to_std().unwrap_or_default();
tokio::select! {
_ = time::sleep(wait) => {
for &i in &due_jobs {
tracing::info!("cron firing job '{}'", self.jobs[i].name);
on_fire(self.jobs[i].clone()).await;
}
}
Some(job) = add_rx.recv() => {
tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
self.jobs.push(job);
}
_ = shutdown.recv() => {
tracing::info!("cron scheduler shutting down");
return;
}
}
}
})
}
}
pub fn spawn<S: Server + Clone + Send + 'static>(
jobs: Vec<CronJob>,
server: S,
shutdown: broadcast::Receiver<()>,
) {
let scheduler = CronScheduler::new(jobs);
let (_add_tx, add_rx) = mpsc::unbounded_channel();
scheduler.start(
move |job| {
let mut client = client::CronClient::new(server.clone());
async move {
let req = SendRequest {
agent: job.agent.clone(),
content: job.message.clone(),
};
match client.send(req).await {
Ok(response) => {
tracing::info!(
job = %job.name,
agent = %job.agent,
response_len = response.content.len(),
"cron job completed"
);
}
Err(e) => {
tracing::error!(job = %job.name, "cron dispatch failed: {e}");
}
}
}
},
add_rx,
shutdown,
);
}
pub fn spawn_with_callback<F, Fut>(
jobs: Vec<CronJob>,
on_fire: F,
shutdown: broadcast::Receiver<()>,
) -> mpsc::UnboundedSender<CronJob>
where
F: Fn(CronJob) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
let scheduler = CronScheduler::new(jobs);
let (add_tx, add_rx) = mpsc::unbounded_channel();
scheduler.start(on_fire, add_rx, shutdown);
add_tx
}