1use std::time::{SystemTime, UNIX_EPOCH};
19
20use serde::{Deserialize, Serialize};
21use tokio::sync::broadcast;
22use tracing::field::{Field, Visit};
23use tracing::{Event, Subscriber};
24use tracing_subscriber::Layer;
25use tracing_subscriber::layer::Context;
26
/// Broadcast channel capacity used when [`ENV_BROADCAST_CAP`] is unset or does
/// not hold a positive integer.
pub const DEFAULT_BROADCAST_CAP: usize = 1024;

/// Name of the environment variable that overrides [`DEFAULT_BROADCAST_CAP`].
pub const ENV_BROADCAST_CAP: &str = "VANE_TRACE_BROADCAST_CAP";

/// Resolves the broadcast channel capacity from the environment.
///
/// Reads [`ENV_BROADCAST_CAP`] and returns its value when it parses as a
/// `usize` greater than zero. Surrounding whitespace is ignored, so values such
/// as `" 2048"` or `"2048\n"` (easy to produce from shell scripts or env files)
/// are honored instead of being silently replaced by the default.
///
/// Falls back to [`DEFAULT_BROADCAST_CAP`] when the variable is unset, is not
/// valid Unicode, does not parse as `usize`, or is `0`.
fn resolve_broadcast_cap() -> usize {
    std::env::var(ENV_BROADCAST_CAP)
        .ok()
        // `str::parse::<usize>` rejects any whitespace, so trim first.
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&cap| cap > 0)
        .unwrap_or(DEFAULT_BROADCAST_CAP)
}
50
/// One tracing event flattened into a serializable record.
///
/// Frames are built by the `Layer::on_event` implementation of
/// `BroadcastTracingLayer` and sent on its broadcast channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingFrame {
    /// Wall-clock time at which the event was handled, in milliseconds since
    /// the Unix epoch. `0` if the system clock reads earlier than the epoch;
    /// saturates at `u64::MAX`.
    pub t: u64,
    /// Event level rendered as text (for example `"INFO"`).
    pub level: String,
    /// Target string taken from the event's metadata.
    pub target: String,
    /// The event's string-valued `message` field, or an empty string if the
    /// event had none.
    pub message: String,
    /// All remaining event fields as a JSON object keyed by field name.
    pub fields: serde_json::Value,
}
71
72#[derive(Clone)]
85pub struct BroadcastTracingLayer {
86 tx: broadcast::Sender<TracingFrame>,
87 on_drop: std::sync::Arc<dyn Fn() + Send + Sync>,
88}
89
90impl BroadcastTracingLayer {
91 #[must_use]
92 pub fn new() -> Self {
93 Self::with_capacity(resolve_broadcast_cap())
94 }
95
96 #[must_use]
101 pub fn with_capacity(capacity: usize) -> Self {
102 Self::with_capacity_and_drop_hook(capacity, std::sync::Arc::new(|| {}))
103 }
104
105 #[must_use]
111 pub fn with_capacity_and_drop_hook(
112 capacity: usize,
113 on_drop: std::sync::Arc<dyn Fn() + Send + Sync>,
114 ) -> Self {
115 let cap = capacity.max(1);
118 let (tx, _initial_rx) = broadcast::channel(cap);
119 Self { tx, on_drop }
120 }
121
122 #[must_use]
125 pub fn subscribe(&self) -> broadcast::Receiver<TracingFrame> {
126 self.tx.subscribe()
127 }
128
129 #[must_use]
131 pub fn subscriber_count(&self) -> usize {
132 self.tx.receiver_count()
133 }
134}
135
136impl Default for BroadcastTracingLayer {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl<S> Layer<S> for BroadcastTracingLayer
143where
144 S: Subscriber,
145{
146 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
147 let metadata = event.metadata();
148 let mut visitor = FieldVisitor::default();
149 event.record(&mut visitor);
150
151 let frame = TracingFrame {
152 t: SystemTime::now()
153 .duration_since(UNIX_EPOCH)
154 .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX)),
155 level: metadata.level().to_string(),
156 target: metadata.target().to_string(),
157 message: visitor.message.unwrap_or_default(),
158 fields: serde_json::Value::Object(visitor.fields),
159 };
160 if self.tx.send(frame).is_err() {
166 (self.on_drop)();
167 }
168 }
169}
170
171#[derive(Default)]
177struct FieldVisitor {
178 message: Option<String>,
179 fields: serde_json::Map<String, serde_json::Value>,
180}
181
182impl FieldVisitor {
183 fn record(&mut self, field: &Field, value: serde_json::Value) {
184 if field.name() == "message"
185 && let serde_json::Value::String(s) = &value
186 {
187 self.message = Some(s.clone());
188 return;
189 }
190 self.fields.insert(field.name().to_string(), value);
191 }
192}
193
194impl Visit for FieldVisitor {
195 fn record_str(&mut self, field: &Field, value: &str) {
196 self.record(field, serde_json::Value::String(value.to_string()));
197 }
198
199 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
200 let s = format!("{value:?}");
205 let trimmed = if s.starts_with('"') && s.ends_with('"') && s.len() >= 2 {
206 s[1..s.len() - 1].to_string()
207 } else {
208 s
209 };
210 self.record(field, serde_json::Value::String(trimmed));
211 }
212
213 fn record_i64(&mut self, field: &Field, value: i64) {
214 self.record(field, serde_json::Value::Number(value.into()));
215 }
216
217 fn record_u64(&mut self, field: &Field, value: u64) {
218 self.record(field, serde_json::Value::Number(value.into()));
219 }
220
221 fn record_bool(&mut self, field: &Field, value: bool) {
222 self.record(field, serde_json::Value::Bool(value));
223 }
224
225 fn record_f64(&mut self, field: &Field, value: f64) {
226 let v = serde_json::Number::from_f64(value)
229 .map_or_else(|| serde_json::Value::String(value.to_string()), serde_json::Value::Number);
230 self.record(field, v);
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    /// Installs `layer` on a fresh registry as the default subscriber and
    /// returns the guard that keeps it installed.
    fn install(layer: BroadcastTracingLayer) -> tracing::subscriber::DefaultGuard {
        tracing_subscriber::registry().with(layer).set_default()
    }

    /// An attached receiver gets the event's level, target, message and fields.
    #[tokio::test]
    async fn broadcast_tracing_layer_emits_event_to_subscriber() {
        let layer = BroadcastTracingLayer::new();
        let mut rx = layer.subscribe();
        assert_eq!(layer.subscriber_count(), 1);

        let _guard = install(layer.clone());
        tracing::info!(addr = "127.0.0.1", port = 8080_u64, "listener bound");

        let frame = rx.recv().await.expect("recv frame");
        assert_eq!(frame.level, "INFO");
        assert_eq!(frame.message, "listener bound");
        assert_eq!(frame.fields["addr"], "127.0.0.1");
        assert_eq!(frame.fields["port"], 8080);
        assert!(!frame.target.is_empty(), "target captured from metadata");
    }

    /// With no receivers the event is discarded without panicking and the
    /// drop hook is told about it.
    #[tokio::test]
    async fn broadcast_tracing_layer_no_receivers_silently_drops() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let dropped = Arc::new(AtomicUsize::new(0));
        let hook_counter = Arc::clone(&dropped);
        let layer = BroadcastTracingLayer::with_capacity_and_drop_hook(
            DEFAULT_BROADCAST_CAP,
            Arc::new(move || {
                hook_counter.fetch_add(1, Ordering::SeqCst);
            }),
        );
        // The constructor discards the channel's initial receiver.
        assert_eq!(layer.subscriber_count(), 0);

        let _guard = install(layer.clone());
        tracing::warn!("no subscribers attached");

        assert_eq!(
            dropped.load(Ordering::SeqCst),
            1,
            "drop hook runs once for the undeliverable event"
        );
    }

    /// A receiver that falls behind by more than the channel capacity
    /// observes `RecvError::Lagged` on its next `recv`.
    #[tokio::test]
    async fn broadcast_tracing_layer_lagged_subscriber_sees_recv_error() {
        // Pin the capacity explicitly: `new()` reads the environment, so an
        // ambient override could otherwise make the channel large enough that
        // no lag ever occurs. A small power of two keeps the test fast.
        const CAPACITY: usize = 16;
        const OVERFLOW: u64 = 5;

        let layer = BroadcastTracingLayer::with_capacity(CAPACITY);
        let mut rx = layer.subscribe();
        let _guard = install(layer.clone());

        let total = u64::try_from(CAPACITY).expect("capacity fits in u64") + OVERFLOW;
        for seq in 0..total {
            tracing::info!(seq, "saturate");
        }
        match rx.recv().await {
            Err(broadcast::error::RecvError::Lagged(n)) => {
                assert!(n >= OVERFLOW, "expected lag >= {OVERFLOW}, got {n}");
            }
            other => panic!("expected Lagged, got {other:?}"),
        }
    }

    /// Integer, float and boolean fields keep their JSON types.
    #[tokio::test]
    async fn broadcast_tracing_layer_preserves_typed_int_field() {
        let layer = BroadcastTracingLayer::new();
        let mut rx = layer.subscribe();
        let _guard = install(layer.clone());
        tracing::info!(count = 42_i64, ratio = 0.5_f64, ok = true, "typed");
        let frame = rx.recv().await.expect("recv");
        assert_eq!(frame.fields["count"], 42);
        assert_eq!(frame.fields["ok"], true);
        assert!(frame.fields["ratio"].is_number());
    }
}