camel_processor/
wire_tap.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Service, ServiceExt};
6
7use camel_api::{CamelError, Exchange};
8
9#[derive(Clone)]
10pub struct WireTapService {
11 tap_endpoint: camel_api::BoxProcessor,
12}
13
14impl WireTapService {
15 pub fn new(tap_endpoint: camel_api::BoxProcessor) -> Self {
16 Self { tap_endpoint }
17 }
18}
19
20impl Service<Exchange> for WireTapService {
21 type Response = Exchange;
22 type Error = CamelError;
23 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
24
25 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
26 self.tap_endpoint.poll_ready(cx)
27 }
28
29 fn call(&mut self, exchange: Exchange) -> Self::Future {
30 let mut tap_endpoint = self.tap_endpoint.clone();
31 let tap_exchange = exchange.clone();
32
33 tokio::spawn(async move {
34 if let Err(e) = tap_endpoint.ready().await {
35 tracing::warn!("WireTap endpoint poll_ready failed: {}", e);
36 return;
37 }
38 if let Err(e) = tap_endpoint.call(tap_exchange).await {
39 tracing::error!("WireTap processing error: {}", e);
40 }
41 });
42
43 Box::pin(async move { Ok(exchange) })
44 }
45}
46
47pub struct WireTapLayer {
49 tap_endpoint: camel_api::BoxProcessor,
50}
51
52impl WireTapLayer {
53 pub fn new(tap_endpoint: camel_api::BoxProcessor) -> Self {
55 Self { tap_endpoint }
56 }
57}
58
59impl<S> tower::Layer<S> for WireTapLayer {
60 type Service = WireTapService;
61
62 fn layer(&self, _inner: S) -> Self::Service {
63 WireTapService::new(self.tap_endpoint.clone())
64 }
65}
66
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};
    use tower::ServiceExt;

    /// The caller gets the original exchange back without waiting on the tap.
    #[tokio::test]
    async fn test_wire_tap_returns_original_immediately() {
        let tap_processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));

        let mut wire_tap = WireTapService::new(tap_processor);
        let exchange = Exchange::new(Message::new("test message"));

        let result = wire_tap
            .ready()
            .await
            .unwrap()
            .call(exchange)
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("test message"));
    }

    /// The tap endpoint is invoked exactly once for a single exchange.
    #[tokio::test]
    async fn test_wire_tap_endpoint_receives_clone() {
        let received_count = Arc::new(AtomicUsize::new(0));
        let count_clone = received_count.clone();

        let tap_processor = BoxProcessor::from_fn(move |ex| {
            let count = count_clone.clone();
            Box::pin(async move {
                count.fetch_add(1, Ordering::SeqCst);
                Ok(ex)
            })
        });

        let mut wire_tap = WireTapService::new(tap_processor);
        let exchange = Exchange::new(Message::new("test"));

        let _result = wire_tap
            .ready()
            .await
            .unwrap()
            .call(exchange)
            .await
            .unwrap();

        // The tap runs on a detached task, so wait for its side effect with a
        // bounded poll instead of a single fixed sleep: this finishes as soon
        // as the task has run and tolerates a slow or loaded scheduler.
        let deadline = Instant::now() + Duration::from_secs(1);
        while received_count.load(Ordering::SeqCst) == 0 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }

        assert_eq!(received_count.load(Ordering::SeqCst), 1);
    }

    /// An error from the tap endpoint must not reach the main flow.
    #[tokio::test]
    async fn test_wire_tap_isolates_errors() {
        let tap_processor = BoxProcessor::from_fn(|_ex| {
            Box::pin(async move { Err(CamelError::ProcessorError("tap error".into())) })
        });

        let mut wire_tap = WireTapService::new(tap_processor);
        let exchange = Exchange::new(Message::new("test"));

        let result = wire_tap.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap().input.body.as_text(), Some("test"));
    }

    /// A service built through the layer passes the exchange through unchanged.
    #[tokio::test]
    async fn test_wire_tap_layer() {
        use tower::Layer;

        let tap_processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));

        let layer = super::WireTapLayer::new(tap_processor);
        let inner = camel_api::IdentityProcessor;
        let mut svc = layer.layer(inner);

        let exchange = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();

        assert_eq!(result.input.body.as_text(), Some("test"));
    }
}