1use std::io::{self, Write};
2use std::marker::PhantomData;
3use std::sync::Mutex;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::{Duration, Instant};
6
7use tracing::subscriber::Interest;
8use tracing::{Event, Metadata, Subscriber};
9use tracing_subscriber::Layer;
10use tracing_subscriber::filter::EnvFilter;
11use tracing_subscriber::fmt::format::{DefaultFields, Format, Full};
12use tracing_subscriber::fmt::{self, FormatFields, MakeWriter};
13use tracing_subscriber::layer::Context;
14use tracing_subscriber::registry::LookupSpan;
15
16use crate::capture::{CaptureMakeWriter, return_captured, take_captured};
17use crate::reservoir::Reservoir;
18
19pub(crate) struct State {
20 pub(crate) bucket_start: Instant,
21 pub(crate) seq: u64,
22 pub(crate) reservoirs: Vec<Reservoir<(u64, Vec<u8>)>>,
23 pub(crate) pending: std::vec::IntoIter<(u64, Vec<u8>)>,
24 pub(crate) last_release: Instant,
25}
26
/// Handle to the layer's event counters.
///
/// Each counter lives behind an `Arc`, so cloning a `Stats` yields a handle
/// that observes the same underlying values.
#[derive(Clone)]
pub struct Stats {
    received: std::sync::Arc<AtomicU64>,
    sampled: std::sync::Arc<AtomicU64>,
    dropped: std::sync::Arc<AtomicU64>,
}

impl Stats {
    /// Creates a fresh set of counters, all starting at zero.
    pub(crate) fn new() -> Self {
        let zeroed = || std::sync::Arc::new(AtomicU64::new(0));
        Self {
            received: zeroed(),
            sampled: zeroed(),
            dropped: zeroed(),
        }
    }

