1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
use opentelemetry::api::{self, Provider, Span, Tracer};
use std::fmt;
use tracing_core::span::{self, Attributes, Id, Record};
use tracing_core::{field, Event, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;

/// A [tracing] layer that owns the OpenTelemetry `Tracer` used to start spans.
///
/// Construct one with `OpentelemetryLayer::with_tracer`.
///
/// [tracing]: https://github.com/tokio-rs/tracing
pub struct OpentelemetryLayer<T>
where
    T: api::Tracer,
{
    /// Tracer that produces the OpenTelemetry span for each `tracing` span.
    tracer: T,
}

/// Field visitor that writes every visited field to the wrapped
/// OpenTelemetry `Span` as an event.
struct SpanEventVisitor<'a, S>(&'a mut S)
where
    S: api::Span;

impl<'a, S> field::Visit for SpanEventVisitor<'a, S>
where
    S: api::Span,
{
    /// Add one event to the underlying OpenTelemetry `Span`, rendered as
    /// `name = value; ` using the value's `Debug` output.
    fn record_debug(&mut self, field: &field::Field, value: &dyn fmt::Debug) {
        let message = format!("{} = {:?}; ", field.name(), value);
        self.0.add_event(message);
    }
}

/// Field visitor that writes every visited field to the wrapped
/// OpenTelemetry `Span` as a string attribute.
struct SpanAttributeVisitor<'a, S>(&'a mut S)
where
    S: api::Span;

impl<'a, S> field::Visit for SpanAttributeVisitor<'a, S>
where
    S: api::Span,
{
    /// Set an attribute on the underlying OpenTelemetry `Span`: the key is
    /// the field name and the value is the field's `Debug` output.
    fn record_debug(&mut self, field: &field::Field, value: &dyn fmt::Debug) {
        let key = api::Key::new(field.name());
        let attribute = key.string(format!("{:?}", value));
        self.0.set_attribute(attribute)
    }
}

impl<T> OpentelemetryLayer<T>
where
    T: api::Tracer + 'static,
{
    /// Work out the OpenTelemetry `SpanContext` that a new span should use as
    /// its parent.
    ///
    /// * Explicit parent in `attrs`: the context of the OpenTelemetry span
    ///   stored in that parent's extensions, or `None` if nothing is stored.
    /// * Contextual span: the context of the OpenTelemetry span stored on the
    ///   current `tracing` span; when that is unavailable, the context of the
    ///   active span of the global `"tracing-opentelemetry"` tracer.
    /// * Explicit root: `None`.
    ///
    /// # Panics
    ///
    /// Panics if a span id taken from `attrs` or from the current span cannot
    /// be found through `ctx`.
    fn parse_context<S>(
        &self,
        attrs: &Attributes<'_>,
        ctx: &Context<'_, S>,
    ) -> Option<api::SpanContext>
    where
        S: Subscriber + for<'span> LookupSpan<'span>,
    {
        // An explicitly named parent _should_ exist in the underlying `Registry`.
        if let Some(parent_id) = attrs.parent() {
            let parent = ctx.span(parent_id).expect("Span not found, this is a bug");
            let extensions = parent.extensions();
            return extensions
                .get::<T::Span>()
                .map(|otel_span| otel_span.get_context());
        }

        // Neither an explicit parent nor contextual: an explicit root span,
        // which has no parent context.
        if !attrs.is_contextual() {
            return None;
        }

        // Contextual: prefer the span the subscriber currently considers active.
        let current = ctx.current_span();
        let registry_context = match current.id() {
            Some(span_id) => {
                let span = ctx.span(span_id).expect("Span not found, this is a bug");
                let extensions = span.extensions();
                extensions
                    .get::<T::Span>()
                    .map(|otel_span| otel_span.get_context())
            }
            None => None,
        };

        // Otherwise fall back to the active span of the global tracer. The
        // contextual branch therefore always yields `Some`.
        Some(registry_context.unwrap_or_else(|| {
            opentelemetry::global::trace_provider()
                .get_tracer("tracing-opentelemetry")
                .get_active_span()
                .get_context()
        }))
    }

    /// Build a layer around the `OpenTelemetry` `Tracer` it will use to
    /// produce and track `Span`s.
    ///
    /// ```rust,no_run
    /// use opentelemetry::{api::{Sampler, Key, Provider}, exporter::trace::jaeger, global, sdk};
    /// use tracing_opentelemetry::OpentelemetryLayer;
    ///
    /// // Create a jaeger exporter for a `trace-demo` service.
    /// let exporter = jaeger::Exporter::builder()
    ///     .with_collector_endpoint("127.0.0.1:6831".parse().unwrap())
    ///     .with_process(jaeger::Process {
    ///         service_name: "trace-demo",
    ///         tags: Vec::new(),
    ///     })
    ///     .init();
    ///
    /// // Build a provider from the jaeger exporter that always samples.
    /// let provider = sdk::Provider::builder()
    ///     .with_exporter(exporter)
    ///     .with_config(sdk::Config {
    ///         default_sampler: Sampler::Always,
    ///         ..Default::default()
    ///     })
    ///     .build();
    ///
    /// // Get a tracer from the provider for a component
    /// let tracer = provider.get_tracer("component-name");
    ///
    /// // Create a layer with the configured tracer
    /// let _layer = OpentelemetryLayer::with_tracer(tracer);
    /// ```
    pub fn with_tracer(tracer: T) -> Self {
        Self { tracer }
    }
}

