sentinel_proxy/reload/
coordinator.rs

1//! Graceful reload coordination.
2//!
3//! Handles request draining and shutdown coordination for zero-downtime reloads.
4
5use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::{info, warn};
9
/// Graceful reload coordinator
///
/// Tracks active requests and coordinates draining during configuration
/// reloads or graceful shutdown.
///
/// The type keeps two independent pieces of state: a counter of in-flight
/// requests and a one-way "shutdown requested" flag. Both are atomics, so
/// every method takes `&self`.
#[derive(Debug)]
pub struct GracefulReloadCoordinator {
    /// Number of requests currently in flight.
    ///
    /// Adjusted by `inc_requests` / `dec_requests` and polled while draining.
    active_requests: Arc<AtomicUsize>,
    /// Maximum time a drain is allowed to wait for `active_requests` to
    /// reach zero before giving up.
    max_drain_time: Duration,
    /// Set to `true` once shutdown has been requested.
    shutdown_requested: Arc<AtomicBool>,
}
22
23impl GracefulReloadCoordinator {
24    /// Create new coordinator
25    pub fn new(max_drain_time: Duration) -> Self {
26        Self {
27            active_requests: Arc::new(AtomicUsize::new(0)),
28            max_drain_time,
29            shutdown_requested: Arc::new(AtomicBool::new(false)),
30        }
31    }
32
33    /// Increment active request count
34    pub fn inc_requests(&self) {
35        self.active_requests.fetch_add(1, Ordering::Relaxed);
36    }
37
38    /// Decrement active request count
39    pub fn dec_requests(&self) {
40        self.active_requests.fetch_sub(1, Ordering::Relaxed);
41    }
42
43    /// Wait for active requests to drain
44    ///
45    /// Returns `true` if all requests drained within the timeout,
46    /// `false` if timeout was reached with requests still active.
47    pub async fn wait_for_drain(&self) -> bool {
48        let start = Instant::now();
49
50        while self.active_requests.load(Ordering::Relaxed) > 0 {
51            if start.elapsed() > self.max_drain_time {
52                warn!(
53                    "Drain timeout reached, {} requests still active",
54                    self.active_requests.load(Ordering::Relaxed)
55                );
56                return false;
57            }
58
59            tokio::time::sleep(Duration::from_millis(100)).await;
60        }
61
62        info!("All requests drained successfully");
63        true
64    }
65
66    /// Get active request count
67    pub fn active_count(&self) -> usize {
68        self.active_requests.load(Ordering::Relaxed)
69    }
70
71    /// Request shutdown
72    pub fn request_shutdown(&self) {
73        self.shutdown_requested.store(true, Ordering::SeqCst);
74    }
75
76    /// Check if shutdown was requested
77    pub fn is_shutdown_requested(&self) -> bool {
78        self.shutdown_requested.load(Ordering::SeqCst)
79    }
80}
81
#[cfg(test)]
mod tests {
    use super::*;

    /// The counter follows `inc_requests` / `dec_requests`, and draining an
    /// idle coordinator succeeds.
    #[tokio::test]
    async fn test_graceful_coordinator() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));

        // Simulate active requests
        coordinator.inc_requests();
        coordinator.inc_requests();
        assert_eq!(coordinator.active_count(), 2);

        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 1);

        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 0);

        // Test drain
        let drained = coordinator.wait_for_drain().await;
        assert!(drained);
    }

    /// The shutdown flag starts cleared and is set by `request_shutdown`.
    #[tokio::test]
    async fn test_graceful_coordinator_shutdown_flag() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));

        assert!(!coordinator.is_shutdown_requested());

        coordinator.request_shutdown();

        assert!(coordinator.is_shutdown_requested());
    }

    /// A request that never finishes makes the drain report a timeout.
    #[tokio::test]
    async fn test_graceful_coordinator_drain_timeout() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(10));
        coordinator.inc_requests();

        let drained = coordinator.wait_for_drain().await;
        assert!(!drained);

        // Waiting only observes the counter; it must not change it.
        assert_eq!(coordinator.active_count(), 1);
    }

    /// A request that finishes while the drain is waiting lets it succeed.
    #[tokio::test]
    async fn test_graceful_coordinator_drains_after_request_finishes() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));
        coordinator.inc_requests();

        // Run the drain and the "request" concurrently on the same task; the
        // request completes well before the one-second drain timeout.
        let (drained, ()) = tokio::join!(coordinator.wait_for_drain(), async {
            tokio::time::sleep(Duration::from_millis(20)).await;
            coordinator.dec_requests();
        });

        assert!(drained);
        assert_eq!(coordinator.active_count(), 0);
    }
}
116}