Skip to main content

zeph_scheduler/
task.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::borrow::Cow;
5use std::future::Future;
6use std::pin::Pin;
7use std::str::FromStr;
8
9use chrono::{DateTime, Utc};
10use cron::Schedule as CronSchedule;
11
12use crate::error::SchedulerError;
13
/// Convert a cron expression into the seconds-first, 6-field form the `cron` crate parses.
///
/// The expression is split on whitespace and its fields are counted:
///
/// - exactly 5 fields (`min hour day month weekday`): a `"0 "` seconds field is
///   prepended, defaulting seconds to zero;
/// - any other count (6-field input, a bare word, an empty or blank string, ...):
///   the input is returned untouched. Malformed expressions are left for the
///   `cron` crate to reject at parse time.
///
/// Only the 5-field case allocates; every other input is returned borrowed.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::normalize_cron_expr;
///
/// // 5-field: seconds are defaulted to 0.
/// assert_eq!(normalize_cron_expr("*/5 * * * *").as_ref(), "0 */5 * * * *");
///
/// // 6-field: passed through unchanged.
/// assert_eq!(normalize_cron_expr("0 */5 * * * *").as_ref(), "0 */5 * * * *");
/// ```
#[must_use]
pub fn normalize_cron_expr(expr: &str) -> Cow<'_, str> {
    // Classic Unix crontab layout: minute, hour, day-of-month, month, day-of-week.
    const UNIX_FIELD_COUNT: usize = 5;

    let fields = expr.split_whitespace().count();
    if fields != UNIX_FIELD_COUNT {
        return Cow::Borrowed(expr);
    }
    Cow::Owned(format!("0 {expr}"))
}
40
/// How far the scheduler trusts a task, as consumed by the RTW-A re-entry defense.
///
/// The provenance level selects how strictly the tick-fence and injection-detection
/// mechanisms are applied when the task is dispatched.
///
/// # Invariants
///
/// - `Static`: config is fixed at binary startup and is never overwritten by DB
///   writes between ticks.
/// - `UserAdded`: created by the user through the control channel; its prompt is
///   held in quarantine for a single tick before it reaches the LLM.
/// - `External`: written by an out-of-process source (CLI, direct SQL) and sent
///   through the complete quarantine and injection-detection pipeline.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::TaskProvenance;
///
/// assert_eq!(TaskProvenance::Static.as_str(), "static");
/// assert_eq!(TaskProvenance::from_provenance_str("external"), TaskProvenance::External);
/// assert_eq!(TaskProvenance::from_provenance_str("unknown_value"), TaskProvenance::External);
/// ```
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TaskProvenance {
    /// Registered at binary startup via [`crate::Scheduler::add_task`]; the config is immutable.
    Static,
    /// Added through the runtime control channel (e.g. CLI `zeph schedule add`); user-originated.
    UserAdded,
    /// Loaded from the DB on hydration or written by an external process; untrusted.
    External,
}
74
75impl TaskProvenance {
76    /// Return the stable persistence string for this provenance level.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// use zeph_scheduler::TaskProvenance;
82    ///
83    /// assert_eq!(TaskProvenance::Static.as_str(), "static");
84    /// assert_eq!(TaskProvenance::UserAdded.as_str(), "user_added");
85    /// assert_eq!(TaskProvenance::External.as_str(), "external");
86    /// ```
87    #[must_use]
88    pub fn as_str(&self) -> &'static str {
89        match self {
90            Self::Static => "static",
91            Self::UserAdded => "user_added",
92            Self::External => "external",
93        }
94    }
95
96    /// Parse a provenance string from the database.
97    ///
98    /// Unknown strings default to [`TaskProvenance::External`] — the most restrictive
99    /// level — so future schema additions degrade safely.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use zeph_scheduler::TaskProvenance;
105    ///
106    /// assert_eq!(TaskProvenance::from_provenance_str("static"), TaskProvenance::Static);
107    /// assert_eq!(TaskProvenance::from_provenance_str("user_added"), TaskProvenance::UserAdded);
108    /// assert_eq!(TaskProvenance::from_provenance_str("external"), TaskProvenance::External);
109    /// // Unknown values fall back to External (most restrictive).
110    /// assert_eq!(TaskProvenance::from_provenance_str("hydrated"), TaskProvenance::External);
111    /// ```
112    #[must_use]
113    pub fn from_provenance_str(s: &str) -> Self {
114        match s {
115            "static" => Self::Static,
116            "user_added" => Self::UserAdded,
117            _ => Self::External,
118        }
119    }
120
121    /// Returns `true` if this task originated from a potentially untrusted external source.
122    ///
123    /// # Examples
124    ///
125    /// ```
126    /// use zeph_scheduler::TaskProvenance;
127    ///
128    /// assert!(TaskProvenance::External.is_external());
129    /// assert!(!TaskProvenance::Static.is_external());
130    /// assert!(!TaskProvenance::UserAdded.is_external());
131    /// ```
132    #[must_use]
133    pub fn is_external(&self) -> bool {
134        matches!(self, Self::External)
135    }
136}
137
/// The category of work a scheduled task performs.
///
/// Built-in variants correspond to well-known agent subsystems. [`TaskKind::Custom`]
/// holds an arbitrary string, so callers can introduce their own kinds without
/// changing this enum.
///
/// # Persistence
///
/// [`TaskKind::as_str`] yields a stable `snake_case` key for every variant, and
/// [`TaskKind::from_str_kind`] turns such a key back into a variant. That key is
/// what is stored in the `kind` column of the `scheduled_jobs` table.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::TaskKind;
///
/// assert_eq!(TaskKind::HealthCheck.as_str(), "health_check");
/// assert_eq!(TaskKind::from_str_kind("memory_cleanup"), TaskKind::MemoryCleanup);
/// assert_eq!(TaskKind::from_str_kind("my_custom"), TaskKind::Custom("my_custom".into()));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum TaskKind {
    /// Runs the memory subsystem's cleanup / compaction routine.
    MemoryCleanup,
    /// Reloads skills from the skill registry.
    SkillRefresh,
    /// Performs a liveness or readiness probe of the agent.
    HealthCheck,
    /// Asks the GitHub releases API whether a newer Zeph version exists.
    UpdateCheck,
    /// Runs an experiment task (used by `zeph-experiments`).
    Experiment,
    /// Application-defined kind; the wrapped string is its persistence key.
    Custom(String),
}
175
176impl TaskKind {
177    /// Parse a task kind from its persistence string.
178    ///
179    /// Unknown strings are wrapped in [`TaskKind::Custom`] rather than returning
180    /// an error, so new built-in variants added in future versions do not break
181    /// existing stored jobs loaded with an older build.
182    ///
183    /// # Examples
184    ///
185    /// ```
186    /// use zeph_scheduler::TaskKind;
187    ///
188    /// assert_eq!(TaskKind::from_str_kind("health_check"), TaskKind::HealthCheck);
189    /// assert_eq!(TaskKind::from_str_kind("unknown"), TaskKind::Custom("unknown".into()));
190    /// ```
191    #[must_use]
192    pub fn from_str_kind(s: &str) -> Self {
193        match s {
194            "memory_cleanup" => Self::MemoryCleanup,
195            "skill_refresh" => Self::SkillRefresh,
196            "health_check" => Self::HealthCheck,
197            "update_check" => Self::UpdateCheck,
198            "experiment" => Self::Experiment,
199            other => Self::Custom(other.to_owned()),
200        }
201    }
202
203    /// Return the stable string key used for database persistence.
204    ///
205    /// # Examples
206    ///
207    /// ```
208    /// use zeph_scheduler::TaskKind;
209    ///
210    /// assert_eq!(TaskKind::SkillRefresh.as_str(), "skill_refresh");
211    /// assert_eq!(TaskKind::Custom("my_job".into()).as_str(), "my_job");
212    /// ```
213    #[must_use]
214    pub fn as_str(&self) -> &str {
215        match self {
216            Self::MemoryCleanup => "memory_cleanup",
217            Self::SkillRefresh => "skill_refresh",
218            Self::HealthCheck => "health_check",
219            Self::UpdateCheck => "update_check",
220            Self::Experiment => "experiment",
221            Self::Custom(s) => s,
222        }
223    }
224}
225
226/// Execution mode for a scheduled task.
227///
228/// Determines how the scheduler decides when to run a task and what to do after it
229/// completes:
230///
231/// - [`TaskMode::Periodic`] re-computes `next_run` from the cron schedule after
232///   each successful execution and never removes the task from memory.
233/// - [`TaskMode::OneShot`] fires once when `now >= run_at` and then removes the
234///   task from the in-memory task list and marks it `done` in the store.
235#[non_exhaustive]
236pub enum TaskMode {
237    /// Run on a repeating cron schedule.
238    Periodic {
239        /// Parsed cron schedule that drives `next_run` computation.
240        schedule: Box<CronSchedule>,
241    },
242    /// Run once at the specified UTC timestamp.
243    OneShot {
244        /// The earliest UTC time at which the task should execute.
245        run_at: DateTime<Utc>,
246    },
247}
248
249/// Descriptor sent over the control channel to register tasks at runtime.
250///
251/// Send a `SchedulerMessage::Add` wrapping a boxed `TaskDescriptor` to add a
252/// new task (or replace an existing one with the same name) without stopping the
253/// scheduler loop.
254pub struct TaskDescriptor {
255    /// Unique name for the task. Replaces any existing task with the same name.
256    pub name: String,
257    /// Execution mode (periodic or one-shot).
258    pub mode: TaskMode,
259    /// The category of work this task performs.
260    pub kind: TaskKind,
261    /// Arbitrary JSON configuration forwarded to the [`TaskHandler`] at execution time.
262    pub config: serde_json::Value,
263    /// Trust level for RTW-A re-entry defense.
264    ///
265    /// Tasks sent via the runtime channel default to [`TaskProvenance::UserAdded`].
266    pub provenance: TaskProvenance,
267}
268
269/// A task held in memory by the [`crate::Scheduler`].
270///
271/// Use [`ScheduledTask::new`] / [`ScheduledTask::periodic`] for cron-based tasks
272/// and [`ScheduledTask::oneshot`] for tasks that run at a fixed point in time.
273///
274/// # Examples
275///
276/// ```
277/// use zeph_scheduler::{ScheduledTask, TaskKind};
278///
279/// let task = ScheduledTask::new(
280///     "daily-cleanup",
281///     "0 3 * * *",           // every day at 03:00 UTC (5-field cron)
282///     TaskKind::MemoryCleanup,
283///     serde_json::Value::Null,
284/// )
285/// .expect("valid cron expression");
286///
287/// assert_eq!(task.task_mode_str(), "periodic");
288/// assert!(task.cron_schedule().is_some());
289/// ```
290pub struct ScheduledTask {
291    /// Unique task name used as the primary key in the job store.
292    pub name: String,
293    /// Execution mode (periodic or one-shot).
294    pub mode: TaskMode,
295    /// The category of work this task performs.
296    pub kind: TaskKind,
297    /// Arbitrary JSON configuration forwarded to the [`TaskHandler`] at execution time.
298    pub config: serde_json::Value,
299    /// Trust level for RTW-A re-entry defense.
300    pub provenance: TaskProvenance,
301}
302
303impl ScheduledTask {
304    /// Create a new periodic task from a cron expression string.
305    ///
306    /// The resulting task has [`TaskProvenance::Static`] provenance.
307    ///
308    /// # Errors
309    ///
310    /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
311    pub fn new(
312        name: impl Into<String>,
313        cron_expr: &str,
314        kind: TaskKind,
315        config: serde_json::Value,
316    ) -> Result<Self, SchedulerError> {
317        Self::periodic(name, cron_expr, kind, config)
318    }
319
320    /// Create a periodic task from a cron expression.
321    ///
322    /// The resulting task has [`TaskProvenance::Static`] provenance. To create a task with
323    /// different provenance, use [`ScheduledTask::periodic_with_provenance`].
324    ///
325    /// # Errors
326    ///
327    /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
328    pub fn periodic(
329        name: impl Into<String>,
330        cron_expr: &str,
331        kind: TaskKind,
332        config: serde_json::Value,
333    ) -> Result<Self, SchedulerError> {
334        Self::periodic_with_provenance(name, cron_expr, kind, config, TaskProvenance::Static)
335    }
336
337    /// Create a periodic task from a cron expression with explicit provenance.
338    ///
339    /// # Errors
340    ///
341    /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
342    pub fn periodic_with_provenance(
343        name: impl Into<String>,
344        cron_expr: &str,
345        kind: TaskKind,
346        config: serde_json::Value,
347        provenance: TaskProvenance,
348    ) -> Result<Self, SchedulerError> {
349        let normalized = normalize_cron_expr(cron_expr);
350        let schedule = CronSchedule::from_str(&normalized)
351            .map_err(|e| SchedulerError::InvalidCron(format!("{cron_expr}: {e}")))?;
352        Ok(Self {
353            name: name.into(),
354            mode: TaskMode::Periodic {
355                schedule: Box::new(schedule),
356            },
357            kind,
358            config,
359            provenance,
360        })
361    }
362
363    /// Create a one-shot task that runs at a specific point in time.
364    ///
365    /// The resulting task has [`TaskProvenance::Static`] provenance.
366    #[must_use]
367    pub fn oneshot(
368        name: impl Into<String>,
369        run_at: DateTime<Utc>,
370        kind: TaskKind,
371        config: serde_json::Value,
372    ) -> Self {
373        Self {
374            name: name.into(),
375            mode: TaskMode::OneShot { run_at },
376            kind,
377            config,
378            provenance: TaskProvenance::Static,
379        }
380    }
381
382    /// Returns the cron schedule if this is a periodic task.
383    #[must_use]
384    pub fn cron_schedule(&self) -> Option<&CronSchedule> {
385        if let TaskMode::Periodic { schedule } = &self.mode {
386            Some(schedule.as_ref())
387        } else {
388            None
389        }
390    }
391
392    /// Returns the canonical 6-field cron expression string for DB persistence.
393    ///
394    /// Returns an empty string for one-shot tasks, which do not have a cron schedule.
395    #[must_use]
396    pub fn cron_expr_string(&self) -> String {
397        match &self.mode {
398            TaskMode::Periodic { schedule } => schedule.to_string(),
399            TaskMode::OneShot { .. } => String::new(),
400        }
401    }
402
403    /// Returns the `task_mode` string used for DB persistence.
404    ///
405    /// Returns `"periodic"` or `"oneshot"`.
406    #[must_use]
407    pub fn task_mode_str(&self) -> &'static str {
408        match &self.mode {
409            TaskMode::Periodic { .. } => "periodic",
410            TaskMode::OneShot { .. } => "oneshot",
411        }
412    }
413}
414
415/// Trait for types that can execute a scheduled task.
416///
417/// Implementations receive the per-task JSON configuration stored in
418/// [`ScheduledTask::config`] and return `Ok(())` on success or a
419/// [`SchedulerError`] on failure. Failures are logged as warnings; the scheduler
420/// continues running and will retry on the next due tick.
421///
422/// Because async trait methods in Edition 2024 require returning a pinned boxed
423/// future for object safety, implementations must wrap their async work in
424/// `Box::pin(async move { … })`.
425///
426/// # Example
427///
428/// ```rust
429/// use std::future::Future;
430/// use std::pin::Pin;
431/// use zeph_scheduler::{SchedulerError, TaskHandler};
432///
433/// struct NoopHandler;
434///
435/// impl TaskHandler for NoopHandler {
436///     fn execute(
437///         &self,
438///         _config: &serde_json::Value,
439///     ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>> {
440///         Box::pin(async move { Ok(()) })
441///     }
442/// }
443/// ```
444pub trait TaskHandler: Send + Sync {
445    /// Execute the task with the provided configuration.
446    ///
447    /// # Errors
448    ///
449    /// Return [`SchedulerError::TaskFailed`] (or any other variant) to indicate
450    /// that the task could not complete successfully. The error is logged but does
451    /// not stop the scheduler.
452    fn execute(
453        &self,
454        config: &serde_json::Value,
455    ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>>;
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn task_kind_roundtrip() {
        assert_eq!(
            TaskKind::from_str_kind("memory_cleanup"),
            TaskKind::MemoryCleanup
        );
        assert_eq!(TaskKind::MemoryCleanup.as_str(), "memory_cleanup");
        assert_eq!(
            TaskKind::from_str_kind("skill_refresh"),
            TaskKind::SkillRefresh
        );
        assert_eq!(TaskKind::SkillRefresh.as_str(), "skill_refresh");
        assert_eq!(
            TaskKind::from_str_kind("health_check"),
            TaskKind::HealthCheck
        );
        assert_eq!(TaskKind::HealthCheck.as_str(), "health_check");
        assert_eq!(
            TaskKind::from_str_kind("update_check"),
            TaskKind::UpdateCheck
        );
        assert_eq!(TaskKind::UpdateCheck.as_str(), "update_check");
        assert_eq!(
            TaskKind::from_str_kind("custom_job"),
            TaskKind::Custom("custom_job".into())
        );
        assert_eq!(TaskKind::Custom("x".into()).as_str(), "x");
    }

    #[test]
    fn task_kind_builtin_keys_roundtrip() {
        // Every built-in variant must survive a persist/reload cycle through its key.
        for kind in [
            TaskKind::MemoryCleanup,
            TaskKind::SkillRefresh,
            TaskKind::HealthCheck,
            TaskKind::UpdateCheck,
            TaskKind::Experiment,
        ] {
            assert_eq!(TaskKind::from_str_kind(kind.as_str()), kind);
        }
    }

    #[test]
    fn task_kind_experiment_roundtrip() {
        assert_eq!(
            TaskKind::from_str_kind("experiment"),
            TaskKind::Experiment,
            "from_str_kind must map 'experiment' to Experiment variant, not Custom"
        );
        assert_eq!(TaskKind::Experiment.as_str(), "experiment");
    }

    #[test]
    fn normalize_five_field_prepends_zero() {
        assert_eq!(normalize_cron_expr("*/5 * * * *"), "0 */5 * * * *");
        assert_eq!(normalize_cron_expr("0 3 * * *"), "0 0 3 * * *");
    }

    #[test]
    fn normalize_six_field_passthrough() {
        assert_eq!(normalize_cron_expr("0 0 3 * * *"), "0 0 3 * * *");
        assert_eq!(normalize_cron_expr("* * * * * *"), "* * * * * *");
    }

    #[test]
    fn normalize_other_field_count_passthrough() {
        assert_eq!(normalize_cron_expr("not_cron"), "not_cron");
        assert_eq!(normalize_cron_expr("0 0 0 0"), "0 0 0 0");
    }

    #[test]
    fn normalize_empty_string_passthrough() {
        assert_eq!(normalize_cron_expr(""), "");
    }

    #[test]
    fn normalize_whitespace_only_passthrough() {
        assert_eq!(normalize_cron_expr("   "), "   ");
    }

    #[test]
    fn valid_cron_creates_task() {
        let task = ScheduledTask::new(
            "test",
            "0 0 * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_ok());
    }

    #[test]
    fn five_field_cron_creates_task() {
        let task = ScheduledTask::new(
            "five-field",
            "*/5 * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_ok(), "5-field cron must be accepted");
    }

    #[test]
    fn invalid_cron_returns_error() {
        let task = ScheduledTask::new(
            "test",
            "not_cron",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_err());
    }

    #[test]
    fn invalid_five_field_cron_error_quotes_original_expr() {
        // The normalised form would be "0 bad * * * *"; the error must quote what the
        // caller actually wrote.
        let result = ScheduledTask::new(
            "bad",
            "bad * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        let Err(SchedulerError::InvalidCron(msg)) = result else {
            panic!("expected SchedulerError::InvalidCron");
        };
        assert!(
            msg.starts_with("bad * * * *: "),
            "error should quote the caller's expression, got: {msg}"
        );
    }

    #[test]
    fn oneshot_task_creates_correctly() {
        let run_at = Utc::now() + chrono::Duration::hours(1);
        let task =
            ScheduledTask::oneshot("t", run_at, TaskKind::HealthCheck, serde_json::Value::Null);
        assert_eq!(task.task_mode_str(), "oneshot");
        assert!(task.cron_schedule().is_none());
    }

    #[test]
    fn oneshot_task_has_static_provenance_and_empty_cron_expr() {
        let run_at = Utc::now() + chrono::Duration::hours(1);
        let task =
            ScheduledTask::oneshot("t", run_at, TaskKind::HealthCheck, serde_json::Value::Null);
        assert_eq!(task.provenance, TaskProvenance::Static);
        assert!(task.cron_expr_string().is_empty());
    }

    #[test]
    fn periodic_task_mode_str() {
        let task = ScheduledTask::periodic(
            "p",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        assert_eq!(task.task_mode_str(), "periodic");
        assert!(task.cron_schedule().is_some());
    }

    #[test]
    fn periodic_cron_expr_string_reparses() {
        // Whatever is persisted for a periodic task must be loadable again on hydration.
        for expr in ["*/5 * * * *", "0 0 3 * * *"] {
            let task = ScheduledTask::new(
                "p",
                expr,
                TaskKind::HealthCheck,
                serde_json::Value::Null,
            )
            .unwrap();
            let persisted = task.cron_expr_string();
            assert!(!persisted.is_empty());
            assert!(
                persisted.parse::<CronSchedule>().is_ok(),
                "persisted expression {persisted:?} must parse back"
            );
        }
    }

    #[test]
    fn task_provenance_roundtrip() {
        assert_eq!(
            TaskProvenance::from_provenance_str("static"),
            TaskProvenance::Static
        );
        assert_eq!(
            TaskProvenance::from_provenance_str("user_added"),
            TaskProvenance::UserAdded
        );
        assert_eq!(
            TaskProvenance::from_provenance_str("external"),
            TaskProvenance::External
        );
        // Unknown values fall back to External (most restrictive fail-safe).
        assert_eq!(
            TaskProvenance::from_provenance_str("hydrated"),
            TaskProvenance::External
        );
        assert_eq!(TaskProvenance::Static.as_str(), "static");
        assert_eq!(TaskProvenance::UserAdded.as_str(), "user_added");
        assert_eq!(TaskProvenance::External.as_str(), "external");
    }

    #[test]
    fn task_provenance_is_external() {
        assert!(TaskProvenance::External.is_external());
        assert!(!TaskProvenance::Static.is_external());
        assert!(!TaskProvenance::UserAdded.is_external());
    }

    #[test]
    fn new_task_has_static_provenance() {
        let task = ScheduledTask::new(
            "test",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        assert_eq!(task.provenance, TaskProvenance::Static);
    }

    #[test]
    fn periodic_with_provenance_sets_provenance() {
        let task = ScheduledTask::periodic_with_provenance(
            "ext",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
            TaskProvenance::External,
        )
        .unwrap();
        assert_eq!(task.provenance, TaskProvenance::External);
    }
}