// quantum_log/shutdown.rs

//! 优雅停机模块
//!
//! 此模块提供了 QuantumLog 的优雅停机机制，确保所有日志都能被正确处理和刷新。

5use crate::error::{QuantumLogError, Result};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::{broadcast, Notify, RwLock};
9use tokio::time::timeout;
10use tracing::{error, info, warn};
11
12/// 停机句柄
13///
14/// 用于控制 QuantumLog 的优雅停机过程
15#[derive(Debug, Clone)]
16pub struct ShutdownHandle {
17    /// 停机信号发送器
18    shutdown_tx: broadcast::Sender<ShutdownSignal>,
19    /// 停机状态
20    state: Arc<RwLock<ShutdownState>>,
21    /// 停机完成通知
22    completion_notify: Arc<Notify>,
23    /// 停机超时时间
24    timeout_duration: Duration,
25}
26
/// Kind of shutdown requested through a [`ShutdownHandle`].
///
/// The chosen variant is broadcast to every listener. For `Graceful` and `Force`, the
/// handle waits for completion to be reported. For `Immediate`, it does not wait.
///
/// The enum is fieldless, so it is `Copy` and `Hash`: it can be passed by value and
/// used as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ShutdownSignal {
    /// Graceful shutdown.
    Graceful,
    /// Forced shutdown.
    Force,
    /// Immediate shutdown; the handle does not wait for completion.
    Immediate,
}
37
/// Lifecycle state tracked by a [`ShutdownHandle`].
///
/// A shutdown request moves `Running` to `Shutting`. The request then ends in either
/// `Shutdown` or `Failed(reason)`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ShutdownState {
    /// Normal operation; no shutdown has been requested.
    Running,
    /// A shutdown has been requested and is in progress.
    Shutting,
    /// The shutdown finished successfully.
    Shutdown,
    /// The shutdown failed; carries a description of the error.
    Failed(String),
}
50
51/// 停机监听器
52///
53/// 用于接收停机信号的组件
54pub struct ShutdownListener {
55    /// 停机信号接收器
56    shutdown_rx: broadcast::Receiver<ShutdownSignal>,
57    /// 组件名称
58    component_name: String,
59}
60
/// Statistics describing one shutdown run, returned by the `ShutdownHandle::shutdown_*`
/// methods.
///
/// NOTE(review): in this module `ShutdownHandle` only fills in `start_time` and
/// `end_time`. The counters are returned as zero.
#[derive(Default, Clone, Debug)]
pub struct ShutdownStats {
    /// Moment the shutdown started.
    pub start_time: Option<std::time::Instant>,
    /// Moment the shutdown finished.
    pub end_time: Option<std::time::Instant>,
    /// Number of log records processed.
    pub processed_logs: u64,
    /// Number of batches flushed.
    pub flushed_batches: u32,
    /// Number of components that shut down.
    pub shutdown_components: u32,
    /// Number of components that failed to shut down.
    pub failed_components: u32,
}
77
78impl ShutdownHandle {
79    /// 创建新的停机句柄
80    pub fn new(timeout_duration: Duration) -> Self {
81        let (shutdown_tx, _) = broadcast::channel(16);
82
83        Self {
84            shutdown_tx,
85            state: Arc::new(RwLock::new(ShutdownState::Running)),
86            completion_notify: Arc::new(Notify::new()),
87            timeout_duration,
88        }
89    }
90
91    /// 创建停机监听器
92    pub fn create_listener(&self, component_name: impl Into<String>) -> ShutdownListener {
93        ShutdownListener {
94            shutdown_rx: self.shutdown_tx.subscribe(),
95            component_name: component_name.into(),
96        }
97    }
98
99    /// 发起优雅停机
100    pub async fn shutdown_graceful(&self) -> Result<ShutdownStats> {
101        self.shutdown_with_signal(ShutdownSignal::Graceful).await
102    }
103
104    /// 发起强制停机
105    pub async fn shutdown_force(&self) -> Result<ShutdownStats> {
106        self.shutdown_with_signal(ShutdownSignal::Force).await
107    }
108
109    /// 发起立即停机
110    pub async fn shutdown_immediate(&self) -> Result<ShutdownStats> {
111        self.shutdown_with_signal(ShutdownSignal::Immediate).await
112    }
113
114    /// 使用指定信号停机
115    async fn shutdown_with_signal(&self, signal: ShutdownSignal) -> Result<ShutdownStats> {
116        // 检查当前状态
117        {
118            let mut state = self.state.write().await;
119            match *state {
120                ShutdownState::Running => {
121                    *state = ShutdownState::Shutting;
122                    info!("Starting {} shutdown", signal_name(&signal));
123                }
124                ShutdownState::Shutting => {
125                    warn!("Shutdown already in progress");
126                    return Err(QuantumLogError::ShutdownInProgress);
127                }
128                ShutdownState::Shutdown => {
129                    warn!("Already shutdown");
130                    return Err(QuantumLogError::AlreadyShutdown);
131                }
132                ShutdownState::Failed(ref reason) => {
133                    warn!("Previous shutdown failed: {}", reason);
134                    return Err(QuantumLogError::ShutdownFailed(reason.clone()));
135                }
136            }
137        }
138
139        let stats = Arc::new(RwLock::new(ShutdownStats {
140            start_time: Some(std::time::Instant::now()),
141            ..Default::default()
142        }));
143
144        // 发送停机信号
145        if let Err(e) = self.shutdown_tx.send(signal.clone()) {
146            error!("Failed to send shutdown signal: {}", e);
147            let mut state = self.state.write().await;
148            *state = ShutdownState::Failed(format!("Failed to send signal: {}", e));
149            return Err(QuantumLogError::ShutdownFailed(format!(
150                "Signal send failed: {}",
151                e
152            )));
153        }
154
155        // 等待停机完成或超时
156        let result = match signal {
157            ShutdownSignal::Immediate => {
158                // 立即停机不等待
159                Ok(())
160            }
161            _ => {
162                // 等待停机完成
163                timeout(self.timeout_duration, self.completion_notify.notified())
164                    .await
165                    .map_err(|_| QuantumLogError::ShutdownTimeout)
166            }
167        };
168
169        // 更新状态和统计信息
170        let final_stats = {
171            let mut stats_guard = stats.write().await;
172            stats_guard.end_time = Some(std::time::Instant::now());
173            stats_guard.clone()
174        };
175
176        match result {
177            Ok(_) => {
178                let mut state = self.state.write().await;
179                *state = ShutdownState::Shutdown;
180                info!("Shutdown completed successfully");
181                Ok(final_stats)
182            }
183            Err(e) => {
184                let mut state = self.state.write().await;
185                *state = ShutdownState::Failed(e.to_string());
186                error!("Shutdown failed: {}", e);
187                Err(e)
188            }
189        }
190    }
191
192    /// 通知停机完成
193    pub fn notify_completion(&self) {
194        self.completion_notify.notify_waiters();
195    }
196
197    /// 获取当前停机状态
198    pub async fn get_state(&self) -> ShutdownState {
199        self.state.read().await.clone()
200    }
201
202    /// 检查是否正在停机
203    pub async fn is_shutting_down(&self) -> bool {
204        matches!(
205            *self.state.read().await,
206            ShutdownState::Shutting | ShutdownState::Shutdown
207        )
208    }
209
210    /// 检查是否已停机
211    pub async fn is_shutdown(&self) -> bool {
212        matches!(*self.state.read().await, ShutdownState::Shutdown)
213    }
214
215    /// 设置超时时间
216    pub fn set_timeout(&mut self, timeout: Duration) {
217        self.timeout_duration = timeout;
218    }
219}
220
221impl ShutdownListener {
222    /// 等待停机信号
223    pub async fn wait_for_shutdown(&mut self) -> Result<ShutdownSignal> {
224        match self.shutdown_rx.recv().await {
225            Ok(signal) => {
226                info!(
227                    "Component '{}' received shutdown signal: {:?}",
228                    self.component_name, signal
229                );
230                Ok(signal)
231            }
232            Err(broadcast::error::RecvError::Closed) => {
233                warn!(
234                    "Shutdown channel closed for component '{}'",
235                    self.component_name
236                );
237                Err(QuantumLogError::ShutdownChannelClosed)
238            }
239            Err(broadcast::error::RecvError::Lagged(skipped)) => {
240                warn!(
241                    "Component '{}' lagged behind, skipped {} signals",
242                    self.component_name, skipped
243                );
244                // 尝试再次接收
245                Box::pin(self.wait_for_shutdown()).await
246            }
247        }
248    }
249
250    /// 非阻塞检查停机信号
251    pub fn try_recv_shutdown(&mut self) -> Option<ShutdownSignal> {
252        match self.shutdown_rx.try_recv() {
253            Ok(signal) => {
254                info!(
255                    "Component '{}' received shutdown signal: {:?}",
256                    self.component_name, signal
257                );
258                Some(signal)
259            }
260            Err(broadcast::error::TryRecvError::Empty) => None,
261            Err(broadcast::error::TryRecvError::Closed) => {
262                warn!(
263                    "Shutdown channel closed for component '{}'",
264                    self.component_name
265                );
266                Some(ShutdownSignal::Immediate)
267            }
268            Err(broadcast::error::TryRecvError::Lagged(skipped)) => {
269                warn!(
270                    "Component '{}' lagged behind, skipped {} signals",
271                    self.component_name, skipped
272                );
273                // 返回强制停机信号
274                Some(ShutdownSignal::Force)
275            }
276        }
277    }
278
279    /// 获取组件名称
280    pub fn component_name(&self) -> &str {
281        &self.component_name
282    }
283}
284
285/// 获取信号名称
286fn signal_name(signal: &ShutdownSignal) -> &'static str {
287    match signal {
288        ShutdownSignal::Graceful => "graceful",
289        ShutdownSignal::Force => "force",
290        ShutdownSignal::Immediate => "immediate",
291    }
292}
293
/// Timeout settings for the shutdown process.
///
/// NOTE(review): `ShutdownCoordinator::new` in this module only reads `graceful`;
/// `force` and `component` are not consumed here.
#[derive(Clone, Debug)]
pub struct ShutdownTimeouts {
    /// Timeout for a graceful shutdown.
    pub graceful: Duration,
    /// Timeout for a forced shutdown.
    pub force: Duration,
    /// Per-component shutdown timeout.
    pub component: Duration,
}