    /// Current value of the `received` counter (relaxed load).
    pub fn received(&self) -> u64 {
        let counter: &AtomicU64 = &self.received;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the `sampled` counter (relaxed load).
    pub fn sampled(&self) -> u64 {
        let counter: &AtomicU64 = &self.sampled;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the `dropped` counter (relaxed load).
    pub fn dropped(&self) -> u64 {
        let counter: &AtomicU64 = &self.dropped;
        counter.load(Ordering::Relaxed)
    }
}
62
63pub struct SamplingLayer<
71 S,
72 N = DefaultFields,
73 E = Format<Full>,
74 W: for<'a> MakeWriter<'a> = fn() -> io::Stderr,
75> {
76 pub(crate) filters: Vec<EnvFilter>,
77 pub(crate) state: Mutex<State>,
78 pub(crate) bucket_duration: Duration,
79 pub(crate) writer: W,
80 pub(crate) fmt_layer: fmt::Layer<S, N, E, CaptureMakeWriter>,
81 pub(crate) stats: Stats,
82 pub(crate) _subscriber: PhantomData<fn(S)>,
83}
84
85impl<S, N, E, W: for<'a> MakeWriter<'a>> SamplingLayer<S, N, E, W> {
86 fn drain_all(state: &mut State) -> Vec<(u64, Vec<u8>)> {
87 let mut events: Vec<_> = state
88 .reservoirs
89 .iter_mut()
90 .flat_map(|r| r.drain())
91 .collect();
92 events.sort_unstable_by_key(|(seq, _)| *seq);
93 events
94 }
95
96 #[cold]
97 fn write_events(&self, events: &[(u64, Vec<u8>)]) {
98 if events.is_empty() {
99 return;
100 }
101 let mut writer = self.writer.make_writer();
102 for (_, buf) in events {
103 let _ = writer.write_all(buf);
104 }
105 }
106
107 fn smear_collect(
108 state: &mut State,
109 now: Instant,
110 bucket_duration: Duration,
111 ) -> Vec<(u64, Vec<u8>)> {
112 let n = state.pending.len();
113 if n == 0 {
114 return Vec::new();
115 }
116
117 let bucket_end = state.bucket_start + bucket_duration;
118 let remaining = bucket_end.saturating_duration_since(now);
119 let to_release = if remaining.is_zero() {
120 n
121 } else {
122 let interval = remaining / n as u32;
123 if interval.is_zero() {
124 n
125 } else {
126 let since_last = now.duration_since(state.last_release);
127 (since_last.as_nanos() / interval.as_nanos()) as usize
128 }
129 };
130
131 if to_release > 0 {
132 let batch: Vec<_> = state.pending.by_ref().take(to_release).collect();
133 state.last_release = now;
134 batch
135 } else {
136 Vec::new()
137 }
138 }
139
140 #[cold]
141 fn rotate_bucket(&self, state: &mut State, batch: &mut Vec<(u64, Vec<u8>)>, now: Instant) {
142 batch.extend(state.pending.by_ref());
143 let drained = Self::drain_all(state);
144 state.pending = drained.into_iter();
145 state.bucket_start = now;
146 state.last_release = now;
147 }
148
149 #[inline]
150 fn tick_smear(&self) {
151 let now = Instant::now();
152 let to_write = {
153 let mut state = self.state.lock().unwrap();
154 let mut batch = Self::smear_collect(&mut state, now, self.bucket_duration);
155 if now.duration_since(state.bucket_start) >= self.bucket_duration {
156 self.rotate_bucket(&mut state, &mut batch, now);
157 }
158 batch
159 };
160 self.write_events(&to_write);
161 }
162
163 #[inline]
164 fn match_filters<S2: Subscriber + for<'a> LookupSpan<'a>>(
165 &self,
166 meta: &Metadata<'_>,
167 ctx: &Context<'_, S2>,
168 ) -> u64 {
169 let mut matched: u64 = 0;
170 for (i, filter) in self.filters.iter().enumerate() {
171 if <EnvFilter as tracing_subscriber::Layer<S2>>::enabled(filter, meta, ctx.clone()) {
172 matched |= 1 << i;
173 }
174 }
175 matched
176 }
177
178 #[cold]
179 fn sample_event(&self, bytes: Vec<u8>, matched: u64) {
180 let mut state = self.state.lock().unwrap();
181 state.seq += 1;
182 let mut current = (state.seq, bytes);
183 for (i, reservoir) in state.reservoirs.iter_mut().enumerate() {
184 if matched & (1 << i) == 0 {
185 continue;
186 }
187 current = reservoir.sample(current);
188 if current.1.is_empty() {
189 self.stats.sampled.fetch_add(1, Ordering::Relaxed);
190 return;
191 }
192 }
193 self.stats.dropped.fetch_add(1, Ordering::Relaxed);
194 return_captured(&self.fmt_layer.writer().0, current.1);
195 }
196
197 pub fn flush(&self) {
199 let (pending, drained) = {
200 let mut state = self.state.lock().unwrap();
201 let pending: Vec<_> = state.pending.by_ref().collect();
202 let drained = Self::drain_all(&mut state);
203 (pending, drained)
204 };
205 self.write_events(&pending);
206 self.write_events(&drained);
207 }
208}
209
210impl<S, N, E, W: for<'a> MakeWriter<'a>> Drop for SamplingLayer<S, N, E, W> {
211 fn drop(&mut self) {
212 if let Ok(mut state) = self.state.lock() {
213 let pending: Vec<_> = state.pending.by_ref().collect();
214 let drained = Self::drain_all(&mut state);
215 drop(state);
216 self.write_events(&pending);
217 self.write_events(&drained);
218 }
219 }
220}
221
222type FmtLayer<S, N, E> = fmt::Layer<S, N, E, CaptureMakeWriter>;
223
224impl<S, N, E, W> SamplingLayer<S, N, E, W>
225where
226 S: Subscriber + for<'a> LookupSpan<'a>,
227 N: for<'writer> FormatFields<'writer> + 'static,
228 E: fmt::FormatEvent<S, N> + 'static,
229 W: for<'a> MakeWriter<'a> + 'static,
230{
231 #[inline]
232 fn inner(&self) -> &FmtLayer<S, N, E> {
233 &self.fmt_layer
234 }
235
236 #[inline]
237 fn format_event(&self, event: &Event<'_>, ctx: Context<'_, S>) -> Vec<u8> {
238 self.inner().on_event(event, ctx);
239 take_captured(&self.inner().writer().0)
240 }
241}
242
243impl<S, N, E, W> tracing_subscriber::Layer<S> for SamplingLayer<S, N, E, W>
244where
245 S: Subscriber + for<'a> LookupSpan<'a>,
246 N: for<'writer> FormatFields<'writer> + 'static,
247 E: fmt::FormatEvent<S, N> + 'static,
248 W: for<'a> MakeWriter<'a> + 'static,
249{
250 fn register_callsite(&self, meta: &'static Metadata<'static>) -> Interest {
251 for filter in &self.filters {
252 let interest =
253 <EnvFilter as tracing_subscriber::Layer<S>>::register_callsite(filter, meta);
254 if interest.is_sometimes() || interest.is_always() {
255 return Interest::sometimes();
256 }
257 }
258 Interest::never()
259 }
260
261 fn enabled(&self, meta: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
262 self.filters.iter().any(|filter| {
263 <EnvFilter as tracing_subscriber::Layer<S>>::enabled(filter, meta, ctx.clone())
264 })
265 }
266
267 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
268 let matched = self.match_filters(event.metadata(), &ctx);
269 if matched == 0 {
270 return;
271 }
272
273 self.stats.received.fetch_add(1, Ordering::Relaxed);
274
275 self.tick_smear();
276
277 let bytes = self.format_event(event, ctx);
278 if bytes.is_empty() {
279 return;
280 }
281
282 self.sample_event(bytes, matched);
283 }
284
285 #[inline]
286 fn on_new_span(
287 &self,
288 attrs: &tracing::span::Attributes<'_>,
289 id: &tracing::span::Id,
290 ctx: Context<'_, S>,
291 ) {
292 self.inner().on_new_span(attrs, id, ctx);
293 }
294
295 #[inline]
296 fn on_record(
297 &self,
298 id: &tracing::span::Id,
299 values: &tracing::span::Record<'_>,
300 ctx: Context<'_, S>,
301 ) {
302 self.inner().on_record(id, values, ctx);
303 }
304
305 #[inline]
306 fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) {
307 self.inner().on_enter(id, ctx);
308 }
309
310 #[inline]
311 fn on_exit(&self, id: &tracing::span::Id, ctx: Context<'_, S>) {
312 self.inner().on_exit(id, ctx);
313 }
314
315 #[inline]
316 fn on_close(&self, id: tracing::span::Id, ctx: Context<'_, S>) {
317 self.inner().on_close(id, ctx);
318 }
319}