adaptive_pipeline_bootstrap/
shutdown.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Shutdown Coordination
9//!
10//! Manages graceful shutdown across application components.
11//!
12//! ## Design Pattern
13//!
14//! The shutdown coordinator provides:
15//! - **Cancellation tokens** for propagating shutdown signals
16//! - **Grace period** with timeout enforcement
17//! - **Atomic state** for shutdown tracking
18//! - **Async-aware** shutdown orchestration
19//!
20//! ## Usage
21//!
22//! ```rust
23//! use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
24//! use std::time::Duration;
25//!
26//! #[tokio::main]
27//! async fn main() {
28//!     let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
29//!
30//!     // Clone token for worker tasks
31//!     let token = coordinator.token();
32//!
33//!     // Spawn worker
34//!     tokio::spawn(async move {
35//!         loop {
36//!             tokio::select! {
37//!                 _ = token.cancelled() => {
38//!                     println!("Worker shutting down gracefully");
39//!                     break;
40//!                 }
41//!                 _ = tokio::time::sleep(Duration::from_secs(1)) => {
42//!                     println!("Working...");
43//!                 }
44//!             }
45//!         }
46//!     });
47//!
48//!     // Later: initiate shutdown
49//!     coordinator.initiate_shutdown();
50//!     coordinator.wait_for_shutdown().await;
51//! }
52//! ```
53
54use std::sync::atomic::{AtomicBool, Ordering};
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::sync::Notify;
58
/// Number of seconds a coordinator waits for graceful shutdown when no
/// explicit grace period is supplied.
///
/// This is the value used by `ShutdownCoordinator::default()`.
pub const DEFAULT_GRACE_PERIOD_SECS: u64 = 5;
61
62/// Cancellation token for signaling shutdown
63///
64/// Lightweight clone-able token that can be passed to async tasks.
65///
66/// # Examples
67///
68/// ```
69/// use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
70/// use std::time::Duration;
71///
72/// # async fn example() {
73/// let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
74/// let token = coordinator.token();
75///
76/// // Pass token to async task
77/// tokio::spawn(async move {
78///     tokio::select! {
79///         _ = token.cancelled() => {
80///             println!("Task received shutdown signal");
81///         }
82///         _ = async { /* do work */ } => {
83///             println!("Task completed normally");
84///         }
85///     }
86/// });
87///
88/// // Later, initiate shutdown
89/// coordinator.initiate_shutdown();
90/// # }
91/// ```
92#[derive(Clone)]
93pub struct CancellationToken {
94    /// Shared cancellation flag
95    cancelled: Arc<AtomicBool>,
96    /// Notification for waiters
97    notify: Arc<Notify>,
98}
99
100impl CancellationToken {
101    /// Create a new cancellation token
102    fn new() -> Self {
103        Self {
104            cancelled: Arc::new(AtomicBool::new(false)),
105            notify: Arc::new(Notify::new()),
106        }
107    }
108
109    /// Cancel this token
110    pub fn cancel(&self) {
111        self.cancelled.store(true, Ordering::SeqCst);
112        self.notify.notify_waiters();
113    }
114
115    /// Check if cancelled (non-blocking)
116    pub fn is_cancelled(&self) -> bool {
117        self.cancelled.load(Ordering::SeqCst)
118    }
119
120    /// Wait for cancellation (async)
121    pub async fn cancelled(&self) {
122        if self.is_cancelled() {
123            return;
124        }
125        self.notify.notified().await;
126    }
127}
128
129/// Shutdown coordinator
130///
131/// Manages graceful shutdown with grace period and timeout enforcement.
132#[derive(Clone)]
133pub struct ShutdownCoordinator {
134    /// Cancellation token for shutdown signal
135    token: CancellationToken,
136
137    /// Grace period before forced shutdown
138    grace_period: Duration,
139
140    /// Shutdown initiated flag
141    shutdown_initiated: Arc<AtomicBool>,
142
143    /// Notification for shutdown completion
144    shutdown_complete: Arc<Notify>,
145}
146
147impl ShutdownCoordinator {
148    /// Create a new shutdown coordinator
149    ///
150    /// # Arguments
151    ///
152    /// * `grace_period` - Maximum time to wait for graceful shutdown
153    pub fn new(grace_period: Duration) -> Self {
154        Self {
155            token: CancellationToken::new(),
156            grace_period,
157            shutdown_initiated: Arc::new(AtomicBool::new(false)),
158            shutdown_complete: Arc::new(Notify::new()),
159        }
160    }
161
162    /// Get a cancellation token
163    ///
164    /// Tokens can be cloned and passed to async tasks for shutdown signaling.
165    pub fn token(&self) -> CancellationToken {
166        self.token.clone()
167    }
168
169    /// Check if shutdown has been initiated
170    pub fn is_shutting_down(&self) -> bool {
171        self.shutdown_initiated.load(Ordering::SeqCst)
172    }
173
174    /// Initiate graceful shutdown
175    ///
176    /// This will:
177    /// 1. Set shutdown initiated flag
178    /// 2. Cancel all tokens
179    /// 3. Start grace period timer
180    pub fn initiate_shutdown(&self) {
181        if self
182            .shutdown_initiated
183            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
184            .is_ok()
185        {
186            tracing::info!("Initiating graceful shutdown (grace period: {:?})", self.grace_period);
187            self.token.cancel();
188        }
189    }
190
191    /// Wait for shutdown to complete or timeout
192    ///
193    /// Returns `true` if shutdown completed within grace period,
194    /// `false` if timeout occurred.
195    ///
196    /// # Examples
197    ///
198    /// ```
199    /// use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
200    /// use std::time::Duration;
201    ///
202    /// # async fn example() {
203    /// let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
204    ///
205    /// // In main application loop
206    /// coordinator.initiate_shutdown();
207    ///
208    /// // Wait for all tasks to complete
209    /// if coordinator.wait_for_shutdown().await {
210    ///     println!("Shutdown completed gracefully");
211    /// } else {
212    ///     println!("Shutdown timed out, forcing exit");
213    /// }
214    ///
215    /// coordinator.complete_shutdown();
216    /// # }
217    /// ```
218    pub async fn wait_for_shutdown(&self) -> bool {
219        if !self.is_shutting_down() {
220            tracing::warn!("wait_for_shutdown called but shutdown not initiated");
221            return true;
222        }
223
224        // Race shutdown completion against timeout
225        tokio::select! {
226            _ = self.shutdown_complete.notified() => {
227                tracing::info!("Shutdown completed gracefully");
228                true
229            }
230            _ = tokio::time::sleep(self.grace_period) => {
231                tracing::warn!("Shutdown grace period expired, forcing shutdown");
232                false
233            }
234        }
235    }
236
237    /// Signal that shutdown is complete
238    ///
239    /// Call this after all cleanup is done to notify waiters.
240    pub fn complete_shutdown(&self) {
241        self.shutdown_complete.notify_waiters();
242    }
243
244    /// Wait for shutdown with custom timeout
245    pub async fn wait_with_timeout(&self, timeout: Duration) -> bool {
246        if !self.is_shutting_down() {
247            return true;
248        }
249
250        tokio::select! {
251            _ = self.shutdown_complete.notified() => true,
252            _ = tokio::time::sleep(timeout) => false,
253        }
254    }
255}
256
257impl Default for ShutdownCoordinator {
258    fn default() -> Self {
259        Self::new(Duration::from_secs(DEFAULT_GRACE_PERIOD_SECS))
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    // ---- CancellationToken -------------------------------------------------

    /// A fresh token starts out not cancelled.
    #[test]
    fn test_cancellation_token_create() {
        assert!(!CancellationToken::new().is_cancelled());
    }

    /// `cancel` flips the flag observed by `is_cancelled`.
    #[test]
    fn test_cancellation_token_cancel() {
        let subject = CancellationToken::new();
        subject.cancel();
        assert!(subject.is_cancelled());
    }

    /// Clones share state: cancelling one is visible through the other.
    #[test]
    fn test_cancellation_token_clone() {
        let original = CancellationToken::new();
        let copy = original.clone();

        original.cancel();

        assert!(copy.is_cancelled());
    }

    /// Awaiting an already-cancelled token returns without suspending forever.
    #[tokio::test]
    async fn test_cancellation_token_cancelled_already() {
        let subject = CancellationToken::new();
        subject.cancel();

        subject.cancelled().await;

        assert!(subject.is_cancelled());
    }

    /// A waiter is released when another task cancels a clone later on.
    #[tokio::test]
    async fn test_cancellation_token_cancelled_wait() {
        let waiter = CancellationToken::new();
        let canceller = waiter.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            canceller.cancel();
        });

        waiter.cancelled().await;
        assert!(waiter.is_cancelled());
    }

    // ---- ShutdownCoordinator -----------------------------------------------

    /// A new coordinator is not shutting down.
    #[test]
    fn test_shutdown_coordinator_create() {
        let subject = ShutdownCoordinator::new(Duration::from_secs(5));
        assert!(!subject.is_shutting_down());
    }

    /// The default coordinator is not shutting down either.
    #[test]
    fn test_shutdown_coordinator_default() {
        let subject = ShutdownCoordinator::default();
        assert!(!subject.is_shutting_down());
    }

    /// Initiating shutdown sets the flag and cancels the token.
    #[test]
    fn test_shutdown_coordinator_initiate() {
        let subject = ShutdownCoordinator::new(Duration::from_secs(5));

        subject.initiate_shutdown();

        assert!(subject.is_shutting_down());
        assert!(subject.token().is_cancelled());
    }

    /// A token handed out earlier observes a later shutdown.
    #[test]
    fn test_shutdown_coordinator_token() {
        let subject = ShutdownCoordinator::new(Duration::from_secs(5));
        let handed_out = subject.token();
        assert!(!handed_out.is_cancelled());

        subject.initiate_shutdown();

        assert!(handed_out.is_cancelled());
    }

    /// Completion signalled by another task ends the wait with `true`.
    #[tokio::test]
    async fn test_shutdown_coordinator_complete() {
        let subject = ShutdownCoordinator::new(Duration::from_secs(5));
        subject.initiate_shutdown();

        let finisher = subject.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            finisher.complete_shutdown();
        });

        // The outer timeout guards the test itself; the wait must finish well
        // inside it and report completion rather than a grace-period timeout.
        let outcome = tokio::time::timeout(Duration::from_millis(200), subject.wait_for_shutdown()).await;

        assert!(matches!(outcome, Ok(true)));
    }

    /// Without a completion signal the wait reports a timeout.
    #[tokio::test]
    async fn test_shutdown_coordinator_timeout() {
        let subject = ShutdownCoordinator::new(Duration::from_millis(50));
        subject.initiate_shutdown();

        // Nobody calls complete_shutdown, so the grace period must expire.
        let completed = subject.wait_for_shutdown().await;

        assert!(!completed);
    }

    /// A custom timeout overrides the (much longer) configured grace period.
    #[tokio::test]
    async fn test_shutdown_coordinator_wait_custom_timeout() {
        let subject = ShutdownCoordinator::new(Duration::from_secs(10));
        subject.initiate_shutdown();

        let completed = subject.wait_with_timeout(Duration::from_millis(50)).await;

        assert!(!completed);
    }
}