zeph_scheduler/task.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::borrow::Cow;
5use std::future::Future;
6use std::pin::Pin;
7use std::str::FromStr;
8
9use chrono::{DateTime, Utc};
10use cron::Schedule as CronSchedule;
11
12use crate::error::SchedulerError;
13
/// Bring a cron expression into the 6-field shape that the `cron` crate parses.
///
/// The decision is made purely on the number of whitespace-separated fields:
///
/// - exactly five fields (`min hour day month weekday`): `"0 "` is put in front of the
///   input so that the seconds field defaults to zero; this allocates a new string.
/// - any other count, 6-field expressions included: the input is handed back borrowed
///   and untouched. Nothing is validated here, so a malformed expression is only
///   rejected later, when it is parsed by the `cron` crate.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::normalize_cron_expr;
///
/// // Five fields: a zero seconds field is added in front.
/// assert_eq!(normalize_cron_expr("*/5 * * * *").as_ref(), "0 */5 * * * *");
///
/// // Six fields: returned exactly as given.
/// assert_eq!(normalize_cron_expr("0 */5 * * * *").as_ref(), "0 */5 * * * *");
/// ```
#[must_use]
pub fn normalize_cron_expr(expr: &str) -> Cow<'_, str> {
    let field_count = expr.split_whitespace().count();
    match field_count {
        // Standard 5-field cron: default the missing seconds field to zero.
        5 => Cow::Owned(format!("0 {expr}")),
        // Everything else is left for the cron parser to accept or reject.
        _ => Cow::Borrowed(expr),
    }
}
40
/// The category of work carried out by a scheduled task.
///
/// The unit variants correspond to well-known agent subsystems, while
/// [`TaskKind::Custom`] wraps an arbitrary string so that callers can introduce
/// their own kinds without touching this enum.
///
/// # Persistence
///
/// [`TaskKind::as_str`] turns a variant into a stable `snake_case` key and
/// [`TaskKind::from_str_kind`] turns such a key back into a variant. These keys
/// are what is stored in the `kind` column of the `scheduled_jobs` table.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::TaskKind;
///
/// assert_eq!(TaskKind::HealthCheck.as_str(), "health_check");
/// assert_eq!(TaskKind::from_str_kind("memory_cleanup"), TaskKind::MemoryCleanup);
/// assert_eq!(TaskKind::from_str_kind("my_custom"), TaskKind::Custom("my_custom".into()));
/// ```
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TaskKind {
    /// Runs the cleanup / compaction routine of the memory subsystem.
    MemoryCleanup,
    /// Reloads skills from the skill registry.
    SkillRefresh,
    /// Performs a liveness or readiness probe of the agent.
    HealthCheck,
    /// Asks the GitHub releases API whether a newer Zeph version exists.
    UpdateCheck,
    /// Runs an experiment task (used by `zeph-experiments`).
    Experiment,
    /// A kind defined by the application; the wrapped string is its persistence key.
    Custom(String),
}

impl TaskKind {
    /// Turn a persistence key back into a task kind.
    ///
    /// The five built-in keys map to their unit variants. Any other string —
    /// matching is exact and case-sensitive — is wrapped in [`TaskKind::Custom`]
    /// instead of being reported as an error, so a stored job whose kind was
    /// introduced by a newer build can still be loaded by an older one.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskKind;
    ///
    /// assert_eq!(TaskKind::from_str_kind("health_check"), TaskKind::HealthCheck);
    /// assert_eq!(TaskKind::from_str_kind("unknown"), TaskKind::Custom("unknown".into()));
    /// ```
    #[must_use]
    pub fn from_str_kind(s: &str) -> Self {
        // Resolve the built-in keys first; only allocate for the custom fallback.
        let builtin = match s {
            "experiment" => Some(Self::Experiment),
            "update_check" => Some(Self::UpdateCheck),
            "health_check" => Some(Self::HealthCheck),
            "skill_refresh" => Some(Self::SkillRefresh),
            "memory_cleanup" => Some(Self::MemoryCleanup),
            _ => None,
        };
        builtin.unwrap_or_else(|| Self::Custom(s.to_owned()))
    }

