1use crate::ingest::TimelineId;
2
3use crate::ingest;
4use crate::ingest::WrappedMessage;
5
6use auxon_sdk::api::Nanoseconds;
7use duplicate::duplicate_item;
8use once_cell::sync::Lazy;
9use std::time::SystemTime;
10use std::{
11 cell::Cell,
12 collections::HashMap,
13 fmt::Debug,
14 num::NonZeroU64,
15 sync::atomic::{AtomicU64, Ordering},
16 sync::Once,
17 thread,
18 thread::LocalKey,
19 time::Instant,
20};
21use tokio::sync::mpsc;
22use tracing_core::{
23 field::Visit,
24 span::{Attributes, Id, Record},
25 Field, Subscriber,
26};
27use tracing_subscriber::{
28 layer::{Context, Layer},
29 registry::LookupSpan,
30};
31
32static START: Lazy<Instant> = Lazy::new(Instant::now);
33static NEXT_SPAN_ID: AtomicU64 = AtomicU64::new(1);
34
/// Identifier this layer assigns to a span and stores in the span's
/// extensions. It is backed by a `NonZeroU64`, so it can never be zero.
#[derive(Debug, Clone, Copy)]
pub(crate) struct LocalSpanId(NonZeroU64);
38
/// Owned name of a span.
// NOTE(review): nothing in this file references `SpanName`; the `dead_code`
// allowance presumably exists because the type is currently unused. Confirm
// whether it is still needed.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct SpanName(String);
43
44pub(crate) struct LocalMetadata {
45 pub(crate) thread_timeline: TimelineId,
46}
47
// Both layer flavors rely entirely on the default methods of `LayerCommon`,
// so the impl bodies are intentionally empty.
#[cfg(feature = "blocking")]
impl LayerCommon for crate::blocking::ModalityLayer {}
#[cfg(feature = "async")]
impl LayerCommon for crate::r#async::ModalityLayer {}
52
53pub(crate) trait LayerHandler {
54 fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>>;
55 fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>>;
56 fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>>;
57}
58
59trait LayerCommon: LayerHandler {
60 fn handle_message(&self, message: ingest::Message) {
61 self.ensure_timeline_has_been_initialized();
62 let wrapped_message = ingest::WrappedMessage {
63 message,
64 tick: START.elapsed(),
65 nanos_since_unix_epoch: SystemTime::now()
66 .duration_since(std::time::UNIX_EPOCH)
67 .ok()
68 .and_then(|d| {
69 let n: Option<u64> = d.as_nanos().try_into().ok();
70 n.map(Nanoseconds::from)
71 }),
72 timeline: self.local_metadata().with(|m| m.thread_timeline),
73 };
74
75 if let Err(_e) = self.send(wrapped_message) {
76 static WARN_LATCH: Once = Once::new();
77 WARN_LATCH.call_once(|| {
78 eprintln!(
79 "warning: attempted trace after tracing modality has stopped accepting \
80 messages, ensure spans from all threads have closed before calling \
81 `finish()`"
82 );
83 });
84 }
85 }
86
87 fn get_next_span_id(&self) -> LocalSpanId {
88 loop {
89 let id = NEXT_SPAN_ID.fetch_add(1, Ordering::Relaxed);
91 if let Some(id) = NonZeroU64::new(id) {
92 return LocalSpanId(id);
93 }
94 }
95 }
96
97 fn ensure_timeline_has_been_initialized(&self) {
98 if !self.thread_timeline_initialized().with(|i| i.get()) {
99 self.thread_timeline_initialized().with(|i| i.set(true));
100
101 let cur = thread::current();
102 let name = cur
103 .name()
104 .map(Into::into)
105 .unwrap_or_else(|| format!("thread-{:?}", cur.id()));
106
107 let message = ingest::Message::NewTimeline { name };
108 let wrapped_message = ingest::WrappedMessage {
109 message,
110 tick: START.elapsed(),
111 nanos_since_unix_epoch: SystemTime::now()
112 .duration_since(std::time::UNIX_EPOCH)
113 .ok()
114 .and_then(|d| {
115 let n: Option<u64> = d.as_nanos().try_into().ok();
116 n.map(Nanoseconds::from)
117 }),
118 timeline: self.local_metadata().with(|m| m.thread_timeline),
119 };
120
121 let _ = self.send(wrapped_message);
123 }
124 }
125}
126
127fn get_local_span_id<S>(span: &Id, ctx: &Context<'_, S>) -> LocalSpanId
128where
129 S: Subscriber + for<'a> LookupSpan<'a>,
130{
131 *ctx.span(span)
133 .expect("get span tracing just told us about")
134 .extensions()
135 .get()
136 .expect("get `LocalSpanId`, should always exist on spans")
137}
138
139#[cfg(feature = "blocking")]
140use crate::blocking::ModalityLayer as BlockingModalityLayer;
141#[cfg(feature = "async")]
142use crate::r#async::ModalityLayer as AsyncModalityLayer;
143
144#[cfg_attr(all(feature = "async", feature = "blocking"),
145 duplicate_item(layer; [ AsyncModalityLayer ]; [ BlockingModalityLayer ];))]
146#[cfg_attr(all(feature = "async", not(feature = "blocking")),
147 duplicate_item(layer; [ AsyncModalityLayer ];))]
148#[cfg_attr(all(not(feature = "async"), feature = "blocking"),
149 duplicate_item(layer; [ BlockingModalityLayer ];))]
150impl<S> Layer<S> for layer
151where
152 S: Subscriber + for<'a> LookupSpan<'a>,
153{
154 fn enabled(&self, _metadata: &tracing_core::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
155 true
157 }
158
159 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
160 let local_id = self.get_next_span_id();
161 ctx.span(id).unwrap().extensions_mut().insert(local_id);
162
163 let mut visitor = RecordMapBuilder::new();
164 attrs.record(&mut visitor);
165 let records = visitor.values();
166 let metadata = attrs.metadata();
167
168 let msg = ingest::Message::NewSpan {
169 id: local_id.0,
170 metadata,
171 records,
172 };
173
174 self.handle_message(msg);
175 }
176
177 fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
178 let local_id = get_local_span_id(span, &ctx);
179
180 let mut visitor = RecordMapBuilder::new();
181 values.record(&mut visitor);
182
183 let msg = ingest::Message::Record {
184 span: local_id.0,
185 records: visitor.values(),
186 };
187
188 self.handle_message(msg)
189 }
190
191 fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) {
192 let local_id = get_local_span_id(span, &ctx);
193 let follows_local_id = get_local_span_id(follows, &ctx);
194
195 let msg = ingest::Message::RecordFollowsFrom {
196 span: local_id.0,
197 follows: follows_local_id.0,
198 };
199
200 self.handle_message(msg)
201 }
202
203 fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
204 let mut visitor = RecordMapBuilder::new();
205 event.record(&mut visitor);
206
207 let msg = ingest::Message::Event {
208 metadata: event.metadata(),
209 records: visitor.values(),
210 };
211
212 self.handle_message(msg)
213 }
214
215 fn on_enter(&self, span: &Id, ctx: Context<'_, S>) {
216 let local_id = get_local_span_id(span, &ctx);
217
218 let msg = ingest::Message::Enter { span: local_id.0 };
219
220 self.handle_message(msg)
221 }
222
223 fn on_exit(&self, span: &Id, ctx: Context<'_, S>) {
224 let local_id = get_local_span_id(span, &ctx);
225
226 let msg = ingest::Message::Exit { span: local_id.0 };
227
228 self.handle_message(msg)
229 }
230
231 fn on_id_change(&self, old: &Id, new: &Id, ctx: Context<'_, S>) {
232 let old_local_id = get_local_span_id(old, &ctx);
233 let new_local_id = self.get_next_span_id();
234 ctx.span(new).unwrap().extensions_mut().insert(new_local_id);
235
236 let msg = ingest::Message::IdChange {
237 old: old_local_id.0,
238 new: new_local_id.0,
239 };
240
241 self.handle_message(msg)
242 }
243
244 fn on_close(&self, span: Id, ctx: Context<'_, S>) {
245 let local_id = get_local_span_id(&span, &ctx);
246
247 let msg = ingest::Message::Close { span: local_id.0 };
248
249 self.handle_message(msg)
250 }
251}
252
/// A single recorded field value, in one of the primitive shapes captured by
/// `RecordMapBuilder`.
///
/// `Clone` and `PartialEq` are derived so that values (and maps of them) can
/// be duplicated and compared; `Eq` is not available because of the `f64`
/// variant.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum TracingValue {
    /// Text: either a recorded `&str` or the `Debug` rendering of a value.
    String(String),
    /// 64-bit floating point number.
    F64(f64),
    /// Signed 64-bit integer.
    I64(i64),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Boolean flag.
    Bool(bool),
}
261
262pub(crate) type RecordMap = HashMap<String, TracingValue>;
263
264struct RecordMapBuilder {
265 record_map: RecordMap,
266}
267
268impl RecordMapBuilder {
269 fn values(self) -> RecordMap {
271 self.record_map
272 }
273}
274
275impl RecordMapBuilder {
276 fn new() -> RecordMapBuilder {
277 RecordMapBuilder {
278 record_map: HashMap::new(),
279 }
280 }
281}
282
283impl Visit for RecordMapBuilder {
284 fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
285 self.record_map.insert(
286 field.name().to_string(),
287 TracingValue::String(format!("{:?}", value)),
288 );
289 }
290
291 fn record_f64(&mut self, field: &Field, value: f64) {
292 self.record_map
293 .insert(field.name().to_string(), TracingValue::F64(value));
294 }
295
296 fn record_i64(&mut self, field: &Field, value: i64) {
297 self.record_map
298 .insert(field.name().to_string(), TracingValue::I64(value));
299 }
300
301 fn record_u64(&mut self, field: &Field, value: u64) {
302 self.record_map
303 .insert(field.name().to_string(), TracingValue::U64(value));
304 }
305
306 fn record_bool(&mut self, field: &Field, value: bool) {
307 self.record_map
308 .insert(field.name().to_string(), TracingValue::Bool(value));
309 }
310
311 fn record_str(&mut self, field: &Field, value: &str) {
312 self.record_map.insert(
313 field.name().to_string(),
314 TracingValue::String(value.to_string()),
315 );
316 }
317}