qevent/
telemetry.rs

1pub mod handy;
2
3use std::{
4    collections::HashMap,
5    fmt::Debug,
6    future::Future,
7    pin::Pin,
8    sync::Arc,
9    task::{Context, Poll},
10};
11
12use handy::NullExporter;
13use serde::de::DeserializeOwned;
14use serde_json::Value;
15
16use crate::{Event, GroupID, VantagePointType};
17
18pub trait Log {
19    fn new_trace(&self, vantage_point: VantagePointType, group_id: GroupID) -> Span;
20}
21
22pub trait ExportEvent: Send + Sync {
23    fn emit(&self, event: Event);
24
25    fn filter_event(&self, scheme: &'static str) -> bool {
26        _ = scheme;
27        true
28    }
29
30    fn filter_raw_data(&self) -> bool {
31        false
32    }
33}
34
35pub mod filter {
36    #[inline]
37    #[cfg(feature = "enabled")]
38    pub fn event(scheme: &'static str) -> bool {
39        super::current_span::CURRENT_SPAN.with(|span| span.borrow().filter_event(scheme))
40    }
41
42    #[inline]
43    #[cfg(not(feature = "enabled"))]
44    pub fn event(_scheme: &'static str) -> bool {
45        false
46    }
47
48    #[inline]
49    #[cfg(all(feature = "enabled", feature = "raw_data"))]
50    pub fn raw_data() -> bool {
51        super::current_span::CURRENT_SPAN.with(|span| span.borrow().filter_raw_data())
52    }
53
54    #[inline]
55    #[cfg(not(all(feature = "enabled", feature = "raw_data")))]
56    pub fn raw_data() -> bool {
57        false
58    }
59}
60
61#[derive(Clone)]
62pub struct Span {
63    exporter: Arc<dyn ExportEvent>,
64    fields: Arc<HashMap<&'static str, Value>>,
65}
66
67impl Debug for Span {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("Span")
70            .field("fields", &self.fields)
71            .field("broker", &"..")
72            .finish()
73    }
74}
75
76impl Span {
77    #[inline]
78    pub fn new(exporter: Arc<dyn ExportEvent>, fields: HashMap<&'static str, Value>) -> Self {
79        Self {
80            exporter,
81            fields: Arc::new(fields),
82        }
83    }
84
85    #[inline]
86    pub fn emit(&self, event: Event) {
87        self.exporter.emit(event);
88    }
89
90    #[inline]
91    pub fn filter_event(&self, scheme: &'static str) -> bool {
92        self.exporter.filter_event(scheme)
93    }
94
95    #[inline]
96    pub fn filter_raw_data(&self) -> bool {
97        self.exporter.filter_raw_data()
98    }
99
100    #[inline]
101    pub fn load<T: DeserializeOwned>(&self, name: &'static str) -> T {
102        serde_json::from_value(self.fields[name].clone()).unwrap()
103    }
104
105    #[inline]
106    pub fn try_load<T: DeserializeOwned>(&self, name: &'static str) -> Option<T> {
107        serde_json::from_value(self.fields.get(name)?.clone()).ok()
108    }
109}
110
111impl PartialEq for Span {
112    fn eq(&self, other: &Self) -> bool {
113        Arc::ptr_eq(&self.fields, &other.fields) && Arc::ptr_eq(&self.exporter, &other.exporter)
114    }
115}
116
117impl Default for Span {
118    fn default() -> Self {
119        Self::new(Arc::new(NullExporter), HashMap::new())
120    }
121}
122
123pub struct Entered {
124    previous: Option<Span>,
125}
126
127mod current_span {
128    use std::cell::RefCell;
129
130    use super::{Entered, Span};
131
132    thread_local! {
133        pub static CURRENT_SPAN: RefCell<Span> = RefCell::new(Span::default());
134    }
135
136    impl Drop for Entered {
137        fn drop(&mut self) {
138            if let Some(previous) = &self.previous {
139                CURRENT_SPAN.with(|span| {
140                    span.replace(previous.clone());
141                });
142            }
143        }
144    }
145
146    impl Span {
147        pub fn enter(&self) -> Entered {
148            let previous = CURRENT_SPAN.with(|current| {
149                if &*current.borrow() == self {
150                    None
151                } else {
152                    Some(current.replace(self.clone()))
153                }
154            });
155            Entered { previous }
156        }
157
158        pub fn in_scope<T>(&self, f: impl FnOnce() -> T) -> T {
159            let _guard = self.enter();
160            f()
161        }
162
163        pub fn current() -> Span {
164            CURRENT_SPAN.with(|span| span.borrow().clone())
165        }
166    }
167}
168
169pin_project_lite::pin_project! {
170    pub struct Instrumented<F: ?Sized> {
171        span: Span,
172        #[pin]
173        inner: F,
174    }
175}
176
177pub trait Instrument {
178    fn instrument(self, span: Span) -> Instrumented<Self>;
179
180    fn instrument_in_current(self) -> Instrumented<Self>;
181}
182
183impl<F: Future> Instrument for F {
184    fn instrument(self, span: Span) -> Instrumented<Self> {
185        Instrumented { span, inner: self }
186    }
187
188    fn instrument_in_current(self) -> Instrumented<Self> {
189        self.instrument(crate::span!())
190    }
191}
192
193impl<F: Future> Future for Instrumented<F> {
194    type Output = F::Output;
195
196    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
197        let this = self.project();
198        this.span.in_scope(|| this.inner.poll(cx))
199    }
200}
201
202#[doc(hidden)]
203pub mod macro_support {
204    use serde::Serialize;
205
206    use super::*;
207    use crate::{BeSpecificEventData, EventBuilder};
208
209    pub fn modify_event_builder_costom_fields(
210        builder: &mut EventBuilder,
211        f: impl FnOnce(&mut HashMap<String, Value>),
212    ) {
213        if builder.custom_fields.is_none() {
214            builder.custom_fields = Some(HashMap::new());
215        }
216        let custom_fields = builder.custom_fields.as_mut().unwrap();
217        f(custom_fields);
218    }
219
220    pub fn current_span_exporter() -> Arc<dyn ExportEvent> {
221        current_span::CURRENT_SPAN.with(|span| span.borrow().exporter.clone())
222    }
223
224    pub fn current_span_fields() -> HashMap<&'static str, Value> {
225        current_span::CURRENT_SPAN.with(|span| span.borrow().fields.as_ref().clone())
226    }
227
228    pub fn try_load_current_span<T: DeserializeOwned>(name: &'static str) -> Option<T> {
229        current_span::CURRENT_SPAN.with(|span| {
230            let span = span.borrow();
231            Some(from_value::<T>(span.fields.get(name)?.clone()))
232        })
233    }
234
235    pub fn build_and_emit_event<D: BeSpecificEventData>(
236        build_data: impl FnOnce() -> D,
237        build_event: impl FnOnce(D) -> Event,
238    ) {
239        if !filter::event(D::scheme()) {
240            return;
241        }
242        let event = build_event(build_data());
243        current_span::CURRENT_SPAN.with(|span| span.borrow().emit(event));
244    }
245
246    pub fn to_value<T: Serialize>(value: T) -> Value {
247        serde_json::to_value(value).unwrap()
248    }
249
250    pub fn from_value<T: DeserializeOwned>(value: Value) -> T {
251        serde_json::from_value(value).unwrap()
252    }
253}
254
255#[macro_export]
256macro_rules! span {
257    () => {{
258        $crate::telemetry::Span::current()
259    }};
260    (@current     $(, $($tt:tt)* )?) => {{
261        let __current_exporter = $crate::telemetry::macro_support::current_span_exporter();
262        $crate::span!(__current_exporter $(, $($tt)* )?)
263    }};
264    ($broker:expr $(, $($tt:tt)* )?) => {{
265        #[allow(unused_mut)]
266        let mut __current_fields = $crate::telemetry::macro_support::current_span_fields();
267        $crate::span!(@field __current_fields $(, $($tt)* )?);
268        $crate::telemetry::Span::new($broker, __current_fields)
269    }};
270    (@field $fields:expr, $name:ident               $(, $($tt:tt)* )?) => {
271        $crate::span!( @field $fields, $name = $name $(, $($tt)* )? );
272    };
273    (@field $fields:expr, $name:ident = $value:expr $(, $($tt:tt)* )?) => {
274        let __value = $crate::telemetry::macro_support::to_value($value);
275        $fields.insert(stringify!($name), __value);
276        $crate::span!( @field $fields $(, $($tt)* )? );
277    };
278    (@field $fields:expr $(,)? ) => {};
279}
280
281#[macro_export]
282macro_rules! event {
283    ($event_type:ty { $($evnet_field:tt)* } $(, $($tt:tt)* )?) => {{
284        $crate::event!($crate::build!($event_type { $($evnet_field)* }) $(, $($tt)* )?);
285    }};
286    ($event_data:expr                       $(, $($tt:tt)* )?) => {{
287        let __build_data = || $event_data;
288        let __build_event = |__event_data| {
289            let mut __event_builder = $crate::Event::builder();
290            // as_millis_f64 is nightly only
291            let __time = std::time::SystemTime::now()
292                .duration_since(std::time::UNIX_EPOCH)
293                .unwrap()
294                .as_secs_f64()
295                * 1000.0;
296            __event_builder.time(__time);
297            __event_builder.data(__event_data);
298            $crate::event!(@load_known __event_builder, path: $crate::PathID);
299            $crate::event!(@load_known __event_builder, protocol_types: $crate::ProtocolTypeList);
300            $crate::event!(@load_known __event_builder, group_id: $crate::GroupID);
301            $crate::event!(@field __event_builder $(, $($tt)* )?);
302
303            __event_builder.build()
304        };
305        $crate::telemetry::macro_support::build_and_emit_event(__build_data, __build_event);
306    }};
307    (@load_known $event_builder:expr, $name:ident: $type:ty) => {
308        if let Some(__value) = $crate::telemetry::macro_support::try_load_current_span::<$type>(stringify!($name)) {
309            $event_builder.$name(__value);
310        }
311    };
312    (@field $event_builder:expr, $name:ident               $(, $($tt:tt)* )?) => {
313        $crate::event!( @field $event_builder, $name = $name $(, $($tt)* )? );
314    };
315    (@field $event_builder:expr, $name:ident = $value:expr $(, $($tt:tt)* )?) => {
316        let __value = $crate::telemetry::macro_support::to_value($value);
317        $crate::telemetry::macro_support::modify_event_builder_costom_fields(&mut $event_builder, |__custom_fields| {
318            __custom_fields.insert(stringify!($name).to_owned(), __value);
319        });
320        $crate::event!( @field $event_builder $(, $($tt)* )? );
321    };
322    (@field $event_builder:expr $(,)? ) => {};
323}
324
325#[cfg(test)]
326mod tests {
327    use std::sync::Arc;
328
329    use qbase::cid::ConnectionId;
330
331    use super::*;
332    use crate::{
333        GroupID,
334        quic::{ConnectionID, connectivity::ServerListening},
335    };
336
337    #[test]
338    fn span_fields() {
339        let exporter = Arc::new(NullExporter);
340        let _span = span!(exporter.clone());
341        let a = 0i32;
342        let c = 123456789usize;
343        span!(exporter.clone(), a, a, b = 12.3f32, c, d = "Hello world!").in_scope(|| {
344            assert_eq!(Span::current().load::<i32>("a"), 0);
345            assert_eq!(Span::current().load::<f32>("b"), 12.3);
346            assert_eq!(Span::current().load::<usize>("c"), 123456789);
347            assert_eq!(Span::current().load::<String>("d"), "Hello world!");
348            let e = vec![1, 2, 3];
349            span!(exporter.clone(), a = 1, b = 2, c = 3, e).in_scope(|| {
350                assert_eq!(Span::current().load::<i32>("a"), 1);
351                assert_eq!(Span::current().load::<i32>("b"), 2);
352                assert_eq!(Span::current().load::<i32>("c"), 3);
353                assert_eq!(Span::current().load::<String>("d"), "Hello world!");
354                assert_eq!(Span::current().load::<Vec<i32>>("e"), vec![1, 2, 3]);
355            });
356        })
357    }
358
359    #[test]
360    fn event() {
361        struct TestBroker;
362
363        impl ExportEvent for TestBroker {
364            fn emit(&self, event: Event) {
365                let str = serde_json::to_string_pretty(&event).unwrap();
366                let event = serde_json::to_value(event).unwrap();
367                println!("{str}");
368                assert_eq!(event["name"], "quic:server_listening");
369                let event_data_json = serde_json::json!({
370                    "ip_v4": "127.0.0.1",
371                    "port_v4": 8080,
372                });
373                assert_eq!(event["data"], event_data_json);
374                assert_eq!(event["group_id"], String::from(group_id()));
375                assert_eq!(event["use_strict_mode"], true);
376            }
377        }
378
379        fn group_id() -> GroupID {
380            GroupID::from(ConnectionID::from(ConnectionId::from_slice(&[
381                0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef,
382            ])))
383        }
384
385        span!(Arc::new(TestBroker), group_id = group_id()).in_scope(|| {
386            event!(
387                crate::build!(ServerListening {
388                    ip_v4: "127.0.0.1".to_owned(),
389                    port_v4: 8080u16,
390                }),
391                use_strict_mode = true
392            );
393        });
394    }
395}