Skip to main content

camel_processor/
wire_tap.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tokio::sync::Semaphore;
7use tower::{Service, ServiceExt};
8
9use camel_api::{CamelError, Exchange};
10
11/// Configuration for [`WireTapService`].
12#[derive(Clone, Default)]
13pub struct WireTapConfig {
14    /// Maximum number of concurrent tap tasks. `None` means unlimited (backward-compatible).
15    pub max_concurrent: Option<usize>,
16}
17
18impl WireTapConfig {
19    /// Create a config with a bounded concurrency limit.
20    pub fn bounded(max_concurrent: usize) -> Self {
21        assert!(max_concurrent > 0, "max_concurrent must be > 0");
22        Self {
23            max_concurrent: Some(max_concurrent),
24        }
25    }
26}
27
28#[derive(Clone)]
29pub struct WireTapService {
30    tap_endpoint: camel_api::BoxProcessor,
31    semaphore: Option<Arc<Semaphore>>,
32}
33
34impl WireTapService {
35    /// Create a new `WireTapService` with default (unbounded) concurrency.
36    pub fn new(tap_endpoint: camel_api::BoxProcessor) -> Self {
37        Self {
38            tap_endpoint,
39            semaphore: None,
40        }
41    }
42
43    /// Create a new `WireTapService` from a [`WireTapConfig`].
44    pub fn with_config(tap_endpoint: camel_api::BoxProcessor, config: WireTapConfig) -> Self {
45        let semaphore = config
46            .max_concurrent
47            .map(|limit| Arc::new(Semaphore::new(limit)));
48        Self {
49            tap_endpoint,
50            semaphore,
51        }
52    }
53}
54
55impl Service<Exchange> for WireTapService {
56    type Response = Exchange;
57    type Error = CamelError;
58    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
59
60    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61        self.tap_endpoint.poll_ready(cx)
62    }
63
64    fn call(&mut self, exchange: Exchange) -> Self::Future {
65        let mut tap_endpoint = self.tap_endpoint.clone();
66        let tap_exchange = exchange.clone();
67        let semaphore = self.semaphore.clone();
68
69        tokio::spawn(async move {
70            // Acquire semaphore permit if concurrency is bounded.
71            let _permit = match &semaphore {
72                Some(sem) => match sem.acquire().await {
73                    Ok(p) => Some(p),
74                    Err(_) => {
75                        tracing::warn!("WireTap semaphore closed, dropping tap");
76                        return;
77                    }
78                },
79                None => None,
80            };
81
82            if let Err(e) = tap_endpoint.ready().await {
83                tracing::warn!("WireTap endpoint poll_ready failed: {}", e);
84                return;
85            }
86            if let Err(e) = tap_endpoint.call(tap_exchange).await {
87                // log-policy: handler-owned
88                tracing::warn!("WireTap processing error: {}", e);
89            }
90        });
91
92        Box::pin(async move { Ok(exchange) })
93    }
94}
95
96/// A Tower layer that produces `WireTapService` instances.
97pub struct WireTapLayer {
98    tap_endpoint: camel_api::BoxProcessor,
99    config: WireTapConfig,
100}
101
102impl WireTapLayer {
103    /// Create a new WireTapLayer with the given tap endpoint processor (unbounded).
104    pub fn new(tap_endpoint: camel_api::BoxProcessor) -> Self {
105        Self {
106            tap_endpoint,
107            config: WireTapConfig::default(),
108        }
109    }
110
111    /// Create a new WireTapLayer with bounded concurrency.
112    pub fn bounded(tap_endpoint: camel_api::BoxProcessor, max_concurrent: usize) -> Self {
113        Self {
114            tap_endpoint,
115            config: WireTapConfig::bounded(max_concurrent),
116        }
117    }
118}
119
120impl<S> tower::Layer<S> for WireTapLayer {
121    type Service = WireTapService;
122
123    fn layer(&self, _inner: S) -> Self::Service {
124        WireTapService::with_config(self.tap_endpoint.clone(), self.config.clone())
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
132    use std::sync::Arc;
133    use std::sync::atomic::{AtomicUsize, Ordering};
134    use tower::ServiceExt;
135
136    #[tokio::test]
137    async fn test_wire_tap_returns_original_immediately() {
138        let tap_processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
139
140        let mut wire_tap = WireTapService::new(tap_processor);
141        let exchange = Exchange::new(Message::new("test message"));
142
143        let result = wire_tap
144            .ready()
145            .await
146            .unwrap()
147            .call(exchange)
148            .await
149            .unwrap();
150
151        assert_eq!(result.input.body.as_text(), Some("test message"));
152    }
153
154    #[tokio::test]
155    async fn test_wire_tap_endpoint_receives_clone() {
156        let received_count = Arc::new(AtomicUsize::new(0));
157        let count_clone = received_count.clone();
158
159        let tap_processor = BoxProcessor::from_fn(move |ex| {
160            let count = count_clone.clone();
161            Box::pin(async move {
162                count.fetch_add(1, Ordering::SeqCst);
163                Ok(ex)
164            })
165        });
166
167        let mut wire_tap = WireTapService::new(tap_processor);
168        let exchange = Exchange::new(Message::new("test"));
169
170        let _result = wire_tap
171            .ready()
172            .await
173            .unwrap()
174            .call(exchange)
175            .await
176            .unwrap();
177
178        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
179
180        assert_eq!(received_count.load(Ordering::SeqCst), 1);
181    }
182
183    #[tokio::test]
184    async fn test_wire_tap_isolates_errors() {
185        let tap_processor = BoxProcessor::from_fn(|_ex| {
186            Box::pin(async move { Err(CamelError::ProcessorError("tap error".into())) })
187        });
188
189        let mut wire_tap = WireTapService::new(tap_processor);
190        let exchange = Exchange::new(Message::new("test"));
191
192        let result = wire_tap.ready().await.unwrap().call(exchange).await;
193
194        assert!(result.is_ok());
195        assert_eq!(result.unwrap().input.body.as_text(), Some("test"));
196    }
197
198    #[tokio::test]
199    async fn test_wire_tap_layer() {
200        use tower::Layer;
201
202        let tap_processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
203
204        let layer = super::WireTapLayer::new(tap_processor);
205        let inner = camel_api::IdentityProcessor;
206        let mut svc = layer.layer(inner);
207
208        let exchange = Exchange::new(Message::new("test"));
209        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
210
211        assert_eq!(result.input.body.as_text(), Some("test"));
212    }
213
214    #[tokio::test]
215    async fn test_wiretap_bounded_concurrency() {
216        // WireTap with max_concurrent=2: sending 3 slow exchanges must not
217        // spawn more than 2 concurrent tap tasks at any point.
218        let concurrent = Arc::new(AtomicUsize::new(0));
219        let max_concurrent = Arc::new(AtomicUsize::new(0));
220
221        let c = Arc::clone(&concurrent);
222        let mc = Arc::clone(&max_concurrent);
223        let tap_processor = BoxProcessor::from_fn(move |ex| {
224            let c = Arc::clone(&c);
225            let mc = Arc::clone(&mc);
226            Box::pin(async move {
227                let current = c.fetch_add(1, Ordering::SeqCst) + 1;
228                mc.fetch_max(current, Ordering::SeqCst);
229                // Hold the task open so multiple taps overlap.
230                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
231                c.fetch_sub(1, Ordering::SeqCst);
232                Ok(ex)
233            })
234        });
235
236        let config = super::WireTapConfig::bounded(2);
237        let mut svc = super::WireTapService::with_config(tap_processor, config);
238
239        // Send 3 exchanges rapidly — at most 2 tap tasks should run concurrently.
240        for _ in 0..3 {
241            let ex = Exchange::new(Message::new("test"));
242            let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
243        }
244
245        // Wait for all tap tasks to complete.
246        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
247
248        let observed_max = max_concurrent.load(Ordering::SeqCst);
249        assert!(
250            observed_max <= 2,
251            "max concurrency was {observed_max}, expected <= 2"
252        );
253    }
254}