Skip to main content

zeph_scheduler/
task.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::borrow::Cow;
5use std::future::Future;
6use std::pin::Pin;
7use std::str::FromStr;
8
9use chrono::{DateTime, Utc};
10use cron::Schedule as CronSchedule;
11
12use crate::error::SchedulerError;
13
/// Bring a cron expression into the 6-field shape expected by the `cron` crate.
///
/// The expression is split on whitespace and its fields are counted:
///
/// - exactly five fields (`min hour day month weekday`): a leading `"0 "` is added so the
///   seconds field defaults to zero, and an owned string is returned;
/// - any other count (six fields, but also empty, blank or malformed input): the input is
///   returned borrowed and untouched, leaving rejection of bad input to the `cron` parser.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::normalize_cron_expr;
///
/// // 5-field: seconds are defaulted to 0.
/// assert_eq!(normalize_cron_expr("*/5 * * * *").as_ref(), "0 */5 * * * *");
///
/// // 6-field: passed through unchanged.
/// assert_eq!(normalize_cron_expr("0 */5 * * * *").as_ref(), "0 */5 * * * *");
/// ```
#[must_use]
pub fn normalize_cron_expr(expr: &str) -> Cow<'_, str> {
    let field_count = expr.split_whitespace().count();
    match field_count {
        // Classic crontab syntax has no seconds column; supply one.
        5 => Cow::Owned(format!("0 {expr}")),
        // Everything else is handed to the parser verbatim (no allocation).
        _ => Cow::Borrowed(expr),
    }
}
40
/// The category of work a scheduled task carries out.
///
/// The unit variants correspond to well-known agent subsystems, while
/// [`TaskKind::Custom`] wraps an arbitrary string so callers can introduce their
/// own kinds without touching this enum.
///
/// # Persistence
///
/// Every variant has a stable `snake_case` key: [`TaskKind::as_str`] produces it
/// and [`TaskKind::from_str_kind`] parses it back. These keys are what is stored
/// in the `kind` column of the `scheduled_jobs` table.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::TaskKind;
///
/// assert_eq!(TaskKind::HealthCheck.as_str(), "health_check");
/// assert_eq!(TaskKind::from_str_kind("memory_cleanup"), TaskKind::MemoryCleanup);
/// assert_eq!(TaskKind::from_str_kind("my_custom"), TaskKind::Custom("my_custom".into()));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskKind {
    /// Triggers the memory subsystem's cleanup / compaction routine.
    MemoryCleanup,
    /// Reloads skills from the skill registry.
    SkillRefresh,
    /// Runs a liveness or readiness probe for the agent.
    HealthCheck,
    /// Checks the GitHub releases API for a newer Zeph version.
    UpdateCheck,
    /// Runs an experiment task (used by `zeph-experiments`).
    Experiment,
    /// An application-defined task kind. The string is the persistence key.
    Custom(String),
}

impl TaskKind {
    /// Turn a persistence key back into a task kind.
    ///
    /// The key is compared against the [`as_str`](TaskKind::as_str) key of each
    /// built-in variant. Anything that matches none of them becomes
    /// [`TaskKind::Custom`] instead of an error, so jobs stored by a newer build
    /// (with additional built-in kinds) still load on an older one.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskKind;
    ///
    /// assert_eq!(TaskKind::from_str_kind("health_check"), TaskKind::HealthCheck);
    /// assert_eq!(TaskKind::from_str_kind("unknown"), TaskKind::Custom("unknown".into()));
    /// ```
    #[must_use]
    pub fn from_str_kind(s: &str) -> Self {
        // `IntoIterator::into_iter` is spelled out so the array is iterated by
        // value regardless of the edition this is compiled under.
        IntoIterator::into_iter([
            Self::MemoryCleanup,
            Self::SkillRefresh,
            Self::HealthCheck,
            Self::UpdateCheck,
            Self::Experiment,
        ])
        .find(|builtin| builtin.as_str() == s)
        .unwrap_or_else(|| Self::Custom(s.to_owned()))
    }

