1use alloc::borrow::Cow;
2use chrono::{DateTime, Utc};
3use core::fmt::{Debug, Display, Formatter};
4use core::sync::atomic::AtomicU64;
5use derive_more::From;
6use hashbrown::HashMap;
7use serde::{Deserialize, Serialize};
8use smol_str::{format_smolstr, SmolStr, ToSmolStr};
9use tracing::field::{Field, Visit};
10use tracing::span::{Attributes, Record};
11use tracing::{Id, Metadata, Subscriber};
12use tracing_subscriber::layer::Context;
13use tracing_subscriber::registry::LookupSpan;
14use tracing_subscriber::Layer;
15use uuid::Uuid;
16
17#[derive(Serialize, Deserialize, Default, Debug, Clone)]
18pub struct SpanAttrs(pub HashMap<Cow<'static, str>, TLValue>);
19
20impl Visit for SpanAttrs {
21 fn record_f64(&mut self, field: &Field, value: f64) {
22 self.0.insert(field.name().into(), TLValue::F64(value));
23 }
24 fn record_i64(&mut self, field: &Field, value: i64) {
25 self.0.insert(field.name().into(), TLValue::I64(value));
26 }
27
28 fn record_u64(&mut self, field: &Field, value: u64) {
29 self.0.insert(field.name().into(), TLValue::U64(value));
30 }
31 fn record_bool(&mut self, field: &Field, value: bool) {
32 self.0.insert(field.name().into(), TLValue::Bool(value));
33 }
34 fn record_str(&mut self, field: &Field, value: &str) {
35 self.0.insert(
36 field.name().into(),
37 TLValue::Str(if value.bytes().len() > MAX_RECORD_LEN {
38 format_smolstr!("<string too long. {}>", value.bytes().len())
39 } else {
40 value.replace("\x00", "").to_smolstr()
41 }),
42 );
43 }
44 fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
45 let str = format_smolstr!("{:?}", value);
46 if str.bytes().len() > MAX_RECORD_LEN {
47 self.0.insert(
48 field.name().into(),
49 TLValue::Str(format_smolstr!("<value too long. {}>", str.bytes().len())),
50 );
51 }
52 if str.contains("\x00") {
53 self.0.insert(
54 field.name().into(),
55 TLValue::Str(str.replace("\x00", "").to_smolstr()),
56 );
57 } else {
58 self.0.insert(field.name().into(), TLValue::Str(str));
59 }
60 }
61}
62
63#[derive(Debug, From, Clone, PartialEq, Serialize, Deserialize)]
64pub enum TLValue {
65 F64(f64),
66 I64(i64),
67 U64(u64),
68 Bool(bool),
69 Str(SmolStr),
70}
71
72impl Display for TLValue {
73 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
74 match self {
75 TLValue::F64(n) => write!(f, "{n}"),
76 TLValue::I64(n) => write!(f, "{n}"),
77 TLValue::U64(n) => write!(f, "{n}"),
78 TLValue::Bool(n) => write!(f, "{n}"),
79 TLValue::Str(n) => write!(f, "{n}"),
80 }?;
81 Ok(())
82 }
83}
84
85impl From<&'static str> for TLValue {
86 fn from(value: &'static str) -> Self {
87 TLValue::Str(value.into())
88 }
89}
90
91#[derive(Serialize, Deserialize, Debug, Clone)]
92pub struct VxMetadata {
93 pub name: Cow<'static, str>,
94 pub target: Cow<'static, str>,
95 pub level: Cow<'static, str>,
96 pub module_path: Option<Cow<'static, str>>,
97 pub file: Option<Cow<'static, str>>,
98 pub line: Option<u32>,
99}
100
101impl From<&'static Metadata<'static>> for VxMetadata {
102 fn from(value: &'static Metadata<'static>) -> Self {
103 Self {
104 name: value.name().into(),
105 target: value.target().into(),
106 level: value.level().as_str().into(),
107 module_path: value.module_path().map(|n|n.into()),
108 file: value.file().map(|n|n.into()),
109 line: value.line(),
110 }
111 }
112}
113
/// Byte-length threshold (1 MiB) that the `SpanAttrs` visitor compares recorded text
/// against when deciding whether it is too long to store verbatim.
pub const MAX_RECORD_LEN: usize = 1 << 20;
115
116#[derive(Serialize, Deserialize, Debug, Clone)]
117pub struct SpanInfo {
118 pub id: u64,
119 pub metadata: VxMetadata,
120}
121
122#[derive(Serialize, Deserialize, Debug, Clone)]
123pub enum TLMsg {
124 SpanCreate {
125 record_index: u64,
126 date: DateTime<Utc>,
127 span: SpanInfo,
128 attributes: SpanAttrs,
129 parent_span: Option<SpanInfo>,
130 },
131 SpanRecordAttr {
132 record_index: u64,
133 date: DateTime<Utc>,
134 span: SpanInfo,
135 attributes: SpanAttrs,
136 },
137 SpanEnter {
138 record_index: u64,
139 date: DateTime<Utc>,
140 span: SpanInfo,
141 },
142 SpanLeave {
143 record_index: u64,
144 date: DateTime<Utc>,
145 span: SpanInfo,
146 },
147 SpanClose {
148 record_index: u64,
149 date: DateTime<Utc>,
150 span: SpanInfo,
151 },
152 Event {
153 record_index: u64,
154 date: DateTime<Utc>,
155 message: SmolStr,
156 metadata: VxMetadata,
157 attributes: SpanAttrs,
158 span: Option<SpanInfo>,
159 },
160}
161
162impl TLMsg {
163 pub fn record_index(&self)->u64 {
164 match self {
165 TLMsg::SpanCreate { record_index, .. } => *record_index,
166 TLMsg::SpanRecordAttr { record_index, .. } => *record_index,
167 TLMsg::SpanEnter { record_index, .. } => *record_index,
168 TLMsg::SpanLeave { record_index, .. } => *record_index,
169 TLMsg::SpanClose { record_index, .. } => *record_index,
170 TLMsg::Event { record_index, .. } => *record_index,
171 }
172 }
173}
174
/// Sink for [`TLMsg`] values.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait TracingLiveMsgSubscriber: Send + Sync + 'static {
    /// Handles one message; ownership of `msg` passes to the implementor.
    fn on_msg(&self, msg: TLMsg);
}
178
179impl TracingLiveMsgSubscriber for alloc::boxed::Box<dyn TracingLiveMsgSubscriber> {
180 fn on_msg(&self, msg: TLMsg) {
181 (**self).on_msg(msg)
182 }
183}
184
185impl<T, F> TracingLiveMsgSubscriber for (T, F)
186where
187 T: TracingLiveMsgSubscriber,
188 F: Fn(&TLMsg) + Send + Sync + 'static,
189{
190 fn on_msg(&self, msg: TLMsg) {
191 (self.1)(&msg);
192 self.0.on_msg(msg);
193 }
194}
195impl<T, F> TracingLiveMsgSubscriber for (T, Option<F>)
196where
197 T: TracingLiveMsgSubscriber,
198 F: Fn(&TLMsg) + Send + Sync + 'static,
199{
200 fn on_msg(&self, msg: TLMsg) {
201 if let Some(f) = &self.1 {
202 f(&msg);
203 }
204 self.0.on_msg(msg);
205 }
206}
207
/// `tracing_subscriber` layer that turns span and event callbacks into [`TLMsg`]
/// values and hands them to `subscriber`.
pub struct TLLayer<F> {
    /// Receiver of every message produced by the layer.
    pub subscriber: F,
    /// When `false`, span enter and exit callbacks produce no messages.
    pub enable_enter: bool,
    /// Counter incremented once per produced message; its previous value becomes the
    /// message's `record_index`.
    pub record_index: AtomicU64,
}
213
214impl<S, F> Layer<S> for TLLayer<F>
215where
216 for<'a> S: Subscriber + LookupSpan<'a>,
217 F: TracingLiveMsgSubscriber,
218{
219 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
220 if attrs.metadata().target().starts_with("h2::proto") {
221 return;
222 }
223 let mut attributes = SpanAttrs::default();
224 attrs.record(&mut attributes);
225 let parent_id = attrs
226 .parent()
227 .and_then(|id| _ctx.span(id))
228 .or_else(|| _ctx.lookup_current())
229 .map(|n| n.id().into_u64());
230 let parent = parent_id.map(|n| _ctx.span(&Id::from_u64(n))).flatten();
231 let msg = TLMsg::SpanCreate {
232 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
233 date: now(),
234 span: SpanInfo {
235 id: id.into_u64(),
236 metadata: attrs.metadata().into(),
237 },
238 attributes,
239 parent_span: parent.map(|n| SpanInfo {
240 id: n.id().into_u64(),
241 metadata: n.metadata().into(),
242 }),
243 };
244 self.subscriber.on_msg(msg);
245 }
246
247 fn on_record(&self, _span: &Id, _values: &Record<'_>, _ctx: Context<'_, S>) {
248 let date = now();
249 let _span = _ctx.span(_span).unwrap();
250 if _span.metadata().target().starts_with("h2::proto") {
251 return;
252 }
253 let mut attributes = SpanAttrs::default();
254 _values.record(&mut attributes);
255
256 let msg = TLMsg::SpanRecordAttr {
257 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
258 date,
259 span: SpanInfo {
260 id: _span.id().into_u64(),
261 metadata: _span.metadata().into(),
262 },
263 attributes,
264 };
265
266 self.subscriber.on_msg(msg);
267 }
268 fn on_event(&self, _event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
269 if _event.metadata().target().starts_with("h2::proto") {
270 return;
271 }
272 let date = now();
273 let mut attributes = SpanAttrs::default();
274 _event.record(&mut attributes);
275 let message = attributes.0.remove("message");
276 let msg = TLMsg::Event {
277 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
278 date,
279 message: message.map(|n| n.to_smolstr()).unwrap_or("".into()),
280 metadata: _event.metadata().into(),
281 attributes,
282 span: _ctx.event_span(_event).map(|n| SpanInfo {
283 id: n.id().into_u64(),
284 metadata: n.metadata().into(),
285 }),
286 };
287
288 self.subscriber.on_msg(msg);
289 }
290 fn on_enter(&self, _id: &Id, _ctx: Context<'_, S>) {
291 if !self.enable_enter {
292 return;
293 }
294 let date = now();
295 let _span = _ctx.span(_id).unwrap();
296 if _span.metadata().target().starts_with("h2::proto") {
297 return;
298 }
299 let msg = TLMsg::SpanEnter {
300 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
301 date,
302 span: SpanInfo {
303 id: _span.id().into_u64(),
304 metadata: _span.metadata().into(),
305 },
306 };
307
308 self.subscriber.on_msg(msg);
309 }
310 fn on_exit(&self, _id: &Id, _ctx: Context<'_, S>) {
311 if !self.enable_enter {
312 return;
313 }
314 let date = now();
315 let _span = _ctx.span(_id).unwrap();
316 if _span.metadata().target().starts_with("h2::proto") {
317 return;
318 }
319 let msg = TLMsg::SpanLeave {
320 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
321 date,
322 span: SpanInfo {
323 id: _span.id().into_u64(),
324 metadata: _span.metadata().into(),
325 },
326 };
327
328 self.subscriber.on_msg(msg);
329 }
330 fn on_close(&self, _id: Id, _ctx: Context<'_, S>) {
331 let date = now();
332 let _span = _ctx.span(&_id).unwrap();
333 if _span.metadata().target().starts_with("h2::proto") {
334 return;
335 }
336 let msg = TLMsg::SpanClose {
337 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
338 date,
339 span: SpanInfo {
340 id: _span.id().into_u64(),
341 metadata: _span.metadata().into(),
342 },
343 };
344
345 self.subscriber.on_msg(msg);
346 }
347}
348
349#[derive(Clone, Debug)]
350pub struct TLAppInfo {
351 pub app_id: Uuid,
352 pub app_name: SmolStr,
353 pub app_version: SmolStr,
354 pub node_id: SmolStr,
355 pub static_data: HashMap<SmolStr, TLValue>,
356 pub data: HashMap<SmolStr, TLValue>,
357}
358
359impl TLAppInfo {
360 pub fn new(
361 app_id: impl Into<Uuid>,
362 app_name: impl Into<SmolStr>,
363 app_version: impl Into<SmolStr>,
364 ) -> Self {
365 Self {
366 app_id: app_id.into(),
367 app_name: app_name.into(),
368 app_version: app_version.into(),
369 node_id: "default_node".into(),
370 static_data: Default::default(),
371 data: Default::default(),
372 }
373 }
374
375 pub fn node_id(mut self, node_id: impl Into<SmolStr>) -> Self {
376 self.node_id = node_id.into();
377 self
378 }
379
380 pub fn node_name(self, node_name: impl Into<SmolStr>) -> Self {
381 self.with_data("node_name", node_name.into())
382 }
383
384 pub fn brief(self, brief: impl Into<SmolStr>) -> Self {
385 self.with_data("brief", brief.into())
386 }
387
388 pub fn second_name(self, second_name: impl Into<SmolStr>) -> Self {
389 self.with_data("second_name", second_name.into())
390 }
391
392 pub fn with_data(mut self, name: impl Into<SmolStr>, value: impl Into<TLValue>) -> Self {
393 self.data.insert(name.into(), value.into());
394 self
395 }
396
397 pub fn with_static_data(mut self, name: impl Into<SmolStr>, value: impl Into<TLValue>) -> Self {
398 self.static_data.insert(name.into(), value.into());
399 self
400 }
401}
402
403#[cfg(feature = "std")]
404fn now() -> DateTime<Utc> {
405 Utc::now()
406}
407
408#[cfg(not(feature = "std"))]
409fn now() -> DateTime<Utc> {
410 DateTime::from_timestamp_nanos(0)
411}