impl Default for ShutdownTimeouts {
    /// Defaults: 30 s graceful, 10 s forced, 5 s per component.
    fn default() -> Self {
        let secs = Duration::from_secs;
        ShutdownTimeouts {
            component: secs(5),
            force: secs(10),
            graceful: secs(30),
        }
    }
}
314
315/// 停机协调器
316///
317/// 协调多个组件的停机过程
318pub struct ShutdownCoordinator {
319    /// 停机句柄
320    handle: ShutdownHandle,
321    /// 注册的组件
322    components: Arc<RwLock<Vec<String>>>,
323}
324
325impl ShutdownCoordinator {
326    /// 创建新的停机协调器
327    pub fn new(timeouts: ShutdownTimeouts) -> Self {
328        Self {
329            handle: ShutdownHandle::new(timeouts.graceful),
330            components: Arc::new(RwLock::new(Vec::new())),
331        }
332    }
333
334    /// 注册组件
335    pub async fn register_component(&self, name: impl Into<String>) -> ShutdownListener {
336        let name = name.into();
337        self.components.write().await.push(name.clone());
338        self.handle.create_listener(name)
339    }
340
341    /// 获取停机句柄
342    pub fn handle(&self) -> &ShutdownHandle {
343        &self.handle
344    }
345
346    /// 获取注册的组件列表
347    pub async fn get_components(&self) -> Vec<String> {
348        self.components.read().await.clone()
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::sleep;

    #[tokio::test]
    async fn test_shutdown_handle_creation() {
        let handle = ShutdownHandle::new(Duration::from_secs(5));
        assert!(matches!(handle.get_state().await, ShutdownState::Running));
    }

    #[tokio::test]
    async fn test_shutdown_listener() {
        let handle = ShutdownHandle::new(Duration::from_secs(5));
        let mut listener = handle.create_listener("test_component");

        // Send the shutdown signal from a background task.
        let handle_clone = handle.clone();
        tokio::spawn(async move {
            sleep(Duration::from_millis(100)).await;
            let _ = handle_clone.shutdown_tx.send(ShutdownSignal::Graceful);
        });

        // Wait for the shutdown signal.
        let signal = listener.wait_for_shutdown().await.unwrap();
        assert_eq!(signal, ShutdownSignal::Graceful);
    }

    #[tokio::test]
    async fn test_shutdown_coordinator() {
        let coordinator = ShutdownCoordinator::new(ShutdownTimeouts::default());

        let _listener1 = coordinator.register_component("component1").await;
        let _listener2 = coordinator.register_component("component2").await;

        let components = coordinator.get_components().await;
        assert_eq!(components.len(), 2);
        assert!(components.contains(&"component1".to_string()));
        assert!(components.contains(&"component2".to_string()));
    }

    /// Full graceful shutdown: `Running` -> `Shutdown`, then a second request is rejected.
    #[tokio::test]
    async fn test_shutdown_states() {
        let handle = ShutdownHandle::new(Duration::from_secs(1));
        let mut listener = handle.create_listener("test_component");

        assert!(!handle.is_shutting_down().await);
        assert!(!handle.is_shutdown().await);

        // Simulated component: receive the signal, then report completion.
        let component_handle = handle.clone();
        let component = tokio::spawn(async move {
            let signal = listener.wait_for_shutdown().await.unwrap();
            assert_eq!(signal, ShutdownSignal::Graceful);
            component_handle.notify_completion();
        });

        let stats = handle.shutdown_graceful().await.unwrap();
        component.await.unwrap();

        assert!(stats.start_time.is_some());
        assert!(stats.end_time.is_some());
        assert!(handle.is_shutting_down().await);
        assert!(handle.is_shutdown().await);
        assert_eq!(handle.get_state().await, ShutdownState::Shutdown);

        assert!(matches!(
            handle.shutdown_graceful().await,
            Err(QuantumLogError::AlreadyShutdown)
        ));
    }

    /// A graceful shutdown that is never acknowledged times out and leaves the handle failed.
    #[tokio::test]
    async fn test_shutdown_timeout_marks_failed() {
        let handle = ShutdownHandle::new(Duration::from_millis(50));
        let _listener = handle.create_listener("silent_component");

        let result = handle.shutdown_graceful().await;
        assert!(matches!(result, Err(QuantumLogError::ShutdownTimeout)));
        assert!(matches!(handle.get_state().await, ShutdownState::Failed(_)));
        assert!(!handle.is_shutting_down().await);

        // Later requests report the earlier failure.
        assert!(matches!(
            handle.shutdown_force().await,
            Err(QuantumLogError::ShutdownFailed(_))
        ));
    }

    /// Immediate shutdown completes without waiting for `notify_completion`.
    #[tokio::test]
    async fn test_immediate_shutdown_does_not_wait() {
        let handle = ShutdownHandle::new(Duration::from_secs(5));
        let mut listener = handle.create_listener("idle_component");

        handle.shutdown_immediate().await.unwrap();
        assert!(handle.is_shutdown().await);
        assert_eq!(listener.try_recv_shutdown(), Some(ShutdownSignal::Immediate));
    }
}