opentelemetry_tracing_utils/
lib.rs

1//! Utilities for tracing and logging.
2//!
//! The defaults chosen here are fairly opinionated; see [`LoggingSetupBuilder`] for what can be configured.
4
5use anyhow::Result;
6use std::str::FromStr;
7use tracing_opentelemetry::OpenTelemetryLayer;
8
9// tracing
10use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider};
11use opentelemetry_sdk::{
12    propagation::{BaggagePropagator, TraceContextPropagator},
13    trace::SdkTracerProvider,
14};
15pub use opentelemetry_semantic_conventions as semcov;
16use tonic::{metadata::MetadataKey, service::Interceptor};
17use tracing::Span;
18pub use tracing_opentelemetry::OpenTelemetrySpanExt;
19use tracing_subscriber::{
20    fmt::{self, format::FmtSpan, TestWriter},
21    layer::SubscriberExt,
22    util::SubscriberInitExt,
23    EnvFilter, Layer,
24};
25
26use self::trace_output_fmt::JsonWithTraceId;
27
28pub mod trace_output_fmt;
29
30/// Set up an OTEL pipeline when the OTLP endpoint is set. Otherwise just set up tokio tracing
31/// support.
32pub fn set_up_logging() -> Result<LoggingSetupBuildResult> {
33    LoggingSetupBuilder::new().build()
34}
35
/// Environment variable selecting the traces exporter (OpenTelemetry SDK convention).
///
/// `"none"` disables OTLP export. `"otlp"`, any other value, or leaving it unset keeps OTLP
/// export on. The value is compared case-insensitively, with surrounding whitespace ignored.
pub const OTEL_TRACES_EXPORTER: &str = "OTEL_TRACES_EXPORTER";
/// Environment variable that disables OTLP export when set to anything other than `"0"`.
///
/// When it disables export, it takes precedence over [`OTEL_TRACES_EXPORTER`].
pub const NO_OTLP: &str = "NO_OTLP";
/// Environment variable choosing the log format.
///
/// `"1"` selects pretty logs and any other value selects JSON. When unset, pretty logs are used
/// exactly when OTLP export is disabled.
pub const PRETTY_LOGS: &str = "PRETTY_LOGS";

/// Configuration for installing logging and tracing; finish with [`LoggingSetupBuilder::build`].
///
/// [`Default`] reads the initial values from the environment (see [`OTEL_TRACES_EXPORTER`],
/// [`NO_OTLP`] and [`PRETTY_LOGS`]). The fields are public so they can be overridden before
/// building.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggingSetupBuilder {
    /// Export spans over OTLP. When `false`, spans go to a tracer provider with no exporter.
    pub otlp_output_enabled: bool,
    /// Emit human-readable multi-line logs instead of JSON lines.
    pub pretty_logs: bool,
    /// Write formatted logs through `tracing_subscriber`'s `TestWriter`, for use in tests.
    pub use_test_writer: bool,
}

impl Default for LoggingSetupBuilder {
    /// Read the configuration from the environment.
    ///
    /// - OTLP export is on unless [`NO_OTLP`] or [`OTEL_TRACES_EXPORTER`] turns it off.
    /// - The log format follows [`PRETTY_LOGS`] when that is set. Otherwise it is pretty exactly
    ///   when OTLP export is off.
    /// - The test writer is never enabled here.
    fn default() -> Self {
        // `env::var` errors for missing or non-UTF-8 variables; both are treated as "unset".
        let no_otlp = no_otlp_requested(std::env::var(NO_OTLP).ok().as_deref());
        let exporter = OtelTracesExporterOption::from_env_value(
            std::env::var(OTEL_TRACES_EXPORTER).ok().as_deref(),
        );
        let otlp_output_enabled = !no_otlp && exporter == OtelTracesExporterOption::Otlp;

        // Without OTLP the output is presumably read by people rather than analysed as JSON,
        // so default to pretty logs in that case.
        let pretty_logs = std::env::var(PRETTY_LOGS)
            .map(|value| value == "1")
            .unwrap_or(!otlp_output_enabled);

        Self {
            otlp_output_enabled,
            pretty_logs,
            use_test_writer: false,
        }
    }
}

