1pub mod handy;
2
3use std::{
4 collections::HashMap,
5 fmt::Debug,
6 future::Future,
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10};
11
12use handy::NullExporter;
13use serde::de::DeserializeOwned;
14use serde_json::Value;
15
16use crate::{Event, GroupID, VantagePointType};
17
/// Factory for telemetry traces.
pub trait Log {
    /// Returns the [`Span`] for a new trace described by `vantage_point` and
    /// `group_id`.
    fn new_trace(&self, vantage_point: VantagePointType, group_id: GroupID) -> Span;
}
21
22pub trait ExportEvent: Send + Sync {
23 fn emit(&self, event: Event);
24
25 fn filter_event(&self, scheme: &'static str) -> bool {
26 _ = scheme;
27 true
28 }
29
30 fn filter_raw_data(&self) -> bool {
31 false
32 }
33}
34
/// Filter queries answered by the exporter of the thread's current span.
pub mod filter {
    /// Asks the current span's exporter whether events of `scheme` pass its filter.
    #[cfg(feature = "enabled")]
    #[inline]
    pub fn event(scheme: &'static str) -> bool {
        super::current_span::CURRENT_SPAN.with(|current| {
            let active = current.borrow();
            active.filter_event(scheme)
        })
    }

    /// With the `enabled` feature off, no event ever passes the filter.
    #[cfg(not(feature = "enabled"))]
    #[inline]
    pub fn event(_scheme: &'static str) -> bool {
        false
    }

    /// Asks the current span's exporter whether raw data passes its filter.
    #[cfg(all(feature = "enabled", feature = "raw_data"))]
    #[inline]
    pub fn raw_data() -> bool {
        super::current_span::CURRENT_SPAN.with(|current| {
            let active = current.borrow();
            active.filter_raw_data()
        })
    }

    /// Unless both the `enabled` and `raw_data` features are on, raw data never
    /// passes the filter.
    #[cfg(not(all(feature = "enabled", feature = "raw_data")))]
    #[inline]
    pub fn raw_data() -> bool {
        false
    }
}
60
/// A telemetry scope: an exporter plus a set of named JSON fields.
///
/// Both members sit behind `Arc`s, so cloning a span only bumps reference
/// counts.
#[derive(Clone)]
pub struct Span {
    // Destination of every event emitted through this span.
    exporter: Arc<dyn ExportEvent>,
    // Named values attached to the span, stored as JSON.
    fields: Arc<HashMap<&'static str, Value>>,
}
66
67impl Debug for Span {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("Span")
70 .field("fields", &self.fields)
71 .field("broker", &"..")
72 .finish()
73 }
74}
75
76impl Span {
77 #[inline]
78 pub fn new(exporter: Arc<dyn ExportEvent>, fields: HashMap<&'static str, Value>) -> Self {
79 Self {
80 exporter,
81 fields: Arc::new(fields),
82 }
83 }
84
85 #[inline]
86 pub fn emit(&self, event: Event) {
87 self.exporter.emit(event);
88 }
89
90 #[inline]
91 pub fn filter_event(&self, scheme: &'static str) -> bool {
92 self.exporter.filter_event(scheme)
93 }
94
95 #[inline]
96 pub fn filter_raw_data(&self) -> bool {
97 self.exporter.filter_raw_data()
98 }
99
100 #[inline]
101 pub fn load<T: DeserializeOwned>(&self, name: &'static str) -> T {
102 serde_json::from_value(self.fields[name].clone()).unwrap()
103 }
104
105 #[inline]
106 pub fn try_load<T: DeserializeOwned>(&self, name: &'static str) -> Option<T> {
107 serde_json::from_value(self.fields.get(name)?.clone()).ok()
108 }
109}
110
111impl PartialEq for Span {
112 fn eq(&self, other: &Self) -> bool {
113 Arc::ptr_eq(&self.fields, &other.fields) && Arc::ptr_eq(&self.exporter, &other.exporter)
114 }
115}
116
117impl Default for Span {
118 fn default() -> Self {
119 Self::new(Arc::new(NullExporter), HashMap::new())
120 }
121}
122
123pub struct Entered {
124 previous: Option<Span>,
125}
126
127mod current_span {
128 use std::cell::RefCell;
129
130 use super::{Entered, Span};
131
132 thread_local! {
133 pub static CURRENT_SPAN: RefCell<Span> = RefCell::new(Span::default());
134 }
135
136 impl Drop for Entered {
137 fn drop(&mut self) {
138 if let Some(previous) = &self.previous {
139 CURRENT_SPAN.with(|span| {
140 span.replace(previous.clone());
141 });
142 }
143 }
144 }
145
146 impl Span {
147 pub fn enter(&self) -> Entered {
148 let previous = CURRENT_SPAN.with(|current| {
149 if &*current.borrow() == self {
150 None
151 } else {
152 Some(current.replace(self.clone()))
153 }
154 });
155 Entered { previous }
156 }
157
158 pub fn in_scope<T>(&self, f: impl FnOnce() -> T) -> T {
159 let _guard = self.enter();
160 f()
161 }
162
163 pub fn current() -> Span {
164 CURRENT_SPAN.with(|span| span.borrow().clone())
165 }
166 }
167}
168
pin_project_lite::pin_project! {
    /// A value of type `F` paired with the [`Span`] it should run in.
    ///
    /// When `F` is a future, polling the wrapper polls `inner` with `span`
    /// entered as the thread's current span.
    pub struct Instrumented<F: ?Sized> {
        // Span entered around each poll of `inner`.
        span: Span,
        // The wrapped value; structurally pinned so it can be polled in place.
        #[pin]
        inner: F,
    }
}
176
/// Extension trait for attaching a [`Span`] to a value, producing an
/// [`Instrumented`] wrapper.
pub trait Instrument {
    /// Wraps `self` so that it runs inside `span`.
    fn instrument(self, span: Span) -> Instrumented<Self>;

