// loadwise-core 0.1.0 — Core traits, strategies, and in-memory stores for loadwise.
//! Health state machine and tracking for backend nodes.

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::RwLock;
use std::time::{Duration, Instant};

/// Current health state of a node.
///
/// With [`ConsecutiveFailurePolicy`] nodes move through these states as
/// follows: a `Healthy` node becomes `Degraded` once it accumulates
/// `⌈threshold/2⌉` consecutive failures and `Unhealthy` at `threshold`;
/// an `Unhealthy` node that answers a probe successfully enters
/// `Recovering` and is promoted back to `Healthy` after enough consecutive
/// successes. A single success returns a `Degraded` node to `Healthy`.
///
/// ```text
/// ┌─────────┐  failures >= ⌈threshold/2⌉  ┌──────────┐  failures >= threshold  ┌───────────┐
/// │ Healthy  │ ──────────────────────────→ │ Degraded │ ──────────────────────→ │ Unhealthy │
/// └─────────┘                              └──────────┘                         └───────────┘
///      ↑                                        │                                    │
///      │ 1 success                               │ 1 success                          │ 1 success
///      │                                        ↓                                    ↓
///      │                                   ┌─────────┐                          ┌────────────┐
///      └───────────────────────────────────│ Healthy │←─── recovery_successes ──│ Recovering │
///                                          └─────────┘                          └────────────┘
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HealthStatus {
    /// Operating normally; receives full traffic.
    Healthy,
    /// Responding, but showing signs of stress (e.g., elevated latency).
    /// Still in rotation — this is a warning state, not an outage.
    Degraded,
    /// Failed repeatedly; removed from rotation until
    /// [`HealthPolicy::should_probe`] permits a probe request.
    Unhealthy,
    /// Previously unhealthy; receives limited probe traffic until enough
    /// consecutive successes promote it back to [`Healthy`](Self::Healthy).
    Recovering,
}

impl HealthStatus {
    /// `true` for every status that may still receive traffic — i.e.
    /// everything except [`Unhealthy`](Self::Unhealthy).
    pub fn is_available(self) -> bool {
        match self {
            HealthStatus::Unhealthy => false,
            HealthStatus::Healthy | HealthStatus::Degraded | HealthStatus::Recovering => true,
        }
    }

    /// Single-bit flag for this variant, used by
    /// [`HealthFilter`](crate::filter::HealthFilter) to build status masks.
    pub const fn bit_flag(self) -> u8 {
        match self {
            HealthStatus::Healthy => 0b0001,
            HealthStatus::Degraded => 0b0010,
            HealthStatus::Unhealthy => 0b0100,
            HealthStatus::Recovering => 0b1000,
        }
    }
}

/// Mutable runtime health counters for a single node.
///
/// Owned and mutated by a [`HealthPolicy`]; snapshot-able via `Clone`.
#[derive(Debug, Clone)]
pub struct NodeHealth {
    /// Current health state.
    pub status: HealthStatus,
    /// Failures observed in a row; cleared by any success.
    pub consecutive_failures: u32,
    /// Successes observed in a row; cleared by any failure.
    pub consecutive_successes: u32,
    /// Timestamp of the most recent failure, if one has occurred.
    pub last_failure: Option<Instant>,
    /// Timestamp of the most recent success, if one has occurred.
    pub last_success: Option<Instant>,
}

impl Default for NodeHealth {
    /// A brand-new node: `Healthy`, zero streaks, no recorded events.
    fn default() -> Self {
        NodeHealth {
            status: HealthStatus::Healthy,
            last_failure: None,
            last_success: None,
            consecutive_failures: 0,
            consecutive_successes: 0,
        }
    }
}

/// Classified result of a request, allowing [`HealthPolicy`] to distinguish
/// between different failure modes.
///
/// The default [`HealthPolicy::on_outcome`] maps `Success`/`Failure` onto
/// `on_success`/`on_failure` and ignores `RateLimited` entirely, so a
/// rate-limited response never pushes a node toward `Unhealthy`.
///
/// # Examples
///
/// ```
/// # extern crate loadwise_core as loadwise;
/// use loadwise::health::Outcome;
/// use std::time::Duration;
///
/// let success = Outcome::Success;
/// let error = Outcome::Failure;
/// let rate_limited = Outcome::RateLimited { retry_after: Some(Duration::from_secs(30)) };
/// ```
#[derive(Debug, Clone)]
pub enum Outcome {
    /// Request succeeded.
    Success,
    /// Request failed (server error, timeout, etc.).
    Failure,
    /// Request was rejected due to rate limiting. Should NOT count toward
    /// consecutive failure thresholds.
    RateLimited {
        /// Hint from the server about when to retry.
        /// NOTE(review): presumably sourced from a `Retry-After`-style
        /// response header — confirm against the caller that constructs this.
        retry_after: Option<Duration>,
    },
}

