rust_rabbit/
shutdown.rs

1//! Graceful shutdown handling for RustRabbit
2//!
3//! This module provides mechanisms for cleanly shutting down all RustRabbit components:
4//! - Connection pool cleanup
5//! - Consumer graceful stopping
6//! - Batch flushing before exit
7//! - Health monitoring shutdown
8
9use crate::error::{RabbitError, Result};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::{broadcast, Mutex, RwLock};
13use tokio::time::{sleep, timeout};
14use tracing::{debug, error, info, warn};
15
/// Reason and urgency of a shutdown request.
///
/// A value of this type is broadcast to every subscriber when shutdown
/// starts, and it selects which shutdown sequence the manager runs.
/// `PartialEq`/`Eq` are derived so received signals can be compared directly
/// (for example with `==` or `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShutdownSignal {
    /// Orderly shutdown: components are taken through the phased sequence.
    Graceful,
    /// Shutdown without the phased sequence; components are force-stopped.
    Immediate,
    /// Shutdown triggered by a failure; the payload is a human-readable
    /// description of that failure.
    Error(String),
}
26
/// Tunable timings and switches for the shutdown sequence.
#[derive(Debug, Clone)]
pub struct ShutdownConfig {
    /// Upper bound intended for the graceful shutdown as a whole.
    ///
    /// NOTE(review): no code visible in this file reads this field — confirm
    /// whether it is enforced elsewhere.
    pub graceful_timeout: Duration,
    /// Pause inserted between consecutive phases of a graceful shutdown.
    pub phase_delay: Duration,
    /// Whether pending messages should be flushed while shutting down.
    ///
    /// NOTE(review): no code visible in this file reads this field — confirm
    /// whether components consult it themselves.
    pub flush_pending: bool,
    /// Time limit applied to each individual component hook during a
    /// graceful shutdown.
    pub pending_timeout: Duration,
}

