Skip to main content

gatel_core/server/
graceful.rs

1//! Graceful shutdown coordinator.
2//!
3//! Tracks active connections and provides a mechanism to drain them on
4//! shutdown.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::time::Duration;
9
10use tokio::sync::watch;
11use tracing::{debug, info, warn};
12
13/// Coordinates graceful shutdown across the server.
14#[derive(Clone)]
15pub struct GracefulShutdown {
16    active: Arc<AtomicUsize>,
17    shutdown_tx: Arc<watch::Sender<bool>>,
18    shutdown_rx: watch::Receiver<bool>,
19    grace_period: Duration,
20}
21
22impl GracefulShutdown {
23    pub fn new(grace_period: Duration) -> Self {
24        let (shutdown_tx, shutdown_rx) = watch::channel(false);
25        Self {
26            active: Arc::new(AtomicUsize::new(0)),
27            shutdown_tx: Arc::new(shutdown_tx),
28            shutdown_rx,
29            grace_period,
30        }
31    }
32
33    pub fn shutdown(&self) {
34        let _ = self.shutdown_tx.send(true);
35        info!("graceful shutdown initiated");
36    }
37
38    pub fn is_shutdown(&self) -> bool {
39        *self.shutdown_rx.borrow()
40    }
41
42    pub fn subscribe(&self) -> watch::Receiver<bool> {
43        self.shutdown_rx.clone()
44    }
45
46    pub fn track_conn(&self) -> ConnectionGuard {
47        self.active.fetch_add(1, Ordering::Relaxed);
48        ConnectionGuard {
49            active: Arc::clone(&self.active),
50        }
51    }
52
53    pub fn active_connections(&self) -> usize {
54        self.active.load(Ordering::Relaxed)
55    }
56
57    pub async fn drain(&self) -> bool {
58        let deadline = tokio::time::Instant::now() + self.grace_period;
59
60        loop {
61            let active = self.active.load(Ordering::Relaxed);
62            if active == 0 {
63                info!("all connections drained");
64                return true;
65            }
66
67            if tokio::time::Instant::now() >= deadline {
68                warn!(
69                    remaining = active,
70                    "grace period expired, forcing shutdown of remaining connections"
71                );
72                return false;
73            }
74
75            debug!(active, "waiting for connections to drain...");
76            tokio::time::sleep(Duration::from_millis(100)).await;
77        }
78    }
79}
80
81/// RAII guard that decrements the active connection count on drop.
82pub struct ConnectionGuard {
83    active: Arc<AtomicUsize>,
84}
85
86impl Drop for ConnectionGuard {
87    fn drop(&mut self) {
88        self.active.fetch_sub(1, Ordering::Relaxed);
89    }
90}
91
92#[cfg(test)]
93mod tests {
94    use super::*;
95
96    #[test]
97    fn track_and_drop() {
98        let shutdown = GracefulShutdown::new(Duration::from_secs(5));
99        assert_eq!(shutdown.active_connections(), 0);
100        let guard_a = shutdown.track_conn();
101        let guard_b = shutdown.track_conn();
102        assert_eq!(shutdown.active_connections(), 2);
103        drop(guard_a);
104        assert_eq!(shutdown.active_connections(), 1);
105        drop(guard_b);
106        assert_eq!(shutdown.active_connections(), 0);
107    }
108
109    #[test]
110    fn shutdown_signal() {
111        let shutdown = GracefulShutdown::new(Duration::from_secs(5));
112        assert!(!shutdown.is_shutdown());
113        shutdown.shutdown();
114        assert!(shutdown.is_shutdown());
115    }
116
117    #[tokio::test]
118    async fn drain_with_no_connections() {
119        let shutdown = GracefulShutdown::new(Duration::from_millis(100));
120        assert!(shutdown.drain().await);
121    }
122}