Skip to main content

aspect_std/
circuitbreaker.rs

1//! Circuit breaker aspect for fault tolerance.
2
3use aspect_core::{Aspect, AspectError, ProceedingJoinPoint};
4use parking_lot::Mutex;
5use std::any::Any;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9/// Circuit breaker states following the classic pattern.
10#[derive(Debug, Clone, PartialEq)]
11pub enum CircuitState {
12    /// Circuit is closed, requests pass through normally.
13    Closed,
14    /// Circuit is open, requests fail fast without calling the function.
15    Open { until: Instant },
16    /// Circuit is half-open, testing if the service has recovered.
17    HalfOpen,
18}
19
20/// Circuit breaker aspect for preventing cascading failures.
21///
22/// Implements the circuit breaker pattern to protect services from
23/// repeated failures and allow time for recovery.
24///
25/// # States
26/// - **Closed**: Normal operation, failures are tracked
27/// - **Open**: Fast-fail mode after threshold is exceeded
28/// - **Half-Open**: Testing recovery with limited requests
29///
30/// # Example
31///
32/// ```rust,ignore
33/// use aspect_std::CircuitBreakerAspect;
34/// use aspect_macros::aspect;
35/// use std::time::Duration;
36///
37/// // Open after 5 failures, timeout after 30 seconds
38/// let breaker = CircuitBreakerAspect::new(5, Duration::from_secs(30));
39///
40/// #[aspect(breaker.clone())]
41/// fn call_external_service() -> Result<String, String> {
42///     // This call is protected by the circuit breaker
43///     Ok("success".to_string())
44/// }
45/// ```
46#[derive(Clone)]
47pub struct CircuitBreakerAspect {
48    state: Arc<Mutex<CircuitBreakerState>>,
49}
50
51struct CircuitBreakerState {
52    circuit_state: CircuitState,
53    failure_count: usize,
54    success_count: usize,
55    failure_threshold: usize,
56    timeout: Duration,
57    half_open_max_requests: usize,
58}
59
60impl CircuitBreakerAspect {
61    /// Create a new circuit breaker.
62    ///
63    /// # Arguments
64    /// * `failure_threshold` - Number of failures before opening circuit
65    /// * `timeout` - How long to wait before attempting recovery
66    ///
67    /// # Example
68    /// ```rust
69    /// use aspect_std::CircuitBreakerAspect;
70    /// use std::time::Duration;
71    ///
72    /// let breaker = CircuitBreakerAspect::new(5, Duration::from_secs(30));
73    /// ```
74    pub fn new(failure_threshold: usize, timeout: Duration) -> Self {
75        Self {
76            state: Arc::new(Mutex::new(CircuitBreakerState {
77                circuit_state: CircuitState::Closed,
78                failure_count: 0,
79                success_count: 0,
80                failure_threshold,
81                timeout,
82                half_open_max_requests: 1,
83            })),
84        }
85    }
86
87    /// Set the maximum number of requests to allow in half-open state.
88    pub fn with_half_open_requests(self, max_requests: usize) -> Self {
89        self.state.lock().half_open_max_requests = max_requests;
90        self
91    }
92
93    /// Get the current circuit state.
94    pub fn state(&self) -> CircuitState {
95        self.state.lock().circuit_state.clone()
96    }
97
98    /// Manually reset the circuit breaker to closed state.
99    pub fn reset(&self) {
100        let mut state = self.state.lock();
101        state.circuit_state = CircuitState::Closed;
102        state.failure_count = 0;
103        state.success_count = 0;
104    }
105
106    /// Record a successful call.
107    fn record_success(&self) {
108        let mut state = self.state.lock();
109
110        match state.circuit_state {
111            CircuitState::HalfOpen => {
112                state.success_count += 1;
113                // Transition back to closed after successful test
114                if state.success_count >= state.half_open_max_requests {
115                    state.circuit_state = CircuitState::Closed;
116                    state.failure_count = 0;
117                    state.success_count = 0;
118                }
119            }
120            CircuitState::Closed => {
121                // Reset failure count on success
122                state.failure_count = 0;
123            }
124            CircuitState::Open { .. } => {
125                // Shouldn't happen, but reset counts
126                state.failure_count = 0;
127                state.success_count = 0;
128            }
129        }
130    }
131
132    /// Record a failed call.
133    fn record_failure(&self) {
134        let mut state = self.state.lock();
135
136        match state.circuit_state {
137            CircuitState::HalfOpen => {
138                // Failure in half-open state immediately reopens circuit
139                state.circuit_state = CircuitState::Open {
140                    until: Instant::now() + state.timeout,
141                };
142                state.success_count = 0;
143            }
144            CircuitState::Closed => {
145                state.failure_count += 1;
146                if state.failure_count >= state.failure_threshold {
147                    // Open the circuit
148                    state.circuit_state = CircuitState::Open {
149                        until: Instant::now() + state.timeout,
150                    };
151                }
152            }
153            CircuitState::Open { .. } => {
154                // Already open, nothing to do
155            }
156        }
157    }
158
159    /// Check if a request should be allowed through.
160    fn should_allow_request(&self) -> Result<(), AspectError> {
161        let mut state = self.state.lock();
162
163        match state.circuit_state {
164            CircuitState::Closed => Ok(()),
165            CircuitState::HalfOpen => Ok(()),
166            CircuitState::Open { until } => {
167                if Instant::now() >= until {
168                    // Timeout expired, transition to half-open
169                    state.circuit_state = CircuitState::HalfOpen;
170                    state.success_count = 0;
171                    Ok(())
172                } else {
173                    Err(AspectError::execution(
174                        "Circuit breaker is OPEN - failing fast",
175                    ))
176                }
177            }
178        }
179    }
180}
181
182impl Aspect for CircuitBreakerAspect {
183    fn around(&self, pjp: ProceedingJoinPoint) -> Result<Box<dyn Any>, AspectError> {
184        // Check if request should be allowed
185        self.should_allow_request()?;
186
187        // Attempt the call
188        match pjp.proceed() {
189            Ok(result) => {
190                self.record_success();
191                Ok(result)
192            }
193            Err(e) => {
194                self.record_failure();
195                Err(e)
196            }
197        }
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use super::*;
204
205    #[test]
206    fn test_circuit_breaker_closed_initially() {
207        let breaker = CircuitBreakerAspect::new(3, Duration::from_secs(1));
208        assert_eq!(breaker.state(), CircuitState::Closed);
209    }
210
211    #[test]
212    fn test_circuit_opens_after_threshold() {
213        let breaker = CircuitBreakerAspect::new(3, Duration::from_secs(60));
214
215        // Record failures
216        for _ in 0..3 {
217            breaker.record_failure();
218        }
219
220        // Circuit should be open
221        match breaker.state() {
222            CircuitState::Open { .. } => (),
223            _ => panic!("Circuit should be open"),
224        }
225    }
226
227    #[test]
228    fn test_circuit_rejects_when_open() {
229        let breaker = CircuitBreakerAspect::new(1, Duration::from_secs(60));
230
231        breaker.record_failure();
232
233        // Should reject requests
234        assert!(breaker.should_allow_request().is_err());
235    }
236
237    #[test]
238    fn test_circuit_transitions_to_half_open() {
239        let breaker = CircuitBreakerAspect::new(1, Duration::from_millis(100));
240
241        breaker.record_failure();
242        assert!(matches!(breaker.state(), CircuitState::Open { .. }));
243
244        // Wait for timeout
245        std::thread::sleep(Duration::from_millis(150));
246
247        // Should transition to half-open when checked
248        assert!(breaker.should_allow_request().is_ok());
249        assert_eq!(breaker.state(), CircuitState::HalfOpen);
250    }
251
252    #[test]
253    fn test_circuit_closes_after_success() {
254        let breaker = CircuitBreakerAspect::new(1, Duration::from_millis(50));
255
256        // Open the circuit
257        breaker.record_failure();
258        std::thread::sleep(Duration::from_millis(60));
259
260        // Allow request (transitions to half-open)
261        breaker.should_allow_request().unwrap();
262
263        // Success should close the circuit
264        breaker.record_success();
265        assert_eq!(breaker.state(), CircuitState::Closed);
266    }
267
268    #[test]
269    fn test_reset() {
270        let breaker = CircuitBreakerAspect::new(2, Duration::from_secs(60));
271
272        breaker.record_failure();
273        breaker.record_failure();
274
275        assert!(matches!(breaker.state(), CircuitState::Open { .. }));
276
277        breaker.reset();
278        assert_eq!(breaker.state(), CircuitState::Closed);
279    }
280}