zeph_scheduler/task.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::borrow::Cow;
5use std::future::Future;
6use std::pin::Pin;
7use std::str::FromStr;
8
9use chrono::{DateTime, Utc};
10use cron::Schedule as CronSchedule;
11
12use crate::error::SchedulerError;
13
/// Convert a cron expression into the 6-field form expected by the `cron` crate.
///
/// The expression is split on whitespace and its fields are counted:
///
/// - exactly 5 fields (`min hour day month weekday`): `"0 "` is prepended so that the
///   seconds field defaults to zero, and an owned string is returned;
/// - any other field count (including the regular 6-field form, empty input and
///   malformed input): the input is returned borrowed and untouched. Invalid
///   expressions are therefore rejected later, by the `cron` crate at parse time.
///
/// The original text is never trimmed or re-spaced; only the prefix is added.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::normalize_cron_expr;
///
/// // 5-field: seconds are defaulted to 0.
/// assert_eq!(normalize_cron_expr("*/5 * * * *").as_ref(), "0 */5 * * * *");
///
/// // 6-field: passed through unchanged.
/// assert_eq!(normalize_cron_expr("0 */5 * * * *").as_ref(), "0 */5 * * * *");
/// ```
#[must_use]
pub fn normalize_cron_expr(expr: &str) -> Cow<'_, str> {
    let field_count = expr.split_whitespace().count();
    match field_count {
        5 => {
            // "0 " (two bytes) followed by the caller's expression, verbatim.
            let mut with_seconds = String::with_capacity(expr.len() + 2);
            with_seconds.push_str("0 ");
            with_seconds.push_str(expr);
            Cow::Owned(with_seconds)
        }
        _ => Cow::Borrowed(expr),
    }
}
40
/// Provenance (trust level) of a scheduled task, consumed by the RTW-A re-entry defense.
///
/// The provenance decides how strictly the tick-fence and injection-detection
/// mechanisms are applied when the task is dispatched.
///
/// # Invariants
///
/// - `Static`: config is set at binary startup and is never overwritten by DB
///   writes between ticks.
/// - `UserAdded`: user-initiated via the control channel; the task receives a
///   single-tick quarantine before its prompt enters the LLM.
/// - `External`: originates from out-of-process writes (CLI, direct SQL) and is
///   subject to the full quarantine and injection-detection pipeline.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::TaskProvenance;
///
/// assert_eq!(TaskProvenance::Static.as_str(), "static");
/// assert_eq!(TaskProvenance::from_provenance_str("external"), TaskProvenance::External);
/// assert_eq!(TaskProvenance::from_provenance_str("unknown_value"), TaskProvenance::External);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum TaskProvenance {
    /// Registered at binary startup via [`crate::Scheduler::add_task`]; config is immutable.
    Static,
    /// Added through the runtime control channel (e.g. CLI `zeph schedule add`); user-originated.
    UserAdded,
    /// Loaded from the DB on hydration or written by an external process; untrusted.
    External,
}

