devtools_core/
layer.rs

1use crate::visitors::{EventVisitor, FieldVisitor};
2use crate::{Event, Shared, EVENT_BUFFER_CAPACITY};
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6use tokio::sync::mpsc;
7use tokio::sync::mpsc::error::TrySendError;
8use tracing_core::span::{Attributes, Id};
9use tracing_core::{Interest, Metadata};
10use tracing_subscriber::layer::Context;
11use tracing_subscriber::registry::SpanRef;
12
13// TODO replace this with `std::thread::ThreadId::as_u64` once it's stable
14static THREAD_COUNTER: AtomicU64 = AtomicU64::new(0);
15
16thread_local! {
17    static THREAD_ID: u64 = {
18        let mut last = THREAD_COUNTER.load(Ordering::Relaxed);
19        loop {
20            let id = last.checked_add(1).expect("Thread id overflowed");
21            match THREAD_COUNTER.compare_exchange_weak(last, id, Ordering::Relaxed, Ordering::Relaxed) {
22                Ok(_) => return id,
23                Err(id) => last = id,
24            }
25        }
26    };
27}
28
29/// A tracing layer that forwards all events to the aggregator
30/// This is intentionally kept as simple as possible to avoid any performance overhead on
31/// the application thread. All the heavy lifting is done in the aggregator.
32pub struct Layer {
33    shared: Arc<Shared>,
34    tx: mpsc::Sender<Event>,
35}
36
37impl Layer {
38    pub fn new(shared: Arc<Shared>, tx: mpsc::Sender<Event>) -> Self {
39        Self { shared, tx }
40    }
41
42    pub fn send_event(&self, dropped: &AtomicUsize, mk_event: impl FnOnce() -> Event) {
43        match self.tx.try_reserve() {
44            Ok(permit) => {
45                permit.send(mk_event());
46            }
47            Err(TrySendError::Closed(())) => {
48                tracing::error!("Event channel closed!");
49            }
50            Err(TrySendError::Full(())) => {
51                dropped.fetch_add(1, Ordering::Release);
52            }
53        }
54
55        let capacity = self.tx.capacity();
56        if capacity <= EVENT_BUFFER_CAPACITY / 2 {
57            self.shared.flush.notify_one();
58        }
59    }
60}
61
62impl<S> tracing_subscriber::layer::Layer<S> for Layer
63where
64    S: tracing_core::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
65{
66    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
67        let dropped = if metadata.is_event() {
68            &self.shared.dropped_log_events
69        } else {
70            &self.shared.dropped_span_events
71        };
72        self.send_event(dropped, || Event::Metadata(metadata));
73
74        Interest::always()
75    }
76
77    fn on_record(
78        &self,
79        id: &tracing_core::span::Id,
80        values: &tracing_core::span::Record<'_>,
81        ctx: Context<'_, S>,
82    ) {
83        let span = ctx.span(id).expect("Span not in context, probably a bug");
84        let metadata = span.metadata();
85        let mut visitor = FieldVisitor::new(std::ptr::from_ref(metadata) as u64);
86        values.record(&mut visitor);
87        let fields = visitor.result();
88
89        self.send_event(&self.shared.dropped_span_events, move || {
90            Event::SpanRecorded {
91                span_id: id.clone(),
92                fields,
93            }
94        });
95    }
96
97    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
98        let at = Instant::now();
99
100        self.send_event(&self.shared.dropped_span_events, move || {
101            let span = ctx.span(id).expect("Span not in context, probably a bug");
102            let metadata = span.metadata();
103            let maybe_parent = span.parent().map(|s| s.id());
104
105            let mut visitor = FieldVisitor::new(std::ptr::from_ref(metadata) as u64);
106            attrs.record(&mut visitor);
107            let fields = visitor.result();
108
109            Event::NewSpan {
110                at,
111                id: id.clone(),
112                metadata: span.metadata(),
113                fields,
114                maybe_parent,
115            }
116        });
117    }
118
119    fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) {
120        let at = Instant::now();
121        let metadata = event.metadata();
122
123        self.send_event(&self.shared.dropped_log_events, || {
124            Event::Metadata(metadata)
125        });
126
127        self.send_event(&self.shared.dropped_log_events, || {
128            let mut visitor = EventVisitor::new(std::ptr::from_ref(metadata) as u64);
129            event.record(&mut visitor);
130            let (message, fields) = visitor.result();
131
132            let maybe_parent = ctx.event_span(event).as_ref().map(SpanRef::id);
133
134            Event::Event {
135                at,
136                metadata,
137                message: message.unwrap_or_default(),
138                fields,
139                maybe_parent,
140            }
141        });
142    }
143
144    fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
145        let at = Instant::now();
146
147        self.send_event(&self.shared.dropped_span_events, || Event::EnterSpan {
148            at,
149            thread_id: THREAD_ID.with(|id| *id),
150            span_id: id.clone(),
151        });
152    }
153
154    fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
155        let at = Instant::now();
156
157        self.send_event(&self.shared.dropped_span_events, || Event::ExitSpan {
158            at,
159            thread_id: THREAD_ID.with(|id| *id),
160            span_id: id.clone(),
161        });
162    }
163
164    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
165        let at = Instant::now();
166
167        self.send_event(&self.shared.dropped_span_events, || Event::CloseSpan {
168            at,
169            span_id: id.clone(),
170        });
171    }
172}
173
174// TODO reenable tests. These are currently broken bc apparently `tracing` leaks events cross-thread
175// even when we explicitly set a subscriber only for the current test. Don't ask me why.
176#[cfg(test)]
177mod test {
178    use super::*;
179    use futures::StreamExt;
180    use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
181    use tracing_subscriber::prelude::*;
182
183    /// Asserts that a value matches a given pattern, bringing all name bindings from the pattern into scope
184    macro_rules! assert_matches {
185        ($value:expr, $pattern:pat, $msg:expr) => {
186            let $pattern = $value else { panic!($msg) };
187        };
188    }
189
190    #[tokio::test]
191    #[ignore = "Currently broken, apparently tracing leaks events cross-thread"]
192    async fn log_event() {
193        let (evt_tx, evt_rx) = mpsc::channel(10);
194        let layer = Layer::new(Default::default(), evt_tx);
195        let subscriber = tracing_subscriber::registry().with(layer);
196
197        tracing::subscriber::with_default(subscriber, || {
198            tracing::debug!("a debug event");
199        });
200
201        let events: Vec<_> = ReceiverStream::new(evt_rx).collect().await;
202        assert_eq!(events.len(), 2, "{events:#?}");
203
204        assert_matches!(
205            events[0],
206            Event::Metadata(metadata),
207            "expected metadata event"
208        );
209        assert_eq!(*metadata.level(), tracing_core::Level::DEBUG);
210
211        assert_matches!(
212            &events[1],
213            Event::Event {
214                metadata,
215                maybe_parent,
216                fields,
217                message,
218                ..
219            },
220            "expected log event"
221        );
222        assert_eq!(metadata, metadata);
223        assert!(maybe_parent.is_none());
224        assert!(fields.is_empty());
225        assert_eq!(message, "a debug event");
226    }
227
228    #[tokio::test]
229    #[ignore = "Currently broken, apparently tracing leaks events cross-thread"]
230    async fn span() {
231        let (evt_tx, evt_rx) = mpsc::channel(10);
232        let layer = Layer::new(Default::default(), evt_tx);
233        let subscriber = tracing_subscriber::registry().with(layer);
234
235        tracing::subscriber::with_default(subscriber, || {
236            let _enter = tracing::debug_span!("a span").entered();
237            drop(_enter);
238        });
239
240        let events: Vec<_> = ReceiverStream::new(evt_rx).collect().await;
241        assert_eq!(events.len(), 5, "{events:#?}");
242
243        assert_matches!(
244            events[0],
245            Event::Metadata(metadata),
246            "expected metadata event"
247        );
248        assert_eq!(*metadata.level(), tracing_core::Level::DEBUG);
249
250        assert_matches!(
251            &events[1],
252            Event::NewSpan {
253                metadata,
254                maybe_parent,
255                fields,
256                id: new_span_id,
257                ..
258            },
259            "expected new span event"
260        );
261        assert_eq!(metadata, metadata);
262        assert!(maybe_parent.is_none());
263        assert!(fields.is_empty());
264
265        assert_matches!(
266            &events[2],
267            Event::EnterSpan {
268                span_id: enter_span_id,
269                ..
270            },
271            "expected enter span event"
272        );
273        assert_eq!(enter_span_id, new_span_id);
274
275        assert_matches!(
276            &events[3],
277            Event::ExitSpan {
278                span_id: exit_span_id,
279                ..
280            },
281            "expected exit span event"
282        );
283        assert_eq!(exit_span_id, new_span_id);
284
285        assert_matches!(
286            &events[4],
287            Event::CloseSpan {
288                span_id: close_span_id,
289                ..
290            },
291            "expected close span event"
292        );
293        assert_eq!(close_span_id, new_span_id);
294    }
295}