opentelemetry_tracing_utils/
lib.rs

1//! Utilities for tracing and logging.
2//!
//! Some of them are fairly opinionated!
4
5use anyhow::Result;
6use std::str::FromStr;
7use tracing_opentelemetry::OpenTelemetryLayer;
8
9// tracing
10use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider};
11use opentelemetry_sdk::{
12    propagation::{BaggagePropagator, TraceContextPropagator},
13    trace::SdkTracerProvider,
14};
15pub use opentelemetry_semantic_conventions as semcov;
16use tonic::{metadata::MetadataKey, service::Interceptor};
17use tracing::Span;
18pub use tracing_opentelemetry::OpenTelemetrySpanExt;
19use tracing_subscriber::{
20    fmt::{self, format::FmtSpan, TestWriter},
21    layer::SubscriberExt,
22    util::SubscriberInitExt,
23    EnvFilter, Layer,
24};
25
26use self::trace_output_fmt::JsonWithTraceId;
27
28pub mod trace_output_fmt;
29
30/// Set up an OTEL pipeline when the OTLP endpoint is set. Otherwise just set up tokio tracing
31/// support.
32pub fn set_up_logging() -> Result<LoggingSetupBuildResult> {
33    LoggingSetupBuilder::new().build()
34}
35
/// Name of the environment variable that selects where OTEL traces are exported.
///
/// Recognised values are `"otlp"`, `"stdout"` and `"none"`. When the variable is unset or holds
/// any other value, the `Default` impl of [`LoggingSetupBuilder`] falls back to OTLP.
pub const OTEL_TRACES_EXPORTER: &str = "OTEL_TRACES_EXPORTER";
/// Name of the environment variable that disables OTEL trace export altogether.
///
/// When set to anything other than `"0"` it overrides [`OTEL_TRACES_EXPORTER`].
pub const NO_OTLP: &str = "NO_OTLP";
41
/// Configuration for the logging / tracing setup.
///
/// All fields are public so that a value obtained from `Default` can be tweaked before the
/// subscriber is installed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggingSetupBuilder {
    /// Where OTEL traces should be sent.
    pub otel_traces_exporter: OtelTracesExporterOption,
    /// Emit human-readable, pretty-printed log events instead of JSON.
    pub pretty_logs: bool,
    /// Route log output through the test writer so that it is captured by the test harness.
    pub use_test_writer: bool,
}

