Skip to main content

throttle_net/
circuit.rs

1//! A circuit breaker that wraps any [`Limiter`] and fails fast when a downstream
2//! is unhealthy.
3//!
4//! A limiter paces requests; a breaker *stops* them. When the protected
5//! downstream produces enough failures, the breaker trips **open** and sheds
6//! requests immediately — without consuming the wrapped limiter's tokens — so a
7//! struggling dependency is given room to recover instead of being hammered.
8//! After a cooldown it goes **half-open**, admitting a few trial requests; if
9//! they succeed it **closes** and normal pacing resumes, otherwise it opens
10//! again.
11//!
12//! Outcomes are reported back through a [`Permit`]: [`acquire`](CircuitBreaker::acquire)
13//! hands you one, and you call [`success`](Permit::success) or
14//! [`failure`](Permit::failure) after the call. Dropping a permit without
15//! settling it counts as a failure, so an early return or panic is treated
16//! conservatively.
17
18use core::time::Duration;
19use std::collections::VecDeque;
20use std::sync::{Mutex, MutexGuard, PoisonError};
21
22use clock_lib::{Clock, Monotonic, SystemClock};
23
24use crate::decision::Decision;
25use crate::error::ThrottleError;
26use crate::limiter::Limiter;
27
/// The rule that decides when a closed breaker trips open.
///
/// Marked `#[non_exhaustive]` so that further trip conditions can be added
/// without breaking downstream matches.
#[derive(Clone, Copy, Debug, PartialEq)]
#[non_exhaustive]
pub enum Trip {
    /// Trip once this many failures have been recorded back to back; any
    /// success starts the count over.
    Consecutive(u32),
    /// Trip when the share of failures among the most recent `window` calls
    /// reaches `ratio`. The ratio is not evaluated until at least `min_calls`
    /// outcomes have been recorded.
    Ratio {
        /// Number of most-recent calls kept for the ratio.
        window: u32,
        /// Failure fraction, in `[0.0, 1.0]`, at which the breaker trips.
        ratio: f64,
        /// Minimum number of recorded calls before the ratio is checked.
        min_calls: u32,
    },
    /// Trip when `failures` or more failures fall inside a rolling `period`.
    Windowed {
        /// Number of failures within `period` that trips the breaker.
        failures: u32,
        /// Length of the rolling time window in which failures are counted.
        period: Duration,
    },
}
54
/// A point-in-time snapshot of which state the breaker is in.
///
/// Marked `#[non_exhaustive]`, so external matches need a wildcard arm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum BreakerState {
    /// Requests are passed on to the wrapped limiter and failures are counted.
    Closed,
    /// Requests are shed immediately so the downstream gets time to recover.
    Open,
    /// A limited number of trial requests are admitted to probe whether the
    /// downstream has recovered.
    HalfOpen,
}
68
69/// The mutable state machine, guarded by a single mutex.
70struct Shared {
71    state: BreakerState,
72    /// Consecutive failures (for [`Trip::Consecutive`]).
73    consecutive: u32,
74    /// Recent outcomes, `true` = failure (for [`Trip::Ratio`]).
75    outcomes: VecDeque<bool>,
76    /// Failure timestamps in milliseconds (for [`Trip::Windowed`]).
77    failure_times: VecDeque<u64>,
78    /// Trial requests in flight while half-open.
79    half_open_inflight: u32,
80    /// Successful trials accumulated while half-open.
81    half_open_successes: u32,
82    /// Milliseconds (store epoch) at which an open breaker may go half-open.
83    open_until_ms: u64,
84}
85
86impl Shared {
87    fn new() -> Self {
88        Self {
89            state: BreakerState::Closed,
90            consecutive: 0,
91            outcomes: VecDeque::new(),
92            failure_times: VecDeque::new(),
93            half_open_inflight: 0,
94            half_open_successes: 0,
95            open_until_ms: 0,
96        }
97    }
98
99    /// Resets the failure bookkeeping when the breaker closes.
100    fn reset_counters(&mut self) {
101        self.consecutive = 0;
102        self.outcomes.clear();
103        self.failure_times.clear();
104        self.half_open_inflight = 0;
105        self.half_open_successes = 0;
106    }
107}
108
/// The breaker's verdict on whether a request may proceed right now.
enum Admit {
    /// The request may go on to the wrapped limiter.
    Allow,
    /// The request is shed; the payload is how long the caller should wait
    /// before retrying (zero when half-open trials are simply full).
    Reject(Duration),
}
114
115/// A circuit breaker wrapping a limiter `L`, timed by clock `C`.
116///
117/// Construct one with [`CircuitBreaker::builder`]. Build requires the
118/// `circuit-breaker` feature.
119///
120/// # Examples
121///
122/// ```
123/// # async fn run() {
124/// use std::time::Duration;
125/// use throttle_net::{CircuitBreaker, Throttle, Trip};
126///
127/// let breaker = CircuitBreaker::builder()
128///     .trip(Trip::Consecutive(5))
129///     .cooldown(Duration::from_secs(10))
130///     .build(Throttle::per_second(100));
131///
132/// match breaker.acquire().await {
133///     Ok(permit) => {
134///         // ... call the downstream ...
135///         let ok = true;
136///         if ok { permit.success() } else { permit.failure() }
137///     }
138///     Err(_shed) => {
139///         // breaker open (or limiter exhausted): fail fast
140///     }
141/// }
142/// # }
143/// ```
144pub struct CircuitBreaker<L, C = SystemClock>
145where
146    C: Clock,
147{
148    inner: L,
149    config: Config,
150    shared: Mutex<Shared>,
151    clock: C,
152    epoch: Monotonic,
153}
154
155/// Validated breaker configuration.
156#[derive(Debug, Clone, Copy)]
157struct Config {
158    trip: Trip,
159    cooldown: Duration,
160    half_open_trials: u32,
161    half_open_required: u32,
162}
163
164// Anchored on a concrete, limiter-free type so `CircuitBreaker::builder()` needs
165// no type annotation; the wrapped limiter type is fixed later by
166// [`CircuitBreakerBuilder::build`].
167impl CircuitBreaker<core::convert::Infallible> {
168    /// Starts building a breaker. Defaults: [`Trip::Consecutive(5)`](Trip::Consecutive),
169    /// a 30-second cooldown, and a single trial that must succeed to close.
170    #[must_use]
171    pub fn builder() -> CircuitBreakerBuilder {
172        CircuitBreakerBuilder::new()
173    }
174}
175
176impl<L, C> CircuitBreaker<L, C>
177where
178    L: Limiter,
179    C: Clock + Clone,
180{
181    fn new(inner: L, config: Config, clock: C) -> Self {
182        let epoch = clock.now();
183        Self {
184            inner,
185            config,
186            shared: Mutex::new(Shared::new()),
187            clock,
188            epoch,
189        }
190    }
191
192    /// Replaces the time source (the cooldown clock), for deterministic tests.
193    /// The breaker is reset to closed around the new clock.
194    #[must_use]
195    pub fn with_clock<C2>(self, clock: C2) -> CircuitBreaker<L, C2>
196    where
197        C2: Clock + Clone,
198    {
199        CircuitBreaker::new(self.inner, self.config, clock)
200    }
201
202    /// The current state (a momentary snapshot).
203    #[must_use]
204    pub fn state(&self) -> BreakerState {
205        self.lock().state
206    }
207
208    /// A shared reference to the wrapped limiter.
209    pub fn inner(&self) -> &L {
210        &self.inner
211    }
212
213    #[inline]
214    fn lock(&self) -> MutexGuard<'_, Shared> {
215        self.shared.lock().unwrap_or_else(PoisonError::into_inner)
216    }
217
218    #[inline]
219    fn now_ms(&self) -> u64 {
220        let elapsed = self.clock.now().saturating_duration_since(self.epoch);
221        u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
222    }
223
224    /// Decides admission and performs any state transition the clock has earned
225    /// (open → half-open). Reserves a half-open trial slot when it admits one.
226    fn admit(&self, now_ms: u64) -> Admit {
227        let mut shared = self.lock();
228        match shared.state {
229            BreakerState::Closed => Admit::Allow,
230            BreakerState::Open => {
231                if now_ms >= shared.open_until_ms {
232                    shared.state = BreakerState::HalfOpen;
233                    shared.half_open_inflight = 1;
234                    shared.half_open_successes = 0;
235                    crate::obs::circuit_transition("Open", "HalfOpen", 1);
236                    Admit::Allow
237                } else {
238                    Admit::Reject(Duration::from_millis(shared.open_until_ms - now_ms))
239                }
240            }
241            BreakerState::HalfOpen => {
242                if shared.half_open_inflight < self.config.half_open_trials {
243                    shared.half_open_inflight += 1;
244                    Admit::Allow
245                } else {
246                    // Trials are already in flight; shed extra probes.
247                    Admit::Reject(Duration::ZERO)
248                }
249            }
250        }
251    }
252
253    /// Releases a reserved half-open slot when an admitted request never reaches
254    /// the downstream (e.g. the wrapped limiter says the cost is impossible).
255    fn abort(&self) {
256        let mut shared = self.lock();
257        if shared.state == BreakerState::HalfOpen {
258            shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
259        }
260    }
261
262    /// Records the outcome of a settled request and transitions as needed.
263    fn record(&self, success: bool) {
264        let now_ms = self.now_ms();
265        let mut shared = self.lock();
266        match shared.state {
267            BreakerState::HalfOpen => {
268                shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
269                if success {
270                    shared.half_open_successes += 1;
271                    if shared.half_open_successes >= self.config.half_open_required {
272                        shared.state = BreakerState::Closed;
273                        shared.reset_counters();
274                        crate::obs::circuit_transition("HalfOpen", "Closed", 0);
275                    }
276                } else {
277                    self.open(&mut shared, now_ms);
278                }
279            }
280            BreakerState::Closed => {
281                if success {
282                    shared.consecutive = 0;
283                    record_outcome(&mut shared, false, now_ms, self.config.trip);
284                } else {
285                    shared.consecutive += 1;
286                    record_outcome(&mut shared, true, now_ms, self.config.trip);
287                    if tripped(&shared, now_ms, self.config.trip) {
288                        self.open(&mut shared, now_ms);
289                    }
290                }
291            }
292            // A record while fully open is unusual (nothing was admitted); ignore.
293            BreakerState::Open => {}
294        }
295    }
296
297    /// Moves the breaker to open and arms the cooldown.
298    fn open(&self, shared: &mut Shared, now_ms: u64) {
299        let from = if shared.state == BreakerState::HalfOpen {
300            "HalfOpen"
301        } else {
302            "Closed"
303        };
304        shared.state = BreakerState::Open;
305        shared.open_until_ms = now_ms
306            .saturating_add(u64::try_from(self.config.cooldown.as_millis()).unwrap_or(u64::MAX));
307        shared.half_open_inflight = 0;
308        shared.half_open_successes = 0;
309        crate::obs::circuit_transition(from, "Open", 2);
310    }
311
312    /// Reports a successful protected call. Prefer settling a [`Permit`].
313    pub fn record_success(&self) {
314        self.record(true);
315    }
316
317    /// Reports a failed protected call. Prefer settling a [`Permit`].
318    pub fn record_failure(&self) {
319        self.record(false);
320    }
321
322    /// Attempts to admit a request without waiting, returning a [`Permit`] on
323    /// success.
324    ///
325    /// # Errors
326    ///
327    /// - [`ThrottleError::CircuitOpen`] when the breaker is open (or its
328    ///   half-open trials are full): the request is shed without touching the
329    ///   wrapped limiter.
330    /// - [`ThrottleError::CostExceedsCapacity`] when the wrapped limiter can
331    ///   never grant a single unit.
332    ///
333    /// Returns `Ok(None)` when the breaker would admit but the wrapped limiter
334    /// has no token available right now (normal rate-limiting, not a breaker
335    /// fault).
336    pub fn try_acquire(&self) -> Result<Option<Permit<'_, L, C>>, ThrottleError> {
337        let now_ms = self.now_ms();
338        match self.admit(now_ms) {
339            Admit::Reject(retry_after) => Err(ThrottleError::CircuitOpen { retry_after }),
340            Admit::Allow => match self.inner.acquire_cost(1) {
341                Decision::Acquired => Ok(Some(Permit::new(self))),
342                Decision::Retry { .. } => {
343                    self.abort();
344                    Ok(None)
345                }
346                Decision::Impossible => {
347                    self.abort();
348                    Err(ThrottleError::CostExceedsCapacity {
349                        cost: 1,
350                        capacity: self.inner.capacity(),
351                    })
352                }
353            },
354        }
355    }
356}
357
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
impl<L, C> CircuitBreaker<L, C>
where
    L: Limiter,
    C: Clock + Clone,
{
    /// Admits a request, failing fast if the breaker is open and otherwise
    /// pacing on the wrapped limiter until a token is free.
    ///
    /// A circuit-open condition returns immediately (load shedding); a plain
    /// rate-limit waits. Returns a [`Permit`] to settle with the call's outcome.
    ///
    /// # Cancellation
    ///
    /// Dropping the returned future while it is waiting on the limiter
    /// releases any half-open trial slot it had reserved, so a cancelled
    /// caller cannot leave the breaker stuck with its trials "full".
    ///
    /// # Errors
    ///
    /// - [`ThrottleError::CircuitOpen`] when the breaker is open or its trials
    ///   are full — returned without waiting.
    /// - [`ThrottleError::CostExceedsCapacity`] when the wrapped limiter can
    ///   never grant the request.
    pub async fn acquire(&self) -> Result<Permit<'_, L, C>, ThrottleError> {
        /// Gives the reserved half-open slot back on drop unless a permit was
        /// issued. Covers the impossible-cost return, a panic in the limiter,
        /// and — the case a plain `abort()` call cannot reach — the future
        /// being dropped in the middle of the rate-limit sleep.
        struct SlotGuard<'g, Lim, Clk>
        where
            Lim: Limiter,
            Clk: Clock + Clone,
        {
            breaker: &'g CircuitBreaker<Lim, Clk>,
            armed: bool,
        }

        impl<Lim, Clk> Drop for SlotGuard<'_, Lim, Clk>
        where
            Lim: Limiter,
            Clk: Clock + Clone,
        {
            fn drop(&mut self) {
                if self.armed {
                    self.breaker.abort();
                }
            }
        }

        // Breaker admission is checked once, up front: an open breaker fails fast
        // rather than waiting. The reserved slot (if half-open) is held across the
        // rate-limit wait and released either by the permit or by the guard.
        match self.admit(self.now_ms()) {
            Admit::Reject(retry_after) => return Err(ThrottleError::CircuitOpen { retry_after }),
            Admit::Allow => {}
        }
        let mut slot = SlotGuard {
            breaker: self,
            armed: true,
        };
        loop {
            match self.inner.acquire_cost(1) {
                Decision::Acquired => {
                    // From here on the permit owns the slot and settles it.
                    slot.armed = false;
                    return Ok(Permit::new(self));
                }
                Decision::Retry { after } => crate::rt::sleep(after).await,
                Decision::Impossible => {
                    // `slot` is still armed; dropping it on return calls `abort()`.
                    return Err(ThrottleError::CostExceedsCapacity {
                        cost: 1,
                        capacity: self.inner.capacity(),
                    });
                }
            }
        }
    }
}
400
401/// Appends an outcome to the structures the configured [`Trip`] needs.
402fn record_outcome(shared: &mut Shared, failure: bool, now_ms: u64, trip: Trip) {
403    match trip {
404        Trip::Consecutive(_) => {}
405        Trip::Ratio { window, .. } => {
406            shared.outcomes.push_back(failure);
407            while shared.outcomes.len() > window as usize {
408                let _ = shared.outcomes.pop_front();
409            }
410        }
411        Trip::Windowed { period, .. } => {
412            if failure {
413                shared.failure_times.push_back(now_ms);
414            }
415            let cutoff =
416                now_ms.saturating_sub(u64::try_from(period.as_millis()).unwrap_or(u64::MAX));
417            while shared.failure_times.front().is_some_and(|&t| t < cutoff) {
418                let _ = shared.failure_times.pop_front();
419            }
420        }
421    }
422}
423
424/// Whether the closed breaker's failure state has reached its trip condition.
425fn tripped(shared: &Shared, now_ms: u64, trip: Trip) -> bool {
426    match trip {
427        Trip::Consecutive(n) => shared.consecutive >= n,
428        Trip::Ratio {
429            ratio, min_calls, ..
430        } => {
431            let total = shared.outcomes.len() as u32;
432            if total < min_calls || total == 0 {
433                return false;
434            }
435            let failures = shared.outcomes.iter().filter(|&&f| f).count() as u32;
436            f64::from(failures) / f64::from(total) >= ratio
437        }
438        Trip::Windowed { failures, period } => {
439            let cutoff =
440                now_ms.saturating_sub(u64::try_from(period.as_millis()).unwrap_or(u64::MAX));
441            let recent = shared
442                .failure_times
443                .iter()
444                .filter(|&&t| t >= cutoff)
445                .count() as u32;
446            recent >= failures
447        }
448    }
449}
450
451/// A reserved permission to make one protected call.
452///
453/// Settle it with [`success`](Self::success) or [`failure`](Self::failure) after
454/// the call returns. If dropped unsettled — an early return, a `?`, or a panic —
455/// it records a **failure**, so the breaker errs toward protecting the
456/// downstream.
457#[must_use = "settle the permit with `.success()` or `.failure()`; dropping it counts as a failure"]
458pub struct Permit<'a, L, C>
459where
460    L: Limiter,
461    C: Clock + Clone,
462{
463    breaker: &'a CircuitBreaker<L, C>,
464    settled: bool,
465}
466
467impl<'a, L, C> Permit<'a, L, C>
468where
469    L: Limiter,
470    C: Clock + Clone,
471{
472    fn new(breaker: &'a CircuitBreaker<L, C>) -> Self {
473        Self {
474            breaker,
475            settled: false,
476        }
477    }
478
479    /// Records that the protected call succeeded.
480    pub fn success(mut self) {
481        self.breaker.record(true);
482        self.settled = true;
483    }
484
485    /// Records that the protected call failed.
486    pub fn failure(mut self) {
487        self.breaker.record(false);
488        self.settled = true;
489    }
490}
491
492impl<L, C> Drop for Permit<'_, L, C>
493where
494    L: Limiter,
495    C: Clock + Clone,
496{
497    fn drop(&mut self) {
498        if !self.settled {
499            self.breaker.record(false);
500        }
501    }
502}
503
504/// Builder for a [`CircuitBreaker`].
505#[derive(Debug, Clone, Copy)]
506pub struct CircuitBreakerBuilder {
507    trip: Trip,
508    cooldown: Duration,
509    half_open_trials: u32,
510    half_open_required: u32,
511}
512
513impl Default for CircuitBreakerBuilder {
514    fn default() -> Self {
515        Self::new()
516    }
517}
518
519impl CircuitBreakerBuilder {
520    /// Creates a builder with the default policy: [`Trip::Consecutive(5)`](Trip::Consecutive),
521    /// a 30-second cooldown, and a single trial that must succeed to close.
522    #[must_use]
523    pub fn new() -> Self {
524        Self {
525            trip: Trip::Consecutive(5),
526            cooldown: Duration::from_secs(30),
527            half_open_trials: 1,
528            half_open_required: 1,
529        }
530    }
531
532    /// Sets the condition under which the breaker trips open.
533    #[must_use]
534    pub fn trip(mut self, trip: Trip) -> Self {
535        self.trip = trip;
536        self
537    }
538
539    /// Sets how long the breaker stays open before admitting a trial request.
540    #[must_use]
541    pub fn cooldown(mut self, cooldown: Duration) -> Self {
542        self.cooldown = cooldown;
543        self
544    }
545
546    /// Sets how many trial requests may run concurrently while half-open, and
547    /// how many must succeed to close. `trials` and `required` are clamped to at
548    /// least one; `required` is clamped to at most `trials`.
549    #[must_use]
550    pub fn half_open(mut self, trials: u32, required: u32) -> Self {
551        self.half_open_trials = trials.max(1);
552        self.half_open_required = required.max(1).min(self.half_open_trials);
553        self
554    }
555
556    /// Wraps `limiter`, producing a breaker driven by the system clock.
557    #[must_use]
558    pub fn build<L>(self, limiter: L) -> CircuitBreaker<L, SystemClock>
559    where
560        L: Limiter,
561    {
562        CircuitBreaker::new(
563            limiter,
564            Config {
565                trip: self.trip,
566                cooldown: self.cooldown,
567                half_open_trials: self.half_open_trials,
568                half_open_required: self.half_open_required,
569            },
570            SystemClock::new(),
571        )
572    }
573}
574
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use super::{BreakerState, CircuitBreaker, Trip};
    use crate::throttle::Throttle;
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    /// Cooldown shared by every test breaker.
    const COOLDOWN: Duration = Duration::from_secs(10);

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_breaker_is_send_sync() {
        assert_send_sync::<CircuitBreaker<Throttle>>();
    }

    /// A breaker over a limiter generous enough never to interfere, with a
    /// single half-open trial, driven by the given manual clock.
    fn breaker(
        trip: Trip,
        cooldown: Duration,
        clock: Arc<ManualClock>,
    ) -> CircuitBreaker<Throttle, Arc<ManualClock>> {
        let limiter = Throttle::per_second(1_000_000);
        let builder = CircuitBreaker::builder()
            .trip(trip)
            .cooldown(cooldown)
            .half_open(1, 1);
        builder.build(limiter).with_clock(clock)
    }

    #[test]
    fn test_consecutive_failures_trip_open() {
        let brk = breaker(Trip::Consecutive(3), COOLDOWN, Arc::new(ManualClock::new()));

        assert_eq!(brk.state(), BreakerState::Closed);
        for _ in 0..2 {
            brk.record_failure();
        }
        assert_eq!(brk.state(), BreakerState::Closed);
        // The third failure in a row reaches the threshold.
        brk.record_failure();
        assert_eq!(brk.state(), BreakerState::Open);
    }

    #[test]
    fn test_success_resets_consecutive_count() {
        let brk = breaker(Trip::Consecutive(3), COOLDOWN, Arc::new(ManualClock::new()));

        for _ in 0..2 {
            brk.record_failure();
        }
        // A success wipes the streak, so the next two failures are not enough.
        brk.record_success();
        for _ in 0..2 {
            brk.record_failure();
        }
        assert_eq!(brk.state(), BreakerState::Closed);
    }

    #[test]
    fn test_open_sheds_requests_without_touching_limiter() {
        let brk = breaker(Trip::Consecutive(1), COOLDOWN, Arc::new(ManualClock::new()));

        // One failure is enough to trip.
        brk.record_failure();
        assert_eq!(brk.state(), BreakerState::Open);

        let tokens_before = brk.inner().available();
        let outcome = brk.try_acquire();
        assert!(matches!(
            outcome,
            Err(crate::ThrottleError::CircuitOpen { .. })
        ));
        // Shedding must leave the wrapped limiter's tokens alone.
        assert_eq!(brk.inner().available(), tokens_before);
    }

    #[test]
    fn test_half_open_after_cooldown_then_close_on_success() {
        let clock = Arc::new(ManualClock::new());
        let brk = breaker(Trip::Consecutive(1), COOLDOWN, Arc::clone(&clock));

        brk.record_failure();
        assert_eq!(brk.state(), BreakerState::Open);

        // Once the cooldown has passed, the next request becomes the trial.
        clock.advance(Duration::from_secs(10));
        let trial = brk.try_acquire().unwrap().expect("a trial is admitted");
        assert_eq!(brk.state(), BreakerState::HalfOpen);
        trial.success();
        assert_eq!(brk.state(), BreakerState::Closed);
    }

    #[test]
    fn test_half_open_failure_reopens() {
        let clock = Arc::new(ManualClock::new());
        let brk = breaker(Trip::Consecutive(1), COOLDOWN, Arc::clone(&clock));

        brk.record_failure();
        clock.advance(Duration::from_secs(10));
        let trial = brk.try_acquire().unwrap().expect("a trial is admitted");
        assert_eq!(brk.state(), BreakerState::HalfOpen);
        // A failed trial sends the breaker straight back to open.
        trial.failure();
        assert_eq!(brk.state(), BreakerState::Open);
    }

    #[test]
    fn test_open_rejects_until_cooldown_elapses() {
        let clock = Arc::new(ManualClock::new());
        let brk = breaker(Trip::Consecutive(1), COOLDOWN, Arc::clone(&clock));

        brk.record_failure();
        // One second short of the cooldown: still shed.
        clock.advance(Duration::from_secs(9));
        assert!(matches!(
            brk.try_acquire(),
            Err(crate::ThrottleError::CircuitOpen { .. })
        ));
        // The final second completes the cooldown.
        clock.advance(Duration::from_secs(1));
        assert!(brk.try_acquire().unwrap().is_some());
    }

    #[test]
    fn test_dropping_permit_counts_as_failure() {
        let brk = breaker(Trip::Consecutive(2), COOLDOWN, Arc::new(ManualClock::new()));

        // Each permit dropped unsettled is a failure; the second one trips.
        drop(brk.try_acquire().unwrap());
        assert_eq!(brk.state(), BreakerState::Closed);
        drop(brk.try_acquire().unwrap());
        assert_eq!(brk.state(), BreakerState::Open);
    }

    #[test]
    fn test_ratio_trip() {
        let policy = Trip::Ratio {
            window: 10,
            ratio: 0.5,
            min_calls: 4,
        };
        let brk = breaker(policy, COOLDOWN, Arc::new(ManualClock::new()));

        for _ in 0..2 {
            brk.record_success();
        }
        assert_eq!(brk.state(), BreakerState::Closed);
        // Two failures bring the window to 2/4 = 0.5 with the minimum 4 calls.
        for _ in 0..2 {
            brk.record_failure();
        }
        assert_eq!(brk.state(), BreakerState::Open);
    }

    #[test]
    fn test_windowed_trip_prunes_old_failures() {
        let clock = Arc::new(ManualClock::new());
        let policy = Trip::Windowed {
            failures: 3,
            period: Duration::from_secs(5),
        };
        let brk = breaker(policy, COOLDOWN, Arc::clone(&clock));

        brk.record_failure();
        // After six seconds the first failure is outside the 5s window.
        clock.advance(Duration::from_secs(6));
        for _ in 0..2 {
            brk.record_failure();
        }
        // Only two failures fall inside the window so far.
        assert_eq!(brk.state(), BreakerState::Closed);
        brk.record_failure();
        // Now three failures sit inside the window.
        assert_eq!(brk.state(), BreakerState::Open);
    }
}