Skip to main content

tracing_broadcast/
lib.rs

1//! [`BroadcastTracingLayer`] — a `tracing_subscriber::Layer` that fans
2//! every emitted [`tracing::Event`] into a `tokio::sync::broadcast`
3//! channel as a [`TracingFrame`] (timestamp / level / target / message
4//! / structured fields).
5//!
6//! The layer composes alongside the host's normal subscriber stack
7//! (`tracing_subscriber::fmt::Layer` writing to stderr is unaffected);
8//! it adds one more sink without changing user-visible logging. Each
9//! subscriber gets its own `broadcast::Receiver` with independent
10//! backlog tracking; slow subscribers see `RecvError::Lagged(n)` and
11//! resume from the next available frame, which the operator-facing
12//! transport can surface as a sentinel.
13//!
14//! `TracingFrame` derives `serde::Serialize` / `Deserialize`, so the
15//! transport (NDJSON, websocket text frames, JSON-RPC, …) is just
16//! `serde_json::to_string(&frame)`.
17
18use std::time::{SystemTime, UNIX_EPOCH};
19
20use serde::{Deserialize, Serialize};
21use tokio::sync::broadcast;
22use tracing::field::{Field, Visit};
23use tracing::{Event, Subscriber};
24use tracing_subscriber::Layer;
25use tracing_subscriber::layer::Context;
26
27/// Default broadcast channel capacity for tracing frames. Tracing
28/// events are lower-volume than flow logs (one per log line vs one
29/// per per-step trajectory), so the channel is sized smaller —
30/// `spec/flow-model.md` § _Flow log verbosity_ owns the per-stream
31/// sizing rationale. Override via `VANE_TRACE_BROADCAST_CAP`.
32///
33/// A subscriber that falls more than `capacity` events behind sees
34/// [`broadcast::error::RecvError::Lagged`] and resumes from the next
35/// available event.
36pub const DEFAULT_BROADCAST_CAP: usize = 1024;
37
38/// Env var that overrides [`DEFAULT_BROADCAST_CAP`] at construction
39/// time. Values that fail to parse or evaluate to 0 fall back to the
40/// default — same shape as the rest of the daemon's `VANE_*` knobs.
41pub const ENV_BROADCAST_CAP: &str = "VANE_TRACE_BROADCAST_CAP";
42
43fn resolve_broadcast_cap() -> usize {
44	std::env::var(ENV_BROADCAST_CAP)
45		.ok()
46		.and_then(|s| s.parse::<usize>().ok())
47		.filter(|&n| n > 0)
48		.unwrap_or(DEFAULT_BROADCAST_CAP)
49}
50
/// Wire shape for a single tracing event.
///
/// Field layout follows JSON-formatter conventions (`t` / `level` /
/// `target` / `message` / `fields`) so `jq` queries written for
/// JSON-rendered logs apply to the broadcast stream unchanged.
///
/// The struct is `Clone` because every broadcast subscriber receives
/// its own copy of the frame, and `Serialize` / `Deserialize` so it can
/// be moved over a text transport as-is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingFrame {
	/// Wall-clock timestamp in milliseconds since the Unix epoch.
	///
	/// Frames produced by the layer in this file carry `0` when the
	/// system clock reads earlier than the epoch, and saturate at
	/// `u64::MAX` if the millisecond count does not fit in a `u64`.
	pub t: u64,
	/// `tracing::Level` rendered as upper-case (`ERROR` / `WARN` /
	/// `INFO` / `DEBUG` / `TRACE`).
	pub level: String,
	/// `metadata.target()` — typically the module path that emitted
	/// the event.
	pub target: String,
	/// Formatted `message` field if present, otherwise the empty
	/// string.
	pub message: String,
	/// Remaining structured fields as `{name → JSON value}`. The layer
	/// in this file always fills this with a JSON object (possibly
	/// empty); the `message` field is not repeated here.
	pub fields: serde_json::Value,
}
71
/// Tracing layer that broadcasts each event as a [`TracingFrame`].
///
/// Cheap to clone — the inner [`broadcast::Sender`] is itself `Clone`
/// and shares the channel through an internal `Arc`. Compose one
/// instance into the subscriber registry and keep cloned references
/// wherever handlers need to call [`Self::subscribe`].
///
/// Optional `on_drop` callback is invoked when the broadcast channel
/// has no receivers and `send` returns `Err` (i.e. the frame is lost).
/// The default is a no-op; the daemon wires it to a metrics counter
/// so operators can observe drop rate without this crate pulling a
/// metrics-framework dependency.
#[derive(Clone)]
pub struct BroadcastTracingLayer {
	// Sending half of the broadcast channel; new receivers are created
	// from it via `subscribe`.
	tx: broadcast::Sender<TracingFrame>,
	// Callback run once per frame that could not be delivered. Held in
	// an `Arc` so cloning the layer shares the same hook.
	on_drop: std::sync::Arc<dyn Fn() + Send + Sync>,
}
89
90impl BroadcastTracingLayer {
91	#[must_use]
92	pub fn new() -> Self {
93		Self::with_capacity(resolve_broadcast_cap())
94	}
95
96	/// Explicit-capacity constructor for tests and bespoke wiring. The
97	/// `new` / `Default` path resolves capacity from
98	/// `VANE_TRACE_BROADCAST_CAP`, falling back to
99	/// [`DEFAULT_BROADCAST_CAP`].
100	#[must_use]
101	pub fn with_capacity(capacity: usize) -> Self {
102		Self::with_capacity_and_drop_hook(capacity, std::sync::Arc::new(|| {}))
103	}
104
105	/// Construct the layer with both a custom channel capacity and a
106	/// drop callback. The callback runs every time a frame is dropped
107	/// because no subscriber was attached; the daemon installs a
108	/// `metrics::counter!` here to surface the rate without this crate
109	/// pulling a metrics dep.
110	#[must_use]
111	pub fn with_capacity_and_drop_hook(
112		capacity: usize,
113		on_drop: std::sync::Arc<dyn Fn() + Send + Sync>,
114	) -> Self {
115		// Initial receiver dropped immediately; subscribers come and go
116		// over the lifetime of the process as clients connect.
117		let cap = capacity.max(1);
118		let (tx, _initial_rx) = broadcast::channel(cap);
119		Self { tx, on_drop }
120	}
121
122	/// Subscribe to live events. Each subscriber gets its own receiver
123	/// with independent backlog tracking.
124	#[must_use]
125	pub fn subscribe(&self) -> broadcast::Receiver<TracingFrame> {
126		self.tx.subscribe()
127	}
128
129	/// Active subscriber count — exposed for tests and instrumentation.
130	#[must_use]
131	pub fn subscriber_count(&self) -> usize {
132		self.tx.receiver_count()
133	}
134}
135
/// `Default` is identical to [`BroadcastTracingLayer::new`], so it
/// also resolves the channel capacity from the environment.
impl Default for BroadcastTracingLayer {
	fn default() -> Self {
		Self::new()
	}
}
141
142impl<S> Layer<S> for BroadcastTracingLayer
143where
144	S: Subscriber,
145{
146	fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
147		let metadata = event.metadata();
148		let mut visitor = FieldVisitor::default();
149		event.record(&mut visitor);
150
151		let frame = TracingFrame {
152			t: SystemTime::now()
153				.duration_since(UNIX_EPOCH)
154				.map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX)),
155			level: metadata.level().to_string(),
156			target: metadata.target().to_string(),
157			message: visitor.message.unwrap_or_default(),
158			fields: serde_json::Value::Object(visitor.fields),
159		};
160		// `send` returns `Err` only when there are no receivers — that's
161		// the steady state when no client is tailing. Run the
162		// constructor-supplied drop hook so the daemon can record the
163		// rate as a metric; the hook defaults to a no-op when no one
164		// installed one.
165		if self.tx.send(frame).is_err() {
166			(self.on_drop)();
167		}
168	}
169}
170
/// Field visitor that splits the special `message` field from the rest
/// of the event's structured fields. Numeric and boolean values stay
/// typed in JSON; anything else (including `Debug`-only values) is
/// stringified — operators can still grep on the rendered form, and
/// the lossy degradation matches what `tracing-subscriber::fmt` does.
#[derive(Default)]
struct FieldVisitor {
	// Text of the event's `message` field; stays `None` when the event
	// recorded no string-valued field with that name.
	message: Option<String>,
	// Every other recorded field, keyed by field name. A later value
	// recorded under the same name replaces the earlier one.
	fields: serde_json::Map<String, serde_json::Value>,
}
181
182impl FieldVisitor {
183	fn record(&mut self, field: &Field, value: serde_json::Value) {
184		if field.name() == "message"
185			&& let serde_json::Value::String(s) = &value
186		{
187			self.message = Some(s.clone());
188			return;
189		}
190		self.fields.insert(field.name().to_string(), value);
191	}
192}
193
194impl Visit for FieldVisitor {
195	fn record_str(&mut self, field: &Field, value: &str) {
196		self.record(field, serde_json::Value::String(value.to_string()));
197	}
198
199	fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
200		// Most `tracing::info!("…", foo)` calls hit this path because
201		// the macro records the message via `Debug`. We strip the outer
202		// quotes that `{value:?}` adds for string-typed values so the
203		// rendered message matches the raw `info!` argument.
204		let s = format!("{value:?}");
205		let trimmed = if s.starts_with('"') && s.ends_with('"') && s.len() >= 2 {
206			s[1..s.len() - 1].to_string()
207		} else {
208			s
209		};
210		self.record(field, serde_json::Value::String(trimmed));
211	}
212
213	fn record_i64(&mut self, field: &Field, value: i64) {
214		self.record(field, serde_json::Value::Number(value.into()));
215	}
216
217	fn record_u64(&mut self, field: &Field, value: u64) {
218		self.record(field, serde_json::Value::Number(value.into()));
219	}
220
221	fn record_bool(&mut self, field: &Field, value: bool) {
222		self.record(field, serde_json::Value::Bool(value));
223	}
224
225	fn record_f64(&mut self, field: &Field, value: f64) {
226		// `serde_json::Number::from_f64` returns `Option` — non-finite
227		// floats fall back to a string representation.
228		let v = serde_json::Number::from_f64(value)
229			.map_or_else(|| serde_json::Value::String(value.to_string()), serde_json::Value::Number);
230		self.record(field, v);
231	}
232}
233
#[cfg(test)]
mod tests {
	use super::*;
	use std::sync::Arc;
	use std::sync::atomic::{AtomicUsize, Ordering};
	use tracing_subscriber::layer::SubscriberExt;
	use tracing_subscriber::util::SubscriberInitExt;

