tracing-opentelemetry-fmt 0.0.1

Add opentelemetry trace.id and span.id to `FmtLayer` of tracing-subscriber
Documentation
use std::any::TypeId;

use opentelemetry::trace::{TraceContextExt, Tracer};
use tracing::{
    field::FieldSet,
    metadata::LevelFilter,
    span::{Attributes, Record},
    subscriber::Interest,
    Event, Id, Metadata, Span, Subscriber, Value,
};
use tracing_opentelemetry::{OpenTelemetryLayer, OpenTelemetrySpanExt, PreSampledTracer};
use tracing_subscriber::{
    fmt::{FormatEvent, FormatFields, Layer as FmtLayer, MakeWriter},
    layer::{Context, Layered},
    registry::LookupSpan,
    Layer,
};

pub struct OpenTelemetryFmtLayerBuilder<S, T1, N2, E2, W2> {
    opentelemetry_layer: OpenTelemetryLayer<S, T1>,
    fmt_layer: FmtLayer<S, N2, E2, W2>,
}

impl<S, T1, N2, E2, W2> OpenTelemetryFmtLayerBuilder<S, T1, N2, E2, W2> {
    pub fn new(
        opentelemetry_layer: OpenTelemetryLayer<S, T1>,
        fmt_layer: FmtLayer<S, N2, E2, W2>,
    ) -> Self {
        Self {
            opentelemetry_layer,
            fmt_layer,
        }
    }
}

impl<S, T1, N2, E2, W2> OpenTelemetryFmtLayerBuilder<S, T1, N2, E2, W2>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    T1: Tracer + PreSampledTracer + 'static,
    N2: for<'writer> FormatFields<'writer> + 'static,
    E2: FormatEvent<S, N2> + 'static,
    W2: for<'writer> MakeWriter<'writer> + 'static,
{
    pub fn build(
        self,
    ) -> Layered<OpenTelemetryFmtLayer<S, N2, E2, W2>, OpenTelemetryLayer<S, T1>, S> {
        let Self {
            opentelemetry_layer,
            fmt_layer,
        } = self;
        let opentelemetry_fmt_layer = OpenTelemetryFmtLayer { fmt_layer };
        opentelemetry_layer.and_then(opentelemetry_fmt_layer)
    }
}

pub struct OpenTelemetryFmtLayer<S, N2, E2, W2> {
    fmt_layer: FmtLayer<S, N2, E2, W2>,
}

impl<S, N2, E2, W2> Layer<S> for OpenTelemetryFmtLayer<S, N2, E2, W2>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    N2: for<'writer> FormatFields<'writer> + 'static,
    E2: FormatEvent<S, N2> + 'static,
    W2: for<'writer> MakeWriter<'writer> + 'static,
{
    fn on_layer(&mut self, subscriber: &mut S) {
        self.fmt_layer.on_layer(subscriber)
    }

    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
        self.fmt_layer.register_callsite(metadata)
    }

    fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
        self.fmt_layer.enabled(metadata, ctx)
    }

    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        self.fmt_layer.on_new_span(attrs, id, ctx)
    }

    fn max_level_hint(&self) -> Option<LevelFilter> {
        self.fmt_layer.max_level_hint()
    }

    fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
        self.fmt_layer.on_record(span, values, ctx)
    }

    fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) {
        self.fmt_layer.on_follows_from(span, follows, ctx)
    }

    fn event_enabled(&self, event: &Event<'_>, ctx: Context<'_, S>) -> bool {
        self.fmt_layer.event_enabled(event, ctx)
    }

    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        self.fmt_layer.on_event(event, ctx)
    }

    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
        let span_context = Span::current().context();
        let opentelemetry_span = span_context.span();
        let ids = if opentelemetry_span.span_context().is_valid() {
            Some((
                opentelemetry_span.span_context().trace_id().to_string(),
                opentelemetry_span.span_context().span_id().to_string(),
            ))
        } else {
            None
        };

        self.fmt_layer.on_enter(id, ctx.clone());

        if let Some(ids) = ids {
            let field_set = FieldSet::new(
                &["trace.id", "span.id"],
                ctx.metadata(id)
                    .expect("Metadata not found, this is a bug")
                    .callsite(),
            );
            let mut it = field_set.iter();
            let trace_field = it.next().expect("Trace field not found, this is a bug");
            let span_field = it.next().expect("Trace field not found, this is a bug");
            let values = [
                (&trace_field, Some(&ids.0 as &dyn Value)),
                (&span_field, Some(&ids.1 as &dyn Value)),
            ];
            let values = field_set.value_set(&values);
            let record = Record::new(&values);
            self.fmt_layer.on_record(id, &record, ctx.clone());
        }
    }

    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
        self.fmt_layer.on_exit(id, ctx)
    }

    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
        self.fmt_layer.on_close(id, ctx)
    }

    fn on_id_change(&self, old: &Id, new: &Id, ctx: Context<'_, S>) {
        self.fmt_layer.on_id_change(old, new, ctx)
    }

    unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
        self.fmt_layer.downcast_raw(id)
    }
}

#[cfg(test)]
mod tests {
    use std::io::Write;

    use opentelemetry::sdk::export::trace::stdout;
    use tracing_subscriber::{
        fmt::{self, format::FmtSpan},
        layer::SubscriberExt,
        util::SubscriberInitExt,
    };

    use super::*;

    #[derive(Debug)]
    struct Drain;

    impl Write for Drain {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn it_works() {
        let fmt_layer = fmt::layer()
            .with_thread_ids(true)
            .with_target(true)
            .with_span_events(FmtSpan::FULL);
        let tracer = stdout::new_pipeline().with_writer(Drain).install_simple();
        let opentelemetry_layer = tracing_opentelemetry::layer()
            .with_exception_field_propagation(true)
            .with_threads(true)
            .with_tracer(tracer);

        let opentelemetry_fmt_layer =
            OpenTelemetryFmtLayerBuilder::new(opentelemetry_layer, fmt_layer).build();
        tracing_subscriber::registry()
            .with(opentelemetry_fmt_layer)
            .try_init()
            .expect("It should be successful");

        tracing::info_span!("span1").in_scope(|| {
            tracing::info!("in span1");
        });
    }
}