1use crate::error::{RabbitError, Result};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::{broadcast, Mutex, RwLock};
13use tokio::time::{sleep, timeout};
14use tracing::{debug, error, info, warn};
15
/// Reason a shutdown was requested.
///
/// The value is broadcast to every subscriber and also selects which shutdown
/// path `ShutdownManager::shutdown` runs. `PartialEq`/`Eq` are derived so
/// callers and tests can compare signals directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShutdownSignal {
    /// Run the phased graceful sequence.
    Graceful,
    /// Skip the phases and force every component down.
    Immediate,
    /// Shutdown caused by an error; the message is logged and the immediate
    /// path is taken.
    Error(String),
}
26
/// Tunables for the shutdown sequence.
#[derive(Clone, Debug)]
pub struct ShutdownConfig {
    /// Overall budget for a graceful shutdown.
    ///
    /// NOTE(review): no code visible in this file reads this field; confirm
    /// whether an overall deadline is enforced elsewhere.
    pub graceful_timeout: Duration,
    /// Pause inserted between consecutive graceful phases.
    pub phase_delay: Duration,
    /// Whether pending work should be flushed.
    ///
    /// NOTE(review): not consulted by the shutdown code visible in this file.
    pub flush_pending: bool,
    /// Time limit applied to each component call in each graceful phase.
    pub pending_timeout: Duration,
}
39
40impl Default for ShutdownConfig {
41 fn default() -> Self {
42 Self {
43 graceful_timeout: Duration::from_secs(30),
44 phase_delay: Duration::from_millis(100),
45 flush_pending: true,
46 pending_timeout: Duration::from_secs(5),
47 }
48 }
49}
50
51pub struct ShutdownManager {
53 config: ShutdownConfig,
54 shutdown_sender: broadcast::Sender<ShutdownSignal>,
55 #[allow(dead_code)]
56 shutdown_receiver: Arc<Mutex<broadcast::Receiver<ShutdownSignal>>>,
57 shutdown_in_progress: Arc<RwLock<bool>>,
58 registered_components: Arc<Mutex<Vec<Arc<dyn ShutdownHandler>>>>,
59}
60
61impl ShutdownManager {
62 pub fn new(config: ShutdownConfig) -> Self {
64 let (sender, receiver) = broadcast::channel(100);
65
66 Self {
67 config,
68 shutdown_sender: sender,
69 shutdown_receiver: Arc::new(Mutex::new(receiver)),
70 shutdown_in_progress: Arc::new(RwLock::new(false)),
71 registered_components: Arc::new(Mutex::new(Vec::new())),
72 }
73 }
74
75 pub async fn register_component(&self, component: Arc<dyn ShutdownHandler>) {
77 let mut components = self.registered_components.lock().await;
78 components.push(component);
79 debug!("Registered component for shutdown handling");
80 }
81
82 pub fn subscribe(&self) -> broadcast::Receiver<ShutdownSignal> {
84 self.shutdown_sender.subscribe()
85 }
86
87 pub async fn shutdown(&self, signal: ShutdownSignal) -> Result<()> {
89 let mut shutdown_in_progress = self.shutdown_in_progress.write().await;
90 if *shutdown_in_progress {
91 debug!("Shutdown already in progress, ignoring duplicate signal");
92 return Ok(());
93 }
94
95 *shutdown_in_progress = true;
96 drop(shutdown_in_progress); info!("๐ Initiating shutdown: {:?}", signal);
99
100 if let Err(e) = self.shutdown_sender.send(signal.clone()) {
102 warn!("Failed to send shutdown signal: {}", e);
103 }
104
105 match signal {
107 ShutdownSignal::Graceful => self.execute_graceful_shutdown().await,
108 ShutdownSignal::Immediate => self.execute_immediate_shutdown().await,
109 ShutdownSignal::Error(ref msg) => {
110 error!("Shutdown due to error: {}", msg);
111 self.execute_immediate_shutdown().await
112 }
113 }
114 }
115
116 pub async fn is_shutdown_in_progress(&self) -> bool {
118 *self.shutdown_in_progress.read().await
119 }
120
121 async fn execute_graceful_shutdown(&self) -> Result<()> {
123 info!("๐ Starting graceful shutdown sequence");
124
125 let components = self.registered_components.lock().await.clone();
126
127 info!("๐ Phase 1: Preparing components for shutdown");
129 for (i, component) in components.iter().enumerate() {
130 debug!("Preparing component {} for shutdown", i);
131
132 if let Err(e) = timeout(self.config.pending_timeout, component.prepare_shutdown()).await
133 {
134 warn!("Component {} prepare_shutdown timed out: {:?}", i, e);
135 } else if let Err(e) = component.prepare_shutdown().await {
136 warn!("Component {} prepare_shutdown failed: {}", i, e);
137 }
138 }
139
140 sleep(self.config.phase_delay).await;
141
142 info!("โน๏ธ Phase 2: Stopping new work acceptance");
144 for (i, component) in components.iter().enumerate() {
145 debug!("Stopping component {} from accepting new work", i);
146
147 if let Err(e) =
148 timeout(self.config.pending_timeout, component.stop_accepting_work()).await
149 {
150 warn!("Component {} stop_accepting_work timed out: {:?}", i, e);
151 } else if let Err(e) = component.stop_accepting_work().await {
152 warn!("Component {} stop_accepting_work failed: {}", i, e);
153 }
154 }
155
156 sleep(self.config.phase_delay).await;
157
158 info!("โณ Phase 3: Waiting for pending work to complete");
160 for (i, component) in components.iter().enumerate() {
161 debug!("Waiting for component {} pending work", i);
162
163 if let Err(e) =
164 timeout(self.config.pending_timeout, component.wait_for_completion()).await
165 {
166 warn!("Component {} wait_for_completion timed out: {:?}", i, e);
167 } else if let Err(e) = component.wait_for_completion().await {
168 warn!("Component {} wait_for_completion failed: {}", i, e);
169 }
170 }
171
172 sleep(self.config.phase_delay).await;
173
174 info!("๐งน Phase 4: Final cleanup");
176 for (i, component) in components.iter().enumerate() {
177 debug!("Performing final cleanup for component {}", i);
178
179 if let Err(e) = timeout(self.config.pending_timeout, component.cleanup()).await {
180 warn!("Component {} cleanup timed out: {:?}", i, e);
181 } else if let Err(e) = component.cleanup().await {
182 warn!("Component {} cleanup failed: {}", i, e);
183 }
184 }
185
186 info!("โ
Graceful shutdown completed successfully");
187 Ok(())
188 }
189
190 async fn execute_immediate_shutdown(&self) -> Result<()> {
192 info!("โก Starting immediate shutdown sequence");
193
194 let components = self.registered_components.lock().await.clone();
195
196 for (i, component) in components.iter().enumerate() {
198 debug!("Force cleaning up component {}", i);
199
200 if let Err(e) = timeout(Duration::from_secs(2), component.force_shutdown()).await {
201 error!("Component {} force_shutdown timed out: {:?}", i, e);
202 } else if let Err(e) = component.force_shutdown().await {
203 error!("Component {} force_shutdown failed: {}", i, e);
204 }
205 }
206
207 info!("โก Immediate shutdown completed");
208 Ok(())
209 }
210}
211
212impl Default for ShutdownManager {
213 fn default() -> Self {
214 Self::new(ShutdownConfig::default())
215 }
216}
217
218#[async_trait::async_trait]
220pub trait ShutdownHandler: Send + Sync {
221 async fn prepare_shutdown(&self) -> Result<()> {
223 Ok(())
224 }
225
226 async fn stop_accepting_work(&self) -> Result<()> {
228 Ok(())
229 }
230
231 async fn wait_for_completion(&self) -> Result<()> {
233 Ok(())
234 }
235
236 async fn cleanup(&self) -> Result<()> {
238 Ok(())
239 }
240
241 async fn force_shutdown(&self) -> Result<()> {
243 self.cleanup().await
244 }
245}
246
247pub async fn setup_signal_handling(shutdown_manager: Arc<ShutdownManager>) -> Result<()> {
249 #[cfg(unix)]
250 {
251 use tokio::signal::unix::{signal, SignalKind};
252
253 let mut sigterm = signal(SignalKind::terminate()).map_err(|e| {
254 RabbitError::Configuration(format!("Failed to setup SIGTERM handler: {}", e))
255 })?;
256 let mut sigint = signal(SignalKind::interrupt()).map_err(|e| {
257 RabbitError::Configuration(format!("Failed to setup SIGINT handler: {}", e))
258 })?;
259
260 let shutdown_manager_clone = shutdown_manager.clone();
261 tokio::spawn(async move {
262 tokio::select! {
263 _ = sigterm.recv() => {
264 info!("Received SIGTERM, initiating graceful shutdown");
265 if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Graceful).await {
266 error!("Failed to execute graceful shutdown: {}", e);
267 }
268 }
269 _ = sigint.recv() => {
270 info!("Received SIGINT (Ctrl+C), initiating graceful shutdown");
271 if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Graceful).await {
272 error!("Failed to execute graceful shutdown: {}", e);
273 }
274 }
275 }
276 });
277 }
278
279 #[cfg(windows)]
280 {
281 use tokio::signal::windows;
282
283 let mut ctrl_c = windows::ctrl_c().map_err(|e| {
284 RabbitError::Configuration(format!("Failed to setup Ctrl+C handler: {}", e))
285 })?;
286 let mut ctrl_break = windows::ctrl_break().map_err(|e| {
287 RabbitError::Configuration(format!("Failed to setup Ctrl+Break handler: {}", e))
288 })?;
289 let mut ctrl_close = windows::ctrl_close().map_err(|e| {
290 RabbitError::Configuration(format!("Failed to setup Ctrl+Close handler: {}", e))
291 })?;
292
293 let shutdown_manager_clone = shutdown_manager.clone();
294 tokio::spawn(async move {
295 tokio::select! {
296 _ = ctrl_c.recv() => {
297 info!("Received Ctrl+C, initiating graceful shutdown");
298 if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Graceful).await {
299 error!("Failed to execute graceful shutdown: {}", e);
300 }
301 }
302 _ = ctrl_break.recv() => {
303 info!("Received Ctrl+Break, initiating immediate shutdown");
304 if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Immediate).await {
305 error!("Failed to execute immediate shutdown: {}", e);
306 }
307 }
308 _ = ctrl_close.recv() => {
309 info!("Received close signal, initiating graceful shutdown");
310 if let Err(e) = shutdown_manager_clone.shutdown(ShutdownSignal::Graceful).await {
311 error!("Failed to execute graceful shutdown: {}", e);
312 }
313 }
314 }
315 });
316 }
317
318 info!("๐ก Signal handlers setup complete");
319 Ok(())
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Handler that records whether `cleanup` was invoked.
    #[derive(Debug)]
    struct MockComponent {
        shutdown_called: Arc<AtomicBool>,
    }

    impl MockComponent {
        fn new() -> Self {
            Self {
                shutdown_called: Arc::new(AtomicBool::new(false)),
            }
        }

        fn was_shutdown_called(&self) -> bool {
            self.shutdown_called.load(Ordering::Relaxed)
        }
    }

    #[async_trait::async_trait]
    impl ShutdownHandler for MockComponent {
        async fn cleanup(&self) -> Result<()> {
            self.shutdown_called.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_shutdown_manager_creation() {
        let config = ShutdownConfig::default();
        let manager = ShutdownManager::new(config);

        assert!(!manager.is_shutdown_in_progress().await);
    }

    #[tokio::test]
    async fn test_component_registration() {
        let manager = ShutdownManager::default();
        let component = Arc::new(MockComponent::new());

        manager.register_component(component.clone()).await;

        // `shutdown` resolves only after the whole sequence has run, so the
        // flag can be checked right away without sleeping.
        let outcome = manager.shutdown(ShutdownSignal::Graceful).await;

        assert!(outcome.is_ok());
        assert!(manager.is_shutdown_in_progress().await);
        assert!(component.was_shutdown_called());
    }

    #[tokio::test]
    async fn test_shutdown_signal_subscription() {
        let manager = ShutdownManager::default();
        let mut receiver = manager.subscribe();

        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            let _ = manager.shutdown(ShutdownSignal::Graceful).await;
        });

        let signal = receiver.recv().await.unwrap();

        // A bare `matches!` only yields a bool; it must be asserted to check anything.
        assert!(matches!(signal, ShutdownSignal::Graceful));
    }
}