    /// Borrow the stable key under which this kind is persisted.
    ///
    /// Built-in variants yield a fixed literal; [`TaskKind::Custom`] yields the
    /// string it wraps.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskKind;
    ///
    /// assert_eq!(TaskKind::SkillRefresh.as_str(), "skill_refresh");
    /// assert_eq!(TaskKind::Custom("my_job".into()).as_str(), "my_job");
    /// ```
    #[must_use]
    pub fn as_str(&self) -> &str {
        match self {
            Self::Custom(key) => key.as_str(),
            Self::Experiment => "experiment",
            Self::UpdateCheck => "update_check",
            Self::HealthCheck => "health_check",
            Self::SkillRefresh => "skill_refresh",
            Self::MemoryCleanup => "memory_cleanup",
        }
    }
}
127
128/// Execution mode for a scheduled task.
129///
130/// Determines how the scheduler decides when to run a task and what to do after it
131/// completes:
132///
133/// - [`TaskMode::Periodic`] re-computes `next_run` from the cron schedule after
134///   each successful execution and never removes the task from memory.
135/// - [`TaskMode::OneShot`] fires once when `now >= run_at` and then removes the
136///   task from the in-memory task list and marks it `done` in the store.
137pub enum TaskMode {
138    /// Run on a repeating cron schedule.
139    Periodic {
140        /// Parsed cron schedule that drives `next_run` computation.
141        schedule: Box<CronSchedule>,
142    },
143    /// Run once at the specified UTC timestamp.
144    OneShot {
145        /// The earliest UTC time at which the task should execute.
146        run_at: DateTime<Utc>,
147    },
148}
149
/// Descriptor sent over the control channel to register tasks at runtime.
///
/// Send a `SchedulerMessage::Add` wrapping a boxed `TaskDescriptor` to add a
/// new task (or replace an existing one with the same name) without stopping the
/// scheduler loop.
///
/// The descriptor carries the same four pieces of information as
/// [`ScheduledTask`]: a name, an execution mode, a kind and a JSON configuration.
pub struct TaskDescriptor {
    /// Unique name for the task. Replaces any existing task with the same name.
    pub name: String,
    /// Execution mode (periodic or one-shot); see [`TaskMode`].
    pub mode: TaskMode,
    /// The category of work this task performs; see [`TaskKind`].
    pub kind: TaskKind,
    /// Arbitrary JSON configuration forwarded to the [`TaskHandler`] at execution time.
    pub config: serde_json::Value,
}
165
166/// A task held in memory by the [`crate::Scheduler`].
167///
168/// Use [`ScheduledTask::new`] / [`ScheduledTask::periodic`] for cron-based tasks
169/// and [`ScheduledTask::oneshot`] for tasks that run at a fixed point in time.
170///
171/// # Examples
172///
173/// ```
174/// use zeph_scheduler::{ScheduledTask, TaskKind};
175///
176/// let task = ScheduledTask::new(
177///     "daily-cleanup",
178///     "0 3 * * *",           // every day at 03:00 UTC (5-field cron)
179///     TaskKind::MemoryCleanup,
180///     serde_json::Value::Null,
181/// )
182/// .expect("valid cron expression");
183///
184/// assert_eq!(task.task_mode_str(), "periodic");
185/// assert!(task.cron_schedule().is_some());
186/// ```
187pub struct ScheduledTask {
188    /// Unique task name used as the primary key in the job store.
189    pub name: String,
190    /// Execution mode (periodic or one-shot).
191    pub mode: TaskMode,
192    /// The category of work this task performs.
193    pub kind: TaskKind,
194    /// Arbitrary JSON configuration forwarded to the [`TaskHandler`] at execution time.
195    pub config: serde_json::Value,
196}
197
198impl ScheduledTask {
199    /// Create a new periodic task from a cron expression string.
200    ///
201    /// # Errors
202    ///
203    /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
204    pub fn new(
205        name: impl Into<String>,
206        cron_expr: &str,
207        kind: TaskKind,
208        config: serde_json::Value,
209    ) -> Result<Self, SchedulerError> {
210        Self::periodic(name, cron_expr, kind, config)
211    }
212
213    /// Create a periodic task from a cron expression.
214    ///
215    /// # Errors
216    ///
217    /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
218    pub fn periodic(
219        name: impl Into<String>,
220        cron_expr: &str,
221        kind: TaskKind,
222        config: serde_json::Value,
223    ) -> Result<Self, SchedulerError> {
224        let normalized = normalize_cron_expr(cron_expr);
225        let schedule = CronSchedule::from_str(&normalized)
226            .map_err(|e| SchedulerError::InvalidCron(format!("{cron_expr}: {e}")))?;
227        Ok(Self {
228            name: name.into(),
229            mode: TaskMode::Periodic {
230                schedule: Box::new(schedule),
231            },
232            kind,
233            config,
234        })
235    }
236
237    /// Create a one-shot task that runs at a specific point in time.
238    #[must_use]
239    pub fn oneshot(
240        name: impl Into<String>,
241        run_at: DateTime<Utc>,
242        kind: TaskKind,
243        config: serde_json::Value,
244    ) -> Self {
245        Self {
246            name: name.into(),
247            mode: TaskMode::OneShot { run_at },
248            kind,
249            config,
250        }
251    }
252
253    /// Returns the cron schedule if this is a periodic task.
254    #[must_use]
255    pub fn cron_schedule(&self) -> Option<&CronSchedule> {
256        if let TaskMode::Periodic { schedule } = &self.mode {
257            Some(schedule.as_ref())
258        } else {
259            None
260        }
261    }
262
263    /// Returns the canonical 6-field cron expression string for DB persistence.
264    ///
265    /// Returns an empty string for one-shot tasks, which do not have a cron schedule.
266    #[must_use]
267    pub fn cron_expr_string(&self) -> String {
268        match &self.mode {
269            TaskMode::Periodic { schedule } => schedule.to_string(),
270            TaskMode::OneShot { .. } => String::new(),
271        }
272    }
273
274    /// Returns the `task_mode` string used for DB persistence.
275    ///
276    /// Returns `"periodic"` or `"oneshot"`.
277    #[must_use]
278    pub fn task_mode_str(&self) -> &'static str {
279        match &self.mode {
280            TaskMode::Periodic { .. } => "periodic",
281            TaskMode::OneShot { .. } => "oneshot",
282        }
283    }
284}
285
/// Trait for types that can execute a scheduled task.
///
/// Implementations receive the per-task JSON configuration stored in
/// [`ScheduledTask::config`] and return `Ok(())` on success or a
/// [`SchedulerError`] on failure. Failures are logged as warnings; the scheduler
/// continues running and will retry on the next due tick.
///
/// The method returns a pinned, boxed future rather than being declared
/// `async fn`: a trait with an `async fn` method cannot be used as a trait
/// object, whereas this signature keeps `dyn TaskHandler` possible.
/// Implementations therefore wrap their async work in
/// `Box::pin(async move { … })`.
///
/// # Example
///
/// ```rust
/// use std::future::Future;
/// use std::pin::Pin;
/// use zeph_scheduler::{SchedulerError, TaskHandler};
///
/// struct NoopHandler;
///
/// impl TaskHandler for NoopHandler {
///     fn execute(
///         &self,
///         _config: &serde_json::Value,
///     ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>> {
///         Box::pin(async move { Ok(()) })
///     }
/// }
/// ```
pub trait TaskHandler: Send + Sync {
    /// Execute the task with the provided configuration.
    ///
    /// The `'_` in the return type elides to the lifetime of `&self`, so the
    /// returned future may borrow from the handler but not from `config`. Copy or
    /// clone whatever is needed out of `config` before entering the async block.
    /// The future must also be `Send`.
    ///
    /// # Errors
    ///
    /// Return [`SchedulerError::TaskFailed`] (or any other variant) to indicate
    /// that the task could not complete successfully. The error is logged but does
    /// not stop the scheduler.
    fn execute(
        &self,
        config: &serde_json::Value,
    ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>>;
}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Every built-in kind must map to its persistence key and back.
    ///
    /// The table drives both directions for each entry, so a built-in cannot be
    /// checked one way only.
    #[test]
    fn task_kind_roundtrip() {
        let builtins = [
            ("memory_cleanup", TaskKind::MemoryCleanup),
            ("skill_refresh", TaskKind::SkillRefresh),
            ("health_check", TaskKind::HealthCheck),
            ("update_check", TaskKind::UpdateCheck),
            ("experiment", TaskKind::Experiment),
        ];
        for (key, kind) in builtins {
            assert_eq!(TaskKind::from_str_kind(key), kind, "parsing {key:?}");
            assert_eq!(kind.as_str(), key, "serialising {kind:?}");
        }

        // Unknown keys fall back to `Custom` and keep their text.
        assert_eq!(
            TaskKind::from_str_kind("custom_job"),
            TaskKind::Custom("custom_job".into())
        );
        assert_eq!(TaskKind::Custom("x".into()).as_str(), "x");
    }

