Skip to main content

tracing_log_sample/
layer.rs

1use std::io::{self, Write};
2use std::marker::PhantomData;
3use std::sync::Mutex;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::{Duration, Instant};
6
7use tracing::subscriber::Interest;
8use tracing::{Event, Metadata, Subscriber};
9use tracing_subscriber::Layer;
10use tracing_subscriber::filter::EnvFilter;
11use tracing_subscriber::fmt::format::{DefaultFields, Format, Full};
12use tracing_subscriber::fmt::{self, FormatFields, MakeWriter};
13use tracing_subscriber::layer::Context;
14use tracing_subscriber::registry::LookupSpan;
15
16use crate::capture::{CaptureMakeWriter, return_captured, take_captured};
17use crate::reservoir::Reservoir;
18
19pub(crate) struct State {
20    pub(crate) bucket_start: Instant,
21    pub(crate) seq: u64,
22    pub(crate) reservoirs: Vec<Reservoir<(u64, Vec<u8>)>>,
23    pub(crate) pending: std::vec::IntoIter<(u64, Vec<u8>)>,
24    pub(crate) last_release: Instant,
25}
26
/// Shared handle for reading layer event counters.
///
/// Returned by [`SamplingLayerBuilder::build`](crate::SamplingLayerBuilder::build).
/// All counts are cumulative since layer creation. Clones share the same
/// underlying counters, so a handle held by the caller observes every update
/// made by the layer.
#[derive(Clone, Debug)]
pub struct Stats {
    received: std::sync::Arc<AtomicU64>,
    sampled: std::sync::Arc<AtomicU64>,
    dropped: std::sync::Arc<AtomicU64>,
}

impl Stats {
    /// Creates a handle with every counter at zero.
    pub(crate) fn new() -> Self {
        Self {
            received: std::sync::Arc::new(AtomicU64::new(0)),
            sampled: std::sync::Arc::new(AtomicU64::new(0)),
            dropped: std::sync::Arc::new(AtomicU64::new(0)),
        }
    }

    /// Events that matched at least one filter.
    ///
    /// Counted before formatting, so events whose formatted output was empty
    /// are included here even though they are neither sampled nor dropped.
    pub fn received(&self) -> u64 {
        self.received.load(Ordering::Relaxed)
    }

    /// Sampling decisions that ended with an event retained in a reservoir.
    pub fn sampled(&self) -> u64 {
        self.sampled.load(Ordering::Relaxed)
    }

