Skip to main content

tracing_log_sample/
lib.rs

1//! A [`tracing_subscriber::Layer`] that rate-limits log output using
2//! [reservoir sampling](https://en.wikipedia.org/wiki/Reservoir_sampling).
3//!
4//! Events are collected into fixed-duration time buckets and uniformly sampled,
5//! guaranteeing every event in a bucket has an equal chance of being kept. Multiple
6//! sampling budgets can be configured, each an [`EnvFilter`](tracing_subscriber::filter::EnvFilter)
7//! pattern plus a limit in events per second (scaled to the bucket duration); events
8//! displaced from one budget's reservoir cascade to the next matching budget.
9//!
10//! Sampled events are released gradually over the following bucket via adaptive
11//! smearing, avoiding write bursts at rotation boundaries.
12//!
13//! Formatting is delegated to [`tracing_subscriber::fmt::Layer`], so all the
14//! usual formatting options (compact, pretty, JSON, timestamps, etc.) work
15//! out of the box.
16//!
17//! # Example
18//!
19//! ```
20//! use std::time::Duration;
21//! use tracing_subscriber::{Registry, filter::EnvFilter, layer::SubscriberExt};
22//! use tracing_log_sample::SamplingLayer;
23//!
24//! let (layer, stats) = SamplingLayer::<Registry>::builder()
25//!     .bucket_duration(Duration::from_millis(50))
26//!     .budget(EnvFilter::new("error"), 1000)
27//!     .budget(EnvFilter::new("info"), 5000)
28//!     .build();
29//!
30//! let subscriber = Registry::default().with(layer);
31//! // stats.received(), stats.sampled(), stats.dropped()
32//! // tracing::subscriber::set_global_default(subscriber).unwrap();
33//! ```
34
35mod builder;
36mod capture;
37mod layer;
38mod reservoir;
39
40pub use builder::SamplingLayerBuilder;
41pub use layer::{SamplingLayer, Stats};
42
#[cfg(test)]
mod tests {
    use std::io::{self, Write};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use tracing_subscriber::Registry;
    use tracing_subscriber::filter::EnvFilter;
    use tracing_subscriber::fmt::MakeWriter;
    use tracing_subscriber::layer::SubscriberExt;

    use crate::SamplingLayer;

