Skip to main content

modkit_http/layers/
otel.rs

1use bytes::Bytes;
2use http::{Request, Response};
3use http_body_util::Full;
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tower::{Layer, Service};
8
/// Tower layer that adds OpenTelemetry tracing to outbound HTTP requests
///
/// Wrapping a service with this layer yields an [`OtelService`], which creates
/// an `outgoing_http` span for each request with:
/// - `http.method`: The HTTP method
/// - `http.url`: The sanitized URL (`scheme://authority/path`); the query string
///   is deliberately omitted so sensitive parameters do not leak into traces
/// - `otel.kind`: "client"
///
/// Records `http.status_code` on response and sets `error=true` for 4xx/5xx
/// responses as well as for errors returned by the inner service.
/// Injects W3C trace context headers when OTEL feature is enabled.
#[derive(Debug, Clone, Default)]
pub struct OtelLayer;
20
21impl OtelLayer {
22    /// Create a new OTEL tracing layer
23    #[must_use]
24    pub fn new() -> Self {
25        Self
26    }
27}
28
29impl<S> Layer<S> for OtelLayer {
30    type Service = OtelService<S>;
31
32    fn layer(&self, inner: S) -> Self::Service {
33        OtelService { inner }
34    }
35}
36
/// Service that wraps requests with OpenTelemetry tracing spans
///
/// Produced by [`OtelLayer`]; every call is forwarded to the wrapped service.
#[derive(Debug, Clone)]
pub struct OtelService<S> {
    /// The wrapped service that actually performs the request.
    inner: S,
}
42
43impl<S, ResBody> Service<Request<Full<Bytes>>> for OtelService<S>
44where
45    S: Service<Request<Full<Bytes>>, Response = Response<ResBody>> + Clone + Send + 'static,
46    S::Future: Send,
47    S::Error: Send + 'static,
48    ResBody: Send + 'static,
49{
50    type Response = S::Response;
51    type Error = S::Error;
52    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
53
54    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55        self.inner.poll_ready(cx)
56    }
57
58    fn call(&mut self, mut req: Request<Full<Bytes>>) -> Self::Future {
59        use tracing::{Instrument, Level};
60
61        let method = req.method().clone();
62        let uri = req.uri().clone();
63
64        // Sanitize URL for tracing: remove query string to avoid leaking sensitive params
65        let url_str = format!(
66            "{}://{}{}",
67            uri.scheme_str().unwrap_or("https"),
68            uri.authority().map_or("", http::uri::Authority::as_str),
69            uri.path()
70        );
71
72        // Inject trace context into request headers (no-op when OTEL disabled)
73        crate::otel::inject_current_span(req.headers_mut());
74
75        // Swap so we call the instance that was poll_ready'd, leaving a fresh clone
76        // for the next poll_ready cycle. This satisfies the Tower Service contract.
77        let clone = self.inner.clone();
78        let mut inner = std::mem::replace(&mut self.inner, clone);
79
80        Box::pin(async move {
81            let span = tracing::span!(
82                Level::INFO, "outgoing_http",
83                http.method = %method,
84                http.url = %url_str,
85                otel.kind = "client",
86                http.status_code = tracing::field::Empty,
87                error = tracing::field::Empty,
88            );
89
90            let result = inner.call(req).instrument(span.clone()).await;
91
92            match &result {
93                Ok(response) => {
94                    let status = response.status().as_u16();
95                    span.record("http.status_code", status);
96                    if response.status().is_client_error() || response.status().is_server_error() {
97                        span.record("error", true);
98                    }
99                }
100                Err(_) => {
101                    span.record("error", true);
102                }
103            }
104
105            result
106        })
107    }
108}