opentelemetry_tracing_utils/
lib.rs

//! Utilities for tracing and logging.
//!
//! These utilities are fairly opinionated!
4
5use anyhow::Result;
6use std::str::FromStr;
7use tracing_opentelemetry::OpenTelemetryLayer;
8
9// tracing
10use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider};
11use opentelemetry_sdk::{
12    propagation::{BaggagePropagator, TraceContextPropagator},
13    trace::SdkTracerProvider,
14};
15pub use opentelemetry_semantic_conventions as semcov;
16use tonic::{metadata::MetadataKey, service::Interceptor};
17use tracing::Span;
18pub use tracing_opentelemetry::OpenTelemetrySpanExt;
19use tracing_subscriber::{
20    fmt::{self, format::FmtSpan, TestWriter},
21    layer::SubscriberExt,
22    util::SubscriberInitExt,
23    EnvFilter, Layer,
24};
25
26use self::trace_output_fmt::JsonWithTraceId;
27
28pub mod trace_output_fmt;
29
30/// Set up an OTEL pipeline when the OTLP endpoint is set. Otherwise just set up tokio tracing
31/// support.
32pub fn set_up_logging() -> Result<LoggingSetupBuildResult> {
33    LoggingSetupBuilder::new().build()
34}
35
/// Configuration for [`LoggingSetupBuilder::build`], which installs the global tracing setup.
///
/// `Default` (and [`LoggingSetupBuilder::new`]) read the initial values from the `NO_OTLP` and
/// `PRETTY_LOGS` environment variables. Every field is public so it can be overridden before
/// calling `build`.
#[derive(Clone, Debug)]
pub struct LoggingSetupBuilder {
    /// Export spans over OTLP/gRPC when `true`; otherwise spans go to a stdout exporter.
    pub otlp_output_enabled: bool,
    /// Format events as human-readable "pretty" text when `true`; otherwise as JSON using the
    /// `JsonWithTraceId` event formatter.
    pub pretty_logs: bool,
    /// Write formatted events through `tracing_subscriber`'s `TestWriter`, so that output is
    /// captured by the test harness.
    pub use_test_writer: bool,
}
42impl Default for LoggingSetupBuilder {
43    fn default() -> Self {
44        let otlp_enabled = std::env::var("NO_OTLP")
45            .unwrap_or_else(|_| "0".to_owned())
46            .as_str()
47            == "0";
48
49        // either use the otlp state or PRETTY_LOGS env var to decide log format
50        let pretty_logs = std::env::var("PRETTY_LOGS")
51            .map(|e| &e == "1")
52            .unwrap_or_else(|_| !otlp_enabled);
53
54        Self {
55            otlp_output_enabled: otlp_enabled,
56            pretty_logs,
57            use_test_writer: false,
58        }
59    }
60}
61
62impl LoggingSetupBuilder {
63    pub fn new() -> Self {
64        Self::default()
65    }
66    pub fn build(&self) -> Result<LoggingSetupBuildResult> {
67        let otlp_enabled = self.otlp_output_enabled;
68
69        // First create 1 or more propagators
70        let baggage_propagator = BaggagePropagator::new();
71        let trace_context_propagator = TraceContextPropagator::new();
72
73        // Then create a composite propagator
74        let composite_propagator = TextMapCompositePropagator::new(vec![
75            Box::new(baggage_propagator),
76            Box::new(trace_context_propagator),
77        ]);
78
79        global::set_text_map_propagator(composite_propagator);
80
81        let basic_no_otlp_tracer_provider = SdkTracerProvider::builder()
82            .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
83            .build();
84
85        // Install a new OpenTelemetry trace pipeline
86        // OTLP over GRPC tracer exporter
87        let otlp_tracer_exporter = opentelemetry_otlp::SpanExporter::builder()
88            .with_tonic()
89            .build()?;
90        // OTLP tracer setup, using the exporter from above
91        let otlp_tracer: SdkTracerProvider = SdkTracerProvider::builder()
92            .with_batch_exporter(otlp_tracer_exporter)
93            .build();
94
95        let tracer_provider = match otlp_enabled {
96            true => otlp_tracer,
97            // BUG: the non-otlp tracer isn't correctly setting context/linking ids
98            false => basic_no_otlp_tracer_provider,
99        };
100        let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));
101
102        // Create a tracing layer with the configured tracer
103        let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
104            .with_error_fields_to_exceptions(true)
105            .with_error_records_to_exceptions(true)
106            .with_tracer(tracer);
107
108        let use_test_writer = self.use_test_writer;
109        let pretty_logs = self.pretty_logs;
110
111        #[derive(Debug)]
112        enum MaybeTestWriterLayer<N, E> {
113            WithTestWriter(fmt::Layer<tracing_subscriber::Registry, N, E, TestWriter>),
114            NoTestWriter(fmt::Layer<tracing_subscriber::Registry>),
115        }
116
117        let base_layer = fmt::Layer::default();
118        let base_layer: MaybeTestWriterLayer<_, _> = match use_test_writer {
119            false => MaybeTestWriterLayer::NoTestWriter(base_layer),
120            true => MaybeTestWriterLayer::WithTestWriter(base_layer.with_test_writer()),
121        };
122
123        // Include an option for when there is no otlp endpoint available. In this case, pretty print
124        // events, as the data doesn't need to be nicely formatted json for analysis.
125        let format_layers = match pretty_logs {
126            // json fmt layer
127            false => match base_layer {
128                MaybeTestWriterLayer::NoTestWriter(layer) => {
129                    layer.json().event_format(JsonWithTraceId).boxed()
130                }
131                MaybeTestWriterLayer::WithTestWriter(layer) => {
132                    layer.json().event_format(JsonWithTraceId).boxed()
133                }
134            },
135            // pretty fmt layer
136            true => match base_layer {
137                MaybeTestWriterLayer::NoTestWriter(layer) => {
138                    layer.pretty().with_span_events(FmtSpan::NONE).boxed()
139                }
140                MaybeTestWriterLayer::WithTestWriter(layer) => {
141                    layer.pretty().with_span_events(FmtSpan::NONE).boxed()
142                }
143            },
144        };
145
146        let layers = opentelemetry.and_then(format_layers);
147
148        let tracing_registry = tracing_subscriber::registry()
149            // Add a filter to the layers so that they only observe the spans that I want
150            .with(layers.with_filter(
151                // Parse env filter from RUST_LOG, setting a default directive if that fails.
152                // Syntax for directives is here: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
153                EnvFilter::try_from_default_env().unwrap_or_else(|_| {
154                    // e.g. "RUST_LOG=hello_rust_backend,warn" would do everything from hello_rust_backend, and only "warn" level or higher from elsewhere
155                    EnvFilter::try_new("info")
156                        .expect("hard-coded default directive should be valid")
157                }),
158            ));
159
160        #[cfg(feature = "tokio-console")]
161        let tracing_registry = tracing_registry.with(console_subscriber::spawn());
162
163        tracing_registry.try_init()?;
164
165        // Set the global tracer provider using a clone of the tracer_provider.
166        // Setting global tracer provider is required if other parts of the application
167        // uses global::tracer() or global::tracer_with_version() to get a tracer.
168        // Cloning simply creates a new reference to the same tracer provider. It is
169        // important to hold on to the tracer_provider here, so as to invoke
170        // shutdown on it when application ends.
171        global::set_tracer_provider(tracer_provider.clone());
172
173        Ok(LoggingSetupBuildResult { tracer_provider })
174    }
175}
176
177/// Hang on to this to be able to call `shutdown()` on the providers.
178pub struct LoggingSetupBuildResult {
179    /// Hang on to this to be able to call `shutdown()` on the provider.
180    pub tracer_provider: SdkTracerProvider,
181}
182
183/// This interceptor adds tokio tracing opentelemetry headers to grpc requests.
184/// Allows stitching together distributed traces!
185#[derive(Clone)]
186pub struct GrpcInterceptor;
187impl Interceptor for GrpcInterceptor {
188    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
189        // get otel context from current tokio tracing span
190        let context = Span::current().context();
191
192        opentelemetry::global::get_text_map_propagator(|propagator| {
193            propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
194        });
195
196        Ok(req)
197    }
198}
199
200pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
201impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
202    fn set(&mut self, key: &str, value: String) {
203        if let Ok(key) = MetadataKey::from_str(key) {
204            if let Ok(val) = value.parse() {
205                self.0.insert(key, val);
206            }
207        }
208    }
209}
210
211/// A tonic channel intercepted to provide distributed tracing context propagation.
212pub type InterceptedGrpcService =
213    tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;
214
215#[cfg(feature = "tower")]
216pub use tower_tracing::*;
217
218#[cfg(feature = "tower")]
219pub mod tower_tracing {
220    use std::task::{Context, Poll};
221
222    use http::Request;
223    use opentelemetry::global;
224    use opentelemetry_http::HeaderExtractor;
225    use tower::{Layer, Service};
226    use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
227    use tower_http::trace::TraceLayer;
228    use tracing::trace;
229    use tracing_opentelemetry::OpenTelemetrySpanExt;
230
231    pub struct TracingLayer;
232
233    impl<S> Layer<S> for TracingLayer {
234        type Service = TracingService<S>;
235
236        fn layer(&self, service: S) -> Self::Service {
237            TracingService { service }
238        }
239    }
240
241    /// A middleware that sorts tracing propagation to a client
242    #[derive(Clone, Debug)]
243    pub struct TracingService<S> {
244        service: S,
245    }
246
247    impl<S, BodyType> Service<http::Request<BodyType>> for TracingService<S>
248    where
249        S: Service<http::Request<BodyType>>,
250        BodyType: std::fmt::Debug,
251    {
252        type Response = S::Response;
253        type Error = S::Error;
254        type Future = S::Future;
255
256        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
257            self.service.poll_ready(cx)
258        }
259
260        fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
261            let old_headers = request.headers().clone();
262
263            let context = tracing::Span::current().context();
264
265            global::get_text_map_propagator(|propagator| {
266                propagator.inject_context(
267                    &context,
268                    &mut opentelemetry_http::HeaderInjector(request.headers_mut()),
269                )
270            });
271
272            trace!(
273                "
274--------------------------------------------------------------
275old headers:
276{:#?}
277new headers:
278{:#?}
279-----------------------------------------------",
280                old_headers,
281                request.headers()
282            );
283
284            self.service.call(request)
285        }
286    }
287
288    /// Trace context propagation: associate a new span with the OTel trace of the given request,
289    /// if any and valid.
290    ///
291    /// This uses the tower-http crate
292    ///
293    /// For propagation to work, RUST_LOG needs to include this crate, and also tower_http if you
294    /// want access log events from there.
295    pub fn make_tower_http_otel_trace_layer<BodyType>() -> TraceLayer<
296        SharedClassifier<ServerErrorsAsFailures>,
297        impl (Fn(&Request<BodyType>) -> tracing::Span) + Clone,
298    > {
299        tower_http::trace::TraceLayer::new_for_http().make_span_with(
300            |request: &http::Request<BodyType>| {
301                let context = get_otel_context_from_request(request);
302
303                let span = tracing::debug_span!(
304                        "request",
305                        method = %request.method(),
306                        uri = %request.uri(),
307                        version = ?request.version(),
308                        headers = ?request.headers());
309
310                span.set_parent(context);
311
312                span
313            },
314        )
315    }
316
317    /// Just get the context, don't set the parent context
318    pub fn get_otel_context_from_request<BodyType>(
319        request: &Request<BodyType>,
320    ) -> opentelemetry::Context {
321        // Return context, either from request or pre-existing if no or invalid data is received.
322        let parent_context = global::get_text_map_propagator(|propagator| {
323            let extracted = propagator.extract(&HeaderExtractor(request.headers()));
324            trace!("extracted: {:#?}", &extracted);
325            extracted
326        });
327        trace!("parent context (extraction): {:#?}", parent_context);
328
329        parent_context
330    }
331
332    #[cfg(test)]
333    mod tests {
334        use opentelemetry::{baggage::BaggageExt, trace::TraceContextExt};
335
336        use crate::tower_tracing::get_otel_context_from_request;
337
338        /// Test whether propagation from standard headers is working
339        #[tokio::test]
340        async fn test_trace_context_extractor() {
341            let _ = crate::set_up_logging().map_err(|err| dbg!(err));
342
343            let request: http::Request<String> = http::Request::builder()
344                .uri("/")
345                .header(
346                    "traceparent",
347                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
348                )
349                .header("tracestate", "asdf=123456")
350                .header(
351                    "baggage",
352                    "userId=alice,serverNode=DF%2028,isProduction=false",
353                )
354                .body("".to_string())
355                .unwrap();
356
357            let context = get_otel_context_from_request(&request);
358
359            dbg!(&context);
360            dbg!(&context.has_active_span());
361            assert!(context.has_active_span());
362
363            let baggage = context.baggage();
364            assert_eq!(baggage.get("userId"), Some(&"alice".into()));
365            assert_eq!(baggage.get("serverNode"), Some(&"DF 28".into()));
366            assert_eq!(baggage.get("isProduction"), Some(&"false".into()));
367        }
368    }
369}