auxon_sdk/tracing/common/
layer.rs1use crate::{
2 api::{Nanoseconds, TimelineId},
3 tracing::ingest::{self, WrappedMessage},
4};
5use duplicate::duplicate_item;
6use once_cell::sync::Lazy;
7use std::time::SystemTime;
8use std::{
9 cell::Cell,
10 collections::HashMap,
11 fmt::Debug,
12 num::NonZeroU64,
13 sync::atomic::{AtomicU64, Ordering},
14 sync::Once,
15 thread,
16 thread::LocalKey,
17 time::Instant,
18};
19use tokio::sync::mpsc;
20use tracing_core::{
21 field::Visit,
22 span::{Attributes, Id, Record},
23 Field, Subscriber,
24};
25use tracing_subscriber::{
26 layer::{Context, Layer},
27 registry::LookupSpan,
28};
29
30static START: Lazy<Instant> = Lazy::new(Instant::now);
31static NEXT_SPAN_ID: AtomicU64 = AtomicU64::new(1);
32
33#[derive(Copy, Clone, Debug)]
35pub(crate) struct LocalSpanId(NonZeroU64);
36
37#[allow(dead_code)]
39#[derive(Clone, Debug)]
40pub(crate) struct SpanName(String);
41
42pub(crate) struct LocalMetadata {
43 pub(crate) thread_timeline: TimelineId,
44}
45
46impl LayerCommon for crate::tracing::r#async::ModalityLayer {}
47impl LayerCommon for crate::tracing::blocking::ModalityLayer {}
48
49pub(crate) trait LayerHandler {
50 fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>>;
51 fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>>;
52 fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>>;
53}
54
55trait LayerCommon: LayerHandler {
56 fn handle_message(&self, message: ingest::Message) {
57 self.ensure_timeline_has_been_initialized();
58 let wrapped_message = ingest::WrappedMessage {
59 message,
60 tick: START.elapsed(),
61 nanos_since_unix_epoch: SystemTime::now()
62 .duration_since(std::time::UNIX_EPOCH)
63 .ok()
64 .and_then(|d| {
65 let n: Option<u64> = d.as_nanos().try_into().ok();
66 n.map(Nanoseconds::from)
67 }),
68 timeline: self.local_metadata().with(|m| m.thread_timeline),
69 };
70
71 if let Err(_e) = self.send(wrapped_message) {
72 static WARN_LATCH: Once = Once::new();
73 WARN_LATCH.call_once(|| {
74 eprintln!(
75 "warning: attempted trace after tracing modality has stopped accepting \
76 messages, ensure spans from all threads have closed before calling \
77 `finish()`"
78 );
79 });
80 }
81 }
82
83 fn get_next_span_id(&self) -> LocalSpanId {
84 loop {
85 let id = NEXT_SPAN_ID.fetch_add(1, Ordering::Relaxed);
87 if let Some(id) = NonZeroU64::new(id) {
88 return LocalSpanId(id);
89 }
90 }
91 }
92
93 fn ensure_timeline_has_been_initialized(&self) {
94 if !self.thread_timeline_initialized().with(|i| i.get()) {
95 self.thread_timeline_initialized().with(|i| i.set(true));
96
97 let cur = thread::current();
98 let name = cur
99 .name()
100 .map(Into::into)
101 .unwrap_or_else(|| format!("thread-{:?}", cur.id()));
102
103 let message = ingest::Message::NewTimeline { name };
104 let wrapped_message = ingest::WrappedMessage {
105 message,
106 tick: START.elapsed(),
107 nanos_since_unix_epoch: SystemTime::now()
108 .duration_since(std::time::UNIX_EPOCH)
109 .ok()
110 .and_then(|d| {
111 let n: Option<u64> = d.as_nanos().try_into().ok();
112 n.map(Nanoseconds::from)
113 }),
114 timeline: self.local_metadata().with(|m| m.thread_timeline),
115 };
116
117 let _ = self.send(wrapped_message);
119 }
120 }
121}
122
123fn get_local_span_id<S>(span: &Id, ctx: &Context<'_, S>) -> LocalSpanId
124where
125 S: Subscriber + for<'a> LookupSpan<'a>,
126{
127 *ctx.span(span)
129 .expect("get span tracing just told us about")
130 .extensions()
131 .get()
132 .expect("get `LocalSpanId`, should always exist on spans")
133}
134
135use crate::tracing::blocking::ModalityLayer as BlockingModalityLayer;
136use crate::tracing::r#async::ModalityLayer as AsyncModalityLayer;
137
138#[duplicate_item(layer; [ AsyncModalityLayer ]; [ BlockingModalityLayer ];)]
139impl<S> Layer<S> for layer
140where
141 S: Subscriber + for<'a> LookupSpan<'a>,
142{
143 fn enabled(&self, _metadata: &tracing_core::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
144 true
146 }
147
148 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
149 let local_id = self.get_next_span_id();
150 ctx.span(id).unwrap().extensions_mut().insert(local_id);
151
152 let mut visitor = RecordMapBuilder::new();
153 attrs.record(&mut visitor);
154 let records = visitor.values();
155 let metadata = attrs.metadata();
156
157 let msg = ingest::Message::NewSpan {
158 id: local_id.0,
159 metadata,
160 records,
161 };
162
163 self.handle_message(msg);
164 }
165
166 fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
167 let local_id = get_local_span_id(span, &ctx);
168
169 let mut visitor = RecordMapBuilder::new();
170 values.record(&mut visitor);
171
172 let msg = ingest::Message::Record {
173 span: local_id.0,
174 records: visitor.values(),
175 };
176
177 self.handle_message(msg)
178 }
179
180 fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) {
181 let local_id = get_local_span_id(span, &ctx);
182 let follows_local_id = get_local_span_id(follows, &ctx);
183
184 let msg = ingest::Message::RecordFollowsFrom {
185 span: local_id.0,
186 follows: follows_local_id.0,
187 };
188
189 self.handle_message(msg)
190 }
191
192 fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
193 let mut visitor = RecordMapBuilder::new();
194 event.record(&mut visitor);
195
196 let msg = ingest::Message::Event {
197 metadata: event.metadata(),
198 records: visitor.values(),
199 };
200
201 self.handle_message(msg)
202 }
203
204 fn on_enter(&self, span: &Id, ctx: Context<'_, S>) {
205 let local_id = get_local_span_id(span, &ctx);
206
207 let msg = ingest::Message::Enter { span: local_id.0 };
208
209 self.handle_message(msg)
210 }
211
212 fn on_exit(&self, span: &Id, ctx: Context<'_, S>) {
213 let local_id = get_local_span_id(span, &ctx);
214
215 let msg = ingest::Message::Exit { span: local_id.0 };
216
217 self.handle_message(msg)
218 }
219
220 fn on_id_change(&self, old: &Id, new: &Id, ctx: Context<'_, S>) {
221 let old_local_id = get_local_span_id(old, &ctx);
222 let new_local_id = self.get_next_span_id();
223 ctx.span(new).unwrap().extensions_mut().insert(new_local_id);
224
225 let msg = ingest::Message::IdChange {
226 old: old_local_id.0,
227 new: new_local_id.0,
228 };
229
230 self.handle_message(msg)
231 }
232
233 fn on_close(&self, span: Id, ctx: Context<'_, S>) {
234 let local_id = get_local_span_id(&span, &ctx);
235
236 let msg = ingest::Message::Close { span: local_id.0 };
237
238 self.handle_message(msg)
239 }
240}
241
242#[derive(Debug)]
243pub(crate) enum TracingValue {
244 String(String),
245 F64(f64),
246 I64(i64),
247 U64(u64),
248 Bool(bool),
249}
250
251pub(crate) type RecordMap = HashMap<String, TracingValue>;
252
253struct RecordMapBuilder {
254 record_map: RecordMap,
255}
256
257impl RecordMapBuilder {
258 fn values(self) -> RecordMap {
260 self.record_map
261 }
262}
263
264impl RecordMapBuilder {
265 fn new() -> RecordMapBuilder {
266 RecordMapBuilder {
267 record_map: HashMap::new(),
268 }
269 }
270}
271
272impl Visit for RecordMapBuilder {
273 fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
274 self.record_map.insert(
275 field.name().to_string(),
276 TracingValue::String(format!("{:?}", value)),
277 );
278 }
279
280 fn record_f64(&mut self, field: &Field, value: f64) {
281 self.record_map
282 .insert(field.name().to_string(), TracingValue::F64(value));
283 }
284
285 fn record_i64(&mut self, field: &Field, value: i64) {
286 self.record_map
287 .insert(field.name().to_string(), TracingValue::I64(value));
288 }
289
290 fn record_u64(&mut self, field: &Field, value: u64) {
291 self.record_map
292 .insert(field.name().to_string(), TracingValue::U64(value));
293 }
294
295 fn record_bool(&mut self, field: &Field, value: bool) {
296 self.record_map
297 .insert(field.name().to_string(), TracingValue::Bool(value));
298 }
299
300 fn record_str(&mut self, field: &Field, value: &str) {
301 self.record_map.insert(
302 field.name().to_string(),
303 TracingValue::String(value.to_string()),
304 );
305 }
306}