1use chrono::Utc;
8use compact_str::CompactString;
9use cron::Schedule;
10use protocol::{
11 api::{Client, Server},
12 message::SendRequest,
13};
14use std::{str::FromStr, sync::Arc};
15use tokio::{
16 sync::{RwLock, broadcast, mpsc},
17 task::JoinHandle,
18 time,
19};
20
21mod client;
22pub mod hook;
23pub mod parser;
24
25#[derive(Debug, Clone)]
27pub struct CronJob {
28 pub name: CompactString,
30 pub schedule: Schedule,
32 pub agent: CompactString,
34 pub message: String,
36}
37
38impl CronJob {
39 pub fn new(
41 name: CompactString,
42 schedule_expr: &str,
43 agent: CompactString,
44 message: String,
45 ) -> anyhow::Result<Self> {
46 let schedule = Schedule::from_str(schedule_expr)
47 .map_err(|e| anyhow::anyhow!("invalid cron expression '{schedule_expr}': {e}"))?;
48 Ok(Self {
49 name,
50 schedule,
51 agent,
52 message,
53 })
54 }
55}
56
57pub struct CronHandler {
62 jobs: Arc<RwLock<Vec<CronJob>>>,
63 on_create: Arc<dyn Fn(CronJob) + Send + Sync>,
64}
65
66impl CronHandler {
67 pub fn new<F: Fn(CronJob) + Send + Sync + 'static>(jobs: Vec<CronJob>, on_create: F) -> Self {
72 Self {
73 jobs: Arc::new(RwLock::new(jobs)),
74 on_create: Arc::new(on_create),
75 }
76 }
77
78 pub fn jobs_arc(&self) -> Arc<RwLock<Vec<CronJob>>> {
80 Arc::clone(&self.jobs)
81 }
82
83 pub async fn jobs(&self) -> Vec<CronJob> {
85 self.jobs.read().await.clone()
86 }
87}
88
89impl wcore::Hook for CronHandler {
90 fn on_register_tools(
91 &self,
92 registry: &mut wcore::ToolRegistry,
93 ) -> impl std::future::Future<Output = ()> + Send {
94 let (tool, handler) = hook::create_cron_handler_with_notify(Arc::clone(&self.jobs), {
95 let cb = Arc::clone(&self.on_create);
96 move |job| cb(job)
97 });
98 registry.insert(tool, handler);
99 async {}
100 }
101}
102
103struct CronScheduler {
105 jobs: Vec<CronJob>,
106}
107
108impl CronScheduler {
109 fn new(jobs: Vec<CronJob>) -> Self {
111 Self { jobs }
112 }
113
114 fn start<F, Fut>(
122 mut self,
123 on_fire: F,
124 mut add_rx: mpsc::UnboundedReceiver<CronJob>,
125 mut shutdown: broadcast::Receiver<()>,
126 ) -> JoinHandle<()>
127 where
128 F: Fn(CronJob) -> Fut + Send + Sync + 'static,
129 Fut: std::future::Future<Output = ()> + Send + 'static,
130 {
131 tokio::spawn(async move {
132 tracing::info!("cron scheduler started with {} job(s)", self.jobs.len());
133 loop {
134 while let Ok(job) = add_rx.try_recv() {
136 tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
137 self.jobs.push(job);
138 }
139
140 if self.jobs.is_empty() {
141 tokio::select! {
143 Some(job) = add_rx.recv() => {
144 tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
145 self.jobs.push(job);
146 continue;
147 }
148 _ = shutdown.recv() => {
149 tracing::info!("cron scheduler shutting down");
150 return;
151 }
152 }
153 }
154
155 let now = Utc::now();
156 let mut due_jobs: Vec<usize> = Vec::new();
157 let mut soonest = None::<chrono::DateTime<Utc>>;
158
159 for (i, job) in self.jobs.iter().enumerate() {
160 if let Some(next) = job.schedule.upcoming(Utc).next() {
161 match soonest {
162 None => {
163 soonest = Some(next);
164 due_jobs.clear();
165 due_jobs.push(i);
166 }
167 Some(s) if next < s => {
168 soonest = Some(next);
169 due_jobs.clear();
170 due_jobs.push(i);
171 }
172 Some(s) if (next - s).num_seconds().abs() <= 0 => {
173 due_jobs.push(i);
174 }
175 _ => {}
176 }
177 }
178 }
179
180 let Some(soonest_time) = soonest else {
181 tracing::warn!("no upcoming cron fires, scheduler stopping");
182 return;
183 };
184
185 let wait = (soonest_time - now).to_std().unwrap_or_default();
186 tokio::select! {
187 _ = time::sleep(wait) => {
188 for &i in &due_jobs {
189 tracing::info!("cron firing job '{}'", self.jobs[i].name);
190 on_fire(self.jobs[i].clone()).await;
191 }
192 }
193 Some(job) = add_rx.recv() => {
194 tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
195 self.jobs.push(job);
196 }
198 _ = shutdown.recv() => {
199 tracing::info!("cron scheduler shutting down");
200 return;
201 }
202 }
203 }
204 })
205 }
206}
207
208pub fn spawn<S: Server + Clone + Send + 'static>(
214 jobs: Vec<CronJob>,
215 server: S,
216 shutdown: broadcast::Receiver<()>,
217) {
218 let scheduler = CronScheduler::new(jobs);
219 let (_add_tx, add_rx) = mpsc::unbounded_channel();
220
221 scheduler.start(
222 move |job| {
223 let mut client = client::CronClient::new(server.clone());
224 async move {
225 let req = SendRequest {
226 agent: job.agent.clone(),
227 content: job.message.clone(),
228 };
229 match client.send(req).await {
230 Ok(response) => {
231 tracing::info!(
232 job = %job.name,
233 agent = %job.agent,
234 response_len = response.content.len(),
235 "cron job completed"
236 );
237 }
238 Err(e) => {
239 tracing::error!(job = %job.name, "cron dispatch failed: {e}");
240 }
241 }
242 }
243 },
244 add_rx,
245 shutdown,
246 );
247}
248
249pub fn spawn_with_callback<F, Fut>(
255 jobs: Vec<CronJob>,
256 on_fire: F,
257 shutdown: broadcast::Receiver<()>,
258) -> mpsc::UnboundedSender<CronJob>
259where
260 F: Fn(CronJob) -> Fut + Send + Sync + 'static,
261 Fut: std::future::Future<Output = ()> + Send + 'static,
262{
263 let scheduler = CronScheduler::new(jobs);
264 let (add_tx, add_rx) = mpsc::unbounded_channel();
265 scheduler.start(on_fire, add_rx, shutdown);
266 add_tx
267}