Skip to main content

camel_processor/
error_handler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{ExceptionPolicy, HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER};
8use camel_api::{BoxProcessor, CamelError, Exchange, Value};
9
10/// Tower Layer that wraps a pipeline with error handling behaviour.
11///
12/// Constructed with already-resolved producers; URI resolution happens in `camel-core`.
13pub struct ErrorHandlerLayer {
14    /// Resolved DLC producer (None = log only).
15    dlc_producer: Option<BoxProcessor>,
16    /// Policies with their resolved `handled_by` producers.
17    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
18}
19
20impl ErrorHandlerLayer {
21    /// Create the layer with pre-resolved producers.
22    pub fn new(
23        dlc_producer: Option<BoxProcessor>,
24        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
25    ) -> Self {
26        Self {
27            dlc_producer,
28            policies,
29        }
30    }
31}
32
33impl<S> Layer<S> for ErrorHandlerLayer
34where
35    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
36    S::Future: Send + 'static,
37{
38    type Service = ErrorHandlerService<S>;
39
40    fn layer(&self, inner: S) -> Self::Service {
41        ErrorHandlerService {
42            inner,
43            dlc_producer: self.dlc_producer.clone(),
44            policies: self
45                .policies
46                .iter()
47                .map(|(p, prod)| (p.clone(), prod.clone()))
48                .collect(),
49        }
50    }
51}
52
53/// Tower Service that absorbs pipeline errors by retrying and/or forwarding to a DLC.
54///
55/// `call` always returns `Ok` — errors are absorbed. The returned exchange will have
56/// `has_error() == true` if the pipeline ultimately failed.
57pub struct ErrorHandlerService<S> {
58    inner: S,
59    dlc_producer: Option<BoxProcessor>,
60    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
61}
62
63impl<S: Clone> Clone for ErrorHandlerService<S> {
64    fn clone(&self) -> Self {
65        Self {
66            inner: self.inner.clone(),
67            dlc_producer: self.dlc_producer.clone(),
68            policies: self
69                .policies
70                .iter()
71                .map(|(p, prod)| (p.clone(), prod.clone()))
72                .collect(),
73        }
74    }
75}
76
77impl<S> ErrorHandlerService<S>
78where
79    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
80    S::Future: Send + 'static,
81{
82    /// Create the service directly (used in unit tests; in production use the Layer).
83    pub fn new(
84        inner: S,
85        dlc_producer: Option<BoxProcessor>,
86        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
87    ) -> Self {
88        Self {
89            inner,
90            dlc_producer,
91            policies,
92        }
93    }
94}
95
96impl<S> Service<Exchange> for ErrorHandlerService<S>
97where
98    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
99    S::Future: Send + 'static,
100{
101    type Response = Exchange;
102    type Error = CamelError;
103    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
104
105    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106        self.inner.poll_ready(cx)
107    }
108
109    fn call(&mut self, exchange: Exchange) -> Self::Future {
110        let mut inner = self.inner.clone();
111        let dlc = self.dlc_producer.clone();
112        let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
113            .policies
114            .iter()
115            .map(|(p, prod)| (p.clone(), prod.clone()))
116            .collect();
117
118        Box::pin(async move {
119            let original = exchange.clone();
120            let result = inner.ready().await?.call(exchange).await;
121
122            let err = match result {
123                Ok(ex) => return Ok(ex),
124                Err(e) => e,
125            };
126
127            // Stop EIP is a control-flow sentinel — pass through without retry or DLC.
128            if matches!(err, CamelError::Stopped) {
129                return Err(err);
130            }
131
132            // Find the first matching policy.
133            let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
134
135            if let Some((policy, policy_producer)) = matched {
136                // Retry if configured.
137                if let Some(ref backoff) = policy.retry {
138                    for attempt in 0..backoff.max_attempts {
139                        let delay = backoff.delay_for(attempt);
140                        tokio::time::sleep(delay).await;
141
142                        // Set redelivery headers
143                        let mut ex = original.clone();
144                        ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
145                        ex.input.set_header(HEADER_REDELIVERY_COUNTER, Value::Number((attempt + 1).into()));
146                        ex.input.set_header(HEADER_REDELIVERY_MAX_COUNTER, Value::Number(backoff.max_attempts.into()));
147
148                        match inner.ready().await?.call(ex).await {
149                            Ok(ex) => return Ok(ex),
150                            Err(_e) => {
151                                if attempt + 1 == backoff.max_attempts {
152                                    // Retries exhausted — send to handler.
153                                    let mut ex = original.clone();
154                                    ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
155                                    ex.input.set_header(HEADER_REDELIVERY_COUNTER, Value::Number(backoff.max_attempts.into()));
156                                    ex.input.set_header(HEADER_REDELIVERY_MAX_COUNTER, Value::Number(backoff.max_attempts.into()));
157                                    ex.set_error(_e);
158                                    let handler = policy_producer.or(dlc);
159                                    return send_to_handler(ex, handler).await;
160                                }
161                            }
162                        }
163                    }
164                }
165                // No retry configured (or 0 attempts) — send to policy handler or DLC.
166                let mut ex = original.clone();
167                ex.set_error(err);
168                let handler = policy_producer.or(dlc);
169                send_to_handler(ex, handler).await
170            } else {
171                // No matching policy — forward directly to DLC.
172                let mut ex = original;
173                ex.set_error(err);
174                send_to_handler(ex, dlc).await
175            }
176        })
177    }
178}
179
180async fn send_to_handler(
181    exchange: Exchange,
182    producer: Option<BoxProcessor>,
183) -> Result<Exchange, CamelError> {
184    match producer {
185        None => {
186            tracing::error!(
187                error = ?exchange.error,
188                "Exchange failed with no error handler configured"
189            );
190            Ok(exchange)
191        }
192        Some(mut prod) => match prod.ready().await {
193            Err(e) => {
194                tracing::error!("DLC/handler not ready: {e}");
195                Ok(exchange)
196            }
197            Ok(svc) => match svc.call(exchange.clone()).await {
198                Ok(ex) => Ok(ex),
199                Err(e) => {
200                    tracing::error!("DLC/handler call failed: {e}");
201                    // Return the original exchange with original error intact.
202                    Ok(exchange)
203                }
204            },
205        },
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{
        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, Value,
        error_handler::RedeliveryPolicy,
    };
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicBool, AtomicU32, Ordering},
    };
    use std::time::Duration;
    use tower::ServiceExt;

    // ----- fixtures -------------------------------------------------------

    /// Fresh exchange with a fixed body.
    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Processor that fails every time.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that echoes the exchange back.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor that fails on its first `n` invocations and succeeds afterwards.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let calls = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let calls = Arc::clone(&calls);
            Box::pin(async move {
                let c = calls.fetch_add(1, Ordering::SeqCst);
                if c < n {
                    Err(CamelError::ProcessorError(format!("attempt {c}")))
                } else {
                    Ok(ex)
                }
            })
        })
    }

    /// DLC stub that counts the exchanges it receives.
    fn counting_dlc() -> (BoxProcessor, Arc<AtomicU32>) {
        let hits = Arc::new(AtomicU32::new(0));
        let sink = Arc::clone(&hits);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                sink.fetch_add(1, Ordering::SeqCst);
                Ok(ex)
            })
        });
        (dlc, hits)
    }

    /// DLC stub that remembers the most recent exchange it received.
    fn capturing_dlc() -> (BoxProcessor, Arc<Mutex<Option<Exchange>>>) {
        let slot = Arc::new(Mutex::new(None));
        let sink = Arc::clone(&slot);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                *sink.lock().unwrap() = Some(ex.clone());
                Ok(ex)
            })
        });
        (dlc, slot)
    }

    /// Policy that matches every error and retries according to `retry`.
    fn catch_all_with(retry: RedeliveryPolicy) -> ExceptionPolicy {
        ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(retry),
            handled_by: None,
        }
    }

    // ----- tests ----------------------------------------------------------

    #[tokio::test]
    async fn test_ok_passthrough() {
        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);

        let result = svc.oneshot(make_exchange()).await;

        assert!(result.is_ok());
        assert!(!result.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_error_goes_to_dlc() {
        let (dlc, hits) = counting_dlc();
        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);

        let result = svc.oneshot(make_exchange()).await;

        assert!(result.is_ok());
        let ex = result.unwrap();
        assert!(ex.has_error());
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_retry_recovers() {
        let policy = catch_all_with(RedeliveryPolicy {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter_factor: 0.0,
        });
        let svc = ErrorHandlerService::new(fail_n_times(2), None, vec![(policy, None)]);

        let result = svc.oneshot(make_exchange()).await;

        assert!(result.is_ok());
        assert!(!result.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_retry_exhausted_goes_to_dlc() {
        let (dlc, hits) = counting_dlc();
        let policy = catch_all_with(RedeliveryPolicy {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter_factor: 0.0,
        });
        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);

        let result = svc.oneshot(make_exchange()).await;

        assert!(result.is_ok());
        assert!(result.unwrap().has_error());
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_poll_ready_delegates_to_inner() {
        /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
        #[derive(Clone)]
        struct DelayedReadyService {
            ready: Arc<AtomicBool>,
        }

        impl Service<Exchange> for DelayedReadyService {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                // `swap` reports the previous flag value while marking it set.
                let was_ready = self.ready.swap(true, Ordering::SeqCst);
                if !was_ready {
                    // First call → Pending, schedule a wake
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
                // Already marked ready (second+ call) → Ready
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, ex: Exchange) -> Self::Future {
                Box::pin(async move { Ok(ex) })
            }
        }

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let inner = DelayedReadyService {
            ready: Arc::new(AtomicBool::new(false)),
        };
        let mut svc = ErrorHandlerService::new(inner, None, vec![]);

        // First poll_ready: inner returns Pending, so ErrorHandlerService must too.
        let first = svc.poll_ready(&mut cx);
        assert!(first.is_pending(), "expected Pending on first poll_ready");

        // Second poll_ready: inner returns Ready, so ErrorHandlerService must too.
        let second = svc.poll_ready(&mut cx);
        assert!(second.is_ready(), "expected Ready on second poll_ready");
    }

    #[tokio::test]
    async fn test_no_matching_policy_uses_dlc() {
        let (dlc, hits) = counting_dlc();
        // Only I/O errors match, so the processor error falls through to the DLC.
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);

        let result = svc.oneshot(make_exchange()).await;

        assert!(result.is_ok());
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_redelivery_headers_are_set() {
        let (dlc, captured) = capturing_dlc();
        let policy = catch_all_with(RedeliveryPolicy {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter_factor: 0.0,
        });
        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);

        let _ = svc.oneshot(make_exchange()).await.unwrap();

        let ex = captured.lock().unwrap().take().unwrap();
        assert_eq!(ex.input.header(HEADER_REDELIVERED), Some(&Value::Bool(true)));
        assert_eq!(
            ex.input.header(HEADER_REDELIVERY_COUNTER),
            Some(&Value::Number(2.into()))
        );
        assert_eq!(
            ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
            Some(&Value::Number(2.into()))
        );
    }

    #[tokio::test]
    async fn test_jitter_produces_varying_delays_in_retry_flow() {
        use std::time::Instant;

        let (dlc, captured) = capturing_dlc();
        let policy = catch_all_with(RedeliveryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_millis(20),
            multiplier: 1.0,
            max_delay: Duration::from_millis(100),
            jitter_factor: 0.5,
        });

        let start = Instant::now();
        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);
        let _ = svc.oneshot(make_exchange()).await.unwrap();
        let elapsed = start.elapsed();

        assert!(
            captured.lock().unwrap().is_some(),
            "DLC should have received exchange"
        );

        assert!(
            elapsed >= Duration::from_millis(50),
            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
        );

        assert!(
            elapsed <= Duration::from_millis(500),
            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
        );
    }

    // Stopped is a control-flow sentinel, not a real error.
    // ErrorHandlerService must pass it through without retrying or forwarding to DLC.
    #[tokio::test]
    async fn test_stopped_bypasses_error_handler() {
        let stopped_inner =
            BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));

        // DLC that tracks if it was ever called.
        let dlc_called = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&dlc_called);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            flag.store(true, Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        });

        let policy = ExceptionPolicy::new(|_| true); // matches everything
        let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;

        // Must propagate Err(Stopped) — not absorb it.
        assert!(
            matches!(result, Err(CamelError::Stopped)),
            "expected Err(Stopped), got: {:?}",
            result
        );
        // DLC must NOT have been called.
        assert!(
            !dlc_called.load(Ordering::SeqCst),
            "DLC should not be called for Stopped"
        );
    }
}