use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use crate::error::{Error, Result};
use crate::hook::ShutdownHook;
use crate::reason::ShutdownReason;
use crate::signal::SignalSet;
use crate::state::Inner;
use crate::token::{ShutdownToken, ShutdownTrigger};
/// Default graceful timeout, in milliseconds, applied by `CoordinatorBuilder::new`.
const DEFAULT_GRACEFUL_MS: u64 = 5_000;
/// Default force timeout, in milliseconds, applied by `CoordinatorBuilder::new`.
const DEFAULT_FORCE_MS: u64 = 10_000;
/// Shutdown coordinator: bundles the shared shutdown state, the configured
/// signal set and timeouts, and the registered shutdown hooks.
///
/// Values are produced by `CoordinatorBuilder::build`.
pub struct Coordinator {
/// Shared state; cloned (by `Arc`) into every token and trigger handed out.
inner: Arc<Inner>,
/// Signal set chosen on the builder; returned by `signals()`.
signals: SignalSet,
/// Time budget that `run_hooks` checks before starting each hook.
graceful_timeout: Duration,
/// Returned by `force_timeout()`; presumably consumed by the runtime-specific
/// installers — TODO confirm, it is not read elsewhere in the visible code.
force_timeout: Duration,
/// Registered hooks. `run_hooks` sorts this list in place by descending priority.
hooks: Mutex<Vec<Box<dyn ShutdownHook>>>,
/// Set by `install` for the duration of an install attempt and kept on success.
installed: AtomicBool,
/// Running total of hook invocations counted by `run_hooks`.
hooks_completed: AtomicUsize,
}
/// Manual `Debug` implementation: the boxed hooks are summarised as a count and
/// the shared state is reduced to its `initiated` flag.
impl core::fmt::Debug for Coordinator {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut out = f.debug_struct("Coordinator");
        out.field("signals", &self.signals);
        out.field("graceful_timeout", &self.graceful_timeout);
        out.field("force_timeout", &self.force_timeout);
        // Only the number of hooks is printed; this takes the hooks lock briefly.
        out.field(
            "hooks",
            &format_args!("[{} hook(s)]", self.hooks.lock().len()),
        );
        out.field("installed", &self.installed.load(Ordering::Relaxed));
        out.field("initiated", &self.inner.is_initiated());
        out.finish()
    }
}
impl Coordinator {
    /// Starts building a coordinator; shorthand for `CoordinatorBuilder::new()`.
    #[must_use]
    pub fn builder() -> CoordinatorBuilder {
        CoordinatorBuilder::new()
    }

    /// Returns a new token backed by this coordinator's shared state.
    #[must_use]
    pub fn token(&self) -> ShutdownToken {
        let shared = Arc::clone(&self.inner);
        ShutdownToken::new(shared)
    }

    /// Returns a new trigger backed by this coordinator's shared state.
    #[must_use]
    pub fn trigger(&self) -> ShutdownTrigger {
        let shared = Arc::clone(&self.inner);
        ShutdownTrigger::new(shared)
    }

    /// Returns the configured signal set (by value).
    #[must_use]
    pub fn signals(&self) -> SignalSet {
        self.signals
    }

    /// Returns the time budget `run_hooks` checks before starting each hook.
    #[must_use]
    pub fn graceful_timeout(&self) -> Duration {
        self.graceful_timeout
    }

    /// Returns the configured force timeout.
    #[must_use]
    pub fn force_timeout(&self) -> Duration {
        self.force_timeout
    }

    /// Reports the current value of the `installed` flag.
    ///
    /// `install` raises the flag before the runtime backend runs and lowers it
    /// again if the backend fails, so this can be `true` while an attempt is
    /// still in flight.
    #[must_use]
    pub fn is_installed(&self) -> bool {
        self.installed.load(Ordering::Relaxed)
    }

    /// Collects a snapshot of the coordinator's state.
    ///
    /// Each value is read separately, so the snapshot is not atomic across
    /// fields. Reading the hook count takes the hooks lock, which `run_hooks`
    /// holds for its entire duration.
    #[must_use]
    pub fn statistics(&self) -> Statistics {
        // Release the hooks lock before touching the shared state.
        let registered = self.hooks.lock().len();
        let completed = self.hooks_completed.load(Ordering::Relaxed);
        Statistics {
            initiated: self.inner.is_initiated(),
            reason: self.inner.reason(),
            hooks_registered: registered,
            hooks_completed: completed,
            elapsed: self.inner.elapsed(),
        }
    }

