rustfoundry/telemetry/tracing/
internal.rs

1use super::init::TracingHarness;
2use super::StartTraceOptions;
3use rand::{self, Rng};
4
5use cf_rustracing::sampler::BoxSampler;
6use cf_rustracing::tag::Tag;
7use cf_rustracing_jaeger::span::{Span, SpanContext, SpanContextState};
8use std::borrow::Cow;
9use std::error::Error;
10use std::sync::Arc;
11
/// Tracer type used throughout this module: a `cf_rustracing` tracer specialized with a
/// boxed sampler and the Jaeger `SpanContextState` as its span context state.
pub(crate) type Tracer = cf_rustracing::Tracer<BoxSampler<SpanContextState>, SpanContextState>;
13
/// A cloneable handle to a tracing [`Span`].
///
/// Clones share the same underlying span through an `Arc`-wrapped read-write lock.
#[derive(Debug, Clone)]
pub(crate) struct SharedSpan {
    // NOTE: we intentionally use a lock without poisoning here, so that threads
    // which merely share telemetry with a failed thread do not panic as well.
    pub(crate) inner: Arc<parking_lot::RwLock<Span>>,
    // NOTE: the sampling flag is stored separately, so we don't need to acquire
    // the lock every time we need to check the flag. It is captured once, when
    // the `SharedSpan` is created from a `Span`.
    is_sampled: bool,
}
23
24impl From<Span> for SharedSpan {
25    fn from(inner: Span) -> Self {
26        let is_sampled = inner.is_sampled();
27
28        Self {
29            inner: Arc::new(parking_lot::RwLock::new(inner)),
30            is_sampled,
31        }
32    }
33}
34
35pub fn write_current_span(write_fn: impl FnOnce(&mut Span)) {
36    if let Some(span) = current_span() {
37        if span.is_sampled {
38            write_fn(&mut span.inner.write());
39        }
40    }
41}
42
43pub(crate) fn create_span(name: impl Into<Cow<'static, str>>) -> SharedSpan {
44    match current_span() {
45        Some(parent) => parent.inner.read().child(name, |o| o.start()),
46        None => start_trace(name, Default::default()),
47    }
48    .into()
49}
50
51pub(crate) fn current_span() -> Option<SharedSpan> {
52    TracingHarness::get().span_scope_stack.current()
53}
54
55pub(crate) fn span_trace_id(span: &Span) -> Option<String> {
56    span.context().map(|c| c.state().trace_id().to_string())
57}
58
59pub(crate) fn start_trace(
60    root_span_name: impl Into<Cow<'static, str>>,
61    options: StartTraceOptions,
62) -> Span {
63    let tracer = TracingHarness::get().tracer();
64    let root_span_name = root_span_name.into();
65    let mut span_builder = tracer.span(root_span_name.clone());
66
67    if let Some(state) = options.stitch_with_trace {
68        let ctx = SpanContext::new(state, vec![]);
69
70        span_builder = span_builder.child_of(&ctx);
71    }
72
73    if let Some(ratio) = options.override_sampling_ratio {
74        span_builder = span_builder.tag(Tag::new(
75            "sampling.priority",
76            if should_sample(ratio) { 1 } else { 0 },
77        ));
78    }
79
80    let mut current_span = match current_span() {
81        Some(current_span) if current_span.is_sampled => current_span,
82        _ => return span_builder.start(),
83    };
84
85    // if a prior trace was ongoing (e.g. during stitching, forking), we want to
86    // link the new trace with the existing one
87    let mut new_trace_root_span = span_builder.start();
88
89    link_new_trace_with_current(&mut current_span, &root_span_name, &mut new_trace_root_span);
90
91    new_trace_root_span
92}
93
94pub(super) fn reporter_error(err: impl Error) {
95    #[cfg(feature = "logging")]
96    crate::telemetry::log::error!("failed to report traces to the traces sink"; "error" => %err);
97
98    #[cfg(not(feature = "logging"))]
99    drop(err);
100}
101
102// Link a newly created trace in the current span's ref span and vice-versa
103fn link_new_trace_with_current(
104    current_span: &mut SharedSpan,
105    root_span_name: &str,
106    new_trace_root_span: &mut Span,
107) {
108    let current_span_lock = current_span.inner.read();
109    let mut new_trace_ref_span = create_fork_ref_span(root_span_name, &current_span_lock);
110
111    if let Some(trace_id) = span_trace_id(&*new_trace_root_span) {
112        new_trace_ref_span.set_tag(|| {
113            Tag::new(
114                "note",
115                "current trace was forked at this point, see the `trace_id` field to obtain the forked trace",
116            )
117        });
118
119        new_trace_ref_span.set_tag(|| Tag::new("trace_id", trace_id));
120    }
121
122    if let Some(trace_id) = span_trace_id(&current_span_lock) {
123        new_trace_root_span.set_tag(|| Tag::new("trace_id", trace_id));
124    }
125
126    if let Some(new_trace_ref_ctx) = new_trace_ref_span.context() {
127        let new_trace_ref_span_id = format!("{:32x}", new_trace_ref_ctx.state().span_id());
128
129        new_trace_root_span.set_tag(|| Tag::new("fork_of_span_id", new_trace_ref_span_id));
130    }
131}
132
133pub(crate) fn fork_trace(fork_name: impl Into<Cow<'static, str>>) -> SharedSpan {
134    match current_span() {
135        Some(span) if span.is_sampled => span,
136        _ => return Span::inactive().into(),
137    };
138
139    let fork_name = fork_name.into();
140
141    start_trace(
142        fork_name,
143        StartTraceOptions {
144            // NOTE: If the current span is sampled, then forked trace is also forcibly sampled
145            override_sampling_ratio: Some(1.0),
146            ..Default::default()
147        },
148    )
149    .into()
150}
151
152fn create_fork_ref_span(
153    fork_name: &str,
154    current_span_lock: &parking_lot::RwLockReadGuard<Span>,
155) -> Span {
156    let fork_ref_span_name = format!("[{fork_name} ref]");
157
158    current_span_lock.child(fork_ref_span_name, |o| o.start())
159}
160
161fn should_sample(sampling_ratio: f64) -> bool {
162    // NOTE: quick paths first, without rng involved
163    if sampling_ratio == 0.0 {
164        return false;
165    }
166
167    if sampling_ratio == 1.0 {
168        return true;
169    }
170
171    rand::thread_rng().gen_range(0.0..1.0) < sampling_ratio
172}