// eaze_tracing_distributed/telemetry_layer.rs

1use crate::telemetry::Telemetry;
2use crate::trace;
3use std::any::TypeId;
4use std::collections::HashMap;
5use std::time::SystemTime;
6use tracing::span::{Attributes, Id, Record};
7use tracing::{Event, Subscriber};
8use tracing_subscriber::{layer::Context, registry, Layer};
9
10#[cfg(feature = "use_parking_lot")]
11use parking_lot::RwLock;
12#[cfg(not(feature = "use_parking_lot"))]
13use std::sync::RwLock;
14
15/// A `tracing_subscriber::Layer` that publishes events and spans to some backend
16/// using the provided `Telemetry` capability.
17pub struct TelemetryLayer<Telemetry, SpanId, TraceId> {
18    pub(crate) telemetry: Telemetry,
19    service_name: &'static str,
20    // used to construct span ids to avoid collisions
21    pub(crate) trace_ctx_registry: TraceCtxRegistry<SpanId, TraceId>,
22}
23
/// Trace membership of a span: the trace it belongs to and, when one was
/// explicitly recorded, the parent span it should be reported under.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct TraceCtx<SpanId, TraceId> {
    /// Explicitly recorded parent span, if any (`None` for spans whose parent is local).
    pub(crate) parent_span: Option<SpanId>,
    /// Identifier of the trace this span belongs to.
    pub(crate) trace_id: TraceId,
}
29
30// resolvable via downcast_ref, to avoid propagating 'T' parameter of TelemetryLayer where not req'd
31pub(crate) struct TraceCtxRegistry<SpanId, TraceId> {
32    registry: RwLock<HashMap<Id, TraceCtx<SpanId, TraceId>>>,
33    promote_span_id: Box<dyn 'static + Send + Sync + Fn(Id) -> SpanId>,
34}
35
36impl<SpanId, TraceId> TraceCtxRegistry<SpanId, TraceId>
37where
38    SpanId: 'static + Clone + Send + Sync,
39    TraceId: 'static + Clone + Send + Sync,
40{
41    pub(crate) fn promote_span_id(&self, id: Id) -> SpanId {
42        (self.promote_span_id)(id)
43    }
44
45    pub(crate) fn record_trace_ctx(
46        &self,
47        trace_id: TraceId,
48        remote_parent_span: Option<SpanId>,
49        id: Id,
50    ) {
51        let trace_ctx = TraceCtx {
52            trace_id,
53            parent_span: remote_parent_span,
54        };
55
56        #[cfg(not(feature = "use_parking_lot"))]
57        let mut trace_ctx_registry = self.registry.write().expect("write lock!");
58        #[cfg(feature = "use_parking_lot")]
59        let mut trace_ctx_registry = self.registry.write();
60
61        trace_ctx_registry.insert(id, trace_ctx); // TODO: handle overwrite?
62    }
63
64    pub(crate) fn eval_ctx<
65        'a,
66        X: 'a + registry::LookupSpan<'a>,
67        I: std::iter::Iterator<Item = registry::SpanRef<'a, X>>,
68    >(
69        &self,
70        iter: I,
71    ) -> Option<TraceCtx<SpanId, TraceId>> {
72        let mut path = Vec::new();
73
74        for span_ref in iter {
75            let mut write_guard = span_ref.extensions_mut();
76            match write_guard.get_mut::<LazyTraceCtx<SpanId, TraceId>>() {
77                None => {
78                    #[cfg(not(feature = "use_parking_lot"))]
79                    let trace_ctx_registry = self.registry.read().unwrap();
80                    #[cfg(feature = "use_parking_lot")]
81                    let trace_ctx_registry = self.registry.read();
82
83                    match trace_ctx_registry.get(&span_ref.id()) {
84                        None => {
85                            drop(write_guard);
86                            path.push(span_ref);
87                        }
88                        Some(local_trace_root) => {
89                            write_guard.insert(LazyTraceCtx(local_trace_root.clone()));
90
91                            let res = if path.is_empty() {
92                                local_trace_root.clone()
93                            } else {
94                                TraceCtx {
95                                    trace_id: local_trace_root.trace_id.clone(),
96                                    parent_span: None,
97                                }
98                            };
99
100                            for span_ref in path.into_iter() {
101                                let mut write_guard = span_ref.extensions_mut();
102                                write_guard.replace::<LazyTraceCtx<SpanId, TraceId>>(LazyTraceCtx(
103                                    TraceCtx {
104                                        trace_id: local_trace_root.trace_id.clone(),
105                                        parent_span: None,
106                                    },
107                                ));
108                            }
109                            return Some(res);
110                        }
111                    }
112                }
113                Some(LazyTraceCtx(already_evaluated)) => {
114                    let res = if path.is_empty() {
115                        already_evaluated.clone()
116                    } else {
117                        TraceCtx {
118                            trace_id: already_evaluated.trace_id.clone(),
119                            parent_span: None,
120                        }
121                    };
122
123                    for span_ref in path.into_iter() {
124                        let mut write_guard = span_ref.extensions_mut();
125                        write_guard.replace::<LazyTraceCtx<SpanId, TraceId>>(LazyTraceCtx(
126                            TraceCtx {
127                                trace_id: already_evaluated.trace_id.clone(),
128                                parent_span: None,
129                            },
130                        ));
131                    }
132                    return Some(res);
133                }
134            }
135        }
136
137        None
138    }
139
140    pub(crate) fn new<F: 'static + Send + Sync + Fn(Id) -> SpanId>(f: F) -> Self {
141        let registry = RwLock::new(HashMap::new());
142        let promote_span_id = Box::new(f);
143
144        TraceCtxRegistry {
145            registry,
146            promote_span_id,
147        }
148    }
149}
150
151impl<T, SpanId, TraceId> TelemetryLayer<T, SpanId, TraceId>
152where
153    SpanId: 'static + Clone + Send + Sync,
154    TraceId: 'static + Clone + Send + Sync,
155{
156    /// Construct a new TelemetryLayer using the provided `Telemetry` capability.
157    /// Uses the provided function, `F`, to promote `tracing::span::Id` instances to the
158    /// `SpanId` type associated with the provided `Telemetry` instance.
159    pub fn new<F: 'static + Send + Sync + Fn(Id) -> SpanId>(
160        service_name: &'static str,
161        telemetry: T,
162        promote_span_id: F,
163    ) -> Self {
164        let trace_ctx_registry = TraceCtxRegistry::new(promote_span_id);
165
166        TelemetryLayer {
167            service_name,
168            telemetry,
169            trace_ctx_registry,
170        }
171    }
172}
173
174impl<S, TraceId, SpanId, V, T> Layer<S> for TelemetryLayer<T, SpanId, TraceId>
175where
176    S: Subscriber + for<'a> registry::LookupSpan<'a>,
177    TraceId: 'static + Clone + Eq + Send + Sync,
178    SpanId: 'static + Clone + Eq + Send + Sync,
179    V: 'static + tracing::field::Visit + Send + Sync,
180    T: 'static + Telemetry<Visitor = V, TraceId = TraceId, SpanId = SpanId>,
181{
182    fn new_span(&self, attrs: &Attributes, id: &Id, ctx: Context<S>) {
183        let span = ctx.span(id).expect("span data not found during new_span");
184        let mut extensions_mut = span.extensions_mut();
185        extensions_mut.insert(SpanInitAt::new());
186
187        let mut visitor: V = self.telemetry.mk_visitor();
188        attrs.record(&mut visitor);
189        extensions_mut.insert::<V>(visitor);
190    }
191
192    fn on_record(&self, id: &Id, values: &Record, ctx: Context<S>) {
193        let span = ctx.span(id).expect("span data not found during on_record");
194        let mut extensions_mut = span.extensions_mut();
195        let visitor: &mut V = extensions_mut
196            .get_mut()
197            .expect("fields extension not found during on_record");
198        values.record(visitor);
199    }
200
201    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
202        let parent_id = if let Some(parent_id) = event.parent() {
203            // explicit parent
204            Some(parent_id.clone())
205        } else if event.is_root() {
206            // don't bother checking thread local if span is explicitly root according to this fn
207            None
208        } else if let Some(parent_id) = ctx.current_span().id() {
209            // implicit parent from threadlocal ctx
210            Some(parent_id.clone())
211        } else {
212            // no parent span, thus this is a root span
213            None
214        };
215
216        match parent_id {
217            None => {} // not part of a trace, don't bother recording via honeycomb
218            Some(parent_id) => {
219                let initialized_at = SystemTime::now();
220
221                let mut visitor = self.telemetry.mk_visitor();
222                event.record(&mut visitor);
223
224                // TODO: dedup
225                let iter = itertools::unfold(Some(parent_id.clone()), |st| match st {
226                    Some(target_id) => {
227                        let res = ctx
228                            .span(target_id)
229                            .expect("span data not found during eval_ctx");
230                        *st = res.parent().map(|x| x.id());
231                        Some(res)
232                    }
233                    None => None,
234                });
235
236                // only report event if it's part of a trace
237                if let Some(parent_trace_ctx) = self.trace_ctx_registry.eval_ctx(iter) {
238                    let event = trace::Event {
239                        trace_id: parent_trace_ctx.trace_id,
240                        parent_id: Some(self.trace_ctx_registry.promote_span_id(parent_id)),
241                        initialized_at,
242                        meta: event.metadata(),
243                        service_name: &self.service_name,
244                        values: visitor,
245                    };
246
247                    self.telemetry.report_event(event);
248                }
249            }
250        }
251    }
252
253    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
254        let span = ctx.span(&id).expect("span data not found during on_close");
255
256        // TODO: could be span.parents() but also needs span itself
257        let iter = itertools::unfold(Some(id.clone()), |st| match st {
258            Some(target_id) => {
259                let res = ctx
260                    .span(target_id)
261                    .expect("span data not found during eval_ctx");
262                *st = res.parent().map(|x| x.id());
263                Some(res)
264            }
265            None => None,
266        });
267
268        // if span's enclosing ctx has a trace id, eval & use to report telemetry
269        if let Some(trace_ctx) = self.trace_ctx_registry.eval_ctx(iter) {
270            let mut extensions_mut = span.extensions_mut();
271            let visitor: V = extensions_mut
272                .remove()
273                .expect("should be present on all spans");
274            let SpanInitAt(initialized_at) = extensions_mut
275                .remove()
276                .expect("should be present on all spans");
277
278            let completed_at = SystemTime::now();
279
280            let parent_id = match trace_ctx.parent_span {
281                None => span
282                    .parent()
283                    .map(|parent_ref| self.trace_ctx_registry.promote_span_id(parent_ref.id())),
284                Some(parent_span) => Some(parent_span),
285            };
286
287            let span = trace::Span {
288                id: self.trace_ctx_registry.promote_span_id(id),
289                meta: span.metadata(),
290                parent_id,
291                initialized_at,
292                trace_id: trace_ctx.trace_id,
293                completed_at,
294                service_name: self.service_name,
295                values: visitor,
296            };
297
298            self.telemetry.report_span(span);
299        };
300    }
301
302    // FIXME: do I need to do something here? I think no (better to require explicit re-marking as root after copy).
303    // called when span copied, needed iff span has trace id/etc already? nah,
304    // fn on_id_change(&self, _old: &Id, _new: &Id, _ctx: Context<'_, S>) {}
305
306    unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
307        // This `downcast_raw` impl allows downcasting this layer to any of
308        // its components (currently just trace ctx registry)
309        // as well as to the layer's type itself (technique borrowed from formatting subscriber)
310        match () {
311            _ if id == TypeId::of::<Self>() => Some(self as *const Self as *const ()),
312            _ if id == TypeId::of::<TraceCtxRegistry<SpanId, TraceId>>() => Some(
313                &self.trace_ctx_registry as *const TraceCtxRegistry<SpanId, TraceId> as *const (),
314            ),
315            _ => None,
316        }
317    }
318}
319
320// TODO: delete?
321struct LazyTraceCtx<SpanId, TraceId>(TraceCtx<SpanId, TraceId>);
322
/// Span extension holding the wall-clock time at which the span was created.
struct SpanInitAt(SystemTime);

