tracing_serde_subscriber/
lib.rs

1pub use tracing_serde_modality_ingest::TimelineId;
2pub use tracing_serde_wire::Packet;
3
4use std::{fmt::Debug, thread, thread_local, time::Instant};
5
6use anyhow::Context as _;
7use once_cell::sync::Lazy;
8use parking_lot::RwLock;
9use tokio::runtime::Runtime;
10use tracing_core::{
11    field::Visit,
12    span::{Attributes, Id, Record},
13    Field, Subscriber,
14};
15use tracing_subscriber::{
16    layer::{Context, Layer},
17    prelude::*,
18    registry::{LookupSpan, Registry},
19};
20use uuid::Uuid;
21
22use tracing_serde_modality_ingest::{options::Options, ConnectError, TracingModality};
23use tracing_serde_structured::{AsSerde, CowString, RecordMap, SerializeValue};
24use tracing_serde_wire::TracingWire;
25
26static START: Lazy<Instant> = Lazy::new(Instant::now);
27static GLOBAL_OPTIONS: RwLock<Option<Options>> = RwLock::new(None);
28
29thread_local! {
30    static HANDLER: LocalHandler = const { LocalHandler::new() };
31}
32
33struct LocalHandler(RwLock<Option<Result<TSHandler, ConnectError>>>);
34
35impl LocalHandler {
36    const fn new() -> Self {
37        LocalHandler(RwLock::new(None))
38    }
39
40    fn manual_init(&self, new_handler: TSHandler) {
41        let mut handler = self.0.write();
42        *handler = Some(Ok(new_handler));
43    }
44
45    // ensures handler has been initialized, then runs the provided function on it if it has been
46    // successfully initialized, otherwise does nothing
47    fn with_read<R, F: FnOnce(&TSHandler) -> R>(&self, f: F) -> Option<R> {
48        let mut handler = self.0.write();
49
50        if handler.is_none() {
51            *handler = Some(TSHandler::new());
52        }
53
54        if let Some(Ok(ref handler)) = *handler {
55            Some(f(handler))
56        } else {
57            None
58        }
59    }
60
61    // ensures handler has been initialized, then runs the provided function on it if it has been
62    // successfully initialized, otherwise does nothing
63    fn with_write<R, F: FnOnce(&mut TSHandler) -> R>(&self, f: F) -> Option<R> {
64        let mut handler = self.0.write();
65
66        if handler.is_none() {
67            *handler = Some(TSHandler::new());
68        }
69
70        if let Some(Ok(ref mut handler)) = *handler {
71            Some(f(handler))
72        } else {
73            None
74        }
75    }
76}
77
78impl LocalHandler {
79    fn handle_message(&self, msg: TracingWire<'_>) {
80        self.with_write(|h| h.handle_message(msg));
81    }
82
83    fn timeline_id(&self) -> TimelineId {
84        self.with_read(|t| t.tracer.timeline_id())
85            .unwrap_or_else(TimelineId::zero)
86    }
87}
88
89pub fn timeline_id() -> TimelineId {
90    HANDLER.with(|h| h.timeline_id())
91}
92
93pub struct TSHandler {
94    tracer: TracingModality,
95    rt: Runtime,
96}
97
98impl TSHandler {
99    fn new() -> Result<Self, ConnectError> {
100        let mut local_opts = GLOBAL_OPTIONS
101            .read()
102            .as_ref()
103            .context("global options not initialized, but global logger was set to us somehow")?
104            .clone();
105
106        let cur = thread::current();
107        let name = cur
108            .name()
109            .map(str::to_string)
110            .unwrap_or_else(|| format!("Thread#{:?}", cur.id()));
111        local_opts.set_name(name);
112
113        let rt = Runtime::new().context("create local tokio runtime for sdk")?;
114        let tracing_result = {
115            let handle = rt.handle();
116            handle.block_on(async { TracingModality::connect_with_options(local_opts).await })
117        };
118
119        match tracing_result {
120            Ok(tracer) => Ok(TSHandler { rt, tracer }),
121            Err(e) => Err(e),
122        }
123    }
124
125    fn handle_message(&mut self, message: TracingWire<'_>) {
126        let packet = Packet {
127            message,
128            // NOTE: will give inaccurate data if the program has run for more than 584942 years.
129            tick: START.elapsed().as_micros() as u64,
130        };
131        self.rt
132            .handle()
133            .block_on(async { self.tracer.handle_packet(packet).await })
134            .unwrap();
135    }
136}
137
138pub struct TSSubscriber {
139    _no_external_construct: (),
140}
141
142impl TSSubscriber {
143    #[allow(clippy::new_ret_no_self)]
144    // this doesn't technically build a `Self`, but that's the way people should think of it
145    pub fn new() -> impl Subscriber {
146        Self::new_with_options(Default::default())
147    }
148
149    pub fn new_with_options(opts: Options) -> impl Subscriber {
150        Registry::default().with(TSLayer::new_with_options(opts))
151    }
152
153    // another bit of type-based lies, this doesn't take an &self because we never build a Self
154    pub fn connect() -> Result<(), ConnectError> {
155        let first_local_handler = TSHandler::new()?;
156        HANDLER.with(|h| h.manual_init(first_local_handler));
157        Ok(())
158    }
159}
160
161pub struct TSLayer {
162    _no_external_construct: (),
163}
164
165impl TSLayer {
166    pub fn new() -> Self {
167        Self::new_with_options(Default::default())
168    }
169
170    pub fn new_with_options(mut opts: Options) -> Self {
171        let run_id = Uuid::new_v4();
172        opts.add_metadata("run_id", run_id.to_string());
173
174        {
175            let mut global_opts = GLOBAL_OPTIONS.write();
176            *global_opts = Some(opts);
177        }
178
179        TSLayer {
180            _no_external_construct: (),
181        }
182    }
183
184    /// This is an optional step that allows you to handle connection errors at initialization
185    /// time. If this is not called it will be called implicitly during the handling of the first
186    /// trace event.
187    pub fn connect(&self) -> Result<(), ConnectError> {
188        let first_local_handler = TSHandler::new()?;
189        HANDLER.with(|h| h.manual_init(first_local_handler));
190        Ok(())
191    }
192
193    /// Try to connect, and panic if that's not possible.
194    pub fn connect_or_panic(&self) {
195        if let Err(e) = self.connect() {
196            panic!("Cannot connect to to modality: {e}")
197        }
198    }
199}
200
201impl Default for TSLayer {
202    fn default() -> Self {
203        Self::new()
204    }
205}
206
207impl<S> Layer<S> for TSLayer
208where
209    S: Subscriber + for<'a> LookupSpan<'a>,
210{
211    fn enabled(&self, _metadata: &tracing_core::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
212        // always enabled for all levels
213        true
214    }
215
216    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
217        let mut visitor = RecordMapBuilder::new();
218
219        attrs.record(&mut visitor);
220
221        let msg = TracingWire::NewSpan {
222            id: id.as_serde(),
223            attrs: attrs.as_serde(),
224            values: visitor.values().into(),
225        };
226
227        HANDLER.with(move |h| h.handle_message(msg));
228    }
229
230    fn on_record(&self, span: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
231        let msg = TracingWire::Record {
232            span: span.as_serde(),
233            values: values.as_serde().to_owned(),
234        };
235
236        HANDLER.with(move |h| h.handle_message(msg));
237    }
238
239    fn on_follows_from(&self, span: &Id, follows: &Id, _ctx: Context<'_, S>) {
240        let msg = TracingWire::RecordFollowsFrom {
241            span: span.as_serde(),
242            follows: follows.as_serde().to_owned(),
243        };
244
245        HANDLER.with(move |h| h.handle_message(msg));
246    }
247
248    fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
249        let msg = TracingWire::Event(event.as_serde().to_owned());
250
251        HANDLER.with(move |h| h.handle_message(msg));
252    }
253
254    fn on_enter(&self, span: &Id, _ctx: Context<'_, S>) {
255        let msg = TracingWire::Enter(span.as_serde());
256
257        HANDLER.with(move |h| h.handle_message(msg));
258    }
259
260    fn on_exit(&self, span: &Id, _ctx: Context<'_, S>) {
261        let msg = TracingWire::Exit(span.as_serde());
262
263        HANDLER.with(move |h| h.handle_message(msg));
264    }
265
266    fn on_id_change(&self, old: &Id, new: &Id, _ctx: Context<'_, S>) {
267        let msg = TracingWire::IdClone {
268            old: old.as_serde(),
269            new: new.as_serde(),
270        };
271
272        HANDLER.with(move |h| h.handle_message(msg));
273    }
274
275    fn on_close(&self, span: Id, _ctx: Context<'_, S>) {
276        let msg = TracingWire::Close(span.as_serde());
277
278        HANDLER.with(move |h| h.handle_message(msg));
279    }
280}
281
282struct RecordMapBuilder<'a> {
283    record_map: RecordMap<'a>,
284}
285
286impl<'a> RecordMapBuilder<'a> {
287    fn values(self) -> RecordMap<'a> {
288        self.record_map
289    }
290}
291
292impl<'a> RecordMapBuilder<'a> {
293    fn new() -> RecordMapBuilder<'a> {
294        RecordMapBuilder {
295            record_map: RecordMap::new(),
296        }
297    }
298}
299
300impl<'a> Visit for RecordMapBuilder<'a> {
301    fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
302        self.record_map.insert(
303            CowString::Borrowed(field.name()),
304            SerializeValue::Debug(CowString::Owned(format!("{:?}", value)).into()),
305        );
306    }
307
308    fn record_f64(&mut self, field: &Field, value: f64) {
309        self.record_map.insert(
310            CowString::Borrowed(field.name()),
311            SerializeValue::F64(value),
312        );
313    }
314
315    fn record_i64(&mut self, field: &Field, value: i64) {
316        self.record_map.insert(
317            CowString::Borrowed(field.name()),
318            SerializeValue::I64(value),
319        );
320    }
321
322    fn record_u64(&mut self, field: &Field, value: u64) {
323        self.record_map.insert(
324            CowString::Borrowed(field.name()),
325            SerializeValue::U64(value),
326        );
327    }
328
329    fn record_bool(&mut self, field: &Field, value: bool) {
330        self.record_map.insert(
331            CowString::Borrowed(field.name()),
332            SerializeValue::Bool(value),
333        );
334    }
335
336    fn record_str(&mut self, field: &Field, value: &str) {
337        self.record_map.insert(
338            CowString::Borrowed(field.name()),
339            SerializeValue::Str(CowString::Borrowed(value).to_owned()),
340        );
341    }
342}