signal_mod/coord.rs
1//! The [`Coordinator`], its [`builder`](CoordinatorBuilder), and
2//! the [`Statistics`] snapshot type.
3
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use parking_lot::Mutex;
9
10use crate::error::{Error, Result};
11use crate::hook::ShutdownHook;
12use crate::reason::ShutdownReason;
13use crate::signal::SignalSet;
14use crate::state::Inner;
15use crate::token::{ShutdownToken, ShutdownTrigger};
16
/// Graceful-shutdown budget used when the builder is not given one:
/// 5 seconds.
///
/// Long enough for subsystems to flush their state, short enough to
/// keep an operator-initiated terminate from hanging indefinitely.
/// Override it per coordinator with
/// [`CoordinatorBuilder::graceful_timeout`].
const DEFAULT_GRACEFUL_MS: u64 = 5 * 1_000;

/// Force-shutdown budget used when the builder is not given one:
/// 10 seconds.
///
/// Provided for downstream consumers that layer a multi-phase shutdown
/// ladder on top of the graceful phase; the coordinator never enforces
/// this value itself.
const DEFAULT_FORCE_MS: u64 = 10 * 1_000;
31
32/// Owns the shutdown state machine, hook list, and (optionally) the
33/// installed signal handlers.
34///
35/// A `Coordinator` is the central object of `signal-mod`. It is
36/// constructed once at program startup via [`Coordinator::builder`]
37/// and then:
38///
39/// 1. Optionally registers OS-level signal handlers via
40/// [`install`](Coordinator::install).
41/// 2. Hands out cheap-to-clone [`ShutdownToken`] observer handles
42/// and [`ShutdownTrigger`] initiator handles to the rest of the
43/// program.
44/// 3. After shutdown is initiated (by signal, programmatic trigger,
45/// or supervisory parent), runs registered [`ShutdownHook`]s in
46/// descending priority order via
47/// [`run_hooks`](Coordinator::run_hooks).
48///
49/// The coordinator is `Send + Sync`. It holds an `Arc` to the shared
50/// state machine, so cloning a token or trigger is `O(1)`.
51///
52/// # Examples
53///
54/// ```no_run
55/// use signal_mod::{Coordinator, ShutdownReason, SignalSet};
56/// use std::time::Duration;
57///
58/// # #[cfg(feature = "tokio")]
59/// # async fn run() -> signal_mod::Result<()> {
60/// let coord = Coordinator::builder()
61/// .signals(SignalSet::graceful())
62/// .graceful_timeout(Duration::from_secs(5))
63/// .hook(signal_mod::hook_from_fn(
64/// "flush-logs",
65/// 100,
66/// |reason| eprintln!("shutting down: {reason}"),
67/// ))
68/// .build();
69///
70/// coord.install()?;
71///
72/// let token = coord.token();
73/// token.wait().await;
74///
75/// let reason = token.reason().unwrap_or(ShutdownReason::Requested);
76/// coord.run_hooks(reason);
77/// # Ok(())
78/// # }
79/// ```
80pub struct Coordinator {
81 inner: Arc<Inner>,
82 signals: SignalSet,
83 graceful_timeout: Duration,
84 force_timeout: Duration,
85 hooks: Mutex<Vec<Box<dyn ShutdownHook>>>,
86 installed: AtomicBool,
87 hooks_completed: AtomicUsize,
88}
89
90impl core::fmt::Debug for Coordinator {
91 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
92 f.debug_struct("Coordinator")
93 .field("signals", &self.signals)
94 .field("graceful_timeout", &self.graceful_timeout)
95 .field("force_timeout", &self.force_timeout)
96 .field(
97 "hooks",
98 &format_args!("[{} hook(s)]", self.hooks.lock().len()),
99 )
100 .field("installed", &self.installed.load(Ordering::Relaxed))
101 .field("initiated", &self.inner.is_initiated())
102 .finish()
103 }
104}
105
106impl Coordinator {
107 /// Start a new [`CoordinatorBuilder`] with default configuration.
108 ///
109 /// Equivalent to [`CoordinatorBuilder::new`].
110 #[must_use]
111 pub fn builder() -> CoordinatorBuilder {
112 CoordinatorBuilder::new()
113 }
114
115 /// Create a new cloneable [`ShutdownToken`] observer handle.
116 ///
117 /// Tokens share the underlying state with the coordinator and
118 /// each other. Cloning a token costs one `Arc::clone`.
119 #[must_use]
120 pub fn token(&self) -> ShutdownToken {
121 ShutdownToken::new(Arc::clone(&self.inner))
122 }
123
124 /// Create a new cloneable [`ShutdownTrigger`] initiator handle.
125 ///
126 /// Triggers share the underlying state with the coordinator and
127 /// each other. Cloning a trigger costs one `Arc::clone`.
128 #[must_use]
129 pub fn trigger(&self) -> ShutdownTrigger {
130 ShutdownTrigger::new(Arc::clone(&self.inner))
131 }
132
133 /// Configured signal set.
134 #[must_use]
135 pub fn signals(&self) -> SignalSet {
136 self.signals
137 }
138
139 /// Configured graceful-shutdown timeout.
140 #[must_use]
141 pub fn graceful_timeout(&self) -> Duration {
142 self.graceful_timeout
143 }
144
145 /// Configured force-shutdown timeout.
146 #[must_use]
147 pub fn force_timeout(&self) -> Duration {
148 self.force_timeout
149 }
150
151 /// `true` if [`install`](Self::install) has been called
152 /// successfully on this coordinator.
153 #[must_use]
154 pub fn is_installed(&self) -> bool {
155 self.installed.load(Ordering::Relaxed)
156 }
157
158 /// Snapshot of the current shutdown state.
159 ///
160 /// The snapshot is taken under the same lock the state machine
161 /// uses internally, so all fields are mutually consistent. The
162 /// returned [`Statistics`] is `Clone` and may be passed across
163 /// threads.
164 #[must_use]
165 pub fn statistics(&self) -> Statistics {
166 let hooks_registered = self.hooks.lock().len();
167 let hooks_completed = self.hooks_completed.load(Ordering::Relaxed);
168 Statistics {
169 initiated: self.inner.is_initiated(),
170 reason: self.inner.reason(),
171 hooks_registered,
172 hooks_completed,
173 elapsed: self.inner.elapsed(),
174 }
175 }
176
177 /// Run registered hooks in descending priority order under the
178 /// graceful timeout budget.
179 ///
180 /// Returns the number of hooks that completed before the budget
181 /// elapsed. Hooks are sorted on every call (the list is small
182 /// and sort overhead is dominated by per-hook dispatch); within
183 /// a priority, insertion order is preserved.
184 ///
185 /// If the graceful budget elapses mid-loop, the remaining hooks
186 /// are skipped. Callers that implement a multi-phase ladder may
187 /// invoke `run_hooks` again with [`ShutdownReason::Forced`] to
188 /// retry; that second invocation runs every still-registered
189 /// hook from scratch (hook bodies are responsible for being
190 /// idempotent if they wish to be reusable across phases).
191 ///
192 /// # Examples
193 ///
194 /// ```
195 /// use signal_mod::{hook_from_fn, Coordinator, ShutdownReason};
196 ///
197 /// let coord = Coordinator::builder()
198 /// .hook(hook_from_fn("first", 100, |_| {}))
199 /// .hook(hook_from_fn("second", 0, |_| {}))
200 /// .build();
201 ///
202 /// let ran = coord.run_hooks(ShutdownReason::Requested);
203 /// assert_eq!(ran, 2);
204 /// ```
205 pub fn run_hooks(&self, reason: ShutdownReason) -> usize {
206 let mut hooks = self.hooks.lock();
207 hooks.sort_by_key(|h| core::cmp::Reverse(h.priority()));
208 let start = Instant::now();
209 let mut count = 0usize;
210 for hook in hooks.iter() {
211 if start.elapsed() > self.graceful_timeout {
212 break;
213 }
214 // Hooks are user-supplied; a panic in one must not abort
215 // the entire shutdown sequence. We catch the unwind here
216 // and continue. The hook is counted as "completed" either
217 // way, because from the coordinator's perspective the
218 // hook's lifecycle ran to a terminal state.
219 let hook_ref = hook.as_ref();
220 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
221 hook_ref.run(reason);
222 }));
223 if let Err(_panic) = result {
224 // Swallow the panic. We deliberately do not log here
225 // (the crate has no tracing dep); applications that
226 // need diagnostics should wrap their own hook bodies
227 // in `catch_unwind` and report.
228 }
229 count += 1;
230 self.hooks_completed.fetch_add(1, Ordering::Relaxed);
231 }
232 count
233 }
234
235 /// Install OS-level signal handlers for the configured set.
236 ///
237 /// The back-end is selected at compile time by feature flags:
238 ///
239 /// - `tokio` (default): spawns background `tokio` tasks. Must be
240 /// called from inside a `tokio` runtime context.
241 /// - `async-std` (and `tokio` not enabled): spawns background
242 /// `async-std` tasks. Must be called from inside an
243 /// `async-std` runtime context.
244 /// - `ctrlc-fallback` (and neither runtime feature enabled):
245 /// registers a synchronous `ctrlc` handler covering
246 /// [`Signal::Interrupt`](crate::Signal::Interrupt).
247 /// - No back-end feature enabled: returns [`Error::NoRuntime`].
248 ///
249 /// `tokio` takes precedence over `async-std`.
250 ///
251 /// Installation is idempotent on the coordinator side: a second
252 /// call returns [`Error::AlreadyInstalled`] without touching the
253 /// OS. The process-global signal slot is owned by the first
254 /// back-end that grabs it; do not install handlers from two
255 /// different coordinators in the same process.
256 ///
257 /// # Errors
258 ///
259 /// - [`Error::AlreadyInstalled`] if this coordinator has already
260 /// installed handlers (regardless of which back-end succeeded).
261 /// - [`Error::SignalRegistration`] if the platform rejects a
262 /// specific signal. The internal install flag is reverted on
263 /// error so the call can be retried after the cause is fixed.
264 /// - [`Error::NoRuntime`] if no back-end feature is enabled.
265 pub fn install(&self) -> Result<()> {
266 if self
267 .installed
268 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
269 .is_err()
270 {
271 return Err(Error::AlreadyInstalled);
272 }
273
274 let result = self.install_impl();
275
276 if result.is_err() {
277 self.installed.store(false, Ordering::Release);
278 }
279 result
280 }
281
282 #[cfg(feature = "tokio")]
283 fn install_impl(&self) -> Result<()> {
284 crate::install::tokio_rt::install(self)
285 }
286
287 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
288 fn install_impl(&self) -> Result<()> {
289 crate::install::async_std_rt::install(self)
290 }
291
292 #[cfg(all(
293 feature = "ctrlc-fallback",
294 not(feature = "tokio"),
295 not(feature = "async-std")
296 ))]
297 fn install_impl(&self) -> Result<()> {
298 crate::install::ctrlc_sync::install(self)
299 }
300
301 #[cfg(not(any(feature = "tokio", feature = "async-std", feature = "ctrlc-fallback")))]
302 #[allow(clippy::unused_self)]
303 fn install_impl(&self) -> Result<()> {
304 Err(Error::NoRuntime)
305 }
306}
307
308/// Builder for [`Coordinator`].
309///
310/// Created by [`Coordinator::builder`] or [`CoordinatorBuilder::new`].
311/// Methods consume `self` and return `self` so they may be chained.
312///
313/// # Examples
314///
315/// ```
316/// use signal_mod::{hook_from_fn, Coordinator, SignalSet};
317/// use std::time::Duration;
318///
319/// let coord = Coordinator::builder()
320/// .signals(SignalSet::standard())
321/// .graceful_timeout(Duration::from_secs(10))
322/// .force_timeout(Duration::from_secs(20))
323/// .hook(hook_from_fn("close-db", 200, |_| {}))
324/// .hook(hook_from_fn("flush-logs", 100, |_| {}))
325/// .build();
326///
327/// assert_eq!(coord.signals(), SignalSet::standard());
328/// ```
329pub struct CoordinatorBuilder {
330 signals: SignalSet,
331 graceful_timeout: Duration,
332 force_timeout: Duration,
333 hooks: Vec<Box<dyn ShutdownHook>>,
334}
335
336impl core::fmt::Debug for CoordinatorBuilder {
337 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
338 f.debug_struct("CoordinatorBuilder")
339 .field("signals", &self.signals)
340 .field("graceful_timeout", &self.graceful_timeout)
341 .field("force_timeout", &self.force_timeout)
342 .field("hooks", &format_args!("[{} hook(s)]", self.hooks.len()))
343 .finish()
344 }
345}
346
347impl CoordinatorBuilder {
348 /// Start a new builder with the default configuration.
349 ///
350 /// Defaults are:
351 ///
352 /// - signals: [`SignalSet::graceful`]
353 /// - graceful timeout: 5 seconds
354 /// - force timeout: 10 seconds
355 /// - no hooks
356 #[must_use]
357 pub fn new() -> Self {
358 Self {
359 signals: SignalSet::graceful(),
360 graceful_timeout: Duration::from_millis(DEFAULT_GRACEFUL_MS),
361 force_timeout: Duration::from_millis(DEFAULT_FORCE_MS),
362 hooks: Vec::new(),
363 }
364 }
365
366 /// Override the signal set the coordinator will install handlers
367 /// for.
368 #[must_use]
369 pub fn signals(mut self, set: SignalSet) -> Self {
370 self.signals = set;
371 self
372 }
373
374 /// Override the graceful-shutdown timeout.
375 ///
376 /// During [`Coordinator::run_hooks`], hooks have at most this
377 /// long in aggregate before remaining hooks are skipped.
378 #[must_use]
379 pub fn graceful_timeout(mut self, d: Duration) -> Self {
380 self.graceful_timeout = d;
381 self
382 }
383
384 /// Override the force-shutdown timeout.
385 ///
386 /// Exposed for downstream consumers that implement their own
387 /// forced ladder; the coordinator itself does not enforce this
388 /// budget.
389 #[must_use]
390 pub fn force_timeout(mut self, d: Duration) -> Self {
391 self.force_timeout = d;
392 self
393 }
394
395 /// Register a [`ShutdownHook`] to run during
396 /// [`Coordinator::run_hooks`].
397 ///
398 /// Hooks may be added in any order; they are sorted at run time
399 /// by descending priority.
400 #[must_use]
401 pub fn hook<H: ShutdownHook>(mut self, h: H) -> Self {
402 self.hooks.push(Box::new(h));
403 self
404 }
405
406 /// Finalize into a [`Coordinator`].
407 #[must_use]
408 pub fn build(self) -> Coordinator {
409 Coordinator {
410 inner: Inner::new(),
411 signals: self.signals,
412 graceful_timeout: self.graceful_timeout,
413 force_timeout: self.force_timeout,
414 hooks: Mutex::new(self.hooks),
415 installed: AtomicBool::new(false),
416 hooks_completed: AtomicUsize::new(0),
417 }
418 }
419}
420
421impl Default for CoordinatorBuilder {
422 fn default() -> Self {
423 Self::new()
424 }
425}
426
427/// Snapshot of coordinator state, returned by
428/// [`Coordinator::statistics`].
429///
430/// All fields are public for direct read access. The snapshot is a
431/// value type; subsequent state changes on the coordinator do not
432/// affect a previously-taken snapshot.
433///
434/// # Examples
435///
436/// ```
437/// use signal_mod::{Coordinator, ShutdownReason};
438///
439/// let coord = Coordinator::builder().build();
440/// let stats_before = coord.statistics();
441/// assert!(!stats_before.initiated);
442/// assert_eq!(stats_before.hooks_registered, 0);
443///
444/// coord.trigger().trigger(ShutdownReason::Requested);
445/// let stats_after = coord.statistics();
446/// assert!(stats_after.initiated);
447/// assert_eq!(stats_after.reason, Some(ShutdownReason::Requested));
448/// ```
449#[derive(Debug, Clone)]
450pub struct Statistics {
451 /// `true` if shutdown has been initiated.
452 pub initiated: bool,
453 /// Reason carried with the trigger that initiated shutdown.
454 pub reason: Option<ShutdownReason>,
455 /// Number of hooks registered on the coordinator.
456 pub hooks_registered: usize,
457 /// Cumulative number of hook runs completed across all
458 /// `run_hooks` calls.
459 pub hooks_completed: usize,
460 /// Wall-clock time since shutdown was initiated.
461 pub elapsed: Option<Duration>,
462}
463
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::hook::hook_from_fn;

    #[test]
    fn builder_defaults() {
        let c = Coordinator::builder().build();
        assert_eq!(c.signals(), SignalSet::graceful());
        assert_eq!(c.graceful_timeout(), Duration::from_millis(5_000));
        assert_eq!(c.force_timeout(), Duration::from_millis(10_000));
        assert!(!c.is_installed());
        let stats = c.statistics();
        assert!(!stats.initiated);
        assert_eq!(stats.hooks_registered, 0);
        assert_eq!(stats.hooks_completed, 0);
    }

    #[test]
    fn token_observes_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();
        assert!(!token.is_initiated());
        assert!(trigger.trigger(ShutdownReason::Requested));
        assert!(token.is_initiated());
        assert_eq!(token.reason(), Some(ShutdownReason::Requested));
        assert!(!trigger.trigger(ShutdownReason::Forced));
        assert_eq!(token.reason(), Some(ShutdownReason::Requested));
    }

    #[test]
    fn hooks_run_in_priority_order() {
        let order = Arc::new(parking_lot::Mutex::new(Vec::<i32>::new()));

        let push = |p: i32, order: &Arc<parking_lot::Mutex<Vec<i32>>>| {
            let o = Arc::clone(order);
            hook_from_fn(format!("p{p}"), p, move |_| {
                o.lock().push(p);
            })
        };

        let c = Coordinator::builder()
            .hook(push(0, &order))
            .hook(push(100, &order))
            .hook(push(50, &order))
            .build();

        let count = c.run_hooks(ShutdownReason::Requested);
        assert_eq!(count, 3);
        assert_eq!(*order.lock(), vec![100, 50, 0]);
        assert_eq!(c.statistics().hooks_completed, 3);
    }

    #[test]
    fn equal_priority_hooks_keep_registration_order() {
        let order = Arc::new(parking_lot::Mutex::new(Vec::<&'static str>::new()));

        let record = |name: &'static str, order: &Arc<parking_lot::Mutex<Vec<&'static str>>>| {
            let o = Arc::clone(order);
            hook_from_fn(name, 10, move |_| {
                o.lock().push(name);
            })
        };

        let c = Coordinator::builder()
            .hook(record("first", &order))
            .hook(record("second", &order))
            .hook(record("third", &order))
            .build();

        assert_eq!(c.run_hooks(ShutdownReason::Requested), 3);
        assert_eq!(*order.lock(), vec!["first", "second", "third"]);
    }

    #[test]
    fn panicking_hook_does_not_abort_sequence() {
        let later_runs = Arc::new(AtomicUsize::new(0));
        let later = Arc::clone(&later_runs);

        let c = Coordinator::builder()
            .hook(hook_from_fn("boom", 100, |_| {
                panic!("intentional hook failure");
            }))
            .hook(hook_from_fn("later", 0, move |_| {
                later.fetch_add(1, Ordering::Relaxed);
            }))
            .build();

        // The panicking hook still counts: it reached a terminal state.
        assert_eq!(c.run_hooks(ShutdownReason::Requested), 2);
        assert_eq!(later_runs.load(Ordering::Relaxed), 1);
        assert_eq!(c.statistics().hooks_completed, 2);
    }

    #[test]
    fn hooks_completed_accumulates_across_runs() {
        let c = Coordinator::builder()
            .hook(hook_from_fn("a", 1, |_| {}))
            .hook(hook_from_fn("b", 2, |_| {}))
            .build();

        assert_eq!(c.run_hooks(ShutdownReason::Requested), 2);
        assert_eq!(c.run_hooks(ShutdownReason::Forced), 2);

        let stats = c.statistics();
        assert_eq!(stats.hooks_registered, 2);
        assert_eq!(stats.hooks_completed, 4);
    }

    #[test]
    fn hooks_respect_graceful_budget() {
        let counter = Arc::new(AtomicUsize::new(0));
        let c1 = Arc::clone(&counter);
        let c2 = Arc::clone(&counter);

        let slow = hook_from_fn("slow", 100, move |_| {
            c1.fetch_add(1, Ordering::Relaxed);
            std::thread::sleep(Duration::from_millis(30));
        });
        let later = hook_from_fn("later", 0, move |_| {
            c2.fetch_add(1, Ordering::Relaxed);
        });

        let c = Coordinator::builder()
            .graceful_timeout(Duration::from_millis(5))
            .hook(slow)
            .hook(later)
            .build();

        let count = c.run_hooks(ShutdownReason::Requested);
        assert_eq!(count, 1);
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn debug_reports_hook_count() {
        let c = Coordinator::builder()
            .hook(hook_from_fn("only", 0, |_| {}))
            .build();
        let rendered = format!("{c:?}");
        assert!(rendered.contains("[1 hook(s)]"), "{rendered}");
    }

    #[test]
    fn elapsed_increases_after_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        assert!(token.elapsed().is_none());
        let _ = c.trigger().trigger(ShutdownReason::Requested);
        let first = token.elapsed().unwrap();
        std::thread::sleep(Duration::from_millis(5));
        let second = token.elapsed().unwrap();
        assert!(second >= first);
    }

    #[test]
    fn wait_blocking_timeout_returns_false_on_expiry() {
        let c = Coordinator::builder().build();
        let token = c.token();
        assert!(!token.wait_blocking_timeout(Duration::from_millis(5)));
    }

    #[test]
    fn wait_blocking_timeout_returns_true_on_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();

        let handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(10));
            trigger.trigger(ShutdownReason::Requested);
        });

        assert!(token.wait_blocking_timeout(Duration::from_secs(1)));
        handle.join().unwrap();
    }

    #[cfg(not(any(feature = "tokio", feature = "async-std", feature = "ctrlc-fallback")))]
    #[test]
    fn install_errors_with_no_runtime() {
        let c = Coordinator::builder().build();
        assert!(matches!(c.install(), Err(Error::NoRuntime)));
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn token_wait_resolves_on_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();

        let waiter = tokio::spawn(async move { token.wait().await });
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(trigger.trigger(ShutdownReason::Requested));
        // Unwrap both layers: the timeout, and the join result (so a
        // panic inside the waiter task fails the test instead of being
        // silently discarded).
        let _ = tokio::time::timeout(Duration::from_secs(1), waiter)
            .await
            .expect("wait did not resolve within 1s")
            .expect("waiter task panicked");
    }
}