error_forge/recovery/
circuit_breaker.rs1use crate::recovery::RecoveryResult;
2use parking_lot::Mutex;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6#[derive(Debug, Clone, Copy, PartialEq)]
12#[non_exhaustive]
13pub enum CircuitState {
14 Closed,
16
17 Open,
19
20 HalfOpen,
22}
23
24#[derive(Clone)]
31#[non_exhaustive]
32pub struct CircuitBreakerConfig {
33 pub failure_threshold: usize,
35
36 pub failure_window_ms: u64,
38
39 pub reset_timeout_ms: u64,
41}
42
43impl Default for CircuitBreakerConfig {
44 fn default() -> Self {
45 Self {
46 failure_threshold: 5,
47 failure_window_ms: 60000, reset_timeout_ms: 30000, }
50 }
51}
52
53impl CircuitBreakerConfig {
54 pub fn new(failure_threshold: usize, failure_window_ms: u64, reset_timeout_ms: u64) -> Self {
62 Self {
63 failure_threshold,
64 failure_window_ms,
65 reset_timeout_ms,
66 }
67 }
68
69 #[must_use]
71 pub fn with_failure_threshold(mut self, threshold: usize) -> Self {
72 self.failure_threshold = threshold;
73 self
74 }
75
76 #[must_use]
78 pub fn with_failure_window_ms(mut self, window_ms: u64) -> Self {
79 self.failure_window_ms = window_ms;
80 self
81 }
82
83 #[must_use]
85 pub fn with_reset_timeout_ms(mut self, reset_ms: u64) -> Self {
86 self.reset_timeout_ms = reset_ms;
87 self
88 }
89}
90
91struct CircuitBreakerInner {
92 config: CircuitBreakerConfig,
93 state: CircuitState,
94 failures: Vec<Instant>,
95 last_state_change: Instant,
96}
97
98pub struct CircuitBreaker {
103 name: String,
104 inner: Arc<Mutex<CircuitBreakerInner>>,
105}
106
107impl CircuitBreaker {
108 pub fn new(name: impl Into<String>) -> Self {
110 Self::with_config(name, CircuitBreakerConfig::default())
111 }
112
113 pub fn with_config(name: impl Into<String>, config: CircuitBreakerConfig) -> Self {
115 Self {
116 name: name.into(),
117 inner: Arc::new(Mutex::new(CircuitBreakerInner {
118 config,
119 state: CircuitState::Closed,
120 failures: Vec::new(),
121 last_state_change: Instant::now(),
122 })),
123 }
124 }
125
126 pub fn state(&self) -> CircuitState {
128 let inner = self.inner.lock();
129 inner.state
130 }
131
132 pub fn name(&self) -> &str {
134 &self.name
135 }
136
137 pub fn execute<F, T, E>(&self, f: F) -> RecoveryResult<T>
139 where
140 F: FnOnce() -> Result<T, E>,
141 E: std::error::Error + Send + Sync + 'static,
142 {
143 let can_proceed = {
145 let mut inner = self.inner.lock();
146 self.update_state(&mut inner);
147 inner.state != CircuitState::Open
148 };
149
150 if !can_proceed {
152 return Err(Box::new(CircuitOpenError::new(&self.name)));
153 }
154
155 match f() {
157 Ok(value) => {
158 self.on_success();
160 Ok(value)
161 }
162 Err(err) => {
163 self.on_failure();
165 Err(Box::new(err))
166 }
167 }
168 }
169
170 pub fn reset(&self) {
172 let mut inner = self.inner.lock();
173 inner.state = CircuitState::Closed;
174 inner.failures.clear();
175 inner.last_state_change = Instant::now();
176 }
177
178 fn on_success(&self) {
180 let mut inner = self.inner.lock();
181 if inner.state == CircuitState::HalfOpen {
182 inner.state = CircuitState::Closed;
184 inner.failures.clear();
185 inner.last_state_change = Instant::now();
186 }
187 }
188
189 fn on_failure(&self) {
191 let mut inner = self.inner.lock();
192
193 if inner.state == CircuitState::HalfOpen {
194 inner.state = CircuitState::Open;
196 inner.last_state_change = Instant::now();
197 return;
198 }
199
200 let now = Instant::now();
202 inner.failures.push(now);
203
204 let window_start = now - Duration::from_millis(inner.config.failure_window_ms);
206 inner.failures.retain(|&time| time >= window_start);
207
208 if inner.state == CircuitState::Closed
210 && inner.failures.len() >= inner.config.failure_threshold
211 {
212 inner.state = CircuitState::Open;
214 inner.last_state_change = now;
215 }
216 }
217
218 fn update_state(&self, inner: &mut CircuitBreakerInner) {
220 if inner.state == CircuitState::Open {
221 let now = Instant::now();
222 let elapsed = now.duration_since(inner.last_state_change);
223
224 if elapsed >= Duration::from_millis(inner.config.reset_timeout_ms) {
225 inner.state = CircuitState::HalfOpen;
227 inner.last_state_change = now;
228 }
229 }
230 }
231}
232
233#[derive(Debug)]
235pub struct CircuitOpenError {
236 circuit_name: String,
237}
238
239impl CircuitOpenError {
240 fn new(circuit_name: &str) -> Self {
241 Self {
242 circuit_name: circuit_name.to_string(),
243 }
244 }
245}
246
247impl std::fmt::Display for CircuitOpenError {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "Circuit '{}' is open, failing fast", self.circuit_name)
250 }
251}
252
253impl std::error::Error for CircuitOpenError {}