impl TaskProvenance {
    /// Stable string under which this provenance level is persisted.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskProvenance;
    ///
    /// assert_eq!(TaskProvenance::Static.as_str(), "static");
    /// assert_eq!(TaskProvenance::UserAdded.as_str(), "user_added");
    /// assert_eq!(TaskProvenance::External.as_str(), "external");
    /// ```
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::External => "external",
            Self::UserAdded => "user_added",
            Self::Static => "static",
        }
    }

    /// Decode a provenance string read from the database.
    ///
    /// Matching is exact (case-sensitive). Anything that is not the persistence key of
    /// `Static` or `UserAdded` becomes [`TaskProvenance::External`], the most restrictive
    /// level, so values introduced by future schema changes degrade safely.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskProvenance;
    ///
    /// assert_eq!(TaskProvenance::from_provenance_str("static"), TaskProvenance::Static);
    /// assert_eq!(TaskProvenance::from_provenance_str("user_added"), TaskProvenance::UserAdded);
    /// assert_eq!(TaskProvenance::from_provenance_str("external"), TaskProvenance::External);
    /// // Unknown values fall back to External (most restrictive).
    /// assert_eq!(TaskProvenance::from_provenance_str("hydrated"), TaskProvenance::External);
    /// ```
    #[must_use]
    pub fn from_provenance_str(s: &str) -> Self {
        // Only these two levels are ever granted on the strength of the stored string;
        // every other input (including "external" itself) lands on the fallback below.
        for trusted in [Self::Static, Self::UserAdded] {
            if trusted.as_str() == s {
                return trusted;
            }
        }
        Self::External
    }

    /// Whether the task came from a potentially untrusted external source.
    ///
    /// Only [`TaskProvenance::External`] yields `true`.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskProvenance;
    ///
    /// assert!(TaskProvenance::External.is_external());
    /// assert!(!TaskProvenance::Static.is_external());
    /// assert!(!TaskProvenance::UserAdded.is_external());
    /// ```
    #[must_use]
    pub fn is_external(&self) -> bool {
        *self == Self::External
    }
}
137
/// The type of work a scheduled task performs.
///
/// The built-in variants correspond to well-known agent subsystems.
/// [`TaskKind::Custom`] wraps an arbitrary string, letting callers introduce their
/// own task kinds without touching this enum.
///
/// # Persistence
///
/// Every variant maps to a stable `snake_case` string: [`TaskKind::as_str`] produces
/// it and [`TaskKind::from_str_kind`] parses it back. These strings are stored in the
/// `kind` column of the `scheduled_jobs` table.
///
/// # Examples
///
/// ```
/// use zeph_scheduler::TaskKind;
///
/// assert_eq!(TaskKind::HealthCheck.as_str(), "health_check");
/// assert_eq!(TaskKind::from_str_kind("memory_cleanup"), TaskKind::MemoryCleanup);
/// assert_eq!(TaskKind::from_str_kind("my_custom"), TaskKind::Custom("my_custom".into()));
/// ```
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskKind {
    /// Triggers the memory subsystem's cleanup / compaction routine.
    MemoryCleanup,
    /// Reloads skills from the skill registry.
    SkillRefresh,
    /// Runs a liveness or readiness probe for the agent.
    HealthCheck,
    /// Checks the GitHub releases API for a newer Zeph version.
    UpdateCheck,
    /// Runs an experiment task (used by `zeph-experiments`).
    Experiment,
    /// An application-defined task kind; the wrapped string is its persistence key.
    Custom(String),
}

impl TaskKind {
    /// Decode a task kind from its persistence string.
    ///
    /// A string equal to the key of a built-in variant yields that variant. Every other
    /// string is wrapped in [`TaskKind::Custom`] instead of producing an error, so jobs
    /// stored under a kind that this build does not know still load.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskKind;
    ///
    /// assert_eq!(TaskKind::from_str_kind("health_check"), TaskKind::HealthCheck);
    /// assert_eq!(TaskKind::from_str_kind("unknown"), TaskKind::Custom("unknown".into()));
    /// ```
    #[must_use]
    pub fn from_str_kind(s: &str) -> Self {
        // Built-ins are recognised through `as_str`, keeping both directions of the
        // mapping in agreement.
        let builtins = [
            Self::MemoryCleanup,
            Self::SkillRefresh,
            Self::HealthCheck,
            Self::UpdateCheck,
            Self::Experiment,
        ];
        for kind in builtins {
            if kind.as_str() == s {
                return kind;
            }
        }
        Self::Custom(s.to_owned())
    }