/// The destinations OTEL traces can be exported to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OtelTracesExporterOption {
    /// Send traces to an OTLP endpoint
    Otlp,
    /// Write OTLP traces to stdout. Useful for debugging otel stuff.
    Stdout,
    /// Don't send OTEL traces anywhere
    None,
}
57
58impl Default for LoggingSetupBuilder {
59    fn default() -> Self {
60        let no_otlp = match std::env::var(NO_OTLP).as_deref() {
61            Ok("0") => false,
62            Ok(_) => true,
63            Err(_) => false,
64        };
65
66        let otel_traces_exporter = match no_otlp {
67            true => OtelTracesExporterOption::None,
68            false => match std::env::var(OTEL_TRACES_EXPORTER).as_deref() {
69                Ok("otlp") => OtelTracesExporterOption::Otlp,
70                Ok("none") => OtelTracesExporterOption::None,
71                Ok("stdout") => OtelTracesExporterOption::Stdout,
72                _ => OtelTracesExporterOption::Otlp,
73            },
74        };
75
76        // either use the otlp state or PRETTY_LOGS env var to decide log format
77        let pretty_logs = std::env::var("PRETTY_LOGS")
78            .map(|e| &e == "1")
79            // if PRETTY_LOGS is not set, and the otel traces exporter is set to none, then use
80            // pretty logs (since this is probably a dev environment)
81            .unwrap_or_else(|_| matches!(otel_traces_exporter, OtelTracesExporterOption::None));
82
83        Self {
84            otel_traces_exporter,
85            pretty_logs,
86            use_test_writer: false,
87        }
88    }
89}
90
91impl LoggingSetupBuilder {
92    pub fn new() -> Self {
93        Self::default()
94    }
95    pub fn build(&self) -> Result<LoggingSetupBuildResult> {
96        let otlp_enabled = &self.otel_traces_exporter;
97
98        // First create 1 or more propagators
99        let baggage_propagator = BaggagePropagator::new();
100        let trace_context_propagator = TraceContextPropagator::new();
101
102        // Then create a composite propagator
103        let composite_propagator = TextMapCompositePropagator::new(vec![
104            Box::new(baggage_propagator),
105            Box::new(trace_context_propagator),
106        ]);
107
108        global::set_text_map_propagator(composite_propagator);
109
110        // An exporter to be used when there is no OTLP endpoint
111        let otel_tracer_provider_no_output = SdkTracerProvider::builder().build();
112        // An exporter that sends otel traces to stdout. Useful for debugging otel stuff.
113        let otel_tracer_provider_stdout = SdkTracerProvider::builder()
114            .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
115            .build();
116
117        // Install a new OpenTelemetry trace pipeline
118        // OTLP over GRPC tracer exporter
119        let otlp_tracer_exporter = opentelemetry_otlp::SpanExporter::builder()
120            .with_tonic()
121            .build()?;
122        // OTLP tracer setup, using the exporter from above
123        let otlp_tracer: SdkTracerProvider = SdkTracerProvider::builder()
124            .with_batch_exporter(otlp_tracer_exporter)
125            .build();
126
127        let tracer_provider = match otlp_enabled {
128            OtelTracesExporterOption::Otlp => otlp_tracer,
129            OtelTracesExporterOption::Stdout => otel_tracer_provider_stdout,
130            // BUG: the non-otlp tracer isn't correctly setting context/linking ids
131            OtelTracesExporterOption::None => otel_tracer_provider_no_output,
132        };
133        let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));
134
135        // Create a tracing layer with the configured tracer
136        let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
137            .with_error_fields_to_exceptions(true)
138            .with_error_records_to_exceptions(true)
139            .with_tracer(tracer);
140
141        let use_test_writer = self.use_test_writer;
142        let pretty_logs = self.pretty_logs;
143
144        #[derive(Debug)]
145        enum MaybeTestWriterLayer<N, E> {
146            WithTestWriter(fmt::Layer<tracing_subscriber::Registry, N, E, TestWriter>),
147            NoTestWriter(fmt::Layer<tracing_subscriber::Registry>),
148        }
149
150        let base_layer = fmt::Layer::default();
151        let base_layer: MaybeTestWriterLayer<_, _> = match use_test_writer {
152            false => MaybeTestWriterLayer::NoTestWriter(base_layer),
153            true => MaybeTestWriterLayer::WithTestWriter(base_layer.with_test_writer()),
154        };
155
156        // Include an option for when there is no otlp endpoint available. In this case, pretty print
157        // events, as the data doesn't need to be nicely formatted json for analysis.
158        let format_layers = match pretty_logs {
159            // json fmt layer
160            false => match base_layer {
161                MaybeTestWriterLayer::NoTestWriter(layer) => {
162                    layer.json().event_format(JsonWithTraceId).boxed()
163                }
164                MaybeTestWriterLayer::WithTestWriter(layer) => {
165                    layer.json().event_format(JsonWithTraceId).boxed()
166                }
167            },
168            // pretty fmt layer
169            true => match base_layer {
170                MaybeTestWriterLayer::NoTestWriter(layer) => {
171                    layer.pretty().with_span_events(FmtSpan::NONE).boxed()
172                }
173                MaybeTestWriterLayer::WithTestWriter(layer) => {
174                    layer.pretty().with_span_events(FmtSpan::NONE).boxed()
175                }
176            },
177        };
178
179        let layers = opentelemetry.and_then(format_layers);
180
181        let tracing_registry = tracing_subscriber::registry()
182            // Add a filter to the layers so that they only observe the spans that I want
183            .with(layers.with_filter(
184                // Parse env filter from RUST_LOG, setting a default directive if that fails.
185                // Syntax for directives is here: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
186                EnvFilter::try_from_default_env().unwrap_or_else(|_| {
187                    // e.g. "RUST_LOG=hello_rust_backend,warn" would do everything from hello_rust_backend, and only "warn" level or higher from elsewhere
188                    EnvFilter::try_new("info")
189                        .expect("hard-coded default directive should be valid")
190                }),
191            ));
192
193        #[cfg(feature = "tokio-console")]
194        let tracing_registry = tracing_registry.with(console_subscriber::spawn());
195
196        tracing_registry.try_init()?;
197
198        // Set the global tracer provider using a clone of the tracer_provider.
199        // Setting global tracer provider is required if other parts of the application
200        // uses global::tracer() or global::tracer_with_version() to get a tracer.
201        // Cloning simply creates a new reference to the same tracer provider. It is
202        // important to hold on to the tracer_provider here, so as to invoke
203        // shutdown on it when application ends.
204        global::set_tracer_provider(tracer_provider.clone());
205
206        Ok(LoggingSetupBuildResult { tracer_provider })
207    }
208}
209
210/// Hang on to this to be able to call `shutdown()` on the providers.
211pub struct LoggingSetupBuildResult {
212    /// Hang on to this to be able to call `shutdown()` on the provider.
213    pub tracer_provider: SdkTracerProvider,
214}
215
216/// This interceptor adds tokio tracing opentelemetry headers to grpc requests.
217/// Allows stitching together distributed traces!
218#[derive(Clone)]
219pub struct GrpcInterceptor;
220impl Interceptor for GrpcInterceptor {
221    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
222        // get otel context from current tokio tracing span
223        let context = Span::current().context();
224
225        opentelemetry::global::get_text_map_propagator(|propagator| {
226            propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
227        });
228
229        Ok(req)
230    }
231}
232
233pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
234impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
235    fn set(&mut self, key: &str, value: String) {
236        if let Ok(key) = MetadataKey::from_str(key) {
237            if let Ok(val) = value.parse() {
238                self.0.insert(key, val);
239            }
240        }
241    }
242}
243
/// A tonic channel intercepted to provide distributed tracing context propagation.
///
/// Concretely: a [`tonic::transport::Channel`] wrapped with [`GrpcInterceptor`].
pub type InterceptedGrpcService =
    tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;
