1use crate::visitors::{EventVisitor, FieldVisitor};
2use crate::{Event, Shared, EVENT_BUFFER_CAPACITY};
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6use tokio::sync::mpsc;
7use tokio::sync::mpsc::error::TrySendError;
8use tracing_core::span::{Attributes, Id};
9use tracing_core::{Interest, Metadata};
10use tracing_subscriber::layer::Context;
11use tracing_subscriber::registry::SpanRef;
12
13static THREAD_COUNTER: AtomicU64 = AtomicU64::new(0);
15
16thread_local! {
17 static THREAD_ID: u64 = {
18 let mut last = THREAD_COUNTER.load(Ordering::Relaxed);
19 loop {
20 let id = last.checked_add(1).expect("Thread id overflowed");
21 match THREAD_COUNTER.compare_exchange_weak(last, id, Ordering::Relaxed, Ordering::Relaxed) {
22 Ok(_) => return id,
23 Err(id) => last = id,
24 }
25 }
26 };
27}
28
29pub struct Layer {
33 shared: Arc<Shared>,
34 tx: mpsc::Sender<Event>,
35}
36
37impl Layer {
38 pub fn new(shared: Arc<Shared>, tx: mpsc::Sender<Event>) -> Self {
39 Self { shared, tx }
40 }
41
42 pub fn send_event(&self, dropped: &AtomicUsize, mk_event: impl FnOnce() -> Event) {
43 match self.tx.try_reserve() {
44 Ok(permit) => {
45 permit.send(mk_event());
46 }
47 Err(TrySendError::Closed(())) => {
48 tracing::error!("Event channel closed!");
49 }
50 Err(TrySendError::Full(())) => {
51 dropped.fetch_add(1, Ordering::Release);
52 }
53 }
54
55 let capacity = self.tx.capacity();
56 if capacity <= EVENT_BUFFER_CAPACITY / 2 {
57 self.shared.flush.notify_one();
58 }
59 }
60}
61
62impl<S> tracing_subscriber::layer::Layer<S> for Layer
63where
64 S: tracing_core::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
65{
66 fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
67 let dropped = if metadata.is_event() {
68 &self.shared.dropped_log_events
69 } else {
70 &self.shared.dropped_span_events
71 };
72 self.send_event(dropped, || Event::Metadata(metadata));
73
74 Interest::always()
75 }
76
77 fn on_record(
78 &self,
79 id: &tracing_core::span::Id,
80 values: &tracing_core::span::Record<'_>,
81 ctx: Context<'_, S>,
82 ) {
83 let span = ctx.span(id).expect("Span not in context, probably a bug");
84 let metadata = span.metadata();
85 let mut visitor = FieldVisitor::new(std::ptr::from_ref(metadata) as u64);
86 values.record(&mut visitor);
87 let fields = visitor.result();
88
89 self.send_event(&self.shared.dropped_span_events, move || {
90 Event::SpanRecorded {
91 span_id: id.clone(),
92 fields,
93 }
94 });
95 }
96
97 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
98 let at = Instant::now();
99
100 self.send_event(&self.shared.dropped_span_events, move || {
101 let span = ctx.span(id).expect("Span not in context, probably a bug");
102 let metadata = span.metadata();
103 let maybe_parent = span.parent().map(|s| s.id());
104
105 let mut visitor = FieldVisitor::new(std::ptr::from_ref(metadata) as u64);
106 attrs.record(&mut visitor);
107 let fields = visitor.result();
108
109 Event::NewSpan {
110 at,
111 id: id.clone(),
112 metadata: span.metadata(),
113 fields,
114 maybe_parent,
115 }
116 });
117 }
118
119 fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) {
120 let at = Instant::now();
121 let metadata = event.metadata();
122
123 self.send_event(&self.shared.dropped_log_events, || {
124 Event::Metadata(metadata)
125 });
126
127 self.send_event(&self.shared.dropped_log_events, || {
128 let mut visitor = EventVisitor::new(std::ptr::from_ref(metadata) as u64);
129 event.record(&mut visitor);
130 let (message, fields) = visitor.result();
131
132 let maybe_parent = ctx.event_span(event).as_ref().map(SpanRef::id);
133
134 Event::Event {
135 at,
136 metadata,
137 message: message.unwrap_or_default(),
138 fields,
139 maybe_parent,
140 }
141 });
142 }
143
144 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
145 let at = Instant::now();
146
147 self.send_event(&self.shared.dropped_span_events, || Event::EnterSpan {
148 at,
149 thread_id: THREAD_ID.with(|id| *id),
150 span_id: id.clone(),
151 });
152 }
153
154 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
155 let at = Instant::now();
156
157 self.send_event(&self.shared.dropped_span_events, || Event::ExitSpan {
158 at,
159 thread_id: THREAD_ID.with(|id| *id),
160 span_id: id.clone(),
161 });
162 }
163
164 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
165 let at = Instant::now();
166
167 self.send_event(&self.shared.dropped_span_events, || Event::CloseSpan {
168 at,
169 span_id: id.clone(),
170 });
171 }
172}
173
174#[cfg(test)]
177mod test {
178 use super::*;
179 use futures::StreamExt;
180 use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
181 use tracing_subscriber::prelude::*;
182
183 macro_rules! assert_matches {
185 ($value:expr, $pattern:pat, $msg:expr) => {
186 let $pattern = $value else { panic!($msg) };
187 };
188 }
189
190 #[tokio::test]
191 #[ignore = "Currently broken, apparently tracing leaks events cross-thread"]
192 async fn log_event() {
193 let (evt_tx, evt_rx) = mpsc::channel(10);
194 let layer = Layer::new(Default::default(), evt_tx);
195 let subscriber = tracing_subscriber::registry().with(layer);
196
197 tracing::subscriber::with_default(subscriber, || {
198 tracing::debug!("a debug event");
199 });
200
201 let events: Vec<_> = ReceiverStream::new(evt_rx).collect().await;
202 assert_eq!(events.len(), 2, "{events:#?}");
203
204 assert_matches!(
205 events[0],
206 Event::Metadata(metadata),
207 "expected metadata event"
208 );
209 assert_eq!(*metadata.level(), tracing_core::Level::DEBUG);
210
211 assert_matches!(
212 &events[1],
213 Event::Event {
214 metadata,
215 maybe_parent,
216 fields,
217 message,
218 ..
219 },
220 "expected log event"
221 );
222 assert_eq!(metadata, metadata);
223 assert!(maybe_parent.is_none());
224 assert!(fields.is_empty());
225 assert_eq!(message, "a debug event");
226 }
227
228 #[tokio::test]
229 #[ignore = "Currently broken, apparently tracing leaks events cross-thread"]
230 async fn span() {
231 let (evt_tx, evt_rx) = mpsc::channel(10);
232 let layer = Layer::new(Default::default(), evt_tx);
233 let subscriber = tracing_subscriber::registry().with(layer);
234
235 tracing::subscriber::with_default(subscriber, || {
236 let _enter = tracing::debug_span!("a span").entered();
237 drop(_enter);
238 });
239
240 let events: Vec<_> = ReceiverStream::new(evt_rx).collect().await;
241 assert_eq!(events.len(), 5, "{events:#?}");
242
243 assert_matches!(
244 events[0],
245 Event::Metadata(metadata),
246 "expected metadata event"
247 );
248 assert_eq!(*metadata.level(), tracing_core::Level::DEBUG);
249
250 assert_matches!(
251 &events[1],
252 Event::NewSpan {
253 metadata,
254 maybe_parent,
255 fields,
256 id: new_span_id,
257 ..
258 },
259 "expected new span event"
260 );
261 assert_eq!(metadata, metadata);
262 assert!(maybe_parent.is_none());
263 assert!(fields.is_empty());
264
265 assert_matches!(
266 &events[2],
267 Event::EnterSpan {
268 span_id: enter_span_id,
269 ..
270 },
271 "expected enter span event"
272 );
273 assert_eq!(enter_span_id, new_span_id);
274
275 assert_matches!(
276 &events[3],
277 Event::ExitSpan {
278 span_id: exit_span_id,
279 ..
280 },
281 "expected exit span event"
282 );
283 assert_eq!(exit_span_id, new_span_id);
284
285 assert_matches!(
286 &events[4],
287 Event::CloseSpan {
288 span_id: close_span_id,
289 ..
290 },
291 "expected close span event"
292 );
293 assert_eq!(close_span_id, new_span_id);
294 }
295}