Skip to main content

camel_core/lifecycle/adapters/
exchange_uow.rs

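// crates/camel-core/src/lifecycle/adapters/exchange_uow.rs
//! Exchange Unit of Work Tower layer.
//!
//! Wraps an `Exchange` service so that every call is counted as in-flight while
//! it runs and an optional completion or failure hook is fired afterwards.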
3
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use tower::{Layer, Service, ServiceExt};
12
13use camel_api::{BoxProcessor, CamelError, Exchange};
14
15// ─── RAII Guard ──────────────────────────────────────────────────────────────
16
/// RAII guard that decrements the in-flight counter when dropped.
///
/// The guard only ever decrements: the code that constructs it is responsible
/// for having incremented the counter beforehand so the two operations pair up.
///
/// Uses `Ordering::Relaxed` intentionally: the counter is a best-effort
/// observability metric, not a synchronization primitive. Approximate
/// counts under concurrent load are acceptable — no happens-before
/// relationship with other memory is required.
#[derive(Debug)]
#[must_use = "the in-flight counter is decremented as soon as the guard is dropped"]
pub struct InFlightGuard(Arc<AtomicU64>);

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // Relaxed is sufficient: see the type-level note on ordering.
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
}
30
31// ─── Layer ───────────────────────────────────────────────────────────────────
32
33#[derive(Clone)]
34pub struct ExchangeUoWLayer {
35    counter: Arc<AtomicU64>,
36    on_complete_producer: Option<BoxProcessor>,
37    on_failure_producer: Option<BoxProcessor>,
38}
39
40impl ExchangeUoWLayer {
41    pub fn new(
42        counter: Arc<AtomicU64>,
43        on_complete_producer: Option<BoxProcessor>,
44        on_failure_producer: Option<BoxProcessor>,
45    ) -> Self {
46        Self {
47            counter,
48            on_complete_producer,
49            on_failure_producer,
50        }
51    }
52}
53
54impl<S> Layer<S> for ExchangeUoWLayer
55where
56    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
57    S::Future: Send + 'static,
58{
59    type Service = ExchangeUoW<S>;
60
61    fn layer(&self, inner: S) -> Self::Service {
62        ExchangeUoW {
63            inner,
64            counter: Arc::clone(&self.counter),
65            on_complete_producer: self.on_complete_producer.clone(),
66            on_failure_producer: self.on_failure_producer.clone(),
67        }
68    }
69}
70
71// ─── Service ─────────────────────────────────────────────────────────────────
72
73#[derive(Clone)]
74pub struct ExchangeUoW<S> {
75    inner: S,
76    counter: Arc<AtomicU64>,
77    on_complete_producer: Option<BoxProcessor>,
78    on_failure_producer: Option<BoxProcessor>,
79}
80
81impl<S> Service<Exchange> for ExchangeUoW<S>
82where
83    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
84    S::Future: Send + 'static,
85{
86    type Response = Exchange;
87    type Error = CamelError;
88    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
89
90    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
91        self.inner.poll_ready(cx)
92    }
93
94    fn call(&mut self, exchange: Exchange) -> Self::Future {
95        let mut inner = self.inner.clone();
96        let counter = Arc::clone(&self.counter);
97        let on_complete = self.on_complete_producer.clone();
98        let on_failure = self.on_failure_producer.clone();
99
100        Box::pin(async move {
101            // Relaxed: this counter is observability-only; no synchronization needed.
102            counter.fetch_add(1, Ordering::Relaxed);
103            let _guard = InFlightGuard(Arc::clone(&counter));
104            let original = exchange.clone();
105
106            let result = match inner.ready().await {
107                Ok(svc) => svc.call(exchange).await,
108                Err(err) => {
109                    // Infrastructure failure: inner service not ready.
110                    // Pass `original` (pre-call clone) since no post-processing occurred.
111                    fire_hook(on_failure.clone(), Some(original)).await;
112                    return Err(err);
113                }
114            };
115
116            match &result {
117                Err(_) => {
118                    // Inner call returned Err: pass `original` (pre-call clone), no output exchange.
119                    fire_hook(on_failure, Some(original)).await;
120                }
121                Ok(ex) if ex.has_error() => {
122                    // Exchange-level error: pass the post-processing exchange so the hook
123                    // can inspect the error flag and any headers/body set by the pipeline.
124                    fire_hook(on_failure, Some(ex.clone())).await;
125                }
126                Ok(ex) => {
127                    fire_hook(on_complete, Some(ex.clone())).await;
128                }
129            }
130
131            result
132        })
133    }
134}
135
136async fn fire_hook(producer: Option<BoxProcessor>, exchange: Option<Exchange>) {
137    let (Some(mut producer), Some(ex)) = (producer, exchange) else {
138        return;
139    };
140    let fire = async move {
141        if let Err(e) = producer.ready().await {
142            tracing::warn!(error = %e, "UoW hook producer not ready");
143            return;
144        }
145        if let Err(e) = producer.call(ex).await {
146            tracing::warn!(error = %e, "UoW hook producer call failed");
147        }
148    };
149    if tokio::time::timeout(Duration::from_secs(1), fire)
150        .await
151        .is_err()
152    {
153        tracing::warn!("UoW hook timed out after 1s (error swallowed)");
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessorExt, Message};
    use std::sync::atomic::Ordering;

    // ── Fixtures ─────────────────────────────────────────────────────────────

    /// A minimal exchange carrying a fixed test message.
    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Processor that returns the exchange untouched.
    fn identity() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor whose call always returns `Err`.
    fn failing() -> BoxProcessor {
        BoxProcessor::from_fn(|_| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that succeeds but flags an error on the exchange itself.
    fn error_exchange_proc() -> BoxProcessor {
        BoxProcessor::from_fn(|mut ex: Exchange| {
            Box::pin(async move {
                ex.set_error(CamelError::ProcessorError("exchange error".into()));
                Ok(ex)
            })
        })
    }

    /// A zeroed in-flight counter.
    fn fresh_counter() -> Arc<AtomicU64> {
        Arc::new(AtomicU64::new(0))
    }

    /// Pass-through hook plus the tally it bumps on every invocation.
    fn counting_hook() -> (BoxProcessor, Arc<AtomicU64>) {
        let fired = Arc::new(AtomicU64::new(0));
        let tally = Arc::clone(&fired);
        let hook = BoxProcessor::from_fn(move |ex| {
            tally.fetch_add(1, Ordering::Relaxed);
            Box::pin(async move { Ok(ex) })
        });
        (hook, fired)
    }

    // ── In-flight counter ────────────────────────────────────────────────────

    #[tokio::test]
    async fn counter_increments_then_decrements_on_success() {
        let counter = fresh_counter();
        let svc = ExchangeUoWLayer::new(Arc::clone(&counter), None, None).layer(identity());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
        let _ = tower::ServiceExt::oneshot(svc, make_exchange())
            .await
            .unwrap();
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn counter_decrements_on_inner_error() {
        let counter = fresh_counter();
        let svc = ExchangeUoWLayer::new(Arc::clone(&counter), None, None).layer(failing());
        let result = tower::ServiceExt::oneshot(svc, make_exchange()).await;
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    // ── Hook dispatch ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn on_complete_fires_on_success() {
        let (hook, fired) = counting_hook();
        let layer = ExchangeUoWLayer::new(fresh_counter(), Some(hook), None);
        let _ = tower::ServiceExt::oneshot(layer.layer(identity()), make_exchange())
            .await
            .unwrap();
        assert_eq!(fired.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn on_failure_fires_on_inner_error() {
        let (hook, fired) = counting_hook();
        let layer = ExchangeUoWLayer::new(fresh_counter(), None, Some(hook));
        let _ = tower::ServiceExt::oneshot(layer.layer(failing()), make_exchange()).await;
        assert_eq!(fired.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn on_failure_fires_on_exchange_error() {
        let (hook, fired) = counting_hook();
        let layer = ExchangeUoWLayer::new(fresh_counter(), None, Some(hook));
        let _ = tower::ServiceExt::oneshot(layer.layer(error_exchange_proc()), make_exchange())
            .await
            .unwrap();
        assert_eq!(fired.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn on_failure_fires_when_poll_ready_fails() {
        /// Service that reports ready on the first poll only; every later poll
        /// fails. The poll tally is shared across clones.
        #[derive(Clone)]
        struct FailReadySvc {
            polls: Arc<AtomicU64>,
        }

        impl Service<Exchange> for FailReadySvc {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
                match self.polls.fetch_add(1, Ordering::Relaxed) {
                    0 => Poll::Ready(Ok(())),
                    _ => Poll::Ready(Err(CamelError::ProcessorError("not ready".into()))),
                }
            }

            fn call(&mut self, ex: Exchange) -> Self::Future {
                Box::pin(async move { Ok(ex) })
            }
        }

        let (hook, fired) = counting_hook();
        let counter = fresh_counter();
        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), None, Some(hook));
        let svc = layer.layer(FailReadySvc {
            polls: Arc::new(AtomicU64::new(0)),
        });

        let result = tower::ServiceExt::oneshot(svc, make_exchange()).await;

        assert!(result.is_err());
        assert_eq!(
            fired.load(Ordering::Relaxed),
            1,
            "on_failure must fire when poll_ready fails"
        );
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn on_complete_does_not_fire_on_exchange_error() {
        let (hook, fired) = counting_hook();
        let layer = ExchangeUoWLayer::new(fresh_counter(), Some(hook), None);
        let _ = tower::ServiceExt::oneshot(layer.layer(error_exchange_proc()), make_exchange())
            .await
            .unwrap();
        assert_eq!(fired.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn hook_error_does_not_fail_exchange() {
        let bad_hook = BoxProcessor::from_fn(|_| {
            Box::pin(async { Err(CamelError::ProcessorError("hook failed".into())) })
        });
        let layer = ExchangeUoWLayer::new(fresh_counter(), Some(bad_hook), None);
        let result = tower::ServiceExt::oneshot(layer.layer(identity()), make_exchange()).await;
        assert!(
            result.is_ok(),
            "hook error must not fail exchange: {:?}",
            result
        );
    }

    // ── Guard ────────────────────────────────────────────────────────────────

    #[test]
    fn in_flight_guard_decrements_on_drop() {
        let counter = Arc::new(AtomicU64::new(1));
        {
            let _guard = InFlightGuard(Arc::clone(&counter));
            assert_eq!(counter.load(Ordering::Relaxed), 1);
        }
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }
}