/// Interpret a raw [`NO_OTLP`] value: anything except `"0"` asks for OTLP export to be disabled.
fn no_otlp_requested(value: Option<&str>) -> bool {
    matches!(value, Some(v) if v != "0")
}

/// Traces exporter selected by [`OTEL_TRACES_EXPORTER`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OtelTracesExporterOption {
    Otlp,
    None,
}

impl OtelTracesExporterOption {
    /// Parse a raw [`OTEL_TRACES_EXPORTER`] value. `value` is `Option::None` when the variable is
    /// unset.
    ///
    /// The OpenTelemetry spec asks for enum-valued variables to be matched case-insensitively,
    /// so `"NONE"` and `"None"` are honoured as well. Unrecognised values fall back to OTLP.
    fn from_env_value(value: Option<&str>) -> Self {
        match value {
            Some(v) if v.trim().eq_ignore_ascii_case("none") => Self::None,
            _ => Self::Otlp,
        }
    }
}
85
86impl LoggingSetupBuilder {
87    pub fn new() -> Self {
88        Self::default()
89    }
90    pub fn build(&self) -> Result<LoggingSetupBuildResult> {
91        let otlp_enabled = self.otlp_output_enabled;
92
93        // First create 1 or more propagators
94        let baggage_propagator = BaggagePropagator::new();
95        let trace_context_propagator = TraceContextPropagator::new();
96
97        // Then create a composite propagator
98        let composite_propagator = TextMapCompositePropagator::new(vec![
99            Box::new(baggage_propagator),
100            Box::new(trace_context_propagator),
101        ]);
102
103        global::set_text_map_propagator(composite_propagator);
104
105        // An exporter to be used when there is no OTLP endpoint
106        let basic_no_otlp_tracer_provider = SdkTracerProvider::builder().build();
107
108        // Install a new OpenTelemetry trace pipeline
109        // OTLP over GRPC tracer exporter
110        let otlp_tracer_exporter = opentelemetry_otlp::SpanExporter::builder()
111            .with_tonic()
112            .build()?;
113        // OTLP tracer setup, using the exporter from above
114        let otlp_tracer: SdkTracerProvider = SdkTracerProvider::builder()
115            .with_batch_exporter(otlp_tracer_exporter)
116            .build();
117
118        let tracer_provider = match otlp_enabled {
119            true => otlp_tracer,
120            // BUG: the non-otlp tracer isn't correctly setting context/linking ids
121            false => basic_no_otlp_tracer_provider,
122        };
123        let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));
124
125        // Create a tracing layer with the configured tracer
126        let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
127            .with_error_fields_to_exceptions(true)
128            .with_error_records_to_exceptions(true)
129            .with_tracer(tracer);
130
131        let use_test_writer = self.use_test_writer;
132        let pretty_logs = self.pretty_logs;
133
134        #[derive(Debug)]
135        enum MaybeTestWriterLayer<N, E> {
136            WithTestWriter(fmt::Layer<tracing_subscriber::Registry, N, E, TestWriter>),
137            NoTestWriter(fmt::Layer<tracing_subscriber::Registry>),
138        }
139
140        let base_layer = fmt::Layer::default();
141        let base_layer: MaybeTestWriterLayer<_, _> = match use_test_writer {
142            false => MaybeTestWriterLayer::NoTestWriter(base_layer),
143            true => MaybeTestWriterLayer::WithTestWriter(base_layer.with_test_writer()),
144        };
145
146        // Include an option for when there is no otlp endpoint available. In this case, pretty print
147        // events, as the data doesn't need to be nicely formatted json for analysis.
148        let format_layers = match pretty_logs {
149            // json fmt layer
150            false => match base_layer {
151                MaybeTestWriterLayer::NoTestWriter(layer) => {
152                    layer.json().event_format(JsonWithTraceId).boxed()
153                }
154                MaybeTestWriterLayer::WithTestWriter(layer) => {
155                    layer.json().event_format(JsonWithTraceId).boxed()
156                }
157            },
158            // pretty fmt layer
159            true => match base_layer {
160                MaybeTestWriterLayer::NoTestWriter(layer) => {
161                    layer.pretty().with_span_events(FmtSpan::NONE).boxed()
162                }
163                MaybeTestWriterLayer::WithTestWriter(layer) => {
164                    layer.pretty().with_span_events(FmtSpan::NONE).boxed()
165                }
166            },
167        };
168
169        let layers = opentelemetry.and_then(format_layers);
170
171        let tracing_registry = tracing_subscriber::registry()
172            // Add a filter to the layers so that they only observe the spans that I want
173            .with(layers.with_filter(
174                // Parse env filter from RUST_LOG, setting a default directive if that fails.
175                // Syntax for directives is here: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
176                EnvFilter::try_from_default_env().unwrap_or_else(|_| {
177                    // e.g. "RUST_LOG=hello_rust_backend,warn" would do everything from hello_rust_backend, and only "warn" level or higher from elsewhere
178                    EnvFilter::try_new("info")
179                        .expect("hard-coded default directive should be valid")
180                }),
181            ));
182
183        #[cfg(feature = "tokio-console")]
184        let tracing_registry = tracing_registry.with(console_subscriber::spawn());
185
186        tracing_registry.try_init()?;
187
188        // Set the global tracer provider using a clone of the tracer_provider.
189        // Setting global tracer provider is required if other parts of the application
190        // uses global::tracer() or global::tracer_with_version() to get a tracer.
191        // Cloning simply creates a new reference to the same tracer provider. It is
192        // important to hold on to the tracer_provider here, so as to invoke
193        // shutdown on it when application ends.
194        global::set_tracer_provider(tracer_provider.clone());
195
196        Ok(LoggingSetupBuildResult { tracer_provider })
197    }
198}
199
200/// Hang on to this to be able to call `shutdown()` on the providers.
201pub struct LoggingSetupBuildResult {
202    /// Hang on to this to be able to call `shutdown()` on the provider.
203    pub tracer_provider: SdkTracerProvider,
204}
205
206/// This interceptor adds tokio tracing opentelemetry headers to grpc requests.
207/// Allows stitching together distributed traces!
208#[derive(Clone)]
209pub struct GrpcInterceptor;
210impl Interceptor for GrpcInterceptor {
211    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
212        // get otel context from current tokio tracing span
213        let context = Span::current().context();
214
215        opentelemetry::global::get_text_map_propagator(|propagator| {
216            propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
217        });
218
219        Ok(req)
220    }
221}
222
223pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
224impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
225    fn set(&mut self, key: &str, value: String) {
226        if let Ok(key) = MetadataKey::from_str(key) {
227            if let Ok(val) = value.parse() {
228                self.0.insert(key, val);
229            }
230        }
231    }
232}
233
234/// A tonic channel intercepted to provide distributed tracing context propagation.
235pub type InterceptedGrpcService =
236    tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;
237
238#[cfg(feature = "tower")]
239pub use tower_tracing::*;
240
#[cfg(feature = "tower")]
pub mod tower_tracing {
    //! HTTP trace-context propagation for `tower` services.
    //!
    //! [`TracingLayer`] injects the current context into outgoing (client) requests.
    //! [`make_tower_http_otel_trace_layer`] and [`get_otel_context_from_request`] extract it
    //! from incoming (server) requests.

