eaze_tracing_distributed/
telemetry_layer.rs1use crate::telemetry::Telemetry;
2use crate::trace;
3use std::any::TypeId;
4use std::collections::HashMap;
5use std::time::SystemTime;
6use tracing::span::{Attributes, Id, Record};
7use tracing::{Event, Subscriber};
8use tracing_subscriber::{layer::Context, registry, Layer};
9
10#[cfg(feature = "use_parking_lot")]
11use parking_lot::RwLock;
12#[cfg(not(feature = "use_parking_lot"))]
13use std::sync::RwLock;
14
15pub struct TelemetryLayer<Telemetry, SpanId, TraceId> {
18 pub(crate) telemetry: Telemetry,
19 service_name: &'static str,
20 pub(crate) trace_ctx_registry: TraceCtxRegistry<SpanId, TraceId>,
22}
23
24#[derive(PartialEq, Eq, Hash, Clone, Debug)]
25pub(crate) struct TraceCtx<SpanId, TraceId> {
26 pub(crate) parent_span: Option<SpanId>,
27 pub(crate) trace_id: TraceId,
28}
29
30pub(crate) struct TraceCtxRegistry<SpanId, TraceId> {
32 registry: RwLock<HashMap<Id, TraceCtx<SpanId, TraceId>>>,
33 promote_span_id: Box<dyn 'static + Send + Sync + Fn(Id) -> SpanId>,
34}
35
36impl<SpanId, TraceId> TraceCtxRegistry<SpanId, TraceId>
37where
38 SpanId: 'static + Clone + Send + Sync,
39 TraceId: 'static + Clone + Send + Sync,
40{
41 pub(crate) fn promote_span_id(&self, id: Id) -> SpanId {
42 (self.promote_span_id)(id)
43 }
44
45 pub(crate) fn record_trace_ctx(
46 &self,
47 trace_id: TraceId,
48 remote_parent_span: Option<SpanId>,
49 id: Id,
50 ) {
51 let trace_ctx = TraceCtx {
52 trace_id,
53 parent_span: remote_parent_span,
54 };
55
56 #[cfg(not(feature = "use_parking_lot"))]
57 let mut trace_ctx_registry = self.registry.write().expect("write lock!");
58 #[cfg(feature = "use_parking_lot")]
59 let mut trace_ctx_registry = self.registry.write();
60
61 trace_ctx_registry.insert(id, trace_ctx); }
63
64 pub(crate) fn eval_ctx<
65 'a,
66 X: 'a + registry::LookupSpan<'a>,
67 I: std::iter::Iterator<Item = registry::SpanRef<'a, X>>,
68 >(
69 &self,
70 iter: I,
71 ) -> Option<TraceCtx<SpanId, TraceId>> {
72 let mut path = Vec::new();
73
74 for span_ref in iter {
75 let mut write_guard = span_ref.extensions_mut();
76 match write_guard.get_mut::<LazyTraceCtx<SpanId, TraceId>>() {
77 None => {
78 #[cfg(not(feature = "use_parking_lot"))]
79 let trace_ctx_registry = self.registry.read().unwrap();
80 #[cfg(feature = "use_parking_lot")]
81 let trace_ctx_registry = self.registry.read();
82
83 match trace_ctx_registry.get(&span_ref.id()) {
84 None => {
85 drop(write_guard);
86 path.push(span_ref);
87 }
88 Some(local_trace_root) => {
89 write_guard.insert(LazyTraceCtx(local_trace_root.clone()));
90
91 let res = if path.is_empty() {
92 local_trace_root.clone()
93 } else {
94 TraceCtx {
95 trace_id: local_trace_root.trace_id.clone(),
96 parent_span: None,
97 }
98 };
99
100 for span_ref in path.into_iter() {
101 let mut write_guard = span_ref.extensions_mut();
102 write_guard.replace::<LazyTraceCtx<SpanId, TraceId>>(LazyTraceCtx(
103 TraceCtx {
104 trace_id: local_trace_root.trace_id.clone(),
105 parent_span: None,
106 },
107 ));
108 }
109 return Some(res);
110 }
111 }
112 }
113 Some(LazyTraceCtx(already_evaluated)) => {
114 let res = if path.is_empty() {
115 already_evaluated.clone()
116 } else {
117 TraceCtx {
118 trace_id: already_evaluated.trace_id.clone(),
119 parent_span: None,
120 }
121 };
122
123 for span_ref in path.into_iter() {
124 let mut write_guard = span_ref.extensions_mut();
125 write_guard.replace::<LazyTraceCtx<SpanId, TraceId>>(LazyTraceCtx(
126 TraceCtx {
127 trace_id: already_evaluated.trace_id.clone(),
128 parent_span: None,
129 },
130 ));
131 }
132 return Some(res);
133 }
134 }
135 }
136
137 None
138 }
139
140 pub(crate) fn new<F: 'static + Send + Sync + Fn(Id) -> SpanId>(f: F) -> Self {
141 let registry = RwLock::new(HashMap::new());
142 let promote_span_id = Box::new(f);
143
144 TraceCtxRegistry {
145 registry,
146 promote_span_id,
147 }
148 }
149}
150
151impl<T, SpanId, TraceId> TelemetryLayer<T, SpanId, TraceId>
152where
153 SpanId: 'static + Clone + Send + Sync,
154 TraceId: 'static + Clone + Send + Sync,
155{
156 pub fn new<F: 'static + Send + Sync + Fn(Id) -> SpanId>(
160 service_name: &'static str,
161 telemetry: T,
162 promote_span_id: F,
163 ) -> Self {
164 let trace_ctx_registry = TraceCtxRegistry::new(promote_span_id);
165
166 TelemetryLayer {
167 service_name,
168 telemetry,
169 trace_ctx_registry,
170 }
171 }
172}
173
174impl<S, TraceId, SpanId, V, T> Layer<S> for TelemetryLayer<T, SpanId, TraceId>
175where
176 S: Subscriber + for<'a> registry::LookupSpan<'a>,
177 TraceId: 'static + Clone + Eq + Send + Sync,
178 SpanId: 'static + Clone + Eq + Send + Sync,
179 V: 'static + tracing::field::Visit + Send + Sync,
180 T: 'static + Telemetry<Visitor = V, TraceId = TraceId, SpanId = SpanId>,
181{
182 fn new_span(&self, attrs: &Attributes, id: &Id, ctx: Context<S>) {
183 let span = ctx.span(id).expect("span data not found during new_span");
184 let mut extensions_mut = span.extensions_mut();
185 extensions_mut.insert(SpanInitAt::new());
186
187 let mut visitor: V = self.telemetry.mk_visitor();
188 attrs.record(&mut visitor);
189 extensions_mut.insert::<V>(visitor);
190 }
191
192 fn on_record(&self, id: &Id, values: &Record, ctx: Context<S>) {
193 let span = ctx.span(id).expect("span data not found during on_record");
194 let mut extensions_mut = span.extensions_mut();
195 let visitor: &mut V = extensions_mut
196 .get_mut()
197 .expect("fields extension not found during on_record");
198 values.record(visitor);
199 }
200
201 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
202 let parent_id = if let Some(parent_id) = event.parent() {
203 Some(parent_id.clone())
205 } else if event.is_root() {
206 None
208 } else if let Some(parent_id) = ctx.current_span().id() {
209 Some(parent_id.clone())
211 } else {
212 None
214 };
215
216 match parent_id {
217 None => {} Some(parent_id) => {
219 let initialized_at = SystemTime::now();
220
221 let mut visitor = self.telemetry.mk_visitor();
222 event.record(&mut visitor);
223
224 let iter = itertools::unfold(Some(parent_id.clone()), |st| match st {
226 Some(target_id) => {
227 let res = ctx
228 .span(target_id)
229 .expect("span data not found during eval_ctx");
230 *st = res.parent().map(|x| x.id());
231 Some(res)
232 }
233 None => None,
234 });
235
236 if let Some(parent_trace_ctx) = self.trace_ctx_registry.eval_ctx(iter) {
238 let event = trace::Event {
239 trace_id: parent_trace_ctx.trace_id,
240 parent_id: Some(self.trace_ctx_registry.promote_span_id(parent_id)),
241 initialized_at,
242 meta: event.metadata(),
243 service_name: &self.service_name,
244 values: visitor,
245 };
246
247 self.telemetry.report_event(event);
248 }
249 }
250 }
251 }
252
253 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
254 let span = ctx.span(&id).expect("span data not found during on_close");
255
256 let iter = itertools::unfold(Some(id.clone()), |st| match st {
258 Some(target_id) => {
259 let res = ctx
260 .span(target_id)
261 .expect("span data not found during eval_ctx");
262 *st = res.parent().map(|x| x.id());
263 Some(res)
264 }
265 None => None,
266 });
267
268 if let Some(trace_ctx) = self.trace_ctx_registry.eval_ctx(iter) {
270 let mut extensions_mut = span.extensions_mut();
271 let visitor: V = extensions_mut
272 .remove()
273 .expect("should be present on all spans");
274 let SpanInitAt(initialized_at) = extensions_mut
275 .remove()
276 .expect("should be present on all spans");
277
278 let completed_at = SystemTime::now();
279
280 let parent_id = match trace_ctx.parent_span {
281 None => span
282 .parent()
283 .map(|parent_ref| self.trace_ctx_registry.promote_span_id(parent_ref.id())),
284 Some(parent_span) => Some(parent_span),
285 };
286
287 let span = trace::Span {
288 id: self.trace_ctx_registry.promote_span_id(id),
289 meta: span.metadata(),
290 parent_id,
291 initialized_at,
292 trace_id: trace_ctx.trace_id,
293 completed_at,
294 service_name: self.service_name,
295 values: visitor,
296 };
297
298 self.telemetry.report_span(span);
299 };
300 }
301
302 unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
307 match () {
311 _ if id == TypeId::of::<Self>() => Some(self as *const Self as *const ()),
312 _ if id == TypeId::of::<TraceCtxRegistry<SpanId, TraceId>>() => Some(
313 &self.trace_ctx_registry as *const TraceCtxRegistry<SpanId, TraceId> as *const (),
314 ),
315 _ => None,
316 }
317 }
318}
319
320struct LazyTraceCtx<SpanId, TraceId>(TraceCtx<SpanId, TraceId>);
322
323struct SpanInitAt(SystemTime);
324
325impl SpanInitAt {
326 fn new() -> Self {
327 let initialized_at = SystemTime::now();
328
329 Self(initialized_at)
330 }
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336 use crate::telemetry::test::{SpanId, TestTelemetry, TraceId};
337 use std::sync::Arc;
338 use std::sync::Mutex;
339 use std::time::Duration;
340 use tokio::runtime::Runtime;
341 use tracing::instrument;
342 use tracing_subscriber::layer::Layer;
343
344 fn explicit_trace_id() -> TraceId {
345 135
346 }
347
348 fn explicit_parent_span_id() -> SpanId {
349 Id::from_u64(246)
350 }
351
352 #[test]
353 fn test_instrument() {
354 with_test_scenario_runner(|| {
355 #[instrument]
356 fn f(ns: Vec<u64>) {
357 trace::register_dist_tracing_root(
358 explicit_trace_id(),
359 Some(explicit_parent_span_id()),
360 )
361 .unwrap();
362 for n in ns {
363 g(format!("{}", n));
364 }
365 }
366
367 #[instrument]
368 fn g(_s: String) {
369 let use_of_reserved_word = "duration-value";
370 tracing::event!(
371 tracing::Level::INFO,
372 duration_ms = use_of_reserved_word,
373 foo = "bar"
374 );
375
376 assert_eq!(
377 trace::current_dist_trace_ctx::<SpanId, TraceId>()
378 .map(|x| x.0)
379 .unwrap(),
380 explicit_trace_id(),
381 );
382 }
383
384 f(vec![1, 2, 3]);
385 });
386 }
387
388 #[test]
390 fn test_async_instrument() {
391 with_test_scenario_runner(|| {
392 #[instrument]
393 async fn f(ns: Vec<u64>) {
394 trace::register_dist_tracing_root(
395 explicit_trace_id(),
396 Some(explicit_parent_span_id()),
397 )
398 .unwrap();
399 for n in ns {
400 g(format!("{}", n)).await;
401 }
402 }
403
404 #[instrument]
405 async fn g(s: String) {
406 tokio::time::delay_for(Duration::from_millis(100)).await;
408 let use_of_reserved_word = "duration-value";
409 tracing::event!(
410 tracing::Level::INFO,
411 duration_ms = use_of_reserved_word,
412 foo = "bar"
413 );
414
415 assert_eq!(
416 trace::current_dist_trace_ctx::<SpanId, TraceId>()
417 .map(|x| x.0)
418 .unwrap(),
419 explicit_trace_id(),
420 );
421 }
422
423 let mut rt = Runtime::new().unwrap();
424 rt.block_on(f(vec![1, 2, 3]));
425 });
426 }
427
428 fn with_test_scenario_runner<F>(f: F)
429 where
430 F: Fn(),
431 {
432 let spans = Arc::new(Mutex::new(Vec::new()));
433 let events = Arc::new(Mutex::new(Vec::new()));
434 let cap: TestTelemetry = TestTelemetry::new(spans.clone(), events.clone());
435 let layer = TelemetryLayer::new("test_svc_name", cap, |x| x);
436
437 let subscriber = layer.with_subscriber(registry::Registry::default());
438 tracing::subscriber::with_default(subscriber, f);
439
440 let spans = spans.lock().unwrap();
441 let events = events.lock().unwrap();
442
443 let root_span = &spans[3];
445 let child_spans = &spans[0..3];
446
447 let expected_trace_id = explicit_trace_id();
448
449 assert_eq!(root_span.parent_id, Some(explicit_parent_span_id()));
450 assert_eq!(root_span.trace_id, expected_trace_id);
451
452 for (span, event) in child_spans.iter().zip(events.iter()) {
453 assert_eq!(span.parent_id, Some(root_span.id.clone()));
455 assert_eq!(event.parent_id, Some(span.id.clone()));
456 assert_eq!(span.trace_id, explicit_trace_id());
457 assert_eq!(event.trace_id, explicit_trace_id());
458 }
459 }
460}