Skip to main content

camel_processor/
error_handler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8    ExceptionPolicy, HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
9};
10use camel_api::{BoxProcessor, CamelError, Exchange, Value};
11
12/// Tower Layer that wraps a pipeline with error handling behaviour.
13///
14/// Constructed with already-resolved producers; URI resolution happens in `camel-core`.
15pub struct ErrorHandlerLayer {
16    /// Resolved DLC producer (None = log only).
17    dlc_producer: Option<BoxProcessor>,
18    /// Policies with their resolved `handled_by` producers.
19    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
20}
21
22impl ErrorHandlerLayer {
23    /// Create the layer with pre-resolved producers.
24    pub fn new(
25        dlc_producer: Option<BoxProcessor>,
26        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
27    ) -> Self {
28        Self {
29            dlc_producer,
30            policies,
31        }
32    }
33}
34
35impl<S> Layer<S> for ErrorHandlerLayer
36where
37    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
38    S::Future: Send + 'static,
39{
40    type Service = ErrorHandlerService<S>;
41
42    fn layer(&self, inner: S) -> Self::Service {
43        ErrorHandlerService {
44            inner,
45            dlc_producer: self.dlc_producer.clone(),
46            policies: self
47                .policies
48                .iter()
49                .map(|(p, prod)| (p.clone(), prod.clone()))
50                .collect(),
51        }
52    }
53}
54
55/// Tower Service that absorbs pipeline errors by retrying and/or forwarding to a DLC.
56///
57/// `call` always returns `Ok` — errors are absorbed. The returned exchange will have
58/// `has_error() == true` if the pipeline ultimately failed.
59pub struct ErrorHandlerService<S> {
60    inner: S,
61    dlc_producer: Option<BoxProcessor>,
62    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
63}
64
65impl<S: Clone> Clone for ErrorHandlerService<S> {
66    fn clone(&self) -> Self {
67        Self {
68            inner: self.inner.clone(),
69            dlc_producer: self.dlc_producer.clone(),
70            policies: self
71                .policies
72                .iter()
73                .map(|(p, prod)| (p.clone(), prod.clone()))
74                .collect(),
75        }
76    }
77}
78
79impl<S> ErrorHandlerService<S>
80where
81    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
82    S::Future: Send + 'static,
83{
84    /// Create the service directly (used in unit tests; in production use the Layer).
85    pub fn new(
86        inner: S,
87        dlc_producer: Option<BoxProcessor>,
88        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
89    ) -> Self {
90        Self {
91            inner,
92            dlc_producer,
93            policies,
94        }
95    }
96}
97
98impl<S> Service<Exchange> for ErrorHandlerService<S>
99where
100    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
101    S::Future: Send + 'static,
102{
103    type Response = Exchange;
104    type Error = CamelError;
105    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
106
107    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
108        self.inner.poll_ready(cx)
109    }
110
111    fn call(&mut self, exchange: Exchange) -> Self::Future {
112        let mut inner = self.inner.clone();
113        let dlc = self.dlc_producer.clone();
114        let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
115            .policies
116            .iter()
117            .map(|(p, prod)| (p.clone(), prod.clone()))
118            .collect();
119
120        Box::pin(async move {
121            let original = exchange.clone();
122            let result = inner.ready().await?.call(exchange).await;
123
124            let err = match result {
125                Ok(ex) => return Ok(ex),
126                Err(e) => e,
127            };
128
129            // Stop EIP is a control-flow sentinel — pass through without retry or DLC.
130            if matches!(err, CamelError::Stopped) {
131                return Err(err);
132            }
133
134            // Find the first matching policy.
135            let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
136
137            if let Some((policy, policy_producer)) = matched {
138                // Retry if configured.
139                if let Some(ref backoff) = policy.retry {
140                    for attempt in 0..backoff.max_attempts {
141                        let delay = backoff.delay_for(attempt);
142                        tokio::time::sleep(delay).await;
143
144                        // Set redelivery headers
145                        let mut ex = original.clone();
146                        ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
147                        ex.input.set_header(
148                            HEADER_REDELIVERY_COUNTER,
149                            Value::Number((attempt + 1).into()),
150                        );
151                        ex.input.set_header(
152                            HEADER_REDELIVERY_MAX_COUNTER,
153                            Value::Number(backoff.max_attempts.into()),
154                        );
155
156                        match inner.ready().await?.call(ex).await {
157                            Ok(ex) => return Ok(ex),
158                            Err(_e) => {
159                                if attempt + 1 == backoff.max_attempts {
160                                    // Retries exhausted — send to handler.
161                                    let mut ex = original.clone();
162                                    ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
163                                    ex.input.set_header(
164                                        HEADER_REDELIVERY_COUNTER,
165                                        Value::Number(backoff.max_attempts.into()),
166                                    );
167                                    ex.input.set_header(
168                                        HEADER_REDELIVERY_MAX_COUNTER,
169                                        Value::Number(backoff.max_attempts.into()),
170                                    );
171                                    ex.set_error(_e);
172                                    let handler = policy_producer.or(dlc);
173                                    return send_to_handler(ex, handler).await;
174                                }
175                            }
176                        }
177                    }
178                }
179                // No retry configured (or 0 attempts) — send to policy handler or DLC.
180                let mut ex = original.clone();
181                ex.set_error(err);
182                let handler = policy_producer.or(dlc);
183                send_to_handler(ex, handler).await
184            } else {
185                // No matching policy — forward directly to DLC.
186                let mut ex = original;
187                ex.set_error(err);
188                send_to_handler(ex, dlc).await
189            }
190        })
191    }
192}
193
194async fn send_to_handler(
195    exchange: Exchange,
196    producer: Option<BoxProcessor>,
197) -> Result<Exchange, CamelError> {
198    match producer {
199        None => {
200            tracing::error!(
201                error = ?exchange.error,
202                "Exchange failed with no error handler configured"
203            );
204            Ok(exchange)
205        }
206        Some(mut prod) => match prod.ready().await {
207            Err(e) => {
208                tracing::error!("DLC/handler not ready: {e}");
209                Ok(exchange)
210            }
211            Ok(svc) => match svc.call(exchange.clone()).await {
212                Ok(ex) => Ok(ex),
213                Err(e) => {
214                    tracing::error!("DLC/handler call failed: {e}");
215                    // Return the original exchange with original error intact.
216                    Ok(exchange)
217                }
218            },
219        },
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{
        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, Value,
        error_handler::RedeliveryPolicy,
    };
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };
    use std::time::Duration;
    use tower::ServiceExt;

    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Processor that always fails with a `ProcessorError`.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that passes the exchange through unchanged.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor that fails its first `n` invocations and succeeds afterwards.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let count = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let count = Arc::clone(&count);
            Box::pin(async move {
                let c = count.fetch_add(1, Ordering::SeqCst);
                if c < n {
                    Err(CamelError::ProcessorError(format!("attempt {c}")))
                } else {
                    Ok(ex)
                }
            })
        })
    }

    /// Pass-through processor that counts its invocations in `hits`.
    fn counting_processor(hits: Arc<AtomicU32>) -> BoxProcessor {
        BoxProcessor::from_fn(move |ex: Exchange| {
            hits.fetch_add(1, Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        })
    }

    #[tokio::test]
    async fn test_ok_passthrough() {
        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(!result.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_error_goes_to_dlc() {
        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
        let received_clone = Arc::clone(&received);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let r = Arc::clone(&received_clone);
            Box::pin(async move {
                r.lock().unwrap().push(ex.clone());
                Ok(ex)
            })
        });

        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        let ex = result.unwrap();
        assert!(ex.has_error());
        assert_eq!(received.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_retry_recovers() {
        let inner = fail_n_times(2);
        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(RedeliveryPolicy {
                max_attempts: 3,
                initial_delay: Duration::from_millis(1),
                multiplier: 1.0,
                max_delay: Duration::from_millis(10),
                jitter_factor: 0.0,
            }),
            handled_by: None,
        };
        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(!result.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_retry_exhausted_goes_to_dlc() {
        let inner = fail_n_times(10);
        let received = Arc::new(std::sync::Mutex::new(0u32));
        let received_clone = Arc::clone(&received);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let r = Arc::clone(&received_clone);
            Box::pin(async move {
                *r.lock().unwrap() += 1;
                Ok(ex)
            })
        });
        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(RedeliveryPolicy {
                max_attempts: 2,
                initial_delay: Duration::from_millis(1),
                multiplier: 1.0,
                max_delay: Duration::from_millis(10),
                jitter_factor: 0.0,
            }),
            handled_by: None,
        };
        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(result.unwrap().has_error());
        assert_eq!(*received.lock().unwrap(), 1);
    }

    #[test]
    fn test_poll_ready_delegates_to_inner() {
        use std::sync::atomic::AtomicBool;

        /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
        #[derive(Clone)]
        struct DelayedReadyService {
            ready: Arc<AtomicBool>,
        }

        impl Service<Exchange> for DelayedReadyService {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                if self.ready.fetch_or(true, Ordering::SeqCst) {
                    // Already marked ready (second+ call) → Ready
                    Poll::Ready(Ok(()))
                } else {
                    // First call → Pending, schedule a wake
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            }

            fn call(&mut self, ex: Exchange) -> Self::Future {
                Box::pin(async move { Ok(ex) })
            }
        }

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let inner = DelayedReadyService {
            ready: Arc::new(AtomicBool::new(false)),
        };
        let mut svc = ErrorHandlerService::new(inner, None, vec![]);

        // `Service::poll_ready` takes `&mut self`; no pinning is required.
        // First poll_ready: inner returns Pending, so ErrorHandlerService must too.
        let first = svc.poll_ready(&mut cx);
        assert!(first.is_pending(), "expected Pending on first poll_ready");

        // Second poll_ready: inner returns Ready, so ErrorHandlerService must too.
        let second = svc.poll_ready(&mut cx);
        assert!(second.is_ready(), "expected Ready on second poll_ready");
    }

    #[tokio::test]
    async fn test_no_matching_policy_uses_dlc() {
        let received = Arc::new(std::sync::Mutex::new(0u32));
        let received_clone = Arc::clone(&received);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let r = Arc::clone(&received_clone);
            Box::pin(async move {
                *r.lock().unwrap() += 1;
                Ok(ex)
            })
        });
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        // The DLC passes the exchange through, so the error must still be attached.
        assert!(result.unwrap().has_error());
        assert_eq!(*received.lock().unwrap(), 1);
    }

    #[tokio::test]
    async fn test_policy_handler_takes_precedence_over_dlc() {
        let handler_hits = Arc::new(AtomicU32::new(0));
        let dlc_hits = Arc::new(AtomicU32::new(0));
        let handler = counting_processor(Arc::clone(&handler_hits));
        let dlc = counting_processor(Arc::clone(&dlc_hits));

        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: None,
            handled_by: None,
        };
        let svc = ErrorHandlerService::new(
            failing_processor(),
            Some(dlc),
            vec![(policy, Some(handler))],
        );
        let ex = svc.oneshot(make_exchange()).await.unwrap();

        assert!(ex.has_error());
        assert_eq!(handler_hits.load(Ordering::SeqCst), 1);
        assert_eq!(
            dlc_hits.load(Ordering::SeqCst),
            0,
            "DLC must not be used when the matched policy has its own handler"
        );
    }

    #[tokio::test]
    async fn test_redelivery_headers_are_set() {
        let inner = fail_n_times(10);
        let received = Arc::new(std::sync::Mutex::new(None));
        let received_clone = Arc::clone(&received);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let r = Arc::clone(&received_clone);
            Box::pin(async move {
                *r.lock().unwrap() = Some(ex.clone());
                Ok(ex)
            })
        });

        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(RedeliveryPolicy {
                max_attempts: 2,
                initial_delay: Duration::from_millis(1),
                multiplier: 1.0,
                max_delay: Duration::from_millis(10),
                jitter_factor: 0.0,
            }),
            handled_by: None,
        };

        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
        let _ = svc.oneshot(make_exchange()).await.unwrap();

        let ex = received.lock().unwrap().take().unwrap();
        assert_eq!(
            ex.input.header(HEADER_REDELIVERED),
            Some(&Value::Bool(true))
        );
        assert_eq!(
            ex.input.header(HEADER_REDELIVERY_COUNTER),
            Some(&Value::Number(2.into()))
        );
        assert_eq!(
            ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
            Some(&Value::Number(2.into()))
        );
    }

    // Only the total elapsed time is observable here, so this checks that jittered
    // delays stay within the expected lower/upper bounds rather than each delay's value.
    #[tokio::test]
    async fn test_jitter_produces_varying_delays_in_retry_flow() {
        use std::time::Instant;

        let inner = fail_n_times(10);
        let received = Arc::new(std::sync::Mutex::new(None));
        let received_clone = Arc::clone(&received);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let r = Arc::clone(&received_clone);
            Box::pin(async move {
                *r.lock().unwrap() = Some(ex.clone());
                Ok(ex)
            })
        });

        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(RedeliveryPolicy {
                max_attempts: 5,
                initial_delay: Duration::from_millis(20),
                multiplier: 1.0,
                max_delay: Duration::from_millis(100),
                jitter_factor: 0.5,
            }),
            handled_by: None,
        };

        let start = Instant::now();
        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
        let _ = svc.oneshot(make_exchange()).await.unwrap();
        let elapsed = start.elapsed();

        assert!(
            received.lock().unwrap().is_some(),
            "DLC should have received exchange"
        );

        assert!(
            elapsed >= Duration::from_millis(50),
            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
        );

        assert!(
            elapsed <= Duration::from_millis(500),
            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
        );
    }

    // Stopped is a control-flow sentinel, not a real error.
    // ErrorHandlerService must pass it through without retrying or forwarding to DLC.
    #[tokio::test]
    async fn test_stopped_bypasses_error_handler() {
        let stopped_inner =
            BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));

        // DLC that tracks if it was ever called.
        let dlc_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let dlc_called_clone = Arc::clone(&dlc_called);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            dlc_called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        });

        let policy = ExceptionPolicy::new(|_| true); // matches everything
        let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;

        // Must propagate Err(Stopped) — not absorb it.
        assert!(
            matches!(result, Err(CamelError::Stopped)),
            "expected Err(Stopped), got: {:?}",
            result
        );
        // DLC must NOT have been called.
        assert!(
            !dlc_called.load(std::sync::atomic::Ordering::SeqCst),
            "DLC should not be called for Stopped"
        );
    }
}