1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::ExceptionPolicy;
8use camel_api::{BoxProcessor, CamelError, Exchange};
9
/// Tower [`Layer`] that wraps a service in an [`ErrorHandlerService`].
///
/// The layer only stores configuration; every call to `layer` hands a clone of
/// that configuration to the service it builds.
pub struct ErrorHandlerLayer {
    /// Optional fallback processor passed on to each built service.
    /// NOTE(review): "dlc" presumably stands for dead-letter channel — confirm.
    dlc_producer: Option<BoxProcessor>,
    /// Exception policies, each paired with an optional processor of its own.
    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
}
19
20impl ErrorHandlerLayer {
21 pub fn new(
23 dlc_producer: Option<BoxProcessor>,
24 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
25 ) -> Self {
26 Self {
27 dlc_producer,
28 policies,
29 }
30 }
31}
32
33impl<S> Layer<S> for ErrorHandlerLayer
34where
35 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
36 S::Future: Send + 'static,
37{
38 type Service = ErrorHandlerService<S>;
39
40 fn layer(&self, inner: S) -> Self::Service {
41 ErrorHandlerService {
42 inner,
43 dlc_producer: self.dlc_producer.clone(),
44 policies: self
45 .policies
46 .iter()
47 .map(|(p, prod)| (p.clone(), prod.clone()))
48 .collect(),
49 }
50 }
51}
52
/// Service wrapper that holds an inner service together with the error-handling
/// configuration (fallback processor and exception policies) applied around it.
pub struct ErrorHandlerService<S> {
    /// The wrapped service.
    inner: S,
    /// Optional fallback processor.
    /// NOTE(review): "dlc" presumably stands for dead-letter channel — confirm.
    dlc_producer: Option<BoxProcessor>,
    /// Exception policies, each paired with an optional processor of its own.
    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
}
62
63impl<S: Clone> Clone for ErrorHandlerService<S> {
64 fn clone(&self) -> Self {
65 Self {
66 inner: self.inner.clone(),
67 dlc_producer: self.dlc_producer.clone(),
68 policies: self
69 .policies
70 .iter()
71 .map(|(p, prod)| (p.clone(), prod.clone()))
72 .collect(),
73 }
74 }
75}
76
77impl<S> ErrorHandlerService<S>
78where
79 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
80 S::Future: Send + 'static,
81{
82 pub fn new(
84 inner: S,
85 dlc_producer: Option<BoxProcessor>,
86 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
87 ) -> Self {
88 Self {
89 inner,
90 dlc_producer,
91 policies,
92 }
93 }
94}
95
96impl<S> Service<Exchange> for ErrorHandlerService<S>
97where
98 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
99 S::Future: Send + 'static,
100{
101 type Response = Exchange;
102 type Error = CamelError;
103 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
104
105 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 self.inner.poll_ready(cx)
107 }
108
109 fn call(&mut self, exchange: Exchange) -> Self::Future {
110 let mut inner = self.inner.clone();
111 let dlc = self.dlc_producer.clone();
112 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
113 .policies
114 .iter()
115 .map(|(p, prod)| (p.clone(), prod.clone()))
116 .collect();
117
118 Box::pin(async move {
119 let original = exchange.clone();
120 let result = inner.ready().await?.call(exchange).await;
121
122 let err = match result {
123 Ok(ex) => return Ok(ex),
124 Err(e) => e,
125 };
126
127 if matches!(err, CamelError::Stopped) {
129 return Err(err);
130 }
131
132 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
134
135 if let Some((policy, policy_producer)) = matched {
136 if let Some(ref backoff) = policy.retry {
138 for attempt in 0..backoff.max_attempts {
139 let delay = backoff.delay_for(attempt);
140 tokio::time::sleep(delay).await;
141 match inner.ready().await?.call(original.clone()).await {
142 Ok(ex) => return Ok(ex),
143 Err(_e) => {
144 if attempt + 1 == backoff.max_attempts {
145 let mut ex = original.clone();
147 ex.set_error(_e);
148 let handler = policy_producer.or(dlc);
149 return send_to_handler(ex, handler).await;
150 }
151 }
152 }
153 }
154 }
155 let mut ex = original.clone();
157 ex.set_error(err);
158 let handler = policy_producer.or(dlc);
159 send_to_handler(ex, handler).await
160 } else {
161 let mut ex = original;
163 ex.set_error(err);
164 send_to_handler(ex, dlc).await
165 }
166 })
167 }
168}
169
170async fn send_to_handler(
171 exchange: Exchange,
172 producer: Option<BoxProcessor>,
173) -> Result<Exchange, CamelError> {
174 match producer {
175 None => {
176 tracing::error!(
177 error = ?exchange.error,
178 "Exchange failed with no error handler configured"
179 );
180 Ok(exchange)
181 }
182 Some(mut prod) => match prod.ready().await {
183 Err(e) => {
184 tracing::error!("DLC/handler not ready: {e}");
185 Ok(exchange)
186 }
187 Ok(svc) => match svc.call(exchange.clone()).await {
188 Ok(ex) => Ok(ex),
189 Err(e) => {
190 tracing::error!("DLC/handler call failed: {e}");
191 Ok(exchange)
193 }
194 },
195 },
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{
        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message,
        error_handler::ExponentialBackoff,
    };
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicBool, AtomicU32, Ordering},
    };
    use std::time::Duration;
    use tower::ServiceExt;

    /// Fresh exchange carrying a `"test"` message.
    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Processor that fails every call with `ProcessorError("boom")`.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that returns its input untouched.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor that fails its first `n` calls and succeeds afterwards.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let calls = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let calls = Arc::clone(&calls);
            Box::pin(async move {
                let seen = calls.fetch_add(1, Ordering::SeqCst);
                if seen >= n {
                    Ok(ex)
                } else {
                    Err(CamelError::ProcessorError(format!("attempt {seen}")))
                }
            })
        })
    }

    /// DLC processor that counts its invocations; returns the processor and
    /// the shared counter.
    fn counting_dlc() -> (BoxProcessor, Arc<Mutex<u32>>) {
        let hits = Arc::new(Mutex::new(0u32));
        let sink = Arc::clone(&hits);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                *sink.lock().unwrap() += 1;
                Ok(ex)
            })
        });
        (dlc, hits)
    }

    #[tokio::test]
    async fn test_ok_passthrough() {
        let service = ErrorHandlerService::new(ok_processor(), None, vec![]);
        let outcome = service.oneshot(make_exchange()).await;
        assert!(outcome.is_ok());
        assert!(!outcome.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_error_goes_to_dlc() {
        let captured = Arc::new(Mutex::new(Vec::<Exchange>::new()));
        let captured_by_dlc = Arc::clone(&captured);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let store = Arc::clone(&captured_by_dlc);
            Box::pin(async move {
                store.lock().unwrap().push(ex.clone());
                Ok(ex)
            })
        });

        let service = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
        let outcome = service.oneshot(make_exchange()).await;
        assert!(outcome.is_ok());
        let handled = outcome.unwrap();
        assert!(handled.has_error());
        assert_eq!(captured.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_retry_recovers() {
        // Two failures followed by success fits inside three retry attempts.
        let flaky = fail_n_times(2);
        let backoff = ExponentialBackoff {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
        };
        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(backoff),
            handled_by: None,
        };
        let service = ErrorHandlerService::new(flaky, None, vec![(policy, None)]);
        let outcome = service.oneshot(make_exchange()).await;
        assert!(outcome.is_ok());
        assert!(!outcome.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_retry_exhausted_goes_to_dlc() {
        // Ten failures outlast the two allowed retries.
        let always_failing = fail_n_times(10);
        let (dlc, hits) = counting_dlc();
        let backoff = ExponentialBackoff {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
        };
        let policy = ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry: Some(backoff),
            handled_by: None,
        };
        let service = ErrorHandlerService::new(always_failing, Some(dlc), vec![(policy, None)]);
        let outcome = service.oneshot(make_exchange()).await;
        assert!(outcome.is_ok());
        assert!(outcome.unwrap().has_error());
        assert_eq!(*hits.lock().unwrap(), 1);
    }

    #[test]
    fn test_poll_ready_delegates_to_inner() {
        /// Inner service that reports `Pending` once, then `Ready`.
        #[derive(Clone)]
        struct DelayedReadyService {
            ready: Arc<AtomicBool>,
        }

        impl Service<Exchange> for DelayedReadyService {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                // `fetch_or` returns the previous flag value: false only on the first poll.
                let was_ready = self.ready.fetch_or(true, Ordering::SeqCst);
                if !was_ready {
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, ex: Exchange) -> Self::Future {
                Box::pin(async move { Ok(ex) })
            }
        }

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let delayed = DelayedReadyService {
            ready: Arc::new(AtomicBool::new(false)),
        };
        let mut service = ErrorHandlerService::new(delayed, None, vec![]);

        let first = service.poll_ready(&mut cx);
        assert!(first.is_pending(), "expected Pending on first poll_ready");

        let second = service.poll_ready(&mut cx);
        assert!(second.is_ready(), "expected Ready on second poll_ready");
    }

    #[tokio::test]
    async fn test_no_matching_policy_uses_dlc() {
        let (dlc, hits) = counting_dlc();
        // The policy only accepts I/O errors, so the processor error does not match it.
        let io_only = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
        let service =
            ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(io_only, None)]);
        let outcome = service.oneshot(make_exchange()).await;
        assert!(outcome.is_ok());
        assert_eq!(*hits.lock().unwrap(), 1);
    }

    #[tokio::test]
    async fn test_stopped_bypasses_error_handler() {
        let stopped_inner =
            BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));

        let dlc_called = Arc::new(AtomicBool::new(false));
        let dlc_flag = Arc::clone(&dlc_called);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            dlc_flag.store(true, Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        });

        // A match-all policy proves `Stopped` is filtered out before policies apply.
        let catch_all = ExceptionPolicy::new(|_| true);
        let service = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(catch_all, None)]);
        let outcome = service.oneshot(make_exchange()).await;

        assert!(
            matches!(outcome, Err(CamelError::Stopped)),
            "expected Err(Stopped), got: {:?}",
            outcome
        );
        assert!(
            !dlc_called.load(Ordering::SeqCst),
            "DLC should not be called for Stopped"
        );
    }
}