apollo-opentelemetry 0.8.0

OpenTelemetry configuration types for Apollo platform
Documentation
//! Tower middleware for OpenTelemetry instrumentation.
//!
//! This module provides tower [`Layer`](tower::Layer) and [`Service`](tower::Service)
//! implementations for instrumenting requests with OpenTelemetry spans.
//!
//! Spans are created lazily on first poll, ensuring they only start when work
//! actually begins. If a future is never polled, no span is created.
//!
//! # Example
//!
//! ```
//! use apollo_opentelemetry::{default_instrumentation_scope, tower::ServiceBuilderExt};
//! use opentelemetry::trace::SpanBuilder;
//! use opentelemetry::KeyValue;
//! use tower::ServiceBuilder;
//!
//! # fn wrap<S, E: std::fmt::Display>(inner: S) -> impl tower::Service<String, Response = String, Error = E>
//! # where S: tower::Service<String, Response = String, Error = E> {
//! ServiceBuilder::new()
//!     .traced(default_instrumentation_scope!(), |req: &String| {
//!         SpanBuilder::from_name("handle-request")
//!             .with_attributes([KeyValue::new("request.len", req.len() as i64)])
//!     })
//!     .record_result()  // Sets span status from Ok/Err
//!     .service(inner)
//! # }
//! ```
//!
//! See [`ServiceBuilderExt`] for all available combinators.

mod propagation;
mod record_result;
mod traced;

pub use propagation::{
    HttpClientPropagator, HttpClientPropagatorLayer, HttpServerPropagator,
    HttpServerPropagatorLayer,
};
pub use record_result::{RecordResultFuture, RecordResultLayer, RecordResultService};
pub use traced::{MakeSpan, TracedFuture, TracedFutureExt, TracedLayer, TracedService};

use opentelemetry::InstrumentationScope;
use tower::ServiceBuilder;

mod private {
    pub trait Sealed {}
    impl<L> Sealed for tower::ServiceBuilder<L> {}
}

/// Extension trait for [`ServiceBuilder`] to add OpenTelemetry instrumentation.
///
/// This trait is sealed and cannot be implemented outside this crate.
pub trait ServiceBuilderExt<L>: private::Sealed {
    /// Adds a layer that extracts trace context from incoming HTTP request headers.
    ///
    /// Uses the globally registered text map propagator. This should be placed at
    /// the outer edge of your service stack so the extracted context is available
    /// to all inner layers and services.
    ///
    /// # Example
    ///
    /// ```
    /// use apollo_opentelemetry::{default_instrumentation_scope, tower::ServiceBuilderExt};
    /// use opentelemetry::trace::SpanBuilder;
    /// use tower::ServiceBuilder;
    ///
    /// # fn wrap<S>(inner: S) -> impl tower::Service<http::Request<()>>
    /// # where S: tower::Service<http::Request<()>> {
    /// ServiceBuilder::new()
    ///     .http_server_propagation()
    ///     .traced(default_instrumentation_scope!(), |_req: &http::Request<()>| SpanBuilder::from_name("handle-request"))
    ///     .service(inner)
    /// # }
    /// ```
    fn http_server_propagation(
        self,
    ) -> ServiceBuilder<tower::layer::util::Stack<HttpServerPropagatorLayer, L>>;

    /// Adds a layer that injects trace context into outgoing HTTP request headers.
    ///
    /// Uses the globally registered text map propagator. This should be placed
    /// close to the inner HTTP client service so the current trace context is
    /// injected into outgoing requests.
    ///
    /// # Example
    ///
    /// ```
    /// use apollo_opentelemetry::tower::ServiceBuilderExt;
    /// use tower::ServiceBuilder;
    ///
    /// # fn wrap<S>(inner: S) -> impl tower::Service<http::Request<()>>
    /// # where S: tower::Service<http::Request<()>> + Clone {
    /// ServiceBuilder::new()
    ///     .http_client_propagation()
    ///     .service(inner)
    /// # }
    /// ```
    fn http_client_propagation(
        self,
    ) -> ServiceBuilder<tower::layer::util::Stack<HttpClientPropagatorLayer, L>>;