/// Determines how health status transitions based on success/failure signals.
///
/// Implementations mutate the [`NodeHealth`] record they are handed; the
/// record itself is stored and locked by [`HealthTracker`], so policies can
/// stay stateless. `Send + Sync` is required because the policy is shared
/// behind the tracker's `&self` methods across threads.
pub trait HealthPolicy: Send + Sync {
    /// Called after a successful request to the node.
    fn on_success(&self, health: &mut NodeHealth);
    /// Called after a failed request to the node.
    fn on_failure(&self, health: &mut NodeHealth);
    /// Whether an [`Unhealthy`](HealthStatus::Unhealthy) node should be given
    /// a probe request to check if it has recovered.
    fn should_probe(&self, health: &NodeHealth) -> bool;
    /// Process a classified request outcome. The default implementation
    /// delegates to [`on_success`](Self::on_success) / [`on_failure`](Self::on_failure)
    /// and ignores rate-limited outcomes (they don't affect health state).
    fn on_outcome(&self, health: &mut NodeHealth, outcome: &Outcome) {
        match outcome {
            Outcome::Success => self.on_success(health),
            Outcome::Failure => self.on_failure(health),
            // Rate limiting is back-pressure, not node failure: deliberately a no-op.
            Outcome::RateLimited { .. } => {}
        }
    }
}

/// Marks a node unhealthy after N consecutive failures, recovers after M successes.
///
/// When `failure_threshold` is 1, nodes jump directly from `Healthy` to
/// `Unhealthy` — the `Degraded` state is skipped because there is no room
/// for an intermediate warning.
// `Debug` + `Clone` added: every other public type in this module derives
// them, and a plain-data config struct should be printable and copyable.
#[derive(Debug, Clone, bon::Builder)]
pub struct ConsecutiveFailurePolicy {
    /// Number of consecutive failures before a node becomes [`Unhealthy`](HealthStatus::Unhealthy).
    /// At `⌈threshold/2⌉` failures the node first enters [`Degraded`](HealthStatus::Degraded).
    /// **Default: 3.**
    #[builder(default = 3)]
    pub failure_threshold: u32,
    /// Number of consecutive successes an [`Unhealthy`](HealthStatus::Unhealthy) /
    /// [`Recovering`](HealthStatus::Recovering) node needs before returning to
    /// [`Healthy`](HealthStatus::Healthy). **Default: 2.**
    #[builder(default = 2)]
    pub recovery_successes: u32,
    /// Minimum time after the last failure before an [`Unhealthy`](HealthStatus::Unhealthy) node
    /// is eligible for a probe request. **Default: 10 s.**
    #[builder(default = Duration::from_secs(10))]
    pub probe_interval: Duration,
}

impl Default for ConsecutiveFailurePolicy {
    /// Builds the policy with all builder defaults (3 failures / 2 successes / 10 s).
    fn default() -> Self {
        Self::builder().build()
    }
}

impl HealthPolicy for ConsecutiveFailurePolicy {
    fn on_success(&self, health: &mut NodeHealth) {
        // A success always clears the failure streak and extends the success
        // streak, regardless of the current status.
        health.consecutive_failures = 0;
        health.consecutive_successes += 1;
        health.last_success = Some(Instant::now());

        match health.status {
            HealthStatus::Recovering => {
                if health.consecutive_successes >= self.recovery_successes {
                    health.status = HealthStatus::Healthy;
                }
            }
            HealthStatus::Unhealthy => {
                // First success after going unhealthy (`consecutive_successes`
                // is necessarily 1 here: every failure resets it to 0). If a
                // single success already satisfies `recovery_successes`,
                // promote straight to Healthy — previously the node was
                // unconditionally parked in Recovering, so a policy with
                // `recovery_successes == 1` silently required two successes,
                // contradicting the field's documentation.
                health.status = if health.consecutive_successes >= self.recovery_successes {
                    HealthStatus::Healthy
                } else {
                    HealthStatus::Recovering
                };
            }
            HealthStatus::Degraded => {
                // One success is enough to clear a degraded warning.
                health.status = HealthStatus::Healthy;
            }
            HealthStatus::Healthy => {}
        }
    }

