Skip to main content

wide_event/
lib.rs

1//! # wide-event
2//!
3//! Honeycomb-style wide events for Rust.
4//!
5//! A wide event accumulates key-value pairs throughout a request (or task)
6//! lifecycle and emits them as a **single structured event** when the
7//! request completes. This gives you one row per request in your log
8//! aggregator with every dimension attached — perfect for high-cardinality
9//! exploratory analysis.
10//!
11//! ## Quick start
12//!
13//! ```no_run
14//! use wide_event::{WideEventGuard, WideEventLayer};
15//! use tracing_subscriber::prelude::*;
16//!
17//! // Once at startup:
18//! tracing_subscriber::registry()
19//!     .with(WideEventLayer::stdout().with_system("myapp"))
20//!     .init();
21//!
22//! // Per request — guard auto-emits on drop:
23//! {
24//!     let req = WideEventGuard::new("http");
25//!     req.set_str("method", "GET");
26//!     req.set_str("path", "/api/users");
27//!     req.set_u64("status", 200);
28//! } // ← emitted here as single JSON line
29//! ```
30//!
31//! ## How it works
32//!
33//! 1. [`WideEvent::new`] starts a timer and creates an empty field map.
34//! 2. Throughout processing, call setters (`set_str`, `set_u64`, `incr`,
35//!    etc.) to accumulate fields — these are cheap local `Mutex` operations
36//!    that never touch the tracing subscriber.
37//! 3. [`WideEvent::emit`] (or [`WideEventGuard`] drop) finalizes the record,
38//!    pushes it to a thread-local stack, and dispatches a structured
39//!    `tracing::info!` event. The [`WideEventLayer`] pulls the record
40//!    from the stack, formats the timestamp using the configured
41//!    [`FormatTime`] implementation, and serializes in a single pass.
42//!
43//! ## Timestamp control
44//!
45//! Timestamps are formatted by a [`FormatTime`] implementation configured
46//! on the layer. The default is [`Rfc3339`] (e.g. `2024-01-15T14:30:00.123456Z`).
47//! Swap it for any `tracing_subscriber::fmt::time` implementation:
48//!
49//! ```no_run
50//! use wide_event::WideEventLayer;
51//! use tracing_subscriber::fmt::time::Uptime;
52//! # use tracing_subscriber::prelude::*;
53//!
54//! tracing_subscriber::registry()
55//!     .with(WideEventLayer::stdout().with_timer(Uptime::default()))
56//!     .init();
57//! ```
58//!
59//! ## Features
60//!
61//! - **`opentelemetry`** — attaches `trace_id` and `span_id` from the
62//!   current OpenTelemetry span context.
63//! - **`tokio`** — provides [`context::scope`] and [`context::current`]
64//!   for async task-local wide event propagation.
65
66#![warn(clippy::pedantic, missing_docs)]
67
68mod format;
69mod layer;
70
71#[cfg(feature = "tokio")]
72pub mod context;
73
74pub use format::{JsonFormatter, LogfmtFormatter, WideEventFormatter};
75pub use layer::{exclude_wide_events, WideEventLayer};
76pub use tracing_subscriber::fmt::time::FormatTime;
77
78use std::cell::RefCell;
79use std::collections::HashMap;
80use std::sync::atomic::{AtomicU64, Ordering};
81use std::sync::{Arc, Mutex};
82use std::time::Instant;
83
84use serde_json::Value;
85use tracing_subscriber::fmt::format::Writer;
86
/// Default target used for wide event tracing dispatches.
pub const DEFAULT_TARGET: &str = "wide_event";

