Skip to main content

camel_processor/
wire_tap.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Service, ServiceExt};
6
7use camel_api::{CamelError, Exchange};
8
9#[derive(Clone)]
10pub struct WireTapService {
11    tap_endpoint: camel_api::BoxProcessor,
12}
13
14impl WireTapService {
15    pub fn new(tap_endpoint: camel_api::BoxProcessor) -> Self {
16        Self { tap_endpoint }
17    }
18}
19
20impl Service<Exchange> for WireTapService {
21    type Response = Exchange;
22    type Error = CamelError;
23    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
24
25    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
26        self.tap_endpoint.poll_ready(cx)
27    }
28
29    fn call(&mut self, exchange: Exchange) -> Self::Future {
30        let mut tap_endpoint = self.tap_endpoint.clone();
31        let tap_exchange = exchange.clone();
32
33        tokio::spawn(async move {
34            if let Err(e) = tap_endpoint.ready().await {
35                tracing::warn!("WireTap endpoint poll_ready failed: {}", e);
36                return;
37            }
38            if let Err(e) = tap_endpoint.call(tap_exchange).await {
39                tracing::error!("WireTap processing error: {}", e);
40            }
41        });
42
43        Box::pin(async move { Ok(exchange) })
44    }
45}
46
47/// A Tower layer that produces `WireTapService` instances.
48pub struct WireTapLayer {
49    tap_endpoint: camel_api::BoxProcessor,
50}
51
52impl WireTapLayer {
53    /// Create a new WireTapLayer with the given tap endpoint processor.
54    pub fn new(tap_endpoint: camel_api::BoxProcessor) -> Self {
55        Self { tap_endpoint }
56    }
57}
58
59impl<S> tower::Layer<S> for WireTapLayer {
60    type Service = WireTapService;
61
62    fn layer(&self, _inner: S) -> Self::Service {
63        WireTapService::new(self.tap_endpoint.clone())
64    }
65}
66
67#[cfg(test)]
68mod tests {
69    use super::*;
70    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
71    use std::sync::Arc;
72    use std::sync::atomic::{AtomicUsize, Ordering};
73    use tower::ServiceExt;
74
75    #[tokio::test]
76    async fn test_wire_tap_returns_original_immediately() {
77        let tap_processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
78
79        let mut wire_tap = WireTapService::new(tap_processor);
80        let exchange = Exchange::new(Message::new("test message"));
81
82        let result = wire_tap
83            .ready()
84            .await
85            .unwrap()
86            .call(exchange)
87            .await
88            .unwrap();
89
90        assert_eq!(result.input.body.as_text(), Some("test message"));
91    }
92
93    #[tokio::test]
94    async fn test_wire_tap_endpoint_receives_clone() {
95        let received_count = Arc::new(AtomicUsize::new(0));
96        let count_clone = received_count.clone();
97
98        let tap_processor = BoxProcessor::from_fn(move |ex| {
99            let count = count_clone.clone();
100            Box::pin(async move {
101                count.fetch_add(1, Ordering::SeqCst);
102                Ok(ex)
103            })
104        });
105
106        let mut wire_tap = WireTapService::new(tap_processor);
107        let exchange = Exchange::new(Message::new("test"));
108
109        let _result = wire_tap
110            .ready()
111            .await
112            .unwrap()
113            .call(exchange)
114            .await
115            .unwrap();
116
117        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
118
119        assert_eq!(received_count.load(Ordering::SeqCst), 1);
120    }
121
122    #[tokio::test]
123    async fn test_wire_tap_isolates_errors() {
124        let tap_processor = BoxProcessor::from_fn(|_ex| {
125            Box::pin(async move { Err(CamelError::ProcessorError("tap error".into())) })
126        });
127
128        let mut wire_tap = WireTapService::new(tap_processor);
129        let exchange = Exchange::new(Message::new("test"));
130
131        let result = wire_tap.ready().await.unwrap().call(exchange).await;
132
133        assert!(result.is_ok());
134        assert_eq!(result.unwrap().input.body.as_text(), Some("test"));
135    }
136
137    #[tokio::test]
138    async fn test_wire_tap_layer() {
139        use tower::Layer;
140
141        let tap_processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
142
143        let layer = super::WireTapLayer::new(tap_processor);
144        let inner = camel_api::IdentityProcessor;
145        let mut svc = layer.layer(inner);
146
147        let exchange = Exchange::new(Message::new("test"));
148        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
149
150        assert_eq!(result.input.body.as_text(), Some("test"));
151    }
152}