Skip to main content

camel_processor/
error_handler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::ExceptionPolicy;
8use camel_api::{BoxProcessor, CamelError, Exchange};
9
10/// Tower Layer that wraps a pipeline with error handling behaviour.
11///
12/// Constructed with already-resolved producers; URI resolution happens in `camel-core`.
13pub struct ErrorHandlerLayer {
14    /// Resolved DLC producer (None = log only).
15    dlc_producer: Option<BoxProcessor>,
16    /// Policies with their resolved `handled_by` producers.
17    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
18}
19
20impl ErrorHandlerLayer {
21    /// Create the layer with pre-resolved producers.
22    pub fn new(
23        dlc_producer: Option<BoxProcessor>,
24        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
25    ) -> Self {
26        Self {
27            dlc_producer,
28            policies,
29        }
30    }
31}
32
33impl<S> Layer<S> for ErrorHandlerLayer
34where
35    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
36    S::Future: Send + 'static,
37{
38    type Service = ErrorHandlerService<S>;
39
40    fn layer(&self, inner: S) -> Self::Service {
41        ErrorHandlerService {
42            inner,
43            dlc_producer: self.dlc_producer.clone(),
44            policies: self
45                .policies
46                .iter()
47                .map(|(p, prod)| (p.clone(), prod.clone()))
48                .collect(),
49        }
50    }
51}
52
53/// Tower Service that absorbs pipeline errors by retrying and/or forwarding to a DLC.
54///
55/// `call` always returns `Ok` — errors are absorbed. The returned exchange will have
56/// `has_error() == true` if the pipeline ultimately failed.
57pub struct ErrorHandlerService<S> {
58    inner: S,
59    dlc_producer: Option<BoxProcessor>,
60    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
61}
62
63impl<S: Clone> Clone for ErrorHandlerService<S> {
64    fn clone(&self) -> Self {
65        Self {
66            inner: self.inner.clone(),
67            dlc_producer: self.dlc_producer.clone(),
68            policies: self
69                .policies
70                .iter()
71                .map(|(p, prod)| (p.clone(), prod.clone()))
72                .collect(),
73        }
74    }
75}
76
77impl<S> ErrorHandlerService<S>
78where
79    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
80    S::Future: Send + 'static,
81{
82    /// Create the service directly (used in unit tests; in production use the Layer).
83    pub fn new(
84        inner: S,
85        dlc_producer: Option<BoxProcessor>,
86        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
87    ) -> Self {
88        Self {
89            inner,
90            dlc_producer,
91            policies,
92        }
93    }
94}
95
96impl<S> Service<Exchange> for ErrorHandlerService<S>
97where
98    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
99    S::Future: Send + 'static,
100{
101    type Response = Exchange;
102    type Error = CamelError;
103    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
104
105    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106        self.inner.poll_ready(cx)
107    }
108
109    fn call(&mut self, exchange: Exchange) -> Self::Future {
110        let mut inner = self.inner.clone();
111        let dlc = self.dlc_producer.clone();
112        let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
113            .policies
114            .iter()
115            .map(|(p, prod)| (p.clone(), prod.clone()))
116            .collect();
117
118        Box::pin(async move {
119            let original = exchange.clone();
120            let result = inner.ready().await?.call(exchange).await;
121
122            let err = match result {
123                Ok(ex) => return Ok(ex),
124                Err(e) => e,
125            };
126
127            // Stop EIP is a control-flow sentinel — pass through without retry or DLC.
128            if matches!(err, CamelError::Stopped) {
129                return Err(err);
130            }
131
132            // Find the first matching policy.
133            let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
134
135            if let Some((policy, policy_producer)) = matched {
136                // Retry if configured.
137                if let Some(ref backoff) = policy.retry {
138                    for attempt in 0..backoff.max_attempts {
139                        let delay = backoff.delay_for(attempt);
140                        tokio::time::sleep(delay).await;
141                        match inner.ready().await?.call(original.clone()).await {
142                            Ok(ex) => return Ok(ex),
143                            Err(_e) => {
144                                if attempt + 1 == backoff.max_attempts {
145                                    // Retries exhausted — send to handler.
146                                    let mut ex = original.clone();
147                                    ex.set_error(_e);
148                                    let handler = policy_producer.or(dlc);
149                                    return send_to_handler(ex, handler).await;
150                                }
151                            }
152                        }
153                    }
154                }
155                // No retry configured (or 0 attempts) — send to policy handler or DLC.
156                let mut ex = original.clone();
157                ex.set_error(err);
158                let handler = policy_producer.or(dlc);
159                send_to_handler(ex, handler).await
160            } else {
161                // No matching policy — forward directly to DLC.
162                let mut ex = original;
163                ex.set_error(err);
164                send_to_handler(ex, dlc).await
165            }
166        })
167    }
168}
169
170async fn send_to_handler(
171    exchange: Exchange,
172    producer: Option<BoxProcessor>,
173) -> Result<Exchange, CamelError> {
174    match producer {
175        None => {
176            tracing::error!(
177                error = ?exchange.error,
178                "Exchange failed with no error handler configured"
179            );
180            Ok(exchange)
181        }
182        Some(mut prod) => match prod.ready().await {
183            Err(e) => {
184                tracing::error!("DLC/handler not ready: {e}");
185                Ok(exchange)
186            }
187            Ok(svc) => match svc.call(exchange.clone()).await {
188                Ok(ex) => Ok(ex),
189                Err(e) => {
190                    tracing::error!("DLC/handler call failed: {e}");
191                    // Return the original exchange with original error intact.
192                    Ok(exchange)
193                }
194            },
195        },
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{
        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message,
        error_handler::ExponentialBackoff,
    };
    use std::sync::{
        Arc,
        atomic::{AtomicBool, AtomicU32, Ordering},
    };
    use std::time::Duration;
    use tower::ServiceExt;

    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    fn fail_n_times(n: u32) -> BoxProcessor {
        let count = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let count = Arc::clone(&count);
            Box::pin(async move {
                let c = count.fetch_add(1, Ordering::SeqCst);
                if c < n {
                    Err(CamelError::ProcessorError(format!("attempt {c}")))
                } else {
                    Ok(ex)
                }
            })
        })
    }

    /// Processor that counts its invocations in `counter` and returns the exchange as is.
    fn counting_processor(counter: Arc<AtomicU32>) -> BoxProcessor {
        BoxProcessor::from_fn(move |ex: Exchange| {
            let counter = Arc::clone(&counter);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(ex)
            })
        })
    }

    #[tokio::test]
    async fn test_ok_passthrough() {
        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(!result.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_error_goes_to_dlc() {
        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
        let received_clone = Arc::clone(&received);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let r = Arc::clone(&received_clone);
            Box::pin(async move {
                r.lock().unwrap().push(ex.clone());
                Ok(ex)
            })
        });

        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        let ex = result.unwrap();
        assert!(ex.has_error());
        assert_eq!(received.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_retry_recovers() {
        let inner = fail_n_times(2);
        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(ExponentialBackoff {
                max_attempts: 3,
                initial_delay: Duration::from_millis(1),
                multiplier: 1.0,
                max_delay: Duration::from_millis(10),
            }),
            handled_by: None,
        };
        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(!result.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_retry_exhausted_goes_to_dlc() {
        let inner = fail_n_times(10);
        let received = Arc::new(std::sync::Mutex::new(0u32));
        let received_clone = Arc::clone(&received);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let r = Arc::clone(&received_clone);
            Box::pin(async move {
                *r.lock().unwrap() += 1;
                Ok(ex)
            })
        });
        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(ExponentialBackoff {
                max_attempts: 2,
                initial_delay: Duration::from_millis(1),
                multiplier: 1.0,
                max_delay: Duration::from_millis(10),
            }),
            handled_by: None,
        };
        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(result.unwrap().has_error());
        assert_eq!(*received.lock().unwrap(), 1);
    }

    #[test]
    fn test_poll_ready_delegates_to_inner() {
        /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
        #[derive(Clone)]
        struct DelayedReadyService {
            ready: Arc<AtomicBool>,
        }

        impl Service<Exchange> for DelayedReadyService {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                if self.ready.fetch_or(true, Ordering::SeqCst) {
                    // Already marked ready (second+ call) → Ready
                    Poll::Ready(Ok(()))
                } else {
                    // First call → Pending, schedule a wake
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            }

            fn call(&mut self, ex: Exchange) -> Self::Future {
                Box::pin(async move { Ok(ex) })
            }
        }

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let inner = DelayedReadyService {
            ready: Arc::new(AtomicBool::new(false)),
        };
        let mut svc = ErrorHandlerService::new(inner, None, vec![]);

        // First poll_ready: inner returns Pending, so ErrorHandlerService must too.
        let first = svc.poll_ready(&mut cx);
        assert!(first.is_pending(), "expected Pending on first poll_ready");

        // Second poll_ready: inner returns Ready, so ErrorHandlerService must too.
        let second = svc.poll_ready(&mut cx);
        assert!(second.is_ready(), "expected Ready on second poll_ready");
    }

    #[tokio::test]
    async fn test_no_matching_policy_uses_dlc() {
        let received = Arc::new(std::sync::Mutex::new(0u32));
        let received_clone = Arc::clone(&received);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let r = Arc::clone(&received_clone);
            Box::pin(async move {
                *r.lock().unwrap() += 1;
                Ok(ex)
            })
        });
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert_eq!(*received.lock().unwrap(), 1);
    }

    // A matching policy's own handler is used instead of the DLC.
    #[tokio::test]
    async fn test_policy_handler_takes_precedence_over_dlc() {
        let handler_calls = Arc::new(AtomicU32::new(0));
        let dlc_calls = Arc::new(AtomicU32::new(0));
        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: None,
            handled_by: None,
        };
        let svc = ErrorHandlerService::new(
            failing_processor(),
            Some(counting_processor(Arc::clone(&dlc_calls))),
            vec![(policy, Some(counting_processor(Arc::clone(&handler_calls))))],
        );
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(result.unwrap().has_error());
        assert_eq!(handler_calls.load(Ordering::SeqCst), 1);
        assert_eq!(dlc_calls.load(Ordering::SeqCst), 0);
    }

    // If the DLC itself fails, the errored original exchange is still returned as Ok.
    #[tokio::test]
    async fn test_failing_dlc_returns_errored_exchange() {
        let svc = ErrorHandlerService::new(failing_processor(), Some(failing_processor()), vec![]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(result.unwrap().has_error());
    }

    // The Layer produces a service with the same behaviour as `ErrorHandlerService::new`.
    #[tokio::test]
    async fn test_layer_wraps_service() {
        let dlc_calls = Arc::new(AtomicU32::new(0));
        let layer = ErrorHandlerLayer::new(Some(counting_processor(Arc::clone(&dlc_calls))), vec![]);
        let svc = layer.layer(failing_processor());
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(result.unwrap().has_error());
        assert_eq!(dlc_calls.load(Ordering::SeqCst), 1);
    }

    // Stopped is a control-flow sentinel, not a real error.
    // ErrorHandlerService must pass it through without retrying or forwarding to DLC.
    #[tokio::test]
    async fn test_stopped_bypasses_error_handler() {
        let stopped_inner =
            BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));

        // DLC that tracks if it was ever called.
        let dlc_called = Arc::new(AtomicBool::new(false));
        let dlc_called_clone = Arc::clone(&dlc_called);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            dlc_called_clone.store(true, Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        });

        let policy = ExceptionPolicy::new(|_| true); // matches everything
        let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;

        // Must propagate Err(Stopped) — not absorb it.
        assert!(
            matches!(result, Err(CamelError::Stopped)),
            "expected Err(Stopped), got: {:?}",
            result
        );
        // DLC must NOT have been called.
        assert!(
            !dlc_called.load(Ordering::SeqCst),
            "DLC should not be called for Stopped"
        );
    }
}