aspect_std/
circuitbreaker.rs1use aspect_core::{Aspect, AspectError, ProceedingJoinPoint};
4use parking_lot::Mutex;
5use std::any::Any;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9#[derive(Debug, Clone, PartialEq)]
11pub enum CircuitState {
12 Closed,
14 Open { until: Instant },
16 HalfOpen,
18}
19
20#[derive(Clone)]
47pub struct CircuitBreakerAspect {
48 state: Arc<Mutex<CircuitBreakerState>>,
49}
50
51struct CircuitBreakerState {
52 circuit_state: CircuitState,
53 failure_count: usize,
54 success_count: usize,
55 failure_threshold: usize,
56 timeout: Duration,
57 half_open_max_requests: usize,
58}
59
60impl CircuitBreakerAspect {
61 pub fn new(failure_threshold: usize, timeout: Duration) -> Self {
75 Self {
76 state: Arc::new(Mutex::new(CircuitBreakerState {
77 circuit_state: CircuitState::Closed,
78 failure_count: 0,
79 success_count: 0,
80 failure_threshold,
81 timeout,
82 half_open_max_requests: 1,
83 })),
84 }
85 }
86
87 pub fn with_half_open_requests(self, max_requests: usize) -> Self {
89 self.state.lock().half_open_max_requests = max_requests;
90 self
91 }
92
93 pub fn state(&self) -> CircuitState {
95 self.state.lock().circuit_state.clone()
96 }
97
98 pub fn reset(&self) {
100 let mut state = self.state.lock();
101 state.circuit_state = CircuitState::Closed;
102 state.failure_count = 0;
103 state.success_count = 0;
104 }
105
106 fn record_success(&self) {
108 let mut state = self.state.lock();
109
110 match state.circuit_state {
111 CircuitState::HalfOpen => {
112 state.success_count += 1;
113 if state.success_count >= state.half_open_max_requests {
115 state.circuit_state = CircuitState::Closed;
116 state.failure_count = 0;
117 state.success_count = 0;
118 }
119 }
120 CircuitState::Closed => {
121 state.failure_count = 0;
123 }
124 CircuitState::Open { .. } => {
125 state.failure_count = 0;
127 state.success_count = 0;
128 }
129 }
130 }
131
132 fn record_failure(&self) {
134 let mut state = self.state.lock();
135
136 match state.circuit_state {
137 CircuitState::HalfOpen => {
138 state.circuit_state = CircuitState::Open {
140 until: Instant::now() + state.timeout,
141 };
142 state.success_count = 0;
143 }
144 CircuitState::Closed => {
145 state.failure_count += 1;
146 if state.failure_count >= state.failure_threshold {
147 state.circuit_state = CircuitState::Open {
149 until: Instant::now() + state.timeout,
150 };
151 }
152 }
153 CircuitState::Open { .. } => {
154 }
156 }
157 }
158
159 fn should_allow_request(&self) -> Result<(), AspectError> {
161 let mut state = self.state.lock();
162
163 match state.circuit_state {
164 CircuitState::Closed => Ok(()),
165 CircuitState::HalfOpen => Ok(()),
166 CircuitState::Open { until } => {
167 if Instant::now() >= until {
168 state.circuit_state = CircuitState::HalfOpen;
170 state.success_count = 0;
171 Ok(())
172 } else {
173 Err(AspectError::execution(
174 "Circuit breaker is OPEN - failing fast",
175 ))
176 }
177 }
178 }
179 }
180}
181
182impl Aspect for CircuitBreakerAspect {
183 fn around(&self, pjp: ProceedingJoinPoint) -> Result<Box<dyn Any>, AspectError> {
184 self.should_allow_request()?;
186
187 match pjp.proceed() {
189 Ok(result) => {
190 self.record_success();
191 Ok(result)
192 }
193 Err(e) => {
194 self.record_failure();
195 Err(e)
196 }
197 }
198 }
199}
200
201#[cfg(test)]
202mod tests {
203 use super::*;
204
205 #[test]
206 fn test_circuit_breaker_closed_initially() {
207 let breaker = CircuitBreakerAspect::new(3, Duration::from_secs(1));
208 assert_eq!(breaker.state(), CircuitState::Closed);
209 }
210
211 #[test]
212 fn test_circuit_opens_after_threshold() {
213 let breaker = CircuitBreakerAspect::new(3, Duration::from_secs(60));
214
215 for _ in 0..3 {
217 breaker.record_failure();
218 }
219
220 match breaker.state() {
222 CircuitState::Open { .. } => (),
223 _ => panic!("Circuit should be open"),
224 }
225 }
226
227 #[test]
228 fn test_circuit_rejects_when_open() {
229 let breaker = CircuitBreakerAspect::new(1, Duration::from_secs(60));
230
231 breaker.record_failure();
232
233 assert!(breaker.should_allow_request().is_err());
235 }
236
237 #[test]
238 fn test_circuit_transitions_to_half_open() {
239 let breaker = CircuitBreakerAspect::new(1, Duration::from_millis(100));
240
241 breaker.record_failure();
242 assert!(matches!(breaker.state(), CircuitState::Open { .. }));
243
244 std::thread::sleep(Duration::from_millis(150));
246
247 assert!(breaker.should_allow_request().is_ok());
249 assert_eq!(breaker.state(), CircuitState::HalfOpen);
250 }
251
252 #[test]
253 fn test_circuit_closes_after_success() {
254 let breaker = CircuitBreakerAspect::new(1, Duration::from_millis(50));
255
256 breaker.record_failure();
258 std::thread::sleep(Duration::from_millis(60));
259
260 breaker.should_allow_request().unwrap();
262
263 breaker.record_success();
265 assert_eq!(breaker.state(), CircuitState::Closed);
266 }
267
268 #[test]
269 fn test_reset() {
270 let breaker = CircuitBreakerAspect::new(2, Duration::from_secs(60));
271
272 breaker.record_failure();
273 breaker.record_failure();
274
275 assert!(matches!(breaker.state(), CircuitState::Open { .. }));
276
277 breaker.reset();
278 assert_eq!(breaker.state(), CircuitState::Closed);
279 }
280}