amaters_server/
shutdown.rs1use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use tokio::sync::broadcast;
9use tracing::{info, warn};
10
11#[derive(Clone)]
15pub struct ShutdownCoordinator {
16 sender: broadcast::Sender<()>,
18 shutdown_initiated: Arc<AtomicBool>,
20}
21
22impl ShutdownCoordinator {
23 pub fn new() -> Self {
25 let (sender, _) = broadcast::channel(16);
26 Self {
27 sender,
28 shutdown_initiated: Arc::new(AtomicBool::new(false)),
29 }
30 }
31
32 pub fn subscribe(&self) -> broadcast::Receiver<()> {
34 self.sender.subscribe()
35 }
36
37 pub fn shutdown(&self) {
39 if self.shutdown_initiated.swap(true, Ordering::SeqCst) {
40 return;
42 }
43
44 info!("Initiating graceful shutdown");
45
46 if let Err(e) = self.sender.send(()) {
48 warn!("Failed to broadcast shutdown signal: {}", e);
49 }
50 }
51
52 pub fn is_shutting_down(&self) -> bool {
54 self.shutdown_initiated.load(Ordering::SeqCst)
55 }
56}
57
58impl Default for ShutdownCoordinator {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
64pub async fn setup_signal_handlers(coordinator: ShutdownCoordinator) {
68 tokio::spawn(async move {
69 if let Err(e) = wait_for_signal().await {
70 warn!("Error setting up signal handlers: {}", e);
71 return;
72 }
73
74 info!("Received shutdown signal");
75 coordinator.shutdown();
76 });
77}
78
79async fn wait_for_signal() -> Result<(), std::io::Error> {
81 #[cfg(unix)]
82 {
83 use tokio::signal::unix::{SignalKind, signal};
84
85 let mut sigterm = signal(SignalKind::terminate())?;
86 let mut sigint = signal(SignalKind::interrupt())?;
87
88 tokio::select! {
89 _ = sigterm.recv() => {
90 info!("Received SIGTERM");
91 }
92 _ = sigint.recv() => {
93 info!("Received SIGINT");
94 }
95 }
96 }
97
98 #[cfg(not(unix))]
99 {
100 use tokio::signal;
101 signal::ctrl_c().await?;
102 info!("Received Ctrl+C");
103 }
104
105 Ok(())
106}
107
108pub struct ShutdownGuard {
112 coordinator: ShutdownCoordinator,
113 disarmed: Arc<AtomicBool>,
114}
115
116impl ShutdownGuard {
117 pub fn new(coordinator: ShutdownCoordinator) -> Self {
119 Self {
120 coordinator,
121 disarmed: Arc::new(AtomicBool::new(false)),
122 }
123 }
124
125 pub fn disarm(&self) {
127 self.disarmed.store(true, Ordering::SeqCst);
128 }
129}
130
131impl Drop for ShutdownGuard {
132 fn drop(&mut self) {
133 if !self.disarmed.load(Ordering::SeqCst) {
134 warn!("ShutdownGuard dropped without disarming - triggering shutdown");
135 self.coordinator.shutdown();
136 }
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::timeout;

    /// Upper bound on how long a subscriber may wait for the signal.
    const RECV_TIMEOUT: Duration = Duration::from_millis(100);

    /// A subscriber created before `shutdown` receives the signal, and the
    /// flag flips from `false` to `true`.
    #[tokio::test]
    async fn test_shutdown_coordinator() {
        let coordinator = ShutdownCoordinator::new();
        let mut receiver = coordinator.subscribe();

        assert!(!coordinator.is_shutting_down());

        coordinator.shutdown();

        assert!(coordinator.is_shutting_down());

        // Require an actual `Ok(())` delivery: `is_ok()` on the timeout
        // result alone would also accept a closed or lagged channel.
        let result = timeout(RECV_TIMEOUT, receiver.recv()).await;
        assert!(
            matches!(result, Ok(Ok(()))),
            "subscriber did not receive the shutdown signal"
        );
    }

    /// Every subscriber created before `shutdown` receives the signal.
    #[tokio::test]
    async fn test_multiple_subscribers() {
        let coordinator = ShutdownCoordinator::new();
        let rx1 = coordinator.subscribe();
        let rx2 = coordinator.subscribe();
        let rx3 = coordinator.subscribe();

        coordinator.shutdown();

        for (index, mut rx) in [rx1, rx2, rx3].into_iter().enumerate() {
            let result = timeout(RECV_TIMEOUT, rx.recv()).await;
            assert!(
                matches!(result, Ok(Ok(()))),
                "subscriber {} did not receive the shutdown signal",
                index + 1
            );
        }
    }

    /// Calling `shutdown` twice is harmless and leaves the flag set.
    #[tokio::test]
    async fn test_shutdown_idempotent() {
        let coordinator = ShutdownCoordinator::new();

        coordinator.shutdown();
        coordinator.shutdown();

        assert!(coordinator.is_shutting_down());
    }

    /// A disarmed guard does not trigger shutdown when dropped.
    #[test]
    fn test_shutdown_guard_disarm() {
        let coordinator = ShutdownCoordinator::new();
        let guard = ShutdownGuard::new(coordinator.clone());

        guard.disarm();
        drop(guard);

        assert!(!coordinator.is_shutting_down());
    }

    /// An armed guard triggers shutdown when dropped.
    #[test]
    fn test_shutdown_guard_trigger() {
        let coordinator = ShutdownCoordinator::new();
        let guard = ShutdownGuard::new(coordinator.clone());

        drop(guard);

        assert!(coordinator.is_shutting_down());
    }
}