re_perf_telemetry/
lib.rs

1//! Everything needed to set up telemetry (logs, traces, metrics) for both clients and servers.
2//!
3//! Despite the name `re_perf_telemetry`, this actually handles _all_ forms of telemetry,
4//! including all log output.
5//!
//! This sort of telemetry is always disabled in our OSS binaries, and is only used for:
//! * The Rerun Cloud infrastructure
//! * Profiling by Rerun developers
9//!
10//! Logging strategy
11//! ================
12//!
13//! * All our logs go through the structured `tracing` macros.
14//!
15//! * We always log from `tracing` directly into stdio: we never involve the `OpenTelemetry`
16//!   logging API. Production is expected to read the logs from the pod's output.
17//!   There is never any internal buffering going on, besides the buffering of stdio itself.
18//!
19//! * All logs that happen as part of the larger trace/span will automatically be uploaded
20//!   with that trace/span.
21//!   This makes our traces a very powerful debugging tool, in addition to a profiler.
22//!
23//! Tracing strategy
24//! ================
25//!
26//! * All our traces go through the structured `tracing` macros. We *never* use the
27//!   `OpenTelemetry` macros.
28//!
29//! * The traces go through a first layer of filtering based on the value of `RUST_TRACE`, which
30//!   functions similarly to a `RUST_LOG` filter.
31//!
32//! * The traces are then sent to the `OpenTelemetry` SDK, where they will go through a pass of
33//!   sampling before being sent to the OTLP endpoint.
34//!   The sampling mechanism is controlled by the official OTEL environment variables.
35//!
//! * Spans that contain error logs are marked as failed, which makes them easy to find.
37//!
38//! Metric strategy
39//! ===============
40//!
41//! * Our metric strategy is basically the opposite of our logging strategy: everything goes
42//!   through `OpenTelemetry` directly, `tracing` is never involved.
43//!
//! * Metrics are uploaded (as opposed to scraped!) using the OTLP protocol, at a fixed interval
//!   defined by the `OTEL_METRIC_EXPORT_INTERVAL` environment variable.
46
47mod args;
48mod grpc;
49mod memory_telemetry;
50mod metrics_server;
51mod prometheus;
52mod shared_reader;
53mod telemetry;
54mod utils;
55
56use opentelemetry_sdk::propagation::TraceContextPropagator;
57
58pub use self::{
59    args::{LogFormat, TelemetryArgs},
60    grpc::{
61        ClientTelemetryLayer, GrpcMakeSpan, GrpcOnEos, GrpcOnFirstBodyChunk, GrpcOnRequest,
62        GrpcOnResponse, ServerTelemetryLayer, TracingExtractorInterceptor,
63        TracingInjectorInterceptor, new_client_telemetry_layer, new_server_telemetry_layer,
64    },
65    telemetry::{Telemetry, TelemetryDropBehavior},
66    utils::to_short_str,
67};
68
/// Re-exports of third-party crates used by this crate.
pub mod external {
    pub use clap;
    pub use opentelemetry;
    pub use tower;
    pub use tower_http;
    pub use tracing;
    pub use tracing_opentelemetry;

    // Only available when the `tracy` feature is enabled.
    #[cfg(feature = "tracy")]
    pub use tracing_tracy;
}
80
81// ---
82
83/// Returns the active [`TraceId`] in the current context, if any.
84///
85/// The returned trace ID can be search for in the distributed tracing backend, e.g. in jaeger:
86/// ```text
87/// http://localhost:16686/trace/{trace_id}
88/// ```
89///
90/// Returns `None` if there is no trace *actively being sampled* in the current context.
91///
92/// [`TraceId`]: [opentelemetry::TraceId]
93pub fn current_trace_id() -> Option<opentelemetry::TraceId> {
94    use opentelemetry::trace::TraceContextExt as _;
95    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
96
97    let cx = tracing::Span::current().context();
98    let span = cx.span();
99    let span_cx = span.span_context();
100
101    (span_cx.is_valid() && span_cx.is_sampled()).then(|| span_cx.trace_id())
102}
103
104/// Export the active trace in the current context as the W3C trace headers, if any.
105///
106/// Returns `None` if there is no trace *actively being sampled* in the current context.
107pub fn current_trace_headers() -> Option<TraceHeaders> {
108    use opentelemetry::propagation::text_map_propagator::TextMapPropagator as _;
109    use opentelemetry::trace::TraceContextExt as _;
110    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
111
112    let cx = tracing::Span::current().context();
113    let span = cx.span();
114    let span_cx = span.span_context();
115
116    if !span_cx.is_valid() || !span_cx.is_sampled() {
117        return None;
118    }
119
120    let propagator = TraceContextPropagator::new();
121    let mut carrier = TraceHeaders::empty();
122
123    propagator.inject_context(&cx, &mut carrier);
124
125    Some(carrier)
126}
127
/// W3C trace headers (`traceparent` and optional `tracestate`) for a single trace.
///
/// Used as both the injection and the extraction carrier for OpenTelemetry propagators.
#[derive(Debug, Clone)]
pub struct TraceHeaders {
    /// Value of the `traceparent` header.
    pub traceparent: String,
    /// Value of the `tracestate` header, if any.
    pub tracestate: Option<String>,
}
133
134impl TraceHeaders {
135    pub const TRACEPARENT_KEY: &'static str = "traceparent";
136    pub const TRACESTATE_KEY: &'static str = "tracestate";
137
138    fn empty() -> Self {
139        Self {
140            traceparent: String::new(),
141            tracestate: None,
142        }
143    }
144}
145
146impl opentelemetry::propagation::Injector for TraceHeaders {
147    fn set(&mut self, key: &str, value: String) {
148        match key {
149            Self::TRACEPARENT_KEY => self.traceparent = value,
150            Self::TRACESTATE_KEY => {
151                if !value.is_empty() {
152                    self.tracestate = Some(value);
153                }
154            }
155            _ => {}
156        }
157    }
158}
159
160impl opentelemetry::propagation::Extractor for TraceHeaders {
161    fn get(&self, key: &str) -> Option<&str> {
162        match key {
163            Self::TRACEPARENT_KEY => Some(self.traceparent.as_str()),
164            Self::TRACESTATE_KEY => self.tracestate.as_deref(),
165            _ => None,
166        }
167    }
168
169    fn keys(&self) -> Vec<&str> {
170        vec![Self::TRACEPARENT_KEY, Self::TRACESTATE_KEY]
171    }
172}
173
174impl From<&TraceHeaders> for opentelemetry::Context {
175    fn from(value: &TraceHeaders) -> Self {
176        use opentelemetry::propagation::text_map_propagator::TextMapPropagator as _;
177        let propagator = TraceContextPropagator::new();
178        propagator.extract(value)
179    }
180}
181
182// ---
183
184// Extension to [`tracing_subscriber:EnvFilter`] that allows to
185// add a directive only if not already present in the base filter
186pub trait EnvFilterExt
187where
188    Self: Sized,
189{
190    fn add_directive_if_absent(
191        self,
192        base: &str,
193        target: &str,
194        default: &str,
195    ) -> anyhow::Result<Self>;
196}
197
198impl EnvFilterExt for tracing_subscriber::EnvFilter {
199    fn add_directive_if_absent(
200        self,
201        base: &str,
202        target: &str,
203        default: &str,
204    ) -> anyhow::Result<Self> {
205        if !base.contains(&format!("{target}=")) {
206            let filter = self.add_directive(format!("{target}={default}").parse()?);
207            Ok(filter)
208        } else {
209            Ok(self)
210        }
211    }
212}