    fn on_failure(&self, health: &mut NodeHealth) {
        health.consecutive_successes = 0;
        health.consecutive_failures += 1;
        health.last_failure = Some(Instant::now());

        if health.consecutive_failures >= self.failure_threshold {
            health.status = HealthStatus::Unhealthy;
        } else if self.failure_threshold > 1
            && health.consecutive_failures >= self.failure_threshold.div_ceil(2)
        {
            // Half-way (rounded up) to the threshold: raise an early warning.
            // The `> 1` guard makes the documented threshold-1 behaviour
            // (skip Degraded entirely) explicit.
            health.status = HealthStatus::Degraded;
        }
    }

    fn should_probe(&self, health: &NodeHealth) -> bool {
        // Only Unhealthy nodes are out of rotation and need probing.
        if health.status != HealthStatus::Unhealthy {
            return false;
        }
        // `None` should be unreachable for an Unhealthy node (on_failure
        // always records a timestamp), but if it happens, err toward probing.
        match health.last_failure {
            Some(t) => t.elapsed() >= self.probe_interval,
            None => true,
        }
    }
}

/// Tracks health state for a set of nodes identified by `Id`.
///
/// Thread-safe — all methods take `&self` and synchronise internally via
/// [`RwLock`].
///
/// # Examples
///
/// ```
/// # extern crate loadwise_core as loadwise;
/// use loadwise::{HealthTracker, HealthStatus, ConsecutiveFailurePolicy};
///
/// let tracker = HealthTracker::new(ConsecutiveFailurePolicy::default());
///
/// // Unknown nodes are considered healthy.
/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Healthy);
///
/// // Three consecutive failures → Unhealthy (default threshold = 3).
/// tracker.report_failure(&"node-1");
/// tracker.report_failure(&"node-1");
/// tracker.report_failure(&"node-1");
/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Unhealthy);
///
/// // One success → Recovering.
/// tracker.report_success(&"node-1");
/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Recovering);
///
/// // Second success → Healthy again (default recovery_successes = 2).
/// tracker.report_success(&"node-1");
/// assert_eq!(tracker.status(&"node-1"), HealthStatus::Healthy);
/// ```
// No trait bounds on the type definition itself — `Eq + Hash + Clone` are
// only needed by the methods, so the bounds live on that impl block. This
// is a backward-compatible relaxation (any previously valid `Id` still works).
pub struct HealthTracker<Id> {
    /// Per-node health records, keyed by node id.
    states: RwLock<HashMap<Id, NodeHealth>>,
    /// Policy driving every state transition.
    policy: Box<dyn HealthPolicy>,
}

impl<Id> std::fmt::Debug for HealthTracker<Id> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `dyn HealthPolicy` is not `Debug`, so only expose a node count.
        let count = self.states.read().unwrap().len();
        f.debug_struct("HealthTracker")
            .field("tracked_nodes", &count)
            .finish_non_exhaustive()
    }
}

impl<Id: Eq + Hash + Clone> HealthTracker<Id> {
    /// Creates a new tracker with the given [`HealthPolicy`].
    pub fn new(policy: impl HealthPolicy + 'static) -> Self {
        Self {
            states: RwLock::new(HashMap::new()),
            policy: Box::new(policy),
        }
    }

    /// Applies `apply` to the (possibly freshly inserted) health entry for
    /// `id` while holding the write lock. Shared by all `report_*` methods,
    /// which previously duplicated this lock/entry boilerplate.
    fn update(&self, id: &Id, apply: impl FnOnce(&dyn HealthPolicy, &mut NodeHealth)) {
        let mut states = self.states.write().unwrap();
        apply(&*self.policy, states.entry(id.clone()).or_default());
    }

    /// Record a successful request to the node identified by `id`.
    pub fn report_success(&self, id: &Id) {
        self.update(id, |policy, health| policy.on_success(health));
    }

    /// Record a failed request to the node identified by `id`.
    pub fn report_failure(&self, id: &Id) {
        self.update(id, |policy, health| policy.on_failure(health));
    }

    /// Report a classified request outcome. See [`Outcome`] for details.
    pub fn report_outcome(&self, id: &Id, outcome: &Outcome) {
        self.update(id, |policy, health| policy.on_outcome(health, outcome));
    }