impl Default for ShutdownConfig {
    /// Defaults: 30 s overall graceful budget, 100 ms between phases,
    /// flushing enabled, 5 s per pending operation.
    fn default() -> Self {
        let graceful_timeout = Duration::from_secs(30);
        let phase_delay = Duration::from_millis(100);
        let pending_timeout = Duration::from_secs(5);

        ShutdownConfig {
            graceful_timeout,
            phase_delay,
            flush_pending: true,
            pending_timeout,
        }
    }
}
50
/// Shutdown manager for coordinating graceful shutdown
///
/// Owns the broadcast channel used to announce a shutdown, a flag recording
/// that shutdown has started, and the list of components whose
/// [`ShutdownHandler`] hooks are driven during the shutdown sequence.
pub struct ShutdownManager {
    /// Timeouts and delays applied while shutting down.
    config: ShutdownConfig,
    /// Sending half of the broadcast channel; `subscribe` hands out receivers
    /// created from it.
    shutdown_sender: broadcast::Sender<ShutdownSignal>,
    // Never read after construction, hence the lint allowance. Presumably it
    // is held so the channel always has at least one live receiver and
    // `send` does not fail when nobody has subscribed — TODO confirm intent.
    #[allow(dead_code)]
    shutdown_receiver: Arc<Mutex<broadcast::Receiver<ShutdownSignal>>>,
    /// Set to `true` by the first `shutdown` call; later calls see it and
    /// return early.
    shutdown_in_progress: Arc<RwLock<bool>>,
    /// Components to shut down, in registration order.
    registered_components: Arc<Mutex<Vec<Arc<dyn ShutdownHandler>>>>,
}
60
61impl ShutdownManager {
62    /// Create a new shutdown manager
63    pub fn new(config: ShutdownConfig) -> Self {
64        let (sender, receiver) = broadcast::channel(100);
65
66        Self {
67            config,
68            shutdown_sender: sender,
69            shutdown_receiver: Arc::new(Mutex::new(receiver)),
70            shutdown_in_progress: Arc::new(RwLock::new(false)),
71            registered_components: Arc::new(Mutex::new(Vec::new())),
72        }
73    }
74
75    /// Register a component for shutdown notifications
76    pub async fn register_component(&self, component: Arc<dyn ShutdownHandler>) {
77        let mut components = self.registered_components.lock().await;
78        components.push(component);
79        debug!("Registered component for shutdown handling");
80    }
81
82    /// Get a shutdown signal receiver
83    pub fn subscribe(&self) -> broadcast::Receiver<ShutdownSignal> {
84        self.shutdown_sender.subscribe()
85    }
86
87    /// Initiate graceful shutdown
88    pub async fn shutdown(&self, signal: ShutdownSignal) -> Result<()> {
89        let mut shutdown_in_progress = self.shutdown_in_progress.write().await;
90        if *shutdown_in_progress {
91            debug!("Shutdown already in progress, ignoring duplicate signal");
92            return Ok(());
93        }
94
95        *shutdown_in_progress = true;
96        drop(shutdown_in_progress); // Release the lock
97
98        info!("๐Ÿ›‘ Initiating shutdown: {:?}", signal);
99
100        // Send shutdown signal to all subscribers
101        if let Err(e) = self.shutdown_sender.send(signal.clone()) {
102            warn!("Failed to send shutdown signal: {}", e);
103        }
104
105        // Execute shutdown based on signal type
106        match signal {
107            ShutdownSignal::Graceful => self.execute_graceful_shutdown().await,
108            ShutdownSignal::Immediate => self.execute_immediate_shutdown().await,
109            ShutdownSignal::Error(ref msg) => {
110                error!("Shutdown due to error: {}", msg);
111                self.execute_immediate_shutdown().await
112            }
113        }
114    }
115
116    /// Check if shutdown is in progress
117    pub async fn is_shutdown_in_progress(&self) -> bool {
118        *self.shutdown_in_progress.read().await
119    }
120
121    /// Execute graceful shutdown sequence
122    async fn execute_graceful_shutdown(&self) -> Result<()> {
123        info!("๐Ÿ”„ Starting graceful shutdown sequence");
124
125        let components = self.registered_components.lock().await.clone();
126
127        // Phase 1: Prepare for shutdown
128        info!("๐Ÿ“‹ Phase 1: Preparing components for shutdown");
129        for (i, component) in components.iter().enumerate() {
130            debug!("Preparing component {} for shutdown", i);
131
132            if let Err(e) = timeout(self.config.pending_timeout, component.prepare_shutdown()).await
133            {
134                warn!("Component {} prepare_shutdown timed out: {:?}", i, e);
135            } else if let Err(e) = component.prepare_shutdown().await {
136                warn!("Component {} prepare_shutdown failed: {}", i, e);
137            }
138        }
139
140        sleep(self.config.phase_delay).await;
141
142        // Phase 2: Stop accepting new work
143        info!("โน๏ธ Phase 2: Stopping new work acceptance");
144        for (i, component) in components.iter().enumerate() {
145            debug!("Stopping component {} from accepting new work", i);
146
147            if let Err(e) =
148                timeout(self.config.pending_timeout, component.stop_accepting_work()).await
149            {
150                warn!("Component {} stop_accepting_work timed out: {:?}", i, e);
151            } else if let Err(e) = component.stop_accepting_work().await {
152                warn!("Component {} stop_accepting_work failed: {}", i, e);
153            }
154        }
155
156        sleep(self.config.phase_delay).await;
157
158        // Phase 3: Wait for pending work to complete
159        info!("โณ Phase 3: Waiting for pending work to complete");
160        for (i, component) in components.iter().enumerate() {
161            debug!("Waiting for component {} pending work", i);
162
163            if let Err(e) =
164                timeout(self.config.pending_timeout, component.wait_for_completion()).await
165            {
166                warn!("Component {} wait_for_completion timed out: {:?}", i, e);
167            } else if let Err(e) = component.wait_for_completion().await {
168                warn!("Component {} wait_for_completion failed: {}", i, e);
169            }
170        }
171
172        sleep(self.config.phase_delay).await;
173
174        // Phase 4: Final cleanup
175        info!("๐Ÿงน Phase 4: Final cleanup");
176        for (i, component) in components.iter().enumerate() {
177            debug!("Performing final cleanup for component {}", i);
178
179            if let Err(e) = timeout(self.config.pending_timeout, component.cleanup()).await {
180                warn!("Component {} cleanup timed out: {:?}", i, e);
181            } else if let Err(e) = component.cleanup().await {
182                warn!("Component {} cleanup failed: {}", i, e);
183            }
184        }
185
186        info!("โœ… Graceful shutdown completed successfully");
187        Ok(())
188    }
189
190    /// Execute immediate shutdown sequence
191    async fn execute_immediate_shutdown(&self) -> Result<()> {
192        info!("โšก Starting immediate shutdown sequence");
193
194        let components = self.registered_components.lock().await.clone();
195
196        // Force immediate cleanup of all components
197        for (i, component) in components.iter().enumerate() {
198            debug!("Force cleaning up component {}", i);
199
200            if let Err(e) = timeout(Duration::from_secs(2), component.force_shutdown()).await {
201                error!("Component {} force_shutdown timed out: {:?}", i, e);
202            } else if let Err(e) = component.force_shutdown().await {
203                error!("Component {} force_shutdown failed: {}", i, e);
204            }
205        }
206
207        info!("โšก Immediate shutdown completed");
208        Ok(())
209    }
210}
211
212impl Default for ShutdownManager {
213    fn default() -> Self {
214        Self::new(ShutdownConfig::default())
215    }
216}
217
218/// Trait for components that need shutdown handling
219#[async_trait::async_trait]
220pub trait ShutdownHandler: Send + Sync {
221    /// Prepare for shutdown (e.g., set shutdown flags)
222    async fn prepare_shutdown(&self) -> Result<()> {
223        Ok(())
224    }
225
226    /// Stop accepting new work
227    async fn stop_accepting_work(&self) -> Result<()> {
228        Ok(())
229    }
230
231    /// Wait for pending work to complete
232    async fn wait_for_completion(&self) -> Result<()> {
233        Ok(())
234    }
235
236    /// Perform final cleanup
237    async fn cleanup(&self) -> Result<()> {
238        Ok(())
239    }
240
241    /// Force immediate shutdown (emergency)
242    async fn force_shutdown(&self) -> Result<()> {
243        self.cleanup().await
244    }
245}
246
247/// CTRL+C signal handler setup
248pub async fn setup_signal_handling(shutdown_manager: Arc<ShutdownManager>) -> Result<()> {
249    #[cfg(unix)]
250    {
251        use tokio::signal::unix::{signal, SignalKind};
252
253        let mut sigterm = signal(SignalKind::terminate()).map_err(|e| {
254            RabbitError::Configuration(format!("Failed to setup SIGTERM handler: {}", e))
255        })?;
256        let mut sigint = signal(SignalKind::interrupt()).map_err(|e| {
257            RabbitError::Configuration(format!("Failed to setup SIGINT handler: {}", e))
258        })?;
259
260        let shutdown_manager_clone = shutdown_manager.clone();
261        tokio::spawn(async move {
262            tokio::select! {
263                _ = sigterm.recv() => {
264                    info!("Received SIGTERM, initiating graceful shutdown");
265                    if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Graceful).await {
266                        error!("Failed to execute graceful shutdown: {}", e);
267                    }
268                }
269                _ = sigint.recv() => {
270                    info!("Received SIGINT (Ctrl+C), initiating graceful shutdown");
271                    if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Graceful).await {
272                        error!("Failed to execute graceful shutdown: {}", e);
273                    }
274                }
275            }
276        });
277    }
278
279    #[cfg(windows)]
280    {
281        use tokio::signal::windows;
282
283        let mut ctrl_c = windows::ctrl_c().map_err(|e| {
284            RabbitError::Configuration(format!("Failed to setup Ctrl+C handler: {}", e))
285        })?;
286        let mut ctrl_break = windows::ctrl_break().map_err(|e| {
287            RabbitError::Configuration(format!("Failed to setup Ctrl+Break handler: {}", e))
288        })?;
289        let mut ctrl_close = windows::ctrl_close().map_err(|e| {
290            RabbitError::Configuration(format!("Failed to setup Ctrl+Close handler: {}", e))
291        })?;
292
293        let shutdown_manager_clone = shutdown_manager.clone();
294        tokio::spawn(async move {
295            tokio::select! {
296                _ = ctrl_c.recv() => {
297                    info!("Received Ctrl+C, initiating graceful shutdown");
298                    if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Graceful).await {
299                        error!("Failed to execute graceful shutdown: {}", e);
300                    }
301                }
302                _ = ctrl_break.recv() => {
303                    info!("Received Ctrl+Break, initiating immediate shutdown");
304                    if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Immediate).await {
305                        error!("Failed to execute immediate shutdown: {}", e);
306                    }
307                }
308                _ = ctrl_close.recv() => {
309                    info!("Received close signal, initiating graceful shutdown");
310                    if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Graceful).await {
311                        error!("Failed to execute graceful shutdown: {}", e);
312                    }
313                }
314            }
315        });
316    }
317
318    info!("๐Ÿ“ก Signal handlers setup complete");
319    Ok(())
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Test double that records whether its `cleanup` hook has run.
    #[derive(Debug)]
    struct MockComponent {
        shutdown_called: Arc<AtomicBool>,
    }

    impl MockComponent {
        fn new() -> Self {
            Self {
                shutdown_called: Arc::new(AtomicBool::new(false)),
            }
        }

        fn was_shutdown_called(&self) -> bool {
            self.shutdown_called.load(Ordering::Relaxed)
        }
    }

    #[async_trait::async_trait]
    impl ShutdownHandler for MockComponent {
        async fn cleanup(&self) -> Result<()> {
            self.shutdown_called.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    /// A fresh manager must not report a shutdown in progress.
    #[tokio::test]
    async fn test_shutdown_manager_creation() {
        let config = ShutdownConfig::default();
        let manager = ShutdownManager::new(config);

        assert!(!manager.is_shutdown_in_progress().await);
    }

    /// A registered component has its `cleanup` hook run by a graceful shutdown.
    #[tokio::test]
    async fn test_component_registration() {
        let manager = ShutdownManager::default();
        let component = Arc::new(MockComponent::new());

        manager.register_component(component.clone()).await;

        // `shutdown` is awaited to completion, so every hook has already run
        // when it returns; no extra sleep is needed before asserting.
        assert!(manager.shutdown(ShutdownSignal::Graceful).await.is_ok());

        assert!(component.was_shutdown_called());
        assert!(manager.is_shutdown_in_progress().await);
    }

    /// Subscribers receive the signal that was passed to `shutdown`.
    #[tokio::test]
    async fn test_shutdown_signal_subscription() {
        let manager = ShutdownManager::default();
        let mut receiver = manager.subscribe();

        // Send shutdown signal
        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            let _ = manager.shutdown(ShutdownSignal::Graceful).await;
        });

        // Receive the signal
        let signal = receiver.recv().await.unwrap();
        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(matches!(signal, ShutdownSignal::Graceful));
    }
}
391}