Skip to main content

signal_mod/
coord.rs

1//! The [`Coordinator`], its [`builder`](CoordinatorBuilder), and
2//! the [`Statistics`] snapshot type.
3
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use parking_lot::Mutex;
9
10use crate::error::{Error, Result};
11use crate::hook::ShutdownHook;
12use crate::reason::ShutdownReason;
13use crate::signal::SignalSet;
14use crate::state::Inner;
15use crate::token::{ShutdownToken, ShutdownTrigger};
16
/// Graceful-shutdown budget used when the builder is not told
/// otherwise, in milliseconds (5 seconds).
///
/// Chosen as a compromise: long enough for subsystems to flush
/// state, short enough to bound shutdown latency after an
/// operator-initiated terminate. Can be overridden per coordinator
/// through [`CoordinatorBuilder::graceful_timeout`].
const DEFAULT_GRACEFUL_MS: u64 = 5 * 1_000;

/// Force-shutdown budget used when the builder is not told
/// otherwise, in milliseconds (10 seconds).
///
/// This value is only carried and exposed for downstream code that
/// layers a multi-phase shutdown ladder on top of the graceful
/// phase; nothing in the coordinator enforces it.
const DEFAULT_FORCE_MS: u64 = 10 * 1_000;
31
32/// Owns the shutdown state machine, hook list, and (optionally) the
33/// installed signal handlers.
34///
35/// A `Coordinator` is the central object of `signal-mod`. It is
36/// constructed once at program startup via [`Coordinator::builder`]
37/// and then:
38///
39/// 1. Optionally registers OS-level signal handlers via
40///    [`install`](Coordinator::install).
41/// 2. Hands out cheap-to-clone [`ShutdownToken`] observer handles
42///    and [`ShutdownTrigger`] initiator handles to the rest of the
43///    program.
44/// 3. After shutdown is initiated (by signal, programmatic trigger,
45///    or supervisory parent), runs registered [`ShutdownHook`]s in
46///    descending priority order via
47///    [`run_hooks`](Coordinator::run_hooks).
48///
49/// The coordinator is `Send + Sync`. It holds an `Arc` to the shared
50/// state machine, so cloning a token or trigger is `O(1)`.
51///
52/// # Examples
53///
54/// ```no_run
55/// use signal_mod::{Coordinator, ShutdownReason, SignalSet};
56/// use std::time::Duration;
57///
58/// # #[cfg(feature = "tokio")]
59/// # async fn run() -> signal_mod::Result<()> {
60/// let coord = Coordinator::builder()
61///     .signals(SignalSet::graceful())
62///     .graceful_timeout(Duration::from_secs(5))
63///     .hook(signal_mod::hook_from_fn(
64///         "flush-logs",
65///         100,
66///         |reason| eprintln!("shutting down: {reason}"),
67///     ))
68///     .build();
69///
70/// coord.install()?;
71///
72/// let token = coord.token();
73/// token.wait().await;
74///
75/// let reason = token.reason().unwrap_or(ShutdownReason::Requested);
76/// coord.run_hooks(reason);
77/// # Ok(())
78/// # }
79/// ```
80pub struct Coordinator {
81    inner: Arc<Inner>,
82    signals: SignalSet,
83    graceful_timeout: Duration,
84    force_timeout: Duration,
85    hooks: Mutex<Vec<Box<dyn ShutdownHook>>>,
86    installed: AtomicBool,
87    hooks_completed: AtomicUsize,
88}
89
90impl core::fmt::Debug for Coordinator {
91    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
92        f.debug_struct("Coordinator")
93            .field("signals", &self.signals)
94            .field("graceful_timeout", &self.graceful_timeout)
95            .field("force_timeout", &self.force_timeout)
96            .field(
97                "hooks",
98                &format_args!("[{} hook(s)]", self.hooks.lock().len()),
99            )
100            .field("installed", &self.installed.load(Ordering::Relaxed))
101            .field("initiated", &self.inner.is_initiated())
102            .finish()
103    }
104}
105
106impl Coordinator {
107    /// Start a new [`CoordinatorBuilder`] with default configuration.
108    ///
109    /// Equivalent to [`CoordinatorBuilder::new`].
110    #[must_use]
111    pub fn builder() -> CoordinatorBuilder {
112        CoordinatorBuilder::new()
113    }
114
115    /// Create a new cloneable [`ShutdownToken`] observer handle.
116    ///
117    /// Tokens share the underlying state with the coordinator and
118    /// each other. Cloning a token costs one `Arc::clone`.
119    #[must_use]
120    pub fn token(&self) -> ShutdownToken {
121        ShutdownToken::new(Arc::clone(&self.inner))
122    }
123
124    /// Create a new cloneable [`ShutdownTrigger`] initiator handle.
125    ///
126    /// Triggers share the underlying state with the coordinator and
127    /// each other. Cloning a trigger costs one `Arc::clone`.
128    #[must_use]
129    pub fn trigger(&self) -> ShutdownTrigger {
130        ShutdownTrigger::new(Arc::clone(&self.inner))
131    }
132
133    /// Configured signal set.
134    #[must_use]
135    pub fn signals(&self) -> SignalSet {
136        self.signals
137    }
138
139    /// Configured graceful-shutdown timeout.
140    #[must_use]
141    pub fn graceful_timeout(&self) -> Duration {
142        self.graceful_timeout
143    }
144
145    /// Configured force-shutdown timeout.
146    #[must_use]
147    pub fn force_timeout(&self) -> Duration {
148        self.force_timeout
149    }
150
151    /// `true` if [`install`](Self::install) has been called
152    /// successfully on this coordinator.
153    #[must_use]
154    pub fn is_installed(&self) -> bool {
155        self.installed.load(Ordering::Relaxed)
156    }
157
158    /// Snapshot of the current shutdown state.
159    ///
160    /// The snapshot is taken under the same lock the state machine
161    /// uses internally, so all fields are mutually consistent. The
162    /// returned [`Statistics`] is `Clone` and may be passed across
163    /// threads.
164    #[must_use]
165    pub fn statistics(&self) -> Statistics {
166        let hooks_registered = self.hooks.lock().len();
167        let hooks_completed = self.hooks_completed.load(Ordering::Relaxed);
168        Statistics {
169            initiated: self.inner.is_initiated(),
170            reason: self.inner.reason(),
171            hooks_registered,
172            hooks_completed,
173            elapsed: self.inner.elapsed(),
174        }
175    }
176
177    /// Run registered hooks in descending priority order under the
178    /// graceful timeout budget.
179    ///
180    /// Returns the number of hooks that completed before the budget
181    /// elapsed. Hooks are sorted on every call (the list is small
182    /// and sort overhead is dominated by per-hook dispatch); within
183    /// a priority, insertion order is preserved.
184    ///
185    /// If the graceful budget elapses mid-loop, the remaining hooks
186    /// are skipped. Callers that implement a multi-phase ladder may
187    /// invoke `run_hooks` again with [`ShutdownReason::Forced`] to
188    /// retry; that second invocation runs every still-registered
189    /// hook from scratch (hook bodies are responsible for being
190    /// idempotent if they wish to be reusable across phases).
191    ///
192    /// # Examples
193    ///
194    /// ```
195    /// use signal_mod::{hook_from_fn, Coordinator, ShutdownReason};
196    ///
197    /// let coord = Coordinator::builder()
198    ///     .hook(hook_from_fn("first", 100, |_| {}))
199    ///     .hook(hook_from_fn("second", 0, |_| {}))
200    ///     .build();
201    ///
202    /// let ran = coord.run_hooks(ShutdownReason::Requested);
203    /// assert_eq!(ran, 2);
204    /// ```
205    pub fn run_hooks(&self, reason: ShutdownReason) -> usize {
206        let mut hooks = self.hooks.lock();
207        hooks.sort_by_key(|h| core::cmp::Reverse(h.priority()));
208        let start = Instant::now();
209        let mut count = 0usize;
210        for hook in hooks.iter() {
211            if start.elapsed() > self.graceful_timeout {
212                break;
213            }
214            // Hooks are user-supplied; a panic in one must not abort
215            // the entire shutdown sequence. We catch the unwind here
216            // and continue. The hook is counted as "completed" either
217            // way, because from the coordinator's perspective the
218            // hook's lifecycle ran to a terminal state.
219            let hook_ref = hook.as_ref();
220            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
221                hook_ref.run(reason);
222            }));
223            if let Err(_panic) = result {
224                // Swallow the panic. We deliberately do not log here
225                // (the crate has no tracing dep); applications that
226                // need diagnostics should wrap their own hook bodies
227                // in `catch_unwind` and report.
228            }
229            count += 1;
230            self.hooks_completed.fetch_add(1, Ordering::Relaxed);
231        }
232        count
233    }
234
235    /// Install OS-level signal handlers for the configured set.
236    ///
237    /// The back-end is selected at compile time by feature flags:
238    ///
239    /// - `tokio` (default): spawns background `tokio` tasks. Must be
240    ///   called from inside a `tokio` runtime context.
241    /// - `async-std` (and `tokio` not enabled): spawns background
242    ///   `async-std` tasks. Must be called from inside an
243    ///   `async-std` runtime context.
244    /// - `ctrlc-fallback` (and neither runtime feature enabled):
245    ///   registers a synchronous `ctrlc` handler covering
246    ///   [`Signal::Interrupt`](crate::Signal::Interrupt).
247    /// - No back-end feature enabled: returns [`Error::NoRuntime`].
248    ///
249    /// `tokio` takes precedence over `async-std`.
250    ///
251    /// Installation is idempotent on the coordinator side: a second
252    /// call returns [`Error::AlreadyInstalled`] without touching the
253    /// OS. The process-global signal slot is owned by the first
254    /// back-end that grabs it; do not install handlers from two
255    /// different coordinators in the same process.
256    ///
257    /// # Errors
258    ///
259    /// - [`Error::AlreadyInstalled`] if this coordinator has already
260    ///   installed handlers (regardless of which back-end succeeded).
261    /// - [`Error::SignalRegistration`] if the platform rejects a
262    ///   specific signal. The internal install flag is reverted on
263    ///   error so the call can be retried after the cause is fixed.
264    /// - [`Error::NoRuntime`] if no back-end feature is enabled.
265    pub fn install(&self) -> Result<()> {
266        if self
267            .installed
268            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
269            .is_err()
270        {
271            return Err(Error::AlreadyInstalled);
272        }
273
274        let result = self.install_impl();
275
276        if result.is_err() {
277            self.installed.store(false, Ordering::Release);
278        }
279        result
280    }
281
282    #[cfg(feature = "tokio")]
283    fn install_impl(&self) -> Result<()> {
284        crate::install::tokio_rt::install(self)
285    }
286
287    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
288    fn install_impl(&self) -> Result<()> {
289        crate::install::async_std_rt::install(self)
290    }
291
292    #[cfg(all(
293        feature = "ctrlc-fallback",
294        not(feature = "tokio"),
295        not(feature = "async-std")
296    ))]
297    fn install_impl(&self) -> Result<()> {
298        crate::install::ctrlc_sync::install(self)
299    }
300
301    #[cfg(not(any(feature = "tokio", feature = "async-std", feature = "ctrlc-fallback")))]
302    #[allow(clippy::unused_self)]
303    fn install_impl(&self) -> Result<()> {
304        Err(Error::NoRuntime)
305    }
306}
307
308/// Builder for [`Coordinator`].
309///
310/// Created by [`Coordinator::builder`] or [`CoordinatorBuilder::new`].
311/// Methods consume `self` and return `self` so they may be chained.
312///
313/// # Examples
314///
315/// ```
316/// use signal_mod::{hook_from_fn, Coordinator, SignalSet};
317/// use std::time::Duration;
318///
319/// let coord = Coordinator::builder()
320///     .signals(SignalSet::standard())
321///     .graceful_timeout(Duration::from_secs(10))
322///     .force_timeout(Duration::from_secs(20))
323///     .hook(hook_from_fn("close-db", 200, |_| {}))
324///     .hook(hook_from_fn("flush-logs", 100, |_| {}))
325///     .build();
326///
327/// assert_eq!(coord.signals(), SignalSet::standard());
328/// ```
329pub struct CoordinatorBuilder {
330    signals: SignalSet,
331    graceful_timeout: Duration,
332    force_timeout: Duration,
333    hooks: Vec<Box<dyn ShutdownHook>>,
334}
335
336impl core::fmt::Debug for CoordinatorBuilder {
337    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
338        f.debug_struct("CoordinatorBuilder")
339            .field("signals", &self.signals)
340            .field("graceful_timeout", &self.graceful_timeout)
341            .field("force_timeout", &self.force_timeout)
342            .field("hooks", &format_args!("[{} hook(s)]", self.hooks.len()))
343            .finish()
344    }
345}
346
347impl CoordinatorBuilder {
348    /// Start a new builder with the default configuration.
349    ///
350    /// Defaults are:
351    ///
352    /// - signals: [`SignalSet::graceful`]
353    /// - graceful timeout: 5 seconds
354    /// - force timeout: 10 seconds
355    /// - no hooks
356    #[must_use]
357    pub fn new() -> Self {
358        Self {
359            signals: SignalSet::graceful(),
360            graceful_timeout: Duration::from_millis(DEFAULT_GRACEFUL_MS),
361            force_timeout: Duration::from_millis(DEFAULT_FORCE_MS),
362            hooks: Vec::new(),
363        }
364    }
365
366    /// Override the signal set the coordinator will install handlers
367    /// for.
368    #[must_use]
369    pub fn signals(mut self, set: SignalSet) -> Self {
370        self.signals = set;
371        self
372    }
373
374    /// Override the graceful-shutdown timeout.
375    ///
376    /// During [`Coordinator::run_hooks`], hooks have at most this
377    /// long in aggregate before remaining hooks are skipped.
378    #[must_use]
379    pub fn graceful_timeout(mut self, d: Duration) -> Self {
380        self.graceful_timeout = d;
381        self
382    }
383
384    /// Override the force-shutdown timeout.
385    ///
386    /// Exposed for downstream consumers that implement their own
387    /// forced ladder; the coordinator itself does not enforce this
388    /// budget.
389    #[must_use]
390    pub fn force_timeout(mut self, d: Duration) -> Self {
391        self.force_timeout = d;
392        self
393    }
394
395    /// Register a [`ShutdownHook`] to run during
396    /// [`Coordinator::run_hooks`].
397    ///
398    /// Hooks may be added in any order; they are sorted at run time
399    /// by descending priority.
400    #[must_use]
401    pub fn hook<H: ShutdownHook>(mut self, h: H) -> Self {
402        self.hooks.push(Box::new(h));
403        self
404    }
405
406    /// Finalize into a [`Coordinator`].
407    #[must_use]
408    pub fn build(self) -> Coordinator {
409        Coordinator {
410            inner: Inner::new(),
411            signals: self.signals,
412            graceful_timeout: self.graceful_timeout,
413            force_timeout: self.force_timeout,
414            hooks: Mutex::new(self.hooks),
415            installed: AtomicBool::new(false),
416            hooks_completed: AtomicUsize::new(0),
417        }
418    }
419}
420
421impl Default for CoordinatorBuilder {
422    fn default() -> Self {
423        Self::new()
424    }
425}
426
427/// Snapshot of coordinator state, returned by
428/// [`Coordinator::statistics`].
429///
430/// All fields are public for direct read access. The snapshot is a
431/// value type; subsequent state changes on the coordinator do not
432/// affect a previously-taken snapshot.
433///
434/// # Examples
435///
436/// ```
437/// use signal_mod::{Coordinator, ShutdownReason};
438///
439/// let coord = Coordinator::builder().build();
440/// let stats_before = coord.statistics();
441/// assert!(!stats_before.initiated);
442/// assert_eq!(stats_before.hooks_registered, 0);
443///
444/// coord.trigger().trigger(ShutdownReason::Requested);
445/// let stats_after = coord.statistics();
446/// assert!(stats_after.initiated);
447/// assert_eq!(stats_after.reason, Some(ShutdownReason::Requested));
448/// ```
449#[derive(Debug, Clone)]
450pub struct Statistics {
451    /// `true` if shutdown has been initiated.
452    pub initiated: bool,
453    /// Reason carried with the trigger that initiated shutdown.
454    pub reason: Option<ShutdownReason>,
455    /// Number of hooks registered on the coordinator.
456    pub hooks_registered: usize,
457    /// Cumulative number of hook runs completed across all
458    /// `run_hooks` calls.
459    pub hooks_completed: usize,
460    /// Wall-clock time since shutdown was initiated.
461    pub elapsed: Option<Duration>,
462}
463
/// Unit tests for the coordinator, its builder, and the handles it
/// hands out.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::hook::hook_from_fn;

    /// A coordinator built without overrides reports the documented
    /// defaults and an untouched state.
    #[test]
    fn builder_defaults() {
        let c = Coordinator::builder().build();
        assert_eq!(c.signals(), SignalSet::graceful());
        assert_eq!(c.graceful_timeout(), Duration::from_millis(5_000));
        assert_eq!(c.force_timeout(), Duration::from_millis(10_000));
        assert!(!c.is_installed());
        let stats = c.statistics();
        assert!(!stats.initiated);
        assert_eq!(stats.hooks_registered, 0);
        assert_eq!(stats.hooks_completed, 0);
    }

    /// The first trigger wins: a token sees it, and a later trigger
    /// neither succeeds nor replaces the recorded reason.
    #[test]
    fn token_observes_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();
        assert!(!token.is_initiated());
        assert!(trigger.trigger(ShutdownReason::Requested));
        assert!(token.is_initiated());
        assert_eq!(token.reason(), Some(ShutdownReason::Requested));
        assert!(!trigger.trigger(ShutdownReason::Forced));
        assert_eq!(token.reason(), Some(ShutdownReason::Requested));
    }

    /// Hooks registered out of order still run highest priority
    /// first, and each run is counted.
    #[test]
    fn hooks_run_in_priority_order() {
        let order = Arc::new(parking_lot::Mutex::new(Vec::<i32>::new()));

        // Builds a hook that records its own priority when run.
        let push = |p: i32, order: &Arc<parking_lot::Mutex<Vec<i32>>>| {
            let o = Arc::clone(order);
            hook_from_fn(format!("p{p}"), p, move |_| {
                o.lock().push(p);
            })
        };

        let c = Coordinator::builder()
            .hook(push(0, &order))
            .hook(push(100, &order))
            .hook(push(50, &order))
            .build();

        let count = c.run_hooks(ShutdownReason::Requested);
        assert_eq!(count, 3);
        assert_eq!(*order.lock(), vec![100, 50, 0]);
        assert_eq!(c.statistics().hooks_completed, 3);
    }

    /// When the first hook alone overruns the budget, the remaining
    /// hook is skipped.
    #[test]
    fn hooks_respect_graceful_budget() {
        let counter = Arc::new(AtomicUsize::new(0));
        let c1 = Arc::clone(&counter);
        let c2 = Arc::clone(&counter);

        // Sleeps well past the 5 ms budget configured below.
        let slow = hook_from_fn("slow", 100, move |_| {
            c1.fetch_add(1, Ordering::Relaxed);
            std::thread::sleep(Duration::from_millis(30));
        });
        let later = hook_from_fn("later", 0, move |_| {
            c2.fetch_add(1, Ordering::Relaxed);
        });

        let c = Coordinator::builder()
            .graceful_timeout(Duration::from_millis(5))
            .hook(slow)
            .hook(later)
            .build();

        let count = c.run_hooks(ShutdownReason::Requested);
        assert_eq!(count, 1);
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }

    /// `elapsed` is absent before the trigger and does not go
    /// backwards afterwards.
    #[test]
    fn elapsed_increases_after_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        assert!(token.elapsed().is_none());
        let _ = c.trigger().trigger(ShutdownReason::Requested);
        let first = token.elapsed().unwrap();
        std::thread::sleep(Duration::from_millis(5));
        let second = token.elapsed().unwrap();
        assert!(second >= first);
    }

    /// Without a trigger, the bounded blocking wait gives up and
    /// reports `false`.
    #[test]
    fn wait_blocking_timeout_returns_false_on_expiry() {
        let c = Coordinator::builder().build();
        let token = c.token();
        assert!(!token.wait_blocking_timeout(Duration::from_millis(5)));
    }

    /// A trigger fired from another thread wakes the bounded blocking
    /// wait, which then reports `true`.
    #[test]
    fn wait_blocking_timeout_returns_true_on_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();

        let handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(10));
            trigger.trigger(ShutdownReason::Requested);
        });

        assert!(token.wait_blocking_timeout(Duration::from_secs(1)));
        handle.join().unwrap();
    }

    /// With no back-end feature compiled in, `install` fails with
    /// `NoRuntime`.
    #[cfg(not(any(feature = "tokio", feature = "async-std", feature = "ctrlc-fallback")))]
    #[test]
    fn install_errors_with_no_runtime() {
        let c = Coordinator::builder().build();
        assert!(matches!(c.install(), Err(Error::NoRuntime)));
    }

    /// The async wait on a token completes once shutdown is
    /// triggered.
    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn token_wait_resolves_on_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();

        let waiter = tokio::spawn(async move { token.wait().await });
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(trigger.trigger(ShutdownReason::Requested));
        // Two layers to check: the outer result is the timeout, the
        // inner one is the join result. Discarding the inner result
        // would let a panicking waiter task pass this test.
        tokio::time::timeout(Duration::from_secs(1), waiter)
            .await
            .expect("wait did not resolve within 1s")
            .expect("waiter task panicked or was cancelled");
    }
}