    /// `experiment` is a built-in key and must not degrade to `Custom`.
    #[test]
    fn task_kind_experiment_roundtrip() {
        assert_eq!(
            TaskKind::from_str_kind("experiment"),
            TaskKind::Experiment,
            "from_str_kind must map 'experiment' to Experiment variant, not Custom"
        );
        assert_eq!(TaskKind::Experiment.as_str(), "experiment");
    }

    /// Five-field expressions gain a leading zero-seconds field.
    #[test]
    fn normalize_five_field_prepends_zero() {
        assert_eq!(normalize_cron_expr("*/5 * * * *"), "0 */5 * * * *");
        assert_eq!(normalize_cron_expr("0 3 * * *"), "0 0 3 * * *");
    }

    /// Six-field expressions are returned untouched.
    #[test]
    fn normalize_six_field_passthrough() {
        assert_eq!(normalize_cron_expr("0 0 3 * * *"), "0 0 3 * * *");
        assert_eq!(normalize_cron_expr("* * * * * *"), "* * * * * *");
    }

    /// Field counts other than five are not rewritten, even when invalid.
    #[test]
    fn normalize_other_field_count_passthrough() {
        assert_eq!(normalize_cron_expr("not_cron"), "not_cron");
        assert_eq!(normalize_cron_expr("0 0 0 0"), "0 0 0 0");
    }

