tracing_modality/common/
layer.rs

1use crate::ingest::TimelineId;
2
3use crate::ingest;
4use crate::ingest::WrappedMessage;
5
6use auxon_sdk::api::Nanoseconds;
7use duplicate::duplicate_item;
8use once_cell::sync::Lazy;
9use std::time::SystemTime;
10use std::{
11    cell::Cell,
12    collections::HashMap,
13    fmt::Debug,
14    num::NonZeroU64,
15    sync::atomic::{AtomicU64, Ordering},
16    sync::Once,
17    thread,
18    thread::LocalKey,
19    time::Instant,
20};
21use tokio::sync::mpsc;
22use tracing_core::{
23    field::Visit,
24    span::{Attributes, Id, Record},
25    Field, Subscriber,
26};
27use tracing_subscriber::{
28    layer::{Context, Layer},
29    registry::LookupSpan,
30};
31
/// Reference instant, captured lazily on first access; `START.elapsed()` supplies the `tick`
/// of every wrapped message built in this file.
static START: Lazy<Instant> = Lazy::new(Instant::now);
/// Process-wide counter that `get_next_span_id` draws from. It starts at 1 so the first ID
/// handed out is already non-zero.
static NEXT_SPAN_ID: AtomicU64 = AtomicU64::new(1);
34
/// A span identifier allocated by this layer, which we can use directly.
///
/// The payload is a `NonZeroU64`, so zero is never a valid ID.
#[derive(Clone, Copy, Debug)]
pub(crate) struct LocalSpanId(NonZeroU64);
38
/// Newtype wrapper that keeps a span's name as an owned `String` for later use.
#[derive(Debug, Clone)]
// not referenced anywhere else in this file, hence the lint opt-out
#[allow(dead_code)]
pub(crate) struct SpanName(String);
43
/// Per-thread state for the layer, reached through the thread-local key returned by
/// `LayerHandler::local_metadata`.
pub(crate) struct LocalMetadata {
    /// Timeline stamped onto every message wrapped on the owning thread.
    pub(crate) thread_timeline: TimelineId,
}
47
// `LayerCommon` consists solely of provided methods, so each concrete layer opts in with an
// empty impl; the `LayerHandler` supertrait impls are not in this part of the file.
#[cfg(feature = "async")]
impl LayerCommon for crate::r#async::ModalityLayer {}
#[cfg(feature = "blocking")]
impl LayerCommon for crate::blocking::ModalityLayer {}
52
/// Primitive operations a concrete layer supplies; `LayerCommon` builds its provided methods
/// on top of these.
pub(crate) trait LayerHandler {
    /// Attempts to send `msg`; on failure the error is a tokio mpsc `SendError` wrapping the
    /// message.
    fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>>;
    /// Returns the thread-local key holding the lazily-initialised `LocalMetadata`.
    fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>>;
    /// Returns the thread-local flag that `LayerCommon::ensure_timeline_has_been_initialized`
    /// sets to `true` the first time it runs on a thread.
    fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>>;
}
58
59trait LayerCommon: LayerHandler {
60    fn handle_message(&self, message: ingest::Message) {
61        self.ensure_timeline_has_been_initialized();
62        let wrapped_message = ingest::WrappedMessage {
63            message,
64            tick: START.elapsed(),
65            nanos_since_unix_epoch: SystemTime::now()
66                .duration_since(std::time::UNIX_EPOCH)
67                .ok()
68                .and_then(|d| {
69                    let n: Option<u64> = d.as_nanos().try_into().ok();
70                    n.map(Nanoseconds::from)
71                }),
72            timeline: self.local_metadata().with(|m| m.thread_timeline),
73        };
74
75        if let Err(_e) = self.send(wrapped_message) {
76            static WARN_LATCH: Once = Once::new();
77            WARN_LATCH.call_once(|| {
78                eprintln!(
79                    "warning: attempted trace after tracing modality has stopped accepting \
80                     messages, ensure spans from all threads have closed before calling \
81                     `finish()`"
82                );
83            });
84        }
85    }
86
87    fn get_next_span_id(&self) -> LocalSpanId {
88        loop {
89            // ordering of IDs doesn't matter, only uniqueness, use relaxed ordering
90            let id = NEXT_SPAN_ID.fetch_add(1, Ordering::Relaxed);
91            if let Some(id) = NonZeroU64::new(id) {
92                return LocalSpanId(id);
93            }
94        }
95    }
96
97    fn ensure_timeline_has_been_initialized(&self) {
98        if !self.thread_timeline_initialized().with(|i| i.get()) {
99            self.thread_timeline_initialized().with(|i| i.set(true));
100
101            let cur = thread::current();
102            let name = cur
103                .name()
104                .map(Into::into)
105                .unwrap_or_else(|| format!("thread-{:?}", cur.id()));
106
107            let message = ingest::Message::NewTimeline { name };
108            let wrapped_message = ingest::WrappedMessage {
109                message,
110                tick: START.elapsed(),
111                nanos_since_unix_epoch: SystemTime::now()
112                    .duration_since(std::time::UNIX_EPOCH)
113                    .ok()
114                    .and_then(|d| {
115                        let n: Option<u64> = d.as_nanos().try_into().ok();
116                        n.map(Nanoseconds::from)
117                    }),
118                timeline: self.local_metadata().with(|m| m.thread_timeline),
119            };
120
121            // ignore failures, exceedingly unlikely here, will get caught in `handle_message`
122            let _ = self.send(wrapped_message);
123        }
124    }
125}
126
127fn get_local_span_id<S>(span: &Id, ctx: &Context<'_, S>) -> LocalSpanId
128where
129    S: Subscriber + for<'a> LookupSpan<'a>,
130{
131    // if either of these fail, it's a bug in `tracing`
132    *ctx.span(span)
133        .expect("get span tracing just told us about")
134        .extensions()
135        .get()
136        .expect("get `LocalSpanId`, should always exist on spans")
137}
138
139#[cfg(feature = "blocking")]
140use crate::blocking::ModalityLayer as BlockingModalityLayer;
141#[cfg(feature = "async")]
142use crate::r#async::ModalityLayer as AsyncModalityLayer;
143
144#[cfg_attr(all(feature = "async", feature = "blocking"),
145  duplicate_item(layer; [ AsyncModalityLayer ]; [ BlockingModalityLayer ];))]
146#[cfg_attr(all(feature = "async", not(feature = "blocking")),
147  duplicate_item(layer; [ AsyncModalityLayer ];))]
148#[cfg_attr(all(not(feature = "async"), feature = "blocking"),
149  duplicate_item(layer; [ BlockingModalityLayer ];))]
150impl<S> Layer<S> for layer
151where
152    S: Subscriber + for<'a> LookupSpan<'a>,
153{
154    fn enabled(&self, _metadata: &tracing_core::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
155        // always enabled for all levels
156        true
157    }
158
159    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
160        let local_id = self.get_next_span_id();
161        ctx.span(id).unwrap().extensions_mut().insert(local_id);
162
163        let mut visitor = RecordMapBuilder::new();
164        attrs.record(&mut visitor);
165        let records = visitor.values();
166        let metadata = attrs.metadata();
167
168        let msg = ingest::Message::NewSpan {
169            id: local_id.0,
170            metadata,
171            records,
172        };
173
174        self.handle_message(msg);
175    }
176
177    fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
178        let local_id = get_local_span_id(span, &ctx);
179
180        let mut visitor = RecordMapBuilder::new();
181        values.record(&mut visitor);
182
183        let msg = ingest::Message::Record {
184            span: local_id.0,
185            records: visitor.values(),
186        };
187
188        self.handle_message(msg)
189    }
190
191    fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) {
192        let local_id = get_local_span_id(span, &ctx);
193        let follows_local_id = get_local_span_id(follows, &ctx);
194
195        let msg = ingest::Message::RecordFollowsFrom {
196            span: local_id.0,
197            follows: follows_local_id.0,
198        };
199
200        self.handle_message(msg)
201    }
202
203    fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
204        let mut visitor = RecordMapBuilder::new();
205        event.record(&mut visitor);
206
207        let msg = ingest::Message::Event {
208            metadata: event.metadata(),
209            records: visitor.values(),
210        };
211
212        self.handle_message(msg)
213    }
214
215    fn on_enter(&self, span: &Id, ctx: Context<'_, S>) {
216        let local_id = get_local_span_id(span, &ctx);
217
218        let msg = ingest::Message::Enter { span: local_id.0 };
219
220        self.handle_message(msg)
221    }
222
223    fn on_exit(&self, span: &Id, ctx: Context<'_, S>) {
224        let local_id = get_local_span_id(span, &ctx);
225
226        let msg = ingest::Message::Exit { span: local_id.0 };
227
228        self.handle_message(msg)
229    }
230
231    fn on_id_change(&self, old: &Id, new: &Id, ctx: Context<'_, S>) {
232        let old_local_id = get_local_span_id(old, &ctx);
233        let new_local_id = self.get_next_span_id();
234        ctx.span(new).unwrap().extensions_mut().insert(new_local_id);
235
236        let msg = ingest::Message::IdChange {
237            old: old_local_id.0,
238            new: new_local_id.0,
239        };
240
241        self.handle_message(msg)
242    }
243
244    fn on_close(&self, span: Id, ctx: Context<'_, S>) {
245        let local_id = get_local_span_id(&span, &ctx);
246
247        let msg = ingest::Message::Close { span: local_id.0 };
248
249        self.handle_message(msg)
250    }
251}
252
/// A single field value captured from a span or event by `RecordMapBuilder`.
#[derive(Debug)]
pub(crate) enum TracingValue {
    /// Text: either a `&str` field or the `{:?}` rendering of a `Debug` field.
    String(String),
    /// A value recorded through `record_f64`.
    F64(f64),
    /// A value recorded through `record_i64`.
    I64(i64),
    /// A value recorded through `record_u64`.
    U64(u64),
    /// A value recorded through `record_bool`.
    Bool(bool),
}
261
/// Map from field name to the value recorded for that field.
pub(crate) type RecordMap = HashMap<String, TracingValue>;
263
/// `Visit` implementation that collects every field it is shown into a `RecordMap`.
struct RecordMapBuilder {
    /// Fields gathered so far, keyed by field name.
    record_map: RecordMap,
}
267
268impl RecordMapBuilder {
269    /// Extract the underlying RecordMap.
270    fn values(self) -> RecordMap {
271        self.record_map
272    }
273}
274
275impl RecordMapBuilder {
276    fn new() -> RecordMapBuilder {
277        RecordMapBuilder {
278            record_map: HashMap::new(),
279        }
280    }
281}
282
283impl Visit for RecordMapBuilder {
284    fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
285        self.record_map.insert(
286            field.name().to_string(),
287            TracingValue::String(format!("{:?}", value)),
288        );
289    }
290
291    fn record_f64(&mut self, field: &Field, value: f64) {
292        self.record_map
293            .insert(field.name().to_string(), TracingValue::F64(value));
294    }
295
296    fn record_i64(&mut self, field: &Field, value: i64) {
297        self.record_map
298            .insert(field.name().to_string(), TracingValue::I64(value));
299    }
300
301    fn record_u64(&mut self, field: &Field, value: u64) {
302        self.record_map
303            .insert(field.name().to_string(), TracingValue::U64(value));
304    }
305
306    fn record_bool(&mut self, field: &Field, value: bool) {
307        self.record_map
308            .insert(field.name().to_string(), TracingValue::Bool(value));
309    }
310
311    fn record_str(&mut self, field: &Field, value: &str) {
312        self.record_map.insert(
313            field.name().to_string(),
314            TracingValue::String(value.to_string()),
315        );
316    }
317}