auxon_sdk/tracing/common/
layer.rs

1use crate::{
2    api::{Nanoseconds, TimelineId},
3    tracing::ingest::{self, WrappedMessage},
4};
5use duplicate::duplicate_item;
6use once_cell::sync::Lazy;
7use std::time::SystemTime;
8use std::{
9    cell::Cell,
10    collections::HashMap,
11    fmt::Debug,
12    num::NonZeroU64,
13    sync::atomic::{AtomicU64, Ordering},
14    sync::Once,
15    thread,
16    thread::LocalKey,
17    time::Instant,
18};
19use tokio::sync::mpsc;
20use tracing_core::{
21    field::Visit,
22    span::{Attributes, Id, Record},
23    Field, Subscriber,
24};
25use tracing_subscriber::{
26    layer::{Context, Layer},
27    registry::LookupSpan,
28};
29
30static START: Lazy<Instant> = Lazy::new(Instant::now);
31static NEXT_SPAN_ID: AtomicU64 = AtomicU64::new(1);
32
33/// An ID for spans that we can use directly.
34#[derive(Copy, Clone, Debug)]
35pub(crate) struct LocalSpanId(NonZeroU64);
36
37/// A newtype to store the span's name in itself for later use.
38#[allow(dead_code)]
39#[derive(Clone, Debug)]
40pub(crate) struct SpanName(String);
41
42pub(crate) struct LocalMetadata {
43    pub(crate) thread_timeline: TimelineId,
44}
45
46impl LayerCommon for crate::tracing::r#async::ModalityLayer {}
47impl LayerCommon for crate::tracing::blocking::ModalityLayer {}
48
49pub(crate) trait LayerHandler {
50    fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>>;
51    fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>>;
52    fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>>;
53}
54
55trait LayerCommon: LayerHandler {
56    fn handle_message(&self, message: ingest::Message) {
57        self.ensure_timeline_has_been_initialized();
58        let wrapped_message = ingest::WrappedMessage {
59            message,
60            tick: START.elapsed(),
61            nanos_since_unix_epoch: SystemTime::now()
62                .duration_since(std::time::UNIX_EPOCH)
63                .ok()
64                .and_then(|d| {
65                    let n: Option<u64> = d.as_nanos().try_into().ok();
66                    n.map(Nanoseconds::from)
67                }),
68            timeline: self.local_metadata().with(|m| m.thread_timeline),
69        };
70
71        if let Err(_e) = self.send(wrapped_message) {
72            static WARN_LATCH: Once = Once::new();
73            WARN_LATCH.call_once(|| {
74                eprintln!(
75                    "warning: attempted trace after tracing modality has stopped accepting \
76                     messages, ensure spans from all threads have closed before calling \
77                     `finish()`"
78                );
79            });
80        }
81    }
82
83    fn get_next_span_id(&self) -> LocalSpanId {
84        loop {
85            // ordering of IDs doesn't matter, only uniqueness, use relaxed ordering
86            let id = NEXT_SPAN_ID.fetch_add(1, Ordering::Relaxed);
87            if let Some(id) = NonZeroU64::new(id) {
88                return LocalSpanId(id);
89            }
90        }
91    }
92
93    fn ensure_timeline_has_been_initialized(&self) {
94        if !self.thread_timeline_initialized().with(|i| i.get()) {
95            self.thread_timeline_initialized().with(|i| i.set(true));
96
97            let cur = thread::current();
98            let name = cur
99                .name()
100                .map(Into::into)
101                .unwrap_or_else(|| format!("thread-{:?}", cur.id()));
102
103            let message = ingest::Message::NewTimeline { name };
104            let wrapped_message = ingest::WrappedMessage {
105                message,
106                tick: START.elapsed(),
107                nanos_since_unix_epoch: SystemTime::now()
108                    .duration_since(std::time::UNIX_EPOCH)
109                    .ok()
110                    .and_then(|d| {
111                        let n: Option<u64> = d.as_nanos().try_into().ok();
112                        n.map(Nanoseconds::from)
113                    }),
114                timeline: self.local_metadata().with(|m| m.thread_timeline),
115            };
116
117            // ignore failures, exceedingly unlikely here, will get caught in `handle_message`
118            let _ = self.send(wrapped_message);
119        }
120    }
121}
122
123fn get_local_span_id<S>(span: &Id, ctx: &Context<'_, S>) -> LocalSpanId
124where
125    S: Subscriber + for<'a> LookupSpan<'a>,
126{
127    // if either of these fail, it's a bug in `tracing`
128    *ctx.span(span)
129        .expect("get span tracing just told us about")
130        .extensions()
131        .get()
132        .expect("get `LocalSpanId`, should always exist on spans")
133}
134
135use crate::tracing::blocking::ModalityLayer as BlockingModalityLayer;
136use crate::tracing::r#async::ModalityLayer as AsyncModalityLayer;
137
138#[duplicate_item(layer; [ AsyncModalityLayer ]; [ BlockingModalityLayer ];)]
139impl<S> Layer<S> for layer
140where
141    S: Subscriber + for<'a> LookupSpan<'a>,
142{
143    fn enabled(&self, _metadata: &tracing_core::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
144        // always enabled for all levels
145        true
146    }
147
148    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
149        let local_id = self.get_next_span_id();
150        ctx.span(id).unwrap().extensions_mut().insert(local_id);
151
152        let mut visitor = RecordMapBuilder::new();
153        attrs.record(&mut visitor);
154        let records = visitor.values();
155        let metadata = attrs.metadata();
156
157        let msg = ingest::Message::NewSpan {
158            id: local_id.0,
159            metadata,
160            records,
161        };
162
163        self.handle_message(msg);
164    }
165
166    fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
167        let local_id = get_local_span_id(span, &ctx);
168
169        let mut visitor = RecordMapBuilder::new();
170        values.record(&mut visitor);
171
172        let msg = ingest::Message::Record {
173            span: local_id.0,
174            records: visitor.values(),
175        };
176
177        self.handle_message(msg)
178    }
179
180    fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) {
181        let local_id = get_local_span_id(span, &ctx);
182        let follows_local_id = get_local_span_id(follows, &ctx);
183
184        let msg = ingest::Message::RecordFollowsFrom {
185            span: local_id.0,
186            follows: follows_local_id.0,
187        };
188
189        self.handle_message(msg)
190    }
191
192    fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
193        let mut visitor = RecordMapBuilder::new();
194        event.record(&mut visitor);
195
196        let msg = ingest::Message::Event {
197            metadata: event.metadata(),
198            records: visitor.values(),
199        };
200
201        self.handle_message(msg)
202    }
203
204    fn on_enter(&self, span: &Id, ctx: Context<'_, S>) {
205        let local_id = get_local_span_id(span, &ctx);
206
207        let msg = ingest::Message::Enter { span: local_id.0 };
208
209        self.handle_message(msg)
210    }
211
212    fn on_exit(&self, span: &Id, ctx: Context<'_, S>) {
213        let local_id = get_local_span_id(span, &ctx);
214
215        let msg = ingest::Message::Exit { span: local_id.0 };
216
217        self.handle_message(msg)
218    }
219
220    fn on_id_change(&self, old: &Id, new: &Id, ctx: Context<'_, S>) {
221        let old_local_id = get_local_span_id(old, &ctx);
222        let new_local_id = self.get_next_span_id();
223        ctx.span(new).unwrap().extensions_mut().insert(new_local_id);
224
225        let msg = ingest::Message::IdChange {
226            old: old_local_id.0,
227            new: new_local_id.0,
228        };
229
230        self.handle_message(msg)
231    }
232
233    fn on_close(&self, span: Id, ctx: Context<'_, S>) {
234        let local_id = get_local_span_id(&span, &ctx);
235
236        let msg = ingest::Message::Close { span: local_id.0 };
237
238        self.handle_message(msg)
239    }
240}
241
242#[derive(Debug)]
243pub(crate) enum TracingValue {
244    String(String),
245    F64(f64),
246    I64(i64),
247    U64(u64),
248    Bool(bool),
249}
250
251pub(crate) type RecordMap = HashMap<String, TracingValue>;
252
253struct RecordMapBuilder {
254    record_map: RecordMap,
255}
256
257impl RecordMapBuilder {
258    /// Extract the underlying RecordMap.
259    fn values(self) -> RecordMap {
260        self.record_map
261    }
262}
263
264impl RecordMapBuilder {
265    fn new() -> RecordMapBuilder {
266        RecordMapBuilder {
267            record_map: HashMap::new(),
268        }
269    }
270}
271
272impl Visit for RecordMapBuilder {
273    fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
274        self.record_map.insert(
275            field.name().to_string(),
276            TracingValue::String(format!("{:?}", value)),
277        );
278    }
279
280    fn record_f64(&mut self, field: &Field, value: f64) {
281        self.record_map
282            .insert(field.name().to_string(), TracingValue::F64(value));
283    }
284
285    fn record_i64(&mut self, field: &Field, value: i64) {
286        self.record_map
287            .insert(field.name().to_string(), TracingValue::I64(value));
288    }
289
290    fn record_u64(&mut self, field: &Field, value: u64) {
291        self.record_map
292            .insert(field.name().to_string(), TracingValue::U64(value));
293    }
294
295    fn record_bool(&mut self, field: &Field, value: bool) {
296        self.record_map
297            .insert(field.name().to_string(), TracingValue::Bool(value));
298    }
299
300    fn record_str(&mut self, field: &Field, value: &str) {
301        self.record_map.insert(
302            field.name().to_string(),
303            TracingValue::String(value.to_string()),
304        );
305    }
306}