camel_processor/
sampling.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Mutex;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::{CAMEL_STOP, CamelError, Exchange, Value};
9
/// Tower service that samples a stream of exchanges by position.
///
/// Out of every `period` exchanges handled, one is returned untouched and the
/// remaining ones are returned with the `CAMEL_STOP` property set to
/// `Value::Bool(true)`.
///
/// Cloning is cheap and every clone shares the same counter, because the
/// counter lives behind an `Arc`; clones therefore sample one common stream
/// rather than each keeping an independent count.
#[derive(Clone, Debug)]
pub struct SamplingService {
    /// Sampling interval: one exchange out of every `period` passes.
    period: usize,
    /// Number of exchanges seen so far, shared between all clones.
    counter: std::sync::Arc<Mutex<usize>>,
}
29
30impl SamplingService {
31 pub fn new(period: usize) -> Self {
32 assert!(period > 0, "sampling period must be > 0");
33 Self {
34 period,
35 counter: std::sync::Arc::new(Mutex::new(0)),
36 }
37 }
38}
39
40impl Service<Exchange> for SamplingService {
41 type Response = Exchange;
42 type Error = CamelError;
43 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
44
45 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46 Poll::Ready(Ok(()))
47 }
48
49 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
50 let period = self.period;
51 let counter = std::sync::Arc::clone(&self.counter);
52
53 Box::pin(async move {
54 let passes = {
55 let mut c = counter.lock().unwrap(); *c += 1;
57 (*c).is_multiple_of(period)
58 };
59
60 if passes {
61 Ok(exchange)
62 } else {
63 exchange.set_property(CAMEL_STOP, Value::Bool(true));
64 Ok(exchange)
65 }
66 })
67 }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use tower::ServiceExt;

    /// Builds a fresh exchange from a message created with `"test"`.
    fn make_exchange() -> Exchange {
        let message = Message::new("test");
        Exchange::new(message)
    }

    /// Waits for the service to become ready, then sends one fresh exchange
    /// through it and returns the raw result.
    async fn dispatch(svc: &mut SamplingService) -> Result<Exchange, CamelError> {
        let ready = svc.ready().await.unwrap();
        ready.call(make_exchange()).await
    }

    // Constructing a sampler with a zero period must panic.
    #[test]
    fn period_0_rejected() {
        let outcome = std::panic::catch_unwind(|| drop(SamplingService::new(0)));
        assert!(outcome.is_err(), "zero period should panic");
    }

    // With a period of one, no exchange is ever marked.
    #[tokio::test]
    async fn period_1_passes_all() {
        let mut svc = SamplingService::new(1);

        for _ in 0..5 {
            let result = dispatch(&mut svc).await;
            assert!(result.is_ok(), "all exchanges should pass with period=1");

            let ex = result.unwrap();
            assert_ne!(
                ex.property(CAMEL_STOP),
                Some(&Value::Bool(true)),
                "passing exchange should NOT have CamelStop"
            );
        }
    }

    // With a period of three, only exchanges 3 and 6 pass unmarked.
    #[tokio::test]
    async fn period_3_passes_every_3rd() {
        let mut svc = SamplingService::new(3);

        // One row per exchange, in order: (expected to be dropped?, failure message).
        let expectations = [
            (true, "exchange 1 should be dropped (CamelStop=true)"),
            (true, "exchange 2 should be dropped (CamelStop=true)"),
            (false, "exchange 3 should pass (CamelStop not set)"),
            (true, "exchange 4 should be dropped (CamelStop=true)"),
            (true, "exchange 5 should be dropped (CamelStop=true)"),
            (false, "exchange 6 should pass (CamelStop not set)"),
        ];

        for (dropped, message) in expectations {
            let ex = dispatch(&mut svc).await.unwrap();
            let stop = ex.property(CAMEL_STOP);

            if dropped {
                assert_eq!(stop, Some(&Value::Bool(true)), "{}", message);
            } else {
                assert_ne!(stop, Some(&Value::Bool(true)), "{}", message);
            }
        }
    }

    // A non-sampled exchange is marked but its body is left intact.
    #[tokio::test]
    async fn non_sampled_sets_camel_stop() {
        let mut svc = SamplingService::new(10);

        let ex = dispatch(&mut svc).await.unwrap();
        assert_eq!(
            ex.property(CAMEL_STOP),
            Some(&Value::Bool(true)),
            "non-sampled exchange must have CamelStop=true"
        );

        match &ex.input.body {
            camel_api::body::Body::Text(text) => {
                assert_eq!(text, "test", "exchange body should be preserved");
            }
            _ => panic!("expected Text body"),
        }
    }
}