opentelemetry_tracing_utils/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
//! Utilities for tracing and logging.
//!
//! Some of them are fairly opinionated!

use anyhow::Result;
use std::str::FromStr;
use tracing_opentelemetry::OpenTelemetryLayer;

// tracing
use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider as _};
use opentelemetry_sdk::{
    propagation::{BaggagePropagator, TraceContextPropagator},
    trace::TracerProvider,
};
pub use opentelemetry_semantic_conventions as semcov;
use tonic::{metadata::MetadataKey, service::Interceptor};
use tracing::Span;
pub use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::{
    fmt::{self, format::FmtSpan, TestWriter},
    layer::SubscriberExt,
    util::SubscriberInitExt,
    EnvFilter, Layer,
};

use self::trace_output_fmt::JsonWithTraceId;

pub mod trace_output_fmt;

pub use opentelemetry::global::shutdown_tracer_provider;

/// Initialise global logging and tracing with the default [`LoggingSetupBuilder`] settings.
///
/// This is shorthand for `LoggingSetupBuilder::new().build()`. The defaults are derived from
/// the `NO_OTLP` and `PRETTY_LOGS` environment variables (see the builder's `Default` impl).
///
/// # Errors
///
/// Returns whatever error [`LoggingSetupBuilder::build`] reports.
pub fn set_up_logging() -> Result<()> {
    let default_builder = LoggingSetupBuilder::new();
    default_builder.build()
}

/// Configuration for the global logging / tracing set-up.
///
/// The [`Default`] implementation reads its values from the process environment;
/// every field is public so callers can override them before calling `build`.
#[derive(Debug)]
pub struct LoggingSetupBuilder {
    /// Select the OTLP tracer (`true`) or the stdout-exporting tracer (`false`).
    pub otlp_output_enabled: bool,
    /// Use the human-readable "pretty" event formatter instead of JSON.
    pub pretty_logs: bool,
    /// Send formatted output through `tracing_subscriber`'s test writer.
    pub use_test_writer: bool,
}

impl Default for LoggingSetupBuilder {
    /// Builds the configuration from environment variables:
    ///
    /// * `NO_OTLP` — OTLP output is enabled when the variable is unset/unreadable or equal
    ///   to `"0"`; any other value disables it.
    /// * `PRETTY_LOGS` — `"1"` selects pretty logs, any other value selects JSON. When the
    ///   variable is unset/unreadable, pretty logs are used exactly when OTLP is disabled.
    fn default() -> Self {
        let otlp_output_enabled = match std::env::var("NO_OTLP") {
            Ok(flag) => flag == "0",
            Err(_) => true,
        };

        // An explicit PRETTY_LOGS wins; otherwise fall back to the OTLP state.
        let pretty_logs = match std::env::var("PRETTY_LOGS") {
            Ok(flag) => flag == "1",
            Err(_) => !otlp_output_enabled,
        };

        Self {
            otlp_output_enabled,
            pretty_logs,
            use_test_writer: false,
        }
    }
}

impl LoggingSetupBuilder {
    /// Creates a builder populated with the environment-derived defaults.
    pub fn new() -> Self {
        Self::default()
    }