    /// In-memory writer whose clones all append to the same buffer, so a test can
    /// keep one handle while the layer writes through another.
    #[derive(Clone, Default)]
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);

    impl Write for SharedBuf {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.lock().unwrap().write(buf)
        }
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    impl<'a> MakeWriter<'a> for SharedBuf {
        type Writer = SharedBuf;
        fn make_writer(&'a self) -> Self::Writer {
            self.clone()
        }
    }

    impl SharedBuf {
        /// Snapshot of everything written so far, split into lines.
        fn lines(&self) -> Vec<String> {
            let raw = self.0.lock().unwrap();
            String::from_utf8_lossy(&raw)
                .lines()
                .map(String::from)
                .collect()
        }
    }

    /// Builds a sampling layer (no timestamps, no targets) that writes into a fresh
    /// [`SharedBuf`], with one budget per `(filter, limit)` pair in `budgets`.
    fn capture_layer(
        bucket_ms: u64,
        budgets: &[(&str, u64)],
    ) -> (impl tracing_subscriber::Layer<Registry>, SharedBuf) {
        let buf = SharedBuf::default();
        let mut builder = SamplingLayer::<Registry>::builder()
            .without_time()
            .with_target(false)
            .bucket_duration(Duration::from_millis(bucket_ms))
            .writer(buf.clone());
        for &(filter, limit) in budgets {
            builder = builder.budget(EnvFilter::new(filter), limit);
        }
        let (layer, _stats) = builder.build();
        (layer, buf)
    }

    /// Extracts the value of the trailing `i=` field from a formatted line, or
    /// `None` when the line has no parseable `i=` field.
    fn seq_number(line: &str) -> Option<usize> {
        line.rsplit("i=").next()?.trim().parse().ok()
    }

    /// Parses the `i=` field of every line, panicking if any line lacks one.
    fn seq_numbers_strict(lines: &[String]) -> Vec<usize> {
        lines
            .iter()
            .map(|l| seq_number(l).unwrap_or_else(|| panic!("no `i=` field in line: {l:?}")))
            .collect()
    }

    /// Asserts `numbers` is strictly increasing, i.e. events kept arrival order.
    /// `what` names the events in the failure message.
    fn assert_strictly_increasing(numbers: &[usize], what: &str) {
        for w in numbers.windows(2) {
            assert!(
                w[0] < w[1],
                "{what} not in arrival order: {} came before {}",
                w[0],
                w[1]
            );
        }
    }

    #[test]
    fn reservoir_keeps_at_most_limit() {
        let (layer, buf) = capture_layer(1_000, &[("error", 10)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..100 {
                tracing::error!("event");
            }
        });

        let lines = buf.lines();
        assert_eq!(lines.len(), 10, "expected 10 events, got {}", lines.len());
    }

    #[test]
    fn ejected_events_cascade_to_next_budget() {
        let (layer, buf) = capture_layer(1_000, &[("error", 5), ("trace", 50)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..100 {
                tracing::error!("event");
            }
        });

        let lines = buf.lines();
        assert!(
            lines.len() > 5,
            "cascade should produce more than budget 1's limit of 5, got {}",
            lines.len()
        );
        assert!(
            lines.len() <= 55,
            "total should be at most 5 + 50 = 55, got {}",
            lines.len()
        );
    }

    #[test]
    fn non_matching_events_are_dropped() {
        let (layer, buf) = capture_layer(1_000, &[("error", 100)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..50 {
                tracing::debug!("should be dropped");
            }
        });

        let lines = buf.lines();
        assert_eq!(lines.len(), 0, "debug events should not match error filter");
    }

    #[test]
    fn multiple_budgets_separate_levels() {
        let (layer, buf) = capture_layer(1_000, &[("error", 10), ("debug", 10)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..50 {
                tracing::error!("err");
            }
            for _ in 0..50 {
                tracing::debug!("dbg");
            }
        });

        let lines = buf.lines();
        let error_count = lines.iter().filter(|l| l.contains("ERROR")).count();
        let debug_count = lines.iter().filter(|l| l.contains("DEBUG")).count();

        assert!(
            error_count >= 10,
            "should have at least 10 errors, got {error_count}"
        );
        assert!(
            debug_count >= 1,
            "should have at least 1 debug, got {debug_count}"
        );
        // Both budgets are bounded, so the combined output must be too.
        assert!(
            lines.len() <= 20,
            "total should be at most 10 + 10 = 20, got {}",
            lines.len()
        );
    }

    #[test]
    fn flushed_events_are_in_arrival_order() {
        let (layer, buf) = capture_layer(1_000, &[("error", 10)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for i in 0..200 {
                tracing::error!(i, "seq");
            }
        });

        let lines = buf.lines();
        assert_eq!(lines.len(), 10);

        assert_strictly_increasing(&seq_numbers_strict(&lines), "events");
    }

    #[test]
    fn flushed_cascade_events_are_in_arrival_order() {
        let (layer, buf) = capture_layer(1_000, &[("error", 5), ("trace", 10)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for i in 0..200u32 {
                if i % 2 == 0 {
                    tracing::error!(i, "seq");
                } else {
                    tracing::trace!(i, "seq");
                }
            }
        });

        let lines = buf.lines();
        assert!(
            lines.len() > 5,
            "cascade should produce more than first budget's limit of 5, got {}",
            lines.len()
        );
        assert!(
            lines.len() <= 15,
            "total should be at most 5 + 10 = 15, got {}",
            lines.len()
        );

        assert_strictly_increasing(&seq_numbers_strict(&lines), "events");
    }

    #[test]
    fn bucket_rotation_flushes() {
        let (layer, buf) = capture_layer(50, &[("trace", 1000)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..10 {
                tracing::info!("batch1");
            }
            std::thread::sleep(Duration::from_millis(60));
            tracing::info!("batch2");
            // Keep producing events past the end of the new bucket so smearing and
            // the following rotation get a chance to release batch1.
            for _ in 0..10 {
                std::thread::sleep(Duration::from_millis(10));
                tracing::info!("batch2");
            }

            // Check while the subscriber is still alive: the layer also flushes on
            // drop, so a check after `with_default` returns cannot tell whether
            // rotation released anything.
            let released = buf
                .lines()
                .iter()
                .filter(|l| l.contains("batch1"))
                .count();
            assert!(
                released > 0,
                "rotation should release batch1 before the layer is dropped, got {released}"
            );
        });

        let lines = buf.lines();
        assert!(
            lines.len() >= 10,
            "should have flushed batch1 on rotation, got {}",
            lines.len()
        );
    }

    #[test]
    fn smearing_releases_incrementally() {
        // 100/s * 0.2s = 20 per bucket
        let (layer, buf) = capture_layer(200, &[("error", 100)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..50 {
                tracing::error!("fill");
            }

            std::thread::sleep(Duration::from_millis(210));

            let mut counts = Vec::new();
            for _ in 0..20 {
                tracing::error!("trigger");
                std::thread::sleep(Duration::from_millis(10));
                counts.push(buf.lines().len());
            }

            let increases = counts.windows(2).filter(|w| w[1] > w[0]).count();
            assert!(
                increases >= 2,
                "events should trickle out over multiple on_event calls, \
                 but count only increased {} times: {:?}",
                increases,
                counts
            );
        });
    }

    #[test]
    fn smearing_writes_all_events() {
        let (layer, buf) = capture_layer(1_000, &[("error", 10)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..100 {
                tracing::error!("event");
            }
        });

        let lines = buf.lines();
        assert_eq!(
            lines.len(),
            10,
            "all sampled events should be written on drop"
        );
    }

    #[test]
    fn smearing_preserves_order() {
        // 50/s * 0.2s = 10 per bucket
        let (layer, buf) = capture_layer(200, &[("error", 50)]);
        let subscriber = Registry::default().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            for i in 0..200 {
                tracing::error!(i, "seq");
            }
            std::thread::sleep(Duration::from_millis(210));
            for _ in 0..20 {
                tracing::error!("trigger");
                std::thread::sleep(Duration::from_millis(10));
            }
        });

        // "trigger" lines carry no `i=` field and are skipped.
        let lines = buf.lines();
        let numbers: Vec<usize> = lines
            .iter()
            .map(String::as_str)
            .filter_map(seq_number)
            .collect();

        assert!(
            numbers.len() >= 10,
            "should have at least 10 sequenced events, got {}",
            numbers.len()
        );

        assert_strictly_increasing(&numbers, "smeared events");
    }
}