Skip to main content

camel_processor/
sampling.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Mutex;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::{CAMEL_STOP, CamelError, Exchange, Value};
9
10/// A sampling processor that passes 1 of every N exchanges (deterministic,
11/// counter-based). Non-sampled exchanges get `CamelStop=true` set and return
12/// `Ok(exchange)` (drop semantics consistent with `ThrottleStrategy::Drop`).
13///
14/// # Which exchange passes
15///
16/// The counter starts at 0 and is incremented **before** the modulo check.
17/// Exchange #N passes (when `counter % period == 0`), then #2N, #3N, etc.
18///
19/// # Lifecycle
20///
21/// `SamplingService` does NOT implement `StepLifecycle` — the counter is
22/// route-scoped and reset on hot-swap (per ADR-0004). No background timers,
23/// nothing to drain.
24#[derive(Clone)]
25pub struct SamplingService {
26    period: usize,
27    counter: std::sync::Arc<Mutex<usize>>,
28}
29
30impl SamplingService {
31    pub fn new(period: usize) -> Self {
32        assert!(period > 0, "sampling period must be > 0");
33        Self {
34            period,
35            counter: std::sync::Arc::new(Mutex::new(0)),
36        }
37    }
38}
39
40impl Service<Exchange> for SamplingService {
41    type Response = Exchange;
42    type Error = CamelError;
43    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
44
45    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46        Poll::Ready(Ok(()))
47    }
48
49    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
50        let period = self.period;
51        let counter = std::sync::Arc::clone(&self.counter);
52
53        Box::pin(async move {
54            let passes = {
55                let mut c = counter.lock().unwrap(); // allow-unwrap
56                *c += 1;
57                (*c).is_multiple_of(period)
58            };
59
60            if passes {
61                Ok(exchange)
62            } else {
63                exchange.set_property(CAMEL_STOP, Value::Bool(true));
64                Ok(exchange)
65            }
66        })
67    }
68}
69
70#[cfg(test)]
71mod tests {
72    use super::*;
73    use camel_api::Message;
74    use tower::ServiceExt;
75
76    fn make_exchange() -> Exchange {
77        Exchange::new(Message::new("test"))
78    }
79
80    #[test]
81    fn period_0_rejected() {
82        let result = std::panic::catch_unwind(|| {
83            SamplingService::new(0);
84        });
85        assert!(result.is_err(), "zero period should panic");
86    }
87
88    #[tokio::test]
89    async fn period_1_passes_all() {
90        let mut svc = SamplingService::new(1);
91
92        for _ in 0..5 {
93            let result = svc.ready().await.unwrap().call(make_exchange()).await;
94            assert!(result.is_ok(), "all exchanges should pass with period=1");
95            let ex = result.unwrap();
96            assert_ne!(
97                ex.property(CAMEL_STOP),
98                Some(&Value::Bool(true)),
99                "passing exchange should NOT have CamelStop"
100            );
101        }
102    }
103
104    #[tokio::test]
105    async fn period_3_passes_every_3rd() {
106        let mut svc = SamplingService::new(3);
107
108        // Exchange 1 — should NOT pass (counter=1, 1%3=1 != 0)
109        let ex = svc
110            .ready()
111            .await
112            .unwrap()
113            .call(make_exchange())
114            .await
115            .unwrap();
116        assert_eq!(
117            ex.property(CAMEL_STOP),
118            Some(&Value::Bool(true)),
119            "exchange 1 should be dropped (CamelStop=true)"
120        );
121
122        // Exchange 2 — should NOT pass (counter=2, 2%3=2 != 0)
123        let ex = svc
124            .ready()
125            .await
126            .unwrap()
127            .call(make_exchange())
128            .await
129            .unwrap();
130        assert_eq!(
131            ex.property(CAMEL_STOP),
132            Some(&Value::Bool(true)),
133            "exchange 2 should be dropped (CamelStop=true)"
134        );
135
136        // Exchange 3 — should pass (counter=3, 3%3=0)
137        let ex = svc
138            .ready()
139            .await
140            .unwrap()
141            .call(make_exchange())
142            .await
143            .unwrap();
144        assert_ne!(
145            ex.property(CAMEL_STOP),
146            Some(&Value::Bool(true)),
147            "exchange 3 should pass (CamelStop not set)"
148        );
149
150        // Exchange 4 — should NOT pass (counter=4, 4%3=1 != 0)
151        let ex = svc
152            .ready()
153            .await
154            .unwrap()
155            .call(make_exchange())
156            .await
157            .unwrap();
158        assert_eq!(
159            ex.property(CAMEL_STOP),
160            Some(&Value::Bool(true)),
161            "exchange 4 should be dropped (CamelStop=true)"
162        );
163
164        // Exchange 5 — should NOT pass (counter=5, 5%3=2 != 0)
165        let ex = svc
166            .ready()
167            .await
168            .unwrap()
169            .call(make_exchange())
170            .await
171            .unwrap();
172        assert_eq!(
173            ex.property(CAMEL_STOP),
174            Some(&Value::Bool(true)),
175            "exchange 5 should be dropped (CamelStop=true)"
176        );
177
178        // Exchange 6 — should pass (counter=6, 6%3=0)
179        let ex = svc
180            .ready()
181            .await
182            .unwrap()
183            .call(make_exchange())
184            .await
185            .unwrap();
186        assert_ne!(
187            ex.property(CAMEL_STOP),
188            Some(&Value::Bool(true)),
189            "exchange 6 should pass (CamelStop not set)"
190        );
191    }
192
193    #[tokio::test]
194    async fn non_sampled_sets_camel_stop() {
195        let mut svc = SamplingService::new(10);
196
197        // First exchange never passes with period=10 (counter=1, 1%10=1)
198        let ex = svc
199            .ready()
200            .await
201            .unwrap()
202            .call(make_exchange())
203            .await
204            .unwrap();
205        assert_eq!(
206            ex.property(CAMEL_STOP),
207            Some(&Value::Bool(true)),
208            "non-sampled exchange must have CamelStop=true"
209        );
210
211        // Verify the exchange is otherwise intact
212        if let camel_api::body::Body::Text(ref t) = ex.input.body {
213            assert_eq!(t, "test", "exchange body should be preserved");
214        } else {
215            panic!("expected Text body");
216        }
217    }
218}