use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tracing::field::{Field, Visit};
use tracing::{Event, Subscriber};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
/// Broadcast channel capacity used when the environment does not supply a
/// usable override (see `resolve_broadcast_cap`).
pub const DEFAULT_BROADCAST_CAP: usize = 1024;
/// Name of the environment variable consulted for a capacity override.
pub const ENV_BROADCAST_CAP: &str = "VANE_TRACE_BROADCAST_CAP";
/// Determines the broadcast channel capacity to use.
///
/// Reads the environment variable named by `ENV_BROADCAST_CAP` and returns its
/// value when it parses as a `usize` strictly greater than zero. In every other
/// case (variable unset or not valid Unicode, unparsable, or zero) the result
/// is `DEFAULT_BROADCAST_CAP`.
fn resolve_broadcast_cap() -> usize {
    match std::env::var(ENV_BROADCAST_CAP) {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            // Zero or garbage: fall back rather than fail.
            _ => DEFAULT_BROADCAST_CAP,
        },
        Err(_) => DEFAULT_BROADCAST_CAP,
    }
}
/// One tracing event, flattened into a serializable record for broadcast.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingFrame {
/// Wall-clock time at which the frame was built, in milliseconds since the
/// Unix epoch.
pub t: u64,
/// Textual form of the event's level (e.g. `"INFO"`).
pub level: String,
/// The event's metadata target.
pub target: String,
/// The event's `message` field, or an empty string when none was recorded.
pub message: String,
/// All remaining recorded fields, as a JSON object keyed by field name.
pub fields: serde_json::Value,
}
/// A tracing layer that publishes every event as a [`TracingFrame`] on a
/// tokio broadcast channel.
///
/// Cloning yields another handle onto the same channel and the same hook.
#[derive(Clone)]
pub struct BroadcastTracingLayer {
/// Sending half of the broadcast channel; receivers are created from it via
/// `subscribe`.
tx: broadcast::Sender<TracingFrame>,
/// Callback invoked by `on_event` whenever sending a frame reports an error
/// (tokio documents this as the no-active-receivers case).
on_drop: std::sync::Arc<dyn Fn() + Send + Sync>,
}
impl BroadcastTracingLayer {
    /// Builds a layer whose channel capacity comes from
    /// `resolve_broadcast_cap` (environment override or the default).
    #[must_use]
    pub fn new() -> Self {
        let capacity = resolve_broadcast_cap();
        Self::with_capacity(capacity)
    }

    /// Builds a layer with the requested channel capacity and a drop hook that
    /// does nothing.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        let noop: std::sync::Arc<dyn Fn() + Send + Sync> = std::sync::Arc::new(|| {});
        Self::with_capacity_and_drop_hook(capacity, noop)
    }

    /// Builds a layer with the requested channel capacity and a caller-supplied
    /// hook.
    ///
    /// * `capacity` - requested channel capacity; a value of `0` is raised to `1`
    ///   before the channel is created.
    /// * `on_drop` - callback the layer invokes when a frame could not be sent.
    #[must_use]
    pub fn with_capacity_and_drop_hook(
        capacity: usize,
        on_drop: std::sync::Arc<dyn Fn() + Send + Sync>,
    ) -> Self {
        // The receiver handed back by `channel` is not kept; consumers obtain
        // their own through `subscribe`.
        let (sender, _unused_rx) = broadcast::channel(capacity.max(1));
        Self {
            tx: sender,
            on_drop,
        }
    }

    /// Creates a new receiver attached to this layer's channel.
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<TracingFrame> {
        let Self { tx, .. } = self;
        tx.subscribe()
    }

    /// Returns the number of receivers the channel currently reports.
    #[must_use]
    pub fn subscriber_count(&self) -> usize {
        let Self { tx, .. } = self;
        tx.receiver_count()
    }
}
/// `Default` is the same as [`BroadcastTracingLayer::new`].
impl Default for BroadcastTracingLayer {
    /// Delegates to `new`, so the default layer uses the resolved capacity
    /// and a no-op drop hook.
    fn default() -> Self {
        Self::new()
    }
}
impl<S> Layer<S> for BroadcastTracingLayer
where
    S: Subscriber,
{
    /// Converts a tracing event into a [`TracingFrame`] and broadcasts it.
    ///
    /// The frame carries the current wall-clock time in milliseconds since the
    /// Unix epoch, the event's level and target, its `message` field (empty if
    /// absent) and all other fields as a JSON object.
    ///
    /// If the frame cannot be delivered, the layer's `on_drop` hook is called
    /// exactly once for that event.
    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
        // Fast path: with no receivers attached nobody could observe the frame,
        // so skip visiting the fields and allocating the strings entirely.
        if self.tx.receiver_count() == 0 {
            (self.on_drop)();
            return;
        }

        let metadata = event.metadata();
        let mut visitor = FieldVisitor::default();
        event.record(&mut visitor);

        // A clock set before the epoch yields 0; a millisecond count that does
        // not fit in `u64` saturates at `u64::MAX`.
        let t = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));

        let frame = TracingFrame {
            t,
            level: metadata.level().to_string(),
            target: metadata.target().to_string(),
            message: visitor.message.unwrap_or_default(),
            fields: serde_json::Value::Object(visitor.fields),
        };

        // The last receiver may have gone away since the check above, so a
        // failed send still has to be reported.
        if self.tx.send(frame).is_err() {
            (self.on_drop)();
        }
    }
}
/// Accumulates the fields of a single event while it is being visited.
#[derive(Default)]
struct FieldVisitor {
/// The event's `message` field, if one was recorded as a string.
message: Option<String>,
/// Every other recorded field, keyed by field name.
fields: serde_json::Map<String, serde_json::Value>,
}
impl FieldVisitor {
    /// Stores one recorded field.
    ///
    /// A field named `message` whose value is a JSON string becomes the
    /// visitor's `message`. Every other field, including a `message` with a
    /// non-string value, is inserted into `fields` under its own name and
    /// replaces any earlier value with that name.
    fn record(&mut self, field: &Field, value: serde_json::Value) {
        // Match by value so the message text is moved out of the JSON value
        // instead of being cloned and then dropped.
        match value {
            serde_json::Value::String(text) if field.name() == "message" => {
                self.message = Some(text);
            }
            other => {
                self.fields.insert(field.name().to_string(), other);
            }
        }
    }
}
/// Maps each typed tracing value onto a JSON value and hands it to
/// `FieldVisitor::record`.
impl Visit for FieldVisitor {
    /// Records a string field as a JSON string.
    fn record_str(&mut self, field: &Field, value: &str) {
        let json = serde_json::Value::String(value.to_owned());
        self.record(field, json);
    }

