#![warn(clippy::pedantic, missing_docs)]
mod format;
mod layer;
#[cfg(feature = "tokio")]
pub mod context;
pub use format::{JsonFormatter, LogfmtFormatter, WideEventFormatter};
pub use layer::{exclude_wide_events, WideEventLayer};
pub use tracing_subscriber::fmt::time::FormatTime;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use serde_json::Value;
use tracing_subscriber::fmt::format::Writer;
pub const DEFAULT_TARGET: &str = "wide_event";
pub type EmitHook = Arc<dyn Fn(&HashMap<&'static str, Value>) + Send + Sync>;
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
thread_local! {
pub(crate) static EMIT_STACK: RefCell<Vec<(u64, WideEventRecord)>> =
const { RefCell::new(Vec::new()) };
}
pub struct Rfc3339;
impl FormatTime for Rfc3339 {
fn format_time(&self, w: &mut Writer<'_>) -> std::fmt::Result {
write!(
w,
"{}",
humantime::format_rfc3339_micros(std::time::SystemTime::now())
)
}
}
pub struct WideEventRecord {
pub subsystem: &'static str,
pub duration: std::time::Duration,
pub fields: HashMap<&'static str, Value>,
pub trace_id: Option<String>,
pub span_id: Option<String>,
}
pub struct WideEvent {
subsystem: &'static str,
inner: Mutex<WideEventInner>,
}
struct WideEventInner {
fields: HashMap<&'static str, Value>,
start: Instant,
emit_hook: Option<EmitHook>,
emitted: bool,
}
impl std::fmt::Debug for WideEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WideEvent")
.field("subsystem", &self.subsystem)
.finish_non_exhaustive()
}
}
#[allow(
clippy::missing_panics_doc,
reason = "all panics are from Mutex::lock which only panics if poisoned"
)]
impl WideEvent {
#[must_use]
pub fn new(subsystem: &'static str) -> Self {
Self {
subsystem,
inner: Mutex::new(WideEventInner {
fields: HashMap::with_capacity(24),
start: Instant::now(),
emit_hook: None,
emitted: false,
}),
}
}
pub fn set_emit_hook(&self, hook: EmitHook) {
self.inner.lock().unwrap().emit_hook = Some(hook);
}
pub fn has_key(&self, key: &str) -> bool {
self.inner.lock().unwrap().fields.contains_key(key)
}
pub fn set_str(&self, key: &'static str, val: &str) {
self.inner
.lock()
.unwrap()
.fields
.insert(key, Value::String(val.to_string()));
}
pub fn set_string(&self, key: &'static str, val: String) {
self.inner
.lock()
.unwrap()
.fields
.insert(key, Value::String(val));
}
pub fn set_i64(&self, key: &'static str, val: i64) {
self.inner
.lock()
.unwrap()
.fields
.insert(key, Value::Number(val.into()));
}
pub fn set_u64(&self, key: &'static str, val: u64) {
self.inner
.lock()
.unwrap()
.fields
.insert(key, Value::Number(val.into()));
}
pub fn set_f64(&self, key: &'static str, val: f64) {
self.inner.lock().unwrap().fields.insert(
key,
serde_json::Number::from_f64(val).map_or(Value::Null, Value::Number),
);
}
pub fn set_bool(&self, key: &'static str, val: bool) {
self.inner
.lock()
.unwrap()
.fields
.insert(key, Value::Bool(val));
}
pub fn set_value(&self, key: &'static str, val: Value) {
self.inner.lock().unwrap().fields.insert(key, val);
}
pub fn incr(&self, key: &'static str) {
let mut inner = self.inner.lock().unwrap();
let entry = inner.fields.entry(key).or_insert(Value::Number(0.into()));
if let Some(n) = entry.as_i64() {
*entry = Value::Number((n + 1).into());
}
}
pub fn set_error(&self, err_type: &str, message: &str) {
self.set_bool("error", true);
self.set_str("error.type", err_type);
self.set_str("error.message", message);
}
pub fn emit(&self) {
let Some(record) = self.finalize() else {
return;
};
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
let subsystem = self.subsystem;
EMIT_STACK.with(|s| s.borrow_mut().push((id, record)));
tracing::info!(
target: "wide_event",
wide_event_id = id,
subsystem = subsystem,
);
EMIT_STACK.with(|s| {
s.borrow_mut().pop();
});
}
#[allow(
clippy::items_after_statements,
reason = "cfg-gated inner fn for conditional compilation is idiomatic"
)]
fn finalize(&self) -> Option<WideEventRecord> {
let mut inner = self.inner.lock().unwrap();
if inner.emitted {
return None;
}
inner.emitted = true;
let duration = inner.start.elapsed();
#[allow(
clippy::cast_possible_truncation,
reason = "duration_ns fits in u64 for any practical duration"
)]
inner.fields.insert(
"duration_ns",
Value::Number((duration.as_nanos() as u64).into()),
);
let mut fields = std::mem::take(&mut inner.fields);
if let Some(ref hook) = inner.emit_hook {
hook(&fields);
}
fields.remove("duration_ns");
let (trace_id, span_id) = otel_context();
#[cfg(feature = "opentelemetry")]
fn otel_context() -> (Option<String>, Option<String>) {
use opentelemetry::trace::TraceContextExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::Span::current();
let cx = span.context();
let sc = cx.span().span_context().clone();
if sc.is_valid() {
(
Some(format!("{:032x}", sc.trace_id())), Some(format!("{:016x}", sc.span_id())), )
} else {
(None, None)
}
}
#[cfg(not(feature = "opentelemetry"))]
fn otel_context() -> (Option<String>, Option<String>) {
(None, None)
}
Some(WideEventRecord {
subsystem: self.subsystem,
duration,
fields,
trace_id,
span_id,
})
}
}
pub struct WideEventGuard(WideEvent);
impl WideEventGuard {
#[must_use]
pub fn new(subsystem: &'static str) -> Self {
Self(WideEvent::new(subsystem))
}
}
impl Drop for WideEventGuard {
fn drop(&mut self) {
self.0.emit();
}
}
impl std::ops::Deref for WideEventGuard {
type Target = WideEvent;
fn deref(&self) -> &WideEvent {
&self.0
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
#[test]
fn set_and_has_key() {
let evt = WideEvent::new("test");
assert!(!evt.has_key("foo"));
evt.set_str("foo", "bar");
assert!(evt.has_key("foo"));
}
#[test]
fn incr_creates_and_increments() {
let evt = WideEvent::new("test");
evt.incr("counter");
evt.incr("counter");
evt.incr("counter");
let inner = evt.inner.lock().unwrap();
assert_eq!(inner.fields["counter"], Value::Number(3.into()));
}
#[test]
fn set_error_sets_three_fields() {
let evt = WideEvent::new("test");
evt.set_error("timeout", "connection timed out");
let inner = evt.inner.lock().unwrap();
assert_eq!(inner.fields["error"], Value::Bool(true));
assert_eq!(
inner.fields["error.type"],
Value::String("timeout".to_string())
);
}
#[test]
fn double_emit_is_noop() {
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let count_clone = count.clone();
let evt = WideEvent::new("test");
evt.set_emit_hook(Arc::new(move |_| {
count_clone.fetch_add(1, Ordering::SeqCst);
}));
evt.emit();
evt.emit();
assert_eq!(count.load(Ordering::SeqCst), 1);
}
#[test]
fn finalize_moves_fields() {
let evt = WideEvent::new("test");
evt.set_str("key", "value");
let record = evt.finalize().unwrap();
assert_eq!(record.fields["key"], Value::String("value".to_string()));
assert_eq!(record.subsystem, "test");
assert!(evt.finalize().is_none());
}
#[test]
fn guard_auto_emits() {
let emitted = Arc::new(AtomicBool::new(false));
let emitted_clone = emitted.clone();
{
let guard = WideEventGuard::new("test");
guard.set_emit_hook(Arc::new(move |_| {
emitted_clone.store(true, Ordering::SeqCst);
}));
guard.set_str("method", "GET");
}
assert!(emitted.load(Ordering::SeqCst));
}
#[test]
fn guard_deref() {
let guard = WideEventGuard::new("http");
guard.set_str("path", "/api");
assert!(guard.has_key("path"));
guard.emit(); }
#[test]
fn rfc3339_timer() {
let mut buf = String::new();
Rfc3339.format_time(&mut Writer::new(&mut buf)).unwrap();
assert!(buf.contains('T'));
assert!(buf.ends_with('Z'));
}
}