1use core::time::Duration;
19use std::collections::VecDeque;
20use std::sync::{Mutex, MutexGuard, PoisonError};
21
22use clock_lib::{Clock, Monotonic, SystemClock};
23
24use crate::decision::Decision;
25use crate::error::ThrottleError;
26use crate::limiter::Limiter;
27
28#[non_exhaustive]
32#[derive(Debug, Clone, Copy, PartialEq)]
33pub enum Trip {
34 Consecutive(u32),
36 Ratio {
39 window: u32,
41 ratio: f64,
43 min_calls: u32,
45 },
46 Windowed {
48 failures: u32,
50 period: Duration,
52 },
53}
54
55#[non_exhaustive]
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum BreakerState {
61 Closed,
63 Open,
65 HalfOpen,
67}
68
69struct Shared {
71 state: BreakerState,
72 consecutive: u32,
74 outcomes: VecDeque<bool>,
76 failure_times: VecDeque<u64>,
78 half_open_inflight: u32,
80 half_open_successes: u32,
82 open_until_ms: u64,
84}
85
86impl Shared {
87 fn new() -> Self {
88 Self {
89 state: BreakerState::Closed,
90 consecutive: 0,
91 outcomes: VecDeque::new(),
92 failure_times: VecDeque::new(),
93 half_open_inflight: 0,
94 half_open_successes: 0,
95 open_until_ms: 0,
96 }
97 }
98
99 fn reset_counters(&mut self) {
101 self.consecutive = 0;
102 self.outcomes.clear();
103 self.failure_times.clear();
104 self.half_open_inflight = 0;
105 self.half_open_successes = 0;
106 }
107}
108
109enum Admit {
111 Allow,
112 Reject(Duration),
113}
114
115pub struct CircuitBreaker<L, C = SystemClock>
145where
146 C: Clock,
147{
148 inner: L,
149 config: Config,
150 shared: Mutex<Shared>,
151 clock: C,
152 epoch: Monotonic,
153}
154
155#[derive(Debug, Clone, Copy)]
157struct Config {
158 trip: Trip,
159 cooldown: Duration,
160 half_open_trials: u32,
161 half_open_required: u32,
162}
163
164impl CircuitBreaker<core::convert::Infallible> {
168 #[must_use]
171 pub fn builder() -> CircuitBreakerBuilder {
172 CircuitBreakerBuilder::new()
173 }
174}
175
176impl<L, C> CircuitBreaker<L, C>
177where
178 L: Limiter,
179 C: Clock + Clone,
180{
181 fn new(inner: L, config: Config, clock: C) -> Self {
182 let epoch = clock.now();
183 Self {
184 inner,
185 config,
186 shared: Mutex::new(Shared::new()),
187 clock,
188 epoch,
189 }
190 }
191
192 #[must_use]
195 pub fn with_clock<C2>(self, clock: C2) -> CircuitBreaker<L, C2>
196 where
197 C2: Clock + Clone,
198 {
199 CircuitBreaker::new(self.inner, self.config, clock)
200 }
201
202 #[must_use]
204 pub fn state(&self) -> BreakerState {
205 self.lock().state
206 }
207
208 pub fn inner(&self) -> &L {
210 &self.inner
211 }
212
213 #[inline]
214 fn lock(&self) -> MutexGuard<'_, Shared> {
215 self.shared.lock().unwrap_or_else(PoisonError::into_inner)
216 }
217
218 #[inline]
219 fn now_ms(&self) -> u64 {
220 let elapsed = self.clock.now().saturating_duration_since(self.epoch);
221 u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
222 }
223
224 fn admit(&self, now_ms: u64) -> Admit {
227 let mut shared = self.lock();
228 match shared.state {
229 BreakerState::Closed => Admit::Allow,
230 BreakerState::Open => {
231 if now_ms >= shared.open_until_ms {
232 shared.state = BreakerState::HalfOpen;
233 shared.half_open_inflight = 1;
234 shared.half_open_successes = 0;
235 crate::obs::circuit_transition("Open", "HalfOpen", 1);
236 Admit::Allow
237 } else {
238 Admit::Reject(Duration::from_millis(shared.open_until_ms - now_ms))
239 }
240 }
241 BreakerState::HalfOpen => {
242 if shared.half_open_inflight < self.config.half_open_trials {
243 shared.half_open_inflight += 1;
244 Admit::Allow
245 } else {
246 Admit::Reject(Duration::ZERO)
248 }
249 }
250 }
251 }
252
253 fn abort(&self) {
256 let mut shared = self.lock();
257 if shared.state == BreakerState::HalfOpen {
258 shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
259 }
260 }
261
262 fn record(&self, success: bool) {
264 let now_ms = self.now_ms();
265 let mut shared = self.lock();
266 match shared.state {
267 BreakerState::HalfOpen => {
268 shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
269 if success {
270 shared.half_open_successes += 1;
271 if shared.half_open_successes >= self.config.half_open_required {
272 shared.state = BreakerState::Closed;
273 shared.reset_counters();
274 crate::obs::circuit_transition("HalfOpen", "Closed", 0);
275 }
276 } else {
277 self.open(&mut shared, now_ms);
278 }
279 }
280 BreakerState::Closed => {
281 if success {
282 shared.consecutive = 0;
283 record_outcome(&mut shared, false, now_ms, self.config.trip);
284 } else {
285 shared.consecutive += 1;
286 record_outcome(&mut shared, true, now_ms, self.config.trip);
287 if tripped(&shared, now_ms, self.config.trip) {
288 self.open(&mut shared, now_ms);
289 }
290 }
291 }
292 BreakerState::Open => {}
294 }
295 }
296
297 fn open(&self, shared: &mut Shared, now_ms: u64) {
299 let from = if shared.state == BreakerState::HalfOpen {
300 "HalfOpen"
301 } else {
302 "Closed"
303 };
304 shared.state = BreakerState::Open;
305 shared.open_until_ms = now_ms
306 .saturating_add(u64::try_from(self.config.cooldown.as_millis()).unwrap_or(u64::MAX));
307 shared.half_open_inflight = 0;
308 shared.half_open_successes = 0;
309 crate::obs::circuit_transition(from, "Open", 2);
310 }
311
312 pub fn record_success(&self) {
314 self.record(true);
315 }
316
317 pub fn record_failure(&self) {
319 self.record(false);
320 }
321
322 pub fn try_acquire(&self) -> Result<Option<Permit<'_, L, C>>, ThrottleError> {
337 let now_ms = self.now_ms();
338 match self.admit(now_ms) {
339 Admit::Reject(retry_after) => Err(ThrottleError::CircuitOpen { retry_after }),
340 Admit::Allow => match self.inner.acquire_cost(1) {
341 Decision::Acquired => Ok(Some(Permit::new(self))),
342 Decision::Retry { .. } => {
343 self.abort();
344 Ok(None)
345 }
346 Decision::Impossible => {
347 self.abort();
348 Err(ThrottleError::CostExceedsCapacity {
349 cost: 1,
350 capacity: self.inner.capacity(),
351 })
352 }
353 },
354 }
355 }
356}
357
358#[cfg(feature = "runtime")]
359#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
360impl<L, C> CircuitBreaker<L, C>
361where
362 L: Limiter,
363 C: Clock + Clone,
364{
365 pub async fn acquire(&self) -> Result<Permit<'_, L, C>, ThrottleError> {
378 match self.admit(self.now_ms()) {
382 Admit::Reject(retry_after) => return Err(ThrottleError::CircuitOpen { retry_after }),
383 Admit::Allow => {}
384 }
385 loop {
386 match self.inner.acquire_cost(1) {
387 Decision::Acquired => return Ok(Permit::new(self)),
388 Decision::Retry { after } => crate::rt::sleep(after).await,
389 Decision::Impossible => {
390 self.abort();
391 return Err(ThrottleError::CostExceedsCapacity {
392 cost: 1,
393 capacity: self.inner.capacity(),
394 });
395 }
396 }
397 }
398 }
399}
400
401fn record_outcome(shared: &mut Shared, failure: bool, now_ms: u64, trip: Trip) {
403 match trip {
404 Trip::Consecutive(_) => {}
405 Trip::Ratio { window, .. } => {
406 shared.outcomes.push_back(failure);
407 while shared.outcomes.len() > window as usize {
408 let _ = shared.outcomes.pop_front();
409 }
410 }
411 Trip::Windowed { period, .. } => {
412 if failure {
413 shared.failure_times.push_back(now_ms);
414 }
415 let cutoff =
416 now_ms.saturating_sub(u64::try_from(period.as_millis()).unwrap_or(u64::MAX));
417 while shared.failure_times.front().is_some_and(|&t| t < cutoff) {
418 let _ = shared.failure_times.pop_front();
419 }
420 }
421 }
422}
423
424fn tripped(shared: &Shared, now_ms: u64, trip: Trip) -> bool {
426 match trip {
427 Trip::Consecutive(n) => shared.consecutive >= n,
428 Trip::Ratio {
429 ratio, min_calls, ..
430 } => {
431 let total = shared.outcomes.len() as u32;
432 if total < min_calls || total == 0 {
433 return false;
434 }
435 let failures = shared.outcomes.iter().filter(|&&f| f).count() as u32;
436 f64::from(failures) / f64::from(total) >= ratio
437 }
438 Trip::Windowed { failures, period } => {
439 let cutoff =
440 now_ms.saturating_sub(u64::try_from(period.as_millis()).unwrap_or(u64::MAX));
441 let recent = shared
442 .failure_times
443 .iter()
444 .filter(|&&t| t >= cutoff)
445 .count() as u32;
446 recent >= failures
447 }
448 }
449}
450
451#[must_use = "settle the permit with `.success()` or `.failure()`; dropping it counts as a failure"]
458pub struct Permit<'a, L, C>
459where
460 L: Limiter,
461 C: Clock + Clone,
462{
463 breaker: &'a CircuitBreaker<L, C>,
464 settled: bool,
465}
466
467impl<'a, L, C> Permit<'a, L, C>
468where
469 L: Limiter,
470 C: Clock + Clone,
471{
472 fn new(breaker: &'a CircuitBreaker<L, C>) -> Self {
473 Self {
474 breaker,
475 settled: false,
476 }
477 }
478
479 pub fn success(mut self) {
481 self.breaker.record(true);
482 self.settled = true;
483 }
484
485 pub fn failure(mut self) {
487 self.breaker.record(false);
488 self.settled = true;
489 }
490}
491
492impl<L, C> Drop for Permit<'_, L, C>
493where
494 L: Limiter,
495 C: Clock + Clone,
496{
497 fn drop(&mut self) {
498 if !self.settled {
499 self.breaker.record(false);
500 }
501 }
502}
503
504#[derive(Debug, Clone, Copy)]
506pub struct CircuitBreakerBuilder {
507 trip: Trip,
508 cooldown: Duration,
509 half_open_trials: u32,
510 half_open_required: u32,
511}
512
513impl Default for CircuitBreakerBuilder {
514 fn default() -> Self {
515 Self::new()
516 }
517}
518
519impl CircuitBreakerBuilder {
520 #[must_use]
523 pub fn new() -> Self {
524 Self {
525 trip: Trip::Consecutive(5),
526 cooldown: Duration::from_secs(30),
527 half_open_trials: 1,
528 half_open_required: 1,
529 }
530 }
531
532 #[must_use]
534 pub fn trip(mut self, trip: Trip) -> Self {
535 self.trip = trip;
536 self
537 }
538
539 #[must_use]
541 pub fn cooldown(mut self, cooldown: Duration) -> Self {
542 self.cooldown = cooldown;
543 self
544 }
545
546 #[must_use]
550 pub fn half_open(mut self, trials: u32, required: u32) -> Self {
551 self.half_open_trials = trials.max(1);
552 self.half_open_required = required.max(1).min(self.half_open_trials);
553 self
554 }
555
556 #[must_use]
558 pub fn build<L>(self, limiter: L) -> CircuitBreaker<L, SystemClock>
559 where
560 L: Limiter,
561 {
562 CircuitBreaker::new(
563 limiter,
564 Config {
565 trip: self.trip,
566 cooldown: self.cooldown,
567 half_open_trials: self.half_open_trials,
568 half_open_required: self.half_open_required,
569 },
570 SystemClock::new(),
571 )
572 }
573}
574
575#[cfg(test)]
576mod tests {
577 #![allow(clippy::unwrap_used, clippy::expect_used)]
578
579 use super::{BreakerState, CircuitBreaker, Trip};
580 use crate::throttle::Throttle;
581 use clock_lib::ManualClock;
582 use core::time::Duration;
583 use std::sync::Arc;
584
585 fn assert_send_sync<T: Send + Sync>() {}
586
587 #[test]
588 fn test_breaker_is_send_sync() {
589 assert_send_sync::<CircuitBreaker<Throttle>>();
590 }
591
592 fn breaker(
593 trip: Trip,
594 cooldown: Duration,
595 clock: Arc<ManualClock>,
596 ) -> CircuitBreaker<Throttle, Arc<ManualClock>> {
597 CircuitBreaker::builder()
598 .trip(trip)
599 .cooldown(cooldown)
600 .half_open(1, 1)
601 .build(Throttle::per_second(1_000_000))
602 .with_clock(clock)
603 }
604
605 #[test]
606 fn test_consecutive_failures_trip_open() {
607 let clock = Arc::new(ManualClock::new());
608 let cb = breaker(Trip::Consecutive(3), Duration::from_secs(10), clock);
609
610 assert_eq!(cb.state(), BreakerState::Closed);
611 cb.record_failure();
612 cb.record_failure();
613 assert_eq!(cb.state(), BreakerState::Closed);
614 cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
616 }
617
618 #[test]
619 fn test_success_resets_consecutive_count() {
620 let clock = Arc::new(ManualClock::new());
621 let cb = breaker(Trip::Consecutive(3), Duration::from_secs(10), clock);
622
623 cb.record_failure();
624 cb.record_failure();
625 cb.record_success(); cb.record_failure();
627 cb.record_failure();
628 assert_eq!(cb.state(), BreakerState::Closed); }
630
631 #[test]
632 fn test_open_sheds_requests_without_touching_limiter() {
633 let clock = Arc::new(ManualClock::new());
634 let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock);
635
636 cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
638
639 let before = cb.inner().available();
640 let result = cb.try_acquire();
641 assert!(matches!(
642 result,
643 Err(crate::ThrottleError::CircuitOpen { .. })
644 ));
645 assert_eq!(cb.inner().available(), before);
647 }
648
649 #[test]
650 fn test_half_open_after_cooldown_then_close_on_success() {
651 let clock = Arc::new(ManualClock::new());
652 let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
653
654 cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
656
657 clock.advance(Duration::from_secs(10)); let permit = cb.try_acquire().unwrap().expect("a trial is admitted");
659 assert_eq!(cb.state(), BreakerState::HalfOpen);
660 permit.success();
661 assert_eq!(cb.state(), BreakerState::Closed);
662 }
663
664 #[test]
665 fn test_half_open_failure_reopens() {
666 let clock = Arc::new(ManualClock::new());
667 let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
668
669 cb.record_failure(); clock.advance(Duration::from_secs(10));
671 let permit = cb.try_acquire().unwrap().expect("a trial is admitted");
672 assert_eq!(cb.state(), BreakerState::HalfOpen);
673 permit.failure(); assert_eq!(cb.state(), BreakerState::Open);
675 }
676
677 #[test]
678 fn test_open_rejects_until_cooldown_elapses() {
679 let clock = Arc::new(ManualClock::new());
680 let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
681
682 cb.record_failure(); clock.advance(Duration::from_secs(9)); assert!(matches!(
685 cb.try_acquire(),
686 Err(crate::ThrottleError::CircuitOpen { .. })
687 ));
688 clock.advance(Duration::from_secs(1)); assert!(cb.try_acquire().unwrap().is_some());
690 }
691
692 #[test]
693 fn test_dropping_permit_counts_as_failure() {
694 let clock = Arc::new(ManualClock::new());
695 let cb = breaker(Trip::Consecutive(2), Duration::from_secs(10), clock);
696
697 drop(cb.try_acquire().unwrap());
699 assert_eq!(cb.state(), BreakerState::Closed);
700 drop(cb.try_acquire().unwrap());
701 assert_eq!(cb.state(), BreakerState::Open);
702 }
703
704 #[test]
705 fn test_ratio_trip() {
706 let clock = Arc::new(ManualClock::new());
707 let cb = breaker(
708 Trip::Ratio {
709 window: 10,
710 ratio: 0.5,
711 min_calls: 4,
712 },
713 Duration::from_secs(10),
714 clock,
715 );
716
717 cb.record_success();
718 cb.record_success();
719 assert_eq!(cb.state(), BreakerState::Closed);
720 cb.record_failure();
721 cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
723 }
724
725 #[test]
726 fn test_windowed_trip_prunes_old_failures() {
727 let clock = Arc::new(ManualClock::new());
728 let cb = breaker(
729 Trip::Windowed {
730 failures: 3,
731 period: Duration::from_secs(5),
732 },
733 Duration::from_secs(10),
734 clock.clone(),
735 );
736
737 cb.record_failure();
738 clock.advance(Duration::from_secs(6)); cb.record_failure();
740 cb.record_failure();
741 assert_eq!(cb.state(), BreakerState::Closed); cb.record_failure();
743 assert_eq!(cb.state(), BreakerState::Open); }
745}