// loadwise_core/health.rs
//! Health state machine and tracking for backend nodes.

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::RwLock;
use std::time::{Duration, Instant};
7
/// Current health state of a node.
///
/// With [`ConsecutiveFailurePolicy`], states move as follows:
///
/// * `Healthy` → `Degraded` once consecutive failures reach `⌈threshold/2⌉`.
/// * `Degraded` → `Unhealthy` once consecutive failures reach the full threshold.
/// * `Unhealthy` → `Recovering` after a single success.
/// * `Recovering` → `Healthy` after `recovery_successes` consecutive successes.
/// * A single success also returns `Degraded` to `Healthy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HealthStatus {
    /// Operating normally; receives full traffic.
    Healthy,
    /// Responding, but showing signs of stress (e.g., elevated latency).
    /// Still in rotation — a warning that the system should be watchful.
    Degraded,
    /// Repeated failures have removed the node from rotation. It gets no
    /// traffic until [`HealthPolicy::should_probe`] permits a probe.
    Unhealthy,
    /// Previously unhealthy; now receiving limited probe traffic. Enough
    /// consecutive successes promote it back to [`Healthy`](Self::Healthy).
    Recovering,
}

impl HealthStatus {
    /// `true` when the node may still be sent traffic — every state except
    /// [`Unhealthy`](Self::Unhealthy).
    pub fn is_available(self) -> bool {
        !matches!(self, Self::Unhealthy)
    }

