uni_plugin/scheduler.rs
1// Rust guideline compliant
2
3//! Background-job scheduler skeleton.
4//!
5//! The host owns a single scheduler that drives every registered
6//! [`crate::traits::background::BackgroundJobProvider`]. This module
7//! ships the scheduler's public API + persistent state record + a
8//! `SchedulerPersistence` trait. The host-side Tokio driver
9//! (`crates/uni/src/scheduler.rs`) wraps a loop that calls
10//! `tick_at(SystemTime::now())`, dispatches the returned jobs through
11//! the plugin registry, and forwards lifecycle transitions to the
12//! configured persistence backend.
13
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::time::SystemTime;
17
18use parking_lot::Mutex;
19use thiserror::Error;
20
21use crate::qname::QName;
22use crate::traits::background::{CancellationToken, Schedule};
23
/// Where a scheduled job currently sits in its lifecycle.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SchedulerJobStatus {
    /// Waiting to be dispatched: freshly registered, rescheduled
    /// after a run, or requeued after an orphaned run.
    Pending,
    /// A run has been dispatched and has not been marked finished.
    Running,
    /// The most recent run succeeded and no further fire is queued.
    Idle,
    /// The most recent run failed; the retry policy governs what
    /// happens next.
    FailedRetrying,
    /// Cancelled via `cancel()` or because the job's cancellation
    /// token was already tripped when a tick inspected it.
    Cancelled,
}
39
40/// Persistable record of a scheduled job's state.
41///
42/// Round-trips through `uni_system.background_jobs` in M11 cutover.
43#[derive(Clone, Debug)]
44pub struct SchedulerJobRecord {
45 /// Job id.
46 pub id: QName,
47 /// Lifecycle status.
48 pub status: SchedulerJobStatus,
49 /// When the next fire of this job is due. `None` for `Manual`
50 /// schedules until [`Scheduler::add_job`] marks the job `Pending`,
51 /// at which point it is eligible immediately.
52 pub next_fire_at: Option<SystemTime>,
53 /// When the most-recent run started.
54 pub last_started_at: Option<SystemTime>,
55 /// When the most-recent run finished.
56 pub last_finished_at: Option<SystemTime>,
57 /// Number of consecutive failures since the last success.
58 pub consecutive_failures: u32,
59 /// Schedule describing when fires are eligible.
60 pub schedule: Schedule,
61 /// Cancellation token; flipped on `cancel()` or shutdown.
62 pub cancel: CancellationToken,
63}
64
65impl SchedulerJobRecord {
66 /// Construct a pending record with the legacy `Manual` schedule.
67 ///
68 /// Equivalent to `pending_with_schedule(id, Schedule::Manual,
69 /// SystemTime::now())`.
70 #[must_use]
71 pub fn pending(id: QName) -> Self {
72 Self::pending_with_schedule(id, Schedule::Manual, SystemTime::now())
73 }
74
75 /// Construct a pending record with an explicit schedule.
76 ///
77 /// `now` is used both as the initial registration instant and as
78 /// the reference point for the first `next_fire_at` computation.
79 #[must_use]
80 pub fn pending_with_schedule(id: QName, schedule: Schedule, now: SystemTime) -> Self {
81 let next_fire_at = schedule.next_after(now);
82 Self {
83 id,
84 status: SchedulerJobStatus::Pending,
85 next_fire_at,
86 last_started_at: None,
87 last_finished_at: None,
88 consecutive_failures: 0,
89 schedule,
90 cancel: CancellationToken::new(),
91 }
92 }
93}
94
95/// Host-side scheduler skeleton.
96///
97/// One per Uni instance. M11 cutover wires `tokio::spawn` driving and
98/// persistence into `uni_system.background_jobs`. Currently the
99/// scheduler is paused — registered jobs are stored but not executed.
100#[derive(Debug)]
101pub struct Scheduler {
102 records: Mutex<Vec<SchedulerJobRecord>>,
103 paused: AtomicBool,
104}
105
106impl Default for Scheduler {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl Scheduler {
113 /// Construct a paused scheduler.
114 #[must_use]
115 pub fn new() -> Self {
116 Self {
117 records: Mutex::new(Vec::new()),
118 paused: AtomicBool::new(true),
119 }
120 }
121
122 /// Register a new job with the legacy `Manual` schedule.
123 ///
124 /// Equivalent to `add_scheduled_job(id, Schedule::Manual)`. The
125 /// job becomes eligible immediately and fires on the next tick
126 /// (no-op while paused).
127 pub fn add_job(&self, id: QName) {
128 self.add_scheduled_job(id, Schedule::Manual);
129 }
130
131 /// Register a new job with an explicit schedule.
132 ///
133 /// The job's `next_fire_at` is computed from the schedule plus
134 /// the current `SystemTime`. The scheduler picks it up on the
135 /// first [`Self::tick`] / [`Self::tick_at`] whose `now` is at or
136 /// past `next_fire_at` (no-op while paused).
137 pub fn add_scheduled_job(&self, id: QName, schedule: Schedule) {
138 let now = SystemTime::now();
139 self.records
140 .lock()
141 .push(SchedulerJobRecord::pending_with_schedule(id, schedule, now));
142 }
143
144 /// Cancel a scheduled job by id.
145 ///
146 /// Returns `true` if the job was found and cancelled.
147 pub fn cancel(&self, id: &QName) -> bool {
148 let mut records = self.records.lock();
149 let Some(r) = records.iter_mut().find(|r| &r.id == id) else {
150 return false;
151 };
152 r.status = SchedulerJobStatus::Cancelled;
153 r.cancel.cancel();
154 true
155 }
156
157 /// List all known jobs and their statuses (snapshot).
158 #[must_use]
159 pub fn list(&self) -> Vec<SchedulerJobRecord> {
160 self.records.lock().clone()
161 }
162
163 /// Look up the cancellation token associated with a registered job.
164 ///
165 /// Returns `None` if no job matches `id`. The returned clone shares
166 /// state with the record's token, so callers can both await
167 /// `cancelled().await` and observe the same cancel signal trip via
168 /// [`Self::cancel`].
169 ///
170 /// Used by the host driver to wrap each dispatched
171 /// `spawn_blocking` in a `tokio::select!` against `cancelled().await`,
172 /// so shutdown / explicit cancel propagates without waiting for the
173 /// job body to poll [`CancellationToken::is_cancelled`].
174 #[must_use]
175 pub fn cancel_token_for(&self, id: &QName) -> Option<CancellationToken> {
176 self.records
177 .lock()
178 .iter()
179 .find(|r| &r.id == id)
180 .map(|r| r.cancel.clone())
181 }
182
183 /// Resume the scheduler (M11 cutover wires actual driving here).
184 pub fn resume(&self) {
185 self.paused.store(false, Ordering::SeqCst);
186 }
187
188 /// Drive the scheduler with the current wall-clock time.
189 ///
190 /// Equivalent to `tick_at(SystemTime::now())`. See [`Self::tick_at`]
191 /// for the full semantics.
192 pub fn tick(&self) -> Vec<QName> {
193 self.tick_at(SystemTime::now())
194 }
195
196 /// Pop every pending job whose schedule has fired at or before
197 /// `now`, transition each to `Running`, and return their ids for
198 /// the caller to dispatch.
199 ///
200 /// **M11 substantive driver primitive.** This is the synchronous,
201 /// runtime-free heart of the scheduler — the eventual Tokio
202 /// driver wraps a poll loop that calls `tick_at(SystemTime::now())`,
203 /// dispatches the returned jobs (e.g., via `tokio::spawn` invoking
204 /// each job's `BackgroundJobProvider::execute`), and calls
205 /// [`Scheduler::mark_finished`] when each completes.
206 ///
207 /// Schedule semantics (delegated to
208 /// [`crate::traits::background::Schedule::next_after`]):
209 ///
210 /// - A job is "due" iff `status == Pending`,
211 /// `next_fire_at.is_none()` or `next_fire_at <= now`, and the
212 /// cancel token is not already triggered.
213 /// - `Manual` jobs have `next_fire_at = now` at registration and
214 /// so are immediately due (matching legacy `tick()` behavior).
215 /// - `Once(at)` jobs become due only when `now >= at`.
216 /// - `Periodic(every)` jobs become due `every` after each fire.
217 /// - `Cron(expr)` jobs become due at the next cron instant
218 /// computed via the [`cron`] crate.
219 ///
220 /// Honors pause: returns empty when [`Self::is_paused`].
221 /// Honors cancellation: skips jobs whose `cancel` token is
222 /// already triggered (filtering them out of the return).
223 pub fn tick_at(&self, now: SystemTime) -> Vec<QName> {
224 if self.is_paused() {
225 return Vec::new();
226 }
227 let mut records = self.records.lock();
228 let mut due: Vec<QName> = Vec::new();
229 for r in records.iter_mut() {
230 if !matches!(r.status, SchedulerJobStatus::Pending) {
231 continue;
232 }
233 if r.cancel.is_cancelled() {
234 r.status = SchedulerJobStatus::Cancelled;
235 continue;
236 }
237 // Time-gate: skip jobs whose schedule hasn't fired yet.
238 if let Some(fire_at) = r.next_fire_at
239 && fire_at > now
240 {
241 continue;
242 }
243 r.status = SchedulerJobStatus::Running;
244 r.last_started_at = Some(now);
245 due.push(r.id.clone());
246 }
247 due
248 }
249
250 /// Number of jobs currently in `Running` state. Useful for
251 /// observability (e.g., a metrics gauge).
252 #[must_use]
253 pub fn running_count(&self) -> usize {
254 self.records
255 .lock()
256 .iter()
257 .filter(|r| matches!(r.status, SchedulerJobStatus::Running))
258 .count()
259 }
260
261 /// Number of pending jobs ready for the next `tick`.
262 #[must_use]
263 pub fn pending_count(&self) -> usize {
264 self.records
265 .lock()
266 .iter()
267 .filter(|r| matches!(r.status, SchedulerJobStatus::Pending))
268 .count()
269 }
270
271 /// Reset every `Running` job back to `Pending` — used by the
272 /// driver to recover from a crash where jobs were started but not
273 /// finished. The host restores the scheduler state from
274 /// `uni_system.background_jobs` and calls this to make all
275 /// previously-`Running` jobs eligible for re-dispatch.
276 pub fn requeue_orphaned_runs(&self) -> usize {
277 let mut records = self.records.lock();
278 let mut count = 0;
279 for r in records.iter_mut() {
280 if matches!(r.status, SchedulerJobStatus::Running) {
281 r.status = SchedulerJobStatus::Pending;
282 count += 1;
283 }
284 }
285 count
286 }
287
288 /// Pause the scheduler.
289 pub fn pause(&self) {
290 self.paused.store(true, Ordering::SeqCst);
291 }
292
293 /// Returns `true` if currently paused.
294 #[must_use]
295 pub fn is_paused(&self) -> bool {
296 self.paused.load(Ordering::SeqCst)
297 }
298
299 /// Mark a job as starting a new run.
300 ///
301 /// Used by tests + the M11 cutover driver. Updates the record's
302 /// `status` to `Running` and stamps `last_started_at`.
303 pub fn mark_started(&self, id: &QName) {
304 let mut records = self.records.lock();
305 if let Some(r) = records.iter_mut().find(|r| &r.id == id) {
306 r.status = SchedulerJobStatus::Running;
307 r.last_started_at = Some(SystemTime::now());
308 }
309 }
310
311 /// Mark a job's run as finished (success or failure).
312 ///
313 /// Recomputes `next_fire_at` from the job's [`Schedule`] using
314 /// `SystemTime::now()` as the reference point. If the schedule has
315 /// another fire upcoming (Periodic, Cron, or a Once whose instant
316 /// is still in the future — which shouldn't normally happen after
317 /// it has just fired), the job transitions back to `Pending` so
318 /// the next [`Self::tick_at`] can pick it up. Otherwise the job
319 /// stays in its terminal state (`Idle` on success,
320 /// `FailedRetrying` on failure).
321 pub fn mark_finished(&self, id: &QName, success: bool) {
322 let now = SystemTime::now();
323 let mut records = self.records.lock();
324 let Some(r) = records.iter_mut().find(|r| &r.id == id) else {
325 return;
326 };
327 r.last_finished_at = Some(now);
328
329 // Only Periodic / Cron reschedule; Once / Manual terminate after a run.
330 let next = r.schedule.next_after(now);
331 let has_next =
332 matches!(r.schedule, Schedule::Periodic(_) | Schedule::Cron(_)) && next.is_some();
333
334 // Periodic / Cron jobs keep firing on schedule even after a
335 // failed run; the `consecutive_failures` counter and the
336 // [`crate::circuit_breaker::CircuitBreaker`] decide when to
337 // stop dispatching a flapping job. A `Once` job that failed
338 // stays in `FailedRetrying` since `has_next` is false.
339 if has_next {
340 r.status = SchedulerJobStatus::Pending;
341 r.next_fire_at = next;
342 } else {
343 r.status = if success {
344 SchedulerJobStatus::Idle
345 } else {
346 SchedulerJobStatus::FailedRetrying
347 };
348 if success {
349 r.next_fire_at = None;
350 }
351 }
352
353 if success {
354 r.consecutive_failures = 0;
355 } else {
356 r.consecutive_failures = r.consecutive_failures.saturating_add(1);
357 }
358 }
359}
360
361impl PartialEq for SchedulerJobRecord {
362 /// Two records are equal iff all their persisted state matches.
363 ///
364 /// Previously this compared only `id` and `status`, so two records
365 /// that differed in `schedule`, `next_fire_at`, or
366 /// `consecutive_failures` (i.e. genuinely-different job states)
367 /// would still compare equal. The `cancel` field is intentionally
368 /// excluded because it is per-process identity (an `Arc<AtomicBool>`)
369 /// and is not part of the persisted record.
370 fn eq(&self, other: &Self) -> bool {
371 self.id == other.id
372 && self.status == other.status
373 && self.next_fire_at == other.next_fire_at
374 && self.last_started_at == other.last_started_at
375 && self.last_finished_at == other.last_finished_at
376 && self.consecutive_failures == other.consecutive_failures
377 && self.schedule == other.schedule
378 }
379}
380
381/// Errors raised by [`SchedulerPersistence`] backends.
382#[derive(Debug, Error)]
383#[non_exhaustive]
384pub enum SchedulerPersistenceError {
385 /// Backend-specific failure (I/O, Cypher execution, serialization).
386 #[error("scheduler persistence: {0}")]
387 Backend(String),
388}
389
390/// Persistence backend for [`Scheduler`] job state.
391///
392/// Mirrors the meta-plugin's `Persistence` trait in shape but scoped
393/// to scheduler records. The Tokio driver (`crates/uni/src/scheduler.rs`)
394/// invokes `record_started` / `record_finished` / `cancel` on each
395/// lifecycle transition; on startup the driver calls `load_all` and
396/// re-registers persisted jobs (followed by
397/// [`Scheduler::requeue_orphaned_runs`] for any that were `Running`
398/// at the previous shutdown / crash).
399///
400/// Two impls ship in-tree:
401///
402/// - [`MemoryPersistence`] — no-op tests + as the default before the
403/// host wires a system-label backend.
404/// - `SystemLabelPersistence` (in `uni-query`, lands with the M9
405/// cutover): round-trips through `uni_system.background_jobs` via
406/// the write-enabled
407/// `QueryProcedureHost::execute_inner_query`.
408pub trait SchedulerPersistence: Send + Sync + std::fmt::Debug {
409 /// Persist a job's schedule at registration time.
410 ///
411 /// Called by the host wrapper (e.g. `SchedulerHost`) whenever a
412 /// caller invokes `add_scheduled_job`, so the schedule kind
413 /// (`Periodic` / `Cron` / `Once` / `Manual`) survives restart and
414 /// can be round-tripped through [`Self::load_all`]. The default
415 /// no-op suits in-memory backends and pre-existing impls that do
416 /// not need durability.
417 ///
418 /// # Errors
419 ///
420 /// Returns [`SchedulerPersistenceError`] on backend failure.
421 fn record_scheduled(
422 &self,
423 _id: &QName,
424 _schedule: &Schedule,
425 ) -> Result<(), SchedulerPersistenceError> {
426 Ok(())
427 }
428
429 /// Persist a job's transition into a new run.
430 ///
431 /// # Errors
432 ///
433 /// Returns [`SchedulerPersistenceError`] on backend failure.
434 fn record_started(
435 &self,
436 id: &QName,
437 started_at: SystemTime,
438 ) -> Result<(), SchedulerPersistenceError>;
439
440 /// Persist the outcome of a finished run.
441 ///
442 /// # Errors
443 ///
444 /// Returns [`SchedulerPersistenceError`] on backend failure.
445 fn record_finished(
446 &self,
447 id: &QName,
448 finished_at: SystemTime,
449 success: bool,
450 ) -> Result<(), SchedulerPersistenceError>;
451
452 /// Persist a cancellation.
453 ///
454 /// # Errors
455 ///
456 /// Returns [`SchedulerPersistenceError`] on backend failure.
457 fn cancel(&self, id: &QName) -> Result<(), SchedulerPersistenceError>;
458
459 /// Reload all known job records (used on host startup to restore
460 /// scheduler state across restart). Order is unspecified — the
461 /// driver re-registers them in any order.
462 ///
463 /// # Errors
464 ///
465 /// Returns [`SchedulerPersistenceError`] on backend failure.
466 fn load_all(&self) -> Result<Vec<SchedulerJobRecord>, SchedulerPersistenceError>;
467
468 /// Force any in-memory buffers to durable storage.
469 ///
470 /// Invoked by `uni.periodic.commit` so operators can drive a
471 /// synchronous checkpoint flush. Backends that write through on
472 /// every event (the default for the system-label backend) leave
473 /// this as the default no-op; buffered backends override.
474 ///
475 /// # Errors
476 ///
477 /// Returns [`SchedulerPersistenceError`] on backend failure.
478 fn flush_checkpoint(&self) -> Result<(), SchedulerPersistenceError> {
479 Ok(())
480 }
481}
482
/// [`SchedulerPersistence`] backend that stores nothing: `load_all`
/// always yields an empty list and every other call succeeds without
/// doing any work.
///
/// Serves tests, and acts as the default backend while the host has
/// no durable backend wired yet (e.g., early in `Uni::build`, before
/// the storage manager is available).
#[derive(Default, Debug)]
pub struct MemoryPersistence;
491
492impl SchedulerPersistence for MemoryPersistence {
493 fn record_started(
494 &self,
495 _id: &QName,
496 _started_at: SystemTime,
497 ) -> Result<(), SchedulerPersistenceError> {
498 Ok(())
499 }
500
501 fn record_finished(
502 &self,
503 _id: &QName,
504 _finished_at: SystemTime,
505 _success: bool,
506 ) -> Result<(), SchedulerPersistenceError> {
507 Ok(())
508 }
509
510 fn cancel(&self, _id: &QName) -> Result<(), SchedulerPersistenceError> {
511 Ok(())
512 }
513
514 fn load_all(&self) -> Result<Vec<SchedulerJobRecord>, SchedulerPersistenceError> {
515 Ok(Vec::new())
516 }
517}
518
519/// Trait-object handle to a scheduler, for cross-crate callers that
520/// can't depend on the concrete host-side `SchedulerHost` type.
521///
522/// The built-in `uni.periodic.*` procedures hold an `Arc<dyn
523/// SchedulerControl>` so they can register / cancel / list jobs
524/// without depending on `uni-db`. The host crate (`uni-db`) implements
525/// this on its `SchedulerHost` and passes it down at registration
526/// time.
527pub trait SchedulerControl: Send + Sync + std::fmt::Debug {
528 /// Register a job to fire on `schedule`.
529 fn add_scheduled_job(&self, id: QName, schedule: Schedule);
530
531 /// Cancel a job by id. Returns `true` if it existed.
532 fn cancel(&self, id: &QName) -> bool;
533
534 /// Snapshot of every known job.
535 fn list(&self) -> Vec<SchedulerJobRecord>;
536
537 /// Submit an inline write-mode Cypher body for synchronous
538 /// execution. The default impl returns an error so simple
539 /// scheduler primitives (without a host) can still satisfy the
540 /// trait shape; the `uni-db::scheduler::SchedulerHost` override
541 /// dispatches through its [`crate::traits::background::JobHost`].
542 ///
543 /// Used by `uni.periodic.submit(...)` and as the inner-loop body
544 /// of `uni.periodic.iterate(...)`.
545 ///
546 /// # Errors
547 ///
548 /// Returns [`crate::FnError`] when the scheduler is not wired to a
549 /// Cypher-execution host (default impl) or when the submitted
550 /// statement fails.
551 fn submit_cypher(&self, _cypher: &str) -> Result<(), crate::FnError> {
552 Err(crate::FnError::new(
553 0xD20,
554 "scheduler: submit_cypher not supported by this control (no host wired)",
555 ))
556 }
557
558 /// Drive the persistence backend to flush its checkpoint buffer.
559 ///
560 /// Default impl is a no-op so the bare [`Scheduler`] (with
561 /// [`MemoryPersistence`]) and any control that has no durable
562 /// backend keep working without an override. The host-side
563 /// `SchedulerHost` override forwards to its
564 /// [`SchedulerPersistence::flush_checkpoint`].
565 ///
566 /// # Errors
567 ///
568 /// Returns [`crate::FnError`] when the persistence backend reports
569 /// a flush failure.
570 fn flush_checkpoint(&self) -> Result<(), crate::FnError> {
571 Ok(())
572 }
573}
574
575impl SchedulerControl for Scheduler {
576 fn add_scheduled_job(&self, id: QName, schedule: Schedule) {
577 Self::add_scheduled_job(self, id, schedule);
578 }
579
580 fn cancel(&self, id: &QName) -> bool {
581 Self::cancel(self, id)
582 }
583
584 fn list(&self) -> Vec<SchedulerJobRecord> {
585 Self::list(self)
586 }
587}
588
589/// Cooperative-cancel handle handed to job implementations.
590#[derive(Clone, Debug)]
591pub struct SchedulerHandle {
592 inner: Arc<Scheduler>,
593}
594
595impl SchedulerHandle {
596 /// Wrap a scheduler in a clonable handle.
597 #[must_use]
598 pub fn new(scheduler: Arc<Scheduler>) -> Self {
599 Self { inner: scheduler }
600 }
601
602 /// Borrow the underlying scheduler.
603 #[must_use]
604 pub fn scheduler(&self) -> &Scheduler {
605 &self.inner
606 }
607}
608
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Shorthand for `QName::builtin(name)`.
    fn job(name: &'static str) -> QName {
        QName::builtin(name)
    }

    #[test]
    fn scheduler_default_is_paused() {
        let scheduler = Scheduler::new();
        assert!(scheduler.is_paused());
        assert!(scheduler.list().is_empty());
    }

    #[test]
    fn scheduler_resume_pause_round_trip() {
        let scheduler = Scheduler::new();
        scheduler.resume();
        assert!(!scheduler.is_paused());
        scheduler.pause();
        assert!(scheduler.is_paused());
    }

    #[test]
    fn add_job_and_cancel() {
        let scheduler = Scheduler::new();
        scheduler.add_job(job("ttl_sweep"));
        assert_eq!(scheduler.list().len(), 1);
        assert!(scheduler.cancel(&job("ttl_sweep")));
        let snapshot = scheduler.list();
        assert_eq!(snapshot[0].status, SchedulerJobStatus::Cancelled);
        assert!(snapshot[0].cancel.is_cancelled());
    }

    #[test]
    fn cancel_unknown_job_returns_false() {
        let scheduler = Scheduler::new();
        assert!(!scheduler.cancel(&job("nope")));
    }

    #[test]
    fn run_lifecycle_increments_failures_then_resets() {
        let scheduler = Scheduler::new();
        let flaky = job("flaky");
        scheduler.add_job(flaky.clone());

        // Two failed runs back to back.
        for _ in 0..2 {
            scheduler.mark_started(&flaky);
            scheduler.mark_finished(&flaky, false);
        }

        let snapshot = scheduler.list();
        assert_eq!(snapshot[0].consecutive_failures, 2);
        assert_eq!(snapshot[0].status, SchedulerJobStatus::FailedRetrying);

        // One success wipes the failure streak.
        scheduler.mark_started(&flaky);
        scheduler.mark_finished(&flaky, true);

        let snapshot = scheduler.list();
        assert_eq!(snapshot[0].consecutive_failures, 0);
        assert_eq!(snapshot[0].status, SchedulerJobStatus::Idle);
    }

    // ── tick / driver primitive tests ──────────────────────────────

    #[test]
    fn tick_returns_empty_when_paused() {
        let scheduler = Scheduler::new();
        scheduler.add_job(job("job1"));
        // A new scheduler is paused, so nothing may be handed out.
        assert!(scheduler.tick().is_empty());
    }

    #[test]
    fn tick_dispatches_pending_jobs_when_resumed() {
        let scheduler = Scheduler::new();
        scheduler.add_job(job("job1"));
        scheduler.add_job(job("job2"));
        scheduler.resume();
        let dispatched = scheduler.tick();
        assert_eq!(dispatched.len(), 2);
        assert!(dispatched.iter().any(|q| q.local() == "job1"));
        assert!(dispatched.iter().any(|q| q.local() == "job2"));
        // Everything the tick returned has moved to Running.
        assert_eq!(scheduler.running_count(), 2);
        assert_eq!(scheduler.pending_count(), 0);
    }

    #[test]
    fn tick_skips_cancelled_jobs() {
        let scheduler = Scheduler::new();
        scheduler.add_job(job("doomed"));
        scheduler.cancel(&job("doomed"));
        scheduler.resume();
        let dispatched = scheduler.tick();
        assert!(
            dispatched.is_empty(),
            "cancelled job should not be dispatched"
        );
    }

    #[test]
    fn second_tick_returns_empty_until_jobs_marked_pending() {
        let scheduler = Scheduler::new();
        scheduler.add_job(job("once"));
        scheduler.resume();
        assert_eq!(scheduler.tick().len(), 1);
        // Still Running (no mark_finished yet), so the second tick
        // must not hand the job out again.
        assert!(scheduler.tick().is_empty());
        scheduler.mark_finished(&job("once"), true);
        // Idle rather than Pending now, so it still is not redispatched.
        assert!(scheduler.tick().is_empty());
    }

    #[test]
    fn requeue_orphaned_runs_moves_running_back_to_pending() {
        let scheduler = Scheduler::new();
        scheduler.add_job(job("orphan"));
        scheduler.resume();
        scheduler.tick();
        assert_eq!(scheduler.running_count(), 1);
        let requeued = scheduler.requeue_orphaned_runs();
        assert_eq!(requeued, 1);
        assert_eq!(scheduler.running_count(), 0);
        assert_eq!(scheduler.pending_count(), 1);
        // The requeued job is eligible for dispatch once more.
        assert_eq!(scheduler.tick().len(), 1);
    }

    // ── Schedule semantics tests ────────────────────────────────

    #[test]
    fn schedule_once_fires_only_after_instant() {
        let scheduler = Scheduler::new();
        scheduler.resume();
        let fire_at = SystemTime::now() + Duration::from_secs(60);
        scheduler.add_scheduled_job(job("once"), Schedule::Once(fire_at));
        let before = scheduler.tick_at(SystemTime::now());
        assert!(
            before.is_empty(),
            "Once job should not fire before its instant"
        );
        let after = scheduler.tick_at(fire_at + Duration::from_secs(1));
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].local(), "once");
    }

    #[test]
    fn schedule_once_does_not_reschedule_after_finish() {
        let scheduler = Scheduler::new();
        scheduler.resume();
        let already_past = SystemTime::now() - Duration::from_secs(1);
        scheduler.add_scheduled_job(job("once"), Schedule::Once(already_past));
        let dispatched = scheduler.tick_at(SystemTime::now());
        assert_eq!(dispatched.len(), 1);
        scheduler.mark_finished(&job("once"), true);
        let snapshot = scheduler.list();
        assert_eq!(snapshot[0].status, SchedulerJobStatus::Idle);
        assert!(snapshot[0].next_fire_at.is_none());
        let an_hour_later = SystemTime::now() + Duration::from_secs(3600);
        assert!(scheduler.tick_at(an_hour_later).is_empty());
    }

    #[test]
    fn schedule_periodic_reschedules_after_finish() {
        let scheduler = Scheduler::new();
        scheduler.resume();
        let registered = SystemTime::now();
        let every = Duration::from_secs(10);
        scheduler.add_scheduled_job(job("ticker"), Schedule::Periodic(every));
        assert!(
            scheduler
                .tick_at(registered + Duration::from_secs(5))
                .is_empty()
        );
        let dispatched = scheduler.tick_at(registered + Duration::from_secs(11));
        assert_eq!(dispatched.len(), 1);
        scheduler.mark_finished(&job("ticker"), true);
        let snapshot = scheduler.list();
        assert_eq!(snapshot[0].status, SchedulerJobStatus::Pending);
        assert!(snapshot[0].next_fire_at.is_some());
    }

    #[test]
    fn schedule_cron_emits_future_fire() {
        let scheduler = Scheduler::new();
        scheduler.resume();
        let every_minute = smol_str::SmolStr::new("0 * * * * *");
        scheduler.add_scheduled_job(job("every_min"), Schedule::Cron(every_minute));
        let snapshot = scheduler.list();
        let next_fire = snapshot[0]
            .next_fire_at
            .expect("cron must produce a next fire");
        assert!(next_fire > SystemTime::now() - Duration::from_secs(1));
    }

    #[test]
    fn manual_schedule_is_immediately_due() {
        let scheduler = Scheduler::new();
        scheduler.resume();
        scheduler.add_scheduled_job(job("legacy"), Schedule::Manual);
        let dispatched = scheduler.tick();
        assert_eq!(dispatched.len(), 1);
        assert_eq!(dispatched[0].local(), "legacy");
    }

    #[test]
    fn pending_count_and_running_count_track_lifecycle() {
        let scheduler = Scheduler::new();
        (0..5).for_each(|n| scheduler.add_job(QName::builtin(format!("job{n}"))));
        scheduler.resume();
        assert_eq!(scheduler.pending_count(), 5);
        assert_eq!(scheduler.running_count(), 0);
        let dispatched = scheduler.tick();
        assert_eq!(dispatched.len(), 5);
        assert_eq!(scheduler.pending_count(), 0);
        assert_eq!(scheduler.running_count(), 5);
        scheduler.mark_finished(&job("job0"), true);
        scheduler.mark_finished(&job("job1"), false);
        assert_eq!(scheduler.running_count(), 3, "two have finished");
    }
}