Skip to main content

hyperi_rustlib/
shutdown.rs

1// Project:   hyperi-rustlib
2// File:      src/shutdown.rs
3// Purpose:   Unified graceful shutdown with global CancellationToken
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Unified graceful shutdown manager.
10//!
11//! Provides a global [`CancellationToken`] that all modules can listen on
12//! for coordinated graceful shutdown. One place handles SIGTERM/SIGINT,
13//! all modules drain gracefully.
14//!
15//! ## Usage
16//!
17//! ```rust,no_run
18//! use hyperi_rustlib::shutdown;
19//!
20//! #[tokio::main]
21//! async fn main() {
22//!     // Install the signal handler once at startup
23//!     let token = shutdown::install_signal_handler();
24//!
25//!     // Pass token to workers, or they can call shutdown::token() directly
26//!     tokio::spawn(async move {
27//!         loop {
28//!             tokio::select! {
29//!                 _ = token.cancelled() => {
30//!                     // drain and exit
31//!                     break;
32//!                 }
33//!                 _ = do_work() => {}
34//!             }
35//!         }
36//!     });
37//! }
38//!
39//! async fn do_work() {
40//!     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
41//! }
42//! ```
43
44use std::sync::OnceLock;
45use tokio_util::sync::CancellationToken;
46
47static TOKEN: OnceLock<CancellationToken> = OnceLock::new();
48
49/// Get the global shutdown token.
50///
51/// All modules should clone this token and listen for cancellation
52/// in their main loops via `token.cancelled().await`.
53///
54/// The token is created lazily on first access.
55pub fn token() -> CancellationToken {
56    TOKEN.get_or_init(CancellationToken::new).clone()
57}
58
59/// Check if shutdown has been requested.
60pub fn is_shutdown() -> bool {
61    TOKEN.get().is_some_and(CancellationToken::is_cancelled)
62}
63
64/// Trigger shutdown programmatically.
65///
66/// Cancels the global token. All modules listening on it will
67/// begin their drain/cleanup sequence.
68pub fn trigger() {
69    if let Some(t) = TOKEN.get() {
70        t.cancel();
71    }
72}
73
74/// Wait for SIGTERM or SIGINT, then trigger shutdown.
75///
76/// Call this once at application startup. It spawns a background
77/// task that waits for the OS signal, then cancels the global token.
78///
79/// **K8s pre-stop compliance:** When running in Kubernetes (detected via
80/// [`crate::env::runtime_context`]), sleeps for `PRESTOP_DELAY_SECS`
81/// (default 5) before cancelling the token. This gives K8s time to
82/// remove the pod from Service endpoints before the app starts draining.
83/// On bare metal / Docker, the delay is 0 (immediate shutdown).
84///
85/// Returns the token for use in `tokio::select!` or other async
86/// shutdown coordination.
87#[must_use]
88pub fn install_signal_handler() -> CancellationToken {
89    let t = token();
90    let cancel = t.clone();
91
92    tokio::spawn(async move {
93        wait_for_signal().await;
94
95        // K8s pre-stop: delay before draining to allow endpoint removal.
96        // Without this, K8s routes traffic to a pod that's already shutting down.
97        let prestop_delay = prestop_delay_secs();
98        if prestop_delay > 0 {
99            #[cfg(feature = "logger")]
100            tracing::info!(
101                delay_secs = prestop_delay,
102                "Pre-stop delay: waiting for K8s endpoint removal"
103            );
104            tokio::time::sleep(std::time::Duration::from_secs(prestop_delay)).await;
105        }
106
107        // Emit eviction metric when SIGTERM received in K8s
108        // TODO (metrics audit): `pod_eviction_received_total` has no `{ns}`
109        // prefix -- left as-is (no clean namespace available at this layer;
110        // shutdown runs before any MetricsManager handle is threaded here).
111        #[cfg(any(feature = "metrics", feature = "otel-metrics"))]
112        if crate::env::runtime_context().is_kubernetes() {
113            metrics::counter!("pod_eviction_received_total").increment(1);
114        }
115
116        cancel.cancel();
117
118        #[cfg(feature = "logger")]
119        tracing::info!("Shutdown signal received, cancelling all tasks");
120    });
121
122    t
123}
124
125/// Determine the pre-stop delay in seconds.
126///
127/// - `PRESTOP_DELAY_SECS` env var overrides (for tuning in deployment manifests)
128/// - K8s detected: default 5 seconds
129/// - Bare metal / Docker: default 0 (no delay)
130fn prestop_delay_secs() -> u64 {
131    if let Ok(val) = std::env::var("PRESTOP_DELAY_SECS")
132        && let Ok(secs) = val.parse::<u64>()
133    {
134        return secs;
135    }
136    if crate::env::runtime_context().is_kubernetes() {
137        5
138    } else {
139        0
140    }
141}
142
143/// Wait for SIGTERM or SIGINT.
144async fn wait_for_signal() {
145    let ctrl_c = async {
146        if let Err(e) = tokio::signal::ctrl_c().await {
147            tracing::error!(error = %e, "Failed to install Ctrl+C handler");
148            std::future::pending::<()>().await;
149        }
150    };
151
152    #[cfg(unix)]
153    let terminate = async {
154        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
155            Ok(mut sig) => {
156                sig.recv().await;
157            }
158            Err(e) => {
159                tracing::error!(
160                    error = %e,
161                    "Failed to install SIGTERM handler, only Ctrl+C will trigger shutdown"
162                );
163                std::future::pending::<()>().await;
164            }
165        }
166    };
167
168    #[cfg(unix)]
169    tokio::select! {
170        () = ctrl_c => {},
171        () = terminate => {},
172    }
173
174    #[cfg(not(unix))]
175    ctrl_c.await;
176}
177
#[cfg(test)]
mod tests {
    //! These tests build their own `CancellationToken` instances instead of
    //! touching the global one, so they cannot pollute each other.

