loadwise_core/health.rs
1//! Health state machine and tracking for backend nodes.
2
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::sync::RwLock;
6use std::time::{Duration, Instant};
7
/// Current health state of a node.
///
/// State transitions (with [`ConsecutiveFailurePolicy`]):
///
/// ```text
/// Healthy    --(failures >= ceil(threshold/2))--> Degraded
/// Degraded   --(failures >= threshold)----------> Unhealthy
/// Degraded   --(1 success)----------------------> Healthy
/// Unhealthy  --(1 success, via probe)-----------> Recovering
/// Recovering --(recovery_successes reached)-----> Healthy
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HealthStatus {
    /// Node is operating normally. All traffic is routed here.
    Healthy,
    /// Node is responding but showing signs of stress (e.g., elevated latency).
    /// Still receives traffic, but signals the system should be watchful.
    Degraded,
    /// Node has failed repeatedly and is temporarily removed from rotation.
    /// No traffic is sent until [`HealthPolicy::should_probe`] allows a probe.
    Unhealthy,
    /// Node was unhealthy but is being given probe requests to verify recovery.
    /// Receives limited traffic until enough consecutive successes promote it
    /// back to [`Healthy`](Self::Healthy).
    Recovering,
}
38
39impl HealthStatus {
40 /// Returns `true` for any status that should still receive traffic
41 /// (`Healthy`, `Degraded`, `Recovering`).
42 pub fn is_available(self) -> bool {
43 matches!(self, Self::Healthy | Self::Degraded | Self::Recovering)
44 }
45
46 /// Returns a single-bit flag for this variant. Used by [`HealthFilter`](crate::filter::HealthFilter).
47 pub const fn bit_flag(self) -> u8 {
48 match self {
49 Self::Healthy => 1,
50 Self::Degraded => 2,
51 Self::Unhealthy => 4,
52 Self::Recovering => 8,
53 }
54 }
55}
56
/// Mutable runtime health counters for a single node.
///
/// All fields are public so custom [`HealthPolicy`] implementations can read
/// and update them freely.
#[derive(Debug, Clone)]
pub struct NodeHealth {
    /// Current health state.
    pub status: HealthStatus,
    /// Number of failures in a row since the last success (reset on success).
    pub consecutive_failures: u32,
    /// Number of successes in a row since the last failure (reset on failure).
    pub consecutive_successes: u32,
    /// When the most recent failure occurred, if ever.
    pub last_failure: Option<Instant>,
    /// When the most recent success occurred, if ever.
    pub last_success: Option<Instant>,
}
71
72impl Default for NodeHealth {
73 fn default() -> Self {
74 Self {
75 status: HealthStatus::Healthy,
76 consecutive_failures: 0,
77 consecutive_successes: 0,
78 last_failure: None,
79 last_success: None,
80 }
81 }
82}
83
/// Classified result of a request, allowing [`HealthPolicy`] to distinguish
/// between different failure modes.
///
/// # Examples
///
/// ```
/// # extern crate loadwise_core as loadwise;
/// use loadwise::health::Outcome;
/// use std::time::Duration;
///
/// let success = Outcome::Success;
/// let error = Outcome::Failure;
/// let rate_limited = Outcome::RateLimited { retry_after: Some(Duration::from_secs(30)) };
/// ```
#[derive(Debug, Clone)]
pub enum Outcome {
    /// Request succeeded.
    Success,
    /// Request failed (server error, timeout, etc.).
    Failure,
    /// Request was rejected due to rate limiting. Should NOT count toward
    /// consecutive failure thresholds — a rate limit means the node is alive
    /// but shedding load, not that it is unhealthy.
    RateLimited {
        /// Hint from the server about when to retry.
        retry_after: Option<Duration>,
    },
}
111
/// Determines how health status transitions based on success/failure signals.
///
/// Implementations mutate the per-node [`NodeHealth`] counters directly; the
/// `Send + Sync` bound lets a policy be shared behind a [`HealthTracker`]
/// across threads.
pub trait HealthPolicy: Send + Sync {
    /// Called after a successful request to the node.
    fn on_success(&self, health: &mut NodeHealth);
    /// Called after a failed request to the node.
    fn on_failure(&self, health: &mut NodeHealth);
    /// Whether an [`Unhealthy`](HealthStatus::Unhealthy) node should be given
    /// a probe request to check if it has recovered.
    fn should_probe(&self, health: &NodeHealth) -> bool;
    /// Process a classified request outcome. The default implementation
    /// delegates to [`on_success`](Self::on_success) / [`on_failure`](Self::on_failure)
    /// and ignores rate-limited outcomes (they don't affect health state).
    fn on_outcome(&self, health: &mut NodeHealth, outcome: &Outcome) {
        match outcome {
            Outcome::Success => self.on_success(health),
            Outcome::Failure => self.on_failure(health),
            // Rate limiting is backpressure, not failure — leave counters untouched.
            Outcome::RateLimited { .. } => {}
        }
    }
}
132
/// Marks a node unhealthy after N consecutive failures, recovers after M successes.
///
/// When `failure_threshold` is 1, nodes jump directly from `Healthy` to
/// `Unhealthy` — the `Degraded` state is skipped because there is no room
/// for an intermediate warning.
// Builder derived via the `bon` crate; each `#[builder(default = …)]` below
// supplies the value used when the caller omits that setter.
#[derive(bon::Builder)]
pub struct ConsecutiveFailurePolicy {
    /// Number of consecutive failures before a node becomes [`Unhealthy`](HealthStatus::Unhealthy).
    /// At `⌈threshold/2⌉` failures the node first enters [`Degraded`](HealthStatus::Degraded).
    /// **Default: 3.**
    #[builder(default = 3)]
    pub failure_threshold: u32,
    /// Number of consecutive successes an [`Unhealthy`](HealthStatus::Unhealthy) /
    /// [`Recovering`](HealthStatus::Recovering) node needs before returning to
    /// [`Healthy`](HealthStatus::Healthy). **Default: 2.**
    #[builder(default = 2)]
    pub recovery_successes: u32,
    /// Minimum time after the last failure before an [`Unhealthy`](HealthStatus::Unhealthy) node
    /// is eligible for a probe request. **Default: 10 s.**
    #[builder(default = Duration::from_secs(10))]
    pub probe_interval: Duration,
}
155
156impl Default for ConsecutiveFailurePolicy {
157 fn default() -> Self {
158 Self::builder().build()
159 }
160}
161
162impl HealthPolicy for ConsecutiveFailurePolicy {
163 fn on_success(&self, health: &mut NodeHealth) {
164 health.consecutive_failures = 0;
165 health.consecutive_successes += 1;
166 health.last_success = Some(Instant::now());
167
168 match health.status {
169 HealthStatus::Recovering => {
170 if health.consecutive_successes >= self.recovery_successes {
171 health.status = HealthStatus::Healthy;
172 }
173 }
174 HealthStatus::Unhealthy => {
175 health.status = HealthStatus::Recovering;
176 health.consecutive_successes = 1;
177 }
178 HealthStatus::Degraded => {
179 health.status = HealthStatus::Healthy;
180 }
181 HealthStatus::Healthy => {}
182 }
183 }
184
185 fn on_failure(&self, health: &mut NodeHealth) {
186 health.consecutive_successes = 0;
187 health.consecutive_failures += 1;
188 health.last_failure = Some(Instant::now());
189
190 if health.consecutive_failures >= self.failure_threshold {
191 health.status = HealthStatus::Unhealthy;
192 } else if self.failure_threshold > 1
193 && health.consecutive_failures >= self.failure_threshold.div_ceil(2)
194 {
195 health.status = HealthStatus::Degraded;
196 }
197 }
198
199 fn should_probe(&self, health: &NodeHealth) -> bool {
200 if health.status != HealthStatus::Unhealthy {
201 return false;
202 }
203 match health.last_failure {
204 Some(t) => t.elapsed() >= self.probe_interval,
205 None => true,
206 }
207 }
208}
209
/// Tracks health state for a set of nodes identified by `Id`.
///
/// Thread-safe — all methods take `&self` and synchronise internally via
/// [`RwLock`].
///
/// # Examples
///
/// ```
/// # extern crate loadwise_core as loadwise;
/// use loadwise::{HealthTracker, HealthStatus, ConsecutiveFailurePolicy};
///
/// let tracker = HealthTracker::new(ConsecutiveFailurePolicy::default());
///
/// // Unknown nodes are considered healthy.
/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Healthy);
///
/// // Three consecutive failures → Unhealthy (default threshold = 3).
/// tracker.report_failure(&"node-1");
/// tracker.report_failure(&"node-1");
/// tracker.report_failure(&"node-1");
/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Unhealthy);
///
/// // One success → Recovering.
/// tracker.report_success(&"node-1");
/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Recovering);
///
/// // Second success → Healthy again (default recovery_successes = 2).
/// tracker.report_success(&"node-1");
/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Healthy);
/// ```
pub struct HealthTracker<Id: Eq + Hash + Clone> {
    // Per-node counters, created lazily on first report for an id.
    states: RwLock<HashMap<Id, NodeHealth>>,
    // Boxed so the tracker is not generic over the policy type.
    policy: Box<dyn HealthPolicy>,
}
244
245impl<Id: Eq + Hash + Clone> std::fmt::Debug for HealthTracker<Id> {
246 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 let count = self.states.read().unwrap().len();
248 f.debug_struct("HealthTracker")
249 .field("tracked_nodes", &count)
250 .finish_non_exhaustive()
251 }
252}
253
254impl<Id: Eq + Hash + Clone> HealthTracker<Id> {
255 /// Creates a new tracker with the given [`HealthPolicy`].
256 pub fn new(policy: impl HealthPolicy + 'static) -> Self {
257 Self {
258 states: RwLock::new(HashMap::new()),
259 policy: Box::new(policy),
260 }
261 }
262
263 /// Record a successful request to the node identified by `id`.
264 pub fn report_success(&self, id: &Id) {
265 let mut states = self.states.write().unwrap();
266 let health = states.entry(id.clone()).or_default();
267 self.policy.on_success(health);
268 }
269
270 /// Record a failed request to the node identified by `id`.
271 pub fn report_failure(&self, id: &Id) {
272 let mut states = self.states.write().unwrap();
273 let health = states.entry(id.clone()).or_default();
274 self.policy.on_failure(health);
275 }
276
277 /// Report a classified request outcome. See [`Outcome`] for details.
278 pub fn report_outcome(&self, id: &Id, outcome: &Outcome) {
279 let mut states = self.states.write().unwrap();
280 let health = states.entry(id.clone()).or_default();
281 self.policy.on_outcome(health, outcome);
282 }
283
284 /// Returns the current [`HealthStatus`] (defaults to `Healthy` for unknown nodes).
285 pub fn status(&self, id: &Id) -> HealthStatus {
286 self.states
287 .read()
288 .unwrap()
289 .get(id)
290 .map(|h| h.status)
291 .unwrap_or(HealthStatus::Healthy)
292 }
293
294 /// Whether the node should receive a probe request (delegates to the policy).
295 pub fn should_probe(&self, id: &Id) -> bool {
296 self.states
297 .read()
298 .unwrap()
299 .get(id)
300 .is_some_and(|h| self.policy.should_probe(h))
301 }
302
303 /// Returns a snapshot of the full [`NodeHealth`] for the given node.
304 pub fn get_health(&self, id: &Id) -> NodeHealth {
305 self.states
306 .read()
307 .unwrap()
308 .get(id)
309 .cloned()
310 .unwrap_or_default()
311 }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    // Degraded kicks in at ⌈3/2⌉ = 2 failures, Unhealthy at 3.
    #[test]
    fn healthy_to_degraded_to_unhealthy() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(3)
            .build();
        let mut health = NodeHealth::default();

        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Healthy);

        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Degraded);

        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Unhealthy);
    }

    // Full round trip: Unhealthy → Recovering → Healthy with two successes.
    #[test]
    fn recovery_path() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(2)
            .recovery_successes(2)
            .build();
        let mut health = NodeHealth::default();

        // Drive to unhealthy
        policy.on_failure(&mut health);
        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Unhealthy);

        // First success: unhealthy -> recovering
        policy.on_success(&mut health);
        assert_eq!(health.status, HealthStatus::Recovering);

        // Second success: recovering -> healthy
        policy.on_success(&mut health);
        assert_eq!(health.status, HealthStatus::Healthy);
    }

    // The `failure_threshold > 1` guard in on_failure suppresses Degraded.
    #[test]
    fn threshold_one_skips_degraded() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(1)
            .build();
        let mut health = NodeHealth::default();

        // With threshold=1, first failure goes straight to Unhealthy (no Degraded)
        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Unhealthy);
    }

    // Nodes never reported on must read as Healthy.
    #[test]
    fn tracker_defaults_to_healthy() {
        let tracker = HealthTracker::<String>::new(ConsecutiveFailurePolicy::default());
        assert_eq!(tracker.status(&"unknown".to_string()), HealthStatus::Healthy);
    }

    // RateLimited outcomes are backpressure, not failures — the default
    // on_outcome implementation must leave the counters untouched.
    #[test]
    fn rate_limited_does_not_count_as_failure() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(3)
            .build();
        let mut health = NodeHealth::default();

        // One real failure (below degraded threshold ⌈3/2⌉=2)
        policy.on_outcome(&mut health, &Outcome::Failure);
        assert_eq!(health.consecutive_failures, 1);
        assert_eq!(health.status, HealthStatus::Healthy);

        // Rate limited — should NOT increment failure count
        policy.on_outcome(&mut health, &Outcome::RateLimited { retry_after: None });
        assert_eq!(health.consecutive_failures, 1);
        assert_eq!(health.status, HealthStatus::Healthy);

        // Two more real failures — now hits threshold
        policy.on_outcome(&mut health, &Outcome::Failure);
        assert_eq!(health.status, HealthStatus::Degraded);
        policy.on_outcome(&mut health, &Outcome::Failure);
        assert_eq!(health.status, HealthStatus::Unhealthy);
    }

    // on_outcome(Success) must route through on_success and reset the streaks.
    #[test]
    fn outcome_success_resets_failures() {
        let policy = ConsecutiveFailurePolicy::default();
        let mut health = NodeHealth::default();

        policy.on_outcome(&mut health, &Outcome::Failure);
        assert_eq!(health.consecutive_failures, 1);

        policy.on_outcome(&mut health, &Outcome::Success);
        assert_eq!(health.consecutive_failures, 0);
        assert_eq!(health.consecutive_successes, 1);
    }
}
411}