impl<S, T> Layer<S> for OpentelemetryLayer<T>
where
    S: Subscriber + for<'span> LookupSpan<'span>,
    T: api::Tracer + 'static,
{
    /// Creates an `OpenTelemetry` `Span` for the corresponding `tracing` `Span`.
    ///
    /// The parent context is resolved from `attrs` via `parse_context`, the
    /// span's initial fields are written as attributes, and the resulting
    /// OpenTelemetry span is stored in the `tracing` span's extensions.
    ///
    /// # Panics
    ///
    /// Panics if `id` cannot be found through `ctx`.
    fn new_span(&self, attrs: &Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("Span not found, this is a bug");

        let parent_context = self.parse_context(attrs, &ctx);
        let mut otel_span = self.tracer.start(attrs.metadata().name(), parent_context);
        attrs.record(&mut SpanAttributeVisitor(&mut otel_span));

        // Take the extensions write lock only for the insert, so it is not
        // held while the parent context is looked up in other spans.
        span.extensions_mut().insert(otel_span);
    }

    /// Record values for the given span.
    ///
    /// Values recorded after creation are span fields, so they are written
    /// as attributes, the same way `new_span` writes the initial fields.
    /// Does nothing if the span has no OpenTelemetry span in its extensions.
    ///
    /// # Panics
    ///
    /// Panics if `id` cannot be found through `ctx`.
    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("Span not found, this is a bug");
        let mut extensions = span.extensions_mut();
        if let Some(otel_span) = extensions.get_mut::<T::Span>() {
            values.record(&mut SpanAttributeVisitor(otel_span));
        }
    }

    /// Record logs for the given event.
    ///
    /// Each field of the event is added as an event on the OpenTelemetry span
    /// stored on the current `tracing` span.
    ///
    /// # Panics
    ///
    /// Panics if the current span's id cannot be found through `ctx`.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        // Ignore events that are not in the context of a span
        if let Some(span_id) = ctx.current_span().id() {
            let span = ctx.span(span_id).expect("Span not found, this is a bug");
            let mut extensions = span.extensions_mut();
            if let Some(otel_span) = extensions.get_mut::<T::Span>() {
                event.record(&mut SpanEventVisitor(otel_span));
            }
        };
    }

    /// Mark the `Span` as ended when it is closed.
    ///
    /// Does nothing if the span has no OpenTelemetry span in its extensions.
    ///
    /// # Panics
    ///
    /// Panics if `id` cannot be found through `ctx`.
    fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
        let span = ctx.span(&id).expect("Span not found, this is a bug");
        let mut extensions = span.extensions_mut();
        if let Some(otel_span) = extensions.get_mut::<T::Span>() {
            otel_span.end()
        }
    }
}