hyperi_rustlib/shutdown.rs
1// Project: hyperi-rustlib
2// File: src/shutdown.rs
3// Purpose: Unified graceful shutdown with global CancellationToken
4// Language: Rust
5//
6// License: FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! Unified graceful shutdown manager.
//!
//! Provides a global [`CancellationToken`] that all modules can listen on
//! for coordinated graceful shutdown. One place handles SIGTERM/SIGINT,
//! and all modules drain gracefully.
//!
//! ## Usage
//!
//! ```rust,no_run
//! use hyperi_rustlib::shutdown;
//!
//! #[tokio::main]
//! async fn main() {
//!     // Install the signal handler once at startup
//!     let token = shutdown::install_signal_handler();
//!
//!     // Pass the token to workers, or they can call shutdown::token() directly
//!     tokio::spawn(async move {
//!         loop {
//!             tokio::select! {
//!                 _ = token.cancelled() => {
//!                     // drain and exit
//!                     break;
//!                 }
//!                 _ = do_work() => {}
//!             }
//!         }
//!     });
//! }
//!
//! async fn do_work() {
//!     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//! }
//! ```
43
44use std::sync::OnceLock;
45use tokio_util::sync::CancellationToken;
46
47static TOKEN: OnceLock<CancellationToken> = OnceLock::new();
48
49/// Get the global shutdown token.
50///
51/// All modules should clone this token and listen for cancellation
52/// in their main loops via `token.cancelled().await`.
53///
54/// The token is created lazily on first access.
55pub fn token() -> CancellationToken {
56 TOKEN.get_or_init(CancellationToken::new).clone()
57}
58
59/// Check if shutdown has been requested.
60pub fn is_shutdown() -> bool {
61 TOKEN.get().is_some_and(CancellationToken::is_cancelled)
62}
63
64/// Trigger shutdown programmatically.
65///
66/// Cancels the global token. All modules listening on it will
67/// begin their drain/cleanup sequence.
68pub fn trigger() {
69 if let Some(t) = TOKEN.get() {
70 t.cancel();
71 }
72}
73
74/// Wait for SIGTERM or SIGINT, then trigger shutdown.
75///
76/// Call this once at application startup. It spawns a background
77/// task that waits for the OS signal, then cancels the global token.
78///
79/// **K8s pre-stop compliance:** When running in Kubernetes (detected via
80/// [`crate::env::runtime_context`]), sleeps for `PRESTOP_DELAY_SECS`
81/// (default 5) before cancelling the token. This gives K8s time to
82/// remove the pod from Service endpoints before the app starts draining.
83/// On bare metal / Docker, the delay is 0 (immediate shutdown).
84///
85/// Returns the token for use in `tokio::select!` or other async
86/// shutdown coordination.
87#[must_use]
88pub fn install_signal_handler() -> CancellationToken {
89 let t = token();
90 let cancel = t.clone();
91
92 tokio::spawn(async move {
93 wait_for_signal().await;
94
95 // K8s pre-stop: delay before draining to allow endpoint removal.
96 // Without this, K8s routes traffic to a pod that's already shutting down.
97 let prestop_delay = prestop_delay_secs();
98 if prestop_delay > 0 {
99 #[cfg(feature = "logger")]
100 tracing::info!(
101 delay_secs = prestop_delay,
102 "Pre-stop delay: waiting for K8s endpoint removal"
103 );
104 tokio::time::sleep(std::time::Duration::from_secs(prestop_delay)).await;
105 }
106
107 // Emit eviction metric when SIGTERM received in K8s
108 #[cfg(any(feature = "metrics", feature = "otel-metrics"))]
109 if crate::env::runtime_context().is_kubernetes() {
110 metrics::counter!("pod_eviction_received_total").increment(1);
111 }
112
113 cancel.cancel();
114
115 #[cfg(feature = "logger")]
116 tracing::info!("Shutdown signal received, cancelling all tasks");
117 });
118
119 t
120}
121
122/// Determine the pre-stop delay in seconds.
123///
124/// - `PRESTOP_DELAY_SECS` env var overrides (for tuning in deployment manifests)
125/// - K8s detected: default 5 seconds
126/// - Bare metal / Docker: default 0 (no delay)
127fn prestop_delay_secs() -> u64 {
128 if let Ok(val) = std::env::var("PRESTOP_DELAY_SECS")
129 && let Ok(secs) = val.parse::<u64>()
130 {
131 return secs;
132 }
133 if crate::env::runtime_context().is_kubernetes() {
134 5
135 } else {
136 0
137 }
138}
139
140/// Wait for SIGTERM or SIGINT.
141async fn wait_for_signal() {
142 let ctrl_c = async {
143 if let Err(e) = tokio::signal::ctrl_c().await {
144 tracing::error!(error = %e, "Failed to install Ctrl+C handler");
145 std::future::pending::<()>().await;
146 }
147 };
148
149 #[cfg(unix)]
150 let terminate = async {
151 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
152 Ok(mut sig) => {
153 sig.recv().await;
154 }
155 Err(e) => {
156 tracing::error!(
157 error = %e,
158 "Failed to install SIGTERM handler, only Ctrl+C will trigger shutdown"
159 );
160 std::future::pending::<()>().await;
161 }
162 }
163 };
164
165 #[cfg(unix)]
166 tokio::select! {
167 () = ctrl_c => {},
168 () = terminate => {},
169 }
170
171 #[cfg(not(unix))]
172 ctrl_c.await;
173}
174
#[cfg(test)]
mod tests {
    // These tests pin down the `CancellationToken` semantics the module relies
    // on. Each one builds its own private token instead of touching the global
    // one, so tests cannot pollute each other.
    use super::*;

    #[test]
    fn token_is_not_cancelled_initially() {
        let fresh = CancellationToken::new();
        assert!(!fresh.is_cancelled());
    }

    #[test]
    fn trigger_cancels_token() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());

        token.cancel();

        assert!(token.is_cancelled());
    }

    #[test]
    fn token_is_cloneable_and_shared() {
        let source = CancellationToken::new();
        let first = source.clone();
        let second = source.clone();

        // Nothing is cancelled before the source is.
        assert!(!first.is_cancelled());
        assert!(!second.is_cancelled());

        source.cancel();

        // Every clone observes the cancellation.
        assert!(first.is_cancelled());
        assert!(second.is_cancelled());
    }

    #[test]
    fn multiple_triggers_are_idempotent() {
        let token = CancellationToken::new();

        // The repeated cancel must not panic.
        token.cancel();
        token.cancel();

        assert!(token.is_cancelled());
    }

    #[tokio::test]
    async fn cancelled_future_resolves_after_cancel() {
        let waiter = CancellationToken::new();
        let canceller = waiter.clone();

        tokio::spawn(async move {
            let pause = std::time::Duration::from_millis(10);
            tokio::time::sleep(pause).await;
            canceller.cancel();
        });

        // Completes only once the spawned task cancels the shared token.
        waiter.cancelled().await;
        assert!(waiter.is_cancelled());
    }

    #[tokio::test]
    async fn child_token_cancelled_by_parent() {
        let parent = CancellationToken::new();
        let child = parent.child_token();
        assert!(!child.is_cancelled());

        parent.cancel();

        assert!(child.is_cancelled());
    }
}
241}