error_forge/recovery/
circuit_breaker.rs1use std::sync::{Arc, Mutex};
2use std::time::{Duration, Instant};
3use crate::recovery::RecoveryResult;
4
/// The three states a [`CircuitBreaker`] can be in.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the state can be
/// used as a map/set key (for example when counting transitions in metrics).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CircuitState {
    /// Normal operation: `CircuitBreaker::execute` runs the supplied operation.
    Closed,

    /// Failing fast: `CircuitBreaker::execute` returns a `CircuitOpenError`
    /// without running the operation.
    Open,

    /// Trial state entered from `Open` once the reset timeout has elapsed.
    /// A successful call closes the circuit; a failed call re-opens it.
    HalfOpen,
}
17
/// Tunable thresholds for a [`CircuitBreaker`].
///
/// All durations are expressed in milliseconds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CircuitBreakerConfig {
    /// Number of failures inside the failure window that trips a closed
    /// circuit into the open state.
    pub failure_threshold: usize,

    /// Length of the sliding window, in milliseconds. Failures older than
    /// this are discarded before the threshold is checked.
    pub failure_window_ms: u64,

    /// How long, in milliseconds, the circuit stays open before the next
    /// call is allowed through as a half-open trial.
    pub reset_timeout_ms: u64,
}
30
31impl Default for CircuitBreakerConfig {
32 fn default() -> Self {
33 Self {
34 failure_threshold: 5,
35 failure_window_ms: 60000, reset_timeout_ms: 30000, }
38 }
39}
40
41struct CircuitBreakerInner {
42 config: CircuitBreakerConfig,
43 state: CircuitState,
44 failures: Vec<Instant>,
45 last_state_change: Instant,
46}
47
48pub struct CircuitBreaker {
53 name: String,
54 inner: Arc<Mutex<CircuitBreakerInner>>,
55}
56
57impl CircuitBreaker {
58 pub fn new(name: impl Into<String>) -> Self {
60 Self::with_config(name, CircuitBreakerConfig::default())
61 }
62
63 pub fn with_config(name: impl Into<String>, config: CircuitBreakerConfig) -> Self {
65 Self {
66 name: name.into(),
67 inner: Arc::new(Mutex::new(CircuitBreakerInner {
68 config,
69 state: CircuitState::Closed,
70 failures: Vec::new(),
71 last_state_change: Instant::now(),
72 })),
73 }
74 }
75
76 pub fn state(&self) -> CircuitState {
78 let inner = self.inner.lock().unwrap();
79 inner.state
80 }
81
82 pub fn name(&self) -> &str {
84 &self.name
85 }
86
87 pub fn execute<F, T, E>(&self, f: F) -> RecoveryResult<T>
89 where
90 F: FnOnce() -> Result<T, E>,
91 E: std::error::Error + Send + Sync + 'static,
92 {
93 let can_proceed = {
95 let mut inner = self.inner.lock().unwrap();
96 self.update_state(&mut inner);
97 inner.state != CircuitState::Open
98 };
99
100 if !can_proceed {
102 return Err(Box::new(CircuitOpenError::new(&self.name)));
103 }
104
105 match f() {
107 Ok(value) => {
108 self.on_success();
110 Ok(value)
111 }
112 Err(err) => {
113 self.on_failure();
115 Err(Box::new(err))
116 }
117 }
118 }
119
120 pub fn reset(&self) {
122 let mut inner = self.inner.lock().unwrap();
123 inner.state = CircuitState::Closed;
124 inner.failures.clear();
125 inner.last_state_change = Instant::now();
126 }
127
128 fn on_success(&self) {
130 let mut inner = self.inner.lock().unwrap();
131 if inner.state == CircuitState::HalfOpen {
132 inner.state = CircuitState::Closed;
134 inner.failures.clear();
135 inner.last_state_change = Instant::now();
136 }
137 }
138
139 fn on_failure(&self) {
141 let mut inner = self.inner.lock().unwrap();
142
143 if inner.state == CircuitState::HalfOpen {
144 inner.state = CircuitState::Open;
146 inner.last_state_change = Instant::now();
147 return;
148 }
149
150 let now = Instant::now();
152 inner.failures.push(now);
153
154 let window_start = now - Duration::from_millis(inner.config.failure_window_ms);
156 inner.failures.retain(|&time| time >= window_start);
157
158 if inner.state == CircuitState::Closed &&
160 inner.failures.len() >= inner.config.failure_threshold {
161 inner.state = CircuitState::Open;
163 inner.last_state_change = now;
164 }
165 }
166
167 fn update_state(&self, inner: &mut CircuitBreakerInner) {
169 if inner.state == CircuitState::Open {
170 let now = Instant::now();
171 let elapsed = now.duration_since(inner.last_state_change);
172
173 if elapsed >= Duration::from_millis(inner.config.reset_timeout_ms) {
174 inner.state = CircuitState::HalfOpen;
176 inner.last_state_change = now;
177 }
178 }
179 }
180}
181
/// Error returned by `CircuitBreaker::execute` when the circuit is open and
/// the operation was therefore not run.
///
/// `Clone`, `PartialEq` and `Eq` are derived so callers and tests can copy
/// and compare the error directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CircuitOpenError {
    /// Name of the circuit that rejected the call.
    circuit_name: String,
}
187
188impl CircuitOpenError {
189 fn new(circuit_name: &str) -> Self {
190 Self {
191 circuit_name: circuit_name.to_string(),
192 }
193 }
194}
195
196impl std::fmt::Display for CircuitOpenError {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 write!(f, "Circuit '{}' is open, failing fast", self.circuit_name)
199 }
200}
201
202impl std::error::Error for CircuitOpenError {}