    /// Runs the registered hooks in descending priority order, passing `reason`
    /// to each, and returns how many hooks were invoked by this call.
    ///
    /// * The hook list is sorted in place; the sort is stable, so hooks with
    ///   equal priority keep their current relative order.
    /// * The graceful timeout is checked only *before* a hook starts. A hook
    ///   that is already running is never interrupted; once the budget is
    ///   exceeded the remaining hooks are skipped.
    /// * A panic inside a hook is caught and discarded, and that hook is still
    ///   counted, both in the return value and in the cumulative
    ///   `hooks_completed` counter.
    ///
    /// The hooks lock is held for the whole call, so concurrent `statistics()`
    /// and `Debug` formatting block until it returns.
    /// NOTE(review): a hook that calls either of those on this coordinator
    /// would presumably deadlock (`parking_lot::Mutex` is not reentrant) — confirm.
    pub fn run_hooks(&self, reason: ShutdownReason) -> usize {
        let mut registered = self.hooks.lock();
        registered.sort_by_key(|hook| core::cmp::Reverse(hook.priority()));

        let budget = self.graceful_timeout;
        let started = Instant::now();
        let mut invoked = 0usize;

        for boxed in registered.iter() {
            if started.elapsed() > budget {
                break;
            }
            let current = &**boxed;
            // Contain panics so one faulty hook cannot prevent later hooks from
            // running; the panic payload is intentionally dropped.
            let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                current.run(reason);
            }));
            drop(outcome);

            invoked += 1;
            self.hooks_completed.fetch_add(1, Ordering::Relaxed);
        }

        invoked
    }

    /// Installs the signal handling for whichever runtime backend is compiled in.
    ///
    /// # Errors
    ///
    /// Returns `Error::AlreadyInstalled` if the `installed` flag is already set.
    /// Otherwise returns whatever the selected backend returns; on failure the
    /// flag is cleared again so that a later call can retry.
    pub fn install(&self) -> Result<()> {
        // Claim the flag first so that only one caller proceeds to the backend.
        let claimed = self
            .installed
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if !claimed {
            return Err(Error::AlreadyInstalled);
        }

        match self.install_impl() {
            Ok(()) => Ok(()),
            Err(err) => {
                self.installed.store(false, Ordering::Release);
                Err(err)
            }
        }
    }

    /// Backend used when the `tokio` feature is enabled (takes precedence over
    /// the other backends).
    #[cfg(feature = "tokio")]
    fn install_impl(&self) -> Result<()> {
        crate::install::tokio_rt::install(self)
    }

    /// Backend used when `async-std` is enabled and `tokio` is not.
    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
    fn install_impl(&self) -> Result<()> {
        crate::install::async_std_rt::install(self)
    }

    /// Backend used when only `ctrlc-fallback` is enabled.
    #[cfg(all(
        feature = "ctrlc-fallback",
        not(feature = "tokio"),
        not(feature = "async-std")
    ))]
    fn install_impl(&self) -> Result<()> {
        crate::install::ctrlc_sync::install(self)
    }

    /// Fallback when no backend feature is enabled: always fails with
    /// `Error::NoRuntime`.
    #[cfg(not(any(feature = "tokio", feature = "async-std", feature = "ctrlc-fallback")))]
    // `self` is unused here, but the signature must match the other cfg variants.
    #[allow(clippy::unused_self)]
    fn install_impl(&self) -> Result<()> {
        Err(Error::NoRuntime)
    }
}
/// Builder for `Coordinator`; collects the signal set, both timeouts and the
/// hooks before `build` assembles the coordinator.
pub struct CoordinatorBuilder {
/// Signal set to hand to the coordinator; `new` starts with `SignalSet::graceful()`.
signals: SignalSet,
/// Graceful timeout; `new` starts with `DEFAULT_GRACEFUL_MS` milliseconds.
graceful_timeout: Duration,
/// Force timeout; `new` starts with `DEFAULT_FORCE_MS` milliseconds.
force_timeout: Duration,
/// Hooks in registration order; each `hook` call appends one boxed hook.
hooks: Vec<Box<dyn ShutdownHook>>,
}
/// Manual `Debug` implementation: the boxed hooks are summarised as a count.
impl core::fmt::Debug for CoordinatorBuilder {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let registered = self.hooks.len();
        let mut out = f.debug_struct("CoordinatorBuilder");
        out.field("signals", &self.signals);
        out.field("graceful_timeout", &self.graceful_timeout);
        out.field("force_timeout", &self.force_timeout);
        out.field("hooks", &format_args!("[{} hook(s)]", registered));
        out.finish()
    }
}
impl CoordinatorBuilder {
    /// Creates a builder with the defaults: `SignalSet::graceful()`, a graceful
    /// timeout of `DEFAULT_GRACEFUL_MS` ms, a force timeout of
    /// `DEFAULT_FORCE_MS` ms, and no hooks.
    #[must_use]
    pub fn new() -> Self {
        let graceful_timeout = Duration::from_millis(DEFAULT_GRACEFUL_MS);
        let force_timeout = Duration::from_millis(DEFAULT_FORCE_MS);
        Self {
            signals: SignalSet::graceful(),
            graceful_timeout,
            force_timeout,
            hooks: Vec::new(),
        }
    }