	/// Install `layer` as this thread's default subscriber.
	fn install(layer: BroadcastTracingLayer) -> tracing::subscriber::DefaultGuard {
		// Per-test subscriber so concurrent tests don't share a global
		// dispatcher. `set_default` returns a guard that restores the
		// previous dispatcher on drop.
		tracing_subscriber::registry().with(layer).set_default()
	}

	#[tokio::test]
	async fn broadcast_tracing_layer_emits_event_to_subscriber() {
		let layer = BroadcastTracingLayer::new();
		let mut rx = layer.subscribe();
		assert_eq!(layer.subscriber_count(), 1);

		let _guard = install(layer.clone());
		tracing::info!(addr = "127.0.0.1", port = 8080_u64, "listener bound");

		let frame = rx.recv().await.expect("recv frame");
		assert_eq!(frame.level, "INFO");
		assert_eq!(frame.message, "listener bound");
		assert_eq!(frame.fields["addr"], "127.0.0.1");
		assert_eq!(frame.fields["port"], 8080);
		assert!(!frame.target.is_empty(), "target captured from metadata");
	}

	#[tokio::test]
	async fn broadcast_tracing_layer_no_receivers_silently_drops() {
		// With zero subscribers the frame cannot be delivered. The layer
		// must not propagate that — `on_event` is on the tracing hot path.
		let layer = BroadcastTracingLayer::new();
		assert_eq!(layer.subscriber_count(), 0);
		let _guard = install(layer.clone());
		// No panic, no deadlock.
		tracing::warn!("no subscribers attached");
	}