    /// Stable string key under which this kind is persisted in the database.
    ///
    /// For [`TaskKind::Custom`] this is the wrapped string itself.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeph_scheduler::TaskKind;
    ///
    /// assert_eq!(TaskKind::SkillRefresh.as_str(), "skill_refresh");
    /// assert_eq!(TaskKind::Custom("my_job".into()).as_str(), "my_job");
    /// ```
    #[must_use]
    pub fn as_str(&self) -> &str {
        match self {
            Self::Custom(key) => key.as_str(),
            Self::Experiment => "experiment",
            Self::UpdateCheck => "update_check",
            Self::HealthCheck => "health_check",
            Self::SkillRefresh => "skill_refresh",
            Self::MemoryCleanup => "memory_cleanup",
        }
    }
}
225
226/// Execution mode for a scheduled task.
227///
228/// Determines how the scheduler decides when to run a task and what to do after it
229/// completes:
230///
231/// - [`TaskMode::Periodic`] re-computes `next_run` from the cron schedule after
232/// each successful execution and never removes the task from memory.
233/// - [`TaskMode::OneShot`] fires once when `now >= run_at` and then removes the
234/// task from the in-memory task list and marks it `done` in the store.
235#[non_exhaustive]
236pub enum TaskMode {
237 /// Run on a repeating cron schedule.
238 Periodic {
239 /// Parsed cron schedule that drives `next_run` computation.
240 schedule: Box<CronSchedule>,
241 },
242 /// Run once at the specified UTC timestamp.
243 OneShot {
244 /// The earliest UTC time at which the task should execute.
245 run_at: DateTime<Utc>,
246 },
247}
248
249/// Descriptor sent over the control channel to register tasks at runtime.
250///
251/// Send a `SchedulerMessage::Add` wrapping a boxed `TaskDescriptor` to add a
252/// new task (or replace an existing one with the same name) without stopping the
253/// scheduler loop.
254pub struct TaskDescriptor {
255 /// Unique name for the task. Replaces any existing task with the same name.
256 pub name: String,
257 /// Execution mode (periodic or one-shot).
258 pub mode: TaskMode,
259 /// The category of work this task performs.
260 pub kind: TaskKind,
261 /// Arbitrary JSON configuration forwarded to the [`TaskHandler`] at execution time.
262 pub config: serde_json::Value,
263 /// Trust level for RTW-A re-entry defense.
264 ///
265 /// Tasks sent via the runtime channel default to [`TaskProvenance::UserAdded`].
266 pub provenance: TaskProvenance,
267}
268
269/// A task held in memory by the [`crate::Scheduler`].
270///
271/// Use [`ScheduledTask::new`] / [`ScheduledTask::periodic`] for cron-based tasks
272/// and [`ScheduledTask::oneshot`] for tasks that run at a fixed point in time.
273///
274/// # Examples
275///
276/// ```
277/// use zeph_scheduler::{ScheduledTask, TaskKind};
278///
279/// let task = ScheduledTask::new(
280/// "daily-cleanup",
281/// "0 3 * * *", // every day at 03:00 UTC (5-field cron)
282/// TaskKind::MemoryCleanup,
283/// serde_json::Value::Null,
284/// )
285/// .expect("valid cron expression");
286///
287/// assert_eq!(task.task_mode_str(), "periodic");
288/// assert!(task.cron_schedule().is_some());
289/// ```
290pub struct ScheduledTask {
291 /// Unique task name used as the primary key in the job store.
292 pub name: String,
293 /// Execution mode (periodic or one-shot).
294 pub mode: TaskMode,
295 /// The category of work this task performs.
296 pub kind: TaskKind,
297 /// Arbitrary JSON configuration forwarded to the [`TaskHandler`] at execution time.
298 pub config: serde_json::Value,
299 /// Trust level for RTW-A re-entry defense.
300 pub provenance: TaskProvenance,
301}
302
303impl ScheduledTask {
304 /// Create a new periodic task from a cron expression string.
305 ///
306 /// The resulting task has [`TaskProvenance::Static`] provenance.
307 ///
308 /// # Errors
309 ///
310 /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
311 pub fn new(
312 name: impl Into<String>,
313 cron_expr: &str,
314 kind: TaskKind,
315 config: serde_json::Value,
316 ) -> Result<Self, SchedulerError> {
317 Self::periodic(name, cron_expr, kind, config)
318 }
319
320 /// Create a periodic task from a cron expression.
321 ///
322 /// The resulting task has [`TaskProvenance::Static`] provenance. To create a task with
323 /// different provenance, use [`ScheduledTask::periodic_with_provenance`].
324 ///
325 /// # Errors
326 ///
327 /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
328 pub fn periodic(
329 name: impl Into<String>,
330 cron_expr: &str,
331 kind: TaskKind,
332 config: serde_json::Value,
333 ) -> Result<Self, SchedulerError> {
334 Self::periodic_with_provenance(name, cron_expr, kind, config, TaskProvenance::Static)
335 }
336
337 /// Create a periodic task from a cron expression with explicit provenance.
338 ///
339 /// # Errors
340 ///
341 /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
342 pub fn periodic_with_provenance(
343 name: impl Into<String>,
344 cron_expr: &str,
345 kind: TaskKind,
346 config: serde_json::Value,
347 provenance: TaskProvenance,
348 ) -> Result<Self, SchedulerError> {
349 let normalized = normalize_cron_expr(cron_expr);
350 let schedule = CronSchedule::from_str(&normalized)
351 .map_err(|e| SchedulerError::InvalidCron(format!("{cron_expr}: {e}")))?;
352 Ok(Self {
353 name: name.into(),
354 mode: TaskMode::Periodic {
355 schedule: Box::new(schedule),
356 },
357 kind,
358 config,
359 provenance,
360 })
361 }
362
363 /// Create a one-shot task that runs at a specific point in time.
364 ///
365 /// The resulting task has [`TaskProvenance::Static`] provenance.
366 #[must_use]
367 pub fn oneshot(
368 name: impl Into<String>,
369 run_at: DateTime<Utc>,
370 kind: TaskKind,
371 config: serde_json::Value,
372 ) -> Self {
373 Self {
374 name: name.into(),
375 mode: TaskMode::OneShot { run_at },
376 kind,
377 config,
378 provenance: TaskProvenance::Static,
379 }
380 }
381
382 /// Returns the cron schedule if this is a periodic task.
383 #[must_use]
384 pub fn cron_schedule(&self) -> Option<&CronSchedule> {
385 if let TaskMode::Periodic { schedule } = &self.mode {
386 Some(schedule.as_ref())
387 } else {
388 None
389 }
390 }
391
392 /// Returns the canonical 6-field cron expression string for DB persistence.
393 ///
394 /// Returns an empty string for one-shot tasks, which do not have a cron schedule.
395 #[must_use]
396 pub fn cron_expr_string(&self) -> String {
397 match &self.mode {
398 TaskMode::Periodic { schedule } => schedule.to_string(),
399 TaskMode::OneShot { .. } => String::new(),
400 }
401 }
402
403 /// Returns the `task_mode` string used for DB persistence.
404 ///
405 /// Returns `"periodic"` or `"oneshot"`.
406 #[must_use]
407 pub fn task_mode_str(&self) -> &'static str {
408 match &self.mode {
409 TaskMode::Periodic { .. } => "periodic",
410 TaskMode::OneShot { .. } => "oneshot",
411 }
412 }
413}
414
415/// Trait for types that can execute a scheduled task.
416///
417/// Implementations receive the per-task JSON configuration stored in
418/// [`ScheduledTask::config`] and return `Ok(())` on success or a
419/// [`SchedulerError`] on failure. Failures are logged as warnings; the scheduler
420/// continues running and will retry on the next due tick.
421///
422/// Because async trait methods in Edition 2024 require returning a pinned boxed
423/// future for object safety, implementations must wrap their async work in
424/// `Box::pin(async move { … })`.
425///
426/// # Example
427///
428/// ```rust
429/// use std::future::Future;
430/// use std::pin::Pin;
431/// use zeph_scheduler::{SchedulerError, TaskHandler};
432///
433/// struct NoopHandler;
434///
435/// impl TaskHandler for NoopHandler {
436/// fn execute(
437/// &self,
438/// _config: &serde_json::Value,
439/// ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>> {
440/// Box::pin(async move { Ok(()) })
441/// }
442/// }
443/// ```
444pub trait TaskHandler: Send + Sync {
445 /// Execute the task with the provided configuration.
446 ///
447 /// # Errors
448 ///
449 /// Return [`SchedulerError::TaskFailed`] (or any other variant) to indicate
450 /// that the task could not complete successfully. The error is logged but does
451 /// not stop the scheduler.
452 fn execute(
453 &self,
454 config: &serde_json::Value,
455 ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>>;
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    /// Every built-in kind and a custom kind survive `from_str_kind` / `as_str`.
    #[test]
    fn task_kind_roundtrip() {
        assert_eq!(
            TaskKind::from_str_kind("memory_cleanup"),
            TaskKind::MemoryCleanup
        );
        assert_eq!(TaskKind::MemoryCleanup.as_str(), "memory_cleanup");
        assert_eq!(
            TaskKind::from_str_kind("skill_refresh"),
            TaskKind::SkillRefresh
        );
        assert_eq!(TaskKind::SkillRefresh.as_str(), "skill_refresh");
        assert_eq!(
            TaskKind::from_str_kind("health_check"),
            TaskKind::HealthCheck
        );
        // This leg of the roundtrip was previously missing.
        assert_eq!(TaskKind::HealthCheck.as_str(), "health_check");
        assert_eq!(
            TaskKind::from_str_kind("update_check"),
            TaskKind::UpdateCheck
        );
        assert_eq!(TaskKind::UpdateCheck.as_str(), "update_check");
        assert_eq!(
            TaskKind::from_str_kind("custom_job"),
            TaskKind::Custom("custom_job".into())
        );
        assert_eq!(TaskKind::Custom("x".into()).as_str(), "x");
    }

    #[test]
    fn task_kind_experiment_roundtrip() {
        assert_eq!(
            TaskKind::from_str_kind("experiment"),
            TaskKind::Experiment,
            "from_str_kind must map 'experiment' to Experiment variant, not Custom"
        );
        assert_eq!(TaskKind::Experiment.as_str(), "experiment");
    }

    #[test]
    fn normalize_five_field_prepends_zero() {
        assert_eq!(normalize_cron_expr("*/5 * * * *"), "0 */5 * * * *");
        assert_eq!(normalize_cron_expr("0 3 * * *"), "0 0 3 * * *");
    }

    #[test]
    fn normalize_six_field_passthrough() {
        assert_eq!(normalize_cron_expr("0 0 3 * * *"), "0 0 3 * * *");
        assert_eq!(normalize_cron_expr("* * * * * *"), "* * * * * *");
    }

    #[test]
    fn normalize_other_field_count_passthrough() {
        assert_eq!(normalize_cron_expr("not_cron"), "not_cron");
        assert_eq!(normalize_cron_expr("0 0 0 0"), "0 0 0 0");
    }

    #[test]
    fn normalize_empty_string_passthrough() {
        assert_eq!(normalize_cron_expr(""), "");
    }

    #[test]
    fn normalize_whitespace_only_passthrough() {
        assert_eq!(normalize_cron_expr(" "), " ");
    }

    #[test]
    fn valid_cron_creates_task() {
        let task = ScheduledTask::new(
            "test",
            "0 0 * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_ok());
    }

    #[test]
    fn five_field_cron_creates_task() {
        let task = ScheduledTask::new(
            "five-field",
            "*/5 * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_ok(), "5-field cron must be accepted");
    }

    #[test]
    fn invalid_cron_returns_error() {
        let task = ScheduledTask::new(
            "test",
            "not_cron",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(task.is_err());
    }

    /// The error must be `InvalidCron` and must quote the expression the caller passed.
    #[test]
    fn invalid_cron_error_names_offending_expression() {
        let task = ScheduledTask::new(
            "test",
            "not_cron",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        assert!(
            matches!(
                task,
                Err(SchedulerError::InvalidCron(ref msg)) if msg.starts_with("not_cron: ")
            ),
            "expected InvalidCron error prefixed with the original expression"
        );
    }

    #[test]
    fn oneshot_task_creates_correctly() {
        let run_at = Utc::now() + chrono::Duration::hours(1);
        let task =
            ScheduledTask::oneshot("t", run_at, TaskKind::HealthCheck, serde_json::Value::Null);
        assert_eq!(task.task_mode_str(), "oneshot");
        assert!(task.cron_schedule().is_none());
    }

    /// One-shot tasks default to `Static` provenance and persist an empty cron string.
    #[test]
    fn oneshot_task_has_static_provenance_and_no_cron_expr() {
        let run_at = Utc::now() + chrono::Duration::hours(1);
        let task =
            ScheduledTask::oneshot("t", run_at, TaskKind::HealthCheck, serde_json::Value::Null);
        assert_eq!(task.provenance, TaskProvenance::Static);
        assert!(task.cron_expr_string().is_empty());
        assert!(
            matches!(task.mode, TaskMode::OneShot { run_at: stored } if stored == run_at),
            "run_at must be stored unchanged"
        );
    }

    #[test]
    fn periodic_task_mode_str() {
        let task = ScheduledTask::periodic(
            "p",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        assert_eq!(task.task_mode_str(), "periodic");
        assert!(task.cron_schedule().is_some());
    }

    #[test]
    fn task_provenance_roundtrip() {
        assert_eq!(
            TaskProvenance::from_provenance_str("static"),
            TaskProvenance::Static
        );
        assert_eq!(
            TaskProvenance::from_provenance_str("user_added"),
            TaskProvenance::UserAdded
        );
        assert_eq!(
            TaskProvenance::from_provenance_str("external"),
            TaskProvenance::External
        );
        // Unknown values fall back to External (most restrictive fail-safe).
        assert_eq!(
            TaskProvenance::from_provenance_str("hydrated"),
            TaskProvenance::External
        );
        assert_eq!(TaskProvenance::Static.as_str(), "static");
        assert_eq!(TaskProvenance::UserAdded.as_str(), "user_added");
        assert_eq!(TaskProvenance::External.as_str(), "external");
    }

    #[test]
    fn task_provenance_is_external() {
        assert!(TaskProvenance::External.is_external());
        assert!(!TaskProvenance::Static.is_external());
        assert!(!TaskProvenance::UserAdded.is_external());
    }

    #[test]
    fn new_task_has_static_provenance() {
        let task = ScheduledTask::new(
            "test",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        assert_eq!(task.provenance, TaskProvenance::Static);
    }

    #[test]
    fn periodic_with_provenance_sets_provenance() {
        let task = ScheduledTask::periodic_with_provenance(
            "ext",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
            TaskProvenance::External,
        )
        .unwrap();
        assert_eq!(task.provenance, TaskProvenance::External);
    }
}
638}