// wide_event/layer.rs

//! A `tracing_subscriber::Layer` that intercepts wide events, pulls the
//! finalized record from the thread-local emit stack, formats the
//! timestamp via [`FormatTime`], and serializes in a single pass.
4
5use std::cell::RefCell;
6use std::io::Write;
7use std::sync::Mutex;
8
9use tracing::Subscriber;
10use tracing_subscriber::fmt::format::Writer;
11use tracing_subscriber::fmt::time::FormatTime;
12use tracing_subscriber::layer::Context;
13use tracing_subscriber::registry::LookupSpan;
14use tracing_subscriber::Layer;
15
16use crate::format::{JsonFormatter, WideEventFormatter};
17use crate::{Rfc3339, DEFAULT_TARGET, EMIT_STACK};
18
thread_local! {
    // Per-thread scratch string for the rendered timestamp. `on_event`
    // clears it and writes the new timestamp into it for each wide event,
    // so the same `String` is reused rather than building a new one.
    static TIMESTAMP_BUF: RefCell<String> = const { RefCell::new(String::new()) };
}
22
23/// A `tracing_subscriber::Layer` that writes wide event output.
24///
25/// Timestamp precision is controlled by the [`FormatTime`] implementation
26/// (default: [`Rfc3339`](crate::Rfc3339)). Swap it with [`with_timer`](Self::with_timer):
27///
28/// ```no_run
29/// use wide_event::WideEventLayer;
30/// use tracing_subscriber::fmt::time::Uptime;
31/// # use tracing_subscriber::prelude::*;
32///
33/// tracing_subscriber::registry()
34///     .with(WideEventLayer::stdout().with_timer(Uptime::default()))
35///     .init();
36/// ```
37pub struct WideEventLayer<W = std::io::Stdout, F = JsonFormatter, T = Rfc3339> {
38    writer: Mutex<W>,
39    formatter: F,
40    timer: T,
41    system: Option<&'static str>,
42    target_prefix: &'static str,
43}
44
45impl WideEventLayer<std::io::Stdout, JsonFormatter, Rfc3339> {
46    /// Create a layer that writes JSON wide events to stdout with
47    /// RFC 3339 timestamps.
48    #[must_use]
49    pub fn stdout() -> Self {
50        Self {
51            writer: Mutex::new(std::io::stdout()),
52            formatter: JsonFormatter,
53            timer: Rfc3339,
54            system: None,
55            target_prefix: DEFAULT_TARGET,
56        }
57    }
58}
59
60impl<W: Write + Send + 'static, F: WideEventFormatter> WideEventLayer<W, F, Rfc3339> {
61    /// Create a layer with a custom writer and formatter.
62    ///
63    /// Uses [`Rfc3339`](crate::Rfc3339) timestamps by default.
64    /// Call [`with_timer`](Self::with_timer) to change.
65    pub fn new(writer: W, formatter: F) -> Self {
66        Self {
67            writer: Mutex::new(writer),
68            formatter,
69            timer: Rfc3339,
70            system: None,
71            target_prefix: DEFAULT_TARGET,
72        }
73    }
74}
75
76impl<W: Write + Send + 'static, F: WideEventFormatter, T: FormatTime> WideEventLayer<W, F, T> {
77    /// Set a process-wide `"system"` field injected into every wide event.
78    #[must_use]
79    pub fn with_system(mut self, system: &'static str) -> Self {
80        self.system = Some(system);
81        self
82    }
83
84    /// Set the target prefix for wide event output.
85    ///
86    /// The actual tracing target is always `wide_event` (a macro
87    /// limitation), but the formatted output uses `{prefix}::{subsystem}`
88    /// as the subsystem identifier.
89    #[must_use]
90    pub fn with_target_prefix(mut self, prefix: &'static str) -> Self {
91        self.target_prefix = prefix;
92        self
93    }
94
95    /// Set the timestamp formatter.
96    ///
97    /// Accepts any [`FormatTime`] implementation from `tracing_subscriber::fmt::time`.
98    pub fn with_timer<T2: FormatTime>(self, timer: T2) -> WideEventLayer<W, F, T2> {
99        WideEventLayer {
100            writer: self.writer,
101            formatter: self.formatter,
102            timer,
103            system: self.system,
104            target_prefix: self.target_prefix,
105        }
106    }
107}
108
109impl<S, W, F, T> Layer<S> for WideEventLayer<W, F, T>
110where
111    S: Subscriber + for<'a> LookupSpan<'a>,
112    W: Write + Send + 'static,
113    F: WideEventFormatter + 'static,
114    T: FormatTime + Send + Sync + 'static,
115{
116    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
117        // Match by the presence of wide_event_id field
118        let mut visitor = IdVisitor(None);
119        event.record(&mut visitor);
120        let Some(id) = visitor.0 else { return };
121
122        EMIT_STACK.with(|s| {
123            let s = s.borrow();
124            if let Some((_, record)) = s.iter().rev().find(|(rid, _)| *rid == id) {
125                TIMESTAMP_BUF.with(|buf| {
126                    let mut timestamp = buf.borrow_mut();
127                    timestamp.clear();
128                    let _ = self.timer.format_time(&mut Writer::new(&mut *timestamp));
129
130                    if let Ok(mut w) = self.writer.lock() {
131                        let _ =
132                            self.formatter
133                                .write_record(&mut *w, self.system, &timestamp, record);
134                        let _ = w.write_all(b"\n");
135                    }
136                });
137            }
138        });
139    }
140}
141
142/// Filter function that excludes wide events from other layers.
143///
144/// Uses the default target (`wide_event`).
145///
146/// ```no_run
147/// use wide_event::{WideEventLayer, exclude_wide_events};
148/// use tracing_subscriber::prelude::*;
149/// use tracing_subscriber::filter::filter_fn;
150///
151/// tracing_subscriber::registry()
152///     .with(WideEventLayer::stdout())
153///     .with(
154///         tracing_subscriber::fmt::layer()
155///             .with_filter(filter_fn(exclude_wide_events))
156///     )
157///     .init();
158/// ```
159#[must_use]
160pub fn exclude_wide_events(meta: &tracing::Metadata<'_>) -> bool {
161    meta.target() != DEFAULT_TARGET
162}
163
164struct IdVisitor(Option<u64>);
165
166impl tracing::field::Visit for IdVisitor {
167    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
168        if field.name() == "wide_event_id" {
169            self.0 = Some(value);
170        }
171    }
172
173    fn record_debug(&mut self, _: &tracing::field::Field, _: &dyn std::fmt::Debug) {}
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::WideEvent;
    use std::sync::Arc;
    use tracing_subscriber::prelude::*;

    /// Shared byte buffer the tests inspect after events were emitted.
    type Sink = Arc<Mutex<Vec<u8>>>;

    /// `Write` adapter that appends everything it receives to a [`Sink`].
    struct SharedBuf(Sink);

    impl Write for SharedBuf {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut bytes = self.0.lock().unwrap();
            bytes.extend_from_slice(buf);
            Ok(buf.len())
        }
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Fresh, empty sink.
    fn empty_sink() -> Sink {
        Arc::new(Mutex::new(Vec::new()))
    }

    /// Parse the sink's whole (trimmed) content as one JSON document.
    fn parse_json(sink: &Sink) -> serde_json::Value {
        let bytes = sink.lock().unwrap();
        let text = std::str::from_utf8(&bytes).unwrap();
        serde_json::from_str(text.trim()).unwrap()
    }

    #[test]
    fn layer_captures_json() {
        let sink = empty_sink();
        let layer =
            WideEventLayer::new(SharedBuf(Arc::clone(&sink)), JsonFormatter).with_system("testapp");
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            let evt = WideEvent::new("ingress");
            evt.set_str("hello", "world");
            evt.emit();
        });

        let parsed = parse_json(&sink);
        assert_eq!(parsed["hello"], "world");
        assert_eq!(parsed["system"], "testapp");
        assert_eq!(parsed["subsystem"], "ingress");
        // The default Rfc3339 timer separates date and time with 'T'.
        let timestamp = parsed["timestamp"].as_str().unwrap();
        assert!(timestamp.contains('T'));
    }

    #[test]
    fn layer_with_custom_timer() {
        use tracing_subscriber::fmt::time::Uptime;

        let sink = empty_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), JsonFormatter)
            .with_timer(Uptime::default());
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            let evt = WideEvent::new("http");
            evt.emit();
        });

        let parsed = parse_json(&sink);
        // Uptime renders elapsed time, so no RFC 3339 'T' separator appears.
        let timestamp = parsed["timestamp"].as_str().unwrap();
        assert!(!timestamp.contains('T'));
    }

    #[test]
    fn layer_captures_logfmt() {
        use crate::LogfmtFormatter;

        let sink = empty_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), LogfmtFormatter);
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            let evt = WideEvent::new("http");
            evt.set_str("method", "GET");
            evt.emit();
        });

        let bytes = sink.lock().unwrap().clone();
        let logfmt = String::from_utf8(bytes).unwrap();
        assert!(logfmt.contains("subsystem=http"));
        assert!(logfmt.contains("method=GET"));
    }

    #[test]
    fn non_wide_events_are_ignored() {
        let sink = empty_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), JsonFormatter);
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            tracing::info!("normal log line");
        });

        let written = sink.lock().unwrap();
        assert!(written.is_empty());
    }

    #[test]
    fn guard_emits_through_layer() {
        use crate::WideEventGuard;

        let sink = empty_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), JsonFormatter);
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            let guard = WideEventGuard::new("http");
            guard.set_str("path", "/api");
            // Dropping the guard is what emits the event.
            drop(guard);
        });

        let parsed = parse_json(&sink);
        assert_eq!(parsed["path"], "/api");
        assert_eq!(parsed["subsystem"], "http");
    }

    #[test]
    fn emit_hook_sees_duration_ns() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let seen = Arc::new(AtomicBool::new(false));
        let hook_flag = Arc::clone(&seen);

        let evt = WideEvent::new("test");
        evt.set_emit_hook(Arc::new(move |fields| {
            if fields.contains_key("duration_ns") {
                hook_flag.store(true, Ordering::SeqCst);
            }
        }));
        evt.emit();

        assert!(seen.load(Ordering::SeqCst));
    }
}