Skip to main content

walrus_cron/
lib.rs

1//! Cron scheduler — periodic agent tasks with dynamic job addition.
2//!
3//! Exposes a `create_cron` tool that agents can use to schedule new jobs
4//! dynamically. Jobs fire via a caller-provided callback, and the running
5//! scheduler picks up dynamically created jobs without daemon restart.
6
7use chrono::Utc;
8use compact_str::CompactString;
9use cron::Schedule;
10use protocol::{
11    api::{Client, Server},
12    message::SendRequest,
13};
14use std::{str::FromStr, sync::Arc};
15use tokio::{
16    sync::{RwLock, broadcast, mpsc},
17    task::JoinHandle,
18    time,
19};
20
21mod client;
22pub mod hook;
23pub mod parser;
24
25/// A parsed cron job ready for scheduling.
26#[derive(Debug, Clone)]
27pub struct CronJob {
28    /// Job name.
29    pub name: CompactString,
30    /// Parsed cron schedule.
31    pub schedule: Schedule,
32    /// Target agent name.
33    pub agent: CompactString,
34    /// Message to send on each fire.
35    pub message: String,
36}
37
38impl CronJob {
39    /// Parse a [`CronJob`] from raw fields.
40    pub fn new(
41        name: CompactString,
42        schedule_expr: &str,
43        agent: CompactString,
44        message: String,
45    ) -> anyhow::Result<Self> {
46        let schedule = Schedule::from_str(schedule_expr)
47            .map_err(|e| anyhow::anyhow!("invalid cron expression '{schedule_expr}': {e}"))?;
48        Ok(Self {
49            name,
50            schedule,
51            agent,
52            message,
53        })
54    }
55}
56
57/// Cron handler — owns the live job list for dynamic scheduling.
58///
59/// The `on_create` callback is called whenever a new cron job is created
60/// via the `create_cron` tool. Callers that don't need side-effects pass `|_| {}`.
61pub struct CronHandler {
62    jobs: Arc<RwLock<Vec<CronJob>>>,
63    on_create: Arc<dyn Fn(CronJob) + Send + Sync>,
64}
65
66impl CronHandler {
67    /// Create a handler from an initial set of jobs and a creation callback.
68    ///
69    /// `on_create` is called after each dynamic `create_cron` tool invocation.
70    /// Pass `|_| {}` if no side-effect is needed.
71    pub fn new<F: Fn(CronJob) + Send + Sync + 'static>(jobs: Vec<CronJob>, on_create: F) -> Self {
72        Self {
73            jobs: Arc::new(RwLock::new(jobs)),
74            on_create: Arc::new(on_create),
75        }
76    }
77
78    /// Get a clone of the jobs arc (for the scheduler task).
79    pub fn jobs_arc(&self) -> Arc<RwLock<Vec<CronJob>>> {
80        Arc::clone(&self.jobs)
81    }
82
83    /// Snapshot the current job list.
84    pub async fn jobs(&self) -> Vec<CronJob> {
85        self.jobs.read().await.clone()
86    }
87}
88
89impl wcore::Hook for CronHandler {
90    fn on_register_tools(
91        &self,
92        registry: &mut wcore::ToolRegistry,
93    ) -> impl std::future::Future<Output = ()> + Send {
94        let (tool, handler) = hook::create_cron_handler_with_notify(Arc::clone(&self.jobs), {
95            let cb = Arc::clone(&self.on_create);
96            move |job| cb(job)
97        });
98        registry.insert(tool, handler);
99        async {}
100    }
101}
102
103/// Cron scheduler that fires jobs on their schedules.
104struct CronScheduler {
105    jobs: Vec<CronJob>,
106}
107
108impl CronScheduler {
109    /// Create a scheduler from a list of cron jobs.
110    fn new(jobs: Vec<CronJob>) -> Self {
111        Self { jobs }
112    }
113
114    /// Start the scheduler. Calls `on_fire` for each job when it fires.
115    ///
116    /// Accepts an optional `mpsc::UnboundedReceiver<CronJob>` for dynamic
117    /// job addition. New jobs are merged into the live list between fire
118    /// cycles. Before sleeping, the scheduler identifies which jobs are due
119    /// at the soonest upcoming time. After waking it fires exactly those
120    /// jobs, avoiding the ambiguity of re-querying `upcoming()` post-sleep.
121    fn start<F, Fut>(
122        mut self,
123        on_fire: F,
124        mut add_rx: mpsc::UnboundedReceiver<CronJob>,
125        mut shutdown: broadcast::Receiver<()>,
126    ) -> JoinHandle<()>
127    where
128        F: Fn(CronJob) -> Fut + Send + Sync + 'static,
129        Fut: std::future::Future<Output = ()> + Send + 'static,
130    {
131        tokio::spawn(async move {
132            tracing::info!("cron scheduler started with {} job(s)", self.jobs.len());
133            loop {
134                // Drain any dynamically added jobs before computing schedule.
135                while let Ok(job) = add_rx.try_recv() {
136                    tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
137                    self.jobs.push(job);
138                }
139
140                if self.jobs.is_empty() {
141                    // No jobs yet — wait for a dynamic add or shutdown.
142                    tokio::select! {
143                        Some(job) = add_rx.recv() => {
144                            tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
145                            self.jobs.push(job);
146                            continue;
147                        }
148                        _ = shutdown.recv() => {
149                            tracing::info!("cron scheduler shutting down");
150                            return;
151                        }
152                    }
153                }
154
155                let now = Utc::now();
156                let mut due_jobs: Vec<usize> = Vec::new();
157                let mut soonest = None::<chrono::DateTime<Utc>>;
158
159                for (i, job) in self.jobs.iter().enumerate() {
160                    if let Some(next) = job.schedule.upcoming(Utc).next() {
161                        match soonest {
162                            None => {
163                                soonest = Some(next);
164                                due_jobs.clear();
165                                due_jobs.push(i);
166                            }
167                            Some(s) if next < s => {
168                                soonest = Some(next);
169                                due_jobs.clear();
170                                due_jobs.push(i);
171                            }
172                            Some(s) if (next - s).num_seconds().abs() <= 0 => {
173                                due_jobs.push(i);
174                            }
175                            _ => {}
176                        }
177                    }
178                }
179
180                let Some(soonest_time) = soonest else {
181                    tracing::warn!("no upcoming cron fires, scheduler stopping");
182                    return;
183                };
184
185                let wait = (soonest_time - now).to_std().unwrap_or_default();
186                tokio::select! {
187                    _ = time::sleep(wait) => {
188                        for &i in &due_jobs {
189                            tracing::info!("cron firing job '{}'", self.jobs[i].name);
190                            on_fire(self.jobs[i].clone()).await;
191                        }
192                    }
193                    Some(job) = add_rx.recv() => {
194                        tracing::info!("cron scheduler: added dynamic job '{}'", job.name);
195                        self.jobs.push(job);
196                        // Re-loop to recalculate schedule with new job.
197                    }
198                    _ = shutdown.recv() => {
199                        tracing::info!("cron scheduler shutting down");
200                        return;
201                    }
202                }
203            }
204        })
205    }
206}
207
208/// Start the cron scheduler with an in-process protocol client.
209///
210/// Takes a snapshot of jobs and a `Server` impl (e.g. `Gateway`) to dispatch
211/// `SendRequest`s through the protocol layer. Dynamic job addition is not
212/// supported through this function — use [`spawn_with_callback`] instead.
213pub fn spawn<S: Server + Clone + Send + 'static>(
214    jobs: Vec<CronJob>,
215    server: S,
216    shutdown: broadcast::Receiver<()>,
217) {
218    let scheduler = CronScheduler::new(jobs);
219    let (_add_tx, add_rx) = mpsc::unbounded_channel();
220
221    scheduler.start(
222        move |job| {
223            let mut client = client::CronClient::new(server.clone());
224            async move {
225                let req = SendRequest {
226                    agent: job.agent.clone(),
227                    content: job.message.clone(),
228                };
229                match client.send(req).await {
230                    Ok(response) => {
231                        tracing::info!(
232                            job = %job.name,
233                            agent = %job.agent,
234                            response_len = response.content.len(),
235                            "cron job completed"
236                        );
237                    }
238                    Err(e) => {
239                        tracing::error!(job = %job.name, "cron dispatch failed: {e}");
240                    }
241                }
242            }
243        },
244        add_rx,
245        shutdown,
246    );
247}
248
249/// Start the cron scheduler with a caller-provided fire callback.
250///
251/// Returns an `mpsc::UnboundedSender<CronJob>` for dynamically adding jobs
252/// to the running scheduler. The scheduler picks up new jobs between fire
253/// cycles without requiring a restart.
254pub fn spawn_with_callback<F, Fut>(
255    jobs: Vec<CronJob>,
256    on_fire: F,
257    shutdown: broadcast::Receiver<()>,
258) -> mpsc::UnboundedSender<CronJob>
259where
260    F: Fn(CronJob) -> Fut + Send + Sync + 'static,
261    Fut: std::future::Future<Output = ()> + Send + 'static,
262{
263    let scheduler = CronScheduler::new(jobs);
264    let (add_tx, add_rx) = mpsc::unbounded_channel();
265    scheduler.start(on_fire, add_rx, shutdown);
266    add_tx
267}