Skip to main content

uni_plugin_host/
scheduler.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Host-side tokio-backed scheduler driver.
//!
//! Wraps the runtime-free [`uni_plugin::scheduler::Scheduler`]
//! primitive in a [`SchedulerHost`] that:
//!
//! - calls [`Scheduler::tick_at`](uni_plugin::scheduler::Scheduler::tick_at)
//!   every `tick_interval`,
//! - looks up the [`BackgroundJobProvider`] for each due job from
//!   the host's [`PluginRegistry`],
//! - dispatches each provider's
//!   [`execute`](uni_plugin::traits::background::BackgroundJobProvider::execute)
//!   on [`tokio::task::spawn_blocking`] (the trait method is sync;
//!   providers that need async work call `Handle::current().block_on(...)`
//!   inside),
//! - reports lifecycle transitions to the configured
//!   [`SchedulerPersistence`] backend, and
//! - drains all in-flight runs on shutdown via the supplied
//!   `ShutdownHandle` broadcast.
//!
//! Per the M11 plan, the durable backend (writes through
//! `uni_system.background_jobs` via the write-enabled
//! `execute_inner_query`) lives downstream in `uni-query`; this module
//! consumes only the [`SchedulerPersistence`] trait.

// Rust guideline compliant

30use std::sync::{Arc, OnceLock};
31use std::time::Duration;
32
33use tokio::sync::broadcast;
34
35use uni_plugin::PluginRegistry;
36use uni_plugin::circuit_breaker::{BreakerConfig, CircuitBreaker};
37use uni_plugin::plugin::PluginId;
38use uni_plugin::qname::QName;
39use uni_plugin::scheduler::{MemoryPersistence, Scheduler, SchedulerPersistence};
40use uni_plugin::traits::background::{BackgroundJobProvider, JobContext, JobHost};
41use uni_store::storage::manager::StorageManager;
42
43use crate::host::HostCypherExecutor;
44use crate::shutdown::ShutdownHandle;
45
/// Period of the scheduler driver loop when the caller does not pick one.
///
/// Deliberately equal to the
/// [`DeferralQueue`](crate::triggers::DeferralQueue) ticker so that both
/// background drivers wake up on the same cadence.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(100);
50
51/// Host-side scheduler driver.
52///
53/// One per `Uni` instance. Constructed during
54/// `Uni::build`; the constructor spawns the driving loop onto the
55/// ambient tokio runtime and tracks the join handle through the supplied
56/// `ShutdownHandle`.
57#[derive(Debug)]
58pub struct SchedulerHost {
59    /// The primitive scheduler this host drives.
60    scheduler: Arc<Scheduler>,
61    /// Persistence backend for job state.
62    persistence: Arc<dyn SchedulerPersistence>,
63    /// Per-(plugin, qname) circuit breaker. Background jobs that fail
64    /// `failure_threshold` consecutive times trip the breaker and are
65    /// skipped on subsequent ticks until `cooldown` elapses (after
66    /// which the breaker half-opens and admits one test call). This
67    /// prevents a flapping job from monopolizing the spawn-blocking
68    /// pool. See [`CircuitBreaker`].
69    circuit_breaker: Arc<CircuitBreaker>,
70    /// Host services passed to each [`JobContext`] on dispatch.
71    /// Optional so test fixtures that don't need storage / Uni access
72    /// can construct a `SchedulerHost` without a full `Uni`.
73    job_host: Option<Arc<SchedulerJobHost>>,
74}
75
76/// Concrete [`JobHost`] implementation that lets built-in background
77/// jobs reach the storage manager and (after `Uni::build` finishes)
78/// the host `UniInner` for write-mode Cypher execution.
79///
80/// Built by `Uni::build`. Held by [`SchedulerHost`] and passed by
81/// reference into each [`JobContext`] in `dispatch_one_tick`.
82pub struct SchedulerJobHost {
83    storage: Arc<StorageManager>,
84    /// Set after `Uni::build` returns — gives ttl/iterate-style jobs
85    /// access to the host's write-mode Cypher executor. The executor
86    /// itself holds a `Weak` to the host so the scheduler-host ↔ Uni
87    /// cycle doesn't leak.
88    host_executor: OnceLock<Arc<dyn HostCypherExecutor>>,
89}
90
91impl std::fmt::Debug for SchedulerJobHost {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct("SchedulerJobHost")
94            .field("host_executor_wired", &self.host_executor.get().is_some())
95            .finish_non_exhaustive()
96    }
97}
98
99impl SchedulerJobHost {
100    /// Construct with the storage manager only. The host Cypher
101    /// executor is set later (after `Uni::build` constructs it) via
102    /// [`Self::set_host_executor`].
103    #[must_use]
104    pub fn new(storage: Arc<StorageManager>) -> Self {
105        Self {
106            storage,
107            host_executor: OnceLock::new(),
108        }
109    }
110
111    /// Wire the host write-mode Cypher executor. Idempotent —
112    /// subsequent calls after the first are no-ops.
113    pub fn set_host_executor(&self, exec: Arc<dyn HostCypherExecutor>) {
114        let _ = self.host_executor.set(exec);
115    }
116
117    /// Borrow the storage manager.
118    #[must_use]
119    pub fn storage(&self) -> &Arc<StorageManager> {
120        &self.storage
121    }
122}
123
124/// `SchedulerControl` impl for the host-side `SchedulerHost`.
125///
126/// Delegates list/add/cancel to the inner [`Scheduler`] and overrides
127/// `submit_cypher` to dispatch through the attached
128/// [`SchedulerJobHost`]'s `execute_write_cypher`. This is what makes
129/// the `uni.periodic.submit` / `uni.periodic.iterate` procedures
130/// actually run Cypher when invoked.
131impl uni_plugin::scheduler::SchedulerControl for SchedulerHost {
132    fn add_scheduled_job(&self, id: QName, schedule: uni_plugin::traits::background::Schedule) {
133        // Persist the schedule kind before registering so a restart
134        // can replay `Periodic` / `Cron` / `Once` jobs without
135        // downgrading them to `Manual`. Failures are logged and the
136        // registration still proceeds — losing durability is strictly
137        // worse than losing the in-memory job.
138        if let Err(e) = self.persistence.record_scheduled(&id, &schedule) {
139            tracing::warn!(
140                qname = %id,
141                error = %e,
142                "SchedulerHost: record_scheduled failed; in-memory registration continues",
143            );
144        }
145        self.scheduler.add_scheduled_job(id, schedule);
146    }
147
148    fn cancel(&self, id: &QName) -> bool {
149        self.scheduler.cancel(id)
150    }
151
152    fn list(&self) -> Vec<uni_plugin::scheduler::SchedulerJobRecord> {
153        self.scheduler.list()
154    }
155
156    fn submit_cypher(&self, cypher: &str) -> Result<(), uni_plugin::FnError> {
157        let Some(host) = self.job_host.as_ref() else {
158            return Err(uni_plugin::FnError::new(
159                0xD21,
160                "submit_cypher: scheduler host has no JobHost wired",
161            ));
162        };
163        host.execute_write_cypher(cypher)
164    }
165
166    fn flush_checkpoint(&self) -> Result<(), uni_plugin::FnError> {
167        self.persistence
168            .flush_checkpoint()
169            .map_err(|e| uni_plugin::FnError::new(0xD22, format!("flush_checkpoint: {e}")))
170    }
171}
172
173impl JobHost for SchedulerJobHost {
174    fn as_any(&self) -> &dyn std::any::Any {
175        self
176    }
177
178    fn compact_storage(&self) -> Result<(), uni_plugin::FnError> {
179        // `StorageManager::compact` is async; bridge sync→async via
180        // `block_in_place` + `Handle::current().block_on(...)`. Safe
181        // because the job's `execute()` already runs on
182        // `spawn_blocking`, which is a multi-thread tokio worker.
183        let storage = Arc::clone(&self.storage);
184        tokio::task::block_in_place(|| {
185            tokio::runtime::Handle::current().block_on(async move { storage.compact().await })
186        })
187        .map(|_stats| ())
188        .map_err(|e| uni_plugin::FnError::new(0xD11, format!("compact_storage: {e}")))
189    }
190
191    fn execute_write_cypher(&self, cypher: &str) -> Result<(), uni_plugin::FnError> {
192        // The host executor is set after `Uni::build`. Absence means
193        // the host isn't wired yet (or has been dropped — racing
194        // shutdown); bail without error so the scheduler doesn't open
195        // the circuit breaker on a graceful-shutdown race. The
196        // executor owns the `block_in_place`/current-thread-runtime
197        // guard and the Session/tx/commit body — see
198        // `UniInnerCypherExecutor` in `uni-db`.
199        let Some(exec) = self.host_executor.get() else {
200            tracing::debug!("execute_write_cypher: host executor not wired (shutdown race?)",);
201            return Ok(());
202        };
203        exec.execute_write_cypher(cypher)
204            .map_err(|e| uni_plugin::FnError::new(0xD12, format!("execute_write_cypher: {e}")))
205    }
206}
207
208impl SchedulerHost {
209    /// Construct a scheduler host wired to the supplied
210    /// [`PluginRegistry`] and `ShutdownHandle`, spawning the driving
211    /// loop on the ambient tokio runtime.
212    ///
213    /// Reloads previously-persisted jobs via
214    /// [`SchedulerPersistence::load_all`] and registers them with the
215    /// primitive. Re-queues any orphaned `Running` runs via
216    /// [`Scheduler::requeue_orphaned_runs`].
217    ///
218    /// # Panics
219    ///
220    /// Does not panic itself, but the spawned driver task panics if
221    /// no tokio runtime is active. Callers must invoke this from
222    /// inside a tokio context (which is the case during `Uni::build`).
223    #[must_use]
224    pub fn spawn(
225        registry: Arc<PluginRegistry>,
226        persistence: Arc<dyn SchedulerPersistence>,
227        shutdown: &ShutdownHandle,
228        tick_interval: Duration,
229    ) -> Arc<Self> {
230        Self::spawn_with_job_host(registry, persistence, shutdown, tick_interval, None)
231    }
232
233    /// Construct a scheduler host with an attached
234    /// [`SchedulerJobHost`] (built by `Uni::build` and threaded into
235    /// every dispatched [`JobContext`]).
236    #[must_use]
237    pub fn spawn_with_job_host(
238        registry: Arc<PluginRegistry>,
239        persistence: Arc<dyn SchedulerPersistence>,
240        shutdown: &ShutdownHandle,
241        tick_interval: Duration,
242        job_host: Option<Arc<SchedulerJobHost>>,
243    ) -> Arc<Self> {
244        let scheduler = Arc::new(Scheduler::new());
245
246        // Replay persisted job records (if any) so jobs survive
247        // restart. Errors are logged and ignored — a missing or
248        // corrupt backend should not block startup.
249        match persistence.load_all() {
250            Ok(records) => {
251                for record in records {
252                    scheduler.add_scheduled_job(record.id.clone(), record.schedule);
253                }
254                // Anything previously stuck in `Running` is now
255                // `Pending` and will fire on next tick.
256                let requeued = scheduler.requeue_orphaned_runs();
257                if requeued > 0 {
258                    tracing::info!(
259                        requeued,
260                        "scheduler: requeued orphaned runs from previous shutdown"
261                    );
262                }
263            }
264            Err(e) => tracing::warn!(error = %e, "scheduler: load_all failed; starting empty"),
265        }
266
267        scheduler.resume();
268
269        let circuit_breaker = Arc::new(CircuitBreaker::new(BreakerConfig::default()));
270
271        let host = Arc::new(Self {
272            scheduler: Arc::clone(&scheduler),
273            persistence: Arc::clone(&persistence),
274            circuit_breaker: Arc::clone(&circuit_breaker),
275            job_host: job_host.clone(),
276        });
277
278        // Spawn the driver loop.
279        let driver_scheduler = Arc::clone(&scheduler);
280        let driver_persistence = Arc::clone(&persistence);
281        let driver_registry = Arc::clone(&registry);
282        let driver_breaker = Arc::clone(&circuit_breaker);
283        let driver_job_host = job_host;
284        let shutdown_rx = shutdown.subscribe();
285        let handle = tokio::spawn(driver_loop(
286            driver_scheduler,
287            driver_persistence,
288            driver_registry,
289            driver_breaker,
290            driver_job_host,
291            shutdown_rx,
292            tick_interval,
293        ));
294        shutdown.track_task(handle);
295
296        host
297    }
298
299    /// Borrow the attached [`SchedulerJobHost`] (if any).
300    #[must_use]
301    pub fn job_host(&self) -> Option<&Arc<SchedulerJobHost>> {
302        self.job_host.as_ref()
303    }
304
305    /// Borrow the host's circuit breaker. Exposed for tests + the
306    /// eventual `uni.system.circuit_breaker.*` introspection
307    /// procedures.
308    #[must_use]
309    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
310        &self.circuit_breaker
311    }
312
313    /// Borrow the underlying primitive scheduler.
314    #[must_use]
315    pub fn scheduler(&self) -> &Arc<Scheduler> {
316        &self.scheduler
317    }
318
319    /// Borrow the persistence backend.
320    #[must_use]
321    pub fn persistence(&self) -> &Arc<dyn SchedulerPersistence> {
322        &self.persistence
323    }
324}
325
326/// Convenience constructor returning a [`SchedulerHost`] backed by an
327/// in-memory persistence — used by the default `Uni::build` path
328/// before the host wires a durable backend.
329#[must_use]
330pub fn spawn_with_memory_persistence(
331    registry: Arc<PluginRegistry>,
332    shutdown: &ShutdownHandle,
333) -> Arc<SchedulerHost> {
334    SchedulerHost::spawn(
335        registry,
336        Arc::new(MemoryPersistence),
337        shutdown,
338        DEFAULT_TICK_INTERVAL,
339    )
340}
341
342/// The driving loop. Calls
343/// [`Scheduler::tick_at`](uni_plugin::scheduler::Scheduler::tick_at) on
344/// every interval and dispatches each due job to its provider on a
345/// blocking worker thread.
346async fn driver_loop(
347    scheduler: Arc<Scheduler>,
348    persistence: Arc<dyn SchedulerPersistence>,
349    registry: Arc<PluginRegistry>,
350    circuit_breaker: Arc<CircuitBreaker>,
351    job_host: Option<Arc<SchedulerJobHost>>,
352    mut shutdown_rx: broadcast::Receiver<()>,
353    tick_interval: Duration,
354) {
355    let mut ticker = tokio::time::interval(tick_interval);
356    // Skip the immediate first tick so registration of jobs (which
357    // may happen synchronously right after `spawn`) can settle.
358    ticker.tick().await;
359    loop {
360        tokio::select! {
361            _ = ticker.tick() => {
362                dispatch_one_tick(
363                    &scheduler,
364                    &persistence,
365                    &registry,
366                    &circuit_breaker,
367                    job_host.as_ref(),
368                );
369            }
370            _ = shutdown_rx.recv() => {
371                tracing::info!("scheduler driver: shutdown received");
372                break;
373            }
374        }
375    }
376}
377
378/// Perform one tick: collect due jobs, look up their providers, and
379/// spawn each on `spawn_blocking`. Persisted lifecycle transitions
380/// happen on the same spawned task so the persistence backend sees a
381/// consistent `record_started` → `record_finished` pair per run.
382fn dispatch_one_tick(
383    scheduler: &Arc<Scheduler>,
384    persistence: &Arc<dyn SchedulerPersistence>,
385    registry: &Arc<PluginRegistry>,
386    circuit_breaker: &Arc<CircuitBreaker>,
387    job_host: Option<&Arc<SchedulerJobHost>>,
388) {
389    let due = scheduler.tick();
390    if due.is_empty() {
391        return;
392    }
393    let providers = registry.background_jobs();
394    // All background-job invocations are attributed to the "uni"
395    // plugin id for breaker bookkeeping; per-job qname distinguishes
396    // them so a flapping `ttl_sweep` does not poison `compaction`.
397    let plugin_id = PluginId::new("uni");
398    for id in due {
399        // Circuit-breaker gate: if a job has tripped, skip dispatch
400        // until the cooldown elapses (half-open lets one through).
401        if !circuit_breaker.allow(&plugin_id, &id) {
402            tracing::debug!(
403                job = %id,
404                "scheduler: circuit breaker open; skipping this tick"
405            );
406            // Mark finished with success=false so the schedule
407            // recomputes a next fire instead of leaving the job stuck
408            // Running. We intentionally don't `record_failure` here —
409            // the breaker is already open; recording would only add
410            // noise.
411            scheduler.mark_finished(&id, false);
412            continue;
413        }
414        let Some(provider) = find_provider(&providers, &id) else {
415            tracing::warn!(
416                job = %id,
417                "scheduler: no provider registered; marking finished with failure"
418            );
419            let now = std::time::SystemTime::now();
420            scheduler.mark_finished(&id, false);
421            circuit_breaker.record_failure(&plugin_id, &id);
422            let _ = persistence.record_finished(&id, now, false);
423            continue;
424        };
425        let scheduler_clone = Arc::clone(scheduler);
426        let persistence_clone = Arc::clone(persistence);
427        let breaker_clone = Arc::clone(circuit_breaker);
428        let plugin_id_clone = plugin_id.clone();
429        let job_host_clone = job_host.cloned();
430        let started_at = std::time::SystemTime::now();
431        if let Err(e) = persistence_clone.record_started(&id, started_at) {
432            tracing::warn!(
433                job = %id,
434                error = %e,
435                "scheduler: record_started failed; continuing"
436            );
437        }
438        // §1.2 / Phase 6: use the cancel token attached to the
439        // scheduler's job record (rather than a fresh one) so that
440        // `Scheduler::cancel(id)` and any other holder of the token can
441        // signal in-flight runs cooperatively. Fall back to a fresh
442        // token if the record vanished between `tick()` and here
443        // (extremely unlikely; defensive).
444        let cancel = scheduler.cancel_token_for(&id).unwrap_or_default();
445        let cancel_for_select = cancel.clone();
446        let id_for_log = id.clone();
447        let blocking = tokio::task::spawn_blocking(move || {
448            let mut ctx = JobContext::new(cancel, None);
449            // SAFETY-irrelevant: the host Arc is held by
450            // `job_host_clone` for the entire scope of this closure;
451            // the borrow stays valid for the synchronous
452            // `provider.execute(ctx)` call. Rust's borrow checker
453            // verifies this — `provider.execute` consumes `ctx`
454            // before the closure ends.
455            if let Some(host) = job_host_clone.as_deref() {
456                ctx = ctx.with_host(host as &dyn JobHost);
457            }
458            provider.execute(ctx)
459        });
460        // §1.2 / Phase 6: wrap the blocking dispatch in `tokio::select!`
461        // racing the per-job `cancelled().await`. If cancellation
462        // arrives mid-run, the scheduler observes it immediately and
463        // marks the job finished without waiting for the body to poll
464        // `is_cancelled()`. The body keeps running on the blocking
465        // worker (we can't preempt synchronous Rust), but its result is
466        // dropped — the lifecycle records use the "cancelled" outcome
467        // path so the breaker / persistence stay consistent.
468        tokio::spawn(async move {
469            let success = tokio::select! {
470                joined = blocking => {
471                    match joined {
472                        Ok(outcome) => outcome.is_ok(),
473                        Err(join_err) => {
474                            tracing::warn!(
475                                job = %id_for_log,
476                                error = %join_err,
477                                "scheduler: blocking dispatch join failed"
478                            );
479                            false
480                        }
481                    }
482                }
483                () = cancel_for_select.cancelled() => {
484                    tracing::info!(
485                        job = %id_for_log,
486                        "scheduler: cancellation observed before job completion"
487                    );
488                    false
489                }
490            };
491            let finished_at = std::time::SystemTime::now();
492            scheduler_clone.mark_finished(&id, success);
493            if success {
494                breaker_clone.record_success(&plugin_id_clone, &id);
495            } else {
496                breaker_clone.record_failure(&plugin_id_clone, &id);
497            }
498            if let Err(e) = persistence_clone.record_finished(&id, finished_at, success) {
499                tracing::warn!(
500                    job = %id,
501                    error = %e,
502                    "scheduler: record_finished failed"
503                );
504            }
505        });
506    }
507}
508
509/// Linear search through the registry's background-job providers for
510/// one whose [`JobDefinition::id`](
511/// uni_plugin::traits::background::JobDefinition) matches.
512///
513/// Provider count is small (single-digit to low-double-digit in
514/// practice — one per registered built-in / user job), so the linear
515/// scan is cheaper than a hash lookup at this scale.
516fn find_provider(
517    providers: &Arc<Vec<Arc<dyn BackgroundJobProvider>>>,
518    id: &QName,
519) -> Option<Arc<dyn BackgroundJobProvider>> {
520    providers
521        .iter()
522        .find(|p| &p.definition().id == id)
523        .map(Arc::clone)
524}
525
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use uni_plugin::Capability;
    use uni_plugin::CapabilitySet;
    use uni_plugin::PluginRegistrar;
    use uni_plugin::errors::FnError;
    use uni_plugin::traits::background::{
        ConcurrencyLimit, JobDefinition, JobOutcome, RetryPolicy, Schedule,
    };

    /// Exclusive, never-retried periodic job `test::<name>` with a 1s
    /// timeout. Every fixture below uses this definition shape.
    fn periodic_job(name: &'static str, period: Duration, docs: &'static str) -> JobDefinition {
        JobDefinition {
            id: QName::new("test", name),
            schedule: Schedule::Periodic(period),
            concurrency: ConcurrencyLimit::Exclusive,
            timeout: Duration::from_secs(1),
            retry: RetryPolicy::Never,
            docs: docs.to_owned(),
        }
    }

    /// Spawn a [`SchedulerHost`] over `registry` with in-memory persistence.
    fn spawn_memory_host(
        registry: Arc<PluginRegistry>,
        shutdown: &ShutdownHandle,
        tick: Duration,
    ) -> Arc<SchedulerHost> {
        SchedulerHost::spawn(registry, Arc::new(MemoryPersistence), shutdown, tick)
    }

    /// Fixture: bumps a shared counter on every run.
    #[derive(Debug)]
    struct CountingJob {
        definition: JobDefinition,
        counter: Arc<AtomicU64>,
    }

    impl BackgroundJobProvider for CountingJob {
        fn definition(&self) -> &JobDefinition {
            &self.definition
        }

        fn execute(&self, _ctx: JobContext<'_>) -> Result<JobOutcome, FnError> {
            self.counter.fetch_add(1, Ordering::SeqCst);
            Ok(JobOutcome::Done)
        }
    }

    /// Fixture: counts attempts and fails every one, so repeated runs
    /// drive the circuit breaker open.
    #[derive(Debug)]
    struct AlwaysFailJob {
        definition: JobDefinition,
        attempts: Arc<AtomicU64>,
    }

    impl BackgroundJobProvider for AlwaysFailJob {
        fn definition(&self) -> &JobDefinition {
            &self.definition
        }

        fn execute(&self, _ctx: JobContext<'_>) -> Result<JobOutcome, FnError> {
            self.attempts.fetch_add(1, Ordering::SeqCst);
            Err(FnError::new(0xC1F, "always fails"))
        }
    }

    /// Registry holding `provider`, registered by a `test` plugin that
    /// has the background-job capability.
    fn make_registry_with_job(provider: Arc<dyn BackgroundJobProvider>) -> Arc<PluginRegistry> {
        let registry = Arc::new(PluginRegistry::new());
        let caps = CapabilitySet::from_iter_of([Capability::BackgroundJob { max_concurrent: 0 }]);
        let mut registrar =
            PluginRegistrar::new(uni_plugin::PluginId::new("test"), &caps, &registry);
        registrar
            .background_job(provider)
            .expect("background_job register");
        registrar.commit_to_registry().expect("commit");
        registry
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn driver_fires_periodic_job() {
        let period = Duration::from_millis(50);
        let counter = Arc::new(AtomicU64::new(0));
        let registry = make_registry_with_job(Arc::new(CountingJob {
            definition: periodic_job("ticker", period, "test ticker"),
            counter: Arc::clone(&counter),
        }));
        let shutdown = ShutdownHandle::new(Duration::from_secs(5));
        let host = spawn_memory_host(registry, &shutdown, Duration::from_millis(25));
        host.scheduler()
            .add_scheduled_job(QName::new("test", "ticker"), Schedule::Periodic(period));

        // About 400ms of driving at a 50ms period should yield several fires.
        tokio::time::sleep(Duration::from_millis(400)).await;

        let fires = counter.load(Ordering::SeqCst);
        assert!(
            fires >= 2,
            "expected the periodic job to fire at least twice, got {fires}"
        );

        // Clean shutdown.
        let _ = shutdown.shutdown_async().await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn cancel_halts_further_runs() {
        let period = Duration::from_millis(50);
        let counter = Arc::new(AtomicU64::new(0));
        let registry = make_registry_with_job(Arc::new(CountingJob {
            definition: periodic_job("cancelme", period, "cancelme"),
            counter: Arc::clone(&counter),
        }));
        let shutdown = ShutdownHandle::new(Duration::from_secs(5));
        let host = spawn_memory_host(registry, &shutdown, Duration::from_millis(25));
        let job_id = QName::new("test", "cancelme");
        host.scheduler()
            .add_scheduled_job(job_id.clone(), Schedule::Periodic(period));

        // Give the job time to fire at least once.
        tokio::time::sleep(Duration::from_millis(150)).await;
        let pre_cancel = counter.load(Ordering::SeqCst);
        assert!(pre_cancel >= 1, "expected at least one pre-cancel fire");

        host.scheduler().cancel(&job_id);

        // Let things settle, then take the final count.
        tokio::time::sleep(Duration::from_millis(300)).await;
        let post_cancel = counter.load(Ordering::SeqCst);

        // Firing must stop after cancel. One run already in flight at the
        // moment of cancel may still land.
        assert!(
            post_cancel <= pre_cancel + 1,
            "expected cancel to halt firing; pre={pre_cancel} post={post_cancel}"
        );

        let _ = shutdown.shutdown_async().await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn circuit_breaker_opens_after_threshold_failures() {
        let period = Duration::from_millis(20);
        let attempts = Arc::new(AtomicU64::new(0));
        let registry = make_registry_with_job(Arc::new(AlwaysFailJob {
            definition: periodic_job("flaky", period, "flaky"),
            attempts: Arc::clone(&attempts),
        }));
        let shutdown = ShutdownHandle::new(Duration::from_secs(5));
        let host = spawn_memory_host(registry, &shutdown, Duration::from_millis(10));
        host.scheduler()
            .add_scheduled_job(QName::new("test", "flaky"), Schedule::Periodic(period));

        // A 10ms tick with a 20ms period gives roughly 25 dispatch
        // opportunities in 500ms. The breaker opens after 10 consecutive
        // failures, after which attempts should plateau.
        tokio::time::sleep(Duration::from_millis(500)).await;

        let total_attempts = attempts.load(Ordering::SeqCst);
        assert!(
            (10..=20).contains(&total_attempts),
            "expected the breaker to cap attempts around the failure_threshold (10); \
             got {total_attempts}"
        );

        let _ = shutdown.shutdown_async().await;
    }
}