    /// Replaces the signal set.
    #[must_use]
    pub fn signals(self, set: SignalSet) -> Self {
        Self {
            signals: set,
            ..self
        }
    }

    /// Replaces the graceful timeout.
    #[must_use]
    pub fn graceful_timeout(self, d: Duration) -> Self {
        Self {
            graceful_timeout: d,
            ..self
        }
    }

    /// Replaces the force timeout.
    #[must_use]
    pub fn force_timeout(self, d: Duration) -> Self {
        Self {
            force_timeout: d,
            ..self
        }
    }

    /// Appends a hook to the end of the registration list.
    #[must_use]
    pub fn hook<H: ShutdownHook>(mut self, h: H) -> Self {
        let boxed: Box<dyn ShutdownHook> = Box::new(h);
        self.hooks.push(boxed);
        self
    }

    /// Consumes the builder and assembles a `Coordinator` with fresh shared
    /// state, the `installed` flag cleared and the completed-hook counter at zero.
    #[must_use]
    pub fn build(self) -> Coordinator {
        let Self {
            signals,
            graceful_timeout,
            force_timeout,
            hooks,
        } = self;
        Coordinator {
            inner: Inner::new(),
            signals,
            graceful_timeout,
            force_timeout,
            hooks: Mutex::new(hooks),
            installed: AtomicBool::new(false),
            hooks_completed: AtomicUsize::new(0),
        }
    }
}
impl Default for CoordinatorBuilder {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of coordinator state, as returned by `Coordinator::statistics`.
#[derive(Debug, Clone)]
pub struct Statistics {
/// Value of the shared state's `is_initiated()` when the snapshot was taken.
pub initiated: bool,
/// Value of the shared state's `reason()` when the snapshot was taken.
pub reason: Option<ShutdownReason>,
/// Number of hooks held by the coordinator.
pub hooks_registered: usize,
/// Cumulative number of hook invocations counted by `Coordinator::run_hooks`;
/// a hook whose panic was caught is counted as well.
pub hooks_completed: usize,
/// Value of the shared state's `elapsed()` when the snapshot was taken;
/// presumably the time since shutdown was initiated, `None` before that —
/// TODO confirm against `Inner`.
pub elapsed: Option<Duration>,
}
/// Unit tests for the coordinator, its builder, and the token/trigger pair it
/// hands out.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use crate::hook::hook_from_fn;

    /// A freshly built coordinator exposes the builder defaults and empty statistics.
    #[test]
    fn builder_defaults() {
        let c = Coordinator::builder().build();
        assert_eq!(c.signals(), SignalSet::graceful());
        assert_eq!(c.graceful_timeout(), Duration::from_millis(5_000));
        assert_eq!(c.force_timeout(), Duration::from_millis(10_000));
        assert!(!c.is_installed());
        let stats = c.statistics();
        assert!(!stats.initiated);
        assert_eq!(stats.hooks_registered, 0);
        assert_eq!(stats.hooks_completed, 0);
    }