    use std::task::{Context, Poll};

    use http::Request;
    use opentelemetry::global;
    use opentelemetry_http::HeaderExtractor;
    use tower::{Layer, Service};
    use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
    use tower_http::trace::TraceLayer;
    use tracing::trace;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// A [`Layer`] that wraps a service in [`TracingService`]. The current span's trace context
    /// is then injected into the headers of every request passed through it.
    ///
    /// `Clone` is derived because some tower-based frameworks require cloneable layers (e.g.
    /// axum's `Router::layer`).
    #[derive(Clone, Copy, Debug, Default)]
    pub struct TracingLayer;

    impl<S> Layer<S> for TracingLayer {
        type Service = TracingService<S>;

        fn layer(&self, service: S) -> Self::Service {
            TracingService { service }
        }
    }

    /// Client middleware that injects the current span's trace context into each outgoing HTTP
    /// request's headers, using the global text-map propagator. It then forwards the request.
    #[derive(Clone, Debug)]
    pub struct TracingService<S> {
        service: S,
    }

    impl<S, BodyType> Service<http::Request<BodyType>> for TracingService<S>
    where
        S: Service<http::Request<BodyType>>,
    {
        type Response = S::Response;
        type Error = S::Error;
        type Future = S::Future;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.service.poll_ready(cx)
        }

        fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
            // Snapshot the headers only when TRACE logging is enabled. Cloning the whole map on
            // every request just to feed a disabled log line is wasted work.
            let old_headers =
                tracing::enabled!(tracing::Level::TRACE).then(|| request.headers().clone());

            let context = tracing::Span::current().context();

            global::get_text_map_propagator(|propagator| {
                propagator.inject_context(
                    &context,
                    &mut opentelemetry_http::HeaderInjector(request.headers_mut()),
                )
            });

            if let Some(old_headers) = old_headers {
                trace!(
                    "
--------------------------------------------------------------
old headers:
{:#?}
new headers:
{:#?}
-----------------------------------------------",
                    old_headers,
                    request.headers()
                );
            }

            self.service.call(request)
        }
    }

    /// Build a `tower-http` [`TraceLayer`] for server-side propagation. Each request gets a span
    /// whose parent is the OpenTelemetry trace context carried in the request headers, if any
    /// and valid.
    ///
    /// The span is a `DEBUG`-level span created in this crate, so `RUST_LOG` must enable this
    /// crate at `debug` or finer for propagation to work. If the span is filtered out, there is
    /// nothing to attach the parent context to. Also enable `tower_http` if you want its
    /// access-log events.
    pub fn make_tower_http_otel_trace_layer<BodyType>() -> TraceLayer<
        SharedClassifier<ServerErrorsAsFailures>,
        impl (Fn(&Request<BodyType>) -> tracing::Span) + Clone,
    > {
        tower_http::trace::TraceLayer::new_for_http().make_span_with(
            |request: &http::Request<BodyType>| {
                let context = get_otel_context_from_request(request);

                // NOTE(review): this records *all* request headers on the span, including
                // credentials such as `Authorization`/`Cookie`. They may end up in exported
                // telemetry; consider redacting them.
                let span = tracing::debug_span!(
                    "request",
                    method = %request.method(),
                    uri = %request.uri(),
                    version = ?request.version(),
                    headers = ?request.headers()
                );

                span.set_parent(context);

                span
            },
        )
    }

    /// Extract the OpenTelemetry context carried by `request`'s headers, using the global
    /// text-map propagator. It is not attached to any span; see
    /// [`make_tower_http_otel_trace_layer`] for that.
    ///
    /// If the headers carry no (or invalid) propagation data, the pre-existing current context
    /// is returned.
    pub fn get_otel_context_from_request<BodyType>(
        request: &Request<BodyType>,
    ) -> opentelemetry::Context {
        let parent_context = global::get_text_map_propagator(|propagator| {
            propagator.extract(&HeaderExtractor(request.headers()))
        });
        trace!("parent context (extraction): {:#?}", parent_context);

        parent_context
    }

    #[cfg(test)]
    mod tests {
        use opentelemetry::{baggage::BaggageExt, trace::TraceContextExt};

        use crate::tower_tracing::get_otel_context_from_request;

        /// Extraction from standard W3C `traceparent`/`tracestate`/`baggage` headers.
        #[tokio::test]
        async fn test_trace_context_extractor() {
            // Installs the global propagator that extraction depends on. Setup may still fail,
            // e.g. if another test already installed a global subscriber. The propagator is
            // registered before any fallible step, so the error is only reported.
            if let Err(err) = crate::set_up_logging() {
                eprintln!("logging setup failed (ignored): {:?}", err);
            }

            let request: http::Request<String> = http::Request::builder()
                .uri("/")
                .header(
                    "traceparent",
                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
                )
                .header("tracestate", "asdf=123456")
                .header(
                    "baggage",
                    "userId=alice,serverNode=DF%2028,isProduction=false",
                )
                .body("".to_string())
                .unwrap();

            let context = get_otel_context_from_request(&request);

            assert!(context.has_active_span());
            let span = context.span();
            let span_context = span.span_context();
            assert!(span_context.is_remote());
            assert_eq!(
                span_context.trace_id().to_string(),
                "4bf92f3577b34da6a3ce929d0e0e4736"
            );

            let baggage = context.baggage();
            assert_eq!(baggage.get("userId"), Some(&"alice".into()));
            assert_eq!(baggage.get("serverNode"), Some(&"DF 28".into()));
            assert_eq!(baggage.get("isProduction"), Some(&"false".into()));
        }
    }
}