gatel_core/server/
graceful.rs1use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::time::Duration;
9
10use tokio::sync::watch;
11use tracing::{debug, info, warn};
12
13#[derive(Clone)]
15pub struct GracefulShutdown {
16 active: Arc<AtomicUsize>,
17 shutdown_tx: Arc<watch::Sender<bool>>,
18 shutdown_rx: watch::Receiver<bool>,
19 grace_period: Duration,
20}
21
22impl GracefulShutdown {
23 pub fn new(grace_period: Duration) -> Self {
24 let (shutdown_tx, shutdown_rx) = watch::channel(false);
25 Self {
26 active: Arc::new(AtomicUsize::new(0)),
27 shutdown_tx: Arc::new(shutdown_tx),
28 shutdown_rx,
29 grace_period,
30 }
31 }
32
33 pub fn shutdown(&self) {
34 let _ = self.shutdown_tx.send(true);
35 info!("graceful shutdown initiated");
36 }
37
38 pub fn is_shutdown(&self) -> bool {
39 *self.shutdown_rx.borrow()
40 }
41
42 pub fn subscribe(&self) -> watch::Receiver<bool> {
43 self.shutdown_rx.clone()
44 }
45
46 pub fn track_conn(&self) -> ConnectionGuard {
47 self.active.fetch_add(1, Ordering::Relaxed);
48 ConnectionGuard {
49 active: Arc::clone(&self.active),
50 }
51 }
52
53 pub fn active_connections(&self) -> usize {
54 self.active.load(Ordering::Relaxed)
55 }
56
57 pub async fn drain(&self) -> bool {
58 let deadline = tokio::time::Instant::now() + self.grace_period;
59
60 loop {
61 let active = self.active.load(Ordering::Relaxed);
62 if active == 0 {
63 info!("all connections drained");
64 return true;
65 }
66
67 if tokio::time::Instant::now() >= deadline {
68 warn!(
69 remaining = active,
70 "grace period expired, forcing shutdown of remaining connections"
71 );
72 return false;
73 }
74
75 debug!(active, "waiting for connections to drain...");
76 tokio::time::sleep(Duration::from_millis(100)).await;
77 }
78 }
79}
80
/// Guard representing one tracked connection.
///
/// It holds the shared active-connection counter. The guard must be kept
/// alive for as long as the connection is in flight, which is why the type
/// is marked `#[must_use]`.
#[must_use = "dropping the guard immediately stops tracking the connection"]
#[derive(Debug)]
pub struct ConnectionGuard {
    /// Shared counter of active connections.
    active: Arc<AtomicUsize>,
}
85
86impl Drop for ConnectionGuard {
87 fn drop(&mut self) {
88 self.active.fetch_sub(1, Ordering::Relaxed);
89 }
90}
91
#[cfg(test)]
mod tests {
    use super::*;

    /// The active count follows guard creation and destruction one-for-one.
    #[test]
    fn track_and_drop() {
        let tracker = GracefulShutdown::new(Duration::from_secs(5));
        assert_eq!(tracker.active_connections(), 0);

        let first = tracker.track_conn();
        let second = tracker.track_conn();
        assert_eq!(tracker.active_connections(), 2);

        drop(first);
        assert_eq!(tracker.active_connections(), 1);

        drop(second);
        assert_eq!(tracker.active_connections(), 0);
    }

    /// The flag starts unset and reads as set once `shutdown` has been called.
    #[test]
    fn shutdown_signal() {
        let tracker = GracefulShutdown::new(Duration::from_secs(5));
        assert!(!tracker.is_shutdown());

        tracker.shutdown();
        assert!(tracker.is_shutdown());
    }

    /// With nothing tracked, `drain` reports success.
    #[tokio::test]
    async fn drain_with_no_connections() {
        let tracker = GracefulShutdown::new(Duration::from_millis(100));
        let drained = tracker.drain().await;
        assert!(drained);
    }
}