    /// Installs the global text-map propagator and the global `tracing` subscriber.
    ///
    /// The subscriber is made of an OpenTelemetry layer followed by a formatting layer
    /// (JSON with trace ids, or pretty output when `pretty_logs` is set), both filtered by
    /// `RUST_LOG` (falling back to `info`).
    ///
    /// Only the tracer provider selected by `otlp_output_enabled` is constructed, so
    /// disabling OTLP no longer installs (or fails on) the OTLP batch pipeline.
    ///
    /// # Errors
    ///
    /// Propagates errors from installing the OTLP batch pipeline (only when OTLP output is
    /// enabled) and from `try_init` when registering the subscriber.
    pub fn build(&self) -> Result<()> {
        // First create 1 or more propagators
        let baggage_propagator = BaggagePropagator::new();
        let trace_context_propagator = TraceContextPropagator::new();

        // Then create a composite propagator
        let composite_propagator = TextMapCompositePropagator::new(vec![
            Box::new(baggage_propagator),
            Box::new(trace_context_propagator),
        ]);

        global::set_text_map_propagator(composite_propagator);

        // Build only the provider that will actually be used: the OTLP pipeline sets up a
        // batch exporter on the Tokio runtime, which is wasted (and can fail) when OTLP
        // output has been switched off.
        //
        // NOTE(review): the provider is not registered with `global::set_tracer_provider`
        // here; unless `install_batch` does so internally, confirm that the re-exported
        // `shutdown_tracer_provider` still flushes it as callers expect.
        let tracer_provider = if self.otlp_output_enabled {
            // Install a new OpenTelemetry trace pipeline
            opentelemetry_otlp::new_pipeline()
                .tracing()
                // trace config. Collects service.name etc.
                .with_trace_config(opentelemetry_sdk::trace::Config::default())
                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
                .install_batch(opentelemetry_sdk::runtime::TokioCurrentThread)?
        } else {
            // BUG: the non-otlp tracer isn't correctly setting context/linking ids
            TracerProvider::builder()
                .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
                .build()
        };
        let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));

        // Create a tracing layer with the configured tracer
        let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
            .with_error_fields_to_exceptions(true)
            .with_error_records_to_exceptions(true)
            .with_tracer(tracer);

        // The test writer changes the layer's writer type parameter, so the two
        // possibilities are carried in an enum until they are boxed below.
        #[derive(Debug)]
        enum MaybeTestWriterLayer<N, E> {
            WithTestWriter(fmt::Layer<tracing_subscriber::Registry, N, E, TestWriter>),
            NoTestWriter(fmt::Layer<tracing_subscriber::Registry>),
        }

        let base_layer = fmt::Layer::default();
        let base_layer: MaybeTestWriterLayer<_, _> = match self.use_test_writer {
            false => MaybeTestWriterLayer::NoTestWriter(base_layer),
            true => MaybeTestWriterLayer::WithTestWriter(base_layer.with_test_writer()),
        };

        // Include an option for when there is no otlp endpoint available. In this case, pretty print
        // events, as the data doesn't need to be nicely formatted json for analysis.
        let format_layers = match self.pretty_logs {
            // json fmt layer
            false => match base_layer {
                MaybeTestWriterLayer::NoTestWriter(layer) => {
                    layer.json().event_format(JsonWithTraceId).boxed()
                }
                MaybeTestWriterLayer::WithTestWriter(layer) => {
                    layer.json().event_format(JsonWithTraceId).boxed()
                }
            },
            // pretty fmt layer
            true => match base_layer {
                MaybeTestWriterLayer::NoTestWriter(layer) => {
                    layer.pretty().with_span_events(FmtSpan::NONE).boxed()
                }
                MaybeTestWriterLayer::WithTestWriter(layer) => {
                    layer.pretty().with_span_events(FmtSpan::NONE).boxed()
                }
            },
        };

        let layers = opentelemetry.and_then(format_layers);

        let tracing_registry = tracing_subscriber::registry()
            // Add a filter to the layers so that they only observe the spans that I want
            .with(layers.with_filter(
                // Parse env filter from RUST_LOG, setting a default directive if that fails.
                // Syntax for directives is here: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
                EnvFilter::try_from_default_env().unwrap_or_else(|_| {
                    // e.g. "RUST_LOG=hello_rust_backend,warn" would do everything from hello_rust_backend, and only "warn" level or higher from elsewhere
                    EnvFilter::try_new("info")
                        .expect("hard-coded default directive should be valid")
                }),
            ));

        #[cfg(feature = "tokio-console")]
        let tracing_registry = tracing_registry.with(console_subscriber::spawn());

        tracing_registry.try_init()?;

        Ok(())
    }
}

/// This interceptor adds tokio tracing opentelemetry headers to grpc requests.
/// Allows stitching together distributed traces!
#[derive(Clone)]
pub struct GrpcInterceptor;
impl Interceptor for GrpcInterceptor {
    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
        // get otel context from current tokio tracing span
        let context = Span::current().context();

        opentelemetry::global::get_text_map_propagator(|propagator| {
            propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
        });

        Ok(req)
    }
}

pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
    fn set(&mut self, key: &str, value: String) {
        if let Ok(key) = MetadataKey::from_str(key) {
            if let Ok(val) = value.parse() {
                self.0.insert(key, val);
            }
        }
    }
}

/// A tonic [`Channel`](tonic::transport::Channel) wrapped with [`GrpcInterceptor`], so that
/// requests sent through it carry distributed tracing context propagation headers.
pub type InterceptedGrpcService =
    tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;

#[cfg(feature = "tower")]
pub use tower_tracing::*;

/// Tower / `tower-http` helpers for propagating OpenTelemetry context over HTTP.
///
/// Only compiled when the `tower` feature is enabled.
#[cfg(feature = "tower")]
pub mod tower_tracing {
    use std::task::{Context, Poll};

    use http::Request;
    use opentelemetry::global;
    use opentelemetry_http::HeaderExtractor;
    use tower::{Layer, Service};
    use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
    use tower_http::trace::TraceLayer;
    use tracing::trace;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// Tower [`Layer`] that wraps a service in [`TracingService`].
    #[derive(Clone, Copy, Debug, Default)]
    pub struct TracingLayer;