/// Callback invoked just before serialization with the accumulated fields.
///
/// The map is keyed by the `&'static str` field names passed to the setters.
/// The callback must be `Send + Sync` and is shared through an [`Arc`].
pub type EmitHook = Arc<dyn Fn(&HashMap<&'static str, Value>) + Send + Sync>;

/// Process-wide counter that hands out the `wide_event_id` attached to each
/// emitted event. Starts at 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

thread_local! {
    // Per-thread stack of `(wide_event_id, record)` pairs. `WideEvent::emit`
    // pushes the finalized record immediately before dispatching the tracing
    // event and pops it again once the dispatch has returned.
    pub(crate) static EMIT_STACK: RefCell<Vec<(u64, WideEventRecord)>> =
        const { RefCell::new(Vec::new()) };
}
99
/// Default timer: RFC 3339 timestamps with microsecond precision.
///
/// Produces timestamps like `2024-01-15T14:30:00.123456Z`.
///
/// This is a stateless unit type, so it is `Copy` and can be created with
/// `Rfc3339` or `Rfc3339::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Rfc3339;
104
105impl FormatTime for Rfc3339 {
106    fn format_time(&self, w: &mut Writer<'_>) -> std::fmt::Result {
107        write!(
108            w,
109            "{}",
110            humantime::format_rfc3339_micros(std::time::SystemTime::now())
111        )
112    }
113}
114
115/// A finalized wide event record ready for formatting.
116///
117/// Contains the accumulated fields and timing data. The timestamp
118/// is formatted separately by the layer's [`FormatTime`] implementation.
119pub struct WideEventRecord {
120    /// The subsystem label (e.g. `"http"`, `"grpc"`).
121    pub subsystem: &'static str,
122    /// Wall-clock duration from event creation to emit.
123    pub duration: std::time::Duration,
124    /// Accumulated key-value fields.
125    pub fields: HashMap<&'static str, Value>,
126    /// OpenTelemetry trace ID, if the `opentelemetry` feature is enabled.
127    pub trace_id: Option<String>,
128    /// OpenTelemetry span ID, if the `opentelemetry` feature is enabled.
129    pub span_id: Option<String>,
130}
131
/// A wide event that accumulates structured fields over its lifetime.
///
/// Field setters are cheap — they only touch a local `Mutex<HashMap>`,
/// never the tracing subscriber stack.
pub struct WideEvent {
    // Subsystem label passed to `WideEvent::new`; copied into the emitted record.
    subsystem: &'static str,
    // All mutable state (fields, start time, hook, emitted flag) behind one lock.
    inner: Mutex<WideEventInner>,
}
140
/// Lock-protected mutable state of a [`WideEvent`].
struct WideEventInner {
    // Key-value pairs accumulated by the setters; moved out when the event is finalized.
    fields: HashMap<&'static str, Value>,
    // Captured in `WideEvent::new`; its elapsed time becomes the record's duration.
    start: Instant,
    // Optional callback invoked with the fields during finalization.
    emit_hook: Option<EmitHook>,
    // Set on the first finalization so that later `emit()` calls are no-ops.
    emitted: bool,
}
147
148impl std::fmt::Debug for WideEvent {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("WideEvent")
151            .field("subsystem", &self.subsystem)
152            .finish_non_exhaustive()
153    }
154}
155
156#[allow(
157    clippy::missing_panics_doc,
158    reason = "all panics are from Mutex::lock which only panics if poisoned"
159)]
160impl WideEvent {
161    /// Create a new wide event for the given subsystem.
162    ///
163    /// To set a process-wide `"system"` field, configure it on the
164    /// [`WideEventLayer`] via [`WideEventLayer::with_system`].
165    #[must_use]
166    pub fn new(subsystem: &'static str) -> Self {
167        Self {
168            subsystem,
169            inner: Mutex::new(WideEventInner {
170                fields: HashMap::with_capacity(24),
171                start: Instant::now(),
172                emit_hook: None,
173                emitted: false,
174            }),
175        }
176    }
177
178    /// Set a hook called with the accumulated fields just before emit.
179    pub fn set_emit_hook(&self, hook: EmitHook) {
180        self.inner.lock().unwrap().emit_hook = Some(hook);
181    }
182
183    /// Returns true if the given key exists in the accumulated fields.
184    pub fn has_key(&self, key: &str) -> bool {
185        self.inner.lock().unwrap().fields.contains_key(key)
186    }
187
188    /// Set a string field (copies `val`; use [`set_string`](Self::set_string) to move an owned `String`).
189    pub fn set_str(&self, key: &'static str, val: &str) {
190        self.inner
191            .lock()
192            .unwrap()
193            .fields
194            .insert(key, Value::String(val.to_string()));
195    }
196
197    /// Set a string field from an owned `String` (avoids a copy).
198    pub fn set_string(&self, key: &'static str, val: String) {
199        self.inner
200            .lock()
201            .unwrap()
202            .fields
203            .insert(key, Value::String(val));
204    }
205
206    /// Set a signed 64-bit integer field.
207    pub fn set_i64(&self, key: &'static str, val: i64) {
208        self.inner
209            .lock()
210            .unwrap()
211            .fields
212            .insert(key, Value::Number(val.into()));
213    }
214
215    /// Set an unsigned 64-bit integer field.
216    pub fn set_u64(&self, key: &'static str, val: u64) {
217        self.inner
218            .lock()
219            .unwrap()
220            .fields
221            .insert(key, Value::Number(val.into()));
222    }
223
224    /// Set a 64-bit floating-point field (`NaN` / `Inf` are stored as `null`).
225    pub fn set_f64(&self, key: &'static str, val: f64) {
226        self.inner.lock().unwrap().fields.insert(
227            key,
228            serde_json::Number::from_f64(val).map_or(Value::Null, Value::Number),
229        );
230    }
231
232    /// Set a boolean field.
233    pub fn set_bool(&self, key: &'static str, val: bool) {
234        self.inner
235            .lock()
236            .unwrap()
237            .fields
238            .insert(key, Value::Bool(val));
239    }
240
241    /// Set a field from a raw [`serde_json::Value`].
242    pub fn set_value(&self, key: &'static str, val: Value) {
243        self.inner.lock().unwrap().fields.insert(key, val);
244    }
245
246    /// Increment an integer counter field by 1 (initialized to 1 if absent).
247    pub fn incr(&self, key: &'static str) {
248        let mut inner = self.inner.lock().unwrap();
249        let entry = inner.fields.entry(key).or_insert(Value::Number(0.into()));
250        if let Some(n) = entry.as_i64() {
251            *entry = Value::Number((n + 1).into());
252        }
253    }
254
255    /// Set `error = true`, `error.type`, and `error.message`.
256    pub fn set_error(&self, err_type: &str, message: &str) {
257        self.set_bool("error", true);
258        self.set_str("error.type", err_type);
259        self.set_str("error.message", message);
260    }
261
262    /// Emit the wide event through the tracing pipeline.
263    ///
264    /// The tracing target is `{target_prefix}::{subsystem}` where
265    /// `target_prefix` defaults to `wide_event`. Configure the prefix
266    /// on [`WideEventLayer::with_target_prefix`].
267    ///
268    /// Calling `emit()` more than once is a no-op.
269    pub fn emit(&self) {
270        let Some(record) = self.finalize() else {
271            return;
272        };
273        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
274        let subsystem = self.subsystem;
275
276        EMIT_STACK.with(|s| s.borrow_mut().push((id, record)));
277
278        // Target must be a string literal for tracing macros. We use the
279        // default target here; the layer matches by the wide_event_id field.
280        tracing::info!(
281            target: "wide_event",
282            wide_event_id = id,
283            subsystem = subsystem,
284        );
285
286        EMIT_STACK.with(|s| {
287            s.borrow_mut().pop();
288        });
289    }
290
291    #[allow(
292        clippy::items_after_statements,
293        reason = "cfg-gated inner fn for conditional compilation is idiomatic"
294    )]
295    fn finalize(&self) -> Option<WideEventRecord> {
296        let mut inner = self.inner.lock().unwrap();
297        if inner.emitted {
298            return None;
299        }
300        inner.emitted = true;
301
302        let duration = inner.start.elapsed();
303
304        // Insert duration into fields so emit hooks can access it
305        #[allow(
306            clippy::cast_possible_truncation,
307            reason = "duration_ns fits in u64 for any practical duration"
308        )]
309        inner.fields.insert(
310            "duration_ns",
311            Value::Number((duration.as_nanos() as u64).into()),
312        );
313
314        let mut fields = std::mem::take(&mut inner.fields);
315
316        if let Some(ref hook) = inner.emit_hook {
317            hook(&fields);
318        }
319
320        // Remove duration_ns from fields to avoid double-emission
321        // (the formatter emits it from `record.duration`)
322        fields.remove("duration_ns");
323
324        let (trace_id, span_id) = otel_context();
325
326        #[cfg(feature = "opentelemetry")]
327        fn otel_context() -> (Option<String>, Option<String>) {
328            use opentelemetry::trace::TraceContextExt;
329            use tracing_opentelemetry::OpenTelemetrySpanExt;
330
331            let span = tracing::Span::current();
332            let cx = span.context();
333            let sc = cx.span().span_context().clone();
334            if sc.is_valid() {
335                (
336                    Some(format!("{:032x}", sc.trace_id())), // TraceId has no Display impl with padding
337                    Some(format!("{:016x}", sc.span_id())), // SpanId has no Display impl with padding
338                )
339            } else {
340                (None, None)
341            }
342        }
343
344        #[cfg(not(feature = "opentelemetry"))]
345        fn otel_context() -> (Option<String>, Option<String>) {
346            (None, None)
347        }
348
349        Some(WideEventRecord {
350            subsystem: self.subsystem,
351            duration,
352            fields,
353            trace_id,
354            span_id,
355        })
356    }
357}
358
359/// RAII guard that emits the wide event when dropped.
360///
361/// Implements `Deref<Target = WideEvent>` so you can call setters directly.
362///
363/// ```
364/// # use wide_event::WideEventGuard;
365/// let req = WideEventGuard::new("http");
366/// req.set_str("method", "GET");
367/// req.set_u64("status", 200);
368/// // emitted automatically when `req` drops
369/// ```
370pub struct WideEventGuard(WideEvent);
371
372impl WideEventGuard {
373    /// Create a new guard that emits on drop.
374    #[must_use]
375    pub fn new(subsystem: &'static str) -> Self {
376        Self(WideEvent::new(subsystem))
377    }
378}
379
380impl Drop for WideEventGuard {
381    fn drop(&mut self) {
382        self.0.emit();
383    }
384}
385
386impl std::ops::Deref for WideEventGuard {
387    type Target = WideEvent;
388    fn deref(&self) -> &WideEvent {
389        &self.0
390    }
391}
392
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// `has_key` reflects fields added through a setter.
    #[test]
    fn set_and_has_key() {
        let evt = WideEvent::new("test");
        assert!(!evt.has_key("foo"));
        evt.set_str("foo", "bar");
        assert!(evt.has_key("foo"));
    }

    /// `incr` creates the counter on first use and counts up from there.
    #[test]
    fn incr_creates_and_increments() {
        let evt = WideEvent::new("test");
        evt.incr("counter");
        evt.incr("counter");
        evt.incr("counter");
        let inner = evt.inner.lock().unwrap();
        assert_eq!(inner.fields["counter"], Value::Number(3.into()));
    }

    /// `set_error` populates all three error fields.
    #[test]
    fn set_error_sets_three_fields() {
        let evt = WideEvent::new("test");
        evt.set_error("timeout", "connection timed out");
        let inner = evt.inner.lock().unwrap();
        assert_eq!(inner.fields["error"], Value::Bool(true));
        assert_eq!(
            inner.fields["error.type"],
            Value::String("timeout".to_string())
        );
        assert_eq!(
            inner.fields["error.message"],
            Value::String("connection timed out".to_string())
        );
    }

    /// The emit hook fires only for the first `emit()` call.
    #[test]
    fn double_emit_is_noop() {
        let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let count_clone = count.clone();

        let evt = WideEvent::new("test");
        evt.set_emit_hook(Arc::new(move |_| {
            count_clone.fetch_add(1, Ordering::SeqCst);
        }));
        evt.emit();
        evt.emit();
        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    /// `finalize` moves the fields into the record, strips the temporary
    /// `duration_ns` entry, and refuses to run twice.
    #[test]
    fn finalize_moves_fields() {
        let evt = WideEvent::new("test");
        evt.set_str("key", "value");

        let record = evt.finalize().unwrap();
        assert_eq!(record.fields["key"], Value::String("value".to_string()));
        assert_eq!(record.subsystem, "test");
        assert!(!record.fields.contains_key("duration_ns"));
        assert!(!evt.has_key("key"));
        assert!(evt.finalize().is_none());
    }

    /// Dropping a guard emits the event (observed through the hook).
    #[test]
    fn guard_auto_emits() {
        let emitted = Arc::new(AtomicBool::new(false));
        let emitted_clone = emitted.clone();

        {
            let guard = WideEventGuard::new("test");
            guard.set_emit_hook(Arc::new(move |_| {
                emitted_clone.store(true, Ordering::SeqCst);
            }));
            guard.set_str("method", "GET");
        }

        assert!(emitted.load(Ordering::SeqCst));
    }

    /// `WideEvent` methods are reachable through the guard's `Deref`.
    #[test]
    fn guard_deref() {
        let guard = WideEventGuard::new("http");
        guard.set_str("path", "/api");
        assert!(guard.has_key("path"));
        guard.emit(); // manual emit; Drop will be no-op
    }

    /// The default timer writes something shaped like an RFC 3339 UTC timestamp.
    #[test]
    fn rfc3339_timer() {
        let mut buf = String::new();
        Rfc3339.format_time(&mut Writer::new(&mut buf)).unwrap();
        assert!(buf.contains('T'));
        assert!(buf.ends_with('Z'));
    }
}