sentinel_proxy/reload/
coordinator.rs

//! Graceful reload coordination.
//!
//! Tracks in-flight requests and coordinates request draining and shutdown
//! for zero-downtime reloads.
4
5use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::{debug, info, trace, warn};
9
/// Graceful reload coordinator.
///
/// Tracks the number of active requests and coordinates draining during
/// configuration reloads or graceful shutdown.
#[derive(Debug)]
pub struct GracefulReloadCoordinator {
    /// Number of requests currently in flight.
    ///
    /// Held behind an `Arc`, presumably so it can be shared with other owners
    /// (NOTE(review): nothing in this file clones it — confirm).
    active_requests: Arc<AtomicUsize>,
    /// Upper bound on how long a drain waits for in-flight requests.
    max_drain_time: Duration,
    /// Set once shutdown has been requested; never cleared by this type.
    shutdown_requested: Arc<AtomicBool>,
}
22
23impl GracefulReloadCoordinator {
24    /// Create new coordinator
25    pub fn new(max_drain_time: Duration) -> Self {
26        debug!(
27            max_drain_time_secs = max_drain_time.as_secs(),
28            "Creating graceful reload coordinator"
29        );
30        Self {
31            active_requests: Arc::new(AtomicUsize::new(0)),
32            max_drain_time,
33            shutdown_requested: Arc::new(AtomicBool::new(false)),
34        }
35    }
36
37    /// Increment active request count
38    pub fn inc_requests(&self) {
39        let count = self.active_requests.fetch_add(1, Ordering::Relaxed) + 1;
40        trace!(active_requests = count, "Request started");
41    }
42
43    /// Decrement active request count
44    pub fn dec_requests(&self) {
45        let count = self.active_requests.fetch_sub(1, Ordering::Relaxed) - 1;
46        trace!(active_requests = count, "Request completed");
47    }
48
49    /// Wait for active requests to drain
50    ///
51    /// Returns `true` if all requests drained within the timeout,
52    /// `false` if timeout was reached with requests still active.
53    pub async fn wait_for_drain(&self) -> bool {
54        let start = Instant::now();
55        let initial_count = self.active_requests.load(Ordering::Relaxed);
56
57        info!(
58            active_requests = initial_count,
59            max_drain_time_secs = self.max_drain_time.as_secs(),
60            "Starting request drain"
61        );
62
63        let mut last_logged_count = initial_count;
64
65        while self.active_requests.load(Ordering::Relaxed) > 0 {
66            if start.elapsed() > self.max_drain_time {
67                let remaining = self.active_requests.load(Ordering::Relaxed);
68                warn!(
69                    remaining_requests = remaining,
70                    elapsed_secs = start.elapsed().as_secs(),
71                    "Drain timeout reached, requests still active"
72                );
73                return false;
74            }
75
76            let current_count = self.active_requests.load(Ordering::Relaxed);
77            if current_count != last_logged_count {
78                debug!(
79                    remaining_requests = current_count,
80                    elapsed_ms = start.elapsed().as_millis(),
81                    "Draining requests"
82                );
83                last_logged_count = current_count;
84            }
85
86            tokio::time::sleep(Duration::from_millis(100)).await;
87        }
88
89        info!(
90            elapsed_ms = start.elapsed().as_millis(),
91            initial_requests = initial_count,
92            "All requests drained successfully"
93        );
94        true
95    }
96
97    /// Get active request count
98    pub fn active_count(&self) -> usize {
99        self.active_requests.load(Ordering::Relaxed)
100    }
101
102    /// Request shutdown
103    pub fn request_shutdown(&self) {
104        info!(
105            active_requests = self.active_requests.load(Ordering::Relaxed),
106            "Shutdown requested"
107        );
108        self.shutdown_requested.store(true, Ordering::SeqCst);
109    }
110
111    /// Check if shutdown was requested
112    pub fn is_shutdown_requested(&self) -> bool {
113        self.shutdown_requested.load(Ordering::SeqCst)
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_graceful_coordinator() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));

        // Simulate active requests
        coordinator.inc_requests();
        coordinator.inc_requests();
        assert_eq!(coordinator.active_count(), 2);

        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 1);

        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 0);

        // With nothing in flight, the drain completes immediately.
        let drained = coordinator.wait_for_drain().await;
        assert!(drained);
    }

    #[tokio::test]
    async fn test_graceful_coordinator_drain_timeout() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(50));

        // A request that never completes must make the drain give up and
        // report `false`, leaving the counter untouched.
        coordinator.inc_requests();
        assert!(!coordinator.wait_for_drain().await);
        assert_eq!(coordinator.active_count(), 1);
    }

    #[tokio::test]
    async fn test_graceful_coordinator_drains_concurrent_completion() {
        let coordinator = Arc::new(GracefulReloadCoordinator::new(Duration::from_secs(5)));
        coordinator.inc_requests();

        // Finish the request from another task while the drain is waiting.
        let worker = Arc::clone(&coordinator);
        let handle = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            worker.dec_requests();
        });

        assert!(coordinator.wait_for_drain().await);
        handle.await.expect("worker task panicked");
        assert_eq!(coordinator.active_count(), 0);
    }

    #[tokio::test]
    async fn test_graceful_coordinator_shutdown_flag() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));

        assert!(!coordinator.is_shutdown_requested());

        coordinator.request_shutdown();

        assert!(coordinator.is_shutdown_requested());
    }
}