    /// The empty string has zero fields and is passed through.
    #[test]
    fn normalize_empty_string_passthrough() {
        assert_eq!(normalize_cron_expr(""), "");
    }

    /// Whitespace-only input has zero fields and is passed through verbatim.
    #[test]
    fn normalize_whitespace_only_passthrough() {
        assert_eq!(normalize_cron_expr("   "), "   ");
    }

    /// A six-field expression is accepted as-is.
    #[test]
    fn valid_cron_creates_task() {
        let task = ScheduledTask::new(
            "test",
            "0 0 * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_ok());
    }

    /// A five-field expression is accepted thanks to normalisation.
    #[test]
    fn five_field_cron_creates_task() {
        let task = ScheduledTask::new(
            "five-field",
            "*/5 * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_ok(), "5-field cron must be accepted");
    }

    /// An unparsable expression yields `InvalidCron`, quoting the caller's input.
    #[test]
    fn invalid_cron_returns_error() {
        let task = ScheduledTask::new(
            "test",
            "not_cron",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        // `ScheduledTask` is not `Debug`, so destructure instead of `unwrap_err`.
        let Err(SchedulerError::InvalidCron(msg)) = task else {
            panic!("expected SchedulerError::InvalidCron for an unparsable expression");
        };
        assert!(
            msg.starts_with("not_cron: "),
            "error message must start with the original expression, got {msg:?}"
        );
    }

    /// One-shot tasks report the `oneshot` mode and carry no cron schedule.
    #[test]
    fn oneshot_task_creates_correctly() {
        let run_at = Utc::now() + chrono::Duration::hours(1);
        let task =
            ScheduledTask::oneshot("t", run_at, TaskKind::HealthCheck, serde_json::Value::Null);
        assert_eq!(task.task_mode_str(), "oneshot");
        assert!(task.cron_schedule().is_none());
        assert!(task.cron_expr_string().is_empty());
    }

    /// Periodic tasks report the `periodic` mode and expose their schedule.
    #[test]
    fn periodic_task_mode_str() {
        let task = ScheduledTask::periodic(
            "p",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        assert_eq!(task.task_mode_str(), "periodic");
        assert!(task.cron_schedule().is_some());
    }
}
453}