camel_processor/
circuit_breaker.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use tower::{Layer, Service};
8
9use camel_api::{CamelError, CircuitBreakerConfig, Exchange};
10
/// Internal state machine of the circuit breaker.
///
/// The value lives behind an `Arc<Mutex<_>>` shared by the layer and every
/// service it produces. It is cheap to copy and comparable so that it can be
/// logged and asserted on directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitState {
    /// Requests are forwarded; counts failures seen since the last success.
    Closed { consecutive_failures: u32 },
    /// Requests are rejected; remembers when the circuit was opened.
    Open { opened_at: Instant },
    /// The open period has elapsed and requests are forwarded again as probes.
    HalfOpen,
}
18
19#[derive(Clone)]
23pub struct CircuitBreakerLayer {
24 config: CircuitBreakerConfig,
25 state: Arc<Mutex<CircuitState>>,
26}
27
28impl CircuitBreakerLayer {
29 pub fn new(config: CircuitBreakerConfig) -> Self {
30 Self {
31 config,
32 state: Arc::new(Mutex::new(CircuitState::Closed {
33 consecutive_failures: 0,
34 })),
35 }
36 }
37}
38
39impl<S> Layer<S> for CircuitBreakerLayer {
40 type Service = CircuitBreakerService<S>;
41
42 fn layer(&self, inner: S) -> Self::Service {
43 CircuitBreakerService {
44 inner,
45 config: self.config.clone(),
46 state: Arc::clone(&self.state),
47 }
48 }
49}
50
51pub struct CircuitBreakerService<S> {
55 inner: S,
56 config: CircuitBreakerConfig,
57 state: Arc<Mutex<CircuitState>>,
58}
59
60impl<S: Clone> Clone for CircuitBreakerService<S> {
61 fn clone(&self) -> Self {
62 Self {
63 inner: self.inner.clone(),
64 config: self.config.clone(),
65 state: Arc::clone(&self.state),
66 }
67 }
68}
69
70impl<S> Service<Exchange> for CircuitBreakerService<S>
71where
72 S: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
73 S::Future: Send,
74{
75 type Response = Exchange;
76 type Error = CamelError;
77 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
78
79 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
81 match *state {
82 CircuitState::Closed { .. } => {
83 drop(state);
84 self.inner.poll_ready(cx)
85 }
86 CircuitState::Open { opened_at } => {
87 if opened_at.elapsed() >= self.config.open_duration {
88 tracing::info!("Circuit breaker transitioning from Open to HalfOpen");
89 *state = CircuitState::HalfOpen;
90 drop(state);
91 self.inner.poll_ready(cx)
92 } else {
93 Poll::Ready(Err(CamelError::CircuitOpen(
94 "circuit breaker is open".into(),
95 )))
96 }
97 }
98 CircuitState::HalfOpen => {
99 drop(state);
100 self.inner.poll_ready(cx)
101 }
102 }
103 }
104
105 fn call(&mut self, exchange: Exchange) -> Self::Future {
106 let mut inner = self.inner.clone();
108 let state = Arc::clone(&self.state);
109 let config = self.config.clone();
110
111 let current_is_half_open = matches!(
113 *state.lock().unwrap_or_else(|e| e.into_inner()),
114 CircuitState::HalfOpen
115 );
116
117 Box::pin(async move {
118 let result = inner.call(exchange).await;
119
120 let mut st = state.lock().unwrap_or_else(|e| e.into_inner());
122 match &result {
123 Ok(_) => {
124 if current_is_half_open {
126 tracing::info!("Circuit breaker transitioning from HalfOpen to Closed");
127 }
128 *st = CircuitState::Closed {
129 consecutive_failures: 0,
130 };
131 }
132 Err(_) => {
133 if current_is_half_open {
134 tracing::warn!(
136 "Circuit breaker transitioning from HalfOpen to Open (probe failed)"
137 );
138 *st = CircuitState::Open {
139 opened_at: Instant::now(),
140 };
141 } else if let CircuitState::Closed {
142 consecutive_failures,
143 } = &mut *st
144 {
145 *consecutive_failures += 1;
146 if *consecutive_failures >= config.failure_threshold {
147 tracing::warn!(
148 threshold = config.failure_threshold,
149 "Circuit breaker transitioning from Closed to Open (failure threshold reached)"
150 );
151 *st = CircuitState::Open {
152 opened_at: Instant::now(),
153 };
154 }
155 }
156 }
157 }
158
159 result
160 })
161 }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;
    use tower::ServiceExt;

    /// The concrete service type exercised by every test.
    type Breaker = CircuitBreakerService<BoxProcessor>;

    /// Builds a fresh exchange carrying a fixed test body.
    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Processor that echoes every exchange back successfully.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor that rejects every exchange.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that fails its first `n` invocations and succeeds afterwards.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let attempts = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let attempts = Arc::clone(&attempts);
            Box::pin(async move {
                let c = attempts.fetch_add(1, Ordering::SeqCst);
                if c >= n {
                    Ok(ex)
                } else {
                    Err(CamelError::ProcessorError(format!("attempt {c}")))
                }
            })
        })
    }

    /// Waits for readiness, then pushes one fresh exchange through `svc`.
    async fn send_one(svc: &mut Breaker) -> Result<Exchange, CamelError> {
        svc.ready().await.unwrap().call(make_exchange()).await
    }

    #[tokio::test]
    async fn test_stays_closed_on_success() {
        let layer = CircuitBreakerLayer::new(CircuitBreakerConfig::new().failure_threshold(3));
        let mut svc = layer.layer(ok_processor());

        for _ in 0..5 {
            let result = send_one(&mut svc).await;
            assert!(result.is_ok());
        }

        let state = svc.state.lock().unwrap();
        if let CircuitState::Closed {
            consecutive_failures,
        } = *state
        {
            assert_eq!(consecutive_failures, 0);
        } else {
            panic!("expected Closed state");
        }
    }

    #[tokio::test]
    async fn test_opens_after_failure_threshold() {
        let layer = CircuitBreakerLayer::new(CircuitBreakerConfig::new().failure_threshold(3));
        let mut svc = layer.layer(failing_processor());

        // Three consecutive failures reach the threshold.
        for _ in 0..3 {
            let result = send_one(&mut svc).await;
            assert!(result.is_err());
        }

        // The breaker itself must now refuse readiness.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        match svc.poll_ready(&mut cx) {
            Poll::Ready(Err(CamelError::CircuitOpen(_))) => {}
            other => panic!("expected CircuitOpen error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_transitions_to_half_open_after_duration() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(2)
            .open_duration(Duration::from_millis(50));
        // The inner processor fails exactly twice, so the probe succeeds.
        let mut svc = CircuitBreakerLayer::new(config).layer(fail_n_times(2));

        for _ in 0..2 {
            let _ = send_one(&mut svc).await;
        }

        // Let the open period elapse.
        tokio::time::sleep(Duration::from_millis(60)).await;

        let result = send_one(&mut svc).await;
        assert!(result.is_ok(), "half-open probe should succeed");

        let state = svc.state.lock().unwrap();
        if let CircuitState::Closed {
            consecutive_failures,
        } = *state
        {
            assert_eq!(consecutive_failures, 0);
        } else {
            panic!("expected Closed state after successful half-open probe");
        }
    }

    #[tokio::test]
    async fn test_half_open_failure_reopens() {
        let config = CircuitBreakerConfig::new()
            .failure_threshold(2)
            .open_duration(Duration::from_millis(50));
        let mut svc = CircuitBreakerLayer::new(config).layer(failing_processor());

        for _ in 0..2 {
            let _ = send_one(&mut svc).await;
        }

        // Let the open period elapse so the next call is a probe.
        tokio::time::sleep(Duration::from_millis(60)).await;

        let result = send_one(&mut svc).await;
        assert!(result.is_err());

        let state = svc.state.lock().unwrap();
        if !matches!(*state, CircuitState::Open { .. }) {
            panic!("expected Open state after half-open failure");
        }
    }

    #[tokio::test]
    async fn test_intermittent_failures_dont_open() {
        let layer = CircuitBreakerLayer::new(CircuitBreakerConfig::new().failure_threshold(3));

        // Pattern: fail, fail, succeed. There are never three failures in a row.
        let calls = Arc::new(AtomicU32::new(0));
        let inner = BoxProcessor::from_fn(move |ex| {
            let calls = Arc::clone(&calls);
            Box::pin(async move {
                let c = calls.fetch_add(1, Ordering::SeqCst);
                if c % 3 == 2 {
                    Ok(ex)
                } else {
                    Err(CamelError::ProcessorError("intermittent".into()))
                }
            })
        });

        let mut svc = layer.layer(inner);

        for _ in 0..6 {
            let _ = send_one(&mut svc).await;
        }

        let state = svc.state.lock().unwrap();
        if !matches!(*state, CircuitState::Closed { .. }) {
            panic!("expected circuit to remain Closed");
        }
    }
}