camel_core/lifecycle/adapters/
exchange_uow.rs1use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use tower::{Layer, Service, ServiceExt};
12
13use camel_api::{BoxProcessor, CamelError, Exchange};
14
15pub struct InFlightGuard(Arc<AtomicU64>);
24
25impl Drop for InFlightGuard {
26 fn drop(&mut self) {
27 self.0.fetch_sub(1, Ordering::Relaxed);
28 }
29}
30
31#[derive(Clone)]
34pub struct ExchangeUoWLayer {
35 counter: Arc<AtomicU64>,
36 on_complete_producer: Option<BoxProcessor>,
37 on_failure_producer: Option<BoxProcessor>,
38}
39
40impl ExchangeUoWLayer {
41 pub fn new(
42 counter: Arc<AtomicU64>,
43 on_complete_producer: Option<BoxProcessor>,
44 on_failure_producer: Option<BoxProcessor>,
45 ) -> Self {
46 Self {
47 counter,
48 on_complete_producer,
49 on_failure_producer,
50 }
51 }
52}
53
54impl<S> Layer<S> for ExchangeUoWLayer
55where
56 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
57 S::Future: Send + 'static,
58{
59 type Service = ExchangeUoW<S>;
60
61 fn layer(&self, inner: S) -> Self::Service {
62 ExchangeUoW {
63 inner,
64 counter: Arc::clone(&self.counter),
65 on_complete_producer: self.on_complete_producer.clone(),
66 on_failure_producer: self.on_failure_producer.clone(),
67 }
68 }
69}
70
71#[derive(Clone)]
74pub struct ExchangeUoW<S> {
75 inner: S,
76 counter: Arc<AtomicU64>,
77 on_complete_producer: Option<BoxProcessor>,
78 on_failure_producer: Option<BoxProcessor>,
79}
80
81impl<S> Service<Exchange> for ExchangeUoW<S>
82where
83 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
84 S::Future: Send + 'static,
85{
86 type Response = Exchange;
87 type Error = CamelError;
88 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
89
90 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
91 self.inner.poll_ready(cx)
92 }
93
94 fn call(&mut self, exchange: Exchange) -> Self::Future {
95 let mut inner = self.inner.clone();
96 let counter = Arc::clone(&self.counter);
97 let on_complete = self.on_complete_producer.clone();
98 let on_failure = self.on_failure_producer.clone();
99
100 Box::pin(async move {
101 counter.fetch_add(1, Ordering::Relaxed);
103 let _guard = InFlightGuard(Arc::clone(&counter));
104 let original = exchange.clone();
105
106 let result = match inner.ready().await {
107 Ok(svc) => svc.call(exchange).await,
108 Err(err) => {
109 fire_hook(on_failure.clone(), Some(original)).await;
112 return Err(err);
113 }
114 };
115
116 match &result {
117 Err(_) => {
118 fire_hook(on_failure, Some(original)).await;
120 }
121 Ok(ex) if ex.has_error() => {
122 fire_hook(on_failure, Some(ex.clone())).await;
125 }
126 Ok(ex) => {
127 fire_hook(on_complete, Some(ex.clone())).await;
128 }
129 }
130
131 result
132 })
133 }
134}
135
136async fn fire_hook(producer: Option<BoxProcessor>, exchange: Option<Exchange>) {
137 let (Some(mut producer), Some(ex)) = (producer, exchange) else {
138 return;
139 };
140 let fire = async move {
141 if let Err(e) = producer.ready().await {
142 tracing::warn!(error = %e, "UoW hook producer not ready");
143 return;
144 }
145 if let Err(e) = producer.call(ex).await {
146 tracing::warn!(error = %e, "UoW hook producer call failed");
147 }
148 };
149 if tokio::time::timeout(Duration::from_secs(1), fire)
150 .await
151 .is_err()
152 {
153 tracing::warn!("UoW hook timed out after 1s (error swallowed)");
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use super::*;
160 use camel_api::{BoxProcessorExt, Message};
161 use std::sync::atomic::Ordering;
162
163 fn make_exchange() -> Exchange {
164 Exchange::new(Message::new("test"))
165 }
166
167 fn identity() -> BoxProcessor {
168 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
169 }
170
171 fn failing() -> BoxProcessor {
172 BoxProcessor::from_fn(|_| {
173 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
174 })
175 }
176
177 fn error_exchange_proc() -> BoxProcessor {
178 BoxProcessor::from_fn(|mut ex: Exchange| {
179 Box::pin(async move {
180 ex.set_error(CamelError::ProcessorError("exchange error".into()));
181 Ok(ex)
182 })
183 })
184 }
185
186 #[tokio::test]
187 async fn counter_increments_then_decrements_on_success() {
188 let counter = Arc::new(AtomicU64::new(0));
189 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), None, None);
190 let svc = layer.layer(identity());
191 assert_eq!(counter.load(Ordering::Relaxed), 0);
192 let _ = tower::ServiceExt::oneshot(svc, make_exchange())
193 .await
194 .unwrap();
195 assert_eq!(counter.load(Ordering::Relaxed), 0);
196 }
197
198 #[tokio::test]
199 async fn counter_decrements_on_inner_error() {
200 let counter = Arc::new(AtomicU64::new(0));
201 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), None, None);
202 let svc = layer.layer(failing());
203 let result = tower::ServiceExt::oneshot(svc, make_exchange()).await;
204 assert!(result.is_err());
205 assert_eq!(counter.load(Ordering::Relaxed), 0);
206 }
207
208 #[tokio::test]
209 async fn on_complete_fires_on_success() {
210 let fired = Arc::new(AtomicU64::new(0));
211 let fired_clone = Arc::clone(&fired);
212 let hook = BoxProcessor::from_fn(move |ex| {
213 fired_clone.fetch_add(1, Ordering::Relaxed);
214 Box::pin(async move { Ok(ex) })
215 });
216 let counter = Arc::new(AtomicU64::new(0));
217 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), Some(hook), None);
218 let _ = tower::ServiceExt::oneshot(layer.layer(identity()), make_exchange())
219 .await
220 .unwrap();
221 assert_eq!(fired.load(Ordering::Relaxed), 1);
222 }
223
224 #[tokio::test]
225 async fn on_failure_fires_on_inner_error() {
226 let fired = Arc::new(AtomicU64::new(0));
227 let fired_clone = Arc::clone(&fired);
228 let hook = BoxProcessor::from_fn(move |ex| {
229 fired_clone.fetch_add(1, Ordering::Relaxed);
230 Box::pin(async move { Ok(ex) })
231 });
232 let counter = Arc::new(AtomicU64::new(0));
233 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), None, Some(hook));
234 let _ = tower::ServiceExt::oneshot(layer.layer(failing()), make_exchange()).await;
235 assert_eq!(fired.load(Ordering::Relaxed), 1);
236 }
237
238 #[tokio::test]
239 async fn on_failure_fires_on_exchange_error() {
240 let fired = Arc::new(AtomicU64::new(0));
241 let fired_clone = Arc::clone(&fired);
242 let hook = BoxProcessor::from_fn(move |ex| {
243 fired_clone.fetch_add(1, Ordering::Relaxed);
244 Box::pin(async move { Ok(ex) })
245 });
246 let counter = Arc::new(AtomicU64::new(0));
247 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), None, Some(hook));
248 let _ = tower::ServiceExt::oneshot(layer.layer(error_exchange_proc()), make_exchange())
249 .await
250 .unwrap();
251 assert_eq!(fired.load(Ordering::Relaxed), 1);
252 }
253
254 #[tokio::test]
255 async fn on_failure_fires_when_poll_ready_fails() {
256 use std::future::Future;
257 use std::pin::Pin;
258 use std::sync::Arc;
259 use std::sync::atomic::AtomicU64;
260 use std::task::{Context, Poll};
261 use tower::Service;
262
263 #[derive(Clone)]
264 struct FailReadySvc {
265 polls: Arc<AtomicU64>,
266 }
267 impl Service<Exchange> for FailReadySvc {
268 type Response = Exchange;
269 type Error = CamelError;
270 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
271 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
272 let n = self.polls.fetch_add(1, Ordering::Relaxed);
273 if n == 0 {
274 Poll::Ready(Ok(()))
275 } else {
276 Poll::Ready(Err(CamelError::ProcessorError("not ready".into())))
277 }
278 }
279 fn call(&mut self, ex: Exchange) -> Self::Future {
280 Box::pin(async move { Ok(ex) })
281 }
282 }
283
284 let fired = Arc::new(AtomicU64::new(0));
285 let fired_clone = Arc::clone(&fired);
286 let hook = BoxProcessor::from_fn(move |ex| {
287 fired_clone.fetch_add(1, Ordering::Relaxed);
288 Box::pin(async move { Ok(ex) })
289 });
290 let counter = Arc::new(AtomicU64::new(0));
291 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), None, Some(hook));
292 let svc = layer.layer(FailReadySvc {
293 polls: Arc::new(AtomicU64::new(0)),
294 });
295 let result = tower::ServiceExt::oneshot(svc, make_exchange()).await;
296 assert!(result.is_err());
297 assert_eq!(
298 fired.load(Ordering::Relaxed),
299 1,
300 "on_failure must fire when poll_ready fails"
301 );
302 assert_eq!(counter.load(Ordering::Relaxed), 0);
303 }
304
305 #[tokio::test]
306 async fn on_complete_does_not_fire_on_exchange_error() {
307 let fired = Arc::new(AtomicU64::new(0));
308 let fired_clone = Arc::clone(&fired);
309 let hook = BoxProcessor::from_fn(move |ex| {
310 fired_clone.fetch_add(1, Ordering::Relaxed);
311 Box::pin(async move { Ok(ex) })
312 });
313 let counter = Arc::new(AtomicU64::new(0));
314 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), Some(hook), None);
315 let _ = tower::ServiceExt::oneshot(layer.layer(error_exchange_proc()), make_exchange())
316 .await
317 .unwrap();
318 assert_eq!(fired.load(Ordering::Relaxed), 0);
319 }
320
321 #[tokio::test]
322 async fn hook_error_does_not_fail_exchange() {
323 let bad_hook = BoxProcessor::from_fn(|_| {
324 Box::pin(async { Err(CamelError::ProcessorError("hook failed".into())) })
325 });
326 let counter = Arc::new(AtomicU64::new(0));
327 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), Some(bad_hook), None);
328 let result = tower::ServiceExt::oneshot(layer.layer(identity()), make_exchange()).await;
329 assert!(
330 result.is_ok(),
331 "hook error must not fail exchange: {:?}",
332 result
333 );
334 }
335
336 #[test]
337 fn in_flight_guard_decrements_on_drop() {
338 let counter = Arc::new(AtomicU64::new(1));
339 {
340 let _guard = InFlightGuard(Arc::clone(&counter));
341 assert_eq!(counter.load(Ordering::Relaxed), 1);
342 }
343 assert_eq!(counter.load(Ordering::Relaxed), 0);
344 }
345}