    /// Wraps `self` so that it runs inside the span that is current at the
    /// time of this call.
    fn instrument_in_current(self) -> Instrumented<Self>;
}
182
183impl<F: Future> Instrument for F {
184 fn instrument(self, span: Span) -> Instrumented<Self> {
185 Instrumented { span, inner: self }
186 }
187
188 fn instrument_in_current(self) -> Instrumented<Self> {
189 self.instrument(crate::span!())
190 }
191}
192
193impl<F: Future> Future for Instrumented<F> {
194 type Output = F::Output;
195
196 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
197 let this = self.project();
198 this.span.in_scope(|| this.inner.poll(cx))
199 }
200}
201
202#[doc(hidden)]
203pub mod macro_support {
204 use serde::Serialize;
205
206 use super::*;
207 use crate::{BeSpecificEventData, EventBuilder};
208
209 pub fn modify_event_builder_costom_fields(
210 builder: &mut EventBuilder,
211 f: impl FnOnce(&mut HashMap<String, Value>),
212 ) {
213 if builder.custom_fields.is_none() {
214 builder.custom_fields = Some(HashMap::new());
215 }
216 let custom_fields = builder.custom_fields.as_mut().unwrap();
217 f(custom_fields);
218 }
219
220 pub fn current_span_exporter() -> Arc<dyn ExportEvent> {
221 current_span::CURRENT_SPAN.with(|span| span.borrow().exporter.clone())
222 }
223
224 pub fn current_span_fields() -> HashMap<&'static str, Value> {
225 current_span::CURRENT_SPAN.with(|span| span.borrow().fields.as_ref().clone())
226 }
227
228 pub fn try_load_current_span<T: DeserializeOwned>(name: &'static str) -> Option<T> {
229 current_span::CURRENT_SPAN.with(|span| {
230 let span = span.borrow();
231 Some(from_value::<T>(span.fields.get(name)?.clone()))
232 })
233 }
234
235 pub fn build_and_emit_event<D: BeSpecificEventData>(
236 build_data: impl FnOnce() -> D,
237 build_event: impl FnOnce(D) -> Event,
238 ) {
239 if !filter::event(D::scheme()) {
240 return;
241 }
242 let event = build_event(build_data());
243 current_span::CURRENT_SPAN.with(|span| span.borrow().emit(event));
244 }
245
246 pub fn to_value<T: Serialize>(value: T) -> Value {
247 serde_json::to_value(value).unwrap()
248 }
249
250 pub fn from_value<T: DeserializeOwned>(value: Value) -> T {
251 serde_json::from_value(value).unwrap()
252 }
253}
254
/// Obtains or builds a `Span`.
///
/// * `span!()` returns a clone of the thread's current span.
/// * `span!(@current, fields...)` builds a span that keeps the current span's
///   exporter and adds the listed fields.
/// * `span!(exporter, fields...)` builds a span that reports to `exporter`.
///
/// A newly built span starts with a copy of the current span's fields; listed
/// fields are inserted on top, replacing entries with the same name. Each
/// field is written `name = value`, or just `name` as shorthand for
/// `name = name`. Values are serialized to JSON.
#[macro_export]
macro_rules! span {
    // No arguments: the current span itself.
    () => {{
        $crate::telemetry::Span::current()
    }};
    // Reuse the current span's exporter, then fall through to the arm below.
    (@current $(, $($tt:tt)* )?) => {{
        let __current_exporter = $crate::telemetry::macro_support::current_span_exporter();
        $crate::span!(__current_exporter $(, $($tt)* )?)
    }};
    // Explicit exporter: inherit the current fields and apply the listed ones.
    ($broker:expr $(, $($tt:tt)* )?) => {{
        #[allow(unused_mut)]
        let mut __current_fields = $crate::telemetry::macro_support::current_span_fields();
        $crate::span!(@field __current_fields $(, $($tt)* )?);
        $crate::telemetry::Span::new($broker, __current_fields)
    }};
    // Internal: shorthand `name` is rewritten to `name = name`.
    (@field $fields:expr, $name:ident $(, $($tt:tt)* )?) => {
        $crate::span!( @field $fields, $name = $name $(, $($tt)* )? );
    };
    // Internal: serialize one `name = value` pair, insert it, then recurse on the rest.
    (@field $fields:expr, $name:ident = $value:expr $(, $($tt:tt)* )?) => {
        let __value = $crate::telemetry::macro_support::to_value($value);
        $fields.insert(stringify!($name), __value);
        $crate::span!( @field $fields $(, $($tt)* )? );
    };
    // Internal: end of the field list.
    (@field $fields:expr $(,)? ) => {};
}
280
/// Builds an event and emits it through the thread's current span.
///
/// * `event!(Type { ... }, fields...)` builds the event data with
///   `$crate::build!` and continues as the form below.
/// * `event!(data_expr, fields...)` uses `data_expr` as the event data.
///
/// The event time is the current system time in milliseconds since the Unix
/// epoch. The `path`, `protocol_types` and `group_id` builder values are taken
/// from same-named fields of the current span when those can be loaded. Extra
/// `name = value` (or shorthand `name`) arguments are stored as custom fields.
///
/// Building is done in closures handed to
/// `macro_support::build_and_emit_event`, which skips calling them when the
/// event's scheme is filtered out.
#[macro_export]
macro_rules! event {
    // `Type { fields }` form: construct the data via `build!` and re-dispatch.
    ($event_type:ty { $($evnet_field:tt)* } $(, $($tt:tt)* )?) => {{
        $crate::event!($crate::build!($event_type { $($evnet_field)* }) $(, $($tt)* )?);
    }};
    // Expression form: wrap data and event construction in closures.
    ($event_data:expr $(, $($tt:tt)* )?) => {{
        let __build_data = || $event_data;
        let __build_event = |__event_data| {
            let mut __event_builder = $crate::Event::builder();
            // Milliseconds since the Unix epoch as a float.
            // NOTE(review): the `unwrap` panics if the system clock is set
            // before the Unix epoch.
            let __time = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs_f64()
                * 1000.0;
            __event_builder.time(__time);
            __event_builder.data(__event_data);
            // Well-known values inherited from the current span, when present.
            $crate::event!(@load_known __event_builder, path: $crate::PathID);
            $crate::event!(@load_known __event_builder, protocol_types: $crate::ProtocolTypeList);
            $crate::event!(@load_known __event_builder, group_id: $crate::GroupID);
            // Caller-supplied custom fields.
            $crate::event!(@field __event_builder $(, $($tt)* )?);

            __event_builder.build()
        };
        $crate::telemetry::macro_support::build_and_emit_event(__build_data, __build_event);
    }};
    // Internal: set builder value `name` from the current span's field `name`, if it loads.
    (@load_known $event_builder:expr, $name:ident: $type:ty) => {
        if let Some(__value) = $crate::telemetry::macro_support::try_load_current_span::<$type>(stringify!($name)) {
            $event_builder.$name(__value);
        }
    };
    // Internal: shorthand `name` is rewritten to `name = name`.
    (@field $event_builder:expr, $name:ident $(, $($tt:tt)* )?) => {
        $crate::event!( @field $event_builder, $name = $name $(, $($tt)* )? );
    };
    // Internal: serialize one `name = value` pair into the custom fields, then recurse.
    (@field $event_builder:expr, $name:ident = $value:expr $(, $($tt:tt)* )?) => {
        let __value = $crate::telemetry::macro_support::to_value($value);
        $crate::telemetry::macro_support::modify_event_builder_costom_fields(&mut $event_builder, |__custom_fields| {
            __custom_fields.insert(stringify!($name).to_owned(), __value);
        });
        $crate::event!( @field $event_builder $(, $($tt)* )? );
    };
    // Internal: end of the field list.
    (@field $event_builder:expr $(,)? ) => {};
}
324
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use qbase::cid::ConnectionId;

    use super::*;
    use crate::{
        GroupID,
        quic::{ConnectionID, connectivity::ServerListening},
    };

    /// Fields given to `span!` can be loaded inside the span, and a nested
    /// span inherits the outer fields while overriding the ones it redefines.
    #[test]
    fn span_fields() {
        let null_exporter = Arc::new(NullExporter);
        let _span = span!(null_exporter.clone());
        let a = 0i32;
        let c = 123456789usize;
        let outer = span!(null_exporter.clone(), a, a, b = 12.3f32, c, d = "Hello world!");
        outer.in_scope(|| {
            let current = Span::current();
            assert_eq!(current.load::<i32>("a"), 0);
            assert_eq!(current.load::<f32>("b"), 12.3);
            assert_eq!(current.load::<usize>("c"), 123456789);
            assert_eq!(current.load::<String>("d"), "Hello world!");

            let e = vec![1, 2, 3];
            // Built while `outer` is current, so it starts from the outer fields.
            let inner = span!(null_exporter.clone(), a = 1, b = 2, c = 3, e);
            inner.in_scope(|| {
                let current = Span::current();
                assert_eq!(current.load::<i32>("a"), 1);
                assert_eq!(current.load::<i32>("b"), 2);
                assert_eq!(current.load::<i32>("c"), 3);
                assert_eq!(current.load::<String>("d"), "Hello world!");
                assert_eq!(current.load::<Vec<i32>>("e"), vec![1, 2, 3]);
            });
        });
    }

    /// An event emitted inside a span reaches the span's exporter with its
    /// data, the span's `group_id`, and the custom field attached.
    #[test]
    fn event() {
        struct TestBroker;

        impl ExportEvent for TestBroker {
            fn emit(&self, event: Event) {
                let pretty = serde_json::to_string_pretty(&event).unwrap();
                let json = serde_json::to_value(event).unwrap();
                println!("{pretty}");
                assert_eq!(json["name"], "quic:server_listening");
                let expected_data = serde_json::json!({
                    "ip_v4": "127.0.0.1",
                    "port_v4": 8080,
                });
                assert_eq!(json["data"], expected_data);
                assert_eq!(json["group_id"], String::from(group_id()));
                assert_eq!(json["use_strict_mode"], true);
            }
        }

        fn group_id() -> GroupID {
            let cid = ConnectionId::from_slice(&[0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef]);
            GroupID::from(ConnectionID::from(cid))
        }

        let span = span!(Arc::new(TestBroker), group_id = group_id());
        span.in_scope(|| {
            event!(
                crate::build!(ServerListening {
                    ip_v4: "127.0.0.1".to_owned(),
                    port_v4: 8080u16,
                }),
                use_strict_mode = true
            );
        });
    }
}