Skip to main content

camel_processor/
circuit_breaker.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use tower::{Layer, Service};
8
9use camel_api::{BoxProcessor, CamelError, CircuitBreakerConfig, Exchange};
10
11// ── State ──────────────────────────────────────────────────────────────
12
/// Circuit-breaker state machine, kept behind an `Arc<Mutex<_>>` by the
/// layer, its services and the gate.
enum CircuitState {
    /// Requests reach the protected work; the counter tracks failures seen
    /// since the last success.
    Closed { consecutive_failures: u32 },
    /// Requests are short-circuited until `open_duration` has passed since
    /// `opened_at`.
    Open { opened_at: Instant },
    /// The open period is over; calls are let through as probes whose
    /// outcome decides whether the circuit closes or reopens.
    HalfOpen,
}
18
19// ── Layer ──────────────────────────────────────────────────────────────
20
21/// Tower Layer that wraps an inner service with circuit-breaker logic.
22#[derive(Clone)]
23pub struct CircuitBreakerLayer {
24    config: CircuitBreakerConfig,
25    state: Arc<Mutex<CircuitState>>,
26}
27
28impl CircuitBreakerLayer {
29    pub fn new(config: CircuitBreakerConfig) -> Self {
30        Self {
31            config,
32            state: Arc::new(Mutex::new(CircuitState::Closed {
33                consecutive_failures: 0,
34            })),
35        }
36    }
37}
38
39impl<S> Layer<S> for CircuitBreakerLayer {
40    type Service = CircuitBreakerService<S>;
41
42    fn layer(&self, inner: S) -> Self::Service {
43        CircuitBreakerService {
44            inner,
45            config: self.config.clone(),
46            state: Arc::clone(&self.state),
47        }
48    }
49}
50
51// ── Service ────────────────────────────────────────────────────────────
52
53/// Tower Service implementing the circuit-breaker pattern.
54pub struct CircuitBreakerService<S> {
55    inner: S,
56    config: CircuitBreakerConfig,
57    state: Arc<Mutex<CircuitState>>,
58}
59
60impl<S: Clone> Clone for CircuitBreakerService<S> {
61    fn clone(&self) -> Self {
62        Self {
63            inner: self.inner.clone(),
64            config: self.config.clone(),
65            state: Arc::clone(&self.state),
66        }
67    }
68}
69
70impl<S> Service<Exchange> for CircuitBreakerService<S>
71where
72    S: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
73    S::Future: Send,
74{
75    type Response = Exchange;
76    type Error = CamelError;
77    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
78
79    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
81        match *state {
82            CircuitState::Closed { .. } => {
83                drop(state);
84                self.inner.poll_ready(cx)
85            }
86            CircuitState::Open { opened_at } => {
87                if opened_at.elapsed() >= self.config.open_duration {
88                    tracing::info!("Circuit breaker transitioning from Open to HalfOpen");
89                    *state = CircuitState::HalfOpen;
90                    drop(state);
91                    self.inner.poll_ready(cx)
92                } else if self.config.fallback.is_some() {
93                    Poll::Ready(Ok(()))
94                } else {
95                    Poll::Ready(Err(CamelError::CircuitOpen(
96                        "circuit breaker is open".into(),
97                    )))
98                }
99            }
100            CircuitState::HalfOpen => {
101                drop(state);
102                self.inner.poll_ready(cx)
103            }
104        }
105    }
106
107    fn call(&mut self, exchange: Exchange) -> Self::Future {
108        {
109            let mut st = self.state.lock().unwrap_or_else(|e| e.into_inner());
110            if let CircuitState::Open { opened_at } = *st {
111                if opened_at.elapsed() < self.config.open_duration {
112                    if let Some(mut fallback) = self.config.fallback.clone() {
113                        return Box::pin(async move { fallback.call(exchange).await });
114                    }
115                    return Box::pin(async {
116                        Err(CamelError::CircuitOpen("circuit breaker is open".into()))
117                    });
118                }
119
120                tracing::info!("Circuit breaker transitioning from Open to HalfOpen");
121                *st = CircuitState::HalfOpen;
122            }
123        }
124
125        // Clone inner service (Tower pattern) and state handle.
126        let mut inner = self.inner.clone();
127        let state = Arc::clone(&self.state);
128        let config = self.config.clone();
129
130        // Snapshot the current state before calling (briefly lock).
131        let current_is_half_open = matches!(
132            *state.lock().unwrap_or_else(|e| e.into_inner()),
133            CircuitState::HalfOpen
134        );
135
136        Box::pin(async move {
137            let result = inner.call(exchange).await;
138
139            // Update state based on result (briefly lock).
140            let mut st = state.lock().unwrap_or_else(|e| e.into_inner());
141            match &result {
142                Ok(_) => {
143                    // Success → reset to Closed.
144                    if current_is_half_open {
145                        tracing::info!("Circuit breaker transitioning from HalfOpen to Closed");
146                    }
147                    *st = CircuitState::Closed {
148                        consecutive_failures: 0,
149                    };
150                }
151                Err(_) => {
152                    if current_is_half_open {
153                        // Half-open failure → reopen circuit.
154                        tracing::warn!(
155                            "Circuit breaker transitioning from HalfOpen to Open (probe failed)"
156                        );
157                        *st = CircuitState::Open {
158                            opened_at: Instant::now(),
159                        };
160                    } else if let CircuitState::Closed {
161                        consecutive_failures,
162                    } = &mut *st
163                    {
164                        *consecutive_failures += 1;
165                        if *consecutive_failures >= config.failure_threshold {
166                            tracing::warn!(
167                                threshold = config.failure_threshold,
168                                "Circuit breaker transitioning from Closed to Open (failure threshold reached)"
169                            );
170                            *st = CircuitState::Open {
171                                opened_at: Instant::now(),
172                            };
173                        }
174                    }
175                }
176            }
177
178            result
179        })
180    }
181}
182
183// ── Gate ──────────────────────────────────────────────────────────────
184
185/// Decision returned by [`CircuitBreakerGate::before_call`].
186pub enum CircuitBreakerDecision {
187    /// Circuit is closed or half-open — proceed with the pipeline call.
188    Allow,
189    /// Circuit is open but a fallback processor is configured.
190    /// Call this processor instead of the main pipeline.
191    Fallback(BoxProcessor),
192    /// Circuit is open with no fallback — reject the call.
193    Reject(CamelError),
194}
195
196/// Reusable circuit-breaker gate with explicit `before_call`/`after_result` API.
197#[derive(Clone)]
198pub struct CircuitBreakerGate {
199    config: CircuitBreakerConfig,
200    state: Arc<Mutex<CircuitState>>,
201}
202
203impl CircuitBreakerGate {
204    pub fn new(config: CircuitBreakerConfig) -> Self {
205        Self {
206            config,
207            state: Arc::new(Mutex::new(CircuitState::Closed {
208                consecutive_failures: 0,
209            })),
210        }
211    }
212
213    pub fn before_call(&self) -> CircuitBreakerDecision {
214        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
215        match *state {
216            CircuitState::Closed { .. } => CircuitBreakerDecision::Allow,
217            CircuitState::Open { opened_at } => {
218                if opened_at.elapsed() >= self.config.open_duration {
219                    tracing::info!("Circuit breaker gate: Open → HalfOpen");
220                    *state = CircuitState::HalfOpen;
221                    CircuitBreakerDecision::Allow
222                } else if let Some(ref fallback) = self.config.fallback {
223                    CircuitBreakerDecision::Fallback(fallback.clone())
224                } else {
225                    CircuitBreakerDecision::Reject(CamelError::CircuitOpen(
226                        "circuit breaker is open".into(),
227                    ))
228                }
229            }
230            CircuitState::HalfOpen => CircuitBreakerDecision::Allow,
231        }
232    }
233
234    pub fn after_result(&self, result: &Result<Exchange, CamelError>) {
235        let mut st = self.state.lock().unwrap_or_else(|e| e.into_inner());
236        let current_is_half_open = matches!(*st, CircuitState::HalfOpen);
237        match result {
238            Ok(_) => {
239                if current_is_half_open {
240                    tracing::info!("Circuit breaker gate: HalfOpen → Closed");
241                }
242                *st = CircuitState::Closed {
243                    consecutive_failures: 0,
244                };
245            }
246            Err(_) => {
247                if current_is_half_open {
248                    tracing::warn!("Circuit breaker gate: HalfOpen → Open (probe failed)");
249                    *st = CircuitState::Open {
250                        opened_at: Instant::now(),
251                    };
252                } else if let CircuitState::Closed {
253                    consecutive_failures,
254                } = &mut *st
255                {
256                    *consecutive_failures += 1;
257                    if *consecutive_failures >= self.config.failure_threshold {
258                        tracing::warn!(
259                            threshold = self.config.failure_threshold,
260                            "Circuit breaker gate: Closed → Open (failure threshold reached)"
261                        );
262                        *st = CircuitState::Open {
263                            opened_at: Instant::now(),
264                        };
265                    }
266                }
267            }
268        }
269    }
270}
271
272// ── Tests ──────────────────────────────────────────────────────────────
273
// Unit tests for `CircuitBreakerLayer`/`CircuitBreakerService` (Tower path)
// and `CircuitBreakerGate` (explicit before/after API). Several tests rely on
// real wall-clock sleeps relative to `open_duration`.
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;
    use tower::ServiceExt;

    /// Builds a fresh exchange whose message is created from `"test"`.
    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Processor that always succeeds, returning the exchange unchanged.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor that always fails with `ProcessorError("boom")`.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that fails on its first `n` invocations and succeeds after.
    /// The call counter lives in an `Arc` captured by the closure.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let count = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let count = Arc::clone(&count);
            Box::pin(async move {
                let c = count.fetch_add(1, Ordering::SeqCst);
                if c < n {
                    Err(CamelError::ProcessorError(format!("attempt {c}")))
                } else {
                    Ok(ex)
                }
            })
        })
    }

    /// Processor that ignores its input and returns a new exchange whose body
    /// is set from `tag`, so tests can tell which processor produced a result.
    fn tag_processor(tag: &'static str) -> BoxProcessor {
        BoxProcessor::from_fn(move |_ex| {
            Box::pin(async move {
                let mut out = make_exchange();
                out.input.body = tag.to_string().into();
                Ok(out)
            })
        })
    }

    /// 1. Circuit stays closed on success.
    #[tokio::test]
    async fn test_stays_closed_on_success() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(ok_processor());

        for _ in 0..5 {
            let result = svc.ready().await.unwrap().call(make_exchange()).await;
            assert!(result.is_ok());
        }

        // State should still be closed with 0 failures.
        let state = svc.state.lock().unwrap();
        match *state {
            CircuitState::Closed {
                consecutive_failures,
            } => assert_eq!(consecutive_failures, 0),
            _ => panic!("expected Closed state"),
        }
    }

    /// 2. Circuit opens after failure_threshold consecutive failures.
    #[tokio::test]
    async fn test_opens_after_failure_threshold() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(failing_processor());

        // Three consecutive failures should open the circuit.
        for _ in 0..3 {
            let result = svc.ready().await.unwrap().call(make_exchange()).await;
            assert!(result.is_err());
        }

        // The next poll_ready should return CircuitOpen error.
        // Polled manually with a no-op waker so the raw `Poll` value can be inspected.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
        match poll {
            Poll::Ready(Err(CamelError::CircuitOpen(_))) => {} // expected
            other => panic!("expected CircuitOpen error, got {other:?}"),
        }
    }

    /// 3. Circuit transitions to half-open after open_duration.
    #[tokio::test]
    async fn test_transitions_to_half_open_after_duration() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(2)
            .open_duration(Duration::from_millis(50));
        let layer = CircuitBreakerLayer::new(config);
        // Use fail_n_times(2) so the first 2 calls fail (opening the circuit),
        // then the third (half-open probe) succeeds.
        let mut svc = layer.layer(fail_n_times(2));

        // Trigger 2 failures to open the circuit.
        for _ in 0..2 {
            let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        }

        // Circuit is now open. Wait for open_duration to elapse.
        // (60 ms sleep vs 50 ms open_duration: a 10 ms margin.)
        tokio::time::sleep(Duration::from_millis(60)).await;

        // poll_ready should transition to HalfOpen and succeed.
        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        assert!(result.is_ok(), "half-open probe should succeed");

        // After successful probe, circuit should be back to Closed.
        let state = svc.state.lock().unwrap();
        match *state {
            CircuitState::Closed {
                consecutive_failures,
            } => assert_eq!(consecutive_failures, 0),
            _ => panic!("expected Closed state after successful half-open probe"),
        }
    }

    /// 4. Half-open failure reopens circuit.
    #[tokio::test]
    async fn test_half_open_failure_reopens() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(2)
            .open_duration(Duration::from_millis(50));
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(failing_processor());

        // Trigger 2 failures to open the circuit.
        for _ in 0..2 {
            let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        }

        // Wait for open_duration to elapse, transitioning to HalfOpen.
        tokio::time::sleep(Duration::from_millis(60)).await;

        // Half-open probe fails → circuit reopens.
        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        assert!(result.is_err());

        // Circuit should be open again.
        let state = svc.state.lock().unwrap();
        match *state {
            CircuitState::Open { .. } => {} // expected
            _ => panic!("expected Open state after half-open failure"),
        }
    }

    /// 5. Intermittent failures below threshold don't open circuit.
    #[tokio::test]
    async fn test_intermittent_failures_dont_open() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let layer = CircuitBreakerLayer::new(config);

        // Alternate: fail, fail, success, fail, fail, success
        // The counter should reset on success, so threshold of 3 is never reached.
        let call_count = Arc::new(AtomicU32::new(0));
        let cc = Arc::clone(&call_count);
        let inner = BoxProcessor::from_fn(move |ex| {
            let cc = Arc::clone(&cc);
            Box::pin(async move {
                let c = cc.fetch_add(1, Ordering::SeqCst);
                // Pattern: fail, fail, success, fail, fail, success
                if c % 3 == 2 {
                    Ok(ex)
                } else {
                    Err(CamelError::ProcessorError("intermittent".into()))
                }
            })
        });

        let mut svc = layer.layer(inner);

        for _ in 0..6 {
            let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        }

        // Circuit should still be closed because successes reset the counter.
        let state = svc.state.lock().unwrap();
        match *state {
            CircuitState::Closed { .. } => {} // expected
            _ => panic!("expected circuit to remain Closed"),
        }
    }

    /// While the circuit is open, a configured fallback handles the call:
    /// `failure_threshold(1)` opens it on the first failure, and the second call
    /// must return the fallback's tagged body.
    #[tokio::test]
    async fn test_open_uses_fallback_when_configured() {
        let fallback = tag_processor("fallback");
        let config = CircuitBreakerConfig::new()
            .failure_threshold(1)
            .open_duration(Duration::from_secs(60))
            .fallback(fallback);
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(failing_processor());

        let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange())
            .await
            .unwrap();
        assert_eq!(result.input.body.as_text(), Some("fallback"));
    }

    /// Without a fallback, readiness itself fails with `CircuitOpen` once the
    /// circuit has opened.
    #[tokio::test]
    async fn test_open_without_fallback_returns_err() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(1)
            .open_duration(Duration::from_secs(60));
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(failing_processor());

        let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        let result = svc.ready().await;
        assert!(matches!(result, Err(CamelError::CircuitOpen(_))));
    }

    // ── CircuitBreakerGate tests ──────────────────────────────────────────

    /// A freshly created gate is closed and allows calls.
    #[test]
    fn test_cb_gate_before_call_closed_allows() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 3,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: None,
        });
        assert!(matches!(gate.before_call(), CircuitBreakerDecision::Allow));
    }

    /// Failures reported via `after_result` accumulate until the threshold
    /// opens the circuit.
    #[test]
    fn test_cb_gate_records_failures_and_opens() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 2,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: None,
        });
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "still closed after 1 failure"
        );
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Reject(_)),
            "should be open after 2 failures"
        );
    }

    /// Open -> (wait) -> HalfOpen -> successful probe -> Closed.
    /// NOTE(review): with a 1 ms `open_duration`, the first `Reject` assertion
    /// assumes under 1 ms elapses after `after_result`; this may flake on a
    /// heavily loaded machine.
    #[tokio::test]
    async fn test_cb_gate_closes_on_success() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_millis(1),
            success_threshold: 1,
            fallback: None,
        });
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Reject(_)),
            "should be open"
        );
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "should transition to half-open"
        );
        let ex = Exchange::new(Message::new("test"));
        gate.after_result(&Ok(ex));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "should be closed again"
        );
    }

    /// Open -> (wait) -> HalfOpen -> failed probe -> Open again.
    /// NOTE(review): same 1 ms timing sensitivity as the previous test for
    /// both `Reject` assertions.
    #[tokio::test]
    async fn test_cb_gate_half_open_failure_reopens() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_millis(1),
            success_threshold: 1,
            fallback: None,
        });
        // Open the circuit
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Reject(_)),
            "should be open"
        );

        // Wait for open_duration to elapse → transitions to HalfOpen
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "should be half-open now"
        );

        // Probe fails in HalfOpen → should reopen
        gate.after_result(&Err(CamelError::ProcessorError("probe fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Reject(_)),
            "should be open again after probe failure"
        );
    }

    /// An open gate with a configured fallback returns `Fallback`, not `Reject`.
    #[test]
    fn test_cb_gate_open_with_fallback_returns_fallback() {
        let fallback = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: Some(fallback),
        });
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Fallback(_)),
            "should return fallback when open"
        );
    }

    /// Reporting `Ok` does not count toward the failure threshold, even at
    /// `failure_threshold: 1`. Presumably this models an error already handled
    /// upstream (surfaced as `Ok`); confirm against callers.
    #[test]
    fn test_cb_gate_handled_error_counts_as_success() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: None,
        });
        let ex = Exchange::new(Message::new("test"));
        gate.after_result(&Ok(ex));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "handled error should not trip CB"
        );
    }
}