    /// Borrow the stable key under which this kind is persisted.
    ///
    /// For [`TaskKind::Custom`] the wrapped string is returned as-is. Note that a
    /// custom string equal to one of the built-in keys is therefore read back by
    /// [`TaskKind::from_str_kind`] as the built-in variant.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskKind;
    ///
    /// assert_eq!(TaskKind::SkillRefresh.as_str(), "skill_refresh");
    /// assert_eq!(TaskKind::Custom("my_job".into()).as_str(), "my_job");
    /// ```
    #[must_use]
    pub fn as_str(&self) -> &str {
        match self {
            Self::Custom(key) => key.as_str(),
            Self::Experiment => "experiment",
            Self::UpdateCheck => "update_check",
            Self::HealthCheck => "health_check",
            Self::SkillRefresh => "skill_refresh",
            Self::MemoryCleanup => "memory_cleanup",
        }
    }
}
127
128/// Execution mode for a scheduled task.
129///
130/// Determines how the scheduler decides when to run a task and what to do after it
131/// completes:
132///
133/// - [`TaskMode::Periodic`] re-computes `next_run` from the cron schedule after
134/// each successful execution and never removes the task from memory.
135/// - [`TaskMode::OneShot`] fires once when `now >= run_at` and then removes the
136/// task from the in-memory task list and marks it `done` in the store.
137pub enum TaskMode {
138 /// Run on a repeating cron schedule.
139 Periodic {
140 /// Parsed cron schedule that drives `next_run` computation.
141 schedule: Box<CronSchedule>,
142 },
143 /// Run once at the specified UTC timestamp.
144 OneShot {
145 /// The earliest UTC time at which the task should execute.
146 run_at: DateTime<Utc>,
147 },
148}
149
150/// Descriptor sent over the control channel to register tasks at runtime.
151///
152/// Send a `SchedulerMessage::Add` wrapping a boxed `TaskDescriptor` to add a
153/// new task (or replace an existing one with the same name) without stopping the
154/// scheduler loop.
155pub struct TaskDescriptor {
156 /// Unique name for the task. Replaces any existing task with the same name.
157 pub name: String,
158 /// Execution mode (periodic or one-shot).
159 pub mode: TaskMode,
160 /// The category of work this task performs.
161 pub kind: TaskKind,
162 /// Arbitrary JSON configuration forwarded to the [`TaskHandler`] at execution time.
163 pub config: serde_json::Value,
164}
165
166/// A task held in memory by the [`crate::Scheduler`].
167///
168/// Use [`ScheduledTask::new`] / [`ScheduledTask::periodic`] for cron-based tasks
169/// and [`ScheduledTask::oneshot`] for tasks that run at a fixed point in time.
170///
171/// # Examples
172///
173/// ```
174/// use zeph_scheduler::{ScheduledTask, TaskKind};
175///
176/// let task = ScheduledTask::new(
177/// "daily-cleanup",
178/// "0 3 * * *", // every day at 03:00 UTC (5-field cron)
179/// TaskKind::MemoryCleanup,
180/// serde_json::Value::Null,
181/// )
182/// .expect("valid cron expression");
183///
184/// assert_eq!(task.task_mode_str(), "periodic");
185/// assert!(task.cron_schedule().is_some());
186/// ```
187pub struct ScheduledTask {
188 /// Unique task name used as the primary key in the job store.
189 pub name: String,
190 /// Execution mode (periodic or one-shot).
191 pub mode: TaskMode,
192 /// The category of work this task performs.
193 pub kind: TaskKind,
194 /// Arbitrary JSON configuration forwarded to the [`TaskHandler`] at execution time.
195 pub config: serde_json::Value,
196}
197
198impl ScheduledTask {
199 /// Create a new periodic task from a cron expression string.
200 ///
201 /// # Errors
202 ///
203 /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
204 pub fn new(
205 name: impl Into<String>,
206 cron_expr: &str,
207 kind: TaskKind,
208 config: serde_json::Value,
209 ) -> Result<Self, SchedulerError> {
210 Self::periodic(name, cron_expr, kind, config)
211 }
212
213 /// Create a periodic task from a cron expression.
214 ///
215 /// # Errors
216 ///
217 /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
218 pub fn periodic(
219 name: impl Into<String>,
220 cron_expr: &str,
221 kind: TaskKind,
222 config: serde_json::Value,
223 ) -> Result<Self, SchedulerError> {
224 let normalized = normalize_cron_expr(cron_expr);
225 let schedule = CronSchedule::from_str(&normalized)
226 .map_err(|e| SchedulerError::InvalidCron(format!("{cron_expr}: {e}")))?;
227 Ok(Self {
228 name: name.into(),
229 mode: TaskMode::Periodic {
230 schedule: Box::new(schedule),
231 },
232 kind,
233 config,
234 })
235 }
236
237 /// Create a one-shot task that runs at a specific point in time.
238 #[must_use]
239 pub fn oneshot(
240 name: impl Into<String>,
241 run_at: DateTime<Utc>,
242 kind: TaskKind,
243 config: serde_json::Value,
244 ) -> Self {
245 Self {
246 name: name.into(),
247 mode: TaskMode::OneShot { run_at },
248 kind,
249 config,
250 }
251 }
252
253 /// Returns the cron schedule if this is a periodic task.
254 #[must_use]
255 pub fn cron_schedule(&self) -> Option<&CronSchedule> {
256 if let TaskMode::Periodic { schedule } = &self.mode {
257 Some(schedule.as_ref())
258 } else {
259 None
260 }
261 }
262
263 /// Returns the canonical 6-field cron expression string for DB persistence.
264 ///
265 /// Returns an empty string for one-shot tasks, which do not have a cron schedule.
266 #[must_use]
267 pub fn cron_expr_string(&self) -> String {
268 match &self.mode {
269 TaskMode::Periodic { schedule } => schedule.to_string(),
270 TaskMode::OneShot { .. } => String::new(),
271 }
272 }
273
274 /// Returns the `task_mode` string used for DB persistence.
275 ///
276 /// Returns `"periodic"` or `"oneshot"`.
277 #[must_use]
278 pub fn task_mode_str(&self) -> &'static str {
279 match &self.mode {
280 TaskMode::Periodic { .. } => "periodic",
281 TaskMode::OneShot { .. } => "oneshot",
282 }
283 }
284}
285
286/// Trait for types that can execute a scheduled task.
287///
288/// Implementations receive the per-task JSON configuration stored in
289/// [`ScheduledTask::config`] and return `Ok(())` on success or a
290/// [`SchedulerError`] on failure. Failures are logged as warnings; the scheduler
291/// continues running and will retry on the next due tick.
292///
293/// Because async trait methods in Edition 2024 require returning a pinned boxed
294/// future for object safety, implementations must wrap their async work in
295/// `Box::pin(async move { … })`.
296///
297/// # Example
298///
299/// ```rust
300/// use std::future::Future;
301/// use std::pin::Pin;
302/// use zeph_scheduler::{SchedulerError, TaskHandler};
303///
304/// struct NoopHandler;
305///
306/// impl TaskHandler for NoopHandler {
307/// fn execute(
308/// &self,
309/// _config: &serde_json::Value,
310/// ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>> {
311/// Box::pin(async move { Ok(()) })
312/// }
313/// }
314/// ```
315pub trait TaskHandler: Send + Sync {
316 /// Execute the task with the provided configuration.
317 ///
318 /// # Errors
319 ///
320 /// Return [`SchedulerError::TaskFailed`] (or any other variant) to indicate
321 /// that the task could not complete successfully. The error is logged but does
322 /// not stop the scheduler.
323 fn execute(
324 &self,
325 config: &serde_json::Value,
326 ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>>;
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Every built-in kind, plus a custom one, maps to and from its persistence key.
    #[test]
    fn task_kind_roundtrip() {
        assert_eq!(
            TaskKind::from_str_kind("memory_cleanup"),
            TaskKind::MemoryCleanup
        );
        assert_eq!(TaskKind::MemoryCleanup.as_str(), "memory_cleanup");
        assert_eq!(
            TaskKind::from_str_kind("skill_refresh"),
            TaskKind::SkillRefresh
        );
        assert_eq!(TaskKind::SkillRefresh.as_str(), "skill_refresh");
        assert_eq!(
            TaskKind::from_str_kind("health_check"),
            TaskKind::HealthCheck
        );
        // Reverse direction for HealthCheck, so that every built-in kind is
        // checked both ways like its siblings.
        assert_eq!(TaskKind::HealthCheck.as_str(), "health_check");
        assert_eq!(
            TaskKind::from_str_kind("update_check"),
            TaskKind::UpdateCheck
        );
        assert_eq!(TaskKind::UpdateCheck.as_str(), "update_check");
        assert_eq!(
            TaskKind::from_str_kind("custom_job"),
            TaskKind::Custom("custom_job".into())
        );
        assert_eq!(TaskKind::Custom("x".into()).as_str(), "x");
    }

    /// `"experiment"` is a built-in key and must not fall through to `Custom`.
    #[test]
    fn task_kind_experiment_roundtrip() {
        assert_eq!(
            TaskKind::from_str_kind("experiment"),
            TaskKind::Experiment,
            "from_str_kind must map 'experiment' to Experiment variant, not Custom"
        );
        assert_eq!(TaskKind::Experiment.as_str(), "experiment");
    }

    /// 5-field expressions get a leading zero seconds field.
    #[test]
    fn normalize_five_field_prepends_zero() {
        assert_eq!(normalize_cron_expr("*/5 * * * *"), "0 */5 * * * *");
        assert_eq!(normalize_cron_expr("0 3 * * *"), "0 0 3 * * *");
    }

    /// 6-field expressions are returned unchanged.
    #[test]
    fn normalize_six_field_passthrough() {
        assert_eq!(normalize_cron_expr("0 0 3 * * *"), "0 0 3 * * *");
        assert_eq!(normalize_cron_expr("* * * * * *"), "* * * * * *");
    }

    /// Field counts other than five are not touched by normalisation.
    #[test]
    fn normalize_other_field_count_passthrough() {
        assert_eq!(normalize_cron_expr("not_cron"), "not_cron");
        assert_eq!(normalize_cron_expr("0 0 0 0"), "0 0 0 0");
    }

    #[test]
    fn normalize_empty_string_passthrough() {
        assert_eq!(normalize_cron_expr(""), "");
    }

    #[test]
    fn normalize_whitespace_only_passthrough() {
        assert_eq!(normalize_cron_expr(" "), " ");
    }

    #[test]
    fn valid_cron_creates_task() {
        let task = ScheduledTask::new(
            "test",
            "0 0 * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_ok());
    }

    #[test]
    fn five_field_cron_creates_task() {
        let task = ScheduledTask::new(
            "five-field",
            "*/5 * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_ok(), "5-field cron must be accepted");
    }

    #[test]
    fn invalid_cron_returns_error() {
        let task = ScheduledTask::new(
            "test",
            "not_cron",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_err());
    }

    #[test]
    fn oneshot_task_creates_correctly() {
        let run_at = Utc::now() + chrono::Duration::hours(1);
        let task =
            ScheduledTask::oneshot("t", run_at, TaskKind::HealthCheck, serde_json::Value::Null);
        assert_eq!(task.task_mode_str(), "oneshot");
        assert!(task.cron_schedule().is_none());
    }

    #[test]
    fn periodic_task_mode_str() {
        let task = ScheduledTask::periodic(
            "p",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        assert_eq!(task.task_mode_str(), "periodic");
        assert!(task.cron_schedule().is_some());
    }
}