    /// Returns the current [`HealthStatus`] (defaults to `Healthy` for unknown nodes).
    pub fn status(&self, id: &Id) -> HealthStatus {
        self.states
            .read()
            .unwrap()
            .get(id)
            .map(|h| h.status)
            .unwrap_or(HealthStatus::Healthy)
    }

    /// Whether the node should receive a probe request (delegates to the
    /// policy). Unknown nodes are never probed.
    pub fn should_probe(&self, id: &Id) -> bool {
        self.states
            .read()
            .unwrap()
            .get(id)
            .is_some_and(|h| self.policy.should_probe(h))
    }

    /// Returns a snapshot of the full [`NodeHealth`] for the given node
    /// (the default healthy record for unknown nodes).
    pub fn get_health(&self, id: &Id) -> NodeHealth {
        self.states
            .read()
            .unwrap()
            .get(id)
            .cloned()
            .unwrap_or_default()
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for `ConsecutiveFailurePolicy` state transitions,
    //! `Outcome` classification, and `HealthTracker` defaults.

    use super::*;

    // Healthy → Degraded at ⌈3/2⌉ = 2 failures → Unhealthy at 3.
    #[test]
    fn healthy_to_degraded_to_unhealthy() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(3)
            .build();
        let mut health = NodeHealth::default();

        // 1st failure: below the degraded threshold, still Healthy.
        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Healthy);

        // 2nd failure: hits ⌈3/2⌉ = 2 → Degraded.
        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Degraded);

        // 3rd failure: hits the full threshold → Unhealthy.
        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Unhealthy);
    }

    // Full recovery path: Unhealthy → Recovering → Healthy.
    #[test]
    fn recovery_path() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(2)
            .recovery_successes(2)
            .build();
        let mut health = NodeHealth::default();

        // Drive to unhealthy
        policy.on_failure(&mut health);
        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Unhealthy);

        // First success: unhealthy -> recovering
        policy.on_success(&mut health);
        assert_eq!(health.status, HealthStatus::Recovering);

        // Second success: recovering -> healthy
        policy.on_success(&mut health);
        assert_eq!(health.status, HealthStatus::Healthy);
    }

    #[test]
    fn threshold_one_skips_degraded() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(1)
            .build();
        let mut health = NodeHealth::default();

        // With threshold=1, first failure goes straight to Unhealthy (no Degraded)
        policy.on_failure(&mut health);
        assert_eq!(health.status, HealthStatus::Unhealthy);
    }

    // Nodes the tracker has never seen report Healthy, not an error.
    #[test]
    fn tracker_defaults_to_healthy() {
        let tracker = HealthTracker::<String>::new(ConsecutiveFailurePolicy::default());
        assert_eq!(tracker.status(&"unknown".to_string()), HealthStatus::Healthy);
    }

    // RateLimited outcomes must be neutral: neither success nor failure.
    #[test]
    fn rate_limited_does_not_count_as_failure() {
        let policy = ConsecutiveFailurePolicy::builder()
            .failure_threshold(3)
            .build();
        let mut health = NodeHealth::default();

        // One real failure (below degraded threshold ⌈3/2⌉=2)
        policy.on_outcome(&mut health, &Outcome::Failure);
        assert_eq!(health.consecutive_failures, 1);
        assert_eq!(health.status, HealthStatus::Healthy);

        // Rate limited — should NOT increment failure count
        policy.on_outcome(&mut health, &Outcome::RateLimited { retry_after: None });
        assert_eq!(health.consecutive_failures, 1);
        assert_eq!(health.status, HealthStatus::Healthy);

        // Two more real failures — now hits threshold
        policy.on_outcome(&mut health, &Outcome::Failure);
        assert_eq!(health.status, HealthStatus::Degraded);
        policy.on_outcome(&mut health, &Outcome::Failure);
        assert_eq!(health.status, HealthStatus::Unhealthy);
    }

    // Success via on_outcome clears the failure streak just like on_success.
    #[test]
    fn outcome_success_resets_failures() {
        let policy = ConsecutiveFailurePolicy::default();
        let mut health = NodeHealth::default();

        policy.on_outcome(&mut health, &Outcome::Failure);
        assert_eq!(health.consecutive_failures, 1);

        policy.on_outcome(&mut health, &Outcome::Success);
        assert_eq!(health.consecutive_failures, 0);
        assert_eq!(health.consecutive_successes, 1);
    }
}