tracing_serde_subscriber/
lib.rs1pub use tracing_serde_modality_ingest::TimelineId;
2pub use tracing_serde_wire::Packet;
3
4use std::{fmt::Debug, thread, thread_local, time::Instant};
5
6use anyhow::Context as _;
7use once_cell::sync::Lazy;
8use parking_lot::RwLock;
9use tokio::runtime::Runtime;
10use tracing_core::{
11 field::Visit,
12 span::{Attributes, Id, Record},
13 Field, Subscriber,
14};
15use tracing_subscriber::{
16 layer::{Context, Layer},
17 prelude::*,
18 registry::{LookupSpan, Registry},
19};
20use uuid::Uuid;
21
22use tracing_serde_modality_ingest::{options::Options, ConnectError, TracingModality};
23use tracing_serde_structured::{AsSerde, CowString, RecordMap, SerializeValue};
24use tracing_serde_wire::TracingWire;
25
26static START: Lazy<Instant> = Lazy::new(Instant::now);
27static GLOBAL_OPTIONS: RwLock<Option<Options>> = RwLock::new(None);
28
29thread_local! {
30 static HANDLER: LocalHandler = const { LocalHandler::new() };
31}
32
33struct LocalHandler(RwLock<Option<Result<TSHandler, ConnectError>>>);
34
35impl LocalHandler {
36 const fn new() -> Self {
37 LocalHandler(RwLock::new(None))
38 }
39
40 fn manual_init(&self, new_handler: TSHandler) {
41 let mut handler = self.0.write();
42 *handler = Some(Ok(new_handler));
43 }
44
45 fn with_read<R, F: FnOnce(&TSHandler) -> R>(&self, f: F) -> Option<R> {
48 let mut handler = self.0.write();
49
50 if handler.is_none() {
51 *handler = Some(TSHandler::new());
52 }
53
54 if let Some(Ok(ref handler)) = *handler {
55 Some(f(handler))
56 } else {
57 None
58 }
59 }
60
61 fn with_write<R, F: FnOnce(&mut TSHandler) -> R>(&self, f: F) -> Option<R> {
64 let mut handler = self.0.write();
65
66 if handler.is_none() {
67 *handler = Some(TSHandler::new());
68 }
69
70 if let Some(Ok(ref mut handler)) = *handler {
71 Some(f(handler))
72 } else {
73 None
74 }
75 }
76}
77
78impl LocalHandler {
79 fn handle_message(&self, msg: TracingWire<'_>) {
80 self.with_write(|h| h.handle_message(msg));
81 }
82
83 fn timeline_id(&self) -> TimelineId {
84 self.with_read(|t| t.tracer.timeline_id())
85 .unwrap_or_else(TimelineId::zero)
86 }
87}
88
89pub fn timeline_id() -> TimelineId {
90 HANDLER.with(|h| h.timeline_id())
91}
92
93pub struct TSHandler {
94 tracer: TracingModality,
95 rt: Runtime,
96}
97
98impl TSHandler {
99 fn new() -> Result<Self, ConnectError> {
100 let mut local_opts = GLOBAL_OPTIONS
101 .read()
102 .as_ref()
103 .context("global options not initialized, but global logger was set to us somehow")?
104 .clone();
105
106 let cur = thread::current();
107 let name = cur
108 .name()
109 .map(str::to_string)
110 .unwrap_or_else(|| format!("Thread#{:?}", cur.id()));
111 local_opts.set_name(name);
112
113 let rt = Runtime::new().context("create local tokio runtime for sdk")?;
114 let tracing_result = {
115 let handle = rt.handle();
116 handle.block_on(async { TracingModality::connect_with_options(local_opts).await })
117 };
118
119 match tracing_result {
120 Ok(tracer) => Ok(TSHandler { rt, tracer }),
121 Err(e) => Err(e),
122 }
123 }
124
125 fn handle_message(&mut self, message: TracingWire<'_>) {
126 let packet = Packet {
127 message,
128 tick: START.elapsed().as_micros() as u64,
130 };
131 self.rt
132 .handle()
133 .block_on(async { self.tracer.handle_packet(packet).await })
134 .unwrap();
135 }
136}
137
138pub struct TSSubscriber {
139 _no_external_construct: (),
140}
141
142impl TSSubscriber {
143 #[allow(clippy::new_ret_no_self)]
144 pub fn new() -> impl Subscriber {
146 Self::new_with_options(Default::default())
147 }
148
149 pub fn new_with_options(opts: Options) -> impl Subscriber {
150 Registry::default().with(TSLayer::new_with_options(opts))
151 }
152
153 pub fn connect() -> Result<(), ConnectError> {
155 let first_local_handler = TSHandler::new()?;
156 HANDLER.with(|h| h.manual_init(first_local_handler));
157 Ok(())
158 }
159}
160
161pub struct TSLayer {
162 _no_external_construct: (),
163}
164
165impl TSLayer {
166 pub fn new() -> Self {
167 Self::new_with_options(Default::default())
168 }
169
170 pub fn new_with_options(mut opts: Options) -> Self {
171 let run_id = Uuid::new_v4();
172 opts.add_metadata("run_id", run_id.to_string());
173
174 {
175 let mut global_opts = GLOBAL_OPTIONS.write();
176 *global_opts = Some(opts);
177 }
178
179 TSLayer {
180 _no_external_construct: (),
181 }
182 }
183
184 pub fn connect(&self) -> Result<(), ConnectError> {
188 let first_local_handler = TSHandler::new()?;
189 HANDLER.with(|h| h.manual_init(first_local_handler));
190 Ok(())
191 }
192
193 pub fn connect_or_panic(&self) {
195 if let Err(e) = self.connect() {
196 panic!("Cannot connect to to modality: {e}")
197 }
198 }
199}
200
201impl Default for TSLayer {
202 fn default() -> Self {
203 Self::new()
204 }
205}
206
207impl<S> Layer<S> for TSLayer
208where
209 S: Subscriber + for<'a> LookupSpan<'a>,
210{
211 fn enabled(&self, _metadata: &tracing_core::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
212 true
214 }
215
216 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
217 let mut visitor = RecordMapBuilder::new();
218
219 attrs.record(&mut visitor);
220
221 let msg = TracingWire::NewSpan {
222 id: id.as_serde(),
223 attrs: attrs.as_serde(),
224 values: visitor.values().into(),
225 };
226
227 HANDLER.with(move |h| h.handle_message(msg));
228 }
229
230 fn on_record(&self, span: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
231 let msg = TracingWire::Record {
232 span: span.as_serde(),
233 values: values.as_serde().to_owned(),
234 };
235
236 HANDLER.with(move |h| h.handle_message(msg));
237 }
238
239 fn on_follows_from(&self, span: &Id, follows: &Id, _ctx: Context<'_, S>) {
240 let msg = TracingWire::RecordFollowsFrom {
241 span: span.as_serde(),
242 follows: follows.as_serde().to_owned(),
243 };
244
245 HANDLER.with(move |h| h.handle_message(msg));
246 }
247
248 fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
249 let msg = TracingWire::Event(event.as_serde().to_owned());
250
251 HANDLER.with(move |h| h.handle_message(msg));
252 }
253
254 fn on_enter(&self, span: &Id, _ctx: Context<'_, S>) {
255 let msg = TracingWire::Enter(span.as_serde());
256
257 HANDLER.with(move |h| h.handle_message(msg));
258 }
259
260 fn on_exit(&self, span: &Id, _ctx: Context<'_, S>) {
261 let msg = TracingWire::Exit(span.as_serde());
262
263 HANDLER.with(move |h| h.handle_message(msg));
264 }
265
266 fn on_id_change(&self, old: &Id, new: &Id, _ctx: Context<'_, S>) {
267 let msg = TracingWire::IdClone {
268 old: old.as_serde(),
269 new: new.as_serde(),
270 };
271
272 HANDLER.with(move |h| h.handle_message(msg));
273 }
274
275 fn on_close(&self, span: Id, _ctx: Context<'_, S>) {
276 let msg = TracingWire::Close(span.as_serde());
277
278 HANDLER.with(move |h| h.handle_message(msg));
279 }
280}
281
282struct RecordMapBuilder<'a> {
283 record_map: RecordMap<'a>,
284}
285
286impl<'a> RecordMapBuilder<'a> {
287 fn values(self) -> RecordMap<'a> {
288 self.record_map
289 }
290}
291
292impl<'a> RecordMapBuilder<'a> {
293 fn new() -> RecordMapBuilder<'a> {
294 RecordMapBuilder {
295 record_map: RecordMap::new(),
296 }
297 }
298}
299
300impl<'a> Visit for RecordMapBuilder<'a> {
301 fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
302 self.record_map.insert(
303 CowString::Borrowed(field.name()),
304 SerializeValue::Debug(CowString::Owned(format!("{:?}", value)).into()),
305 );
306 }
307
308 fn record_f64(&mut self, field: &Field, value: f64) {
309 self.record_map.insert(
310 CowString::Borrowed(field.name()),
311 SerializeValue::F64(value),
312 );
313 }
314
315 fn record_i64(&mut self, field: &Field, value: i64) {
316 self.record_map.insert(
317 CowString::Borrowed(field.name()),
318 SerializeValue::I64(value),
319 );
320 }
321
322 fn record_u64(&mut self, field: &Field, value: u64) {
323 self.record_map.insert(
324 CowString::Borrowed(field.name()),
325 SerializeValue::U64(value),
326 );
327 }
328
329 fn record_bool(&mut self, field: &Field, value: bool) {
330 self.record_map.insert(
331 CowString::Borrowed(field.name()),
332 SerializeValue::Bool(value),
333 );
334 }
335
336 fn record_str(&mut self, field: &Field, value: &str) {
337 self.record_map.insert(
338 CowString::Borrowed(field.name()),
339 SerializeValue::Str(CowString::Borrowed(value).to_owned()),
340 );
341 }
342}