dag_executor/advanced/
circuit_breaker.rs1use parking_lot::Mutex;
4use std::time::{Duration, Instant};
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum CircuitState {
9 Closed,
11 Open,
13 HalfOpen,
15}
16
17#[derive(Debug)]
18struct Inner {
19 state: CircuitState,
20 consecutive_failures: u32,
21 opened_at: Option<Instant>,
22}
23
24pub struct CircuitBreaker {
31 failure_threshold: u32,
32 success_threshold: u32,
33 cooldown: Duration,
34 inner: Mutex<Inner>,
35}
36
37impl CircuitBreaker {
38 pub fn new(failure_threshold: u32, success_threshold: u32, cooldown: Duration) -> Self {
44 CircuitBreaker {
45 failure_threshold: failure_threshold.max(1),
46 success_threshold: success_threshold.max(1),
47 cooldown,
48 inner: Mutex::new(Inner {
49 state: CircuitState::Closed,
50 consecutive_failures: 0,
51 opened_at: None,
52 }),
53 }
54 }
55
56 pub fn state(&self) -> CircuitState {
59 let mut inner = self.inner.lock();
60 self.refresh(&mut inner);
61 inner.state
62 }
63
64 pub fn allow_request(&self) -> bool {
69 let mut inner = self.inner.lock();
70 self.refresh(&mut inner);
71 !matches!(inner.state, CircuitState::Open)
72 }
73
74 pub fn record_success(&self) {
76 let mut inner = self.inner.lock();
77 match inner.state {
78 CircuitState::HalfOpen => {
79 inner.consecutive_failures += 1;
81 if inner.consecutive_failures >= self.success_threshold {
82 inner.state = CircuitState::Closed;
83 inner.consecutive_failures = 0;
84 inner.opened_at = None;
85 }
86 }
87 _ => {
88 inner.state = CircuitState::Closed;
89 inner.consecutive_failures = 0;
90 inner.opened_at = None;
91 }
92 }
93 }
94
95 pub fn record_failure(&self) {
97 let mut inner = self.inner.lock();
98 match inner.state {
99 CircuitState::HalfOpen => {
100 inner.state = CircuitState::Open;
102 inner.consecutive_failures = 0;
103 inner.opened_at = Some(Instant::now());
104 }
105 _ => {
106 inner.consecutive_failures += 1;
107 if inner.consecutive_failures >= self.failure_threshold {
108 inner.state = CircuitState::Open;
109 inner.opened_at = Some(Instant::now());
110 }
111 }
112 }
113 }
114
115 fn refresh(&self, inner: &mut Inner) {
117 if inner.state == CircuitState::Open {
118 if let Some(opened) = inner.opened_at {
119 if opened.elapsed() >= self.cooldown {
120 inner.state = CircuitState::HalfOpen;
121 inner.consecutive_failures = 0;
123 }
124 }
125 }
126 }
127}
128
129impl Default for CircuitBreaker {
130 fn default() -> Self {
131 CircuitBreaker::new(5, 2, Duration::from_secs(30))
132 }
133}