impl SpanInitAt {
    /// Captures the current wall-clock time.
    fn new() -> Self {
        SpanInitAt(SystemTime::now())
    }
}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::telemetry::test::{SpanId, TestTelemetry, TraceId};
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::time::Duration;
    use tokio::runtime::Runtime;
    use tracing::instrument;
    use tracing_subscriber::layer::Layer;

    fn explicit_trace_id() -> TraceId {
        135
    }

    fn explicit_parent_span_id() -> SpanId {
        Id::from_u64(246)
    }

    #[test]
    fn test_instrument() {
        with_test_scenario_runner(|| {
            #[instrument]
            fn f(ns: Vec<u64>) {
                trace::register_dist_tracing_root(
                    explicit_trace_id(),
                    Some(explicit_parent_span_id()),
                )
                .unwrap();
                for n in ns {
                    g(format!("{}", n));
                }
            }

            #[instrument]
            fn g(_s: String) {
                let use_of_reserved_word = "duration-value";
                tracing::event!(
                    tracing::Level::INFO,
                    duration_ms = use_of_reserved_word,
                    foo = "bar"
                );

                assert_eq!(
                    trace::current_dist_trace_ctx::<SpanId, TraceId>()
                        .map(|x| x.0)
                        .unwrap(),
                    explicit_trace_id(),
                );
            }

            f(vec![1, 2, 3]);
        });
    }

    // run async fn (with multiple entry and exit for each span due to delay) with test scenario
    #[test]
    fn test_async_instrument() {
        with_test_scenario_runner(|| {
            #[instrument]
            async fn f(ns: Vec<u64>) {
                trace::register_dist_tracing_root(
                    explicit_trace_id(),
                    Some(explicit_parent_span_id()),
                )
                .unwrap();
                for n in ns {
                    g(format!("{}", n)).await;
                }
            }

            #[instrument]
            async fn g(s: String) {
                // delay to force multiple span entry
                tokio::time::delay_for(Duration::from_millis(100)).await;
                let use_of_reserved_word = "duration-value";
                tracing::event!(
                    tracing::Level::INFO,
                    duration_ms = use_of_reserved_word,
                    foo = "bar"
                );

                assert_eq!(
                    trace::current_dist_trace_ctx::<SpanId, TraceId>()
                        .map(|x| x.0)
                        .unwrap(),
                    explicit_trace_id(),
                );
            }

            let mut rt = Runtime::new().unwrap();
            rt.block_on(f(vec![1, 2, 3]));
        });
    }

    /// Runs `f` under a `TelemetryLayer` backed by `TestTelemetry`, then checks the
    /// reported spans and events. `f` is expected to open one traced root span that
    /// contains three child spans, each emitting exactly one event.
    fn with_test_scenario_runner<F>(f: F)
    where
        F: Fn(),
    {
        let spans = Arc::new(Mutex::new(Vec::new()));
        let events = Arc::new(Mutex::new(Vec::new()));
        let cap: TestTelemetry = TestTelemetry::new(spans.clone(), events.clone());
        let layer = TelemetryLayer::new("test_svc_name", cap, |x| x);

        let subscriber = layer.with_subscriber(registry::Registry::default());
        tracing::subscriber::with_default(subscriber, f);

        let spans = spans.lock().unwrap();
        let events = events.lock().unwrap();

        // Pin the counts up front: indexing below would otherwise panic opaquely, and
        // the `zip` further down would silently check nothing if events went missing.
        assert_eq!(spans.len(), 4, "expected one root span plus three child spans");
        assert_eq!(events.len(), 3, "expected one event per child span");

        // root span is exited (and reported) last
        let root_span = &spans[3];
        let child_spans = &spans[0..3];

        let expected_trace_id = explicit_trace_id();

        assert_eq!(root_span.parent_id, Some(explicit_parent_span_id()));
        assert_eq!(root_span.trace_id, expected_trace_id);

        for (span, event) in child_spans.iter().zip(events.iter()) {
            // confirm parent and trace ids are as expected
            assert_eq!(span.parent_id, Some(root_span.id.clone()));
            assert_eq!(event.parent_id, Some(span.id.clone()));
            assert_eq!(span.trace_id, explicit_trace_id());
            assert_eq!(event.trace_id, explicit_trace_id());
        }
    }
}