1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use tower::{Layer, Service};
8
9use camel_api::{BoxProcessor, CamelError, CircuitBreakerConfig, Exchange};
10
/// Internal state of a circuit breaker, shared (behind `Arc<Mutex<_>>`) by a layer, the
/// services it produces and their clones, or by a gate and its clones.
///
/// All payloads are `Copy`, so the state is cheap to snapshot and can be printed in logs
/// and test failure messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitState {
    /// Requests flow through. Counts failures since the last success; reaching
    /// `failure_threshold` opens the circuit.
    Closed { consecutive_failures: u32 },
    /// Requests are short-circuited (fallback or `CircuitOpen` error) until
    /// `open_duration` has elapsed since `opened_at`.
    Open { opened_at: Instant },
    /// The open window elapsed. Requests are let through, and the next reported result
    /// either closes the circuit (success) or re-opens it (failure).
    HalfOpen,
}
18
19#[derive(Clone)]
23pub struct CircuitBreakerLayer {
24 config: CircuitBreakerConfig,
25 state: Arc<Mutex<CircuitState>>,
26}
27
28impl CircuitBreakerLayer {
29 pub fn new(config: CircuitBreakerConfig) -> Self {
30 Self {
31 config,
32 state: Arc::new(Mutex::new(CircuitState::Closed {
33 consecutive_failures: 0,
34 })),
35 }
36 }
37}
38
39impl<S> Layer<S> for CircuitBreakerLayer {
40 type Service = CircuitBreakerService<S>;
41
42 fn layer(&self, inner: S) -> Self::Service {
43 CircuitBreakerService {
44 inner,
45 config: self.config.clone(),
46 state: Arc::clone(&self.state),
47 }
48 }
49}
50
51pub struct CircuitBreakerService<S> {
55 inner: S,
56 config: CircuitBreakerConfig,
57 state: Arc<Mutex<CircuitState>>,
58}
59
60impl<S: Clone> Clone for CircuitBreakerService<S> {
61 fn clone(&self) -> Self {
62 Self {
63 inner: self.inner.clone(),
64 config: self.config.clone(),
65 state: Arc::clone(&self.state),
66 }
67 }
68}
69
70impl<S> Service<Exchange> for CircuitBreakerService<S>
71where
72 S: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
73 S::Future: Send,
74{
75 type Response = Exchange;
76 type Error = CamelError;
77 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
78
79 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
81 match *state {
82 CircuitState::Closed { .. } => {
83 drop(state);
84 self.inner.poll_ready(cx)
85 }
86 CircuitState::Open { opened_at } => {
87 if opened_at.elapsed() >= self.config.open_duration {
88 tracing::info!("Circuit breaker transitioning from Open to HalfOpen");
89 *state = CircuitState::HalfOpen;
90 drop(state);
91 self.inner.poll_ready(cx)
92 } else if self.config.fallback.is_some() {
93 Poll::Ready(Ok(()))
94 } else {
95 Poll::Ready(Err(CamelError::CircuitOpen(
96 "circuit breaker is open".into(),
97 )))
98 }
99 }
100 CircuitState::HalfOpen => {
101 drop(state);
102 self.inner.poll_ready(cx)
103 }
104 }
105 }
106
107 fn call(&mut self, exchange: Exchange) -> Self::Future {
108 {
109 let mut st = self.state.lock().unwrap_or_else(|e| e.into_inner());
110 if let CircuitState::Open { opened_at } = *st {
111 if opened_at.elapsed() < self.config.open_duration {
112 if let Some(mut fallback) = self.config.fallback.clone() {
113 return Box::pin(async move { fallback.call(exchange).await });
114 }
115 return Box::pin(async {
116 Err(CamelError::CircuitOpen("circuit breaker is open".into()))
117 });
118 }
119
120 tracing::info!("Circuit breaker transitioning from Open to HalfOpen");
121 *st = CircuitState::HalfOpen;
122 }
123 }
124
125 let mut inner = self.inner.clone();
127 let state = Arc::clone(&self.state);
128 let config = self.config.clone();
129
130 let current_is_half_open = matches!(
132 *state.lock().unwrap_or_else(|e| e.into_inner()),
133 CircuitState::HalfOpen
134 );
135
136 Box::pin(async move {
137 let result = inner.call(exchange).await;
138
139 let mut st = state.lock().unwrap_or_else(|e| e.into_inner());
141 match &result {
142 Ok(_) => {
143 if current_is_half_open {
145 tracing::info!("Circuit breaker transitioning from HalfOpen to Closed");
146 }
147 *st = CircuitState::Closed {
148 consecutive_failures: 0,
149 };
150 }
151 Err(_) => {
152 if current_is_half_open {
153 tracing::warn!(
155 "Circuit breaker transitioning from HalfOpen to Open (probe failed)"
156 );
157 *st = CircuitState::Open {
158 opened_at: Instant::now(),
159 };
160 } else if let CircuitState::Closed {
161 consecutive_failures,
162 } = &mut *st
163 {
164 *consecutive_failures += 1;
165 if *consecutive_failures >= config.failure_threshold {
166 tracing::warn!(
167 threshold = config.failure_threshold,
168 "Circuit breaker transitioning from Closed to Open (failure threshold reached)"
169 );
170 *st = CircuitState::Open {
171 opened_at: Instant::now(),
172 };
173 }
174 }
175 }
176 }
177
178 result
179 })
180 }
181}
182
183pub enum CircuitBreakerDecision {
187 Allow,
189 Fallback(BoxProcessor),
192 Reject(CamelError),
194}
195
196#[derive(Clone)]
198pub struct CircuitBreakerGate {
199 config: CircuitBreakerConfig,
200 state: Arc<Mutex<CircuitState>>,
201}
202
203impl CircuitBreakerGate {
204 pub fn new(config: CircuitBreakerConfig) -> Self {
205 Self {
206 config,
207 state: Arc::new(Mutex::new(CircuitState::Closed {
208 consecutive_failures: 0,
209 })),
210 }
211 }
212
213 pub fn before_call(&self) -> CircuitBreakerDecision {
214 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
215 match *state {
216 CircuitState::Closed { .. } => CircuitBreakerDecision::Allow,
217 CircuitState::Open { opened_at } => {
218 if opened_at.elapsed() >= self.config.open_duration {
219 tracing::info!("Circuit breaker gate: Open → HalfOpen");
220 *state = CircuitState::HalfOpen;
221 CircuitBreakerDecision::Allow
222 } else if let Some(ref fallback) = self.config.fallback {
223 CircuitBreakerDecision::Fallback(fallback.clone())
224 } else {
225 CircuitBreakerDecision::Reject(CamelError::CircuitOpen(
226 "circuit breaker is open".into(),
227 ))
228 }
229 }
230 CircuitState::HalfOpen => CircuitBreakerDecision::Allow,
231 }
232 }
233
234 pub fn after_result(&self, result: &Result<Exchange, CamelError>) {
235 let mut st = self.state.lock().unwrap_or_else(|e| e.into_inner());
236 let current_is_half_open = matches!(*st, CircuitState::HalfOpen);
237 match result {
238 Ok(_) => {
239 if current_is_half_open {
240 tracing::info!("Circuit breaker gate: HalfOpen → Closed");
241 }
242 *st = CircuitState::Closed {
243 consecutive_failures: 0,
244 };
245 }
246 Err(_) => {
247 if current_is_half_open {
248 tracing::warn!("Circuit breaker gate: HalfOpen → Open (probe failed)");
249 *st = CircuitState::Open {
250 opened_at: Instant::now(),
251 };
252 } else if let CircuitState::Closed {
253 consecutive_failures,
254 } = &mut *st
255 {
256 *consecutive_failures += 1;
257 if *consecutive_failures >= self.config.failure_threshold {
258 tracing::warn!(
259 threshold = self.config.failure_threshold,
260 "Circuit breaker gate: Closed → Open (failure threshold reached)"
261 );
262 *st = CircuitState::Open {
263 opened_at: Instant::now(),
264 };
265 }
266 }
267 }
268 }
269 }
270}
271
// Unit tests for the tower layer/service and for the standalone gate. The timing-based
// tests use short `open_duration`s and sleep a little longer than that window.
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;
    use tower::ServiceExt;

    // Builds a fresh exchange whose input message is created from "test".
    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    // Processor that always succeeds, returning the exchange it was given.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    // Processor that always fails with ProcessorError("boom").
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    // Processor that fails its first `n` invocations and succeeds afterwards.
    // The invocation counter lives in an Arc captured by the closure.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let count = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let count = Arc::clone(&count);
            Box::pin(async move {
                let c = count.fetch_add(1, Ordering::SeqCst);
                if c < n {
                    Err(CamelError::ProcessorError(format!("attempt {c}")))
                } else {
                    Ok(ex)
                }
            })
        })
    }

    // Processor that ignores its input and returns a new exchange whose body is `tag`,
    // so a test can tell which processor produced the result.
    fn tag_processor(tag: &'static str) -> BoxProcessor {
        BoxProcessor::from_fn(move |_ex| {
            Box::pin(async move {
                let mut out = make_exchange();
                out.input.body = tag.to_string().into();
                Ok(out)
            })
        })
    }

    // Five successful calls leave the breaker Closed with a zero failure count.
    #[tokio::test]
    async fn test_stays_closed_on_success() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(ok_processor());

        for _ in 0..5 {
            let result = svc.ready().await.unwrap().call(make_exchange()).await;
            assert!(result.is_ok());
        }

        // The state field is private but visible to this child module.
        let state = svc.state.lock().unwrap();
        match *state {
            CircuitState::Closed {
                consecutive_failures,
            } => assert_eq!(consecutive_failures, 0),
            _ => panic!("expected Closed state"),
        }
    }

    // Three consecutive failures (threshold 3) open the circuit; with no fallback
    // configured, poll_ready must then report CircuitOpen.
    #[tokio::test]
    async fn test_opens_after_failure_threshold() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(failing_processor());

        for _ in 0..3 {
            let result = svc.ready().await.unwrap().call(make_exchange()).await;
            assert!(result.is_err());
        }

        // Poll readiness once by hand (no-op waker) so the exact error can be matched.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
        match poll {
            Poll::Ready(Err(CamelError::CircuitOpen(_))) => {}
            other => panic!("expected CircuitOpen error, got {other:?}"),
        }
    }

    // Two failures open the circuit. After the 50 ms window, the next call runs as a
    // half-open probe, succeeds (fail_n_times(2) succeeds on the third invocation) and
    // closes the circuit.
    #[tokio::test]
    async fn test_transitions_to_half_open_after_duration() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(2)
            .open_duration(Duration::from_millis(50));
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(fail_n_times(2));

        for _ in 0..2 {
            let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        }

        // Sleep past open_duration so the next readiness check moves Open -> HalfOpen.
        tokio::time::sleep(Duration::from_millis(60)).await;

        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        assert!(result.is_ok(), "half-open probe should succeed");

        let state = svc.state.lock().unwrap();
        match *state {
            CircuitState::Closed {
                consecutive_failures,
            } => assert_eq!(consecutive_failures, 0),
            _ => panic!("expected Closed state after successful half-open probe"),
        }
    }

    // A failing half-open probe sends the circuit straight back to Open.
    #[tokio::test]
    async fn test_half_open_failure_reopens() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(2)
            .open_duration(Duration::from_millis(50));
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(failing_processor());

        for _ in 0..2 {
            let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        }

        // Sleep past open_duration so the next call is a half-open probe.
        tokio::time::sleep(Duration::from_millis(60)).await;

        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        assert!(result.is_err());

        let state = svc.state.lock().unwrap();
        match *state {
            CircuitState::Open { .. } => {}
            _ => panic!("expected Open state after half-open failure"),
        }
    }

    // Failures interleaved with successes never reach the threshold of 3 consecutive ones.
    #[tokio::test]
    async fn test_intermittent_failures_dont_open() {
        let config = CircuitBreakerConfig::new().failure_threshold(3);
        let layer = CircuitBreakerLayer::new(config);

        let call_count = Arc::new(AtomicU32::new(0));
        let cc = Arc::clone(&call_count);
        let inner = BoxProcessor::from_fn(move |ex| {
            let cc = Arc::clone(&cc);
            Box::pin(async move {
                let c = cc.fetch_add(1, Ordering::SeqCst);
                // Every third invocation succeeds: fail, fail, ok, fail, fail, ok, ...
                if c % 3 == 2 {
                    Ok(ex)
                } else {
                    Err(CamelError::ProcessorError("intermittent".into()))
                }
            })
        });

        let mut svc = layer.layer(inner);

        for _ in 0..6 {
            let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        }

        let state = svc.state.lock().unwrap();
        match *state {
            CircuitState::Closed { .. } => {}
            _ => panic!("expected circuit to remain Closed"),
        }
    }

    // With a fallback configured, an open circuit stays "ready" and routes calls to the
    // fallback, whose tagged body identifies it.
    #[tokio::test]
    async fn test_open_uses_fallback_when_configured() {
        let fallback = tag_processor("fallback");
        let config = CircuitBreakerConfig::new()
            .failure_threshold(1)
            .open_duration(Duration::from_secs(60))
            .fallback(fallback);
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(failing_processor());

        // One failure (threshold 1) opens the circuit for 60 s.
        let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange())
            .await
            .unwrap();
        assert_eq!(result.input.body.as_text(), Some("fallback"));
    }

    // Without a fallback, readiness itself fails with CircuitOpen once the circuit opens.
    #[tokio::test]
    async fn test_open_without_fallback_returns_err() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(1)
            .open_duration(Duration::from_secs(60));
        let layer = CircuitBreakerLayer::new(config);
        let mut svc = layer.layer(failing_processor());

        let _ = svc.ready().await.unwrap().call(make_exchange()).await;
        let result = svc.ready().await;
        assert!(matches!(result, Err(CamelError::CircuitOpen(_))));
    }

    // A fresh gate starts Closed and allows calls.
    #[test]
    fn test_cb_gate_before_call_closed_allows() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 3,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: None,
        });
        assert!(matches!(gate.before_call(), CircuitBreakerDecision::Allow));
    }

    // The gate opens exactly when the consecutive-failure count reaches the threshold.
    #[test]
    fn test_cb_gate_records_failures_and_opens() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 2,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: None,
        });
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "still closed after 1 failure"
        );
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Reject(_)),
            "should be open after 2 failures"
        );
    }

    // Open -> (1 ms window elapses) -> HalfOpen -> success -> Closed.
    #[tokio::test]
    async fn test_cb_gate_closes_on_success() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_millis(1),
            success_threshold: 1,
            fallback: None,
        });
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Reject(_)),
            "should be open"
        );
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "should transition to half-open"
        );
        let ex = Exchange::new(Message::new("test"));
        gate.after_result(&Ok(ex));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "should be closed again"
        );
    }

    // Open -> (window elapses) -> HalfOpen -> failed probe -> Open again.
    #[tokio::test]
    async fn test_cb_gate_half_open_failure_reopens() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_millis(1),
            success_threshold: 1,
            fallback: None,
        });
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Reject(_)),
            "should be open"
        );

        // Sleep well past the 1 ms open window.
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "should be half-open now"
        );

        gate.after_result(&Err(CamelError::ProcessorError("probe fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Reject(_)),
            "should be open again after probe failure"
        );
    }

    // An open gate with a fallback configured hands back the fallback instead of rejecting.
    #[test]
    fn test_cb_gate_open_with_fallback_returns_fallback() {
        let fallback = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: Some(fallback),
        });
        gate.after_result(&Err(CamelError::ProcessorError("fail".into())));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Fallback(_)),
            "should return fallback when open"
        );
    }

    // Per the test name, an error already handled upstream is reported as Ok. An Ok
    // result must not trip the gate even with a threshold of 1.
    #[test]
    fn test_cb_gate_handled_error_counts_as_success() {
        let gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: None,
        });
        let ex = Exchange::new(Message::new("test"));
        gate.after_result(&Ok(ex));
        assert!(
            matches!(gate.before_call(), CircuitBreakerDecision::Allow),
            "handled error should not trip CB"
        );
    }
}