sentinel_proxy/reload/
coordinator.rs1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::{debug, info, trace, warn};
9
/// Tracks in-flight requests and a shutdown flag so a reload can wait for
/// active work to finish before proceeding.
///
/// The counter and the flag are held behind [`Arc`], so cloning the
/// coordinator yields a handle that observes and updates the same state.
#[derive(Debug, Clone)]
pub struct GracefulReloadCoordinator {
    /// Number of requests currently in flight; adjusted by `inc_requests`
    /// and `dec_requests`.
    active_requests: Arc<AtomicUsize>,
    /// Upper bound on how long `wait_for_drain` waits before giving up.
    max_drain_time: Duration,
    /// Set to `true` once `request_shutdown` has been called.
    shutdown_requested: Arc<AtomicBool>,
}
22
23impl GracefulReloadCoordinator {
24 pub fn new(max_drain_time: Duration) -> Self {
26 debug!(
27 max_drain_time_secs = max_drain_time.as_secs(),
28 "Creating graceful reload coordinator"
29 );
30 Self {
31 active_requests: Arc::new(AtomicUsize::new(0)),
32 max_drain_time,
33 shutdown_requested: Arc::new(AtomicBool::new(false)),
34 }
35 }
36
37 pub fn inc_requests(&self) {
39 let count = self.active_requests.fetch_add(1, Ordering::Relaxed) + 1;
40 trace!(active_requests = count, "Request started");
41 }
42
43 pub fn dec_requests(&self) {
45 let count = self.active_requests.fetch_sub(1, Ordering::Relaxed) - 1;
46 trace!(active_requests = count, "Request completed");
47 }
48
49 pub async fn wait_for_drain(&self) -> bool {
54 let start = Instant::now();
55 let initial_count = self.active_requests.load(Ordering::Relaxed);
56
57 info!(
58 active_requests = initial_count,
59 max_drain_time_secs = self.max_drain_time.as_secs(),
60 "Starting request drain"
61 );
62
63 let mut last_logged_count = initial_count;
64
65 while self.active_requests.load(Ordering::Relaxed) > 0 {
66 if start.elapsed() > self.max_drain_time {
67 let remaining = self.active_requests.load(Ordering::Relaxed);
68 warn!(
69 remaining_requests = remaining,
70 elapsed_secs = start.elapsed().as_secs(),
71 "Drain timeout reached, requests still active"
72 );
73 return false;
74 }
75
76 let current_count = self.active_requests.load(Ordering::Relaxed);
77 if current_count != last_logged_count {
78 debug!(
79 remaining_requests = current_count,
80 elapsed_ms = start.elapsed().as_millis(),
81 "Draining requests"
82 );
83 last_logged_count = current_count;
84 }
85
86 tokio::time::sleep(Duration::from_millis(100)).await;
87 }
88
89 info!(
90 elapsed_ms = start.elapsed().as_millis(),
91 initial_requests = initial_count,
92 "All requests drained successfully"
93 );
94 true
95 }
96
97 pub fn active_count(&self) -> usize {
99 self.active_requests.load(Ordering::Relaxed)
100 }
101
102 pub fn request_shutdown(&self) {
104 info!(
105 active_requests = self.active_requests.load(Ordering::Relaxed),
106 "Shutdown requested"
107 );
108 self.shutdown_requested.store(true, Ordering::SeqCst);
109 }
110
111 pub fn is_shutdown_requested(&self) -> bool {
113 self.shutdown_requested.load(Ordering::SeqCst)
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// Balanced increments and decrements bring the counter back to zero,
    /// after which a drain completes successfully.
    #[tokio::test]
    async fn test_graceful_coordinator() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));

        coordinator.inc_requests();
        coordinator.inc_requests();
        assert_eq!(coordinator.active_count(), 2);

        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 1);

        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 0);

        let drained = coordinator.wait_for_drain().await;
        assert!(drained);
    }

    /// The shutdown flag starts cleared and is set by `request_shutdown`.
    #[tokio::test]
    async fn test_graceful_coordinator_shutdown_flag() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));

        assert!(!coordinator.is_shutdown_requested());

        coordinator.request_shutdown();

        assert!(coordinator.is_shutdown_requested());
    }

    /// A request that never completes makes the drain give up and report
    /// `false`, leaving the counter untouched.
    #[tokio::test]
    async fn test_wait_for_drain_times_out_with_active_requests() {
        // Short budget keeps the test fast while still exercising the
        // timeout branch.
        let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(10));

        coordinator.inc_requests();

        let drained = coordinator.wait_for_drain().await;
        assert!(!drained);
        assert_eq!(coordinator.active_count(), 1);
    }
}