adaptive_pipeline_bootstrap/shutdown.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Shutdown Coordination
9//!
10//! Manages graceful shutdown across application components.
11//!
12//! ## Design Pattern
13//!
14//! The shutdown coordinator provides:
15//! - **Cancellation tokens** for propagating shutdown signals
16//! - **Grace period** with timeout enforcement
17//! - **Atomic state** for shutdown tracking
18//! - **Async-aware** shutdown orchestration
19//!
20//! ## Usage
21//!
22//! ```rust
23//! use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
24//! use std::time::Duration;
25//!
26//! #[tokio::main]
27//! async fn main() {
28//! let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
29//!
30//! // Clone token for worker tasks
31//! let token = coordinator.token();
32//!
33//! // Spawn worker
34//! tokio::spawn(async move {
35//! loop {
36//! tokio::select! {
37//! _ = token.cancelled() => {
38//! println!("Worker shutting down gracefully");
39//! break;
40//! }
41//! _ = tokio::time::sleep(Duration::from_secs(1)) => {
42//! println!("Working...");
43//! }
44//! }
45//! }
46//! });
47//!
48//! // Later: initiate shutdown
49//! coordinator.initiate_shutdown();
50//! coordinator.wait_for_shutdown().await;
51//! }
52//! ```
53
54use std::sync::atomic::{AtomicBool, Ordering};
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::sync::Notify;
58
/// Grace period, in whole seconds, applied when a coordinator is built via
/// its `Default` implementation.
pub const DEFAULT_GRACE_PERIOD_SECS: u64 = 5;
61
62/// Cancellation token for signaling shutdown
63///
64/// Lightweight clone-able token that can be passed to async tasks.
65///
66/// # Examples
67///
68/// ```
69/// use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
70/// use std::time::Duration;
71///
72/// # async fn example() {
73/// let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
74/// let token = coordinator.token();
75///
76/// // Pass token to async task
77/// tokio::spawn(async move {
78/// tokio::select! {
79/// _ = token.cancelled() => {
80/// println!("Task received shutdown signal");
81/// }
82/// _ = async { /* do work */ } => {
83/// println!("Task completed normally");
84/// }
85/// }
86/// });
87///
88/// // Later, initiate shutdown
89/// coordinator.initiate_shutdown();
90/// # }
91/// ```
92#[derive(Clone)]
93pub struct CancellationToken {
94 /// Shared cancellation flag
95 cancelled: Arc<AtomicBool>,
96 /// Notification for waiters
97 notify: Arc<Notify>,
98}
99
100impl CancellationToken {
101 /// Create a new cancellation token
102 fn new() -> Self {
103 Self {
104 cancelled: Arc::new(AtomicBool::new(false)),
105 notify: Arc::new(Notify::new()),
106 }
107 }
108
109 /// Cancel this token
110 pub fn cancel(&self) {
111 self.cancelled.store(true, Ordering::SeqCst);
112 self.notify.notify_waiters();
113 }
114
115 /// Check if cancelled (non-blocking)
116 pub fn is_cancelled(&self) -> bool {
117 self.cancelled.load(Ordering::SeqCst)
118 }
119
120 /// Wait for cancellation (async)
121 pub async fn cancelled(&self) {
122 if self.is_cancelled() {
123 return;
124 }
125 self.notify.notified().await;
126 }
127}
128
129/// Shutdown coordinator
130///
131/// Manages graceful shutdown with grace period and timeout enforcement.
132#[derive(Clone)]
133pub struct ShutdownCoordinator {
134 /// Cancellation token for shutdown signal
135 token: CancellationToken,
136
137 /// Grace period before forced shutdown
138 grace_period: Duration,
139
140 /// Shutdown initiated flag
141 shutdown_initiated: Arc<AtomicBool>,
142
143 /// Notification for shutdown completion
144 shutdown_complete: Arc<Notify>,
145}
146
147impl ShutdownCoordinator {
148 /// Create a new shutdown coordinator
149 ///
150 /// # Arguments
151 ///
152 /// * `grace_period` - Maximum time to wait for graceful shutdown
153 pub fn new(grace_period: Duration) -> Self {
154 Self {
155 token: CancellationToken::new(),
156 grace_period,
157 shutdown_initiated: Arc::new(AtomicBool::new(false)),
158 shutdown_complete: Arc::new(Notify::new()),
159 }
160 }
161
162 /// Get a cancellation token
163 ///
164 /// Tokens can be cloned and passed to async tasks for shutdown signaling.
165 pub fn token(&self) -> CancellationToken {
166 self.token.clone()
167 }
168
169 /// Check if shutdown has been initiated
170 pub fn is_shutting_down(&self) -> bool {
171 self.shutdown_initiated.load(Ordering::SeqCst)
172 }
173
174 /// Initiate graceful shutdown
175 ///
176 /// This will:
177 /// 1. Set shutdown initiated flag
178 /// 2. Cancel all tokens
179 /// 3. Start grace period timer
180 pub fn initiate_shutdown(&self) {
181 if self
182 .shutdown_initiated
183 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
184 .is_ok()
185 {
186 tracing::info!("Initiating graceful shutdown (grace period: {:?})", self.grace_period);
187 self.token.cancel();
188 }
189 }
190
191 /// Wait for shutdown to complete or timeout
192 ///
193 /// Returns `true` if shutdown completed within grace period,
194 /// `false` if timeout occurred.
195 ///
196 /// # Examples
197 ///
198 /// ```
199 /// use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
200 /// use std::time::Duration;
201 ///
202 /// # async fn example() {
203 /// let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
204 ///
205 /// // In main application loop
206 /// coordinator.initiate_shutdown();
207 ///
208 /// // Wait for all tasks to complete
209 /// if coordinator.wait_for_shutdown().await {
210 /// println!("Shutdown completed gracefully");
211 /// } else {
212 /// println!("Shutdown timed out, forcing exit");
213 /// }
214 ///
215 /// coordinator.complete_shutdown();
216 /// # }
217 /// ```
218 pub async fn wait_for_shutdown(&self) -> bool {
219 if !self.is_shutting_down() {
220 tracing::warn!("wait_for_shutdown called but shutdown not initiated");
221 return true;
222 }
223
224 // Race shutdown completion against timeout
225 tokio::select! {
226 _ = self.shutdown_complete.notified() => {
227 tracing::info!("Shutdown completed gracefully");
228 true
229 }
230 _ = tokio::time::sleep(self.grace_period) => {
231 tracing::warn!("Shutdown grace period expired, forcing shutdown");
232 false
233 }
234 }
235 }
236
237 /// Signal that shutdown is complete
238 ///
239 /// Call this after all cleanup is done to notify waiters.
240 pub fn complete_shutdown(&self) {
241 self.shutdown_complete.notify_waiters();
242 }
243
244 /// Wait for shutdown with custom timeout
245 pub async fn wait_with_timeout(&self, timeout: Duration) -> bool {
246 if !self.is_shutting_down() {
247 return true;
248 }
249
250 tokio::select! {
251 _ = self.shutdown_complete.notified() => true,
252 _ = tokio::time::sleep(timeout) => false,
253 }
254 }
255}
256
257impl Default for ShutdownCoordinator {
258 fn default() -> Self {
259 Self::new(Duration::from_secs(DEFAULT_GRACE_PERIOD_SECS))
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created token is not cancelled.
    #[test]
    fn test_cancellation_token_create() {
        assert!(!CancellationToken::new().is_cancelled());
    }

    /// `cancel` flips the flag reported by `is_cancelled`.
    #[test]
    fn test_cancellation_token_cancel() {
        let tok = CancellationToken::new();
        tok.cancel();
        assert!(tok.is_cancelled());
    }

    /// Cancelling one clone is visible through every other clone.
    #[test]
    fn test_cancellation_token_clone() {
        let original = CancellationToken::new();
        let copy = original.clone();

        original.cancel();
        assert!(copy.is_cancelled());
    }

    /// Awaiting an already-cancelled token resolves without suspending.
    #[tokio::test]
    async fn test_cancellation_token_cancelled_already() {
        let tok = CancellationToken::new();
        tok.cancel();

        // Should return immediately
        tok.cancelled().await;
        assert!(tok.is_cancelled());
    }

    /// A waiter is woken when another task cancels the token later.
    #[tokio::test]
    async fn test_cancellation_token_cancelled_wait() {
        let waiter = CancellationToken::new();
        let canceller = waiter.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            canceller.cancel();
        });

        waiter.cancelled().await;
        assert!(waiter.is_cancelled());
    }

    /// A new coordinator is not shutting down.
    #[test]
    fn test_shutdown_coordinator_create() {
        let coord = ShutdownCoordinator::new(Duration::from_secs(5));
        assert!(!coord.is_shutting_down());
    }

    /// The `Default` coordinator is not shutting down either.
    #[test]
    fn test_shutdown_coordinator_default() {
        let coord = ShutdownCoordinator::default();
        assert!(!coord.is_shutting_down());
    }

    /// Initiating shutdown sets the flag and cancels the token.
    #[test]
    fn test_shutdown_coordinator_initiate() {
        let coord = ShutdownCoordinator::new(Duration::from_secs(5));
        coord.initiate_shutdown();
        assert!(coord.is_shutting_down());
        assert!(coord.token().is_cancelled());
    }

    /// A token handed out earlier observes a later shutdown.
    #[test]
    fn test_shutdown_coordinator_token() {
        let coord = ShutdownCoordinator::new(Duration::from_secs(5));
        let tok = coord.token();

        assert!(!tok.is_cancelled());

        coord.initiate_shutdown();
        assert!(tok.is_cancelled());
    }

    /// `wait_for_shutdown` reports success when completion is signalled
    /// within the grace period.
    #[tokio::test]
    async fn test_shutdown_coordinator_complete() {
        let coord = ShutdownCoordinator::new(Duration::from_secs(5));

        coord.initiate_shutdown();

        let finisher = coord.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            finisher.complete_shutdown();
        });

        // This should complete quickly (not timeout)
        let outcome = tokio::time::timeout(Duration::from_millis(200), coord.wait_for_shutdown()).await;

        // Ok = the outer guard did not fire; true = completed, not timed out
        assert!(matches!(outcome, Ok(true)));
    }

    /// `wait_for_shutdown` reports a timeout when nobody signals completion.
    #[tokio::test]
    async fn test_shutdown_coordinator_timeout() {
        let coord = ShutdownCoordinator::new(Duration::from_millis(50));

        coord.initiate_shutdown();
        // Don't call complete_shutdown - let it timeout

        let completed = coord.wait_for_shutdown().await;
        assert!(!completed); // False = timed out
    }

    /// `wait_with_timeout` honours the caller's timeout rather than the
    /// configured grace period.
    #[tokio::test]
    async fn test_shutdown_coordinator_wait_custom_timeout() {
        let coord = ShutdownCoordinator::new(Duration::from_secs(10));

        coord.initiate_shutdown();

        // Use shorter custom timeout
        let completed = coord.wait_with_timeout(Duration::from_millis(50)).await;
        assert!(!completed); // Timed out
    }
}