Skip to main content

uni_plugin/
scheduler.rs

1// Rust guideline compliant
2
3//! Background-job scheduler skeleton.
4//!
5//! The host owns a single scheduler that drives every registered
6//! [`crate::traits::background::BackgroundJobProvider`]. This module
7//! ships the scheduler's public API + persistent state record + a
8//! `SchedulerPersistence` trait. The host-side Tokio driver
9//! (`crates/uni/src/scheduler.rs`) wraps a loop that calls
10//! `tick_at(SystemTime::now())`, dispatches the returned jobs through
11//! the plugin registry, and forwards lifecycle transitions to the
12//! configured persistence backend.
13
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::time::SystemTime;
17
18use parking_lot::Mutex;
19use thiserror::Error;
20
21use crate::qname::QName;
22use crate::traits::background::{CancellationToken, Schedule};
23
/// Where a scheduled job currently sits in its lifecycle.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerJobStatus {
    /// Registered (or re-armed) and waiting for its next fire.
    Pending,
    /// Handed out by a tick and not yet reported finished.
    Running,
    /// Most recent run succeeded and the job will not fire again.
    Idle,
    /// Most recent run failed; a retry policy is expected to apply.
    FailedRetrying,
    /// Cancelled, either through `cancel()` or because its cancellation
    /// token was tripped.
    Cancelled,
}
39
40/// Persistable record of a scheduled job's state.
41///
42/// Round-trips through `uni_system.background_jobs` in M11 cutover.
43#[derive(Clone, Debug)]
44pub struct SchedulerJobRecord {
45    /// Job id.
46    pub id: QName,
47    /// Lifecycle status.
48    pub status: SchedulerJobStatus,
49    /// When the next fire of this job is due. `None` for `Manual`
50    /// schedules until [`Scheduler::add_job`] marks the job `Pending`,
51    /// at which point it is eligible immediately.
52    pub next_fire_at: Option<SystemTime>,
53    /// When the most-recent run started.
54    pub last_started_at: Option<SystemTime>,
55    /// When the most-recent run finished.
56    pub last_finished_at: Option<SystemTime>,
57    /// Number of consecutive failures since the last success.
58    pub consecutive_failures: u32,
59    /// Schedule describing when fires are eligible.
60    pub schedule: Schedule,
61    /// Cancellation token; flipped on `cancel()` or shutdown.
62    pub cancel: CancellationToken,
63}
64
65impl SchedulerJobRecord {
66    /// Construct a pending record with the legacy `Manual` schedule.
67    ///
68    /// Equivalent to `pending_with_schedule(id, Schedule::Manual,
69    /// SystemTime::now())`.
70    #[must_use]
71    pub fn pending(id: QName) -> Self {
72        Self::pending_with_schedule(id, Schedule::Manual, SystemTime::now())
73    }
74
75    /// Construct a pending record with an explicit schedule.
76    ///
77    /// `now` is used both as the initial registration instant and as
78    /// the reference point for the first `next_fire_at` computation.
79    #[must_use]
80    pub fn pending_with_schedule(id: QName, schedule: Schedule, now: SystemTime) -> Self {
81        let next_fire_at = schedule.next_after(now);
82        Self {
83            id,
84            status: SchedulerJobStatus::Pending,
85            next_fire_at,
86            last_started_at: None,
87            last_finished_at: None,
88            consecutive_failures: 0,
89            schedule,
90            cancel: CancellationToken::new(),
91        }
92    }
93}
94
95/// Host-side scheduler skeleton.
96///
97/// One per Uni instance. M11 cutover wires `tokio::spawn` driving and
98/// persistence into `uni_system.background_jobs`. Currently the
99/// scheduler is paused — registered jobs are stored but not executed.
100#[derive(Debug)]
101pub struct Scheduler {
102    records: Mutex<Vec<SchedulerJobRecord>>,
103    paused: AtomicBool,
104}
105
106impl Default for Scheduler {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl Scheduler {
113    /// Construct a paused scheduler.
114    #[must_use]
115    pub fn new() -> Self {
116        Self {
117            records: Mutex::new(Vec::new()),
118            paused: AtomicBool::new(true),
119        }
120    }
121
122    /// Register a new job with the legacy `Manual` schedule.
123    ///
124    /// Equivalent to `add_scheduled_job(id, Schedule::Manual)`. The
125    /// job becomes eligible immediately and fires on the next tick
126    /// (no-op while paused).
127    pub fn add_job(&self, id: QName) {
128        self.add_scheduled_job(id, Schedule::Manual);
129    }
130
131    /// Register a new job with an explicit schedule.
132    ///
133    /// The job's `next_fire_at` is computed from the schedule plus
134    /// the current `SystemTime`. The scheduler picks it up on the
135    /// first [`Self::tick`] / [`Self::tick_at`] whose `now` is at or
136    /// past `next_fire_at` (no-op while paused).
137    pub fn add_scheduled_job(&self, id: QName, schedule: Schedule) {
138        let now = SystemTime::now();
139        self.records
140            .lock()
141            .push(SchedulerJobRecord::pending_with_schedule(id, schedule, now));
142    }
143
144    /// Cancel a scheduled job by id.
145    ///
146    /// Returns `true` if the job was found and cancelled.
147    pub fn cancel(&self, id: &QName) -> bool {
148        let mut records = self.records.lock();
149        let Some(r) = records.iter_mut().find(|r| &r.id == id) else {
150            return false;
151        };
152        r.status = SchedulerJobStatus::Cancelled;
153        r.cancel.cancel();
154        true
155    }
156
157    /// List all known jobs and their statuses (snapshot).
158    #[must_use]
159    pub fn list(&self) -> Vec<SchedulerJobRecord> {
160        self.records.lock().clone()
161    }
162
163    /// Look up the cancellation token associated with a registered job.
164    ///
165    /// Returns `None` if no job matches `id`. The returned clone shares
166    /// state with the record's token, so callers can both await
167    /// `cancelled().await` and observe the same cancel signal trip via
168    /// [`Self::cancel`].
169    ///
170    /// Used by the host driver to wrap each dispatched
171    /// `spawn_blocking` in a `tokio::select!` against `cancelled().await`,
172    /// so shutdown / explicit cancel propagates without waiting for the
173    /// job body to poll [`CancellationToken::is_cancelled`].
174    #[must_use]
175    pub fn cancel_token_for(&self, id: &QName) -> Option<CancellationToken> {
176        self.records
177            .lock()
178            .iter()
179            .find(|r| &r.id == id)
180            .map(|r| r.cancel.clone())
181    }
182
183    /// Resume the scheduler (M11 cutover wires actual driving here).
184    pub fn resume(&self) {
185        self.paused.store(false, Ordering::SeqCst);
186    }
187
188    /// Drive the scheduler with the current wall-clock time.
189    ///
190    /// Equivalent to `tick_at(SystemTime::now())`. See [`Self::tick_at`]
191    /// for the full semantics.
192    pub fn tick(&self) -> Vec<QName> {
193        self.tick_at(SystemTime::now())
194    }
195
196    /// Pop every pending job whose schedule has fired at or before
197    /// `now`, transition each to `Running`, and return their ids for
198    /// the caller to dispatch.
199    ///
200    /// **M11 substantive driver primitive.** This is the synchronous,
201    /// runtime-free heart of the scheduler — the eventual Tokio
202    /// driver wraps a poll loop that calls `tick_at(SystemTime::now())`,
203    /// dispatches the returned jobs (e.g., via `tokio::spawn` invoking
204    /// each job's `BackgroundJobProvider::execute`), and calls
205    /// [`Scheduler::mark_finished`] when each completes.
206    ///
207    /// Schedule semantics (delegated to
208    /// [`crate::traits::background::Schedule::next_after`]):
209    ///
210    /// - A job is "due" iff `status == Pending`,
211    ///   `next_fire_at.is_none()` or `next_fire_at <= now`, and the
212    ///   cancel token is not already triggered.
213    /// - `Manual` jobs have `next_fire_at = now` at registration and
214    ///   so are immediately due (matching legacy `tick()` behavior).
215    /// - `Once(at)` jobs become due only when `now >= at`.
216    /// - `Periodic(every)` jobs become due `every` after each fire.
217    /// - `Cron(expr)` jobs become due at the next cron instant
218    ///   computed via the [`cron`] crate.
219    ///
220    /// Honors pause: returns empty when [`Self::is_paused`].
221    /// Honors cancellation: skips jobs whose `cancel` token is
222    /// already triggered (filtering them out of the return).
223    pub fn tick_at(&self, now: SystemTime) -> Vec<QName> {
224        if self.is_paused() {
225            return Vec::new();
226        }
227        let mut records = self.records.lock();
228        let mut due: Vec<QName> = Vec::new();
229        for r in records.iter_mut() {
230            if !matches!(r.status, SchedulerJobStatus::Pending) {
231                continue;
232            }
233            if r.cancel.is_cancelled() {
234                r.status = SchedulerJobStatus::Cancelled;
235                continue;
236            }
237            // Time-gate: skip jobs whose schedule hasn't fired yet.
238            if let Some(fire_at) = r.next_fire_at
239                && fire_at > now
240            {
241                continue;
242            }
243            r.status = SchedulerJobStatus::Running;
244            r.last_started_at = Some(now);
245            due.push(r.id.clone());
246        }
247        due
248    }
249
250    /// Number of jobs currently in `Running` state. Useful for
251    /// observability (e.g., a metrics gauge).
252    #[must_use]
253    pub fn running_count(&self) -> usize {
254        self.records
255            .lock()
256            .iter()
257            .filter(|r| matches!(r.status, SchedulerJobStatus::Running))
258            .count()
259    }
260
261    /// Number of pending jobs ready for the next `tick`.
262    #[must_use]
263    pub fn pending_count(&self) -> usize {
264        self.records
265            .lock()
266            .iter()
267            .filter(|r| matches!(r.status, SchedulerJobStatus::Pending))
268            .count()
269    }
270
271    /// Reset every `Running` job back to `Pending` — used by the
272    /// driver to recover from a crash where jobs were started but not
273    /// finished. The host restores the scheduler state from
274    /// `uni_system.background_jobs` and calls this to make all
275    /// previously-`Running` jobs eligible for re-dispatch.
276    pub fn requeue_orphaned_runs(&self) -> usize {
277        let mut records = self.records.lock();
278        let mut count = 0;
279        for r in records.iter_mut() {
280            if matches!(r.status, SchedulerJobStatus::Running) {
281                r.status = SchedulerJobStatus::Pending;
282                count += 1;
283            }
284        }
285        count
286    }
287
288    /// Pause the scheduler.
289    pub fn pause(&self) {
290        self.paused.store(true, Ordering::SeqCst);
291    }
292
293    /// Returns `true` if currently paused.
294    #[must_use]
295    pub fn is_paused(&self) -> bool {
296        self.paused.load(Ordering::SeqCst)
297    }
298
299    /// Mark a job as starting a new run.
300    ///
301    /// Used by tests + the M11 cutover driver. Updates the record's
302    /// `status` to `Running` and stamps `last_started_at`.
303    pub fn mark_started(&self, id: &QName) {
304        let mut records = self.records.lock();
305        if let Some(r) = records.iter_mut().find(|r| &r.id == id) {
306            r.status = SchedulerJobStatus::Running;
307            r.last_started_at = Some(SystemTime::now());
308        }
309    }
310
311    /// Mark a job's run as finished (success or failure).
312    ///
313    /// Recomputes `next_fire_at` from the job's [`Schedule`] using
314    /// `SystemTime::now()` as the reference point. If the schedule has
315    /// another fire upcoming (Periodic, Cron, or a Once whose instant
316    /// is still in the future — which shouldn't normally happen after
317    /// it has just fired), the job transitions back to `Pending` so
318    /// the next [`Self::tick_at`] can pick it up. Otherwise the job
319    /// stays in its terminal state (`Idle` on success,
320    /// `FailedRetrying` on failure).
321    pub fn mark_finished(&self, id: &QName, success: bool) {
322        let now = SystemTime::now();
323        let mut records = self.records.lock();
324        let Some(r) = records.iter_mut().find(|r| &r.id == id) else {
325            return;
326        };
327        r.last_finished_at = Some(now);
328
329        // Only Periodic / Cron reschedule; Once / Manual terminate after a run.
330        let next = r.schedule.next_after(now);
331        let has_next =
332            matches!(r.schedule, Schedule::Periodic(_) | Schedule::Cron(_)) && next.is_some();
333
334        // Periodic / Cron jobs keep firing on schedule even after a
335        // failed run; the `consecutive_failures` counter and the
336        // [`crate::circuit_breaker::CircuitBreaker`] decide when to
337        // stop dispatching a flapping job. A `Once` job that failed
338        // stays in `FailedRetrying` since `has_next` is false.
339        if has_next {
340            r.status = SchedulerJobStatus::Pending;
341            r.next_fire_at = next;
342        } else {
343            r.status = if success {
344                SchedulerJobStatus::Idle
345            } else {
346                SchedulerJobStatus::FailedRetrying
347            };
348            if success {
349                r.next_fire_at = None;
350            }
351        }
352
353        if success {
354            r.consecutive_failures = 0;
355        } else {
356            r.consecutive_failures = r.consecutive_failures.saturating_add(1);
357        }
358    }
359}
360
361impl PartialEq for SchedulerJobRecord {
362    /// Two records are equal iff all their persisted state matches.
363    ///
364    /// Previously this compared only `id` and `status`, so two records
365    /// that differed in `schedule`, `next_fire_at`, or
366    /// `consecutive_failures` (i.e. genuinely-different job states)
367    /// would still compare equal. The `cancel` field is intentionally
368    /// excluded because it is per-process identity (an `Arc<AtomicBool>`)
369    /// and is not part of the persisted record.
370    fn eq(&self, other: &Self) -> bool {
371        self.id == other.id
372            && self.status == other.status
373            && self.next_fire_at == other.next_fire_at
374            && self.last_started_at == other.last_started_at
375            && self.last_finished_at == other.last_finished_at
376            && self.consecutive_failures == other.consecutive_failures
377            && self.schedule == other.schedule
378    }
379}
380
381/// Errors raised by [`SchedulerPersistence`] backends.
382#[derive(Debug, Error)]
383#[non_exhaustive]
384pub enum SchedulerPersistenceError {
385    /// Backend-specific failure (I/O, Cypher execution, serialization).
386    #[error("scheduler persistence: {0}")]
387    Backend(String),
388}
389
390/// Persistence backend for [`Scheduler`] job state.
391///
392/// Mirrors the meta-plugin's `Persistence` trait in shape but scoped
393/// to scheduler records. The Tokio driver (`crates/uni/src/scheduler.rs`)
394/// invokes `record_started` / `record_finished` / `cancel` on each
395/// lifecycle transition; on startup the driver calls `load_all` and
396/// re-registers persisted jobs (followed by
397/// [`Scheduler::requeue_orphaned_runs`] for any that were `Running`
398/// at the previous shutdown / crash).
399///
400/// Two impls ship in-tree:
401///
402/// - [`MemoryPersistence`] — no-op tests + as the default before the
403///   host wires a system-label backend.
404/// - `SystemLabelPersistence` (in `uni-query`, lands with the M9
405///   cutover): round-trips through `uni_system.background_jobs` via
406///   the write-enabled
407///   `QueryProcedureHost::execute_inner_query`.
408pub trait SchedulerPersistence: Send + Sync + std::fmt::Debug {
409    /// Persist a job's schedule at registration time.
410    ///
411    /// Called by the host wrapper (e.g. `SchedulerHost`) whenever a
412    /// caller invokes `add_scheduled_job`, so the schedule kind
413    /// (`Periodic` / `Cron` / `Once` / `Manual`) survives restart and
414    /// can be round-tripped through [`Self::load_all`]. The default
415    /// no-op suits in-memory backends and pre-existing impls that do
416    /// not need durability.
417    ///
418    /// # Errors
419    ///
420    /// Returns [`SchedulerPersistenceError`] on backend failure.
421    fn record_scheduled(
422        &self,
423        _id: &QName,
424        _schedule: &Schedule,
425    ) -> Result<(), SchedulerPersistenceError> {
426        Ok(())
427    }
428
429    /// Persist a job's transition into a new run.
430    ///
431    /// # Errors
432    ///
433    /// Returns [`SchedulerPersistenceError`] on backend failure.
434    fn record_started(
435        &self,
436        id: &QName,
437        started_at: SystemTime,
438    ) -> Result<(), SchedulerPersistenceError>;
439
440    /// Persist the outcome of a finished run.
441    ///
442    /// # Errors
443    ///
444    /// Returns [`SchedulerPersistenceError`] on backend failure.
445    fn record_finished(
446        &self,
447        id: &QName,
448        finished_at: SystemTime,
449        success: bool,
450    ) -> Result<(), SchedulerPersistenceError>;
451
452    /// Persist a cancellation.
453    ///
454    /// # Errors
455    ///
456    /// Returns [`SchedulerPersistenceError`] on backend failure.
457    fn cancel(&self, id: &QName) -> Result<(), SchedulerPersistenceError>;
458
459    /// Reload all known job records (used on host startup to restore
460    /// scheduler state across restart). Order is unspecified — the
461    /// driver re-registers them in any order.
462    ///
463    /// # Errors
464    ///
465    /// Returns [`SchedulerPersistenceError`] on backend failure.
466    fn load_all(&self) -> Result<Vec<SchedulerJobRecord>, SchedulerPersistenceError>;
467
468    /// Force any in-memory buffers to durable storage.
469    ///
470    /// Invoked by `uni.periodic.commit` so operators can drive a
471    /// synchronous checkpoint flush. Backends that write through on
472    /// every event (the default for the system-label backend) leave
473    /// this as the default no-op; buffered backends override.
474    ///
475    /// # Errors
476    ///
477    /// Returns [`SchedulerPersistenceError`] on backend failure.
478    fn flush_checkpoint(&self) -> Result<(), SchedulerPersistenceError> {
479        Ok(())
480    }
481}
482
483/// In-memory [`SchedulerPersistence`] backend. Always returns an empty
484/// `load_all`; every other call is a no-op.
485///
486/// Used by tests and as the default backend when the host has not yet
487/// wired a durable backend (e.g., during early `Uni::build` before the
488/// storage manager is available).
489#[derive(Debug, Default)]
490pub struct MemoryPersistence;
491
492impl SchedulerPersistence for MemoryPersistence {
493    fn record_started(
494        &self,
495        _id: &QName,
496        _started_at: SystemTime,
497    ) -> Result<(), SchedulerPersistenceError> {
498        Ok(())
499    }
500
501    fn record_finished(
502        &self,
503        _id: &QName,
504        _finished_at: SystemTime,
505        _success: bool,
506    ) -> Result<(), SchedulerPersistenceError> {
507        Ok(())
508    }
509
510    fn cancel(&self, _id: &QName) -> Result<(), SchedulerPersistenceError> {
511        Ok(())
512    }
513
514    fn load_all(&self) -> Result<Vec<SchedulerJobRecord>, SchedulerPersistenceError> {
515        Ok(Vec::new())
516    }
517}
518
519/// Trait-object handle to a scheduler, for cross-crate callers that
520/// can't depend on the concrete host-side `SchedulerHost` type.
521///
522/// The built-in `uni.periodic.*` procedures hold an `Arc<dyn
523/// SchedulerControl>` so they can register / cancel / list jobs
524/// without depending on `uni-db`. The host crate (`uni-db`) implements
525/// this on its `SchedulerHost` and passes it down at registration
526/// time.
527pub trait SchedulerControl: Send + Sync + std::fmt::Debug {
528    /// Register a job to fire on `schedule`.
529    fn add_scheduled_job(&self, id: QName, schedule: Schedule);
530
531    /// Cancel a job by id. Returns `true` if it existed.
532    fn cancel(&self, id: &QName) -> bool;
533
534    /// Snapshot of every known job.
535    fn list(&self) -> Vec<SchedulerJobRecord>;
536
537    /// Submit an inline write-mode Cypher body for synchronous
538    /// execution. The default impl returns an error so simple
539    /// scheduler primitives (without a host) can still satisfy the
540    /// trait shape; the `uni-db::scheduler::SchedulerHost` override
541    /// dispatches through its [`crate::traits::background::JobHost`].
542    ///
543    /// Used by `uni.periodic.submit(...)` and as the inner-loop body
544    /// of `uni.periodic.iterate(...)`.
545    ///
546    /// # Errors
547    ///
548    /// Returns [`crate::FnError`] when the scheduler is not wired to a
549    /// Cypher-execution host (default impl) or when the submitted
550    /// statement fails.
551    fn submit_cypher(&self, _cypher: &str) -> Result<(), crate::FnError> {
552        Err(crate::FnError::new(
553            0xD20,
554            "scheduler: submit_cypher not supported by this control (no host wired)",
555        ))
556    }
557
558    /// Drive the persistence backend to flush its checkpoint buffer.
559    ///
560    /// Default impl is a no-op so the bare [`Scheduler`] (with
561    /// [`MemoryPersistence`]) and any control that has no durable
562    /// backend keep working without an override. The host-side
563    /// `SchedulerHost` override forwards to its
564    /// [`SchedulerPersistence::flush_checkpoint`].
565    ///
566    /// # Errors
567    ///
568    /// Returns [`crate::FnError`] when the persistence backend reports
569    /// a flush failure.
570    fn flush_checkpoint(&self) -> Result<(), crate::FnError> {
571        Ok(())
572    }
573}
574
575impl SchedulerControl for Scheduler {
576    fn add_scheduled_job(&self, id: QName, schedule: Schedule) {
577        Self::add_scheduled_job(self, id, schedule);
578    }
579
580    fn cancel(&self, id: &QName) -> bool {
581        Self::cancel(self, id)
582    }
583
584    fn list(&self) -> Vec<SchedulerJobRecord> {
585        Self::list(self)
586    }
587}
588
589/// Cooperative-cancel handle handed to job implementations.
590#[derive(Clone, Debug)]
591pub struct SchedulerHandle {
592    inner: Arc<Scheduler>,
593}
594
595impl SchedulerHandle {
596    /// Wrap a scheduler in a clonable handle.
597    #[must_use]
598    pub fn new(scheduler: Arc<Scheduler>) -> Self {
599        Self { inner: scheduler }
600    }
601
602    /// Borrow the underlying scheduler.
603    #[must_use]
604    pub fn scheduler(&self) -> &Scheduler {
605        &self.inner
606    }
607}
608
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Built-in job id from a static name.
    fn job(name: &'static str) -> QName {
        QName::builtin(name)
    }

    /// Fresh scheduler that has already been resumed.
    fn resumed() -> Scheduler {
        let sched = Scheduler::new();
        sched.resume();
        sched
    }

    #[test]
    fn scheduler_default_is_paused() {
        let sched = Scheduler::new();
        assert!(sched.is_paused());
        assert!(sched.list().is_empty());
    }

    #[test]
    fn scheduler_resume_pause_round_trip() {
        let sched = resumed();
        assert!(!sched.is_paused());
        sched.pause();
        assert!(sched.is_paused());
    }

    #[test]
    fn add_job_and_cancel() {
        let sched = Scheduler::new();
        sched.add_job(job("ttl_sweep"));
        assert_eq!(sched.list().len(), 1);
        assert!(sched.cancel(&job("ttl_sweep")));

        let snapshot = sched.list();
        let record = &snapshot[0];
        assert_eq!(record.status, SchedulerJobStatus::Cancelled);
        assert!(record.cancel.is_cancelled());
    }

    #[test]
    fn cancel_unknown_job_returns_false() {
        assert!(!Scheduler::new().cancel(&job("nope")));
    }

    #[test]
    fn run_lifecycle_increments_failures_then_resets() {
        let sched = Scheduler::new();
        let id = job("flaky");
        sched.add_job(id.clone());

        for _ in 0..2 {
            sched.mark_started(&id);
            sched.mark_finished(&id, false);
        }
        let after_failures = sched.list();
        assert_eq!(after_failures[0].consecutive_failures, 2);
        assert_eq!(after_failures[0].status, SchedulerJobStatus::FailedRetrying);

        sched.mark_started(&id);
        sched.mark_finished(&id, true);
        let after_success = sched.list();
        assert_eq!(after_success[0].consecutive_failures, 0);
        assert_eq!(after_success[0].status, SchedulerJobStatus::Idle);
    }

    // ── tick / driver primitive tests ──────────────────────────────

    #[test]
    fn tick_returns_empty_when_paused() {
        // `Scheduler::new` starts paused and nothing resumes it here.
        let sched = Scheduler::new();
        sched.add_job(job("job1"));
        assert!(sched.tick().is_empty());
    }

    #[test]
    fn tick_dispatches_pending_jobs_when_resumed() {
        let sched = Scheduler::new();
        for name in ["job1", "job2"] {
            sched.add_job(job(name));
        }
        sched.resume();

        let due = sched.tick();
        assert_eq!(due.len(), 2);
        for name in ["job1", "job2"] {
            assert!(due.iter().any(|q| q.local() == name));
        }
        // Every dispatched job has moved to `Running`.
        assert_eq!(sched.running_count(), 2);
        assert_eq!(sched.pending_count(), 0);
    }

    #[test]
    fn tick_skips_cancelled_jobs() {
        let sched = Scheduler::new();
        let id = job("doomed");
        sched.add_job(id.clone());
        sched.cancel(&id);
        sched.resume();
        assert!(
            sched.tick().is_empty(),
            "cancelled job should not be dispatched"
        );
    }

    #[test]
    fn second_tick_returns_empty_until_jobs_marked_pending() {
        let sched = resumed();
        let id = job("once");
        sched.add_job(id.clone());
        assert_eq!(sched.tick().len(), 1);
        // Still `Running` (no mark_finished yet), so no redispatch.
        assert!(sched.tick().is_empty());
        sched.mark_finished(&id, true);
        // Now `Idle` rather than `Pending`: ticking stays idempotent.
        assert!(sched.tick().is_empty());
    }

    #[test]
    fn requeue_orphaned_runs_moves_running_back_to_pending() {
        let sched = resumed();
        sched.add_job(job("orphan"));
        let _ = sched.tick();
        assert_eq!(sched.running_count(), 1);

        assert_eq!(sched.requeue_orphaned_runs(), 1);
        assert_eq!(sched.running_count(), 0);
        assert_eq!(sched.pending_count(), 1);
        // A requeued job is dispatched again on the next tick.
        assert_eq!(sched.tick().len(), 1);
    }

    // ── Schedule semantics tests ────────────────────────────────

    #[test]
    fn schedule_once_fires_only_after_instant() {
        let sched = resumed();
        let fire_at = SystemTime::now() + Duration::from_secs(60);
        sched.add_scheduled_job(job("once"), Schedule::Once(fire_at));

        assert!(
            sched.tick_at(SystemTime::now()).is_empty(),
            "Once job should not fire before its instant"
        );
        let due = sched.tick_at(fire_at + Duration::from_secs(1));
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].local(), "once");
    }

    #[test]
    fn schedule_once_does_not_reschedule_after_finish() {
        let sched = resumed();
        let fire_at = SystemTime::now() - Duration::from_secs(1);
        let id = job("once");
        sched.add_scheduled_job(id.clone(), Schedule::Once(fire_at));
        assert_eq!(sched.tick_at(SystemTime::now()).len(), 1);

        sched.mark_finished(&id, true);
        let snapshot = sched.list();
        assert_eq!(snapshot[0].status, SchedulerJobStatus::Idle);
        assert!(snapshot[0].next_fire_at.is_none());

        let an_hour_later = SystemTime::now() + Duration::from_secs(3600);
        assert!(sched.tick_at(an_hour_later).is_empty());
    }

    #[test]
    fn schedule_periodic_reschedules_after_finish() {
        let sched = resumed();
        let start = SystemTime::now();
        let id = job("ticker");
        sched.add_scheduled_job(id.clone(), Schedule::Periodic(Duration::from_secs(10)));

        assert!(sched.tick_at(start + Duration::from_secs(5)).is_empty());
        assert_eq!(sched.tick_at(start + Duration::from_secs(11)).len(), 1);

        sched.mark_finished(&id, true);
        let snapshot = sched.list();
        assert_eq!(snapshot[0].status, SchedulerJobStatus::Pending);
        assert!(snapshot[0].next_fire_at.is_some());
    }

    #[test]
    fn schedule_cron_emits_future_fire() {
        let sched = resumed();
        sched.add_scheduled_job(
            job("every_min"),
            Schedule::Cron(smol_str::SmolStr::new("0 * * * * *")),
        );
        let next = sched.list()[0]
            .next_fire_at
            .expect("cron must produce a next fire");
        assert!(next > SystemTime::now() - Duration::from_secs(1));
    }

    #[test]
    fn manual_schedule_is_immediately_due() {
        let sched = resumed();
        sched.add_scheduled_job(job("legacy"), Schedule::Manual);
        let due = sched.tick();
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].local(), "legacy");
    }

    #[test]
    fn pending_count_and_running_count_track_lifecycle() {
        let sched = Scheduler::new();
        (0..5).for_each(|n| sched.add_job(QName::builtin(format!("job{n}"))));
        sched.resume();
        assert_eq!(sched.pending_count(), 5);
        assert_eq!(sched.running_count(), 0);

        assert_eq!(sched.tick().len(), 5);
        assert_eq!(sched.pending_count(), 0);
        assert_eq!(sched.running_count(), 5);

        sched.mark_finished(&job("job0"), true);
        sched.mark_finished(&job("job1"), false);
        assert_eq!(sched.running_count(), 3, "two have finished");
    }
}