Skip to main content

modkit_http/layers/
otel.rs

1use bytes::Bytes;
2use http::{Request, Response};
3use http_body_util::Full;
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tower::{Layer, Service};
8
9/// Tower layer that adds OpenTelemetry tracing to outbound HTTP requests
10///
11/// Creates a span for each request with:
12/// - `http.method`: The HTTP method
13/// - `http.url`: The full URL (string form of URI)
14/// - `otel.kind`: "client"
15///
16/// Records `http.status_code` on response and sets `error=true` for 4xx/5xx.
17/// Injects W3C trace context headers when OTEL feature is enabled.
18#[derive(Clone, Default)]
19pub struct OtelLayer;
20
21impl OtelLayer {
22    /// Create a new OTEL tracing layer
23    #[must_use]
24    pub fn new() -> Self {
25        Self
26    }
27}
28
29impl<S> Layer<S> for OtelLayer {
30    type Service = OtelService<S>;
31
32    fn layer(&self, inner: S) -> Self::Service {
33        OtelService { inner }
34    }
35}
36
37/// Service that wraps requests with OpenTelemetry tracing spans
38#[derive(Clone)]
39pub struct OtelService<S> {
40    inner: S,
41}
42
43impl<S, ResBody> Service<Request<Full<Bytes>>> for OtelService<S>
44where
45    S: Service<Request<Full<Bytes>>, Response = Response<ResBody>> + Clone + Send + 'static,
46    S::Future: Send,
47    S::Error: Send + 'static,
48    ResBody: Send + 'static,
49{
50    type Response = S::Response;
51    type Error = S::Error;
52    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
53
54    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55        self.inner.poll_ready(cx)
56    }
57
58    fn call(&mut self, mut req: Request<Full<Bytes>>) -> Self::Future {
59        use tracing::{Instrument, Level};
60
61        let method = req.method().clone();
62        let uri = req.uri().clone();
63
64        // Sanitize URL for tracing: remove query string to avoid leaking sensitive params
65        let url_str = format!(
66            "{}://{}{}",
67            uri.scheme_str().unwrap_or("https"),
68            uri.authority().map_or("", http::uri::Authority::as_str),
69            uri.path()
70        );
71
72        // Create span before injection so that inject_current_span propagates
73        // this span's context (not the parent's) into the outgoing request headers.
74        // This ensures the server sees outgoing_http as its parent span.
75        let span = tracing::span!(
76            Level::INFO, "outgoing_http",
77            http.method = %method,
78            http.url = %url_str,
79            otel.kind = "client",
80            http.status_code = tracing::field::Empty,
81            error = tracing::field::Empty,
82        );
83
84        // Inject trace context inside the span's scope so the propagator
85        // picks up the outgoing_http span ID, not the caller's.
86        {
87            let _guard = span.enter();
88            crate::otel::inject_current_span(req.headers_mut());
89        }
90
91        // Swap so we call the instance that was poll_ready'd, leaving a fresh clone
92        // for the next poll_ready cycle. This satisfies the Tower Service contract.
93        let clone = self.inner.clone();
94        let mut inner = std::mem::replace(&mut self.inner, clone);
95
96        Box::pin(async move {
97            let result = inner.call(req).instrument(span.clone()).await;
98
99            match &result {
100                Ok(response) => {
101                    let status = response.status().as_u16();
102                    span.record("http.status_code", status);
103                    if response.status().is_client_error() || response.status().is_server_error() {
104                        span.record("error", true);
105                    }
106                }
107                Err(_) => {
108                    span.record("error", true);
109                }
110            }
111
112            result
113        })
114    }
115}