1use std::cell::RefCell;
6use std::io::Write;
7use std::sync::Mutex;
8
9use tracing::Subscriber;
10use tracing_subscriber::fmt::format::Writer;
11use tracing_subscriber::fmt::time::FormatTime;
12use tracing_subscriber::layer::Context;
13use tracing_subscriber::registry::LookupSpan;
14use tracing_subscriber::Layer;
15
16use crate::format::{JsonFormatter, WideEventFormatter};
17use crate::{Rfc3339, DEFAULT_TARGET, EMIT_STACK};
18
thread_local! {
    /// Per-thread scratch string that holds the formatted timestamp of the
    /// record currently being written.
    ///
    /// The buffer is cleared and refilled for every record, so its allocation
    /// is reused instead of building a fresh `String` each time.
    static TIMESTAMP_BUF: RefCell<String> = const { RefCell::new(String::new()) };
}
22
23pub struct WideEventLayer<W = std::io::Stdout, F = JsonFormatter, T = Rfc3339> {
38 writer: Mutex<W>,
39 formatter: F,
40 timer: T,
41 system: Option<&'static str>,
42 target_prefix: &'static str,
43}
44
45impl WideEventLayer<std::io::Stdout, JsonFormatter, Rfc3339> {
46 #[must_use]
49 pub fn stdout() -> Self {
50 Self {
51 writer: Mutex::new(std::io::stdout()),
52 formatter: JsonFormatter,
53 timer: Rfc3339,
54 system: None,
55 target_prefix: DEFAULT_TARGET,
56 }
57 }
58}
59
60impl<W: Write + Send + 'static, F: WideEventFormatter> WideEventLayer<W, F, Rfc3339> {
61 pub fn new(writer: W, formatter: F) -> Self {
66 Self {
67 writer: Mutex::new(writer),
68 formatter,
69 timer: Rfc3339,
70 system: None,
71 target_prefix: DEFAULT_TARGET,
72 }
73 }
74}
75
76impl<W: Write + Send + 'static, F: WideEventFormatter, T: FormatTime> WideEventLayer<W, F, T> {
77 #[must_use]
79 pub fn with_system(mut self, system: &'static str) -> Self {
80 self.system = Some(system);
81 self
82 }
83
84 #[must_use]
90 pub fn with_target_prefix(mut self, prefix: &'static str) -> Self {
91 self.target_prefix = prefix;
92 self
93 }
94
95 pub fn with_timer<T2: FormatTime>(self, timer: T2) -> WideEventLayer<W, F, T2> {
99 WideEventLayer {
100 writer: self.writer,
101 formatter: self.formatter,
102 timer,
103 system: self.system,
104 target_prefix: self.target_prefix,
105 }
106 }
107}
108
109impl<S, W, F, T> Layer<S> for WideEventLayer<W, F, T>
110where
111 S: Subscriber + for<'a> LookupSpan<'a>,
112 W: Write + Send + 'static,
113 F: WideEventFormatter + 'static,
114 T: FormatTime + Send + Sync + 'static,
115{
116 fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
117 let mut visitor = IdVisitor(None);
119 event.record(&mut visitor);
120 let Some(id) = visitor.0 else { return };
121
122 EMIT_STACK.with(|s| {
123 let s = s.borrow();
124 if let Some((_, record)) = s.iter().rev().find(|(rid, _)| *rid == id) {
125 TIMESTAMP_BUF.with(|buf| {
126 let mut timestamp = buf.borrow_mut();
127 timestamp.clear();
128 let _ = self.timer.format_time(&mut Writer::new(&mut *timestamp));
129
130 if let Ok(mut w) = self.writer.lock() {
131 let _ =
132 self.formatter
133 .write_record(&mut *w, self.system, ×tamp, record);
134 let _ = w.write_all(b"\n");
135 }
136 });
137 }
138 });
139 }
140}
141
142#[must_use]
160pub fn exclude_wide_events(meta: &tracing::Metadata<'_>) -> bool {
161 meta.target() != DEFAULT_TARGET
162}
163
164struct IdVisitor(Option<u64>);
165
166impl tracing::field::Visit for IdVisitor {
167 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
168 if field.name() == "wide_event_id" {
169 self.0 = Some(value);
170 }
171 }
172
173 fn record_debug(&mut self, _: &tracing::field::Field, _: &dyn std::fmt::Debug) {}
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::WideEvent;
    use std::sync::Arc;
    use tracing_subscriber::prelude::*;

    /// In-memory byte buffer shared between a test and the layer under test.
    type Sink = Arc<Mutex<Vec<u8>>>;

    /// `Write` adapter that appends everything it receives to a shared buffer.
    struct SharedBuf(Sink);

    impl Write for SharedBuf {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut bytes = self.0.lock().unwrap();
            bytes.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Creates an empty shared buffer.
    fn new_sink() -> Sink {
        Arc::new(Mutex::new(Vec::<u8>::new()))
    }

    /// Parses the whole captured output as a single JSON document.
    fn parse_json(sink: &Sink) -> serde_json::Value {
        let bytes = sink.lock().unwrap();
        let text = std::str::from_utf8(&bytes).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(text.trim()).unwrap();
        parsed
    }

    /// A JSON layer writes the event's fields, system, subsystem, and an
    /// RFC 3339-looking timestamp.
    #[test]
    fn layer_captures_json() {
        let sink = new_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), JsonFormatter)
            .with_system("testapp");
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            let evt = WideEvent::new("ingress");
            evt.set_str("hello", "world");
            evt.emit();
        });

        let parsed = parse_json(&sink);
        assert_eq!(parsed["hello"], "world");
        assert_eq!(parsed["system"], "testapp");
        assert_eq!(parsed["subsystem"], "ingress");
        assert!(parsed["timestamp"].as_str().unwrap().contains('T'));
    }

    /// Swapping the timer changes the timestamp format.
    #[test]
    fn layer_with_custom_timer() {
        use tracing_subscriber::fmt::time::Uptime;

        let sink = new_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), JsonFormatter)
            .with_timer(Uptime::default());
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            let evt = WideEvent::new("http");
            evt.emit();
        });

        let parsed = parse_json(&sink);
        let ts = parsed["timestamp"].as_str().unwrap();
        assert!(!ts.contains('T'));
    }

    /// The logfmt formatter produces `key=value` pairs.
    #[test]
    fn layer_captures_logfmt() {
        use crate::LogfmtFormatter;

        let sink = new_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), LogfmtFormatter);
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            let evt = WideEvent::new("http");
            evt.set_str("method", "GET");
            evt.emit();
        });

        let captured = sink.lock().unwrap().clone();
        let logfmt = String::from_utf8(captured).unwrap();
        assert!(logfmt.contains("subsystem=http"));
        assert!(logfmt.contains("method=GET"));
    }

    /// Ordinary tracing events produce no output.
    #[test]
    fn non_wide_events_are_ignored() {
        let sink = new_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), JsonFormatter);
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            tracing::info!("normal log line");
        });

        assert!(sink.lock().unwrap().is_empty());
    }

    /// Dropping a guard emits its event through the layer.
    #[test]
    fn guard_emits_through_layer() {
        use crate::WideEventGuard;

        let sink = new_sink();
        let layer = WideEventLayer::new(SharedBuf(Arc::clone(&sink)), JsonFormatter);
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, || {
            let guard = WideEventGuard::new("http");
            guard.set_str("path", "/api");
            drop(guard);
        });

        let parsed = parse_json(&sink);
        assert_eq!(parsed["path"], "/api");
        assert_eq!(parsed["subsystem"], "http");
    }

    /// The emit hook observes a `duration_ns` field.
    #[test]
    fn emit_hook_sees_duration_ns() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let seen_duration = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&seen_duration);

        let evt = WideEvent::new("test");
        evt.set_emit_hook(Arc::new(move |fields| {
            if fields.contains_key("duration_ns") {
                flag.store(true, Ordering::SeqCst);
            }
        }));
        evt.emit();

        assert!(seen_duration.load(Ordering::SeqCst));
    }
}