    /// Records a field through its `Debug` rendering, as a JSON string.
    ///
    /// When the rendering both starts and ends with a double quote (and is at
    /// least two bytes long), that one outer pair of quotes is removed.
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        let rendered = format!("{value:?}");
        // A lone `"` loses its prefix but then has no suffix to strip, so it is
        // kept unchanged.
        let text = match rendered
            .strip_prefix('"')
            .and_then(|rest| rest.strip_suffix('"'))
        {
            Some(inner) => inner.to_string(),
            None => rendered,
        };
        self.record(field, serde_json::Value::String(text));
    }

    /// Records a signed integer field as a JSON number.
    fn record_i64(&mut self, field: &Field, value: i64) {
        let number = serde_json::Number::from(value);
        self.record(field, serde_json::Value::Number(number));
    }

    /// Records an unsigned integer field as a JSON number.
    fn record_u64(&mut self, field: &Field, value: u64) {
        let number = serde_json::Number::from(value);
        self.record(field, serde_json::Value::Number(number));
    }

    /// Records a boolean field as a JSON boolean.
    fn record_bool(&mut self, field: &Field, value: bool) {
        let json = serde_json::Value::Bool(value);
        self.record(field, json);
    }

    /// Records a float field as a JSON number, or as its string form when
    /// `serde_json::Number::from_f64` rejects the value.
    fn record_f64(&mut self, field: &Field, value: f64) {
        let json = match serde_json::Number::from_f64(value) {
            Some(number) => serde_json::Value::Number(number),
            None => serde_json::Value::String(value.to_string()),
        };
        self.record(field, json);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    /// Installs `layer` on a fresh registry as the default subscriber and
    /// returns the guard that keeps it installed.
    fn install(layer: BroadcastTracingLayer) -> tracing::subscriber::DefaultGuard {
        tracing_subscriber::registry().with(layer).set_default()
    }

    /// An emitted event reaches a subscriber with level, message, target and
    /// fields intact.
    #[tokio::test]
    async fn broadcast_tracing_layer_emits_event_to_subscriber() {
        let layer = BroadcastTracingLayer::new();
        let mut rx = layer.subscribe();
        assert_eq!(layer.subscriber_count(), 1);
        let _guard = install(layer.clone());
        tracing::info!(addr = "127.0.0.1", port = 8080_u64, "listener bound");
        let frame = rx.recv().await.expect("recv frame");
        assert_eq!(frame.level, "INFO");
        assert_eq!(frame.message, "listener bound");
        assert_eq!(frame.fields["addr"], "127.0.0.1");
        assert_eq!(frame.fields["port"], 8080);
        assert!(!frame.target.is_empty(), "target captured from metadata");
    }

    /// Emitting with no receivers attached must not panic.
    #[tokio::test]
    async fn broadcast_tracing_layer_no_receivers_silently_drops() {
        let layer = BroadcastTracingLayer::new();
        assert_eq!(layer.subscriber_count(), 0);
        let _guard = install(layer.clone());
        tracing::warn!("no subscribers attached");
    }

    /// A subscriber that falls behind by more than the channel capacity
    /// observes `RecvError::Lagged`.
    #[tokio::test]
    async fn broadcast_tracing_layer_lagged_subscriber_sees_recv_error() {
        // Pin the capacity explicitly: `new()` honours the
        // `VANE_TRACE_BROADCAST_CAP` environment override, and a larger value
        // there would keep the loop below from ever overflowing the channel.
        let layer = BroadcastTracingLayer::with_capacity(DEFAULT_BROADCAST_CAP);
        let mut rx = layer.subscribe();
        let _guard = install(layer.clone());
        for i in 0..(DEFAULT_BROADCAST_CAP + 5) {
            tracing::info!(seq = i as u64, "saturate");
        }
        match rx.recv().await {
            Err(broadcast::error::RecvError::Lagged(n)) => {
                assert!(n >= 5, "expected lag >= 5, got {n}");
            }
            other => panic!("expected Lagged, got {other:?}"),
        }
    }

    /// Integer, float and boolean fields keep their JSON types instead of
    /// being stringified.
    #[tokio::test]
    async fn broadcast_tracing_layer_preserves_typed_int_field() {
        let layer = BroadcastTracingLayer::new();
        let mut rx = layer.subscribe();
        let _guard = install(layer.clone());
        tracing::info!(count = 42_i64, ratio = 0.5_f64, ok = true, "typed");
        let frame = rx.recv().await.expect("recv");
        assert_eq!(frame.fields["count"], 42);
        assert_eq!(frame.fields["ok"], true);
        assert!(frame.fields["ratio"].is_number());
    }
}