1use alloc::borrow::Cow;
2use chrono::{DateTime, Utc};
3use core::fmt::{Debug, Display, Formatter};
4use portable_atomic::AtomicU64;
5use derive_more::From;
6use hashbrown::HashMap;
7use serde::{Deserialize, Serialize};
8use smol_str::{format_smolstr, SmolStr, ToSmolStr};
9use tracing::field::{Field, Visit};
10use tracing::span::{Attributes, Record};
11use tracing::{Id, Metadata, Subscriber};
12use tracing_subscriber::layer::Context;
13use tracing_subscriber::registry::LookupSpan;
14use tracing_subscriber::Layer;
15use uuid::Uuid;
16use alloc::string::String;
/// Attribute map of a span or event: field name mapped to the recorded value.
///
/// The map is public so it can be inspected or modified directly; it is
/// filled through this type's `Visit` implementation.
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
pub struct SpanAttrs(pub HashMap<Cow<'static, str>, TLValue>);
19
20impl Visit for SpanAttrs {
21 fn record_f64(&mut self, field: &Field, value: f64) {
22 self.0.insert(field.name().into(), TLValue::F64(value));
23 }
24 fn record_i64(&mut self, field: &Field, value: i64) {
25 self.0.insert(field.name().into(), TLValue::I64(value));
26 }
27
28 fn record_u64(&mut self, field: &Field, value: u64) {
29 self.0.insert(field.name().into(), TLValue::U64(value));
30 }
31 fn record_bool(&mut self, field: &Field, value: bool) {
32 self.0.insert(field.name().into(), TLValue::Bool(value));
33 }
34 fn record_str(&mut self, field: &Field, value: &str) {
35 self.0.insert(
36 field.name().into(),
37 TLValue::Str(if value.bytes().len() > MAX_RECORD_LEN {
38 format_smolstr!("<string too long. {}>", value.bytes().len())
39 } else {
40 value.replace("\x00", "").to_smolstr()
41 }),
42 );
43 }
44 fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
45 let str = format_smolstr!("{:?}", value);
46 if str.bytes().len() > MAX_RECORD_LEN {
47 self.0.insert(
48 field.name().into(),
49 TLValue::Str(format_smolstr!("<value too long. {}>", str.bytes().len())),
50 );
51 }
52 if str.contains("\x00") {
53 self.0.insert(
54 field.name().into(),
55 TLValue::Str(str.replace("\x00", "").to_smolstr()),
56 );
57 } else {
58 self.0.insert(field.name().into(), TLValue::Str(str));
59 }
60 }
61}
62
/// A single attribute value carried by spans, events and application info.
///
/// The `From` derive comes from `derive_more`; it is expected to provide a
/// conversion from each variant's payload type.
#[derive(Debug, From, Clone, PartialEq, Serialize, Deserialize)]
pub enum TLValue {
    /// 64-bit floating point number.
    F64(f64),
    /// Signed 64-bit integer.
    I64(i64),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Boolean flag.
    Bool(bool),
    /// Text value.
    Str(SmolStr),
}
71
72impl Display for TLValue {
73 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
74 match self {
75 TLValue::F64(n) => write!(f, "{n}"),
76 TLValue::I64(n) => write!(f, "{n}"),
77 TLValue::U64(n) => write!(f, "{n}"),
78 TLValue::Bool(n) => write!(f, "{n}"),
79 TLValue::Str(n) => write!(f, "{n}"),
80 }?;
81 Ok(())
82 }
83}
84
85impl From<&'static str> for TLValue {
86 fn from(value: &'static str) -> Self {
87 TLValue::Str(value.into())
88 }
89}
90
91impl From<String> for TLValue {
92 fn from(value: String) -> Self {
93 TLValue::Str(value.into())
94 }
95}
96
/// Serializable copy of the callsite metadata of a span or event.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VxMetadata {
    /// Name of the span or event.
    pub name: Cow<'static, str>,
    /// Target the span or event was emitted under.
    pub target: Cow<'static, str>,
    /// Verbosity level, in its textual form.
    pub level: Cow<'static, str>,
    /// Module path of the callsite, when available.
    pub module_path: Option<Cow<'static, str>>,
    /// Source file of the callsite, when available.
    pub file: Option<Cow<'static, str>>,
    /// Source line of the callsite, when available.
    pub line: Option<u32>,
}
106
107impl From<&'static Metadata<'static>> for VxMetadata {
108 fn from(value: &'static Metadata<'static>) -> Self {
109 Self {
110 name: value.name().into(),
111 target: value.target().into(),
112 level: value.level().as_str().into(),
113 module_path: value.module_path().map(|n|n.into()),
114 file: value.file().map(|n|n.into()),
115 line: value.line(),
116 }
117 }
118}
119
/// Size threshold in bytes (1 MiB) that the `Visit` implementation of
/// `SpanAttrs` compares recorded string values against.
pub const MAX_RECORD_LEN: usize = 1024 * 1024;
121
/// Identifies a span inside a message: its id plus its callsite metadata.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SpanInfo {
    /// Span id in its `u64` form.
    pub id: u64,
    /// Callsite metadata of the span.
    pub metadata: VxMetadata,
}
127
/// Message emitted by `TLLayer` for each span lifecycle step and each event.
///
/// Every variant carries a `record_index` taken from the layer's counter and
/// a `date` timestamp.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TLMsg {
    /// A new span was created.
    SpanCreate {
        record_index: u64,
        date: DateTime<Utc>,
        /// The span that was created.
        span: SpanInfo,
        /// Fields recorded when the span was created.
        attributes: SpanAttrs,
        /// Parent of the new span, if one was resolved.
        parent_span: Option<SpanInfo>,
    },
    /// Additional field values were recorded on an existing span.
    SpanRecordAttr {
        record_index: u64,
        date: DateTime<Utc>,
        /// The span the values were recorded on.
        span: SpanInfo,
        /// The values recorded by this call.
        attributes: SpanAttrs,
    },
    /// A span was entered (only emitted when `enable_enter` is set on the layer).
    SpanEnter {
        record_index: u64,
        date: DateTime<Utc>,
        span: SpanInfo,
    },
    /// A span was exited (only emitted when `enable_enter` is set on the layer).
    SpanLeave {
        record_index: u64,
        date: DateTime<Utc>,
        span: SpanInfo,
    },
    /// A span was closed.
    SpanClose {
        record_index: u64,
        date: DateTime<Utc>,
        span: SpanInfo,
    },
    /// An event was emitted.
    Event {
        record_index: u64,
        date: DateTime<Utc>,
        /// Value of the event's `message` field; empty when the field is absent.
        message: SmolStr,
        /// Callsite metadata of the event.
        metadata: VxMetadata,
        /// Remaining event fields (the `message` field is removed from this map).
        attributes: SpanAttrs,
        /// Span the event belongs to, if any.
        span: Option<SpanInfo>,
    },
}
167
168impl TLMsg {
169 pub fn record_index(&self)->u64 {
170 match self {
171 TLMsg::SpanCreate { record_index, .. } => *record_index,
172 TLMsg::SpanRecordAttr { record_index, .. } => *record_index,
173 TLMsg::SpanEnter { record_index, .. } => *record_index,
174 TLMsg::SpanLeave { record_index, .. } => *record_index,
175 TLMsg::SpanClose { record_index, .. } => *record_index,
176 TLMsg::Event { record_index, .. } => *record_index,
177 }
178 }
179}
180
/// Receiver of the messages produced by `TLLayer`.
///
/// Implementations must be `Send + Sync + 'static`; `on_msg` takes `&self`.
pub trait TracingLiveMsgSubscriber: Send + Sync + 'static {
    /// Handles one message; ownership of `msg` is passed to the receiver.
    fn on_msg(&self, msg: TLMsg);
}
184
185impl TracingLiveMsgSubscriber for alloc::boxed::Box<dyn TracingLiveMsgSubscriber> {
186 fn on_msg(&self, msg: TLMsg) {
187 (**self).on_msg(msg)
188 }
189}
190
191impl<T, F> TracingLiveMsgSubscriber for (T, F)
192where
193 T: TracingLiveMsgSubscriber,
194 F: Fn(&TLMsg) + Send + Sync + 'static,
195{
196 fn on_msg(&self, msg: TLMsg) {
197 (self.1)(&msg);
198 self.0.on_msg(msg);
199 }
200}
201impl<T, F> TracingLiveMsgSubscriber for (T, Option<F>)
202where
203 T: TracingLiveMsgSubscriber,
204 F: Fn(&TLMsg) + Send + Sync + 'static,
205{
206 fn on_msg(&self, msg: TLMsg) {
207 if let Some(f) = &self.1 {
208 f(&msg);
209 }
210 self.0.on_msg(msg);
211 }
212}
213
/// `tracing_subscriber` layer that turns span and event callbacks into
/// `TLMsg` values and hands them to `subscriber`.
pub struct TLLayer<F> {
    /// Receiver of every produced message.
    pub subscriber: F,
    /// When `false`, span enter/exit callbacks produce no message.
    pub enable_enter: bool,
    /// Counter incremented once per produced message; its previous value
    /// becomes the message's `record_index`.
    pub record_index: AtomicU64,
}
219
220impl<S, F> Layer<S> for TLLayer<F>
221where
222 for<'a> S: Subscriber + LookupSpan<'a>,
223 F: TracingLiveMsgSubscriber,
224{
225 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
226 if attrs.metadata().target().starts_with("h2::proto") {
227 return;
228 }
229 let mut attributes = SpanAttrs::default();
230 attrs.record(&mut attributes);
231 let parent_id = attrs
232 .parent()
233 .and_then(|id| _ctx.span(id))
234 .or_else(|| _ctx.lookup_current())
235 .map(|n| n.id().into_u64());
236 let parent = parent_id.map(|n| _ctx.span(&Id::from_u64(n))).flatten();
237 let msg = TLMsg::SpanCreate {
238 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
239 date: now(),
240 span: SpanInfo {
241 id: id.into_u64(),
242 metadata: attrs.metadata().into(),
243 },
244 attributes,
245 parent_span: parent.map(|n| SpanInfo {
246 id: n.id().into_u64(),
247 metadata: n.metadata().into(),
248 }),
249 };
250 self.subscriber.on_msg(msg);
251 }
252
253 fn on_record(&self, _span: &Id, _values: &Record<'_>, _ctx: Context<'_, S>) {
254 let date = now();
255 let _span = _ctx.span(_span).unwrap();
256 if _span.metadata().target().starts_with("h2::proto") {
257 return;
258 }
259 let mut attributes = SpanAttrs::default();
260 _values.record(&mut attributes);
261
262 let msg = TLMsg::SpanRecordAttr {
263 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
264 date,
265 span: SpanInfo {
266 id: _span.id().into_u64(),
267 metadata: _span.metadata().into(),
268 },
269 attributes,
270 };
271
272 self.subscriber.on_msg(msg);
273 }
274 fn on_event(&self, _event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
275 if _event.metadata().target().starts_with("h2::proto") {
276 return;
277 }
278 let date = now();
279 let mut attributes = SpanAttrs::default();
280 _event.record(&mut attributes);
281 let message = attributes.0.remove("message");
282 let msg = TLMsg::Event {
283 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
284 date,
285 message: message.map(|n| n.to_smolstr()).unwrap_or("".into()),
286 metadata: _event.metadata().into(),
287 attributes,
288 span: _ctx.event_span(_event).map(|n| SpanInfo {
289 id: n.id().into_u64(),
290 metadata: n.metadata().into(),
291 }),
292 };
293
294 self.subscriber.on_msg(msg);
295 }
296 fn on_enter(&self, _id: &Id, _ctx: Context<'_, S>) {
297 if !self.enable_enter {
298 return;
299 }
300 let date = now();
301 let _span = _ctx.span(_id).unwrap();
302 if _span.metadata().target().starts_with("h2::proto") {
303 return;
304 }
305 let msg = TLMsg::SpanEnter {
306 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
307 date,
308 span: SpanInfo {
309 id: _span.id().into_u64(),
310 metadata: _span.metadata().into(),
311 },
312 };
313
314 self.subscriber.on_msg(msg);
315 }
316 fn on_exit(&self, _id: &Id, _ctx: Context<'_, S>) {
317 if !self.enable_enter {
318 return;
319 }
320 let date = now();
321 let _span = _ctx.span(_id).unwrap();
322 if _span.metadata().target().starts_with("h2::proto") {
323 return;
324 }
325 let msg = TLMsg::SpanLeave {
326 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
327 date,
328 span: SpanInfo {
329 id: _span.id().into_u64(),
330 metadata: _span.metadata().into(),
331 },
332 };
333
334 self.subscriber.on_msg(msg);
335 }
336 fn on_close(&self, _id: Id, _ctx: Context<'_, S>) {
337 let date = now();
338 let _span = _ctx.span(&_id).unwrap();
339 if _span.metadata().target().starts_with("h2::proto") {
340 return;
341 }
342 let msg = TLMsg::SpanClose {
343 record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
344 date,
345 span: SpanInfo {
346 id: _span.id().into_u64(),
347 metadata: _span.metadata().into(),
348 },
349 };
350
351 self.subscriber.on_msg(msg);
352 }
353}
354
/// Description of the application instance that produces the messages.
#[derive(Clone, Debug)]
pub struct TLAppInfo {
    /// Identifier of the application.
    pub app_id: Uuid,
    /// Name of the application.
    pub app_name: SmolStr,
    /// Version of the application.
    pub app_version: SmolStr,
    /// Identifier of the node; `TLAppInfo::new` sets it to `"default_node"`.
    pub node_id: SmolStr,
    /// Key/value pairs added through `with_static_data`.
    // NOTE(review): how consumers treat `static_data` differently from `data`
    // is not visible in this file; confirm against the receiver.
    pub static_data: HashMap<SmolStr, TLValue>,
    /// Key/value pairs added through `with_data` and the named helpers
    /// (`node_name`, `brief`, `second_name`).
    pub data: HashMap<SmolStr, TLValue>,
}
364
365impl TLAppInfo {
366 pub fn new(
367 app_id: impl Into<Uuid>,
368 app_name: impl Into<SmolStr>,
369 app_version: impl Into<SmolStr>,
370 ) -> Self {
371 Self {
372 app_id: app_id.into(),
373 app_name: app_name.into(),
374 app_version: app_version.into(),
375 node_id: "default_node".into(),
376 static_data: Default::default(),
377 data: Default::default(),
378 }
379 }
380
381 pub fn node_id(mut self, node_id: impl Into<SmolStr>) -> Self {
382 self.node_id = node_id.into();
383 self
384 }
385
386 pub fn node_name(self, node_name: impl Into<SmolStr>) -> Self {
387 self.with_data("node_name", node_name.into())
388 }
389
390 pub fn brief(self, brief: impl Into<SmolStr>) -> Self {
391 self.with_data("brief", brief.into())
392 }
393
394 pub fn second_name(self, second_name: impl Into<SmolStr>) -> Self {
395 self.with_data("second_name", second_name.into())
396 }
397
398 pub fn with_data(mut self, name: impl Into<SmolStr>, value: impl Into<TLValue>) -> Self {
399 self.data.insert(name.into(), value.into());
400 self
401 }
402
403 pub fn with_static_data(mut self, name: impl Into<SmolStr>, value: impl Into<TLValue>) -> Self {
404 self.static_data.insert(name.into(), value.into());
405 self
406 }
407}
408
/// Returns the current UTC time (used when the `std` feature is enabled).
#[cfg(feature = "std")]
fn now() -> DateTime<Utc> {
    Utc::now()
}

/// Fallback used without the `std` feature: always returns the timestamp
/// built from 0 nanoseconds, so every message carries the same date.
#[cfg(not(feature = "std"))]
fn now() -> DateTime<Utc> {
    DateTime::from_timestamp_nanos(0)
}