ringkernel_core/
shutdown.rs1use std::sync::atomic::{AtomicBool, Ordering};
48use std::sync::Arc;
49use std::time::Duration;
50
51use tokio::sync::watch;
52use tokio::task::JoinHandle;
53
54#[derive(Clone)]
64pub struct ShutdownSignal {
65 requested: Arc<AtomicBool>,
67 rx: watch::Receiver<bool>,
69}
70
71impl ShutdownSignal {
72 pub fn is_shutdown_requested(&self) -> bool {
77 self.requested.load(Ordering::SeqCst)
78 }
79
80 pub async fn wait(&self) {
85 if self.is_shutdown_requested() {
87 return;
88 }
89
90 let mut rx = self.rx.clone();
91 loop {
94 if *rx.borrow_and_update() {
95 return;
96 }
97 if rx.changed().await.is_err() {
98 return;
100 }
101 }
102 }
103}
104
105impl std::fmt::Debug for ShutdownSignal {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct("ShutdownSignal")
108 .field("requested", &self.is_shutdown_requested())
109 .finish()
110 }
111}
112
113pub struct GracefulShutdown {
123 requested: Arc<AtomicBool>,
125 tx: watch::Sender<bool>,
127 rx: watch::Receiver<bool>,
129 grace_period: Duration,
131}
132
133impl GracefulShutdown {
134 pub fn new(grace_period: Duration) -> Self {
140 let (tx, rx) = watch::channel(false);
141 Self {
142 requested: Arc::new(AtomicBool::new(false)),
143 tx,
144 rx,
145 grace_period,
146 }
147 }
148
149 pub fn with_default_grace_period() -> Self {
151 Self::new(Duration::from_secs(5))
152 }
153
154 pub fn grace_period(&self) -> Duration {
156 self.grace_period
157 }
158
159 pub fn is_shutdown_requested(&self) -> bool {
161 self.requested.load(Ordering::SeqCst)
162 }
163
164 pub fn signal(&self) -> ShutdownSignal {
167 ShutdownSignal {
168 requested: Arc::clone(&self.requested),
169 rx: self.rx.clone(),
170 }
171 }
172
173 pub fn trigger(&self) {
175 self.requested.store(true, Ordering::SeqCst);
176 let _ = self.tx.send(true);
177 }
178
179 pub fn install(self) -> ShutdownGuard {
188 let requested = Arc::clone(&self.requested);
189 let tx = self.tx;
190 let grace_period = self.grace_period;
191 let signal = ShutdownSignal {
192 requested: Arc::clone(&self.requested),
193 rx: self.rx.clone(),
194 };
195
196 let handle = tokio::spawn(async move {
197 let sigint = tokio::signal::ctrl_c();
199
200 #[cfg(unix)]
201 {
202 let mut sigterm =
203 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
204 .expect("failed to register SIGTERM handler");
205
206 tokio::select! {
207 _ = sigint => {
208 tracing::info!("Received SIGINT (Ctrl-C), initiating graceful shutdown");
209 }
210 _ = sigterm.recv() => {
211 tracing::info!("Received SIGTERM, initiating graceful shutdown");
212 }
213 }
214 }
215
216 #[cfg(not(unix))]
217 {
218 let _ = sigint.await;
220 tracing::info!("Received Ctrl-C, initiating graceful shutdown");
221 }
222
223 requested.store(true, Ordering::SeqCst);
225 let _ = tx.send(true);
226
227 tracing::info!(
229 grace_period_secs = grace_period.as_secs_f64(),
230 "Shutdown signal sent; grace period started"
231 );
232 tokio::time::sleep(grace_period).await;
233 tracing::warn!("Grace period elapsed; force termination may follow");
234 });
235
236 ShutdownGuard { handle, signal }
237 }
238}
239
240impl std::fmt::Debug for GracefulShutdown {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 f.debug_struct("GracefulShutdown")
243 .field("requested", &self.is_shutdown_requested())
244 .field("grace_period", &self.grace_period)
245 .finish()
246 }
247}
248
249pub struct ShutdownGuard {
259 handle: JoinHandle<()>,
261 signal: ShutdownSignal,
263}
264
265impl ShutdownGuard {
266 pub fn signal(&self) -> ShutdownSignal {
268 self.signal.clone()
269 }
270
271 pub async fn wait(self) {
277 let _ = self.handle.await;
278 }
279
280 pub fn cancel(self) {
282 self.handle.abort();
283 }
284}
285
286impl std::fmt::Debug for ShutdownGuard {
287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 f.debug_struct("ShutdownGuard")
289 .field("signal", &self.signal)
290 .field("finished", &self.handle.is_finished())
291 .finish()
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// Coordinator with a one-second grace period, used by most tests.
    fn one_second_coordinator() -> GracefulShutdown {
        GracefulShutdown::new(Duration::from_secs(1))
    }

    /// The default constructor uses a five-second grace period.
    #[test]
    fn test_default_grace_period() {
        let coordinator = GracefulShutdown::with_default_grace_period();
        assert_eq!(coordinator.grace_period(), Duration::from_secs(5));
    }

    /// A custom grace period is stored and returned unchanged.
    #[test]
    fn test_custom_grace_period() {
        let period = Duration::from_secs(30);
        let coordinator = GracefulShutdown::new(period);
        assert_eq!(coordinator.grace_period(), Duration::from_secs(30));
    }

    /// Neither the coordinator nor its signals report shutdown at creation.
    #[test]
    fn test_not_requested_initially() {
        let coordinator = one_second_coordinator();
        assert!(!coordinator.is_shutdown_requested());

        let observer = coordinator.signal();
        assert!(!observer.is_shutdown_requested());
    }

    /// `trigger` flips the flag for both the coordinator and its signals.
    #[test]
    fn test_manual_trigger() {
        let coordinator = one_second_coordinator();
        let observer = coordinator.signal();
        assert!(!observer.is_shutdown_requested());

        coordinator.trigger();

        assert!(observer.is_shutdown_requested());
        assert!(coordinator.is_shutdown_requested());
    }

    /// Cloned signals and independently created signals share one flag.
    #[test]
    fn test_signal_clone_shares_state() {
        let coordinator = one_second_coordinator();
        let first = coordinator.signal();
        let cloned = first.clone();
        let second = coordinator.signal();
        let observers = [first, cloned, second];

        for observer in &observers {
            assert!(!observer.is_shutdown_requested());
        }

        coordinator.trigger();

        for observer in &observers {
            assert!(observer.is_shutdown_requested());
        }
    }

    /// A pending `wait` completes once another task triggers shutdown.
    #[tokio::test]
    async fn test_signal_wait_resolves_on_trigger() {
        let coordinator = one_second_coordinator();
        let observer = coordinator.signal();

        let trigger_task = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            coordinator.trigger();
        });

        let waited = tokio::time::timeout(Duration::from_secs(2), observer.wait()).await;
        waited.expect("signal.wait() should have resolved within timeout");

        trigger_task.await.unwrap();
    }

    /// `wait` does not block when shutdown was triggered beforehand.
    #[tokio::test]
    async fn test_signal_wait_returns_immediately_if_already_triggered() {
        let coordinator = one_second_coordinator();
        let observer = coordinator.signal();
        coordinator.trigger();

        let waited = tokio::time::timeout(Duration::from_millis(100), observer.wait()).await;
        waited.expect("signal.wait() should return immediately when already triggered");
    }

    /// Signals obtained before and after `install` both start untriggered.
    #[tokio::test]
    async fn test_guard_signal() {
        let coordinator = GracefulShutdown::new(Duration::from_millis(50));
        let direct = coordinator.signal();
        let guard = coordinator.install();

        let via_guard = guard.signal();
        assert!(!via_guard.is_shutdown_requested());
        assert!(!direct.is_shutdown_requested());

        guard.cancel();
    }

    /// Cancelling a freshly installed guard does not panic.
    #[tokio::test]
    async fn test_guard_cancel() {
        let guard = GracefulShutdown::new(Duration::from_secs(60)).install();

        guard.cancel();
    }

    /// Many signals from the same coordinator all observe the trigger.
    #[tokio::test]
    async fn test_multiple_signals_from_same_shutdown() {
        let coordinator = one_second_coordinator();

        let observers: Vec<ShutdownSignal> = std::iter::repeat_with(|| coordinator.signal())
            .take(10)
            .collect();

        assert!(observers.iter().all(|o| !o.is_shutdown_requested()));

        coordinator.trigger();

        assert!(observers.iter().all(|o| o.is_shutdown_requested()));
    }

    /// `wait` resolves when the coordinator (and thus the sender) is dropped.
    #[tokio::test]
    async fn test_wait_with_dropped_sender() {
        let coordinator = one_second_coordinator();
        let orphan = coordinator.signal();
        drop(coordinator);

        let waited = tokio::time::timeout(Duration::from_millis(100), orphan.wait()).await;
        waited.expect("signal.wait() should resolve when sender is dropped");
    }

    /// Debug output of the coordinator and the signal names the type.
    #[test]
    fn test_debug_impls() {
        let coordinator = GracefulShutdown::new(Duration::from_secs(5));
        let rendered = format!("{:?}", coordinator);
        for needle in ["GracefulShutdown", "requested"] {
            assert!(rendered.contains(needle));
        }

        let observer = coordinator.signal();
        let rendered = format!("{:?}", observer);
        assert!(rendered.contains("ShutdownSignal"));
    }

    /// Debug output of the guard names the type.
    #[tokio::test]
    async fn test_guard_debug() {
        let guard = one_second_coordinator().install();
        let rendered = format!("{:?}", guard);
        assert!(rendered.contains("ShutdownGuard"));
        guard.cancel();
    }

    /// Triggering from many tasks at once is safe and sets the flag.
    #[tokio::test]
    async fn test_concurrent_trigger_is_safe() {
        let coordinator = one_second_coordinator();
        let observer = coordinator.signal();
        let coordinator = Arc::new(coordinator);

        let tasks: Vec<_> = (0..10)
            .map(|_| {
                let shared = Arc::clone(&coordinator);
                tokio::spawn(async move {
                    shared.trigger();
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        assert!(observer.is_shutdown_requested());
    }

    /// Several concurrent waiters are all released by one trigger.
    #[tokio::test]
    async fn test_signal_wait_multiple_waiters() {
        let coordinator = one_second_coordinator();

        let waiters: Vec<_> = (0..5)
            .map(|_| {
                let observer = coordinator.signal();
                tokio::spawn(async move {
                    observer.wait().await;
                })
            })
            .collect();

        // Give the waiters time to start before triggering.
        tokio::time::sleep(Duration::from_millis(50)).await;
        coordinator.trigger();

        for waiter in waiters {
            tokio::time::timeout(Duration::from_secs(2), waiter)
                .await
                .expect("waiter should complete")
                .expect("waiter task should not panic");
        }
    }
}