esox_objectpool/
circuit_breaker.rs1use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
/// Operating state of a [`CircuitBreaker`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CircuitBreakerState {
    /// Normal operation: requests are allowed and failures are counted
    /// against the failure threshold.
    Closed,

    /// The circuit has tripped: requests are rejected until the configured
    /// timeout has elapsed since the most recent recorded failure.
    Open,

    /// Recovery probing: requests are allowed again. Three recorded
    /// successes close the circuit; a single recorded failure reopens it.
    HalfOpen,
}
29
30pub struct CircuitBreaker {
49 state: Arc<Mutex<CircuitBreakerState>>,
50 failure_count: Arc<AtomicUsize>,
51 success_count: Arc<AtomicUsize>,
52 failure_threshold: usize,
53 timeout: Duration,
54 last_failure_time: Arc<Mutex<Option<Instant>>>,
55}
56
57impl CircuitBreaker {
58 pub fn new(failure_threshold: usize, timeout: Duration) -> Self {
60 Self {
61 state: Arc::new(Mutex::new(CircuitBreakerState::Closed)),
62 failure_count: Arc::new(AtomicUsize::new(0)),
63 success_count: Arc::new(AtomicUsize::new(0)),
64 failure_threshold,
65 timeout,
66 last_failure_time: Arc::new(Mutex::new(None)),
67 }
68 }
69
70 pub fn state(&self) -> CircuitBreakerState {
72 *self.state.lock().unwrap()
73 }
74
75 pub fn allow_request(&self) -> bool {
77 let current_state = self.state();
78
79 match current_state {
80 CircuitBreakerState::Closed => true,
81 CircuitBreakerState::Open => {
82 let last_failure = self.last_failure_time.lock().unwrap();
84 if let Some(time) = *last_failure
85 && time.elapsed() > self.timeout
86 {
87 drop(last_failure);
88 self.transition_to_half_open();
89 return true;
90 }
91 false
92 }
93 CircuitBreakerState::HalfOpen => true,
94 }
95 }
96
97 pub fn record_success(&self) {
99 let current_state = self.state();
100 match current_state {
101 CircuitBreakerState::Closed => {
102 self.failure_count.store(0, Ordering::Relaxed);
104 }
105 CircuitBreakerState::HalfOpen => {
106 self.success_count.fetch_add(1, Ordering::Relaxed);
107
108 if self.success_count.load(Ordering::Relaxed) >= 3 {
110 self.transition_to_closed();
111 }
112 }
113 CircuitBreakerState::Open => {}
114 }
115 }
116
117 pub fn record_failure(&self) {
119 let count = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
120 *self.last_failure_time.lock().unwrap() = Some(Instant::now());
121
122 let current_state = self.state();
123 match current_state {
124 CircuitBreakerState::Closed => {
125 if count >= self.failure_threshold {
126 self.transition_to_open();
127 }
128 }
129 CircuitBreakerState::HalfOpen => {
130 self.transition_to_open();
132 }
133 CircuitBreakerState::Open => {}
134 }
135 }
136
137 fn transition_to_open(&self) {
138 *self.state.lock().unwrap() = CircuitBreakerState::Open;
139 }
140
141 fn transition_to_half_open(&self) {
142 *self.state.lock().unwrap() = CircuitBreakerState::HalfOpen;
143 self.success_count.store(0, Ordering::Relaxed);
144 }
145
146 fn transition_to_closed(&self) {
147 *self.state.lock().unwrap() = CircuitBreakerState::Closed;
148 self.failure_count.store(0, Ordering::Relaxed);
149 self.success_count.store(0, Ordering::Relaxed);
150 }
151
152 pub fn reset(&self) {
154 self.transition_to_closed();
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::CircuitBreakerState as State;
    use super::*;

    /// Timeout long enough that it never expires during a test.
    const LONG_TIMEOUT: Duration = Duration::from_secs(60);
    /// Timeout short enough to wait out inside a test.
    const SHORT_TIMEOUT: Duration = Duration::from_millis(5);

    /// Sleeps for longer than `SHORT_TIMEOUT`.
    fn wait_out_short_timeout() {
        std::thread::sleep(Duration::from_millis(10));
    }

    #[test]
    fn does_not_open_on_non_consecutive_failures() {
        let cb = CircuitBreaker::new(3, LONG_TIMEOUT);

        // Each success in between resets the failure count.
        cb.record_failure();
        cb.record_success();
        cb.record_failure();
        cb.record_success();
        cb.record_failure();

        assert_eq!(cb.state(), State::Closed);
        assert!(cb.allow_request());
    }

    #[test]
    fn opens_after_consecutive_failures() {
        let cb = CircuitBreaker::new(3, LONG_TIMEOUT);

        for _ in 0..3 {
            cb.record_failure();
        }

        assert_eq!(cb.state(), State::Open);
        assert!(!cb.allow_request());
    }

    #[test]
    fn half_open_failure_reopens_immediately() {
        let cb = CircuitBreaker::new(1, SHORT_TIMEOUT);

        cb.record_failure();
        assert_eq!(cb.state(), State::Open);

        wait_out_short_timeout();
        assert!(cb.allow_request());
        assert_eq!(cb.state(), State::HalfOpen);

        cb.record_failure();
        assert_eq!(cb.state(), State::Open);
    }

    #[test]
    fn reset_closes_open_circuit() {
        let cb = CircuitBreaker::new(1, LONG_TIMEOUT);
        cb.record_failure();
        assert_eq!(cb.state(), State::Open);

        cb.reset();

        assert_eq!(cb.state(), State::Closed);
        assert!(cb.allow_request());
    }

    #[test]
    fn half_open_transitions_to_closed_after_three_successes() {
        let cb = CircuitBreaker::new(1, SHORT_TIMEOUT);
        cb.record_failure();
        wait_out_short_timeout();

        assert!(cb.allow_request());
        assert_eq!(cb.state(), State::HalfOpen);

        cb.record_success();
        assert_eq!(cb.state(), State::HalfOpen);
        cb.record_success();
        assert_eq!(cb.state(), State::HalfOpen);
        cb.record_success();
        assert_eq!(cb.state(), State::Closed);
        assert!(cb.allow_request());
    }

    #[test]
    fn partial_successes_in_half_open_do_not_close() {
        let cb = CircuitBreaker::new(1, SHORT_TIMEOUT);
        cb.record_failure();
        wait_out_short_timeout();
        // Called only for its side effect of moving the breaker to half-open.
        cb.allow_request();

        cb.record_success();
        assert_eq!(cb.state(), State::HalfOpen);
        cb.record_success();
        assert_eq!(cb.state(), State::HalfOpen);
    }

    #[test]
    fn failure_in_open_state_is_noop() {
        let cb = CircuitBreaker::new(1, LONG_TIMEOUT);
        cb.record_failure();
        assert_eq!(cb.state(), State::Open);

        cb.record_failure();
        assert_eq!(cb.state(), State::Open);
    }

    #[test]
    fn success_in_open_state_is_noop() {
        let cb = CircuitBreaker::new(1, LONG_TIMEOUT);
        cb.record_failure();
        assert_eq!(cb.state(), State::Open);

        cb.record_success();
        assert_eq!(cb.state(), State::Open);
        assert!(!cb.allow_request());
    }

    #[test]
    fn open_with_unexpired_timeout_denies_request() {
        let cb = CircuitBreaker::new(1, LONG_TIMEOUT);
        cb.record_failure();
        assert_eq!(cb.state(), State::Open);

        assert!(!cb.allow_request());
        assert_eq!(cb.state(), State::Open);
    }

    #[test]
    fn closed_after_reset_accepts_new_failures() {
        let cb = CircuitBreaker::new(2, LONG_TIMEOUT);
        cb.record_failure();
        cb.record_failure();
        cb.reset();

        // The counter starts from zero again after the reset.
        cb.record_failure();
        assert_eq!(cb.state(), State::Closed);

        cb.record_failure();
        assert_eq!(cb.state(), State::Open);
    }
}
296
297impl Default for CircuitBreaker {
298 fn default() -> Self {
299 Self::new(5, Duration::from_secs(60))
300 }
301}