Skip to main content

throttle_net/
circuit.rs

1//! A circuit breaker that wraps any [`Limiter`] and fails fast when a downstream
2//! is unhealthy.
3//!
4//! A limiter paces requests; a breaker *stops* them. When the protected
5//! downstream produces enough failures, the breaker trips **open** and sheds
6//! requests immediately — without consuming the wrapped limiter's tokens — so a
7//! struggling dependency is given room to recover instead of being hammered.
8//! After a cooldown it goes **half-open**, admitting a few trial requests; if
9//! they succeed it **closes** and normal pacing resumes, otherwise it opens
10//! again.
11//!
12//! Outcomes are reported back through a [`Permit`]: [`acquire`](CircuitBreaker::acquire)
13//! hands you one, and you call [`success`](Permit::success) or
14//! [`failure`](Permit::failure) after the call. Dropping a permit without
15//! settling it counts as a failure, so an early return or panic is treated
16//! conservatively.
17
18use core::time::Duration;
19use std::collections::VecDeque;
20use std::sync::{Mutex, MutexGuard, PoisonError};
21
22use clock_lib::{Clock, Monotonic, SystemClock};
23
24use crate::decision::Decision;
25use crate::error::ThrottleError;
26use crate::limiter::Limiter;
27
28/// The condition under which a closed breaker trips open.
29///
30/// `#[non_exhaustive]`: more trip conditions may be added.
31#[non_exhaustive]
32#[derive(Debug, Clone, Copy, PartialEq)]
33pub enum Trip {
34    /// Trip after this many consecutive failures (a success resets the count).
35    Consecutive(u32),
36    /// Trip when the failure ratio over the last `window` calls reaches `ratio`,
37    /// once at least `min_calls` have been observed.
38    Ratio {
39        /// How many recent calls to consider.
40        window: u32,
41        /// The failure fraction in `[0.0, 1.0]` that trips the breaker.
42        ratio: f64,
43        /// The minimum calls before the ratio is evaluated.
44        min_calls: u32,
45    },
46    /// Trip when at least `failures` failures occur within a rolling `period`.
47    Windowed {
48        /// The failure count that trips the breaker.
49        failures: u32,
50        /// The rolling time window the failures are counted in.
51        period: Duration,
52    },
53}
54
55/// The breaker's current state, as a snapshot.
56///
57/// `#[non_exhaustive]`: matching should include a wildcard.
58#[non_exhaustive]
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum BreakerState {
61    /// Requests flow to the wrapped limiter; failures are being counted.
62    Closed,
63    /// Requests are shed immediately; the downstream is being given time.
64    Open,
65    /// A few trial requests are allowed to test whether the downstream recovered.
66    HalfOpen,
67}
68
69/// The mutable state machine, guarded by a single mutex.
70struct Shared {
71    state: BreakerState,
72    /// Consecutive failures (for [`Trip::Consecutive`]).
73    consecutive: u32,
74    /// Recent outcomes, `true` = failure (for [`Trip::Ratio`]).
75    outcomes: VecDeque<bool>,
76    /// Failure timestamps in milliseconds (for [`Trip::Windowed`]).
77    failure_times: VecDeque<u64>,
78    /// Trial requests in flight while half-open.
79    half_open_inflight: u32,
80    /// Successful trials accumulated while half-open.
81    half_open_successes: u32,
82    /// Milliseconds (store epoch) at which an open breaker may go half-open.
83    open_until_ms: u64,
84}
85
86impl Shared {
87    fn new() -> Self {
88        Self {
89            state: BreakerState::Closed,
90            consecutive: 0,
91            outcomes: VecDeque::new(),
92            failure_times: VecDeque::new(),
93            half_open_inflight: 0,
94            half_open_successes: 0,
95            open_until_ms: 0,
96        }
97    }
98
99    /// Resets the failure bookkeeping when the breaker closes.
100    fn reset_counters(&mut self) {
101        self.consecutive = 0;
102        self.outcomes.clear();
103        self.failure_times.clear();
104        self.half_open_inflight = 0;
105        self.half_open_successes = 0;
106    }
107}
108
109/// Whether the breaker admits a request right now.
110enum Admit {
111    Allow,
112    Reject(Duration),
113}
114
115/// A circuit breaker wrapping a limiter `L`, timed by clock `C`.
116///
117/// Construct one with [`CircuitBreaker::builder`]. Build requires the
118/// `circuit-breaker` feature.
119///
120/// # Examples
121///
122/// ```
123/// # async fn run() {
124/// use std::time::Duration;
125/// use throttle_net::{CircuitBreaker, Throttle, Trip};
126///
127/// let breaker = CircuitBreaker::builder()
128///     .trip(Trip::Consecutive(5))
129///     .cooldown(Duration::from_secs(10))
130///     .build(Throttle::per_second(100));
131///
132/// match breaker.acquire().await {
133///     Ok(permit) => {
134///         // ... call the downstream ...
135///         let ok = true;
136///         if ok { permit.success() } else { permit.failure() }
137///     }
138///     Err(_shed) => {
139///         // breaker open (or limiter exhausted): fail fast
140///     }
141/// }
142/// # }
143/// ```
144pub struct CircuitBreaker<L, C = SystemClock>
145where
146    C: Clock,
147{
148    inner: L,
149    config: Config,
150    shared: Mutex<Shared>,
151    clock: C,
152    epoch: Monotonic,
153}
154
155/// Validated breaker configuration.
156#[derive(Debug, Clone, Copy)]
157struct Config {
158    trip: Trip,
159    cooldown: Duration,
160    half_open_trials: u32,
161    half_open_required: u32,
162}
163
164// Anchored on a concrete, limiter-free type so `CircuitBreaker::builder()` needs
165// no type annotation; the wrapped limiter type is fixed later by
166// [`CircuitBreakerBuilder::build`].
167impl CircuitBreaker<core::convert::Infallible> {
168    /// Starts building a breaker. Defaults: [`Trip::Consecutive(5)`](Trip::Consecutive),
169    /// a 30-second cooldown, and a single trial that must succeed to close.
170    #[must_use]
171    pub fn builder() -> CircuitBreakerBuilder {
172        CircuitBreakerBuilder::new()
173    }
174}
175
176impl<L, C> CircuitBreaker<L, C>
177where
178    L: Limiter,
179    C: Clock + Clone,
180{
181    fn new(inner: L, config: Config, clock: C) -> Self {
182        let epoch = clock.now();
183        Self {
184            inner,
185            config,
186            shared: Mutex::new(Shared::new()),
187            clock,
188            epoch,
189        }
190    }
191
192    /// Replaces the time source (the cooldown clock), for deterministic tests.
193    /// The breaker is reset to closed around the new clock.
194    #[must_use]
195    pub fn with_clock<C2>(self, clock: C2) -> CircuitBreaker<L, C2>
196    where
197        C2: Clock + Clone,
198    {
199        CircuitBreaker::new(self.inner, self.config, clock)
200    }
201
202    /// The current state (a momentary snapshot).
203    #[must_use]
204    pub fn state(&self) -> BreakerState {
205        self.lock().state
206    }
207
208    /// A shared reference to the wrapped limiter.
209    pub fn inner(&self) -> &L {
210        &self.inner
211    }
212
213    #[inline]
214    fn lock(&self) -> MutexGuard<'_, Shared> {
215        self.shared.lock().unwrap_or_else(PoisonError::into_inner)
216    }
217
218    #[inline]
219    fn now_ms(&self) -> u64 {
220        let elapsed = self.clock.now().saturating_duration_since(self.epoch);
221        u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
222    }
223
224    /// Decides admission and performs any state transition the clock has earned
225    /// (open → half-open). Reserves a half-open trial slot when it admits one.
226    fn admit(&self, now_ms: u64) -> Admit {
227        let mut shared = self.lock();
228        match shared.state {
229            BreakerState::Closed => Admit::Allow,
230            BreakerState::Open => {
231                if now_ms >= shared.open_until_ms {
232                    shared.state = BreakerState::HalfOpen;
233                    shared.half_open_inflight = 1;
234                    shared.half_open_successes = 0;
235                    Admit::Allow
236                } else {
237                    Admit::Reject(Duration::from_millis(shared.open_until_ms - now_ms))
238                }
239            }
240            BreakerState::HalfOpen => {
241                if shared.half_open_inflight < self.config.half_open_trials {
242                    shared.half_open_inflight += 1;
243                    Admit::Allow
244                } else {
245                    // Trials are already in flight; shed extra probes.
246                    Admit::Reject(Duration::ZERO)
247                }
248            }
249        }
250    }
251
252    /// Releases a reserved half-open slot when an admitted request never reaches
253    /// the downstream (e.g. the wrapped limiter says the cost is impossible).
254    fn abort(&self) {
255        let mut shared = self.lock();
256        if shared.state == BreakerState::HalfOpen {
257            shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
258        }
259    }
260
261    /// Records the outcome of a settled request and transitions as needed.
262    fn record(&self, success: bool) {
263        let now_ms = self.now_ms();
264        let mut shared = self.lock();
265        match shared.state {
266            BreakerState::HalfOpen => {
267                shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
268                if success {
269                    shared.half_open_successes += 1;
270                    if shared.half_open_successes >= self.config.half_open_required {
271                        shared.state = BreakerState::Closed;
272                        shared.reset_counters();
273                    }
274                } else {
275                    self.open(&mut shared, now_ms);
276                }
277            }
278            BreakerState::Closed => {
279                if success {
280                    shared.consecutive = 0;
281                    record_outcome(&mut shared, false, now_ms, self.config.trip);
282                } else {
283                    shared.consecutive += 1;
284                    record_outcome(&mut shared, true, now_ms, self.config.trip);
285                    if tripped(&shared, now_ms, self.config.trip) {
286                        self.open(&mut shared, now_ms);
287                    }
288                }
289            }
290            // A record while fully open is unusual (nothing was admitted); ignore.
291            BreakerState::Open => {}
292        }
293    }
294
295    /// Moves the breaker to open and arms the cooldown.
296    fn open(&self, shared: &mut Shared, now_ms: u64) {
297        shared.state = BreakerState::Open;
298        shared.open_until_ms = now_ms
299            .saturating_add(u64::try_from(self.config.cooldown.as_millis()).unwrap_or(u64::MAX));
300        shared.half_open_inflight = 0;
301        shared.half_open_successes = 0;
302    }
303
304    /// Reports a successful protected call. Prefer settling a [`Permit`].
305    pub fn record_success(&self) {
306        self.record(true);
307    }
308
309    /// Reports a failed protected call. Prefer settling a [`Permit`].
310    pub fn record_failure(&self) {
311        self.record(false);
312    }
313
314    /// Attempts to admit a request without waiting, returning a [`Permit`] on
315    /// success.
316    ///
317    /// # Errors
318    ///
319    /// - [`ThrottleError::CircuitOpen`] when the breaker is open (or its
320    ///   half-open trials are full): the request is shed without touching the
321    ///   wrapped limiter.
322    /// - [`ThrottleError::CostExceedsCapacity`] when the wrapped limiter can
323    ///   never grant a single unit.
324    ///
325    /// Returns `Ok(None)` when the breaker would admit but the wrapped limiter
326    /// has no token available right now (normal rate-limiting, not a breaker
327    /// fault).
328    pub fn try_acquire(&self) -> Result<Option<Permit<'_, L, C>>, ThrottleError> {
329        let now_ms = self.now_ms();
330        match self.admit(now_ms) {
331            Admit::Reject(retry_after) => Err(ThrottleError::CircuitOpen { retry_after }),
332            Admit::Allow => match self.inner.acquire_cost(1) {
333                Decision::Acquired => Ok(Some(Permit::new(self))),
334                Decision::Retry { .. } => {
335                    self.abort();
336                    Ok(None)
337                }
338                Decision::Impossible => {
339                    self.abort();
340                    Err(ThrottleError::CostExceedsCapacity {
341                        cost: 1,
342                        capacity: self.inner.capacity(),
343                    })
344                }
345            },
346        }
347    }
348}
349
350#[cfg(feature = "tokio")]
351#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
352impl<L, C> CircuitBreaker<L, C>
353where
354    L: Limiter,
355    C: Clock + Clone,
356{
357    /// Admits a request, failing fast if the breaker is open and otherwise
358    /// pacing on the wrapped limiter until a token is free.
359    ///
360    /// A circuit-open condition returns immediately (load shedding); a plain
361    /// rate-limit waits. Returns a [`Permit`] to settle with the call's outcome.
362    ///
363    /// # Errors
364    ///
365    /// - [`ThrottleError::CircuitOpen`] when the breaker is open or its trials
366    ///   are full — returned without waiting.
367    /// - [`ThrottleError::CostExceedsCapacity`] when the wrapped limiter can
368    ///   never grant the request.
369    pub async fn acquire(&self) -> Result<Permit<'_, L, C>, ThrottleError> {
370        // Breaker admission is checked once, up front: an open breaker fails fast
371        // rather than waiting. The reserved slot (if half-open) is held across the
372        // rate-limit wait and released by the permit or on an impossible cost.
373        match self.admit(self.now_ms()) {
374            Admit::Reject(retry_after) => return Err(ThrottleError::CircuitOpen { retry_after }),
375            Admit::Allow => {}
376        }
377        loop {
378            match self.inner.acquire_cost(1) {
379                Decision::Acquired => return Ok(Permit::new(self)),
380                Decision::Retry { after } => tokio::time::sleep(after).await,
381                Decision::Impossible => {
382                    self.abort();
383                    return Err(ThrottleError::CostExceedsCapacity {
384                        cost: 1,
385                        capacity: self.inner.capacity(),
386                    });
387                }
388            }
389        }
390    }
391}
392
393/// Appends an outcome to the structures the configured [`Trip`] needs.
394fn record_outcome(shared: &mut Shared, failure: bool, now_ms: u64, trip: Trip) {
395    match trip {
396        Trip::Consecutive(_) => {}
397        Trip::Ratio { window, .. } => {
398            shared.outcomes.push_back(failure);
399            while shared.outcomes.len() > window as usize {
400                let _ = shared.outcomes.pop_front();
401            }
402        }
403        Trip::Windowed { period, .. } => {
404            if failure {
405                shared.failure_times.push_back(now_ms);
406            }
407            let cutoff =
408                now_ms.saturating_sub(u64::try_from(period.as_millis()).unwrap_or(u64::MAX));
409            while shared.failure_times.front().is_some_and(|&t| t < cutoff) {
410                let _ = shared.failure_times.pop_front();
411            }
412        }
413    }
414}
415
416/// Whether the closed breaker's failure state has reached its trip condition.
417fn tripped(shared: &Shared, now_ms: u64, trip: Trip) -> bool {
418    match trip {
419        Trip::Consecutive(n) => shared.consecutive >= n,
420        Trip::Ratio {
421            ratio, min_calls, ..
422        } => {
423            let total = shared.outcomes.len() as u32;
424            if total < min_calls || total == 0 {
425                return false;
426            }
427            let failures = shared.outcomes.iter().filter(|&&f| f).count() as u32;
428            f64::from(failures) / f64::from(total) >= ratio
429        }
430        Trip::Windowed { failures, period } => {
431            let cutoff =
432                now_ms.saturating_sub(u64::try_from(period.as_millis()).unwrap_or(u64::MAX));
433            let recent = shared
434                .failure_times
435                .iter()
436                .filter(|&&t| t >= cutoff)
437                .count() as u32;
438            recent >= failures
439        }
440    }
441}
442
443/// A reserved permission to make one protected call.
444///
445/// Settle it with [`success`](Self::success) or [`failure`](Self::failure) after
446/// the call returns. If dropped unsettled — an early return, a `?`, or a panic —
447/// it records a **failure**, so the breaker errs toward protecting the
448/// downstream.
449#[must_use = "settle the permit with `.success()` or `.failure()`; dropping it counts as a failure"]
450pub struct Permit<'a, L, C>
451where
452    L: Limiter,
453    C: Clock + Clone,
454{
455    breaker: &'a CircuitBreaker<L, C>,
456    settled: bool,
457}
458
459impl<'a, L, C> Permit<'a, L, C>
460where
461    L: Limiter,
462    C: Clock + Clone,
463{
464    fn new(breaker: &'a CircuitBreaker<L, C>) -> Self {
465        Self {
466            breaker,
467            settled: false,
468        }
469    }
470
471    /// Records that the protected call succeeded.
472    pub fn success(mut self) {
473        self.breaker.record(true);
474        self.settled = true;
475    }
476
477    /// Records that the protected call failed.
478    pub fn failure(mut self) {
479        self.breaker.record(false);
480        self.settled = true;
481    }
482}
483
484impl<L, C> Drop for Permit<'_, L, C>
485where
486    L: Limiter,
487    C: Clock + Clone,
488{
489    fn drop(&mut self) {
490        if !self.settled {
491            self.breaker.record(false);
492        }
493    }
494}
495
496/// Builder for a [`CircuitBreaker`].
497#[derive(Debug, Clone, Copy)]
498pub struct CircuitBreakerBuilder {
499    trip: Trip,
500    cooldown: Duration,
501    half_open_trials: u32,
502    half_open_required: u32,
503}
504
505impl Default for CircuitBreakerBuilder {
506    fn default() -> Self {
507        Self::new()
508    }
509}
510
511impl CircuitBreakerBuilder {
512    /// Creates a builder with the default policy: [`Trip::Consecutive(5)`](Trip::Consecutive),
513    /// a 30-second cooldown, and a single trial that must succeed to close.
514    #[must_use]
515    pub fn new() -> Self {
516        Self {
517            trip: Trip::Consecutive(5),
518            cooldown: Duration::from_secs(30),
519            half_open_trials: 1,
520            half_open_required: 1,
521        }
522    }
523
524    /// Sets the condition under which the breaker trips open.
525    #[must_use]
526    pub fn trip(mut self, trip: Trip) -> Self {
527        self.trip = trip;
528        self
529    }
530
531    /// Sets how long the breaker stays open before admitting a trial request.
532    #[must_use]
533    pub fn cooldown(mut self, cooldown: Duration) -> Self {
534        self.cooldown = cooldown;
535        self
536    }
537
538    /// Sets how many trial requests may run concurrently while half-open, and
539    /// how many must succeed to close. `trials` and `required` are clamped to at
540    /// least one; `required` is clamped to at most `trials`.
541    #[must_use]
542    pub fn half_open(mut self, trials: u32, required: u32) -> Self {
543        self.half_open_trials = trials.max(1);
544        self.half_open_required = required.max(1).min(self.half_open_trials);
545        self
546    }
547
548    /// Wraps `limiter`, producing a breaker driven by the system clock.
549    #[must_use]
550    pub fn build<L>(self, limiter: L) -> CircuitBreaker<L, SystemClock>
551    where
552        L: Limiter,
553    {
554        CircuitBreaker::new(
555            limiter,
556            Config {
557                trip: self.trip,
558                cooldown: self.cooldown,
559                half_open_trials: self.half_open_trials,
560                half_open_required: self.half_open_required,
561            },
562            SystemClock::new(),
563        )
564    }
565}
566
567#[cfg(test)]
568mod tests {
569    #![allow(clippy::unwrap_used, clippy::expect_used)]
570
571    use super::{BreakerState, CircuitBreaker, Trip};
572    use crate::throttle::Throttle;
573    use clock_lib::ManualClock;
574    use core::time::Duration;
575    use std::sync::Arc;
576
577    fn assert_send_sync<T: Send + Sync>() {}
578
579    #[test]
580    fn test_breaker_is_send_sync() {
581        assert_send_sync::<CircuitBreaker<Throttle>>();
582    }
583
584    fn breaker(
585        trip: Trip,
586        cooldown: Duration,
587        clock: Arc<ManualClock>,
588    ) -> CircuitBreaker<Throttle, Arc<ManualClock>> {
589        CircuitBreaker::builder()
590            .trip(trip)
591            .cooldown(cooldown)
592            .half_open(1, 1)
593            .build(Throttle::per_second(1_000_000))
594            .with_clock(clock)
595    }
596
597    #[test]
598    fn test_consecutive_failures_trip_open() {
599        let clock = Arc::new(ManualClock::new());
600        let cb = breaker(Trip::Consecutive(3), Duration::from_secs(10), clock);
601
602        assert_eq!(cb.state(), BreakerState::Closed);
603        cb.record_failure();
604        cb.record_failure();
605        assert_eq!(cb.state(), BreakerState::Closed);
606        cb.record_failure(); // third in a row
607        assert_eq!(cb.state(), BreakerState::Open);
608    }
609
610    #[test]
611    fn test_success_resets_consecutive_count() {
612        let clock = Arc::new(ManualClock::new());
613        let cb = breaker(Trip::Consecutive(3), Duration::from_secs(10), clock);
614
615        cb.record_failure();
616        cb.record_failure();
617        cb.record_success(); // resets
618        cb.record_failure();
619        cb.record_failure();
620        assert_eq!(cb.state(), BreakerState::Closed); // only two since reset
621    }
622
623    #[test]
624    fn test_open_sheds_requests_without_touching_limiter() {
625        let clock = Arc::new(ManualClock::new());
626        let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock);
627
628        cb.record_failure(); // trips open
629        assert_eq!(cb.state(), BreakerState::Open);
630
631        let before = cb.inner().available();
632        let result = cb.try_acquire();
633        assert!(matches!(
634            result,
635            Err(crate::ThrottleError::CircuitOpen { .. })
636        ));
637        // The wrapped limiter was not consumed.
638        assert_eq!(cb.inner().available(), before);
639    }
640
641    #[test]
642    fn test_half_open_after_cooldown_then_close_on_success() {
643        let clock = Arc::new(ManualClock::new());
644        let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
645
646        cb.record_failure(); // open
647        assert_eq!(cb.state(), BreakerState::Open);
648
649        clock.advance(Duration::from_secs(10)); // cooldown elapsed
650        let permit = cb.try_acquire().unwrap().expect("a trial is admitted");
651        assert_eq!(cb.state(), BreakerState::HalfOpen);
652        permit.success();
653        assert_eq!(cb.state(), BreakerState::Closed);
654    }
655
656    #[test]
657    fn test_half_open_failure_reopens() {
658        let clock = Arc::new(ManualClock::new());
659        let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
660
661        cb.record_failure(); // open
662        clock.advance(Duration::from_secs(10));
663        let permit = cb.try_acquire().unwrap().expect("a trial is admitted");
664        assert_eq!(cb.state(), BreakerState::HalfOpen);
665        permit.failure(); // trial failed
666        assert_eq!(cb.state(), BreakerState::Open);
667    }
668
669    #[test]
670    fn test_open_rejects_until_cooldown_elapses() {
671        let clock = Arc::new(ManualClock::new());
672        let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
673
674        cb.record_failure(); // open
675        clock.advance(Duration::from_secs(9)); // not yet
676        assert!(matches!(
677            cb.try_acquire(),
678            Err(crate::ThrottleError::CircuitOpen { .. })
679        ));
680        clock.advance(Duration::from_secs(1)); // now cooled down
681        assert!(cb.try_acquire().unwrap().is_some());
682    }
683
684    #[test]
685    fn test_dropping_permit_counts_as_failure() {
686        let clock = Arc::new(ManualClock::new());
687        let cb = breaker(Trip::Consecutive(2), Duration::from_secs(10), clock);
688
689        // Two acquired-but-dropped permits count as two failures and trip it.
690        drop(cb.try_acquire().unwrap());
691        assert_eq!(cb.state(), BreakerState::Closed);
692        drop(cb.try_acquire().unwrap());
693        assert_eq!(cb.state(), BreakerState::Open);
694    }
695
696    #[test]
697    fn test_ratio_trip() {
698        let clock = Arc::new(ManualClock::new());
699        let cb = breaker(
700            Trip::Ratio {
701                window: 10,
702                ratio: 0.5,
703                min_calls: 4,
704            },
705            Duration::from_secs(10),
706            clock,
707        );
708
709        cb.record_success();
710        cb.record_success();
711        assert_eq!(cb.state(), BreakerState::Closed);
712        cb.record_failure();
713        cb.record_failure(); // 2/4 = 0.5 with 4 calls
714        assert_eq!(cb.state(), BreakerState::Open);
715    }
716
717    #[test]
718    fn test_windowed_trip_prunes_old_failures() {
719        let clock = Arc::new(ManualClock::new());
720        let cb = breaker(
721            Trip::Windowed {
722                failures: 3,
723                period: Duration::from_secs(5),
724            },
725            Duration::from_secs(10),
726            clock.clone(),
727        );
728
729        cb.record_failure();
730        clock.advance(Duration::from_secs(6)); // first failure ages out of the window
731        cb.record_failure();
732        cb.record_failure();
733        assert_eq!(cb.state(), BreakerState::Closed); // only 2 within 5s
734        cb.record_failure();
735        assert_eq!(cb.state(), BreakerState::Open); // 3 within 5s
736    }
737}