1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8 ExceptionPolicy, HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
9};
10use camel_api::{BoxProcessor, CamelError, Exchange, Value};
11
12pub struct ErrorHandlerLayer {
16 dlc_producer: Option<BoxProcessor>,
18 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
20}
21
22impl ErrorHandlerLayer {
23 pub fn new(
25 dlc_producer: Option<BoxProcessor>,
26 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
27 ) -> Self {
28 Self {
29 dlc_producer,
30 policies,
31 }
32 }
33}
34
35impl<S> Layer<S> for ErrorHandlerLayer
36where
37 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
38 S::Future: Send + 'static,
39{
40 type Service = ErrorHandlerService<S>;
41
42 fn layer(&self, inner: S) -> Self::Service {
43 ErrorHandlerService {
44 inner,
45 dlc_producer: self.dlc_producer.clone(),
46 policies: self
47 .policies
48 .iter()
49 .map(|(p, prod)| (p.clone(), prod.clone()))
50 .collect(),
51 }
52 }
53}
54
55pub struct ErrorHandlerService<S> {
60 inner: S,
61 dlc_producer: Option<BoxProcessor>,
62 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
63}
64
65impl<S: Clone> Clone for ErrorHandlerService<S> {
66 fn clone(&self) -> Self {
67 Self {
68 inner: self.inner.clone(),
69 dlc_producer: self.dlc_producer.clone(),
70 policies: self
71 .policies
72 .iter()
73 .map(|(p, prod)| (p.clone(), prod.clone()))
74 .collect(),
75 }
76 }
77}
78
79impl<S> ErrorHandlerService<S>
80where
81 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
82 S::Future: Send + 'static,
83{
84 pub fn new(
86 inner: S,
87 dlc_producer: Option<BoxProcessor>,
88 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
89 ) -> Self {
90 Self {
91 inner,
92 dlc_producer,
93 policies,
94 }
95 }
96}
97
98impl<S> Service<Exchange> for ErrorHandlerService<S>
99where
100 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
101 S::Future: Send + 'static,
102{
103 type Response = Exchange;
104 type Error = CamelError;
105 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
106
107 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
108 self.inner.poll_ready(cx)
109 }
110
111 fn call(&mut self, exchange: Exchange) -> Self::Future {
112 let mut inner = self.inner.clone();
113 let dlc = self.dlc_producer.clone();
114 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
115 .policies
116 .iter()
117 .map(|(p, prod)| (p.clone(), prod.clone()))
118 .collect();
119
120 Box::pin(async move {
121 let original = exchange.clone();
122 let result = inner.ready().await?.call(exchange).await;
123
124 let err = match result {
125 Ok(ex) => return Ok(ex),
126 Err(e) => e,
127 };
128
129 if matches!(err, CamelError::Stopped) {
131 return Err(err);
132 }
133
134 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
136
137 if let Some((policy, policy_producer)) = matched {
138 if let Some(ref backoff) = policy.retry {
140 for attempt in 0..backoff.max_attempts {
141 let delay = backoff.delay_for(attempt);
142 tokio::time::sleep(delay).await;
143
144 let mut ex = original.clone();
146 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
147 ex.input.set_header(
148 HEADER_REDELIVERY_COUNTER,
149 Value::Number((attempt + 1).into()),
150 );
151 ex.input.set_header(
152 HEADER_REDELIVERY_MAX_COUNTER,
153 Value::Number(backoff.max_attempts.into()),
154 );
155
156 match inner.ready().await?.call(ex).await {
157 Ok(ex) => return Ok(ex),
158 Err(_e) => {
159 if attempt + 1 == backoff.max_attempts {
160 let mut ex = original.clone();
162 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
163 ex.input.set_header(
164 HEADER_REDELIVERY_COUNTER,
165 Value::Number(backoff.max_attempts.into()),
166 );
167 ex.input.set_header(
168 HEADER_REDELIVERY_MAX_COUNTER,
169 Value::Number(backoff.max_attempts.into()),
170 );
171 ex.set_error(_e);
172 let handler = policy_producer.or(dlc);
173 return send_to_handler(ex, handler).await;
174 }
175 }
176 }
177 }
178 }
179 let mut ex = original.clone();
181 ex.set_error(err);
182 let handler = policy_producer.or(dlc);
183 send_to_handler(ex, handler).await
184 } else {
185 let mut ex = original;
187 ex.set_error(err);
188 send_to_handler(ex, dlc).await
189 }
190 })
191 }
192}
193
194async fn send_to_handler(
195 exchange: Exchange,
196 producer: Option<BoxProcessor>,
197) -> Result<Exchange, CamelError> {
198 match producer {
199 None => {
200 tracing::error!(
201 error = ?exchange.error,
202 "Exchange failed with no error handler configured"
203 );
204 Ok(exchange)
205 }
206 Some(mut prod) => match prod.ready().await {
207 Err(e) => {
208 tracing::error!("DLC/handler not ready: {e}");
209 Ok(exchange)
210 }
211 Ok(svc) => match svc.call(exchange.clone()).await {
212 Ok(ex) => Ok(ex),
213 Err(e) => {
214 tracing::error!("DLC/handler call failed: {e}");
215 Ok(exchange)
217 }
218 },
219 },
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{
        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, Value,
        error_handler::RedeliveryPolicy,
    };
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicBool, AtomicU32, Ordering},
    };
    use std::time::Duration;
    use tower::ServiceExt;

    /// Exchange carrying a fixed `"test"` message.
    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Processor that fails every call with a `ProcessorError`.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that returns its input unchanged.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor that fails its first `n` calls and succeeds afterwards.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let calls = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let calls = Arc::clone(&calls);
            Box::pin(async move {
                match calls.fetch_add(1, Ordering::SeqCst) {
                    seen if seen < n => {
                        Err(CamelError::ProcessorError(format!("attempt {seen}")))
                    }
                    _ => Ok(ex),
                }
            })
        })
    }

    /// DLC processor that records every exchange it receives and echoes it
    /// back; the second tuple element exposes the recorded exchanges.
    fn recording_dlc() -> (BoxProcessor, Arc<Mutex<Vec<Exchange>>>) {
        let seen: Arc<Mutex<Vec<Exchange>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                sink.lock().unwrap().push(ex.clone());
                Ok(ex)
            })
        });
        (dlc, seen)
    }

    /// Policy that matches every error and retries according to `retry`.
    fn retry_on_any_error(retry: RedeliveryPolicy) -> ExceptionPolicy {
        ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(retry),
            handled_by: None,
        }
    }

    #[tokio::test]
    async fn test_ok_passthrough() {
        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);

        let outcome = svc.oneshot(make_exchange()).await;

        assert!(outcome.is_ok());
        assert!(!outcome.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_error_goes_to_dlc() {
        let (dlc, seen) = recording_dlc();
        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);

        let outcome = svc.oneshot(make_exchange()).await;

        assert!(outcome.is_ok());
        assert!(outcome.unwrap().has_error());
        assert_eq!(seen.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_retry_recovers() {
        let policy = retry_on_any_error(RedeliveryPolicy {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter_factor: 0.0,
        });
        let svc = ErrorHandlerService::new(fail_n_times(2), None, vec![(policy, None)]);

        let outcome = svc.oneshot(make_exchange()).await;

        assert!(outcome.is_ok());
        assert!(!outcome.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_retry_exhausted_goes_to_dlc() {
        let (dlc, seen) = recording_dlc();
        let policy = retry_on_any_error(RedeliveryPolicy {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter_factor: 0.0,
        });
        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);

        let outcome = svc.oneshot(make_exchange()).await;

        assert!(outcome.is_ok());
        assert!(outcome.unwrap().has_error());
        assert_eq!(seen.lock().unwrap().len(), 1);
    }

    #[test]
    fn test_poll_ready_delegates_to_inner() {
        /// Reports `Pending` on the first readiness poll and `Ready` afterwards.
        #[derive(Clone)]
        struct DelayedReadyService {
            ready: Arc<AtomicBool>,
        }

        impl Service<Exchange> for DelayedReadyService {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                // `swap` returns the previous flag: false only on the first poll.
                let was_ready = self.ready.swap(true, Ordering::SeqCst);
                if !was_ready {
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, ex: Exchange) -> Self::Future {
                Box::pin(async move { Ok(ex) })
            }
        }

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let inner = DelayedReadyService {
            ready: Arc::new(AtomicBool::new(false)),
        };
        let mut svc = ErrorHandlerService::new(inner, None, vec![]);

        let first = svc.poll_ready(&mut cx);
        assert!(first.is_pending(), "expected Pending on first poll_ready");

        let second = svc.poll_ready(&mut cx);
        assert!(second.is_ready(), "expected Ready on second poll_ready");
    }

    #[tokio::test]
    async fn test_no_matching_policy_uses_dlc() {
        let (dlc, seen) = recording_dlc();
        let io_only = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
        let svc =
            ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(io_only, None)]);

        let outcome = svc.oneshot(make_exchange()).await;

        assert!(outcome.is_ok());
        assert_eq!(seen.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_redelivery_headers_are_set() {
        use camel_api::error_handler::{
            HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
            RedeliveryPolicy,
        };

        let (dlc, seen) = recording_dlc();
        let policy = retry_on_any_error(RedeliveryPolicy {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter_factor: 0.0,
        });

        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);
        let _ = svc.oneshot(make_exchange()).await.unwrap();

        let delivered = seen.lock().unwrap().pop().unwrap();
        assert_eq!(
            delivered.input.header(HEADER_REDELIVERED),
            Some(&Value::Bool(true))
        );
        assert_eq!(
            delivered.input.header(HEADER_REDELIVERY_COUNTER),
            Some(&Value::Number(2.into()))
        );
        assert_eq!(
            delivered.input.header(HEADER_REDELIVERY_MAX_COUNTER),
            Some(&Value::Number(2.into()))
        );
    }

    #[tokio::test]
    async fn test_jitter_produces_varying_delays_in_retry_flow() {
        use std::time::Instant;

        let (dlc, seen) = recording_dlc();
        let policy = retry_on_any_error(RedeliveryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_millis(20),
            multiplier: 1.0,
            max_delay: Duration::from_millis(100),
            jitter_factor: 0.5,
        });

        let started = Instant::now();
        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);
        let _ = svc.oneshot(make_exchange()).await.unwrap();
        let elapsed = started.elapsed();

        assert!(
            !seen.lock().unwrap().is_empty(),
            "DLC should have received exchange"
        );

        assert!(
            elapsed >= Duration::from_millis(50),
            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
        );

        assert!(
            elapsed <= Duration::from_millis(500),
            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
        );
    }

    #[tokio::test]
    async fn test_stopped_bypasses_error_handler() {
        let stopped_inner =
            BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));

        // Set as soon as the DLC closure is invoked at all.
        let dlc_called = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&dlc_called);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            flag.store(true, Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        });

        let catch_all = ExceptionPolicy::new(|_| true);
        let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(catch_all, None)]);
        let result = svc.oneshot(make_exchange()).await;

        assert!(
            matches!(result, Err(CamelError::Stopped)),
            "expected Err(Stopped), got: {:?}",
            result
        );
        assert!(
            !dlc_called.load(Ordering::SeqCst),
            "DLC should not be called for Stopped"
        );
    }
}