tracing_lv_core/
tracing_layer.rs

1use alloc::borrow::Cow;
2use chrono::{DateTime, Utc};
3use core::fmt::{Debug, Display, Formatter};
4use portable_atomic::AtomicU64;
5use derive_more::From;
6use hashbrown::HashMap;
7use serde::{Deserialize, Serialize};
8use smol_str::{format_smolstr, SmolStr, ToSmolStr};
9use tracing::field::{Field, Visit};
10use tracing::span::{Attributes, Record};
11use tracing::{Id, Metadata, Subscriber};
12use tracing_subscriber::layer::Context;
13use tracing_subscriber::registry::LookupSpan;
14use tracing_subscriber::Layer;
15use uuid::Uuid;
16use alloc::string::String;
17#[derive(Serialize, Deserialize, Default, Debug, Clone)]
18pub struct SpanAttrs(pub HashMap<Cow<'static, str>, TLValue>);
19
20impl Visit for SpanAttrs {
21    fn record_f64(&mut self, field: &Field, value: f64) {
22        self.0.insert(field.name().into(), TLValue::F64(value));
23    }
24    fn record_i64(&mut self, field: &Field, value: i64) {
25        self.0.insert(field.name().into(), TLValue::I64(value));
26    }
27
28    fn record_u64(&mut self, field: &Field, value: u64) {
29        self.0.insert(field.name().into(), TLValue::U64(value));
30    }
31    fn record_bool(&mut self, field: &Field, value: bool) {
32        self.0.insert(field.name().into(), TLValue::Bool(value));
33    }
34    fn record_str(&mut self, field: &Field, value: &str) {
35        self.0.insert(
36            field.name().into(),
37            TLValue::Str(if value.bytes().len() > MAX_RECORD_LEN {
38                format_smolstr!("<string too long. {}>", value.bytes().len())
39            } else {
40                value.replace("\x00", "").to_smolstr()
41            }),
42        );
43    }
44    fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
45        let str = format_smolstr!("{:?}", value);
46        if str.bytes().len() > MAX_RECORD_LEN {
47            self.0.insert(
48                field.name().into(),
49                TLValue::Str(format_smolstr!("<value too long. {}>", str.bytes().len())),
50            );
51        }
52        if str.contains("\x00") {
53            self.0.insert(
54                field.name().into(),
55                TLValue::Str(str.replace("\x00", "").to_smolstr()),
56            );
57        } else {
58            self.0.insert(field.name().into(), TLValue::Str(str));
59        }
60    }
61}
62
63#[derive(Debug, From, Clone, PartialEq, Serialize, Deserialize)]
64pub enum TLValue {
65    F64(f64),
66    I64(i64),
67    U64(u64),
68    Bool(bool),
69    Str(SmolStr),
70}
71
72impl Display for TLValue {
73    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
74        match self {
75            TLValue::F64(n) => write!(f, "{n}"),
76            TLValue::I64(n) => write!(f, "{n}"),
77            TLValue::U64(n) => write!(f, "{n}"),
78            TLValue::Bool(n) => write!(f, "{n}"),
79            TLValue::Str(n) => write!(f, "{n}"),
80        }?;
81        Ok(())
82    }
83}
84
85impl From<&'static str> for TLValue {
86    fn from(value: &'static str) -> Self {
87        TLValue::Str(value.into())
88    }
89}
90
91impl From<String> for TLValue {
92    fn from(value: String) -> Self {
93        TLValue::Str(value.into())
94    }
95}
96
97#[derive(Serialize, Deserialize, Debug, Clone)]
98pub struct VxMetadata {
99    pub name: Cow<'static, str>,
100    pub target: Cow<'static, str>,
101    pub level: Cow<'static, str>,
102    pub module_path: Option<Cow<'static, str>>,
103    pub file: Option<Cow<'static, str>>,
104    pub line: Option<u32>,
105}
106
107impl From<&'static Metadata<'static>> for VxMetadata {
108    fn from(value: &'static Metadata<'static>) -> Self {
109        Self {
110            name: value.name().into(),
111            target: value.target().into(),
112            level: value.level().as_str().into(),
113            module_path: value.module_path().map(|n|n.into()),
114            file: value.file().map(|n|n.into()),
115            line: value.line(),
116        }
117    }
118}
119
/// Largest string or `Debug` rendering, in bytes, that is stored verbatim (1 MiB).
pub const MAX_RECORD_LEN: usize = 1 << 20;
121
122#[derive(Serialize, Deserialize, Debug, Clone)]
123pub struct SpanInfo {
124    pub id: u64,
125    pub metadata: VxMetadata,
126}
127
128#[derive(Serialize, Deserialize, Debug, Clone)]
129pub enum TLMsg {
130    SpanCreate {
131        record_index: u64,
132        date: DateTime<Utc>,
133        span: SpanInfo,
134        attributes: SpanAttrs,
135        parent_span: Option<SpanInfo>,
136    },
137    SpanRecordAttr {
138        record_index: u64,
139        date: DateTime<Utc>,
140        span: SpanInfo,
141        attributes: SpanAttrs,
142    },
143    SpanEnter {
144        record_index: u64,
145        date: DateTime<Utc>,
146        span: SpanInfo,
147    },
148    SpanLeave {
149        record_index: u64,
150        date: DateTime<Utc>,
151        span: SpanInfo,
152    },
153    SpanClose {
154        record_index: u64,
155        date: DateTime<Utc>,
156        span: SpanInfo,
157    },
158    Event {
159        record_index: u64,
160        date: DateTime<Utc>,
161        message: SmolStr,
162        metadata: VxMetadata,
163        attributes: SpanAttrs,
164        span: Option<SpanInfo>,
165    },
166}
167
168impl TLMsg {
169    pub fn record_index(&self)->u64 {
170         match self {
171               TLMsg::SpanCreate { record_index, .. } => *record_index,
172               TLMsg::SpanRecordAttr { record_index, .. } => *record_index,
173               TLMsg::SpanEnter { record_index, .. } => *record_index,
174               TLMsg::SpanLeave { record_index, .. } => *record_index,
175               TLMsg::SpanClose { record_index, .. } => *record_index,
176               TLMsg::Event { record_index, .. } => *record_index,
177         }
178    }
179}
180
181pub trait TracingLiveMsgSubscriber: Send + Sync + 'static {
182    fn on_msg(&self, msg: TLMsg);
183}
184
185impl TracingLiveMsgSubscriber for alloc::boxed::Box<dyn TracingLiveMsgSubscriber> {
186    fn on_msg(&self, msg: TLMsg) {
187        (**self).on_msg(msg)
188    }
189}
190
191impl<T, F> TracingLiveMsgSubscriber for (T, F)
192where
193    T: TracingLiveMsgSubscriber,
194    F: Fn(&TLMsg) + Send + Sync + 'static,
195{
196    fn on_msg(&self, msg: TLMsg) {
197        (self.1)(&msg);
198        self.0.on_msg(msg);
199    }
200}
201impl<T, F> TracingLiveMsgSubscriber for (T, Option<F>)
202where
203   T: TracingLiveMsgSubscriber,
204   F: Fn(&TLMsg) + Send + Sync + 'static,
205{
206    fn on_msg(&self, msg: TLMsg) {
207        if let Some(f) = &self.1 {
208            f(&msg);
209        }
210        self.0.on_msg(msg);
211    }
212}
213
214pub struct TLLayer<F> {
215    pub subscriber: F,
216    pub enable_enter: bool,
217    pub record_index: AtomicU64,
218}
219
220impl<S, F> Layer<S> for TLLayer<F>
221where
222    for<'a> S: Subscriber + LookupSpan<'a>,
223    F: TracingLiveMsgSubscriber,
224{
225    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
226        if attrs.metadata().target().starts_with("h2::proto") {
227            return;
228        }
229        let mut attributes = SpanAttrs::default();
230        attrs.record(&mut attributes);
231        let parent_id = attrs
232            .parent()
233            .and_then(|id| _ctx.span(id))
234            .or_else(|| _ctx.lookup_current())
235            .map(|n| n.id().into_u64());
236        let parent = parent_id.map(|n| _ctx.span(&Id::from_u64(n))).flatten();
237        let msg = TLMsg::SpanCreate {
238            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
239            date: now(),
240            span: SpanInfo {
241                id: id.into_u64(),
242                metadata: attrs.metadata().into(),
243            },
244            attributes,
245            parent_span: parent.map(|n| SpanInfo {
246                id: n.id().into_u64(),
247                metadata: n.metadata().into(),
248            }),
249        };
250        self.subscriber.on_msg(msg);
251    }
252
253    fn on_record(&self, _span: &Id, _values: &Record<'_>, _ctx: Context<'_, S>) {
254        let date = now();
255        let _span = _ctx.span(_span).unwrap();
256        if _span.metadata().target().starts_with("h2::proto") {
257            return;
258        }
259        let mut attributes = SpanAttrs::default();
260        _values.record(&mut attributes);
261
262        let msg = TLMsg::SpanRecordAttr {
263            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
264            date,
265            span: SpanInfo {
266                id: _span.id().into_u64(),
267                metadata: _span.metadata().into(),
268            },
269            attributes,
270        };
271
272        self.subscriber.on_msg(msg);
273    }
274    fn on_event(&self, _event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
275        if _event.metadata().target().starts_with("h2::proto") {
276            return;
277        }
278        let date = now();
279        let mut attributes = SpanAttrs::default();
280        _event.record(&mut attributes);
281        let message = attributes.0.remove("message");
282        let msg = TLMsg::Event {
283            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
284            date,
285            message: message.map(|n| n.to_smolstr()).unwrap_or("".into()),
286            metadata: _event.metadata().into(),
287            attributes,
288            span: _ctx.event_span(_event).map(|n| SpanInfo {
289                id: n.id().into_u64(),
290                metadata: n.metadata().into(),
291            }),
292        };
293
294        self.subscriber.on_msg(msg);
295    }
296    fn on_enter(&self, _id: &Id, _ctx: Context<'_, S>) {
297        if !self.enable_enter {
298            return;
299        }
300        let date = now();
301        let _span = _ctx.span(_id).unwrap();
302        if _span.metadata().target().starts_with("h2::proto") {
303            return;
304        }
305        let msg = TLMsg::SpanEnter {
306            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
307            date,
308            span: SpanInfo {
309                id: _span.id().into_u64(),
310                metadata: _span.metadata().into(),
311            },
312        };
313
314        self.subscriber.on_msg(msg);
315    }
316    fn on_exit(&self, _id: &Id, _ctx: Context<'_, S>) {
317        if !self.enable_enter {
318            return;
319        }
320        let date = now();
321        let _span = _ctx.span(_id).unwrap();
322        if _span.metadata().target().starts_with("h2::proto") {
323            return;
324        }
325        let msg = TLMsg::SpanLeave {
326            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
327            date,
328            span: SpanInfo {
329                id: _span.id().into_u64(),
330                metadata: _span.metadata().into(),
331            },
332        };
333
334        self.subscriber.on_msg(msg);
335    }
336    fn on_close(&self, _id: Id, _ctx: Context<'_, S>) {
337        let date = now();
338        let _span = _ctx.span(&_id).unwrap();
339        if _span.metadata().target().starts_with("h2::proto") {
340            return;
341        }
342        let msg = TLMsg::SpanClose {
343            record_index: self.record_index.fetch_add(1, core::sync::atomic::Ordering::SeqCst),
344            date,
345            span: SpanInfo {
346                id: _span.id().into_u64(),
347                metadata: _span.metadata().into(),
348            },
349        };
350
351        self.subscriber.on_msg(msg);
352    }
353}
354
355#[derive(Clone, Debug)]
356pub struct TLAppInfo {
357    pub app_id: Uuid,
358    pub app_name: SmolStr,
359    pub app_version: SmolStr,
360    pub node_id: SmolStr,
361    pub static_data: HashMap<SmolStr, TLValue>,
362    pub data: HashMap<SmolStr, TLValue>,
363}
364
365impl TLAppInfo {
366    pub fn new(
367        app_id: impl Into<Uuid>,
368        app_name: impl Into<SmolStr>,
369        app_version: impl Into<SmolStr>,
370    ) -> Self {
371        Self {
372            app_id: app_id.into(),
373            app_name: app_name.into(),
374            app_version: app_version.into(),
375            node_id: "default_node".into(),
376            static_data: Default::default(),
377            data: Default::default(),
378        }
379    }
380
381    pub fn node_id(mut self, node_id: impl Into<SmolStr>) -> Self {
382        self.node_id = node_id.into();
383        self
384    }
385
386    pub fn node_name(self, node_name: impl Into<SmolStr>) -> Self {
387        self.with_data("node_name", node_name.into())
388    }
389
390    pub fn brief(self, brief: impl Into<SmolStr>) -> Self {
391        self.with_data("brief", brief.into())
392    }
393
394    pub fn second_name(self, second_name: impl Into<SmolStr>) -> Self {
395        self.with_data("second_name", second_name.into())
396    }
397
398    pub fn with_data(mut self, name: impl Into<SmolStr>, value: impl Into<TLValue>) -> Self {
399        self.data.insert(name.into(), value.into());
400        self
401    }
402
403    pub fn with_static_data(mut self, name: impl Into<SmolStr>, value: impl Into<TLValue>) -> Self {
404        self.static_data.insert(name.into(), value.into());
405        self
406    }
407}
408
409#[cfg(feature = "std")]
410fn now() -> DateTime<Utc> {
411    Utc::now()
412}
413
414#[cfg(not(feature = "std"))]
415fn now() -> DateTime<Utc> {
416    DateTime::from_timestamp_nanos(0)
417}