    /// Single-bit flag for this variant, consumed by
    /// [`HealthFilter`](crate::filter::HealthFilter).
    pub const fn bit_flag(self) -> u8 {
        match self {
            Self::Healthy => 0b0001,
            Self::Degraded => 0b0010,
            Self::Unhealthy => 0b0100,
            Self::Recovering => 0b1000,
        }
    }
}
56
57/// Mutable runtime health counters for a single node.
58#[derive(Debug, Clone)]
59pub struct NodeHealth {
60    /// Current health state.
61    pub status: HealthStatus,
62    /// Number of failures in a row since the last success (reset on success).
63    pub consecutive_failures: u32,
64    /// Number of successes in a row since the last failure (reset on failure).
65    pub consecutive_successes: u32,
66    /// When the most recent failure occurred, if ever.
67    pub last_failure: Option<Instant>,
68    /// When the most recent success occurred, if ever.
69    pub last_success: Option<Instant>,
70}
71
72impl Default for NodeHealth {
73    fn default() -> Self {
74        Self {
75            status: HealthStatus::Healthy,
76            consecutive_failures: 0,
77            consecutive_successes: 0,
78            last_failure: None,
79            last_success: None,
80        }
81    }
82}
83
/// Classified result of a request, letting a [`HealthPolicy`] treat
/// different failure modes differently.
///
/// # Examples
///
/// ```
/// # extern crate loadwise_core as loadwise;
/// use loadwise::health::Outcome;
/// use std::time::Duration;
///
/// let success = Outcome::Success;
/// let error = Outcome::Failure;
/// let rate_limited = Outcome::RateLimited { retry_after: Some(Duration::from_secs(30)) };
/// ```
#[derive(Debug, Clone)]
pub enum Outcome {
    /// The request completed successfully.
    Success,
    /// The request failed (server error, timeout, etc.).
    Failure,
    /// The request was rejected by rate limiting. Must NOT be counted
    /// toward consecutive-failure thresholds.
    RateLimited {
        /// Server-provided hint for when to retry, if any.
        retry_after: Option<Duration>,
    },
}
111
112/// Determines how health status transitions based on success/failure signals.
113pub trait HealthPolicy: Send + Sync {
114    /// Called after a successful request to the node.
115    fn on_success(&self, health: &mut NodeHealth);
116    /// Called after a failed request to the node.
117    fn on_failure(&self, health: &mut NodeHealth);
118    /// Whether an [`Unhealthy`](HealthStatus::Unhealthy) node should be given
119    /// a probe request to check if it has recovered.
120    fn should_probe(&self, health: &NodeHealth) -> bool;
121    /// Process a classified request outcome. The default implementation
122    /// delegates to [`on_success`](Self::on_success) / [`on_failure`](Self::on_failure)
123    /// and ignores rate-limited outcomes (they don't affect health state).
124    fn on_outcome(&self, health: &mut NodeHealth, outcome: &Outcome) {
125        match outcome {
126            Outcome::Success => self.on_success(health),
127            Outcome::Failure => self.on_failure(health),
128            Outcome::RateLimited { .. } => {}
129        }
130    }
131}
132
133/// Marks a node unhealthy after N consecutive failures, recovers after M successes.
134///
135/// When `failure_threshold` is 1, nodes jump directly from `Healthy` to
136/// `Unhealthy` — the `Degraded` state is skipped because there is no room
137/// for an intermediate warning.
138#[derive(bon::Builder)]
139pub struct ConsecutiveFailurePolicy {
140    /// Number of consecutive failures before a node becomes [`Unhealthy`](HealthStatus::Unhealthy).
141    /// At `⌈threshold/2⌉` failures the node first enters [`Degraded`](HealthStatus::Degraded).
142    /// **Default: 3.**
143    #[builder(default = 3)]
144    pub failure_threshold: u32,
145    /// Number of consecutive successes an [`Unhealthy`](HealthStatus::Unhealthy) /
146    /// [`Recovering`](HealthStatus::Recovering) node needs before returning to
147    /// [`Healthy`](HealthStatus::Healthy). **Default: 2.**
148    #[builder(default = 2)]
149    pub recovery_successes: u32,
150    /// Minimum time after the last failure before an [`Unhealthy`](HealthStatus::Unhealthy) node
151    /// is eligible for a probe request. **Default: 10 s.**
152    #[builder(default = Duration::from_secs(10))]
153    pub probe_interval: Duration,
154}
155
156impl Default for ConsecutiveFailurePolicy {
157    fn default() -> Self {
158        Self::builder().build()
159    }
160}
161
162impl HealthPolicy for ConsecutiveFailurePolicy {
163    fn on_success(&self, health: &mut NodeHealth) {
164        health.consecutive_failures = 0;
165        health.consecutive_successes += 1;
166        health.last_success = Some(Instant::now());
167
168        match health.status {
169            HealthStatus::Recovering => {
170                if health.consecutive_successes >= self.recovery_successes {
171                    health.status = HealthStatus::Healthy;
172                }
173            }
174            HealthStatus::Unhealthy => {
175                health.status = HealthStatus::Recovering;
176                health.consecutive_successes = 1;
177            }
178            HealthStatus::Degraded => {
179                health.status = HealthStatus::Healthy;
180            }
181            HealthStatus::Healthy => {}
182        }
183    }
184
185    fn on_failure(&self, health: &mut NodeHealth) {
186        health.consecutive_successes = 0;
187        health.consecutive_failures += 1;
188        health.last_failure = Some(Instant::now());
189
190        if health.consecutive_failures >= self.failure_threshold {
191            health.status = HealthStatus::Unhealthy;
192        } else if self.failure_threshold > 1
193            && health.consecutive_failures >= self.failure_threshold.div_ceil(2)
194        {
195            health.status = HealthStatus::Degraded;
196        }
197    }
198
199    fn should_probe(&self, health: &NodeHealth) -> bool {
200        if health.status != HealthStatus::Unhealthy {
201            return false;
202        }
203        match health.last_failure {
204            Some(t) => t.elapsed() >= self.probe_interval,
205            None => true,
206        }
207    }
208}
209
210/// Tracks health state for a set of nodes identified by `Id`.
211///
212/// Thread-safe — all methods take `&self` and synchronise internally via
213/// [`RwLock`].
214///
215/// # Examples
216///
217/// ```
218/// # extern crate loadwise_core as loadwise;
219/// use loadwise::{HealthTracker, HealthStatus, ConsecutiveFailurePolicy};
220///
221/// let tracker = HealthTracker::new(ConsecutiveFailurePolicy::default());
222///
223/// // Unknown nodes are considered healthy.
224/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Healthy);
225///
226/// // Three consecutive failures → Unhealthy (default threshold = 3).
227/// tracker.report_failure(&"node-1");
228/// tracker.report_failure(&"node-1");
229/// tracker.report_failure(&"node-1");
230/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Unhealthy);
231///
232/// // One success → Recovering.
233/// tracker.report_success(&"node-1");
234/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Recovering);
235///
236/// // Second success → Healthy again (default recovery_successes = 2).
237/// tracker.report_success(&"node-1");
238/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Healthy);
239/// ```
240pub struct HealthTracker<Id: Eq + Hash + Clone> {
241    states: RwLock<HashMap<Id, NodeHealth>>,
242    policy: Box<dyn HealthPolicy>,
243}
244
245impl<Id: Eq + Hash + Clone> std::fmt::Debug for HealthTracker<Id> {
246    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247        let count = self.states.read().unwrap().len();
248        f.debug_struct("HealthTracker")
249            .field("tracked_nodes", &count)
250            .finish_non_exhaustive()
251    }
252}
253
254impl<Id: Eq + Hash + Clone> HealthTracker<Id> {
255    /// Creates a new tracker with the given [`HealthPolicy`].
256    pub fn new(policy: impl HealthPolicy + 'static) -> Self {
257        Self {
258            states: RwLock::new(HashMap::new()),
259            policy: Box::new(policy),
260        }
261    }
262
263    /// Record a successful request to the node identified by `id`.
264    pub fn report_success(&self, id: &Id) {
265        let mut states = self.states.write().unwrap();
266        let health = states.entry(id.clone()).or_default();
267        self.policy.on_success(health);
268    }
269
270    /// Record a failed request to the node identified by `id`.
271    pub fn report_failure(&self, id: &Id) {
272        let mut states = self.states.write().unwrap();
273        let health = states.entry(id.clone()).or_default();
274        self.policy.on_failure(health);
275    }
276
277    /// Report a classified request outcome. See [`Outcome`] for details.
278    pub fn report_outcome(&self, id: &Id, outcome: &Outcome) {
279        let mut states = self.states.write().unwrap();
280        let health = states.entry(id.clone()).or_default();
281        self.policy.on_outcome(health, outcome);
282    }
283
284    /// Returns the current [`HealthStatus`] (defaults to `Healthy` for unknown nodes).
285    pub fn status(&self, id: &Id) -> HealthStatus {
286        self.states
287            .read()
288            .unwrap()
289            .get(id)
290            .map(|h| h.status)
291            .unwrap_or(HealthStatus::Healthy)
292    }
293
294    /// Whether the node should receive a probe request (delegates to the policy).
295    pub fn should_probe(&self, id: &Id) -> bool {
296        self.states
297            .read()
298            .unwrap()
299            .get(id)
300            .is_some_and(|h| self.policy.should_probe(h))
301    }
302
303    /// Returns a snapshot of the full [`NodeHealth`] for the given node.
304    pub fn get_health(&self, id: &Id) -> NodeHealth {
305        self.states
306            .read()
307            .unwrap()
308            .get(id)
309            .cloned()
310            .unwrap_or_default()
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn healthy_to_degraded_to_unhealthy() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(3)
            .build();
        let mut node = NodeHealth::default();

        // 1st failure: below the ⌈3/2⌉ = 2 warning mark — still Healthy.
        policy.on_failure(&mut node);
        assert_eq!(node.status, HealthStatus::Healthy);

        // 2nd failure: hits the warning mark — Degraded.
        policy.on_failure(&mut node);
        assert_eq!(node.status, HealthStatus::Degraded);

        // 3rd failure: hits the full threshold — Unhealthy.
        policy.on_failure(&mut node);
        assert_eq!(node.status, HealthStatus::Unhealthy);
    }

    #[test]
    fn recovery_path() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(2)
            .recovery_successes(2)
            .build();
        let mut node = NodeHealth::default();

        // Drive the node to Unhealthy.
        for _ in 0..2 {
            policy.on_failure(&mut node);
        }
        assert_eq!(node.status, HealthStatus::Unhealthy);

        // First success: Unhealthy -> Recovering.
        policy.on_success(&mut node);
        assert_eq!(node.status, HealthStatus::Recovering);

        // Second success: Recovering -> Healthy.
        policy.on_success(&mut node);
        assert_eq!(node.status, HealthStatus::Healthy);
    }

    #[test]
    fn threshold_one_skips_degraded() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(1)
            .build();
        let mut node = NodeHealth::default();

        // With threshold = 1 there is no intermediate warning: the very
        // first failure goes straight to Unhealthy.
        policy.on_failure(&mut node);
        assert_eq!(node.status, HealthStatus::Unhealthy);
    }

    #[test]
    fn tracker_defaults_to_healthy() {
        let tracker = HealthTracker::<String>::new(ConsecutiveFailurePolicy::default());
        assert_eq!(tracker.status(&"unknown".to_string()), HealthStatus::Healthy);
    }

    #[test]
    fn rate_limited_does_not_count_as_failure() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(3)
            .build();
        let mut node = NodeHealth::default();

        // One real failure — below the Degraded mark (⌈3/2⌉ = 2).
        policy.on_outcome(&mut node, &Outcome::Failure);
        assert_eq!(node.consecutive_failures, 1);
        assert_eq!(node.status, HealthStatus::Healthy);

        // Rate limited — must NOT move the failure counter.
        policy.on_outcome(&mut node, &Outcome::RateLimited { retry_after: None });
        assert_eq!(node.consecutive_failures, 1);
        assert_eq!(node.status, HealthStatus::Healthy);

        // Two more real failures reach the threshold.
        policy.on_outcome(&mut node, &Outcome::Failure);
        assert_eq!(node.status, HealthStatus::Degraded);
        policy.on_outcome(&mut node, &Outcome::Failure);
        assert_eq!(node.status, HealthStatus::Unhealthy);
    }

    #[test]
    fn outcome_success_resets_failures() {
        let policy = ConsecutiveFailurePolicy::default();
        let mut node = NodeHealth::default();

        policy.on_outcome(&mut node, &Outcome::Failure);
        assert_eq!(node.consecutive_failures, 1);

        policy.on_outcome(&mut node, &Outcome::Success);
        assert_eq!(node.consecutive_failures, 0);
        assert_eq!(node.consecutive_successes, 1);
    }
}