1mod builder;
36mod capture;
37mod layer;
38mod reservoir;
39
40pub use builder::SamplingLayerBuilder;
41pub use layer::{SamplingLayer, Stats};
42
43#[cfg(test)]
44mod tests {
45 use std::io::{self, Write};
46 use std::sync::{Arc, Mutex};
47 use std::time::Duration;
48
49 use tracing_subscriber::Registry;
50 use tracing_subscriber::filter::EnvFilter;
51 use tracing_subscriber::fmt::MakeWriter;
52 use tracing_subscriber::layer::SubscriberExt;
53
54 use crate::SamplingLayer;
55
56 #[derive(Clone, Default)]
57 struct SharedBuf(Arc<Mutex<Vec<u8>>>);
58
59 impl Write for SharedBuf {
60 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
61 self.0.lock().unwrap().write(buf)
62 }
63 fn flush(&mut self) -> io::Result<()> {
64 Ok(())
65 }
66 }
67
68 impl<'a> MakeWriter<'a> for SharedBuf {
69 type Writer = SharedBuf;
70 fn make_writer(&'a self) -> Self::Writer {
71 self.clone()
72 }
73 }
74
75 impl SharedBuf {
76 fn lines(&self) -> Vec<String> {
77 let raw = self.0.lock().unwrap();
78 let s = String::from_utf8_lossy(&raw);
79 s.lines().map(String::from).collect()
80 }
81 }
82
83 fn capture_layer(
84 bucket_ms: u64,
85 budgets: &[(&str, u64)],
86 ) -> (impl tracing_subscriber::Layer<Registry>, SharedBuf) {
87 let buf = SharedBuf::default();
88 let mut builder = SamplingLayer::<Registry>::builder()
89 .without_time()
90 .with_target(false)
91 .bucket_duration(Duration::from_millis(bucket_ms))
92 .writer(buf.clone());
93 for &(filter, limit) in budgets {
94 builder = builder.budget(EnvFilter::new(filter), limit);
95 }
96 let (layer, _stats) = builder.build();
97 (layer, buf)
98 }
99
100 #[test]
101 fn reservoir_keeps_at_most_limit() {
102 let (layer, buf) = capture_layer(1_000, &[("error", 10)]);
103 let subscriber = Registry::default().with(layer);
104
105 tracing::subscriber::with_default(subscriber, || {
106 for _ in 0..100 {
107 tracing::error!("event");
108 }
109 });
110
111 let lines = buf.lines();
112 assert_eq!(lines.len(), 10, "expected 10 events, got {}", lines.len());
113 }
114
115 #[test]
116 fn ejected_events_cascade_to_next_budget() {
117 let (layer, buf) = capture_layer(1_000, &[("error", 5), ("trace", 50)]);
118 let subscriber = Registry::default().with(layer);
119
120 tracing::subscriber::with_default(subscriber, || {
121 for _ in 0..100 {
122 tracing::error!("event");
123 }
124 });
125
126 let lines = buf.lines();
127 assert!(
128 lines.len() > 5,
129 "cascade should produce more than budget 1's limit of 5, got {}",
130 lines.len()
131 );
132 assert!(
133 lines.len() <= 55,
134 "total should be at most 5 + 50 = 55, got {}",
135 lines.len()
136 );
137 }
138
139 #[test]
140 fn non_matching_events_are_dropped() {
141 let (layer, buf) = capture_layer(1_000, &[("error", 100)]);
142 let subscriber = Registry::default().with(layer);
143
144 tracing::subscriber::with_default(subscriber, || {
145 for _ in 0..50 {
146 tracing::debug!("should be dropped");
147 }
148 });
149
150 let lines = buf.lines();
151 assert_eq!(lines.len(), 0, "debug events should not match error filter");
152 }
153
154 #[test]
155 fn multiple_budgets_separate_levels() {
156 let (layer, buf) = capture_layer(1_000, &[("error", 10), ("debug", 10)]);
157 let subscriber = Registry::default().with(layer);
158
159 tracing::subscriber::with_default(subscriber, || {
160 for _ in 0..50 {
161 tracing::error!("err");
162 }
163 for _ in 0..50 {
164 tracing::debug!("dbg");
165 }
166 });
167
168 let lines = buf.lines();
169 let error_count = lines.iter().filter(|l| l.contains("ERROR")).count();
170 let debug_count = lines.iter().filter(|l| l.contains("DEBUG")).count();
171
172 assert!(
173 error_count >= 10,
174 "should have at least 10 errors, got {error_count}"
175 );
176 assert!(
177 debug_count >= 1,
178 "should have at least 1 debug, got {debug_count}"
179 );
180 }
181
182 #[test]
183 fn flushed_events_are_in_arrival_order() {
184 let (layer, buf) = capture_layer(1_000, &[("error", 10)]);
185 let subscriber = Registry::default().with(layer);
186
187 tracing::subscriber::with_default(subscriber, || {
188 for i in 0..200 {
189 tracing::error!(i, "seq");
190 }
191 });
192
193 let lines = buf.lines();
194 assert_eq!(lines.len(), 10);
195
196 let numbers: Vec<usize> = lines
197 .iter()
198 .map(|line| {
199 let s = line.rsplit("i=").next().unwrap().trim();
200 s.parse().unwrap()
201 })
202 .collect();
203
204 for w in numbers.windows(2) {
205 assert!(
206 w[0] < w[1],
207 "events not in arrival order: {} came before {}",
208 w[0],
209 w[1]
210 );
211 }
212 }
213
214 #[test]
215 fn flushed_cascade_events_are_in_arrival_order() {
216 let (layer, buf) = capture_layer(1_000, &[("error", 5), ("trace", 10)]);
217 let subscriber = Registry::default().with(layer);
218
219 tracing::subscriber::with_default(subscriber, || {
220 for i in 0..200u32 {
221 if i % 2 == 0 {
222 tracing::error!(i, "seq");
223 } else {
224 tracing::trace!(i, "seq");
225 }
226 }
227 });
228
229 let lines = buf.lines();
230 assert!(
231 lines.len() > 5,
232 "cascade should produce more than first budget's limit of 5, got {}",
233 lines.len()
234 );
235
236 let numbers: Vec<usize> = lines
237 .iter()
238 .map(|line| {
239 let s = line.rsplit("i=").next().unwrap().trim();
240 s.parse().unwrap()
241 })
242 .collect();
243
244 for w in numbers.windows(2) {
245 assert!(
246 w[0] < w[1],
247 "events not in arrival order: {} came before {}",
248 w[0],
249 w[1]
250 );
251 }
252 }
253
254 #[test]
255 fn bucket_rotation_flushes() {
256 let (layer, buf) = capture_layer(50, &[("trace", 1000)]);
257 let subscriber = Registry::default().with(layer);
258
259 tracing::subscriber::with_default(subscriber, || {
260 for _ in 0..10 {
261 tracing::info!("batch1");
262 }
263 std::thread::sleep(Duration::from_millis(60));
264 tracing::info!("batch2");
265 });
266
267 let lines = buf.lines();
268 assert!(
269 lines.len() >= 10,
270 "should have flushed batch1 on rotation, got {}",
271 lines.len()
272 );
273 }
274
275 #[test]
276 fn smearing_releases_incrementally() {
277 let (layer, buf) = capture_layer(200, &[("error", 100)]);
279 let subscriber = Registry::default().with(layer);
280
281 tracing::subscriber::with_default(subscriber, || {
282 for _ in 0..50 {
283 tracing::error!("fill");
284 }
285
286 std::thread::sleep(Duration::from_millis(210));
287
288 let mut counts = Vec::new();
289 for _ in 0..20 {
290 tracing::error!("trigger");
291 std::thread::sleep(Duration::from_millis(10));
292 counts.push(buf.lines().len());
293 }
294
295 let increases = counts.windows(2).filter(|w| w[1] > w[0]).count();
296 assert!(
297 increases >= 2,
298 "events should trickle out over multiple on_event calls, \
299 but count only increased {} times: {:?}",
300 increases,
301 counts
302 );
303 });
304 }
305
306 #[test]
307 fn smearing_writes_all_events() {
308 let (layer, buf) = capture_layer(1_000, &[("error", 10)]);
309 let subscriber = Registry::default().with(layer);
310
311 tracing::subscriber::with_default(subscriber, || {
312 for _ in 0..100 {
313 tracing::error!("event");
314 }
315 });
316
317 let lines = buf.lines();
318 assert_eq!(
319 lines.len(),
320 10,
321 "all sampled events should be written on drop"
322 );
323 }
324
325 #[test]
326 fn smearing_preserves_order() {
327 let (layer, buf) = capture_layer(200, &[("error", 50)]);
329 let subscriber = Registry::default().with(layer);
330
331 tracing::subscriber::with_default(subscriber, || {
332 for i in 0..200 {
333 tracing::error!(i, "seq");
334 }
335 std::thread::sleep(Duration::from_millis(210));
336 for _ in 0..20 {
337 tracing::error!("trigger");
338 std::thread::sleep(Duration::from_millis(10));
339 }
340 });
341
342 let lines = buf.lines();
343 let numbers: Vec<usize> = lines
344 .iter()
345 .filter_map(|line| {
346 let s = line.rsplit("i=").next()?.trim();
347 s.parse().ok()
348 })
349 .collect();
350
351 assert!(
352 numbers.len() >= 10,
353 "should have at least 10 sequenced events, got {}",
354 numbers.len()
355 );
356
357 for w in numbers.windows(2) {
358 assert!(
359 w[0] < w[1],
360 "smeared events not in arrival order: {} came before {}",
361 w[0],
362 w[1]
363 );
364 }
365 }
366}