    /// Adds a span layer that creates spans from requests.
    ///
    /// `make_span` can be a closure `|req: &Req| -> SpanBuilder` or any [`MakeSpan`]
    /// implementation. The span is started lazily on first poll, so spans are only
    /// created when work actually begins.
    ///
    /// # Example
    ///
    /// ```
    /// use apollo_opentelemetry::{default_instrumentation_scope, tower::ServiceBuilderExt};
    /// use opentelemetry::trace::SpanBuilder;
    /// use tower::ServiceBuilder;
    ///
    /// # fn wrap<S>(inner: S) -> impl tower::Service<String>
    /// # where S: tower::Service<String> {
    /// ServiceBuilder::new()
    ///     .traced(default_instrumentation_scope!(), |req: &String| {
    ///         SpanBuilder::from_name("process")
    ///     })
    ///     .service(inner)
    /// # }
    /// ```
    fn traced<F>(
        self,
        scope: &'static InstrumentationScope,
        make_span: F,
    ) -> ServiceBuilder<tower::layer::util::Stack<TracedLayer<F>, L>>
    where
        F: Clone;

    /// Adds a layer that records the service result to the current span.
    ///
    /// This should be placed after [`traced`](Self::traced) so the result
    /// is recorded before the span ends.
    ///
    /// When the service returns:
    /// - `Ok(_)` → span status set to `Ok`
    /// - `Err(e)` → span status set to `Error` with the error's display message
    ///
    /// # Example
    ///
    /// ```
    /// use apollo_opentelemetry::{default_instrumentation_scope, tower::ServiceBuilderExt};
    /// use opentelemetry::trace::SpanBuilder;
    /// use tower::ServiceBuilder;
    ///
    /// # fn wrap<S, E: std::fmt::Display>(inner: S) -> impl tower::Service<String, Response = String, Error = E>
    /// # where S: tower::Service<String, Response = String, Error = E> {
    /// ServiceBuilder::new()
    ///     .traced(default_instrumentation_scope!(), |req: &String| SpanBuilder::from_name("process"))
    ///     .record_result()
    ///     .service(inner)
    /// # }
    /// ```
    fn record_result(self) -> ServiceBuilder<tower::layer::util::Stack<RecordResultLayer, L>>;
}

impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
    fn http_server_propagation(
        self,
    ) -> ServiceBuilder<tower::layer::util::Stack<HttpServerPropagatorLayer, L>> {
        self.layer(HttpServerPropagatorLayer::new())
    }

    fn http_client_propagation(
        self,
    ) -> ServiceBuilder<tower::layer::util::Stack<HttpClientPropagatorLayer, L>> {
        self.layer(HttpClientPropagatorLayer::new())
    }

    fn traced<F>(
        self,
        scope: &'static InstrumentationScope,
        make_span: F,
    ) -> ServiceBuilder<tower::layer::util::Stack<TracedLayer<F>, L>>
    where
        F: Clone,
    {
        self.layer(TracedLayer::new(scope, make_span))
    }

    fn record_result(self) -> ServiceBuilder<tower::layer::util::Stack<RecordResultLayer, L>> {
        self.layer(RecordResultLayer::new())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use apollo_opentelemetry_test::{TelemetryContext, assert_spans_snapshot};
    use http::{Request, Response};
    use opentelemetry::KeyValue;
    use opentelemetry::trace::SpanBuilder;

    fn test_scope() -> &'static InstrumentationScope {
        static SCOPE: std::sync::LazyLock<InstrumentationScope> =
            std::sync::LazyLock::new(|| InstrumentationScope::builder("test").build());
        &SCOPE
    }

    #[tokio::test]
    async fn test_service_builder_ext() {
        let ctx = TelemetryContext::new();

        let (mut service, mut handle) = tower_test::mock::spawn_with(|inner| {
            ServiceBuilder::new()
                .traced(test_scope(), |req: &String| {
                    SpanBuilder::from_name("echo")
                        .with_attributes([KeyValue::new("input", req.clone())])
                })
                .service(inner)
        });

        assert!(service.poll_ready().is_ready());
        let response = service.call("hello".to_string());
        let (req, send_response) = handle.next_request().await.unwrap();
        assert_eq!(req, "hello");
        send_response.send_response("hello".to_string());
        assert_eq!(response.await.unwrap(), "hello");

        assert_spans_snapshot!(ctx, @r#"
        - name: echo
          span_kind: Internal
          is_sampled: true
          attributes:
            input: hello
        "#);
    }

    #[tokio::test]
    async fn test_service_builder_ext_with_record_result() {
        let ctx = TelemetryContext::new();

        let (mut service, mut handle) = tower_test::mock::spawn_with(|inner| {
            ServiceBuilder::new()
                .traced(test_scope(), |req: &String| {
                    SpanBuilder::from_name("process")
                        .with_attributes([KeyValue::new("input", req.clone())])
                })
                .record_result()
                .service(inner)
        });

        assert!(service.poll_ready().is_ready());
        let response = service.call("test".to_string());
        let (req, send_response) = handle.next_request().await.unwrap();
        assert_eq!(req, "test");
        send_response.send_response("ok".to_string());
        assert_eq!(response.await.unwrap(), "ok");

        assert_spans_snapshot!(ctx, @r#"
        - name: process
          span_kind: Internal
          is_sampled: true
          attributes:
            input: test
          status: Ok
        "#);
    }

    #[tokio::test]
    async fn test_nested_spans_with_record_result() {
        let ctx = TelemetryContext::new();

        let (mut service, mut handle) = tower_test::mock::spawn_with(|inner| {
            ServiceBuilder::new()
                .traced(test_scope(), |req: &String| {
                    SpanBuilder::from_name("outer")
                        .with_attributes([KeyValue::new("request", req.clone())])
                })
                .record_result()
                .traced(test_scope(), |_req: &String| {
                    SpanBuilder::from_name("inner")
                })
                .record_result()
                .service(inner)
        });

        assert!(service.poll_ready().is_ready());
        let response = service.call("test".to_string());
        let (req, send_response) = handle.next_request().await.unwrap();
        assert_eq!(req, "test");
        send_response.send_response("ok".to_string());
        assert_eq!(response.await.unwrap(), "ok");

        assert_spans_snapshot!(ctx, @r#"
        - name: inner
          span_kind: Internal
          has_parent: true
          is_sampled: true
          status: Ok
        - name: outer
          span_kind: Internal
          is_sampled: true
          attributes:
            request: test
          status: Ok
        "#);
    }

    #[tokio::test]
    async fn test_http_server_propagation_via_service_builder_ext() {
        let ctx = TelemetryContext::new();

        let (mut service, mut handle) = tower_test::mock::spawn_with(|inner| {
            ServiceBuilder::new()
                .http_server_propagation()
                .traced(test_scope(), |_req: &Request<()>| {
                    SpanBuilder::from_name("server-handler")
                        .with_kind(opentelemetry::trace::SpanKind::Server)
                })
                .service(inner)
        });

        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        let req = Request::builder()
            .header("traceparent", traceparent)
            .body(())
            .unwrap();

        assert!(service.poll_ready().is_ready());
        let response_fut = service.call(req);

        let (_req, send_response) = handle.next_request().await.unwrap();
        send_response.send_response(Response::new("ok"));
        response_fut.await.unwrap();

        assert_spans_snapshot!(ctx, @r#"
        - name: server-handler
          span_kind: Server
          has_parent: true
          is_sampled: true
        "#);
    }

    #[tokio::test]
    async fn test_http_client_propagation_via_service_builder_ext() {
        let _ctx = TelemetryContext::new();

        let (mut service, mut handle) = tower_test::mock::spawn_with(|inner| {
            ServiceBuilder::new()
                .http_client_propagation()
                .service(inner)
        });

        // Create a span context to propagate
        use opentelemetry::trace::{TraceContextExt, Tracer};
        let tracer = opentelemetry::global::tracer("test");
        let span = tracer.start("parent-span");
        let cx = opentelemetry::Context::current_with_span(span);
        let _guard = cx.attach();

        assert!(service.poll_ready().is_ready());
        let response_fut = service.call(Request::builder().body(()).unwrap());
        let response_handle = tokio::spawn(response_fut);

        let (req, send_response) = handle.next_request().await.unwrap();

        // Verify traceparent was injected
        assert!(
            req.headers().get("traceparent").is_some(),
            "traceparent should be injected via http_client_propagation"
        );

        send_response.send_response(Response::new("ok"));
        response_handle.await.unwrap().unwrap();
    }
}