Skip to main content

gatel_core/proxy/
health.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Instant;
4
5use tokio::sync::Mutex;
6use tracing::{debug, warn};
7
8use super::upstream::UpstreamPool;
9use crate::config::{HealthCheckConfig, PassiveHealthConfig};
10
11// ---------------------------------------------------------------------------
12// Active health checker
13// ---------------------------------------------------------------------------
14
15/// Performs periodic HTTP health checks against every backend in the pool and
16/// updates the pool's `healthy` flags accordingly.
17pub struct HealthChecker {
18    /// Handle to the spawned background task so we can abort on drop.
19    _task: tokio::task::JoinHandle<()>,
20}
21
22impl HealthChecker {
23    /// Spawn an active health-check loop.  Returns immediately; the checks
24    /// run on a background Tokio task.
25    pub fn start(pool: Arc<UpstreamPool>, config: &HealthCheckConfig) -> Self {
26        let uri = config.uri.clone();
27        let interval = config.interval;
28        let timeout = config.timeout;
29        let unhealthy_threshold = config.unhealthy_threshold;
30        let healthy_threshold = config.healthy_threshold;
31        let n = pool.len();
32
33        let task = tokio::spawn(async move {
34            // Per-backend consecutive success / failure counters.
35            let mut consecutive_ok: Vec<u32> = vec![0; n];
36            let mut consecutive_fail: Vec<u32> = vec![0; n];
37
38            // Build a lightweight HTTP client for probes.
39            let client =
40                hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
41                    .build_http::<crate::Body>();
42
43            loop {
44                for idx in 0..n {
45                    let addr = &pool.backends[idx].addr;
46                    let check_uri = format!("http://{}{}", addr, uri);
47
48                    let result = tokio::time::timeout(timeout, async {
49                        let req = http::Request::builder()
50                            .method(http::Method::GET)
51                            .uri(&check_uri)
52                            .body(crate::empty_body())
53                            .map_err(|e| e.to_string())?;
54
55                        let resp = client.request(req).await.map_err(|e| e.to_string())?;
56
57                        if resp.status().is_success() {
58                            Ok(())
59                        } else {
60                            Err(format!("status {}", resp.status()))
61                        }
62                    })
63                    .await;
64
65                    match result {
66                        Ok(Ok(())) => {
67                            consecutive_fail[idx] = 0;
68                            consecutive_ok[idx] += 1;
69                            if consecutive_ok[idx] >= healthy_threshold && !pool.is_healthy(idx) {
70                                debug!(backend = addr, "active health check: marking healthy");
71                                pool.set_healthy(idx, true);
72                            }
73                        }
74                        Ok(Err(e)) => {
75                            consecutive_ok[idx] = 0;
76                            consecutive_fail[idx] += 1;
77                            if consecutive_fail[idx] >= unhealthy_threshold && pool.is_healthy(idx)
78                            {
79                                warn!(
80                                    backend = addr,
81                                    error = %e,
82                                    "active health check: marking unhealthy"
83                                );
84                                pool.set_healthy(idx, false);
85                            }
86                        }
87                        Err(_elapsed) => {
88                            consecutive_ok[idx] = 0;
89                            consecutive_fail[idx] += 1;
90                            if consecutive_fail[idx] >= unhealthy_threshold && pool.is_healthy(idx)
91                            {
92                                warn!(
93                                    backend = addr,
94                                    "active health check: marking unhealthy (timeout)"
95                                );
96                                pool.set_healthy(idx, false);
97                            }
98                        }
99                    }
100                }
101
102                tokio::time::sleep(interval).await;
103            }
104        });
105
106        Self { _task: task }
107    }
108}
109
110impl Drop for HealthChecker {
111    fn drop(&mut self) {
112        self._task.abort();
113    }
114}
115
116// ---------------------------------------------------------------------------
117// Passive health checker
118// ---------------------------------------------------------------------------
119
120/// Tracks 5xx responses per backend and temporarily marks backends unhealthy
121/// when they exceed a failure threshold within a sliding time window.
122/// After a cooldown period the backend is automatically re-enabled.
123pub struct PassiveHealthChecker {
124    entries: Vec<PassiveEntry>,
125    config: PassiveHealthConfig,
126}
127
128struct PassiveEntry {
129    /// Ring buffer of timestamps (as millis since an arbitrary epoch) of
130    /// recent failures.  Protected by a mutex because we occasionally compact.
131    failures: Mutex<Vec<Instant>>,
132    /// Whether the backend has been passively disabled.
133    disabled: AtomicBool,
134    /// Timestamp (millis since epoch) when the backend was disabled.
135    disabled_at: Mutex<Option<Instant>>,
136}
137
138impl PassiveHealthChecker {
139    /// Create a new passive health tracker for `n` backends.
140    pub fn new(n: usize, config: &PassiveHealthConfig) -> Self {
141        let entries = (0..n)
142            .map(|_| PassiveEntry {
143                failures: Mutex::new(Vec::new()),
144                disabled: AtomicBool::new(false),
145                disabled_at: Mutex::new(None),
146            })
147            .collect();
148        Self {
149            entries,
150            config: config.clone(),
151        }
152    }
153
154    /// Record a 5xx failure for backend `idx`.  If the number of failures in
155    /// the configured window exceeds `max_fails`, the backend is disabled.
156    pub async fn record_failure(&self, idx: usize, pool: &UpstreamPool) {
157        let Some(entry) = self.entries.get(idx) else {
158            return;
159        };
160        let now = Instant::now();
161        let window = self.config.fail_window;
162
163        let mut failures = entry.failures.lock().await;
164        failures.push(now);
165        // Remove failures outside the window.
166        failures.retain(|&t| now.duration_since(t) < window);
167
168        if failures.len() as u32 >= self.config.max_fails
169            && !entry.disabled.swap(true, Ordering::Relaxed)
170        {
171            warn!(
172                backend = pool.backends[idx].addr,
173                fails = failures.len(),
174                "passive health: disabling backend"
175            );
176            pool.set_healthy(idx, false);
177            *entry.disabled_at.lock().await = Some(now);
178        }
179    }
180
181    /// Check whether any previously-disabled backend should be re-enabled
182    /// after the cooldown period.  Call this periodically (e.g. after each
183    /// request or on a timer).
184    pub async fn maybe_recover(&self, pool: &UpstreamPool) {
185        let cooldown = self.config.cooldown;
186        let now = Instant::now();
187
188        for (idx, entry) in self.entries.iter().enumerate() {
189            if !entry.disabled.load(Ordering::Relaxed) {
190                continue;
191            }
192            let disabled_at = *entry.disabled_at.lock().await;
193            if let Some(at) = disabled_at
194                && now.duration_since(at) >= cooldown
195            {
196                debug!(
197                    backend = pool.backends[idx].addr,
198                    "passive health: re-enabling backend after cooldown"
199                );
200                entry.disabled.store(false, Ordering::Relaxed);
201                pool.set_healthy(idx, true);
202                // Reset failure history so we start fresh.
203                entry.failures.lock().await.clear();
204                *entry.disabled_at.lock().await = None;
205            }
206        }
207    }
208
209    /// Returns `true` if the backend is currently passively disabled.
210    pub fn is_disabled(&self, idx: usize) -> bool {
211        self.entries
212            .get(idx)
213            .map(|e| e.disabled.load(Ordering::Relaxed))
214            .unwrap_or(false)
215    }
216}