    /// Events discarded after being offered to every matching reservoir
    /// without being retained.
    ///
    /// If a reservoir hands back a previously held event in exchange for the
    /// new one, the event that is finally discarded may be that older one.
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}
62
63/// A [`tracing_subscriber::Layer`] that samples events into time-bucketed reservoirs.
64///
65/// Uses `tracing_subscriber::fmt::Layer` internally for event formatting.
66/// Sampled events are smeared across the bucket duration to reduce tail-latency
67/// spikes from burst writes.
68///
69/// Construct via [`SamplingLayer::builder()`](crate::SamplingLayerBuilder).
70pub struct SamplingLayer<
71    S,
72    N = DefaultFields,
73    E = Format<Full>,
74    W: for<'a> MakeWriter<'a> = fn() -> io::Stderr,
75> {
76    pub(crate) filters: Vec<EnvFilter>,
77    pub(crate) state: Mutex<State>,
78    pub(crate) bucket_duration: Duration,
79    pub(crate) writer: W,
80    pub(crate) fmt_layer: fmt::Layer<S, N, E, CaptureMakeWriter>,
81    pub(crate) stats: Stats,
82    pub(crate) _subscriber: PhantomData<fn(S)>,
83}
84
85impl<S, N, E, W: for<'a> MakeWriter<'a>> SamplingLayer<S, N, E, W> {
86    fn drain_all(state: &mut State) -> Vec<(u64, Vec<u8>)> {
87        let mut events: Vec<_> = state
88            .reservoirs
89            .iter_mut()
90            .flat_map(|r| r.drain())
91            .collect();
92        events.sort_unstable_by_key(|(seq, _)| *seq);
93        events
94    }
95
96    #[cold]
97    fn write_events(&self, events: &[(u64, Vec<u8>)]) {
98        if events.is_empty() {
99            return;
100        }
101        let mut writer = self.writer.make_writer();
102        for (_, buf) in events {
103            let _ = writer.write_all(buf);
104        }
105    }
106
107    fn smear_collect(
108        state: &mut State,
109        now: Instant,
110        bucket_duration: Duration,
111    ) -> Vec<(u64, Vec<u8>)> {
112        let n = state.pending.len();
113        if n == 0 {
114            return Vec::new();
115        }
116
117        let bucket_end = state.bucket_start + bucket_duration;
118        let remaining = bucket_end.saturating_duration_since(now);
119        let to_release = if remaining.is_zero() {
120            n
121        } else {
122            let interval = remaining / n as u32;
123            if interval.is_zero() {
124                n
125            } else {
126                let since_last = now.duration_since(state.last_release);
127                (since_last.as_nanos() / interval.as_nanos()) as usize
128            }
129        };
130
131        if to_release > 0 {
132            let batch: Vec<_> = state.pending.by_ref().take(to_release).collect();
133            state.last_release = now;
134            batch
135        } else {
136            Vec::new()
137        }
138    }
139
140    #[cold]
141    fn rotate_bucket(&self, state: &mut State, batch: &mut Vec<(u64, Vec<u8>)>, now: Instant) {
142        batch.extend(state.pending.by_ref());
143        let drained = Self::drain_all(state);
144        state.pending = drained.into_iter();
145        state.bucket_start = now;
146        state.last_release = now;
147    }
148
149    #[inline]
150    fn tick_smear(&self) {
151        let now = Instant::now();
152        let to_write = {
153            let mut state = self.state.lock().unwrap();
154            let mut batch = Self::smear_collect(&mut state, now, self.bucket_duration);
155            if now.duration_since(state.bucket_start) >= self.bucket_duration {
156                self.rotate_bucket(&mut state, &mut batch, now);
157            }
158            batch
159        };
160        self.write_events(&to_write);
161    }
162
163    #[inline]
164    fn match_filters<S2: Subscriber + for<'a> LookupSpan<'a>>(
165        &self,
166        meta: &Metadata<'_>,
167        ctx: &Context<'_, S2>,
168    ) -> u64 {
169        let mut matched: u64 = 0;
170        for (i, filter) in self.filters.iter().enumerate() {
171            if <EnvFilter as tracing_subscriber::Layer<S2>>::enabled(filter, meta, ctx.clone()) {
172                matched |= 1 << i;
173            }
174        }
175        matched
176    }
177
178    #[cold]
179    fn sample_event(&self, bytes: Vec<u8>, matched: u64) {
180        let mut state = self.state.lock().unwrap();
181        state.seq += 1;
182        let mut current = (state.seq, bytes);
183        for (i, reservoir) in state.reservoirs.iter_mut().enumerate() {
184            if matched & (1 << i) == 0 {
185                continue;
186            }
187            current = reservoir.sample(current);
188            if current.1.is_empty() {
189                self.stats.sampled.fetch_add(1, Ordering::Relaxed);
190                return;
191            }
192        }
193        self.stats.dropped.fetch_add(1, Ordering::Relaxed);
194        return_captured(&self.fmt_layer.writer().0, current.1);
195    }
196
197    /// Drain all reservoirs and write their contents immediately.
198    pub fn flush(&self) {
199        let (pending, drained) = {
200            let mut state = self.state.lock().unwrap();
201            let pending: Vec<_> = state.pending.by_ref().collect();
202            let drained = Self::drain_all(&mut state);
203            (pending, drained)
204        };
205        self.write_events(&pending);
206        self.write_events(&drained);
207    }
208}
209
210impl<S, N, E, W: for<'a> MakeWriter<'a>> Drop for SamplingLayer<S, N, E, W> {
211    fn drop(&mut self) {
212        if let Ok(mut state) = self.state.lock() {
213            let pending: Vec<_> = state.pending.by_ref().collect();
214            let drained = Self::drain_all(&mut state);
215            drop(state);
216            self.write_events(&pending);
217            self.write_events(&drained);
218        }
219    }
220}
221
222type FmtLayer<S, N, E> = fmt::Layer<S, N, E, CaptureMakeWriter>;
223
224impl<S, N, E, W> SamplingLayer<S, N, E, W>
225where
226    S: Subscriber + for<'a> LookupSpan<'a>,
227    N: for<'writer> FormatFields<'writer> + 'static,
228    E: fmt::FormatEvent<S, N> + 'static,
229    W: for<'a> MakeWriter<'a> + 'static,
230{
231    #[inline]
232    fn inner(&self) -> &FmtLayer<S, N, E> {
233        &self.fmt_layer
234    }
235
236    #[inline]
237    fn format_event(&self, event: &Event<'_>, ctx: Context<'_, S>) -> Vec<u8> {
238        self.inner().on_event(event, ctx);
239        take_captured(&self.inner().writer().0)
240    }
241}
242
243impl<S, N, E, W> tracing_subscriber::Layer<S> for SamplingLayer<S, N, E, W>
244where
245    S: Subscriber + for<'a> LookupSpan<'a>,
246    N: for<'writer> FormatFields<'writer> + 'static,
247    E: fmt::FormatEvent<S, N> + 'static,
248    W: for<'a> MakeWriter<'a> + 'static,
249{
250    fn register_callsite(&self, meta: &'static Metadata<'static>) -> Interest {
251        for filter in &self.filters {
252            let interest =
253                <EnvFilter as tracing_subscriber::Layer<S>>::register_callsite(filter, meta);
254            if interest.is_sometimes() || interest.is_always() {
255                return Interest::sometimes();
256            }
257        }
258        Interest::never()
259    }
260
261    fn enabled(&self, meta: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
262        self.filters.iter().any(|filter| {
263            <EnvFilter as tracing_subscriber::Layer<S>>::enabled(filter, meta, ctx.clone())
264        })
265    }
266
267    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
268        let matched = self.match_filters(event.metadata(), &ctx);
269        if matched == 0 {
270            return;
271        }
272
273        self.stats.received.fetch_add(1, Ordering::Relaxed);
274
275        self.tick_smear();
276
277        let bytes = self.format_event(event, ctx);
278        if bytes.is_empty() {
279            return;
280        }
281
282        self.sample_event(bytes, matched);
283    }
284
285    #[inline]
286    fn on_new_span(
287        &self,
288        attrs: &tracing::span::Attributes<'_>,
289        id: &tracing::span::Id,
290        ctx: Context<'_, S>,
291    ) {
292        self.inner().on_new_span(attrs, id, ctx);
293    }
294
295    #[inline]
296    fn on_record(
297        &self,
298        id: &tracing::span::Id,
299        values: &tracing::span::Record<'_>,
300        ctx: Context<'_, S>,
301    ) {
302        self.inner().on_record(id, values, ctx);
303    }
304
305    #[inline]
306    fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) {
307        self.inner().on_enter(id, ctx);
308    }
309
310    #[inline]
311    fn on_exit(&self, id: &tracing::span::Id, ctx: Context<'_, S>) {
312        self.inner().on_exit(id, ctx);
313    }
314
315    #[inline]
316    fn on_close(&self, id: tracing::span::Id, ctx: Context<'_, S>) {
317        self.inner().on_close(id, ctx);
318    }
319}