tracing_lv_core/
tracing_layer.rs

1use alloc::borrow::Cow;
2use chrono::{DateTime, Utc};
3use core::fmt::{Debug, Display, Formatter};
4use core::sync::atomic::AtomicU64;
5use derive_more::From;
6use hashbrown::HashMap;
7use serde::{Deserialize, Serialize};
8use smol_str::{format_smolstr, SmolStr, ToSmolStr};
9use tracing::field::{Field, Visit};
10use tracing::span::{Attributes, Record};
11use tracing::{Id, Metadata, Subscriber};
12use tracing_subscriber::layer::Context;
13use tracing_subscriber::registry::LookupSpan;
14use tracing_subscriber::Layer;
15use uuid::Uuid;
16
/// Field values collected from a span or event, keyed by field name.
///
/// The map is filled through the [`Visit`] implementation for this type:
/// each `record_*` callback inserts one entry under the visited field's name.
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
pub struct SpanAttrs(pub HashMap<Cow<'static, str>, TLValue>);
19
20impl Visit for SpanAttrs {
21    fn record_f64(&mut self, field: &Field, value: f64) {
22        self.0.insert(field.name().into(), TLValue::F64(value));
23    }
24    fn record_i64(&mut self, field: &Field, value: i64) {
25        self.0.insert(field.name().into(), TLValue::I64(value));
26    }
27
28    fn record_u64(&mut self, field: &Field, value: u64) {
29        self.0.insert(field.name().into(), TLValue::U64(value));
30    }
31    fn record_bool(&mut self, field: &Field, value: bool) {
32        self.0.insert(field.name().into(), TLValue::Bool(value));
33    }
34    fn record_str(&mut self, field: &Field, value: &str) {
35        self.0.insert(
36            field.name().into(),
37            TLValue::Str(if value.bytes().len() > MAX_RECORD_LEN {
38                format_smolstr!("<string too long. {}>", value.bytes().len())
39            } else {
40                value.replace("\x00", "").to_smolstr()
41            }),
42        );
43    }
44    fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
45        let str = format_smolstr!("{:?}", value);
46        if str.bytes().len() > MAX_RECORD_LEN {
47            self.0.insert(
48                field.name().into(),
49                TLValue::Str(format_smolstr!("<value too long. {}>", str.bytes().len())),
50            );
51        }
52        if str.contains("\x00") {
53            self.0.insert(
54                field.name().into(),
55                TLValue::Str(str.replace("\x00", "").to_smolstr()),
56            );
57        } else {
58            self.0.insert(field.name().into(), TLValue::Str(str));
59        }
60    }
61}
62
/// A single recorded value: a float, signed or unsigned integer, boolean, or string.
///
/// Used as the value type of [`SpanAttrs`] and of the `TLAppInfo` data maps.
#[derive(Debug, From, Clone, PartialEq, Serialize, Deserialize)]
pub enum TLValue {
    /// 64-bit floating point value.
    F64(f64),
    /// Signed 64-bit integer value.
    I64(i64),
    /// Unsigned 64-bit integer value.
    U64(u64),
    /// Boolean value.
    Bool(bool),
    /// String value.
    Str(SmolStr),
}
71
72impl Display for TLValue {
73    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
74        match self {
75            TLValue::F64(n) => write!(f, "{n}"),
76            TLValue::I64(n) => write!(f, "{n}"),
77            TLValue::U64(n) => write!(f, "{n}"),
78            TLValue::Bool(n) => write!(f, "{n}"),
79            TLValue::Str(n) => write!(f, "{n}"),
80        }?;
81        Ok(())
82    }
83}
84
85impl From<&'static str> for TLValue {
86    fn from(value: &'static str) -> Self {
87        TLValue::Str(value.into())
88    }
89}
90
/// Serializable copy of the data read from a `tracing` [`Metadata`] value.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VxMetadata {
    /// Name reported by the metadata.
    pub name: Cow<'static, str>,
    /// Target reported by the metadata.
    pub target: Cow<'static, str>,
    /// Level, stored as its string form.
    pub level: Cow<'static, str>,
    /// Module path, when the metadata carries one.
    pub module_path: Option<Cow<'static, str>>,
    /// Source file, when the metadata carries one.
    pub file: Option<Cow<'static, str>>,
    /// Source line, when the metadata carries one.
    pub line: Option<u32>,
}
100
101impl From<&'static Metadata<'static>> for VxMetadata {
102    fn from(value: &'static Metadata<'static>) -> Self {
103        Self {
104            name: value.name().into(),
105            target: value.target().into(),
106            level: value.level().as_str().into(),
107            module_path: value.module_path().map(|n|n.into()),
108            file: value.file().map(|n|n.into()),
109            line: value.line(),
110        }
111    }
112}
113
/// Byte-length threshold (1024 * 1024 bytes) that the `Visit` implementation
/// for [`SpanAttrs`] compares string and `Debug`-formatted values against.
pub const MAX_RECORD_LEN: usize = 1024 * 1024;
115
/// Identity of a span as carried inside a [`TLMsg`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SpanInfo {
    /// The span's `tracing` id converted to `u64`.
    pub id: u64,
    /// Copy of the span's metadata.
    pub metadata: VxMetadata,
}
121
/// Message emitted by `TLLayer` for each span lifecycle callback and each event.
///
/// Every variant carries a `record_index` (taken from the layer's counter) and
/// a `date` timestamp captured while handling the callback.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TLMsg {
    /// A new span was created.
    SpanCreate {
        record_index: u64,
        date: DateTime<Utc>,
        /// The span that was created.
        span: SpanInfo,
        /// Field values supplied when the span was created.
        attributes: SpanAttrs,
        /// The parent span, if one was resolved at creation time.
        parent_span: Option<SpanInfo>,
    },
    /// Additional field values were recorded on an existing span.
    SpanRecordAttr {
        record_index: u64,
        date: DateTime<Utc>,
        /// The span the values were recorded on.
        span: SpanInfo,
        /// The newly recorded field values.
        attributes: SpanAttrs,
    },
    /// A span was entered.
    SpanEnter {
        record_index: u64,
        date: DateTime<Utc>,
        span: SpanInfo,
    },
    /// A span was exited.
    SpanLeave {
        record_index: u64,
        date: DateTime<Utc>,
        span: SpanInfo,
    },
    /// A span was closed.
    SpanClose {
        record_index: u64,
        date: DateTime<Utc>,
        span: SpanInfo,
    },
    /// An event was recorded.
    Event {
        record_index: u64,
        date: DateTime<Utc>,
        /// Value of the event's `message` field; empty when the event has none.
        message: SmolStr,
        /// Metadata of the event itself.
        metadata: VxMetadata,
        /// The event's remaining field values (`message` is removed).
        attributes: SpanAttrs,
        /// The span the event belongs to, if any.
        span: Option<SpanInfo>,
    },
}
161
162impl TLMsg {
163    pub fn record_index(&self)->u64 {
164         match self {
165               TLMsg::SpanCreate { record_index, .. } => *record_index,
166               TLMsg::SpanRecordAttr { record_index, .. } => *record_index,
167               TLMsg::SpanEnter { record_index, .. } => *record_index,
168               TLMsg::SpanLeave { record_index, .. } => *record_index,
169               TLMsg::SpanClose { record_index, .. } => *record_index,
170               TLMsg::Event { record_index, .. } => *record_index,
171         }
172    }
173}
174
/// Receiver for the [`TLMsg`] values produced by `TLLayer`.
pub trait TracingLiveMsgSubscriber: Send + Sync + 'static {
    /// Called once for every message; takes ownership of `msg`.
    fn on_msg(&self, msg: TLMsg);
}
178
179impl TracingLiveMsgSubscriber for alloc::boxed::Box<dyn TracingLiveMsgSubscriber> {
180    fn on_msg(&self, msg: TLMsg) {
181        (**self).on_msg(msg)
182    }
183}
184
185impl<T, F> TracingLiveMsgSubscriber for (T, F)
186where
187    T: TracingLiveMsgSubscriber,
188    F: Fn(&TLMsg) + Send + Sync + 'static,
189{
190    fn on_msg(&self, msg: TLMsg) {
191        (self.1)(&msg);
192        self.0.on_msg(msg);
193    }
194}
195impl<T, F> TracingLiveMsgSubscriber for (T, Option<F>)
196where
197   T: TracingLiveMsgSubscriber,
198   F: Fn(&TLMsg) + Send + Sync + 'static,
199{
200    fn on_msg(&self, msg: TLMsg) {
201        if let Some(f) = &self.1 {
202            f(&msg);
203        }
204        self.0.on_msg(msg);
205    }
206}
207
/// `tracing_subscriber` layer that turns span and event callbacks into
/// [`TLMsg`] values and hands them to `subscriber`.
pub struct TLLayer<F> {
    /// Receives every message the layer produces.
    pub subscriber: F,
    /// When `false`, span enter and exit callbacks return without emitting a message.
    pub enable_enter: bool,
    /// Counter incremented by one for each emitted message; the value before
    /// the increment becomes that message's `record_index`.
    pub record_index: AtomicU64,
}
213
214impl<S, F> Layer<S> for TLLayer<F>
215where
216    for<'a> S: Subscriber + LookupSpan<'a>,
217    F: TracingLiveMsgSubscriber,
218{
219    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
220        if attrs.metadata().target().starts_with("h2::proto") {
221            return;
222        }
223        let mut attributes = SpanAttrs::default();
224        attrs.record(&mut attributes);
225        let parent_id = attrs
226            .parent()
227            .and_then(|id| _ctx.span(id))
228            .or_else(|| _ctx.lookup_current())
229            .map(|n| n.id().into_u64());
230        let parent = parent_id.map(|n| _ctx.span(&Id::from_u64(n))).flatten();
231        let msg = TLMsg::SpanCreate {
232            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
233            date: now(),
234            span: SpanInfo {
235                id: id.into_u64(),
236                metadata: attrs.metadata().into(),
237            },
238            attributes,
239            parent_span: parent.map(|n| SpanInfo {
240                id: n.id().into_u64(),
241                metadata: n.metadata().into(),
242            }),
243        };
244        self.subscriber.on_msg(msg);
245    }
246
247    fn on_record(&self, _span: &Id, _values: &Record<'_>, _ctx: Context<'_, S>) {
248        let date = now();
249        let _span = _ctx.span(_span).unwrap();
250        if _span.metadata().target().starts_with("h2::proto") {
251            return;
252        }
253        let mut attributes = SpanAttrs::default();
254        _values.record(&mut attributes);
255
256        let msg = TLMsg::SpanRecordAttr {
257            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
258            date,
259            span: SpanInfo {
260                id: _span.id().into_u64(),
261                metadata: _span.metadata().into(),
262            },
263            attributes,
264        };
265
266        self.subscriber.on_msg(msg);
267    }
268    fn on_event(&self, _event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
269        if _event.metadata().target().starts_with("h2::proto") {
270            return;
271        }
272        let date = now();
273        let mut attributes = SpanAttrs::default();
274        _event.record(&mut attributes);
275        let message = attributes.0.remove("message");
276        let msg = TLMsg::Event {
277            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
278            date,
279            message: message.map(|n| n.to_smolstr()).unwrap_or("".into()),
280            metadata: _event.metadata().into(),
281            attributes,
282            span: _ctx.event_span(_event).map(|n| SpanInfo {
283                id: n.id().into_u64(),
284                metadata: n.metadata().into(),
285            }),
286        };
287
288        self.subscriber.on_msg(msg);
289    }
290    fn on_enter(&self, _id: &Id, _ctx: Context<'_, S>) {
291        if !self.enable_enter {
292            return;
293        }
294        let date = now();
295        let _span = _ctx.span(_id).unwrap();
296        if _span.metadata().target().starts_with("h2::proto") {
297            return;
298        }
299        let msg = TLMsg::SpanEnter {
300            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
301            date,
302            span: SpanInfo {
303                id: _span.id().into_u64(),
304                metadata: _span.metadata().into(),
305            },
306        };
307
308        self.subscriber.on_msg(msg);
309    }
310    fn on_exit(&self, _id: &Id, _ctx: Context<'_, S>) {
311        if !self.enable_enter {
312            return;
313        }
314        let date = now();
315        let _span = _ctx.span(_id).unwrap();
316        if _span.metadata().target().starts_with("h2::proto") {
317            return;
318        }
319        let msg = TLMsg::SpanLeave {
320            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
321            date,
322            span: SpanInfo {
323                id: _span.id().into_u64(),
324                metadata: _span.metadata().into(),
325            },
326        };
327
328        self.subscriber.on_msg(msg);
329    }
330    fn on_close(&self, _id: Id, _ctx: Context<'_, S>) {
331        let date = now();
332        let _span = _ctx.span(&_id).unwrap();
333        if _span.metadata().target().starts_with("h2::proto") {
334            return;
335        }
336        let msg = TLMsg::SpanClose {
337            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
338            date,
339            span: SpanInfo {
340                id: _span.id().into_u64(),
341                metadata: _span.metadata().into(),
342            },
343        };
344
345        self.subscriber.on_msg(msg);
346    }
347}
348
/// Descriptive information about an application instance.
#[derive(Clone, Debug)]
pub struct TLAppInfo {
    /// Identifier of the application.
    pub app_id: Uuid,
    /// Name of the application.
    pub app_name: SmolStr,
    /// Version of the application.
    pub app_version: SmolStr,
    /// Identifier of the node; `TLAppInfo::new` sets it to `"default_node"`.
    pub node_id: SmolStr,
    /// Key/value entries added through `with_static_data`.
    // NOTE(review): how consumers treat `static_data` differently from `data`
    // is not visible in this file — confirm before relying on it.
    pub static_data: HashMap<SmolStr, TLValue>,
    /// Key/value entries added through `with_data` and the helpers built on it.
    pub data: HashMap<SmolStr, TLValue>,
}
358
359impl TLAppInfo {
360    pub fn new(
361        app_id: impl Into<Uuid>,
362        app_name: impl Into<SmolStr>,
363        app_version: impl Into<SmolStr>,
364    ) -> Self {
365        Self {
366            app_id: app_id.into(),
367            app_name: app_name.into(),
368            app_version: app_version.into(),
369            node_id: "default_node".into(),
370            static_data: Default::default(),
371            data: Default::default(),
372        }
373    }
374
375    pub fn node_id(mut self, node_id: impl Into<SmolStr>) -> Self {
376        self.node_id = node_id.into();
377        self
378    }
379
380    pub fn node_name(self, node_name: impl Into<SmolStr>) -> Self {
381        self.with_data("node_name", node_name.into())
382    }
383
384    pub fn brief(self, brief: impl Into<SmolStr>) -> Self {
385        self.with_data("brief", brief.into())
386    }
387
388    pub fn second_name(self, second_name: impl Into<SmolStr>) -> Self {
389        self.with_data("second_name", second_name.into())
390    }
391
392    pub fn with_data(mut self, name: impl Into<SmolStr>, value: impl Into<TLValue>) -> Self {
393        self.data.insert(name.into(), value.into());
394        self
395    }
396
397    pub fn with_static_data(mut self, name: impl Into<SmolStr>, value: impl Into<TLValue>) -> Self {
398        self.static_data.insert(name.into(), value.into());
399        self
400    }
401}
402
403#[cfg(feature = "std")]
404fn now() -> DateTime<Utc> {
405    Utc::now()
406}
407
408#[cfg(not(feature = "std"))]
409fn now() -> DateTime<Utc> {
410    DateTime::from_timestamp_nanos(0)
411}