sentinel_proxy/reload/
coordinator.rs1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::{info, warn};
9
10pub struct GracefulReloadCoordinator {
15 active_requests: Arc<AtomicUsize>,
17 max_drain_time: Duration,
19 shutdown_requested: Arc<AtomicBool>,
21}
22
23impl GracefulReloadCoordinator {
24 pub fn new(max_drain_time: Duration) -> Self {
26 Self {
27 active_requests: Arc::new(AtomicUsize::new(0)),
28 max_drain_time,
29 shutdown_requested: Arc::new(AtomicBool::new(false)),
30 }
31 }
32
33 pub fn inc_requests(&self) {
35 self.active_requests.fetch_add(1, Ordering::Relaxed);
36 }
37
38 pub fn dec_requests(&self) {
40 self.active_requests.fetch_sub(1, Ordering::Relaxed);
41 }
42
43 pub async fn wait_for_drain(&self) -> bool {
48 let start = Instant::now();
49
50 while self.active_requests.load(Ordering::Relaxed) > 0 {
51 if start.elapsed() > self.max_drain_time {
52 warn!(
53 "Drain timeout reached, {} requests still active",
54 self.active_requests.load(Ordering::Relaxed)
55 );
56 return false;
57 }
58
59 tokio::time::sleep(Duration::from_millis(100)).await;
60 }
61
62 info!("All requests drained successfully");
63 true
64 }
65
66 pub fn active_count(&self) -> usize {
68 self.active_requests.load(Ordering::Relaxed)
69 }
70
71 pub fn request_shutdown(&self) {
73 self.shutdown_requested.store(true, Ordering::SeqCst);
74 }
75
76 pub fn is_shutdown_requested(&self) -> bool {
78 self.shutdown_requested.load(Ordering::SeqCst)
79 }
80}
81
82#[cfg(test)]
83mod tests {
84 use super::*;
85
86 #[tokio::test]
87 async fn test_graceful_coordinator() {
88 let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));
89
90 coordinator.inc_requests();
92 coordinator.inc_requests();
93 assert_eq!(coordinator.active_count(), 2);
94
95 coordinator.dec_requests();
96 assert_eq!(coordinator.active_count(), 1);
97
98 coordinator.dec_requests();
99 assert_eq!(coordinator.active_count(), 0);
100
101 let drained = coordinator.wait_for_drain().await;
103 assert!(drained);
104 }
105
106 #[tokio::test]
107 async fn test_graceful_coordinator_shutdown_flag() {
108 let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(1));
109
110 assert!(!coordinator.is_shutdown_requested());
111
112 coordinator.request_shutdown();
113
114 assert!(coordinator.is_shutdown_requested());
115 }
116}