1use crate::error::{QuantumLogError, Result};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::{broadcast, Notify, RwLock};
9use tokio::time::timeout;
10use tracing::{error, info, warn};
11
12#[derive(Debug, Clone)]
16pub struct ShutdownHandle {
17 shutdown_tx: broadcast::Sender<ShutdownSignal>,
19 state: Arc<RwLock<ShutdownState>>,
21 completion_notify: Arc<Notify>,
23 timeout_duration: Duration,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum ShutdownSignal {
30 Graceful,
32 Force,
34 Immediate,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum ShutdownState {
41 Running,
43 Shutting,
45 Shutdown,
47 Failed(String),
49}
50
51pub struct ShutdownListener {
55 shutdown_rx: broadcast::Receiver<ShutdownSignal>,
57 component_name: String,
59}
60
61#[derive(Debug, Clone, Default)]
63pub struct ShutdownStats {
64 pub start_time: Option<std::time::Instant>,
66 pub end_time: Option<std::time::Instant>,
68 pub processed_logs: u64,
70 pub flushed_batches: u32,
72 pub shutdown_components: u32,
74 pub failed_components: u32,
76}
77
78impl ShutdownHandle {
79 pub fn new(timeout_duration: Duration) -> Self {
81 let (shutdown_tx, _) = broadcast::channel(16);
82
83 Self {
84 shutdown_tx,
85 state: Arc::new(RwLock::new(ShutdownState::Running)),
86 completion_notify: Arc::new(Notify::new()),
87 timeout_duration,
88 }
89 }
90
91 pub fn create_listener(&self, component_name: impl Into<String>) -> ShutdownListener {
93 ShutdownListener {
94 shutdown_rx: self.shutdown_tx.subscribe(),
95 component_name: component_name.into(),
96 }
97 }
98
99 pub async fn shutdown_graceful(&self) -> Result<ShutdownStats> {
101 self.shutdown_with_signal(ShutdownSignal::Graceful).await
102 }
103
104 pub async fn shutdown_force(&self) -> Result<ShutdownStats> {
106 self.shutdown_with_signal(ShutdownSignal::Force).await
107 }
108
109 pub async fn shutdown_immediate(&self) -> Result<ShutdownStats> {
111 self.shutdown_with_signal(ShutdownSignal::Immediate).await
112 }
113
114 async fn shutdown_with_signal(&self, signal: ShutdownSignal) -> Result<ShutdownStats> {
116 {
118 let mut state = self.state.write().await;
119 match *state {
120 ShutdownState::Running => {
121 *state = ShutdownState::Shutting;
122 info!("Starting {} shutdown", signal_name(&signal));
123 }
124 ShutdownState::Shutting => {
125 warn!("Shutdown already in progress");
126 return Err(QuantumLogError::ShutdownInProgress);
127 }
128 ShutdownState::Shutdown => {
129 warn!("Already shutdown");
130 return Err(QuantumLogError::AlreadyShutdown);
131 }
132 ShutdownState::Failed(ref reason) => {
133 warn!("Previous shutdown failed: {}", reason);
134 return Err(QuantumLogError::ShutdownFailed(reason.clone()));
135 }
136 }
137 }
138
139 let stats = Arc::new(RwLock::new(ShutdownStats {
140 start_time: Some(std::time::Instant::now()),
141 ..Default::default()
142 }));
143
144 if let Err(e) = self.shutdown_tx.send(signal.clone()) {
146 error!("Failed to send shutdown signal: {}", e);
147 let mut state = self.state.write().await;
148 *state = ShutdownState::Failed(format!("Failed to send signal: {}", e));
149 return Err(QuantumLogError::ShutdownFailed(format!(
150 "Signal send failed: {}",
151 e
152 )));
153 }
154
155 let result = match signal {
157 ShutdownSignal::Immediate => {
158 Ok(())
160 }
161 _ => {
162 timeout(self.timeout_duration, self.completion_notify.notified())
164 .await
165 .map_err(|_| QuantumLogError::ShutdownTimeout)
166 }
167 };
168
169 let final_stats = {
171 let mut stats_guard = stats.write().await;
172 stats_guard.end_time = Some(std::time::Instant::now());
173 stats_guard.clone()
174 };
175
176 match result {
177 Ok(_) => {
178 let mut state = self.state.write().await;
179 *state = ShutdownState::Shutdown;
180 info!("Shutdown completed successfully");
181 Ok(final_stats)
182 }
183 Err(e) => {
184 let mut state = self.state.write().await;
185 *state = ShutdownState::Failed(e.to_string());
186 error!("Shutdown failed: {}", e);
187 Err(e)
188 }
189 }
190 }
191
192 pub fn notify_completion(&self) {
194 self.completion_notify.notify_waiters();
195 }
196
197 pub async fn get_state(&self) -> ShutdownState {
199 self.state.read().await.clone()
200 }
201
202 pub async fn is_shutting_down(&self) -> bool {
204 matches!(
205 *self.state.read().await,
206 ShutdownState::Shutting | ShutdownState::Shutdown
207 )
208 }
209
210 pub async fn is_shutdown(&self) -> bool {
212 matches!(*self.state.read().await, ShutdownState::Shutdown)
213 }
214
215 pub fn set_timeout(&mut self, timeout: Duration) {
217 self.timeout_duration = timeout;
218 }
219}
220
221impl ShutdownListener {
222 pub async fn wait_for_shutdown(&mut self) -> Result<ShutdownSignal> {
224 match self.shutdown_rx.recv().await {
225 Ok(signal) => {
226 info!(
227 "Component '{}' received shutdown signal: {:?}",
228 self.component_name, signal
229 );
230 Ok(signal)
231 }
232 Err(broadcast::error::RecvError::Closed) => {
233 warn!(
234 "Shutdown channel closed for component '{}'",
235 self.component_name
236 );
237 Err(QuantumLogError::ShutdownChannelClosed)
238 }
239 Err(broadcast::error::RecvError::Lagged(skipped)) => {
240 warn!(
241 "Component '{}' lagged behind, skipped {} signals",
242 self.component_name, skipped
243 );
244 Box::pin(self.wait_for_shutdown()).await
246 }
247 }
248 }
249
250 pub fn try_recv_shutdown(&mut self) -> Option<ShutdownSignal> {
252 match self.shutdown_rx.try_recv() {
253 Ok(signal) => {
254 info!(
255 "Component '{}' received shutdown signal: {:?}",
256 self.component_name, signal
257 );
258 Some(signal)
259 }
260 Err(broadcast::error::TryRecvError::Empty) => None,
261 Err(broadcast::error::TryRecvError::Closed) => {
262 warn!(
263 "Shutdown channel closed for component '{}'",
264 self.component_name
265 );
266 Some(ShutdownSignal::Immediate)
267 }
268 Err(broadcast::error::TryRecvError::Lagged(skipped)) => {
269 warn!(
270 "Component '{}' lagged behind, skipped {} signals",
271 self.component_name, skipped
272 );
273 Some(ShutdownSignal::Force)
275 }
276 }
277 }
278
279 pub fn component_name(&self) -> &str {
281 &self.component_name
282 }
283}
284
285fn signal_name(signal: &ShutdownSignal) -> &'static str {
287 match signal {
288 ShutdownSignal::Graceful => "graceful",
289 ShutdownSignal::Force => "force",
290 ShutdownSignal::Immediate => "immediate",
291 }
292}
293
294#[derive(Debug, Clone)]
296pub struct ShutdownTimeouts {
297 pub graceful: Duration,
299 pub force: Duration,
301 pub component: Duration,
303}
304
305impl Default for ShutdownTimeouts {
306 fn default() -> Self {
307 Self {
308 graceful: Duration::from_secs(30),
309 force: Duration::from_secs(10),
310 component: Duration::from_secs(5),
311 }
312 }
313}
314
315pub struct ShutdownCoordinator {
319 handle: ShutdownHandle,
321 components: Arc<RwLock<Vec<String>>>,
323}
324
325impl ShutdownCoordinator {
326 pub fn new(timeouts: ShutdownTimeouts) -> Self {
328 Self {
329 handle: ShutdownHandle::new(timeouts.graceful),
330 components: Arc::new(RwLock::new(Vec::new())),
331 }
332 }
333
334 pub async fn register_component(&self, name: impl Into<String>) -> ShutdownListener {
336 let name = name.into();
337 self.components.write().await.push(name.clone());
338 self.handle.create_listener(name)
339 }
340
341 pub fn handle(&self) -> &ShutdownHandle {
343 &self.handle
344 }
345
346 pub async fn get_components(&self) -> Vec<String> {
348 self.components.read().await.clone()
349 }
350}
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355 use tokio::time::sleep;
356
357 #[tokio::test]
358 async fn test_shutdown_handle_creation() {
359 let handle = ShutdownHandle::new(Duration::from_secs(5));
360 assert!(matches!(handle.get_state().await, ShutdownState::Running));
361 }
362
363 #[tokio::test]
364 async fn test_shutdown_listener() {
365 let handle = ShutdownHandle::new(Duration::from_secs(5));
366 let mut listener = handle.create_listener("test_component");
367
368 let handle_clone = handle.clone();
370 tokio::spawn(async move {
371 sleep(Duration::from_millis(100)).await;
372 let _ = handle_clone.shutdown_tx.send(ShutdownSignal::Graceful);
373 });
374
375 let signal = listener.wait_for_shutdown().await.unwrap();
377 assert_eq!(signal, ShutdownSignal::Graceful);
378 }
379
380 #[tokio::test]
381 async fn test_shutdown_coordinator() {
382 let coordinator = ShutdownCoordinator::new(ShutdownTimeouts::default());
383
384 let _listener1 = coordinator.register_component("component1").await;
385 let _listener2 = coordinator.register_component("component2").await;
386
387 let components = coordinator.get_components().await;
388 assert_eq!(components.len(), 2);
389 assert!(components.contains(&"component1".to_string()));
390 assert!(components.contains(&"component2".to_string()));
391 }
392
393 #[tokio::test]
394 async fn test_shutdown_states() {
395 let handle = ShutdownHandle::new(Duration::from_secs(1));
396
397 assert!(!handle.is_shutting_down().await);
398 assert!(!handle.is_shutdown().await);
399
400 handle.notify_completion();
402
403 }
405}