1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::telemetry_layer::TraceCtxRegistry;
use std::time::SystemTime;
use tracing_subscriber::registry::LookupSpan;
/// Marks the current `tracing` span as the local root of a distributed trace.
///
/// The dispatcher attached to `tracing::Span::current()` is downcast to a
/// `TraceCtxRegistry<SpanId, TraceId>`; when that succeeds, `trace_id` and
/// `remote_parent_span` are recorded against the current span's id via
/// `record_trace_ctx`.
///
/// # Errors
///
/// * [`TraceCtxError::NoEnabledSpan`] when `with_subscriber` on the current
///   span yields nothing to operate on.
/// * [`TraceCtxError::TelemetryLayerNotRegistered`] when the dispatcher cannot
///   be downcast to `TraceCtxRegistry<SpanId, TraceId>`.
pub fn register_dist_tracing_root<SpanId, TraceId>(
    trace_id: TraceId,
    remote_parent_span: Option<SpanId>,
) -> Result<(), TraceCtxError>
where
    SpanId: 'static + Clone + Send + Sync,
    TraceId: 'static + Clone + Send + Sync,
{
    let current = tracing::Span::current();

    // `None` here means there was no span/subscriber pair to call back with.
    let outcome = current.with_subscriber(|(span_id, dispatch)| {
        match dispatch.downcast_ref::<TraceCtxRegistry<SpanId, TraceId>>() {
            Some(ctx_registry) => {
                ctx_registry.record_trace_ctx(trace_id, remote_parent_span, span_id.clone());
                Ok(())
            }
            None => Err(TraceCtxError::TelemetryLayerNotRegistered),
        }
    });

    match outcome {
        Some(result) => result,
        None => Err(TraceCtxError::NoEnabledSpan),
    }
}
/// Resolves the distributed trace context that applies to the current span.
///
/// Starting at `tracing::Span::current()`, the span and each of its parents
/// (looked up in the `tracing_subscriber::Registry`) are fed lazily to
/// `TraceCtxRegistry::eval_ctx`. On success the returned tuple holds the
/// trace id from the evaluated context and the current span's id as converted
/// by `promote_span_id`.
///
/// # Errors
///
/// * [`TraceCtxError::NoEnabledSpan`] when `with_subscriber` on the current
///   span yields nothing to operate on.
/// * [`TraceCtxError::TelemetryLayerNotRegistered`] when the dispatcher cannot
///   be downcast to `TraceCtxRegistry<SpanId, TraceId>`.
/// * [`TraceCtxError::RegistrySubscriberNotRegistered`] when the dispatcher
///   cannot be downcast to `tracing_subscriber::Registry`.
/// * [`TraceCtxError::NoParentNodeHasTraceCtx`] when `eval_ctx` returns `None`
///   for the span chain.
///
/// # Panics
///
/// Panics if the registry has no data for a span id reached during the walk.
pub fn current_dist_trace_ctx<SpanId, TraceId>() -> Result<(TraceId, SpanId), TraceCtxError>
where
    SpanId: 'static + Clone + Send + Sync,
    TraceId: 'static + Clone + Send + Sync,
{
    let current = tracing::Span::current();

    let lookup = current.with_subscriber(|(span_id, dispatch)| {
        // The telemetry layer is checked before the registry so that error
        // precedence stays the same when both are missing.
        let ctx_registry = match dispatch.downcast_ref::<TraceCtxRegistry<SpanId, TraceId>>() {
            Some(found) => found,
            None => return Err(TraceCtxError::TelemetryLayerNotRegistered),
        };
        let span_store = match dispatch.downcast_ref::<tracing_subscriber::Registry>() {
            Some(found) => found,
            None => return Err(TraceCtxError::RegistrySubscriberNotRegistered),
        };

        // Lazily yields the current span followed by each ancestor. The state
        // is the id of the next span to emit, or `None` once the chain ends.
        let ancestry = itertools::unfold(Some(span_id.clone()), |next_id| {
            let id = next_id.take()?;
            let span_ref = span_store
                .span(&id)
                .expect("span data not found during eval_ctx for current_trace_ctx");
            *next_id = span_ref.parent().map(|parent| parent.id());
            Some(span_ref)
        });

        match ctx_registry.eval_ctx(ancestry) {
            Some(ctx) => Ok((
                ctx.trace_id,
                ctx_registry.promote_span_id(span_id.clone()),
            )),
            None => Err(TraceCtxError::NoParentNodeHasTraceCtx),
        }
    });

    lookup.unwrap_or(Err(TraceCtxError::NoEnabledSpan))
}
/// Failure modes of the distributed trace context helpers.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn std::error::Error>` and similar generic
/// error containers.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
#[non_exhaustive]
pub enum TraceCtxError {
    /// The current dispatcher could not be downcast to the expected
    /// `TraceCtxRegistry`.
    TelemetryLayerNotRegistered,
    /// The current dispatcher could not be downcast to a
    /// `tracing_subscriber::Registry`.
    RegistrySubscriberNotRegistered,
    /// The current span did not provide a span id / dispatcher pair.
    NoEnabledSpan,
    /// Evaluating the current span and its parents produced no trace context.
    NoParentNodeHasTraceCtx,
}

