1#![warn(clippy::pedantic, missing_docs)]
67
68mod format;
69mod layer;
70
71#[cfg(feature = "tokio")]
72pub mod context;
73
74pub use format::{JsonFormatter, LogfmtFormatter, WideEventFormatter};
75pub use layer::{exclude_wide_events, WideEventLayer};
76pub use tracing_subscriber::fmt::time::FormatTime;
77
78use std::cell::RefCell;
79use std::collections::HashMap;
80use std::sync::atomic::{AtomicU64, Ordering};
81use std::sync::{Arc, Mutex};
82use std::time::Instant;
83
84use serde_json::Value;
85use tracing_subscriber::fmt::format::Writer;
86
/// The `tracing` target string `"wide_event"`.
///
/// This is the same value [`WideEvent::emit`] uses as the target of the
/// tracing event it fires.
pub const DEFAULT_TARGET: &str = "wide_event";
89
/// Shared, thread-safe callback that receives an event's field map.
///
/// A hook registered through [`WideEvent::set_emit_hook`] is called with the
/// collected fields when the event is finalized for emission.
pub type EmitHook = Arc<dyn Fn(&HashMap<&'static str, Value>) + Send + Sync>;
92
// Process-wide counter, starting at 1, from which `WideEvent::emit` draws the
// `wide_event_id` attached to each tracing event.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
94
// Per-thread stack of `(id, record)` pairs. `WideEvent::emit` pushes an entry
// right before firing its tracing event and pops it right after.
// NOTE(review): presumably the layer module reads the record back through the
// `wide_event_id` field while handling that tracing event — confirm in `layer`.
thread_local! {
    pub(crate) static EMIT_STACK: RefCell<Vec<(u64, WideEventRecord)>> =
        const { RefCell::new(Vec::new()) };
}
99
/// A [`FormatTime`] implementation that writes the current system time via
/// `humantime::format_rfc3339_micros` (RFC 3339 with microsecond precision).
pub struct Rfc3339;
104
105impl FormatTime for Rfc3339 {
106 fn format_time(&self, w: &mut Writer<'_>) -> std::fmt::Result {
107 write!(
108 w,
109 "{}",
110 humantime::format_rfc3339_micros(std::time::SystemTime::now())
111 )
112 }
113}
114
/// Snapshot of a [`WideEvent`] taken at the moment it is emitted.
pub struct WideEventRecord {
    /// Subsystem label the event was created with.
    pub subsystem: &'static str,
    /// Time elapsed between the event's creation and its emission.
    pub duration: std::time::Duration,
    /// Fields recorded on the event. The `duration_ns` entry that is shown to
    /// the emit hook is removed before the record is built.
    pub fields: HashMap<&'static str, Value>,
    /// Trace id of the current span, formatted with `{:032x}`. `None` unless
    /// the `opentelemetry` feature is enabled and the span context is valid.
    pub trace_id: Option<String>,
    /// Span id of the current span, formatted with `{:016x}`. `None` unless
    /// the `opentelemetry` feature is enabled and the span context is valid.
    pub span_id: Option<String>,
}
131
/// A structured event that accumulates key/value fields and is emitted once.
///
/// All state sits behind a [`Mutex`], so the setters take `&self`.
/// [`WideEvent::emit`] publishes the collected fields a single time; further
/// calls do nothing.
pub struct WideEvent {
    /// Static label copied into the emitted record and the tracing event.
    subsystem: &'static str,
    /// Lock-protected mutable state (fields, start time, hook, emitted flag).
    inner: Mutex<WideEventInner>,
}
140
/// Lock-protected state of a `WideEvent`.
struct WideEventInner {
    /// Fields set so far; moved out of this map when the event is finalized.
    fields: HashMap<&'static str, Value>,
    /// Instant captured in `WideEvent::new`; the emitted duration is measured
    /// from it.
    start: Instant,
    /// Optional callback run with the final fields during finalization.
    emit_hook: Option<EmitHook>,
    /// Set by the first finalization so that later `emit` calls are no-ops.
    emitted: bool,
}
147
148impl std::fmt::Debug for WideEvent {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("WideEvent")
151 .field("subsystem", &self.subsystem)
152 .finish_non_exhaustive()
153 }
154}
155
156#[allow(
157 clippy::missing_panics_doc,
158 reason = "all panics are from Mutex::lock which only panics if poisoned"
159)]
160impl WideEvent {
161 #[must_use]
166 pub fn new(subsystem: &'static str) -> Self {
167 Self {
168 subsystem,
169 inner: Mutex::new(WideEventInner {
170 fields: HashMap::with_capacity(24),
171 start: Instant::now(),
172 emit_hook: None,
173 emitted: false,
174 }),
175 }
176 }
177
178 pub fn set_emit_hook(&self, hook: EmitHook) {
180 self.inner.lock().unwrap().emit_hook = Some(hook);
181 }
182
183 pub fn has_key(&self, key: &str) -> bool {
185 self.inner.lock().unwrap().fields.contains_key(key)
186 }
187
188 pub fn set_str(&self, key: &'static str, val: &str) {
190 self.inner
191 .lock()
192 .unwrap()
193 .fields
194 .insert(key, Value::String(val.to_string()));
195 }
196
197 pub fn set_string(&self, key: &'static str, val: String) {
199 self.inner
200 .lock()
201 .unwrap()
202 .fields
203 .insert(key, Value::String(val));
204 }
205
206 pub fn set_i64(&self, key: &'static str, val: i64) {
208 self.inner
209 .lock()
210 .unwrap()
211 .fields
212 .insert(key, Value::Number(val.into()));
213 }
214
215 pub fn set_u64(&self, key: &'static str, val: u64) {
217 self.inner
218 .lock()
219 .unwrap()
220 .fields
221 .insert(key, Value::Number(val.into()));
222 }
223
224 pub fn set_f64(&self, key: &'static str, val: f64) {
226 self.inner.lock().unwrap().fields.insert(
227 key,
228 serde_json::Number::from_f64(val).map_or(Value::Null, Value::Number),
229 );
230 }
231
232 pub fn set_bool(&self, key: &'static str, val: bool) {
234 self.inner
235 .lock()
236 .unwrap()
237 .fields
238 .insert(key, Value::Bool(val));
239 }
240
241 pub fn set_value(&self, key: &'static str, val: Value) {
243 self.inner.lock().unwrap().fields.insert(key, val);
244 }
245
246 pub fn incr(&self, key: &'static str) {
248 let mut inner = self.inner.lock().unwrap();
249 let entry = inner.fields.entry(key).or_insert(Value::Number(0.into()));
250 if let Some(n) = entry.as_i64() {
251 *entry = Value::Number((n + 1).into());
252 }
253 }
254
255 pub fn set_error(&self, err_type: &str, message: &str) {
257 self.set_bool("error", true);
258 self.set_str("error.type", err_type);
259 self.set_str("error.message", message);
260 }
261
262 pub fn emit(&self) {
270 let Some(record) = self.finalize() else {
271 return;
272 };
273 let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
274 let subsystem = self.subsystem;
275
276 EMIT_STACK.with(|s| s.borrow_mut().push((id, record)));
277
278 tracing::info!(
281 target: "wide_event",
282 wide_event_id = id,
283 subsystem = subsystem,
284 );
285
286 EMIT_STACK.with(|s| {
287 s.borrow_mut().pop();
288 });
289 }
290
291 #[allow(
292 clippy::items_after_statements,
293 reason = "cfg-gated inner fn for conditional compilation is idiomatic"
294 )]
295 fn finalize(&self) -> Option<WideEventRecord> {
296 let mut inner = self.inner.lock().unwrap();
297 if inner.emitted {
298 return None;
299 }
300 inner.emitted = true;
301
302 let duration = inner.start.elapsed();
303
304 #[allow(
306 clippy::cast_possible_truncation,
307 reason = "duration_ns fits in u64 for any practical duration"
308 )]
309 inner.fields.insert(
310 "duration_ns",
311 Value::Number((duration.as_nanos() as u64).into()),
312 );
313
314 let mut fields = std::mem::take(&mut inner.fields);
315
316 if let Some(ref hook) = inner.emit_hook {
317 hook(&fields);
318 }
319
320 fields.remove("duration_ns");
323
324 let (trace_id, span_id) = otel_context();
325
326 #[cfg(feature = "opentelemetry")]
327 fn otel_context() -> (Option<String>, Option<String>) {
328 use opentelemetry::trace::TraceContextExt;
329 use tracing_opentelemetry::OpenTelemetrySpanExt;
330
331 let span = tracing::Span::current();
332 let cx = span.context();
333 let sc = cx.span().span_context().clone();
334 if sc.is_valid() {
335 (
336 Some(format!("{:032x}", sc.trace_id())), Some(format!("{:016x}", sc.span_id())), )
339 } else {
340 (None, None)
341 }
342 }
343
344 #[cfg(not(feature = "opentelemetry"))]
345 fn otel_context() -> (Option<String>, Option<String>) {
346 (None, None)
347 }
348
349 Some(WideEventRecord {
350 subsystem: self.subsystem,
351 duration,
352 fields,
353 trace_id,
354 span_id,
355 })
356 }
357}
358
/// Wrapper around a [`WideEvent`] that calls [`WideEvent::emit`] when dropped.
///
/// It dereferences to the wrapped event, so fields are set directly on the
/// guard.
pub struct WideEventGuard(WideEvent);
371
372impl WideEventGuard {
373 #[must_use]
375 pub fn new(subsystem: &'static str) -> Self {
376 Self(WideEvent::new(subsystem))
377 }
378}
379
380impl Drop for WideEventGuard {
381 fn drop(&mut self) {
382 self.0.emit();
383 }
384}
385
386impl std::ops::Deref for WideEventGuard {
387 type Target = WideEvent;
388 fn deref(&self) -> &WideEvent {
389 &self.0
390 }
391}
392
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    // `has_key` reports a field only after it has been set.
    #[test]
    fn set_and_has_key() {
        let evt = WideEvent::new("test");
        assert!(!evt.has_key("foo"));
        evt.set_str("foo", "bar");
        assert!(evt.has_key("foo"));
    }

    // `incr` creates a missing counter and adds one per call.
    #[test]
    fn incr_creates_and_increments() {
        let evt = WideEvent::new("test");
        evt.incr("counter");
        evt.incr("counter");
        evt.incr("counter");
        let inner = evt.inner.lock().unwrap();
        assert_eq!(inner.fields["counter"], Value::Number(3.into()));
    }

    // `set_error` writes `error`, `error.type` and `error.message`.
    #[test]
    fn set_error_sets_three_fields() {
        let evt = WideEvent::new("test");
        evt.set_error("timeout", "connection timed out");
        let inner = evt.inner.lock().unwrap();
        assert_eq!(inner.fields["error"], Value::Bool(true));
        assert_eq!(
            inner.fields["error.type"],
            Value::String("timeout".to_string())
        );
        assert_eq!(
            inner.fields["error.message"],
            Value::String("connection timed out".to_string())
        );
    }

    // A second `emit` must not run the hook again.
    #[test]
    fn double_emit_is_noop() {
        let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let count_clone = count.clone();

        let evt = WideEvent::new("test");
        evt.set_emit_hook(Arc::new(move |_| {
            count_clone.fetch_add(1, Ordering::SeqCst);
        }));
        evt.emit();
        evt.emit();
        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    // `finalize` hands the fields over once and returns `None` afterwards.
    #[test]
    fn finalize_moves_fields() {
        let evt = WideEvent::new("test");
        evt.set_str("key", "value");

        let record = evt.finalize().unwrap();
        assert_eq!(record.fields["key"], Value::String("value".to_string()));
        assert_eq!(record.subsystem, "test");
        // The hook-only `duration_ns` entry must not leak into the record.
        assert!(!record.fields.contains_key("duration_ns"));
        assert!(evt.finalize().is_none());
    }

    // Dropping the guard emits the event, which runs the hook.
    #[test]
    fn guard_auto_emits() {
        let emitted = Arc::new(AtomicBool::new(false));
        let emitted_clone = emitted.clone();

        {
            let guard = WideEventGuard::new("test");
            guard.set_emit_hook(Arc::new(move |_| {
                emitted_clone.store(true, Ordering::SeqCst);
            }));
            guard.set_str("method", "GET");
        }

        assert!(emitted.load(Ordering::SeqCst));
    }

    // `WideEvent` methods are reachable through the guard's `Deref`.
    #[test]
    fn guard_deref() {
        let guard = WideEventGuard::new("http");
        guard.set_str("path", "/api");
        assert!(guard.has_key("path"));
        // Explicit emit; the emit triggered by the guard's drop is then a no-op.
        guard.emit();
    }

    // The timestamp contains the date/time separator and ends with `Z`.
    #[test]
    fn rfc3339_timer() {
        let mut buf = String::new();
        Rfc3339.format_time(&mut Writer::new(&mut buf)).unwrap();
        assert!(buf.contains('T'));
        assert!(buf.ends_with('Z'));
    }
}