    use super::*;

    /// A newly created token starts out live.
    #[test]
    fn token_is_not_cancelled_initially() {
        let fresh = CancellationToken::new();
        assert!(!fresh.is_cancelled());
    }

    /// `cancel` flips the token from live to cancelled.
    #[test]
    fn trigger_cancels_token() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());

        token.cancel();

        assert!(token.is_cancelled());
    }

    /// Clones observe cancellation of the token they were cloned from.
    #[test]
    fn token_is_cloneable_and_shared() {
        let origin = CancellationToken::new();
        let clones = [origin.clone(), origin.clone()];

        for clone in &clones {
            assert!(!clone.is_cancelled());
        }

        origin.cancel();

        for clone in &clones {
            assert!(clone.is_cancelled());
        }
    }

    /// Cancelling twice must not panic and leaves the token cancelled.
    #[test]
    fn multiple_triggers_are_idempotent() {
        let token = CancellationToken::new();
        for _ in 0..2 {
            token.cancel();
        }
        assert!(token.is_cancelled());
    }

    /// Awaiting `cancelled()` completes once another task cancels a clone.
    #[tokio::test]
    async fn cancelled_future_resolves_after_cancel() {
        let waiter = CancellationToken::new();
        let canceller = waiter.clone();

        tokio::spawn(async move {
            let short_delay = std::time::Duration::from_millis(10);
            tokio::time::sleep(short_delay).await;
            canceller.cancel();
        });

        // Resolves only after the spawned task cancels its clone.
        waiter.cancelled().await;
        assert!(waiter.is_cancelled());
    }

    /// Cancelling a parent token also cancels its child tokens.
    #[tokio::test]
    async fn child_token_cancelled_by_parent() {
        let root = CancellationToken::new();
        let leaf = root.child_token();
        assert!(!leaf.is_cancelled());

        root.cancel();

        assert!(leaf.is_cancelled());
    }
}