1use core::time::Duration;
19use std::collections::VecDeque;
20use std::sync::{Mutex, MutexGuard, PoisonError};
21
22use clock_lib::{Clock, Monotonic, SystemClock};
23
24use crate::decision::Decision;
25use crate::error::ThrottleError;
26use crate::limiter::Limiter;
27
28#[non_exhaustive]
32#[derive(Debug, Clone, Copy, PartialEq)]
33pub enum Trip {
34 Consecutive(u32),
36 Ratio {
39 window: u32,
41 ratio: f64,
43 min_calls: u32,
45 },
46 Windowed {
48 failures: u32,
50 period: Duration,
52 },
53}
54
55#[non_exhaustive]
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum BreakerState {
61 Closed,
63 Open,
65 HalfOpen,
67}
68
69struct Shared {
71 state: BreakerState,
72 consecutive: u32,
74 outcomes: VecDeque<bool>,
76 failure_times: VecDeque<u64>,
78 half_open_inflight: u32,
80 half_open_successes: u32,
82 open_until_ms: u64,
84}
85
86impl Shared {
87 fn new() -> Self {
88 Self {
89 state: BreakerState::Closed,
90 consecutive: 0,
91 outcomes: VecDeque::new(),
92 failure_times: VecDeque::new(),
93 half_open_inflight: 0,
94 half_open_successes: 0,
95 open_until_ms: 0,
96 }
97 }
98
99 fn reset_counters(&mut self) {
101 self.consecutive = 0;
102 self.outcomes.clear();
103 self.failure_times.clear();
104 self.half_open_inflight = 0;
105 self.half_open_successes = 0;
106 }
107}
108
109enum Admit {
111 Allow,
112 Reject(Duration),
113}
114
115pub struct CircuitBreaker<L, C = SystemClock>
145where
146 C: Clock,
147{
148 inner: L,
149 config: Config,
150 shared: Mutex<Shared>,
151 clock: C,
152 epoch: Monotonic,
153}
154
155#[derive(Debug, Clone, Copy)]
157struct Config {
158 trip: Trip,
159 cooldown: Duration,
160 half_open_trials: u32,
161 half_open_required: u32,
162}
163
164impl CircuitBreaker<core::convert::Infallible> {
168 #[must_use]
171 pub fn builder() -> CircuitBreakerBuilder {
172 CircuitBreakerBuilder::new()
173 }
174}
175
176impl<L, C> CircuitBreaker<L, C>
177where
178 L: Limiter,
179 C: Clock + Clone,
180{
181 fn new(inner: L, config: Config, clock: C) -> Self {
182 let epoch = clock.now();
183 Self {
184 inner,
185 config,
186 shared: Mutex::new(Shared::new()),
187 clock,
188 epoch,
189 }
190 }
191
192 #[must_use]
195 pub fn with_clock<C2>(self, clock: C2) -> CircuitBreaker<L, C2>
196 where
197 C2: Clock + Clone,
198 {
199 CircuitBreaker::new(self.inner, self.config, clock)
200 }
201
202 #[must_use]
204 pub fn state(&self) -> BreakerState {
205 self.lock().state
206 }
207
208 pub fn inner(&self) -> &L {
210 &self.inner
211 }
212
213 #[inline]
214 fn lock(&self) -> MutexGuard<'_, Shared> {
215 self.shared.lock().unwrap_or_else(PoisonError::into_inner)
216 }
217
218 #[inline]
219 fn now_ms(&self) -> u64 {
220 let elapsed = self.clock.now().saturating_duration_since(self.epoch);
221 u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
222 }
223
224 fn admit(&self, now_ms: u64) -> Admit {
227 let mut shared = self.lock();
228 match shared.state {
229 BreakerState::Closed => Admit::Allow,
230 BreakerState::Open => {
231 if now_ms >= shared.open_until_ms {
232 shared.state = BreakerState::HalfOpen;
233 shared.half_open_inflight = 1;
234 shared.half_open_successes = 0;
235 Admit::Allow
236 } else {
237 Admit::Reject(Duration::from_millis(shared.open_until_ms - now_ms))
238 }
239 }
240 BreakerState::HalfOpen => {
241 if shared.half_open_inflight < self.config.half_open_trials {
242 shared.half_open_inflight += 1;
243 Admit::Allow
244 } else {
245 Admit::Reject(Duration::ZERO)
247 }
248 }
249 }
250 }
251
252 fn abort(&self) {
255 let mut shared = self.lock();
256 if shared.state == BreakerState::HalfOpen {
257 shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
258 }
259 }
260
261 fn record(&self, success: bool) {
263 let now_ms = self.now_ms();
264 let mut shared = self.lock();
265 match shared.state {
266 BreakerState::HalfOpen => {
267 shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
268 if success {
269 shared.half_open_successes += 1;
270 if shared.half_open_successes >= self.config.half_open_required {
271 shared.state = BreakerState::Closed;
272 shared.reset_counters();
273 }
274 } else {
275 self.open(&mut shared, now_ms);
276 }
277 }
278 BreakerState::Closed => {
279 if success {
280 shared.consecutive = 0;
281 record_outcome(&mut shared, false, now_ms, self.config.trip);
282 } else {
283 shared.consecutive += 1;
284 record_outcome(&mut shared, true, now_ms, self.config.trip);
285 if tripped(&shared, now_ms, self.config.trip) {
286 self.open(&mut shared, now_ms);
287 }
288 }
289 }
290 BreakerState::Open => {}
292 }
293 }
294
295 fn open(&self, shared: &mut Shared, now_ms: u64) {
297 shared.state = BreakerState::Open;
298 shared.open_until_ms = now_ms
299 .saturating_add(u64::try_from(self.config.cooldown.as_millis()).unwrap_or(u64::MAX));
300 shared.half_open_inflight = 0;
301 shared.half_open_successes = 0;
302 }
303
304 pub fn record_success(&self) {
306 self.record(true);
307 }
308
309 pub fn record_failure(&self) {
311 self.record(false);
312 }
313
314 pub fn try_acquire(&self) -> Result<Option<Permit<'_, L, C>>, ThrottleError> {
329 let now_ms = self.now_ms();
330 match self.admit(now_ms) {
331 Admit::Reject(retry_after) => Err(ThrottleError::CircuitOpen { retry_after }),
332 Admit::Allow => match self.inner.acquire_cost(1) {
333 Decision::Acquired => Ok(Some(Permit::new(self))),
334 Decision::Retry { .. } => {
335 self.abort();
336 Ok(None)
337 }
338 Decision::Impossible => {
339 self.abort();
340 Err(ThrottleError::CostExceedsCapacity {
341 cost: 1,
342 capacity: self.inner.capacity(),
343 })
344 }
345 },
346 }
347 }
348}
349
350#[cfg(feature = "tokio")]
351#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
352impl<L, C> CircuitBreaker<L, C>
353where
354 L: Limiter,
355 C: Clock + Clone,
356{
357 pub async fn acquire(&self) -> Result<Permit<'_, L, C>, ThrottleError> {
370 match self.admit(self.now_ms()) {
374 Admit::Reject(retry_after) => return Err(ThrottleError::CircuitOpen { retry_after }),
375 Admit::Allow => {}
376 }
377 loop {
378 match self.inner.acquire_cost(1) {
379 Decision::Acquired => return Ok(Permit::new(self)),
380 Decision::Retry { after } => tokio::time::sleep(after).await,
381 Decision::Impossible => {
382 self.abort();
383 return Err(ThrottleError::CostExceedsCapacity {
384 cost: 1,
385 capacity: self.inner.capacity(),
386 });
387 }
388 }
389 }
390 }
391}
392
393fn record_outcome(shared: &mut Shared, failure: bool, now_ms: u64, trip: Trip) {
395 match trip {
396 Trip::Consecutive(_) => {}
397 Trip::Ratio { window, .. } => {
398 shared.outcomes.push_back(failure);
399 while shared.outcomes.len() > window as usize {
400 let _ = shared.outcomes.pop_front();
401 }
402 }
403 Trip::Windowed { period, .. } => {
404 if failure {
405 shared.failure_times.push_back(now_ms);
406 }
407 let cutoff =
408 now_ms.saturating_sub(u64::try_from(period.as_millis()).unwrap_or(u64::MAX));
409 while shared.failure_times.front().is_some_and(|&t| t < cutoff) {
410 let _ = shared.failure_times.pop_front();
411 }
412 }
413 }
414}
415
416fn tripped(shared: &Shared, now_ms: u64, trip: Trip) -> bool {
418 match trip {
419 Trip::Consecutive(n) => shared.consecutive >= n,
420 Trip::Ratio {
421 ratio, min_calls, ..
422 } => {
423 let total = shared.outcomes.len() as u32;
424 if total < min_calls || total == 0 {
425 return false;
426 }
427 let failures = shared.outcomes.iter().filter(|&&f| f).count() as u32;
428 f64::from(failures) / f64::from(total) >= ratio
429 }
430 Trip::Windowed { failures, period } => {
431 let cutoff =
432 now_ms.saturating_sub(u64::try_from(period.as_millis()).unwrap_or(u64::MAX));
433 let recent = shared
434 .failure_times
435 .iter()
436 .filter(|&&t| t >= cutoff)
437 .count() as u32;
438 recent >= failures
439 }
440 }
441}
442
443#[must_use = "settle the permit with `.success()` or `.failure()`; dropping it counts as a failure"]
450pub struct Permit<'a, L, C>
451where
452 L: Limiter,
453 C: Clock + Clone,
454{
455 breaker: &'a CircuitBreaker<L, C>,
456 settled: bool,
457}
458
459impl<'a, L, C> Permit<'a, L, C>
460where
461 L: Limiter,
462 C: Clock + Clone,
463{
464 fn new(breaker: &'a CircuitBreaker<L, C>) -> Self {
465 Self {
466 breaker,
467 settled: false,
468 }
469 }
470
471 pub fn success(mut self) {
473 self.breaker.record(true);
474 self.settled = true;
475 }
476
477 pub fn failure(mut self) {
479 self.breaker.record(false);
480 self.settled = true;
481 }
482}
483
484impl<L, C> Drop for Permit<'_, L, C>
485where
486 L: Limiter,
487 C: Clock + Clone,
488{
489 fn drop(&mut self) {
490 if !self.settled {
491 self.breaker.record(false);
492 }
493 }
494}
495
496#[derive(Debug, Clone, Copy)]
498pub struct CircuitBreakerBuilder {
499 trip: Trip,
500 cooldown: Duration,
501 half_open_trials: u32,
502 half_open_required: u32,
503}
504
505impl Default for CircuitBreakerBuilder {
506 fn default() -> Self {
507 Self::new()
508 }
509}
510
511impl CircuitBreakerBuilder {
512 #[must_use]
515 pub fn new() -> Self {
516 Self {
517 trip: Trip::Consecutive(5),
518 cooldown: Duration::from_secs(30),
519 half_open_trials: 1,
520 half_open_required: 1,
521 }
522 }
523
524 #[must_use]
526 pub fn trip(mut self, trip: Trip) -> Self {
527 self.trip = trip;
528 self
529 }
530
531 #[must_use]
533 pub fn cooldown(mut self, cooldown: Duration) -> Self {
534 self.cooldown = cooldown;
535 self
536 }
537
538 #[must_use]
542 pub fn half_open(mut self, trials: u32, required: u32) -> Self {
543 self.half_open_trials = trials.max(1);
544 self.half_open_required = required.max(1).min(self.half_open_trials);
545 self
546 }
547
548 #[must_use]
550 pub fn build<L>(self, limiter: L) -> CircuitBreaker<L, SystemClock>
551 where
552 L: Limiter,
553 {
554 CircuitBreaker::new(
555 limiter,
556 Config {
557 trip: self.trip,
558 cooldown: self.cooldown,
559 half_open_trials: self.half_open_trials,
560 half_open_required: self.half_open_required,
561 },
562 SystemClock::new(),
563 )
564 }
565}
566
567#[cfg(test)]
568mod tests {
569 #![allow(clippy::unwrap_used, clippy::expect_used)]
570
571 use super::{BreakerState, CircuitBreaker, Trip};
572 use crate::throttle::Throttle;
573 use clock_lib::ManualClock;
574 use core::time::Duration;
575 use std::sync::Arc;
576
577 fn assert_send_sync<T: Send + Sync>() {}
578
579 #[test]
580 fn test_breaker_is_send_sync() {
581 assert_send_sync::<CircuitBreaker<Throttle>>();
582 }
583
584 fn breaker(
585 trip: Trip,
586 cooldown: Duration,
587 clock: Arc<ManualClock>,
588 ) -> CircuitBreaker<Throttle, Arc<ManualClock>> {
589 CircuitBreaker::builder()
590 .trip(trip)
591 .cooldown(cooldown)
592 .half_open(1, 1)
593 .build(Throttle::per_second(1_000_000))
594 .with_clock(clock)
595 }
596
597 #[test]
598 fn test_consecutive_failures_trip_open() {
599 let clock = Arc::new(ManualClock::new());
600 let cb = breaker(Trip::Consecutive(3), Duration::from_secs(10), clock);
601
602 assert_eq!(cb.state(), BreakerState::Closed);
603 cb.record_failure();
604 cb.record_failure();
605 assert_eq!(cb.state(), BreakerState::Closed);
606 cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
608 }
609
610 #[test]
611 fn test_success_resets_consecutive_count() {
612 let clock = Arc::new(ManualClock::new());
613 let cb = breaker(Trip::Consecutive(3), Duration::from_secs(10), clock);
614
615 cb.record_failure();
616 cb.record_failure();
617 cb.record_success(); cb.record_failure();
619 cb.record_failure();
620 assert_eq!(cb.state(), BreakerState::Closed); }
622
623 #[test]
624 fn test_open_sheds_requests_without_touching_limiter() {
625 let clock = Arc::new(ManualClock::new());
626 let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock);
627
628 cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
630
631 let before = cb.inner().available();
632 let result = cb.try_acquire();
633 assert!(matches!(
634 result,
635 Err(crate::ThrottleError::CircuitOpen { .. })
636 ));
637 assert_eq!(cb.inner().available(), before);
639 }
640
641 #[test]
642 fn test_half_open_after_cooldown_then_close_on_success() {
643 let clock = Arc::new(ManualClock::new());
644 let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
645
646 cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
648
649 clock.advance(Duration::from_secs(10)); let permit = cb.try_acquire().unwrap().expect("a trial is admitted");
651 assert_eq!(cb.state(), BreakerState::HalfOpen);
652 permit.success();
653 assert_eq!(cb.state(), BreakerState::Closed);
654 }
655
656 #[test]
657 fn test_half_open_failure_reopens() {
658 let clock = Arc::new(ManualClock::new());
659 let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
660
661 cb.record_failure(); clock.advance(Duration::from_secs(10));
663 let permit = cb.try_acquire().unwrap().expect("a trial is admitted");
664 assert_eq!(cb.state(), BreakerState::HalfOpen);
665 permit.failure(); assert_eq!(cb.state(), BreakerState::Open);
667 }
668
669 #[test]
670 fn test_open_rejects_until_cooldown_elapses() {
671 let clock = Arc::new(ManualClock::new());
672 let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
673
674 cb.record_failure(); clock.advance(Duration::from_secs(9)); assert!(matches!(
677 cb.try_acquire(),
678 Err(crate::ThrottleError::CircuitOpen { .. })
679 ));
680 clock.advance(Duration::from_secs(1)); assert!(cb.try_acquire().unwrap().is_some());
682 }
683
684 #[test]
685 fn test_dropping_permit_counts_as_failure() {
686 let clock = Arc::new(ManualClock::new());
687 let cb = breaker(Trip::Consecutive(2), Duration::from_secs(10), clock);
688
689 drop(cb.try_acquire().unwrap());
691 assert_eq!(cb.state(), BreakerState::Closed);
692 drop(cb.try_acquire().unwrap());
693 assert_eq!(cb.state(), BreakerState::Open);
694 }
695
696 #[test]
697 fn test_ratio_trip() {
698 let clock = Arc::new(ManualClock::new());
699 let cb = breaker(
700 Trip::Ratio {
701 window: 10,
702 ratio: 0.5,
703 min_calls: 4,
704 },
705 Duration::from_secs(10),
706 clock,
707 );
708
709 cb.record_success();
710 cb.record_success();
711 assert_eq!(cb.state(), BreakerState::Closed);
712 cb.record_failure();
713 cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
715 }
716
717 #[test]
718 fn test_windowed_trip_prunes_old_failures() {
719 let clock = Arc::new(ManualClock::new());
720 let cb = breaker(
721 Trip::Windowed {
722 failures: 3,
723 period: Duration::from_secs(5),
724 },
725 Duration::from_secs(10),
726 clock.clone(),
727 );
728
729 cb.record_failure();
730 clock.advance(Duration::from_secs(6)); cb.record_failure();
732 cb.record_failure();
733 assert_eq!(cb.state(), BreakerState::Closed); cb.record_failure();
735 assert_eq!(cb.state(), BreakerState::Open); }
737}