    impl<S> Layer<S> for TracingLayer {
        type Service = TracingService<S>;

        fn layer(&self, service: S) -> Self::Service {
            TracingService { service }
        }
    }

    /// Middleware that injects the current span's OpenTelemetry context into the headers
    /// of each request before handing it to the wrapped service.
    #[derive(Clone, Debug)]
    pub struct TracingService<S> {
        service: S,
    }

    // No bound is placed on `BodyType`: the body is never inspected, only the headers.
    impl<S, BodyType> Service<Request<BodyType>> for TracingService<S>
    where
        S: Service<Request<BodyType>>,
    {
        type Response = S::Response;
        type Error = S::Error;
        type Future = S::Future;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.service.poll_ready(cx)
        }

        /// Injects propagation headers (via the global text-map propagator) into `request`
        /// and forwards it to the inner service.
        fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
            // The "before" snapshot is only needed for the TRACE-level diagnostic below, so
            // avoid cloning the whole header map on every request when TRACE is disabled.
            let old_headers =
                tracing::enabled!(tracing::Level::TRACE).then(|| request.headers().clone());

            let context = tracing::Span::current().context();

            global::get_text_map_propagator(|propagator| {
                propagator.inject_context(
                    &context,
                    &mut opentelemetry_http::HeaderInjector(request.headers_mut()),
                )
            });

            if let Some(old_headers) = old_headers {
                trace!(
                    "
--------------------------------------------------------------
old headers:
{:#?}
new headers:
{:#?}
-----------------------------------------------",
                    old_headers,
                    request.headers()
                );
            }

            self.service.call(request)
        }
    }

    /// Trace context propagation: associate a new span with the OTel trace of the given request,
    /// if any and valid.
    ///
    /// Builds a `tower-http` [`TraceLayer`] whose span factory creates a `request` debug span
    /// (recording method, URI, HTTP version and headers) and sets its parent to the context
    /// extracted from the request headers.
    ///
    /// NOTE(review): every request header is recorded on the span verbatim, which may
    /// include credentials such as `Authorization` or cookies — confirm this is acceptable.
    pub fn make_tower_http_otel_trace_layer<BodyType>() -> TraceLayer<
        SharedClassifier<ServerErrorsAsFailures>,
        impl (Fn(&Request<BodyType>) -> tracing::Span) + Clone,
    > {
        tower_http::trace::TraceLayer::new_for_http().make_span_with(
            |request: &http::Request<BodyType>| {
                let context = get_otel_context_from_request(request);

                let span = tracing::debug_span!(
                        "request",
                        method = %request.method(),
                        uri = %request.uri(),
                        version = ?request.version(),
                        headers = ?request.headers());

                span.set_parent(context);

                span
            },
        )
    }

    /// Extracts the OpenTelemetry context carried by `request`'s headers using the global
    /// text-map propagator.
    ///
    /// This only returns the context; it does not set it as the parent of any span.
    pub fn get_otel_context_from_request<BodyType>(
        request: &Request<BodyType>,
    ) -> opentelemetry::Context {
        // Return context, either from request or pre-existing if no or invalid data is received.
        let parent_context = global::get_text_map_propagator(|propagator| {
            let extracted = propagator.extract(&HeaderExtractor(request.headers()));
            trace!("extracted: {:#?}", &extracted);
            extracted
        });
        trace!("parent context (extraction): {:#?}", parent_context);

        parent_context
    }

    #[cfg(test)]
    mod tests {
        use opentelemetry::{baggage::BaggageExt, trace::TraceContextExt};

        use super::get_otel_context_from_request;

        /// Test whether propagation from standard headers is working
        #[tokio::test]
        async fn test_trace_context_extractor() {
            // Logging may already have been initialised by another test; ignore that error.
            crate::set_up_logging().unwrap_or(());

            let request: http::Request<String> = http::Request::builder()
                .uri("/")
                .header(
                    "traceparent",
                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
                )
                .header("tracestate", "asdf=123456")
                .header(
                    "baggage",
                    "userId=alice,serverNode=DF%2028,isProduction=false",
                )
                .body("".to_string())
                .unwrap();

            let context = get_otel_context_from_request(&request);

            assert!(context.has_active_span());

            let baggage = context.baggage();
            assert_eq!(baggage.get("userId"), Some(&"alice".into()));
            assert_eq!(baggage.get("serverNode"), Some(&"DF 28".into()));
            assert_eq!(baggage.get("isProduction"), Some(&"false".into()));
        }
    }
}