gatel_core/proxy/
health.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Instant;
4
5use tokio::sync::Mutex;
6use tracing::{debug, warn};
7
8use super::upstream::UpstreamPool;
9use crate::config::{HealthCheckConfig, PassiveHealthConfig};
10
11pub struct HealthChecker {
18 _task: tokio::task::JoinHandle<()>,
20}
21
22impl HealthChecker {
23 pub fn start(pool: Arc<UpstreamPool>, config: &HealthCheckConfig) -> Self {
26 let uri = config.uri.clone();
27 let interval = config.interval;
28 let timeout = config.timeout;
29 let unhealthy_threshold = config.unhealthy_threshold;
30 let healthy_threshold = config.healthy_threshold;
31 let n = pool.len();
32
33 let task = tokio::spawn(async move {
34 let mut consecutive_ok: Vec<u32> = vec![0; n];
36 let mut consecutive_fail: Vec<u32> = vec![0; n];
37
38 let client =
40 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
41 .build_http::<crate::Body>();
42
43 loop {
44 for idx in 0..n {
45 let addr = &pool.backends[idx].addr;
46 let check_uri = format!("http://{}{}", addr, uri);
47
48 let result = tokio::time::timeout(timeout, async {
49 let req = http::Request::builder()
50 .method(http::Method::GET)
51 .uri(&check_uri)
52 .body(crate::empty_body())
53 .map_err(|e| e.to_string())?;
54
55 let resp = client.request(req).await.map_err(|e| e.to_string())?;
56
57 if resp.status().is_success() {
58 Ok(())
59 } else {
60 Err(format!("status {}", resp.status()))
61 }
62 })
63 .await;
64
65 match result {
66 Ok(Ok(())) => {
67 consecutive_fail[idx] = 0;
68 consecutive_ok[idx] += 1;
69 if consecutive_ok[idx] >= healthy_threshold && !pool.is_healthy(idx) {
70 debug!(backend = addr, "active health check: marking healthy");
71 pool.set_healthy(idx, true);
72 }
73 }
74 Ok(Err(e)) => {
75 consecutive_ok[idx] = 0;
76 consecutive_fail[idx] += 1;
77 if consecutive_fail[idx] >= unhealthy_threshold && pool.is_healthy(idx)
78 {
79 warn!(
80 backend = addr,
81 error = %e,
82 "active health check: marking unhealthy"
83 );
84 pool.set_healthy(idx, false);
85 }
86 }
87 Err(_elapsed) => {
88 consecutive_ok[idx] = 0;
89 consecutive_fail[idx] += 1;
90 if consecutive_fail[idx] >= unhealthy_threshold && pool.is_healthy(idx)
91 {
92 warn!(
93 backend = addr,
94 "active health check: marking unhealthy (timeout)"
95 );
96 pool.set_healthy(idx, false);
97 }
98 }
99 }
100 }
101
102 tokio::time::sleep(interval).await;
103 }
104 });
105
106 Self { _task: task }
107 }
108}
109
110impl Drop for HealthChecker {
111 fn drop(&mut self) {
112 self._task.abort();
113 }
114}
115
116pub struct PassiveHealthChecker {
124 entries: Vec<PassiveEntry>,
125 config: PassiveHealthConfig,
126}
127
128struct PassiveEntry {
129 failures: Mutex<Vec<Instant>>,
132 disabled: AtomicBool,
134 disabled_at: Mutex<Option<Instant>>,
136}
137
138impl PassiveHealthChecker {
139 pub fn new(n: usize, config: &PassiveHealthConfig) -> Self {
141 let entries = (0..n)
142 .map(|_| PassiveEntry {
143 failures: Mutex::new(Vec::new()),
144 disabled: AtomicBool::new(false),
145 disabled_at: Mutex::new(None),
146 })
147 .collect();
148 Self {
149 entries,
150 config: config.clone(),
151 }
152 }
153
154 pub async fn record_failure(&self, idx: usize, pool: &UpstreamPool) {
157 let Some(entry) = self.entries.get(idx) else {
158 return;
159 };
160 let now = Instant::now();
161 let window = self.config.fail_window;
162
163 let mut failures = entry.failures.lock().await;
164 failures.push(now);
165 failures.retain(|&t| now.duration_since(t) < window);
167
168 if failures.len() as u32 >= self.config.max_fails
169 && !entry.disabled.swap(true, Ordering::Relaxed)
170 {
171 warn!(
172 backend = pool.backends[idx].addr,
173 fails = failures.len(),
174 "passive health: disabling backend"
175 );
176 pool.set_healthy(idx, false);
177 *entry.disabled_at.lock().await = Some(now);
178 }
179 }
180
181 pub async fn maybe_recover(&self, pool: &UpstreamPool) {
185 let cooldown = self.config.cooldown;
186 let now = Instant::now();
187
188 for (idx, entry) in self.entries.iter().enumerate() {
189 if !entry.disabled.load(Ordering::Relaxed) {
190 continue;
191 }
192 let disabled_at = *entry.disabled_at.lock().await;
193 if let Some(at) = disabled_at
194 && now.duration_since(at) >= cooldown
195 {
196 debug!(
197 backend = pool.backends[idx].addr,
198 "passive health: re-enabling backend after cooldown"
199 );
200 entry.disabled.store(false, Ordering::Relaxed);
201 pool.set_healthy(idx, true);
202 entry.failures.lock().await.clear();
204 *entry.disabled_at.lock().await = None;
205 }
206 }
207 }
208
209 pub fn is_disabled(&self, idx: usize) -> bool {
211 self.entries
212 .get(idx)
213 .map(|e| e.disabled.load(Ordering::Relaxed))
214 .unwrap_or(false)
215 }
216}