// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
#[allow(unused_imports)]
use zeph_db::sql;

use chrono::Utc;
use tokio::sync::{Mutex, mpsc, watch};

use crate::error::SchedulerError;
use crate::sanitize::sanitize_task_prompt;
use crate::store::JobStore;
use crate::task::{ScheduledTask, TaskDescriptor, TaskHandler, TaskKind, TaskMode};

/// Messages sent to the [`Scheduler`] over its control channel.
///
/// Obtain the sender from [`Scheduler::new`] or [`Scheduler::with_max_tasks`]
/// and use it to add or cancel tasks while the scheduler loop is running.
///
/// # Examples
///
/// ```rust,no_run
/// use tokio::sync::watch;
/// use zeph_scheduler::{JobStore, Scheduler, SchedulerMessage, TaskDescriptor, TaskKind, TaskMode};
/// use chrono::Utc;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let store = JobStore::open("sqlite:scheduler.db").await?;
/// let (_shutdown_tx, shutdown_rx) = watch::channel(false);
/// let (_scheduler, msg_tx) = Scheduler::new(store, shutdown_rx);
///
/// // Add a one-shot task that runs immediately.
/// let desc = TaskDescriptor {
///     name: "generate-report".into(),
///     mode: TaskMode::OneShot { run_at: Utc::now() },
///     kind: TaskKind::Custom("report".into()),
///     config: serde_json::json!({"task": "Generate weekly report"}),
/// };
/// msg_tx.send(SchedulerMessage::Add(Box::new(desc))).await?;
///
/// // Cancel a previously registered task.
/// msg_tx.send(SchedulerMessage::Cancel("generate-report".into())).await?;
/// # Ok(())
/// # }
/// ```
pub enum SchedulerMessage {
    /// Register a new task (or replace an existing one with the same name).
    Add(Box<TaskDescriptor>),
    /// Cancel and delete the task with the given name.
    Cancel(String),
}

/// Cron-based periodic task scheduler.
///
/// `Scheduler` owns the in-memory task list and drives execution on a configurable
/// tick interval. It persists job state to `SQLite` via [`JobStore`] so task schedules
/// survive restarts.
///
/// # Creation
///
/// Use [`Scheduler::new`] (defaults: 100-task cap, 60-second tick) or
/// [`Scheduler::with_max_tasks`] to set a custom capacity.
///
/// # Registration
///
/// - **Before start**: call [`Scheduler::add_task`] and [`Scheduler::register_handler`].
/// - **At runtime**: send [`SchedulerMessage::Add`] / [`SchedulerMessage::Cancel`]
///   on the `mpsc::Sender` returned by the constructor.
///
/// # Lifecycle
///
/// ```text
/// Scheduler::new()  →  add_task / register_handler  →  init()  →  run()
///                                                                      │
///                                                            shutdown_rx receives true
///                                                                      │
///                                                                    exit
/// ```
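///
/// # Examples
///
/// A minimal end-to-end sketch. Hedged: it assumes `ScheduledTask` is exported at
/// the crate root alongside the other task types, and that the crate error type
/// converts into `Box<dyn std::error::Error>`; a handler for the task's kind would
/// be registered via [`Scheduler::register_handler`] before `init()`.
///
/// ```rust,no_run
/// use tokio::sync::watch;
/// use zeph_scheduler::{JobStore, ScheduledTask, Scheduler, TaskKind};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let store = JobStore::open("sqlite:scheduler.db").await?;
/// let (shutdown_tx, shutdown_rx) = watch::channel(false);
/// let (mut scheduler, _msg_tx) = Scheduler::new(store, shutdown_rx);
///
/// // 6-field cron (sec min hour day-of-month month day-of-week): every minute.
/// let task = ScheduledTask::new(
///     "heartbeat",
///     "0 * * * * *",
///     TaskKind::HealthCheck,
///     serde_json::Value::Null,
/// )?;
/// scheduler.add_task(task);
///
/// scheduler.init().await?;
/// // From another task: `shutdown_tx.send(true)` stops the loop.
/// scheduler.run().await;
/// # Ok(())
/// # }
/// ```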
pub struct Scheduler {
    tasks: Vec<ScheduledTask>,
    store: JobStore,
    handlers: HashMap<String, Box<dyn TaskHandler>>,
    shutdown_rx: watch::Receiver<bool>,
    task_rx: mpsc::Receiver<SchedulerMessage>,
    /// Optional sender for injecting custom task prompts into the agent loop.
    custom_task_tx: Option<mpsc::Sender<String>>,
    max_tasks: usize,
    /// Names of tasks currently executing; acts as a per-task execution guard.
    ///
    /// SIGNIFICANT-5: prevents concurrent executions of the same task when the
    /// handler is slow and `catch_up_missed` + `tick` overlap.
    in_flight: Arc<Mutex<HashSet<String>>>,
}

impl Scheduler {
    /// Create a scheduler with a default task cap of 100 and a 60-second tick interval.
    ///
    /// Returns `(Scheduler, sender)` where `sender` is used to add or cancel tasks at
    /// runtime via [`SchedulerMessage`].
    #[must_use]
    pub fn new(
        store: JobStore,
        shutdown_rx: watch::Receiver<bool>,
    ) -> (Self, mpsc::Sender<SchedulerMessage>) {
        Self::with_max_tasks(store, shutdown_rx, 100)
    }

    /// Create a scheduler with a custom cap on the number of registered tasks.
    ///
    /// New tasks arriving via the control channel once `max_tasks` is reached are
    /// dropped, with a warning emitted via `tracing`.
    ///
    /// Returns `(Scheduler, sender)` where `sender` is used to add or cancel tasks at
    /// runtime via [`SchedulerMessage`].
    #[must_use]
    pub fn with_max_tasks(
        store: JobStore,
        shutdown_rx: watch::Receiver<bool>,
        max_tasks: usize,
    ) -> (Self, mpsc::Sender<SchedulerMessage>) {
        let (tx, rx) = mpsc::channel(64);
        let scheduler = Self {
            tasks: Vec::new(),
            store,
            handlers: HashMap::new(),
            shutdown_rx,
            task_rx: rx,
            custom_task_tx: None,
            max_tasks,
            in_flight: Arc::new(Mutex::new(HashSet::new())),
        };
        (scheduler, tx)
    }

    /// Attach a sender for injecting custom task prompts into the agent loop.
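    ///
    /// # Examples
    ///
    /// A sketch of wiring the prompt channel; the receiving half would live in
    /// the agent loop, and the capacity of 16 is an arbitrary choice:
    ///
    /// ```rust,no_run
    /// use tokio::sync::{mpsc, watch};
    /// use zeph_scheduler::{JobStore, Scheduler};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let store = JobStore::open("sqlite:scheduler.db").await?;
    /// let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    /// let (prompt_tx, _prompt_rx) = mpsc::channel::<String>(16);
    /// let (scheduler, _msg_tx) = Scheduler::new(store, shutdown_rx);
    /// let _scheduler = scheduler.with_custom_task_sender(prompt_tx);
    /// // The agent loop would consume injected prompts from the receiving half:
    /// // while let Some(prompt) = prompt_rx.recv().await { /* ... */ }
    /// # Ok(())
    /// # }
    /// ```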
    #[must_use]
    pub fn with_custom_task_sender(mut self, tx: mpsc::Sender<String>) -> Self {
        self.custom_task_tx = Some(tx);
        self
    }

    /// Add a task to the scheduler.
    ///
    /// This method must be called before [`Scheduler::init`]. To add tasks while the
    /// scheduler is already running, send a [`SchedulerMessage::Add`] on the control
    /// channel instead.
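    ///
    /// # Examples
    ///
    /// Registering a one-shot task before startup (a sketch; `ScheduledTask::oneshot`
    /// takes a concrete `run_at` timestamp and, unlike `ScheduledTask::new`, has no
    /// cron expression to validate):
    ///
    /// ```rust,no_run
    /// use chrono::Utc;
    /// use tokio::sync::watch;
    /// use zeph_scheduler::{JobStore, ScheduledTask, Scheduler, TaskKind};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let store = JobStore::open("sqlite:scheduler.db").await?;
    /// let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    /// let (mut scheduler, _msg_tx) = Scheduler::new(store, shutdown_rx);
    ///
    /// // Fire once, one minute from now.
    /// let task = ScheduledTask::oneshot(
    ///     "warmup",
    ///     Utc::now() + chrono::Duration::minutes(1),
    ///     TaskKind::HealthCheck,
    ///     serde_json::Value::Null,
    /// );
    /// scheduler.add_task(task);
    /// # Ok(())
    /// # }
    /// ```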
    pub fn add_task(&mut self, task: ScheduledTask) {
        self.tasks.push(task);
    }

    /// Register a handler for tasks of the given kind.
    ///
    /// When a task is due, the scheduler looks up its [`TaskKind`]'s string key and
    /// calls the matching handler. Tasks whose kind has no registered handler are
    /// skipped with a debug-level log.
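    ///
    /// # Examples
    ///
    /// A sketch of a handler, following the `TaskHandler` shape used by this crate's
    /// tests (a boxed future returned from `execute`); it assumes `TaskHandler` and
    /// `SchedulerError` are exported at the crate root:
    ///
    /// ```rust,no_run
    /// use std::pin::Pin;
    /// use zeph_scheduler::{SchedulerError, TaskHandler};
    ///
    /// struct LogHandler;
    ///
    /// impl TaskHandler for LogHandler {
    ///     fn execute(
    ///         &self,
    ///         config: &serde_json::Value,
    ///     ) -> Pin<Box<dyn std::future::Future<Output = Result<(), SchedulerError>> + Send + '_>>
    ///     {
    ///         let config = config.clone();
    ///         Box::pin(async move {
    ///             // Log the task payload instead of doing real work.
    ///             tracing::info!(%config, "task fired");
    ///             Ok(())
    ///         })
    ///     }
    /// }
    ///
    /// // Then, before init():
    /// // scheduler.register_handler(&TaskKind::HealthCheck, Box::new(LogHandler));
    /// ```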
    pub fn register_handler(&mut self, kind: &TaskKind, handler: Box<dyn TaskHandler>) {
        self.handlers.insert(kind.as_str().to_owned(), handler);
    }

    /// Initialize the store, sync task definitions, compute initial `next_run` for each task,
    /// and hydrate any CLI-added periodic jobs that live only in the DB back into `self.tasks`.
    ///
    /// Static tasks registered via [`Scheduler::add_task`] are upserted into the store first.
    /// Then all periodic jobs stored in the DB that are not already present in `self.tasks`
    /// (by name) are reconstructed from their persisted `cron_expr` and appended; this ensures
    /// that jobs added via the CLI (which write directly to the store) are visible to
    /// `tick` and [`Scheduler::catch_up_missed`] on the next startup.
    ///
    /// # Errors
    ///
    /// Returns an error if DB init, upsert, `next_run` persistence, or job listing fails.
    pub async fn init(&mut self) -> Result<(), SchedulerError> {
        self.store.init().await?;
        let now = Utc::now();
        for task in &self.tasks {
            match &task.mode {
                TaskMode::Periodic { schedule } => {
                    self.store
                        .upsert_job_with_mode(
                            &task.name,
                            &schedule.to_string(),
                            task.kind.as_str(),
                            "periodic",
                            None,
                            "",
                        )
                        .await?;
                    // Set next_run for periodic tasks if not already persisted.
                    if self.store.get_next_run(&task.name).await?.is_none() {
                        match schedule.after(&now).next() {
                            Some(next) => {
                                self.store
                                    .set_next_run(&task.name, &next.to_rfc3339())
                                    .await?;
                            }
                            None => {
                                tracing::warn!(
                                    task = %task.name,
                                    "cron produces no future occurrence, skipping next_run"
                                );
                            }
                        }
                    }
                }
                TaskMode::OneShot { run_at } => {
                    self.store
                        .upsert_job_with_mode(
                            &task.name,
                            "",
                            task.kind.as_str(),
                            "oneshot",
                            Some(&run_at.to_rfc3339()),
                            "",
                        )
                        .await?;
                }
            }
        }

        // Hydrate periodic jobs added via CLI (or other out-of-process writers) that were
        // persisted in the store but never registered in self.tasks. Without this step,
        // tick() and catch_up_missed() silently ignore them on every restart.
        let stored_jobs = self.store.list_jobs_full().await?;
        // Collect owned strings to release the borrow on self.tasks before mutating it below.
        let static_names: HashSet<String> =
            self.tasks.iter().map(|t| t.name.clone()).collect();

        for job in stored_jobs {
            if job.task_mode != "periodic" || static_names.contains(&job.name) {
                continue;
            }
            match ScheduledTask::periodic(
                job.name.clone(),
                &job.cron_expr,
                crate::task::TaskKind::from_str_kind(&job.kind),
                serde_json::Value::Null,
            ) {
                Ok(task) => {
                    // Compute next_run if not already stored (same logic as for static tasks).
                    if self.store.get_next_run(&job.name).await?.is_none()
                        && let Some(schedule) = task.cron_schedule()
                    {
                        match schedule.after(&now).next() {
                            Some(next) => {
                                if let Err(e) =
                                    self.store.set_next_run(&job.name, &next.to_rfc3339()).await
                                {
                                    tracing::warn!(
                                        task = %job.name,
                                        "failed to persist next_run for hydrated job: {e}"
                                    );
                                }
                            }
                            None => {
                                tracing::warn!(
                                    task = %job.name,
                                    "cron produces no future occurrence, skipping next_run"
                                );
                            }
                        }
                    }
                    tracing::debug!(task = %job.name, "hydrated CLI-added periodic job from store");
                    self.tasks.push(task);
                }
                Err(e) => {
                    tracing::warn!(
                        task = %job.name,
                        cron_expr = %job.cron_expr,
                        "skipping persisted job with invalid cron expression: {e}"
                    );
                }
            }
        }

        Ok(())
    }

    /// Fire overdue periodic tasks once on startup, then advance their `next_run`.
    ///
    /// For each periodic task whose `next_run <= now`, the task is executed via
    /// the registered handler exactly once. One-shot tasks are handled by the
    /// normal `tick()` path and are NOT replayed here.
    ///
    /// SIGNIFICANT-5: uses the same `in_flight` mutex as `tick()` so that
    /// `catch_up_missed` and a concurrent `tick()` cannot execute the same task.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered during store or handler operations.
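    ///
    /// # Examples
    ///
    /// Typical daemon startup order (a sketch; replaying overdue work before the
    /// first tick is optional):
    ///
    /// ```rust,no_run
    /// # use tokio::sync::watch;
    /// # use zeph_scheduler::{JobStore, Scheduler};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let store = JobStore::open("sqlite:scheduler.db").await?;
    /// # let (_tx, shutdown_rx) = watch::channel(false);
    /// # let (mut scheduler, _msg_tx) = Scheduler::new(store, shutdown_rx);
    /// scheduler.init().await?;
    /// scheduler.catch_up_missed().await?; // replay overdue periodic tasks once
    /// scheduler.run().await; // then enter the tick loop
    /// # Ok(())
    /// # }
    /// ```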
    pub async fn catch_up_missed(&mut self) -> Result<(), SchedulerError> {
        let _span =
            tracing::info_span!("scheduler.daemon.catch_up", tasks = self.tasks.len()).entered();

        let now = chrono::Utc::now();
        let mut replayed = 0usize;

        // Collect overdue periodic tasks first so we don't borrow self.tasks while executing.
        let overdue: Vec<_> = {
            let mut v = Vec::new();
            for task in &self.tasks {
                let TaskMode::Periodic { .. } = &task.mode else {
                    continue;
                };
                if let Ok(Some(ref s)) = self.store.get_next_run(&task.name).await
                    && s.parse::<chrono::DateTime<chrono::Utc>>()
                        .is_ok_and(|dt| dt <= now)
                {
                    v.push(task.name.clone());
                }
            }
            v
        };

        for name in &overdue {
            // Per-task mutex: skip if already running (safety against overlap with tick).
            {
                let mut guard = self.in_flight.lock().await;
                if guard.contains(name.as_str()) {
                    tracing::debug!(task = %name, "catch_up_missed: task in-flight, skipping");
                    continue;
                }
                guard.insert(name.clone());
            }

            let result = self.run_periodic_task_by_name(name, &now).await;

            self.in_flight.lock().await.remove(name.as_str());

            match result {
                Ok(true) => replayed += 1,
                Ok(false) => {}
                Err(e) => tracing::warn!(task = %name, "catch_up_missed: handler error: {e}"),
            }
        }

        tracing::info!(replayed, "catch_up_missed complete");
        Ok(())
    }

    /// Execute a named periodic task and advance its `next_run`.
    ///
    /// Returns `Ok(true)` if the task was found and executed, `Ok(false)` if not found.
    async fn run_periodic_task_by_name(
        &self,
        name: &str,
        now: &chrono::DateTime<chrono::Utc>,
    ) -> Result<bool, SchedulerError> {
        let Some(task) = self.tasks.iter().find(|t| t.name == name) else {
            return Ok(false);
        };
        let TaskMode::Periodic { schedule } = &task.mode else {
            return Ok(false);
        };
        let Some(handler) = self.handlers.get(task.kind.as_str()) else {
            tracing::debug!(task = %name, "catch_up_missed: no handler, skipping");
            return Ok(false);
        };

        tracing::info!(task = %name, "catch_up_missed: executing overdue task");
        handler.execute(&task.config).await?;

        let next = schedule
            .after(now)
            .next()
            .map(|dt| dt.to_rfc3339())
            .unwrap_or_default();
        self.store
            .record_run(name, &now.to_rfc3339(), &next)
            .await?;
        Ok(true)
    }

    /// Run the scheduler loop with a configurable tick interval and graceful shutdown window.
    ///
    /// The interval is clamped to `5..=3600` seconds. Missed ticks are skipped to avoid
    /// burst storms. After the shutdown channel fires, in-flight ticks are allowed to
    /// complete but no new ticks start. The `grace_secs` window gives handlers time to
    /// finish before the function returns.
    ///
    /// The grace window is clamped to 60 seconds; values above 60 have no additional effect.
    /// Note: the grace loop is a best-effort wait, not a join on in-flight handlers.
    /// Handlers that outlive the grace window are abandoned, not awaited.
    ///
    /// The `grace_secs` parameter corresponds to `scheduler.daemon.shutdown_grace_secs`
    /// in config (default 30). Pass 0 for immediate exit after shutdown signal.
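    ///
    /// # Examples
    ///
    /// A sketch of driving the loop from a spawned task and shutting it down with
    /// a 30-second grace window (the spawn/shutdown pattern mirrors this crate's
    /// `scheduler_shutdown` test):
    ///
    /// ```rust,no_run
    /// # use tokio::sync::watch;
    /// # use zeph_scheduler::{JobStore, Scheduler};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let store = JobStore::open("sqlite:scheduler.db").await?;
    /// let (shutdown_tx, shutdown_rx) = watch::channel(false);
    /// let (mut scheduler, _msg_tx) = Scheduler::new(store, shutdown_rx);
    /// scheduler.init().await?;
    ///
    /// let handle = tokio::spawn(async move {
    ///     // 60-second tick, 30-second shutdown grace.
    ///     scheduler.run_with_interval_and_grace(60, 30).await;
    /// });
    ///
    /// // ... later, on SIGTERM or similar:
    /// let _ = shutdown_tx.send(true);
    /// handle.await?;
    /// # Ok(())
    /// # }
    /// ```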
    pub async fn run_with_interval_and_grace(&mut self, tick_secs: u64, grace_secs: u64) {
        let secs = tick_secs.clamp(5, 3600);
        let mut interval = tokio::time::interval(Duration::from_secs(secs));
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let _tick_span = tracing::info_span!(
                        "scheduler.daemon.tick",
                        tasks = self.tasks.len()
                    ).entered();
                    self.drain_channel().await;
                    self.tick().await;
                }
                _ = self.shutdown_rx.changed() => {
                    if *self.shutdown_rx.borrow() {
                        tracing::info!("scheduler shutting down (grace {}s)", grace_secs);
                        if grace_secs > 0 {
                            let deadline = tokio::time::Instant::now()
                                + Duration::from_secs(grace_secs.min(60));
                            loop {
                                if self.in_flight.lock().await.is_empty() {
                                    tracing::debug!("scheduler: no in-flight tasks, exiting immediately");
                                    break;
                                }
                                if tokio::time::Instant::now() >= deadline {
                                    tracing::warn!("scheduler: grace period elapsed with tasks still in-flight");
                                    break;
                                }
                                tokio::time::sleep(Duration::from_millis(100)).await;
                            }
                        }
                        break;
                    }
                }
            }
        }
    }

    /// Run the scheduler loop with a configurable tick interval.
    ///
    /// The interval is clamped to a minimum of 1 second. Missed ticks (caused by a
    /// slow `tick()` call) are skipped instead of burst-replayed, preventing runaway
    /// execution storms on slow hosts.
    ///
    /// This method runs until `true` is sent on the shutdown channel.
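    ///
    /// # Examples
    ///
    /// A sketch with a fast 5-second tick (handy for tests; production setups
    /// would typically keep the 60-second default):
    ///
    /// ```rust,no_run
    /// # use tokio::sync::watch;
    /// # use zeph_scheduler::{JobStore, Scheduler};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let store = JobStore::open("sqlite:scheduler.db").await?;
    /// # let (shutdown_tx, shutdown_rx) = watch::channel(false);
    /// # let (mut scheduler, _msg_tx) = Scheduler::new(store, shutdown_rx);
    /// # scheduler.init().await?;
    /// scheduler.run_with_interval(5).await; // returns once shutdown_tx sends `true`
    /// # Ok(())
    /// # }
    /// ```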
    pub async fn run_with_interval(&mut self, tick_secs: u64) {
        let secs = tick_secs.max(1);
        let mut interval = tokio::time::interval(Duration::from_secs(secs));
        // Skip missed ticks instead of bursting to catch up. Without this, a slow `tick()`
        // call causes tokio to fire the interval in a tight loop to "catch up", producing
        // hundreds of executions per second (#2737 leak 4).
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    self.drain_channel().await;
                    self.tick().await;
                }
                _ = self.shutdown_rx.changed() => {
                    if *self.shutdown_rx.borrow() {
                        tracing::info!("scheduler shutting down");
                        break;
                    }
                }
            }
        }
    }

    /// Run the scheduler loop, checking for due tasks every 60 seconds.
    ///
    /// This is a convenience wrapper around [`Scheduler::run_with_interval`] with a
    /// 60-second tick. It runs until `true` is sent on the shutdown channel.
    pub async fn run(&mut self) {
        self.run_with_interval(60).await;
    }

    async fn drain_channel(&mut self) {
        while let Ok(msg) = self.task_rx.try_recv() {
            match msg {
                SchedulerMessage::Add(boxed) => {
                    let desc = *boxed;
                    self.register_descriptor(desc).await;
                }
                SchedulerMessage::Cancel(name) => {
                    self.tasks.retain(|t| t.name != name);
                    if let Err(e) = self.store.delete_job(&name).await {
                        tracing::warn!(task = %name, "failed to delete job from store: {e}");
                    }
                }
            }
        }
    }

    async fn register_descriptor(&mut self, desc: TaskDescriptor) {
        // Check capacity only when adding a new task (upsert of existing name does not count).
        let is_new = !self.tasks.iter().any(|t| t.name == desc.name);
        if is_new && self.tasks.len() >= self.max_tasks {
            tracing::warn!(
                task = %desc.name,
                max_tasks = self.max_tasks,
                "max_tasks limit reached, dropping task"
            );
            return;
        }
        let now = Utc::now();
        match &desc.mode {
            TaskMode::Periodic { schedule } => {
                if let Err(e) = self
                    .store
                    .upsert_job_with_mode(
                        &desc.name,
                        &schedule.to_string(),
                        desc.kind.as_str(),
                        "periodic",
                        None,
                        "",
                    )
                    .await
                {
                    tracing::warn!(task = %desc.name, "failed to upsert job: {e}");
                    return;
                }
                if let Some(next) = schedule.after(&now).next() {
                    let _ = self
                        .store
                        .set_next_run(&desc.name, &next.to_rfc3339())
                        .await;
                }
            }
            TaskMode::OneShot { run_at } => {
                if let Err(e) = self
                    .store
                    .upsert_job_with_mode(
                        &desc.name,
                        "",
                        desc.kind.as_str(),
                        "oneshot",
                        Some(&run_at.to_rfc3339()),
                        "",
                    )
                    .await
                {
                    tracing::warn!(task = %desc.name, "failed to upsert oneshot job: {e}");
                    return;
                }
            }
        }
        // Remove old entry with same name if present.
        self.tasks.retain(|t| t.name != desc.name);
        self.tasks.push(ScheduledTask {
            name: desc.name,
            mode: desc.mode,
            kind: desc.kind,
            config: desc.config,
        });
    }

    async fn tick(&mut self) {
        let now = Utc::now();
        let mut completed_oneshots: Vec<String> = Vec::new();

        for task in &self.tasks {
            let should_run = match &task.mode {
                TaskMode::Periodic { .. } => {
                    match self.store.get_next_run(&task.name).await {
                        Ok(Some(ref s)) => {
                            s.parse::<chrono::DateTime<Utc>>().is_ok_and(|dt| dt <= now)
                        }
                        // PERF-SC-04 fix: missing next_run must not mean "fire now".
                        // Compute and persist next occurrence, then skip this tick.
                        Ok(None) => {
                            if let Some(schedule) = task.cron_schedule()
                                && let Some(next) = schedule.after(&now).next()
                            {
                                let _ = self
                                    .store
                                    .set_next_run(&task.name, &next.to_rfc3339())
                                    .await;
                            }
                            false
                        }
                        Err(e) => {
                            tracing::warn!(task = %task.name, "failed to check next_run: {e}");
                            false
                        }
                    }
                }
                TaskMode::OneShot { run_at } => *run_at <= now,
            };

            if should_run {
                if let Some(handler) = self.handlers.get(task.kind.as_str()) {
                    tracing::info!(task = %task.name, kind = task.kind.as_str(), "executing task");
                    match handler.execute(&task.config).await {
                        Ok(()) => match &task.mode {
                            TaskMode::Periodic { schedule } => {
                                let next = schedule
                                    .after(&now)
                                    .next()
                                    .map(|dt| dt.to_rfc3339())
                                    .unwrap_or_default();
                                if let Err(e) = self
                                    .store
                                    .record_run(&task.name, &now.to_rfc3339(), &next)
                                    .await
                                {
                                    tracing::warn!(task = %task.name, "failed to record run: {e}");
                                }
                            }
                            TaskMode::OneShot { .. } => {
                                if let Err(e) = self.store.mark_done(&task.name).await {
                                    tracing::warn!(task = %task.name, "failed to mark done: {e}");
                                }
                                completed_oneshots.push(task.name.clone());
                            }
                        },
                        Err(e) => {
                            tracing::warn!(task = %task.name, "task execution failed: {e}");
                        }
                    }
                } else if let TaskMode::OneShot { .. } = &task.mode {
                    // Dual-path for custom oneshot tasks without a registered handler:
                    // when `CustomTaskHandler` is registered it handles the task via the
                    // handler interface above. This branch is a fallback that injects the
                    // prompt directly into the agent loop through `custom_task_tx` for cases
                    // where no handler was registered (e.g. scheduler created without one).
                    if let (TaskKind::Custom(_), Some(tx)) = (&task.kind, &self.custom_task_tx) {
                        let raw =
                            task.config.get("task").and_then(|v| v.as_str()).unwrap_or(
                                "Execute the following scheduled task now: check status",
                            );
                        let prompt = sanitize_task_prompt(raw);
                        let _ = tx.try_send(prompt);
                        if let Err(e) = self.store.mark_done(&task.name).await {
                            tracing::warn!(task = %task.name, "failed to mark done: {e}");
                        }
                        completed_oneshots.push(task.name.clone());
                    } else {
                        tracing::debug!(
                            task = %task.name,
                            kind = task.kind.as_str(),
                            "no handler registered"
                        );
                    }
                } else {
                    tracing::debug!(task = %task.name, kind = task.kind.as_str(), "no handler registered");
                }
            }
        }

        // Remove completed one-shot tasks from memory.
        self.tasks.retain(|t| !completed_oneshots.contains(&t.name));
    }
}

#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use chrono::Duration;

    use super::*;
    use crate::task::TaskHandler;
    use zeph_db::DbPool;

    struct CountingHandler {
        count: Arc<AtomicU32>,
    }

    impl TaskHandler for CountingHandler {
        fn execute(
            &self,
            _config: &serde_json::Value,
        ) -> Pin<Box<dyn std::future::Future<Output = Result<(), SchedulerError>> + Send + '_>>
        {
            let count = self.count.clone();
            Box::pin(async move {
                count.fetch_add(1, Ordering::Relaxed);
                Ok(())
            })
        }
    }

    async fn test_pool() -> DbPool {
        zeph_db::sqlx::SqlitePool::connect("sqlite::memory:")
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn scheduler_init_and_tick() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "test",
            "* * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );

        scheduler.init().await.unwrap();

        // Backdate next_run to simulate a due task.
        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = '2000-01-01T00:00:00+00:00' WHERE name = 'test'"
        ))
        .execute(&pool)
        .await
        .unwrap();

        scheduler.tick().await;
        assert_eq!(count.load(Ordering::Relaxed), 1);
    }

    /// PERF-SC-04 regression: a task with no `next_run` must not fire.
    #[tokio::test]
    async fn tick_does_not_fire_without_next_run() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "yearly",
            "0 0 1 1 * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );

        // Init the store but do NOT set next_run (simulate missing next_run).
        scheduler.store.init().await.unwrap();
        scheduler
            .store
            .upsert_job("yearly", "0 0 1 1 * *", "health_check")
            .await
            .unwrap();
        // Explicitly clear next_run to ensure it's NULL.
        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = NULL WHERE name = 'yearly'"
        ))
        .execute(&pool)
        .await
        .unwrap();

        scheduler.tick().await;
        assert_eq!(
            count.load(Ordering::Relaxed),
            0,
            "task without next_run must not fire (PERF-SC-04)"
        );
    }

    /// After `init()`, every periodic task must have a non-null `next_run`.
    #[tokio::test]
    async fn init_always_sets_next_run() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "periodic",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);
        scheduler.init().await.unwrap();

        let next: Option<String> = zeph_db::query_scalar(sql!(
            "SELECT next_run FROM scheduled_jobs WHERE name = 'periodic'"
        ))
        .fetch_optional(&pool)
        .await
        .unwrap()
        .flatten();
        assert!(
            next.is_some(),
            "next_run must be set after init() for periodic task"
        );
    }

    /// A task whose `next_run` is in the future must not fire.
    #[tokio::test]
    async fn task_does_not_fire_before_next_run() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "future",
            "0 0 1 1 * *", // 01:00 on the 1st of each month
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );

        scheduler.init().await.unwrap();

        // Manually set next_run to far future to prevent firing.
        let far_future = "2099-01-01T00:00:00+00:00";
        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = ? WHERE name = 'future'"
        ))
        .bind(far_future)
        .execute(&pool)
        .await
        .unwrap();

        scheduler.tick().await;
        assert_eq!(
            count.load(Ordering::Relaxed),
            0,
            "should not fire before next_run"
        );
    }

    /// After a task fires, `next_run` is advanced to the following occurrence.
    #[tokio::test]
    async fn next_run_advances_after_execution() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "adv",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: Arc::new(AtomicU32::new(0)),
            }),
        );

        scheduler.init().await.unwrap();

        // Backdate next_run to force execution.
        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = '2000-01-01T00:00:00+00:00' WHERE name = 'adv'"
        ))
        .execute(&pool)
        .await
        .unwrap();

        scheduler.tick().await;

        // next_run must now be in the future.
        let next: Option<String> = zeph_db::query_scalar(sql!(
            "SELECT next_run FROM scheduled_jobs WHERE name = 'adv'"
        ))
        .fetch_optional(&pool)
        .await
        .unwrap()
        .flatten();
        let next_str = next.expect("next_run should be set after execution");
        let next_dt = next_str
            .parse::<chrono::DateTime<Utc>>()
            .expect("should parse as RFC3339");
        // The backdated value was 2000-01-01; after tick() the scheduler must have
        // advanced next_run to a future occurrence (at least year 2001+).
        // We avoid comparing against Utc::now() here because on slow CI hosts
        // (e.g. Windows) a per-second cron can tick past the assertion window.
        let epoch_2001 = chrono::DateTime::parse_from_rfc3339("2001-01-01T00:00:00+00:00")
            .expect("static parse")
            .with_timezone(&Utc);
        assert!(
            next_dt > epoch_2001,
            "next_run must have advanced beyond the backdated value after firing"
        );
    }

    #[tokio::test]
    async fn scheduler_shutdown() {
        let pool = test_pool().await;
        let store = JobStore::new(pool);
        let (tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);
        scheduler.init().await.unwrap();

        let handle = tokio::spawn(async move { scheduler.run().await });
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let _ = tx.send(true);
        tokio::time::timeout(std::time::Duration::from_secs(2), handle)
            .await
            .expect("scheduler should stop")
            .expect("task should complete");
    }

    /// One-shot task fires when `run_at` is in the past.
    #[tokio::test]
    async fn oneshot_fires_at_run_at() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let past = Utc::now() - Duration::hours(1);
        let task = ScheduledTask::oneshot(
            "os_fire",
            past,
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );
        scheduler.init().await.unwrap();
        scheduler.tick().await;

        assert_eq!(
            count.load(Ordering::Relaxed),
            1,
            "oneshot must fire when run_at is past"
        );
    }

    /// One-shot task must NOT fire when `run_at` is in the future.
    #[tokio::test]
    async fn oneshot_does_not_fire_before_run_at() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let future = Utc::now() + Duration::hours(1);
        let task = ScheduledTask::oneshot(
            "os_future",
            future,
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );
        scheduler.init().await.unwrap();
        scheduler.tick().await;

        assert_eq!(
            count.load(Ordering::Relaxed),
            0,
            "oneshot must not fire before run_at"
        );
    }

    /// After a one-shot fires, it is removed from self.tasks.
    #[tokio::test]
    async fn oneshot_removed_after_execution() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let past = Utc::now() - Duration::seconds(1);
        let task = ScheduledTask::oneshot(
            "os_rm",
            past,
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        scheduler.add_task(task);
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: Arc::new(AtomicU32::new(0)),
            }),
        );
        scheduler.init().await.unwrap();
        assert_eq!(scheduler.tasks.len(), 1);
        scheduler.tick().await;
        assert_eq!(
            scheduler.tasks.len(),
            0,
            "completed oneshot must be removed from tasks"
        );
    }

    /// `init()` hydrates periodic jobs that were written to the store out-of-process
    /// (e.g. via the CLI) and are NOT present in `self.tasks` at construction time.
    ///
    /// Regression test for fix #3499: before the fix, CLI-added jobs were never fired
    /// because `init()` did not call `store.list_jobs_full()` to backfill `self.tasks`.
    #[tokio::test]
    async fn init_hydrates_cli_added_periodic_jobs_from_store() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());

        // Simulate CLI insertion: write a periodic job directly to the store
        // *before* the Scheduler is constructed, mimicking a CLI `schedule add`
        // command that writes to the DB while the daemon is not running.
        store.init().await.unwrap();
        store
            .upsert_job_with_mode(
                "cli-job",
                "0 * * * * *",
                "health_check",
                "periodic",
                None,
                "",
            )
            .await
            .unwrap();

        // Construct a fresh Scheduler with an empty task list (no add_task calls),
        // pointing at the same pool that already has the CLI-added job.
        let store2 = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store2, rx);

        // Before init() self.tasks is empty.
        assert_eq!(
            scheduler.tasks.len(),
            0,
            "tasks must be empty before init()"
        );

        scheduler.init().await.unwrap();

        // After init() the CLI-added periodic job must have been hydrated.
        assert_eq!(
            scheduler.tasks.len(),
            1,
            "init() must hydrate the CLI-added periodic job from the store"
        );
        assert_eq!(
            scheduler.tasks[0].name, "cli-job",
            "hydrated task name must match the DB row"
        );

        // next_run must have been computed and persisted.
        let next_run = store.get_next_run("cli-job").await.unwrap();
        assert!(
            next_run.is_some(),
            "init() must compute and persist next_run for the hydrated job"
        );
        let dt = next_run
            .unwrap()
            .parse::<chrono::DateTime<chrono::Utc>>()
            .expect("next_run must be a valid RFC3339 timestamp");
        assert!(
            dt > chrono::Utc::now(),
            "next_run must be in the future after hydration"
        );
    }

    /// `init()` does NOT re-add jobs that are already present in `self.tasks`; this
    /// avoids duplicates when both `add_task()` and a DB record exist for the same name.
    #[tokio::test]
    async fn init_does_not_duplicate_static_tasks_already_in_tasks() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        // Register via add_task (static path).
        let task = ScheduledTask::new(
            "static-job",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);

        // init() upserts the task into the store AND then calls list_jobs_full().
        // The job will be in both self.tasks AND the DB; hydration must skip it.
        scheduler.init().await.unwrap();

        assert_eq!(
            scheduler.tasks.len(),
            1,
            "init() must not duplicate a static task that is already in self.tasks"
        );
    }

    /// Task registered via channel fires on next tick.
    #[tokio::test]
    async fn channel_registration() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, msg_tx) = Scheduler::new(store, rx);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );
        scheduler.init().await.unwrap();

        // Register a task via channel with a past run_at.
        let past = Utc::now() - Duration::hours(1);
        let desc = TaskDescriptor {
            name: "chan_task".to_owned(),
            mode: TaskMode::OneShot { run_at: past },
            kind: TaskKind::HealthCheck,
            config: serde_json::Value::Null,
        };
        msg_tx
            .send(SchedulerMessage::Add(Box::new(desc)))
            .await
            .unwrap();

        // drain_channel + tick.
        scheduler.drain_channel().await;
        scheduler.tick().await;

        assert_eq!(
            count.load(Ordering::Relaxed),
            1,
            "channel-registered task must fire"
        );
    }
}