    /// The first trigger wins: a later trigger returns `false` and does not
    /// overwrite the recorded reason.
    #[test]
    fn token_observes_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();
        assert!(!token.is_initiated());
        assert!(trigger.trigger(ShutdownReason::Requested));
        assert!(token.is_initiated());
        assert_eq!(token.reason(), Some(ShutdownReason::Requested));
        assert!(!trigger.trigger(ShutdownReason::Forced));
        assert_eq!(token.reason(), Some(ShutdownReason::Requested));
    }

    /// Hooks registered out of order still run highest priority first.
    #[test]
    fn hooks_run_in_priority_order() {
        let order = Arc::new(parking_lot::Mutex::new(Vec::<i32>::new()));
        // Builds a hook that records its own priority when it runs.
        let push = |p: i32, order: &Arc<parking_lot::Mutex<Vec<i32>>>| {
            let o = Arc::clone(order);
            hook_from_fn(format!("p{p}"), p, move |_| {
                o.lock().push(p);
            })
        };
        let c = Coordinator::builder()
            .hook(push(0, &order))
            .hook(push(100, &order))
            .hook(push(50, &order))
            .build();
        let count = c.run_hooks(ShutdownReason::Requested);
        assert_eq!(count, 3);
        assert_eq!(*order.lock(), vec![100, 50, 0]);
        assert_eq!(c.statistics().hooks_completed, 3);
    }

    /// Once a hook has used up the graceful budget, the remaining hooks are skipped.
    #[test]
    fn hooks_respect_graceful_budget() {
        let counter = Arc::new(AtomicUsize::new(0));
        let c1 = Arc::clone(&counter);
        let c2 = Arc::clone(&counter);
        // Runs first (higher priority) and sleeps well past the 5 ms budget.
        let slow = hook_from_fn("slow", 100, move |_| {
            c1.fetch_add(1, Ordering::Relaxed);
            std::thread::sleep(Duration::from_millis(30));
        });
        let later = hook_from_fn("later", 0, move |_| {
            c2.fetch_add(1, Ordering::Relaxed);
        });
        let c = Coordinator::builder()
            .graceful_timeout(Duration::from_millis(5))
            .hook(slow)
            .hook(later)
            .build();
        let count = c.run_hooks(ShutdownReason::Requested);
        assert_eq!(count, 1);
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }

    /// `elapsed` is `None` before the trigger and does not go backwards afterwards.
    #[test]
    fn elapsed_increases_after_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        assert!(token.elapsed().is_none());
        let _ = c.trigger().trigger(ShutdownReason::Requested);
        let first = token.elapsed().unwrap();
        std::thread::sleep(Duration::from_millis(5));
        let second = token.elapsed().unwrap();
        assert!(second >= first);
    }

    /// Without a trigger, the blocking wait gives up after the timeout.
    #[test]
    fn wait_blocking_timeout_returns_false_on_expiry() {
        let c = Coordinator::builder().build();
        let token = c.token();
        assert!(!token.wait_blocking_timeout(Duration::from_millis(5)));
    }

    /// A trigger fired from another thread wakes the blocking wait.
    #[test]
    fn wait_blocking_timeout_returns_true_on_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();
        let handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(10));
            // This is the only trigger call, so it must be the one that wins;
            // a failure here surfaces through `join().unwrap()` below.
            assert!(trigger.trigger(ShutdownReason::Requested));
        });
        assert!(token.wait_blocking_timeout(Duration::from_secs(1)));
        handle.join().unwrap();
    }

    /// With no runtime feature compiled in, `install` reports `NoRuntime`.
    #[cfg(not(any(feature = "tokio", feature = "async-std", feature = "ctrlc-fallback")))]
    #[test]
    fn install_errors_with_no_runtime() {
        let c = Coordinator::builder().build();
        assert!(matches!(c.install(), Err(Error::NoRuntime)));
    }

    /// The async wait resolves once the trigger fires.
    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn token_wait_resolves_on_trigger() {
        let c = Coordinator::builder().build();
        let token = c.token();
        let trigger = c.trigger();
        let waiter = tokio::spawn(async move { token.wait().await });
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(trigger.trigger(ShutdownReason::Requested));
        let joined = tokio::time::timeout(Duration::from_secs(1), waiter)
            .await
            .expect("wait did not resolve within 1s");
        // Check the join result as well, so a panic inside the waiter task
        // fails the test instead of being silently dropped.
        let _ = joined.expect("waiter task panicked or was cancelled");
    }
}