247
248#[cfg(feature = "tower")]
249pub use tower_tracing::*;
250
251#[cfg(feature = "tower")]
252pub mod tower_tracing {
253    use std::task::{Context, Poll};
254
255    use http::Request;
256    use opentelemetry::global;
257    use opentelemetry_http::HeaderExtractor;
258    use tower::{Layer, Service};
259    use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
260    use tower_http::trace::TraceLayer;
261    use tracing::trace;
262    use tracing_opentelemetry::OpenTelemetrySpanExt;
263
264    pub struct TracingLayer;
265
266    impl<S> Layer<S> for TracingLayer {
267        type Service = TracingService<S>;
268
269        fn layer(&self, service: S) -> Self::Service {
270            TracingService { service }
271        }
272    }
273
274    /// A middleware that sorts tracing propagation to a client
275    #[derive(Clone, Debug)]
276    pub struct TracingService<S> {
277        service: S,
278    }
279
280    impl<S, BodyType> Service<http::Request<BodyType>> for TracingService<S>
281    where
282        S: Service<http::Request<BodyType>>,
283        BodyType: std::fmt::Debug,
284    {
285        type Response = S::Response;
286        type Error = S::Error;
287        type Future = S::Future;
288
289        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
290            self.service.poll_ready(cx)
291        }
292
293        fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
294            let old_headers = request.headers().clone();
295
296            let context = tracing::Span::current().context();
297
298            global::get_text_map_propagator(|propagator| {
299                propagator.inject_context(
300                    &context,
301                    &mut opentelemetry_http::HeaderInjector(request.headers_mut()),
302                )
303            });
304
305            trace!(
306                "
307--------------------------------------------------------------
308old headers:
309{:#?}
310new headers:
311{:#?}
312-----------------------------------------------",
313                old_headers,
314                request.headers()
315            );
316
317            self.service.call(request)
318        }
319    }
320
321    /// Trace context propagation: associate a new span with the OTel trace of the given request,
322    /// if any and valid.
323    ///
324    /// This uses the tower-http crate
325    ///
326    /// For propagation to work, RUST_LOG needs to include this crate, and also tower_http if you
327    /// want access log events from there.
328    pub fn make_tower_http_otel_trace_layer<BodyType>() -> TraceLayer<
329        SharedClassifier<ServerErrorsAsFailures>,
330        impl (Fn(&Request<BodyType>) -> tracing::Span) + Clone,
331    > {
332        tower_http::trace::TraceLayer::new_for_http().make_span_with(
333            |request: &http::Request<BodyType>| {
334                let context = get_otel_context_from_request(request);
335
336                let span = tracing::debug_span!(
337                        "request",
338                        method = %request.method(),
339                        uri = %request.uri(),
340                        version = ?request.version(),
341                        headers = ?request.headers());
342
343                span.set_parent(context);
344
345                span
346            },
347        )
348    }
349
350    /// Just get the context, don't set the parent context
351    pub fn get_otel_context_from_request<BodyType>(
352        request: &Request<BodyType>,
353    ) -> opentelemetry::Context {
354        // Return context, either from request or pre-existing if no or invalid data is received.
355        let parent_context = global::get_text_map_propagator(|propagator| {
356            let extracted = propagator.extract(&HeaderExtractor(request.headers()));
357            trace!("extracted: {:#?}", &extracted);
358            extracted
359        });
360        trace!("parent context (extraction): {:#?}", parent_context);
361
362        parent_context
363    }
364
    #[cfg(test)]
    mod tests {
        use opentelemetry::{baggage::BaggageExt, trace::TraceContextExt};

        use crate::tower_tracing::get_otel_context_from_request;

        /// Test whether propagation from standard headers is working
        #[tokio::test]
        async fn test_trace_context_extractor() {
            // A setup failure is only printed, never fatal, so the result is discarded.
            let _ = crate::set_up_logging().map_err(|err| dbg!(err));

            // A request carrying `traceparent`, `tracestate` and `baggage` headers.
            let request: http::Request<String> = http::Request::builder()
                .uri("/")
                .header(
                    "traceparent",
                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
                )
                .header("tracestate", "asdf=123456")
                .header(
                    "baggage",
                    "userId=alice,serverNode=DF%2028,isProduction=false",
                )
                .body("".to_string())
                .unwrap();

            let context = get_otel_context_from_request(&request);

            dbg!(&context);
            dbg!(&context.has_active_span());
            // The traceparent header must have produced a span context.
            assert!(context.has_active_span());

            // Baggage entries are decoded: note the percent-encoded space in `serverNode`.
            let baggage = context.baggage();
            assert_eq!(baggage.get("userId"), Some(&"alice".into()));
            assert_eq!(baggage.get("serverNode"), Some(&"DF 28".into()));
            assert_eq!(baggage.get("isProduction"), Some(&"false".into()));
        }
    }
402}