impl std::fmt::Display for TraceCtxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a message here.
        let message = match self {
            TraceCtxError::TelemetryLayerNotRegistered => {
                "telemetry layer is not registered with the current subscriber"
            }
            TraceCtxError::RegistrySubscriberNotRegistered => {
                "no `tracing_subscriber::Registry` is registered with the current subscriber"
            }
            TraceCtxError::NoEnabledSpan => {
                "the current span is not enabled with an associated subscriber"
            }
            TraceCtxError::NoParentNodeHasTraceCtx => {
                "no trace context found for the current span or any of its parents"
            }
        };
        f.write_str(message)
    }
}

impl std::error::Error for TraceCtxError {}
/// Data record describing a single span, generic over the value container
/// (`Visitor`) and the span/trace identifier types.
///
/// NOTE(review): how and when instances are built is not visible in this
/// part of the file; field descriptions below follow the field names and types.
#[derive(Debug, Clone)]
pub struct Span<Visitor, SpanId, TraceId> {
/// Identifier of this span.
pub id: SpanId,
/// Identifier of the trace this span belongs to.
pub trace_id: TraceId,
/// Identifier of the parent span, or `None` when there is no parent.
pub parent_id: Option<SpanId>,
/// System time at which the span was initialized.
pub initialized_at: SystemTime,
/// System time at which the span was completed.
pub completed_at: SystemTime,
/// Static `tracing` metadata associated with the span.
pub meta: &'static tracing::Metadata<'static>,
/// Static service name string.
/// NOTE(review): presumably the name of the service that produced the span — confirm.
pub service_name: &'static str,
/// Values attached to the span, held in the caller-chosen `Visitor` type.
/// NOTE(review): presumably the recorded span fields — confirm.
pub values: Visitor,
}
/// Data record describing a single event, generic over the value container
/// (`Visitor`) and the span/trace identifier types.
///
/// Unlike a span record it carries no id of its own and no completion time.
///
/// NOTE(review): how and when instances are built is not visible in this
/// part of the file; field descriptions below follow the field names and types.
#[derive(Clone, Debug)]
pub struct Event<Visitor, SpanId, TraceId> {
/// Identifier of the trace this event belongs to.
pub trace_id: TraceId,
/// Identifier of the parent span, or `None` when there is no parent.
pub parent_id: Option<SpanId>,
/// System time at which the event was initialized.
pub initialized_at: SystemTime,
/// Static `tracing` metadata associated with the event.
pub meta: &'static tracing::Metadata<'static>,
/// Static service name string.
/// NOTE(review): presumably the name of the service that produced the event — confirm.
pub service_name: &'static str,
/// Values attached to the event, held in the caller-chosen `Visitor` type.
/// NOTE(review): presumably the recorded event fields — confirm.
pub values: Visitor,
}