Skip to main content

camel_processor/
circuit_breaker.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use tower::{Layer, Service};
8
9use camel_api::{CamelError, CircuitBreakerConfig, Exchange};
10
11// ── State ──────────────────────────────────────────────────────────────
12
/// The breaker's current mode, shared by a layer and every service it creates.
///
/// All payloads are `Copy`, so the state itself is cheap to copy, compare and
/// print in diagnostics or test assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitState {
    /// Calls flow through. `consecutive_failures` counts failures seen since the
    /// last success; reaching the configured failure threshold opens the circuit.
    Closed { consecutive_failures: u32 },
    /// Calls are rejected in `poll_ready` until the configured open duration has
    /// elapsed, measured from `opened_at`.
    Open { opened_at: Instant },
    /// The open duration has elapsed and calls are let through as probes: a
    /// success closes the circuit, a failure opens it again.
    HalfOpen,
}
18
19// ── Layer ──────────────────────────────────────────────────────────────
20
21/// Tower Layer that wraps an inner service with circuit-breaker logic.
22#[derive(Clone)]
23pub struct CircuitBreakerLayer {
24    config: CircuitBreakerConfig,
25    state: Arc<Mutex<CircuitState>>,
26}
27
28impl CircuitBreakerLayer {
29    pub fn new(config: CircuitBreakerConfig) -> Self {
30        Self {
31            config,
32            state: Arc::new(Mutex::new(CircuitState::Closed {
33                consecutive_failures: 0,
34            })),
35        }
36    }
37}
38
39impl<S> Layer<S> for CircuitBreakerLayer {
40    type Service = CircuitBreakerService<S>;
41
42    fn layer(&self, inner: S) -> Self::Service {
43        CircuitBreakerService {
44            inner,
45            config: self.config.clone(),
46            state: Arc::clone(&self.state),
47        }
48    }
49}
50
51// ── Service ────────────────────────────────────────────────────────────
52
53/// Tower Service implementing the circuit-breaker pattern.
54pub struct CircuitBreakerService<S> {
55    inner: S,
56    config: CircuitBreakerConfig,
57    state: Arc<Mutex<CircuitState>>,
58}
59
60impl<S: Clone> Clone for CircuitBreakerService<S> {
61    fn clone(&self) -> Self {
62        Self {
63            inner: self.inner.clone(),
64            config: self.config.clone(),
65            state: Arc::clone(&self.state),
66        }
67    }
68}
69
70impl<S> Service<Exchange> for CircuitBreakerService<S>
71where
72    S: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
73    S::Future: Send,
74{
75    type Response = Exchange;
76    type Error = CamelError;
77    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
78
79    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
81        match *state {
82            CircuitState::Closed { .. } => {
83                drop(state);
84                self.inner.poll_ready(cx)
85            }
86            CircuitState::Open { opened_at } => {
87                if opened_at.elapsed() >= self.config.open_duration {
88                    tracing::info!("Circuit breaker transitioning from Open to HalfOpen");
89                    *state = CircuitState::HalfOpen;
90                    drop(state);
91                    self.inner.poll_ready(cx)
92                } else {
93                    Poll::Ready(Err(CamelError::CircuitOpen(
94                        "circuit breaker is open".into(),
95                    )))
96                }
97            }
98            CircuitState::HalfOpen => {
99                drop(state);
100                self.inner.poll_ready(cx)
101            }
102        }
103    }
104
105    fn call(&mut self, exchange: Exchange) -> Self::Future {
106        // Clone inner service (Tower pattern) and state handle.
107        let mut inner = self.inner.clone();
108        let state = Arc::clone(&self.state);
109        let config = self.config.clone();
110
111        // Snapshot the current state before calling (briefly lock).
112        let current_is_half_open = matches!(
113            *state.lock().unwrap_or_else(|e| e.into_inner()),
114            CircuitState::HalfOpen
115        );
116
117        Box::pin(async move {
118            let result = inner.call(exchange).await;
119
120            // Update state based on result (briefly lock).
121            let mut st = state.lock().unwrap_or_else(|e| e.into_inner());
122            match &result {
123                Ok(_) => {
124                    // Success → reset to Closed.
125                    if current_is_half_open {
126                        tracing::info!("Circuit breaker transitioning from HalfOpen to Closed");
127                    }
128                    *st = CircuitState::Closed {
129                        consecutive_failures: 0,
130                    };
131                }
132                Err(_) => {
133                    if current_is_half_open {
134                        // Half-open failure → reopen circuit.
135                        tracing::warn!(
136                            "Circuit breaker transitioning from HalfOpen to Open (probe failed)"
137                        );
138                        *st = CircuitState::Open {
139                            opened_at: Instant::now(),
140                        };
141                    } else if let CircuitState::Closed {
142                        consecutive_failures,
143                    } = &mut *st
144                    {
145                        *consecutive_failures += 1;
146                        if *consecutive_failures >= config.failure_threshold {
147                            tracing::warn!(
148                                threshold = config.failure_threshold,
149                                "Circuit breaker transitioning from Closed to Open (failure threshold reached)"
150                            );
151                            *st = CircuitState::Open {
152                                opened_at: Instant::now(),
153                            };
154                        }
155                    }
156                }
157            }
158
159            result
160        })
161    }
162}
163
164// ── Tests ──────────────────────────────────────────────────────────────
165
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;
    use tower::ServiceExt;

    /// Concrete service type driven by every test below.
    type Breaker = CircuitBreakerService<BoxProcessor>;

    fn make_exchange() -> Exchange {
        let message = Message::new("test");
        Exchange::new(message)
    }

    /// Processor that always hands the exchange straight back.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|exchange| Box::pin(async move { Ok(exchange) }))
    }

    /// Processor that always fails.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_exchange| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that fails its first `n` invocations and succeeds afterwards.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let attempts = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |exchange| {
            let attempts = Arc::clone(&attempts);
            Box::pin(async move {
                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
                if attempt >= n {
                    Ok(exchange)
                } else {
                    Err(CamelError::ProcessorError(format!("attempt {attempt}")))
                }
            })
        })
    }

    /// Waits for readiness, then sends one fresh exchange through `svc`.
    async fn send(svc: &mut Breaker) -> Result<Exchange, CamelError> {
        svc.ready().await.unwrap().call(make_exchange()).await
    }

    /// Asserts the breaker is `Closed` with a zero failure count, panicking
    /// with `otherwise` when it is in any other state.
    fn assert_closed_with_zero_failures(svc: &Breaker, otherwise: &str) {
        let guard = svc.state.lock().unwrap();
        match *guard {
            CircuitState::Closed {
                consecutive_failures,
            } => assert_eq!(consecutive_failures, 0),
            _ => panic!("{otherwise}"),
        }
    }

    /// 1. Circuit stays closed on success.
    #[tokio::test]
    async fn test_stays_closed_on_success() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let mut svc = CircuitBreakerLayer::new(config).layer(ok_processor());

        for _ in 0..5 {
            assert!(send(&mut svc).await.is_ok());
        }

        // Nothing failed, so the breaker must still be closed with a clean count.
        assert_closed_with_zero_failures(&svc, "expected Closed state");
    }

    /// 2. Circuit opens after failure_threshold consecutive failures.
    #[tokio::test]
    async fn test_opens_after_failure_threshold() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let mut svc = CircuitBreakerLayer::new(config).layer(failing_processor());

        // Hitting the threshold exactly trips the breaker.
        for _ in 0..3 {
            assert!(send(&mut svc).await.is_err());
        }

        // From now on readiness itself must be refused with CircuitOpen.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        match svc.poll_ready(&mut cx) {
            Poll::Ready(Err(CamelError::CircuitOpen(_))) => {} // expected
            other => panic!("expected CircuitOpen error, got {other:?}"),
        }
    }

    /// 3. Circuit transitions to half-open after open_duration.
    #[tokio::test]
    async fn test_transitions_to_half_open_after_duration() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(2)
            .open_duration(Duration::from_millis(50));
        // The first two calls fail (tripping the breaker); the third call, which
        // is the half-open probe, succeeds.
        let mut svc = CircuitBreakerLayer::new(config).layer(fail_n_times(2));

        for _ in 0..2 {
            let _ = send(&mut svc).await;
        }

        // Breaker is open; outlast open_duration.
        tokio::time::sleep(Duration::from_millis(60)).await;

        // Readiness now flips the breaker to HalfOpen and the probe goes through.
        let probe = send(&mut svc).await;
        assert!(probe.is_ok(), "half-open probe should succeed");

        // A successful probe closes the circuit again.
        assert_closed_with_zero_failures(
            &svc,
            "expected Closed state after successful half-open probe",
        );
    }

    /// 4. Half-open failure reopens circuit.
    #[tokio::test]
    async fn test_half_open_failure_reopens() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(2)
            .open_duration(Duration::from_millis(50));
        let mut svc = CircuitBreakerLayer::new(config).layer(failing_processor());

        // Two failures trip the breaker.
        for _ in 0..2 {
            let _ = send(&mut svc).await;
        }

        // Outlast open_duration so the next readiness check yields HalfOpen.
        tokio::time::sleep(Duration::from_millis(60)).await;

        // The probe fails, which must send the breaker back to Open.
        assert!(send(&mut svc).await.is_err());

        let guard = svc.state.lock().unwrap();
        match *guard {
            CircuitState::Open { .. } => {} // expected
            _ => panic!("expected Open state after half-open failure"),
        }
    }

    /// 5. Intermittent failures below threshold don't open circuit.
    #[tokio::test]
    async fn test_intermittent_failures_dont_open() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let layer = CircuitBreakerLayer::new(config);

        // Outcome pattern: fail, fail, success, fail, fail, success.
        // Every success resets the streak, so the threshold of 3 is never hit.
        let calls = Arc::new(AtomicU32::new(0));
        let shared_calls = Arc::clone(&calls);
        let inner = BoxProcessor::from_fn(move |exchange| {
            let shared_calls = Arc::clone(&shared_calls);
            Box::pin(async move {
                let index = shared_calls.fetch_add(1, Ordering::SeqCst);
                // Only every third call (indices 2, 5, ...) succeeds.
                if index % 3 != 2 {
                    Err(CamelError::ProcessorError("intermittent".into()))
                } else {
                    Ok(exchange)
                }
            })
        });

        let mut svc = layer.layer(inner);

        for _ in 0..6 {
            let _ = send(&mut svc).await;
        }

        // Successes kept resetting the counter, so the breaker never tripped.
        let guard = svc.state.lock().unwrap();
        match *guard {
            CircuitState::Closed { .. } => {} // expected
            _ => panic!("expected circuit to remain Closed"),
        }
    }
}
346}