	#[test]
	fn broadcast_tracing_layer_runs_drop_hook_only_without_receivers() {
		let dropped = Arc::new(AtomicUsize::new(0));
		let hook_counter = Arc::clone(&dropped);
		let layer = BroadcastTracingLayer::with_capacity_and_drop_hook(
			8,
			Arc::new(move || {
				hook_counter.fetch_add(1, Ordering::Relaxed);
			}),
		);
		let _guard = install(layer.clone());

		// No subscriber yet: the frame is lost and the hook fires once.
		tracing::warn!("nobody listening");
		assert_eq!(dropped.load(Ordering::Relaxed), 1);

		// With a subscriber attached the frame is delivered and the
		// hook count does not move.
		let mut rx = layer.subscribe();
		tracing::warn!("somebody listening");
		assert_eq!(dropped.load(Ordering::Relaxed), 1);
		let frame = rx.try_recv().expect("frame delivered");
		assert_eq!(frame.message, "somebody listening");
	}

	#[tokio::test]
	async fn broadcast_tracing_layer_lagged_subscriber_sees_recv_error() {
		// Pin the capacity explicitly: `new()` honours
		// `VANE_TRACE_BROADCAST_CAP`, and an override in the test
		// environment would change how many events are needed to lag.
		let layer = BroadcastTracingLayer::with_capacity(DEFAULT_BROADCAST_CAP);
		let mut rx = layer.subscribe();
		let _guard = install(layer.clone());

		// Saturate the channel beyond a single subscriber's buffer
		// without ever calling `rx.recv` so the backlog overflows.
		for i in 0..(DEFAULT_BROADCAST_CAP + 5) {
			tracing::info!(seq = i as u64, "saturate");
		}
		match rx.recv().await {
			Err(broadcast::error::RecvError::Lagged(n)) => {
				assert!(n >= 5, "expected lag >= 5, got {n}");
			}
			other => panic!("expected Lagged, got {other:?}"),
		}
	}

	#[tokio::test]
	async fn broadcast_tracing_layer_preserves_typed_int_field() {
		let layer = BroadcastTracingLayer::new();
		let mut rx = layer.subscribe();
		let _guard = install(layer.clone());
		tracing::info!(count = 42_i64, ratio = 0.5_f64, ok = true, "typed");
		let frame = rx.recv().await.expect("recv");
		assert_eq!(frame.fields["count"], 42);
		assert_eq!(frame.fields["ok"], true);
		assert!(frame.fields["ratio"].is_number());
	}
}