viz_core/middleware/otel/
tracing.rs

1//! Request tracing middleware with [`OpenTelemetry`].
2//!
3//! [`OpenTelemetry`]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
4
5use http::{uri::Scheme, HeaderValue};
6use opentelemetry::{
7    global,
8    propagation::Extractor,
9    trace::{
10        FutureExt as OtelFutureExt, Span, SpanKind, Status, TraceContextExt, Tracer, TracerProvider,
11    },
12    Context, InstrumentationScope, KeyValue,
13};
14use opentelemetry_semantic_conventions::trace::{
15    CLIENT_ADDRESS, EXCEPTION_MESSAGE, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE,
16    NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, SERVER_PORT, URL_PATH, URL_QUERY, URL_SCHEME,
17    USER_AGENT_ORIGINAL,
18};
19
20use crate::{
21    header::{HeaderMap, HeaderName},
22    headers::UserAgent,
23    Handler, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, Transform,
24};
25
26const HTTP_REQUEST_BODY_SIZE: &str = "http.request.body.size";
27const HTTP_RESPONSE_BODY_SIZE: &str = "http.response.body.size";
28
29/// `OpenTelemetry` tracing config.
30#[derive(Debug)]
31pub struct Config<T> {
32    tracer: T,
33    name: Option<String>,
34}
35
36impl<T> Config<T> {
37    /// Creats new `OpenTelemetry` tracing config.
38    pub fn new(t: T, name: Option<String>) -> Self {
39        Self { tracer: t, name }
40    }
41}
42
43impl<H, T> Transform<H> for Config<T>
44where
45    T: Clone,
46{
47    type Output = TracingMiddleware<H, T>;
48
49    fn transform(&self, h: H) -> Self::Output {
50        TracingMiddleware {
51            h,
52            tracer: self.tracer.clone(),
53            name: self.name.clone().unwrap_or("tracing".to_string()),
54        }
55    }
56}
57
58/// `OpenTelemetry` tracing middleware.
59#[derive(Debug, Clone)]
60pub struct TracingMiddleware<H, T> {
61    h: H,
62    tracer: T,
63    name: String,
64}
65
66#[crate::async_trait]
67impl<H, O, T> Handler<Request> for TracingMiddleware<H, T>
68where
69    H: Handler<Request, Output = Result<O>>,
70    O: IntoResponse,
71    T: TracerProvider + Send + Sync + Clone + 'static,
72    T::Tracer: Tracer + Send + Sync + 'static,
73    <T::Tracer as Tracer>::Span: Span + Send + Sync + 'static,
74{
75    type Output = Result<Response>;
76
77    async fn call(&self, req: Request) -> Self::Output {
78        let parent_context = global::get_text_map_propagator(|propagator| {
79            propagator.extract(&RequestHeaderCarrier::new(req.headers()))
80        });
81
82        let http_route = &req.route_info().pattern;
83        let attributes = build_attributes(&req, http_route.as_str());
84        let scope = InstrumentationScope::builder(self.name.clone())
85            .with_attributes(attributes)
86            .build();
87        let tracer = self.tracer.tracer_with_scope(scope);
88        let mut span = tracer.build_with_context(
89            tracer
90                .span_builder(format!("{} {}", req.method(), http_route))
91                .with_kind(SpanKind::Server),
92            &parent_context,
93        );
94
95        span.add_event("request.started".to_string(), vec![]);
96
97        let resp = self
98            .h
99            .call(req)
100            .with_context(Context::current_with_span(span))
101            .await;
102
103        let cx = Context::current();
104        let span = cx.span();
105
106        match resp {
107            Ok(resp) => {
108                let resp = resp.into_response();
109                span.add_event("request.completed".to_string(), vec![]);
110                span.set_attribute(KeyValue::new(
111                    HTTP_RESPONSE_STATUS_CODE,
112                    i64::from(resp.status().as_u16()),
113                ));
114                if let Some(content_length) = resp.content_length() {
115                    span.set_attribute(KeyValue::new(
116                        HTTP_RESPONSE_BODY_SIZE,
117                        i64::try_from(content_length).unwrap_or(i64::MAX),
118                    ));
119                }
120                if resp.status().is_server_error() {
121                    span.set_status(Status::error(
122                        resp.status()
123                            .canonical_reason()
124                            .map(ToString::to_string)
125                            .unwrap_or_default(),
126                    ));
127                };
128                span.end();
129                Ok(resp)
130            }
131            Err(err) => {
132                span.add_event(
133                    "request.error".to_string(),
134                    vec![KeyValue::new(EXCEPTION_MESSAGE, err.to_string())],
135                );
136                span.set_status(Status::error(err.to_string()));
137                span.end();
138                Err(err)
139            }
140        }
141    }
142}
143
144struct RequestHeaderCarrier<'a> {
145    headers: &'a HeaderMap,
146}
147
148impl<'a> RequestHeaderCarrier<'a> {
149    const fn new(headers: &'a HeaderMap) -> Self {
150        RequestHeaderCarrier { headers }
151    }
152}
153
154impl Extractor for RequestHeaderCarrier<'_> {
155    fn get(&self, key: &str) -> Option<&str> {
156        self.headers
157            .get(key)
158            .map(HeaderValue::to_str)
159            .and_then(Result::ok)
160    }
161
162    fn keys(&self) -> Vec<&str> {
163        self.headers.keys().map(HeaderName::as_str).collect()
164    }
165}
166
167fn build_attributes(req: &Request, http_route: &str) -> Vec<KeyValue> {
168    let mut attributes = Vec::with_capacity(10);
169    // <https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-spans.md#http-server>
170    attributes.push(KeyValue::new(HTTP_ROUTE, http_route.to_string()));
171
172    // <https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-spans.md#common-attributes>
173    attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()));
174    attributes.push(KeyValue::new(
175        NETWORK_PROTOCOL_VERSION,
176        format!("{:?}", req.version()),
177    ));
178
179    if let Some(remote_addr) = req.remote_addr() {
180        attributes.push(KeyValue::new(CLIENT_ADDRESS, remote_addr.to_string()));
181    }
182
183    let uri = req.uri();
184    if let Some(host) = uri.host() {
185        attributes.push(KeyValue::new(SERVER_ADDRESS, host.to_string()));
186    }
187    if let Some(port) = uri
188        .port_u16()
189        .map(i64::from)
190        .filter(|port| *port != 80 && *port != 443)
191    {
192        attributes.push(KeyValue::new(SERVER_PORT, port.to_string()));
193    }
194
195    if let Some(path_query) = uri.path_and_query() {
196        if path_query.path() != "/" {
197            attributes.push(KeyValue::new(URL_PATH, path_query.path().to_string()));
198        }
199        if let Some(query) = path_query.query() {
200            attributes.push(KeyValue::new(URL_QUERY, query.to_string()));
201        }
202    }
203
204    attributes.push(KeyValue::new(
205        URL_SCHEME,
206        uri.scheme().unwrap_or(&Scheme::HTTP).to_string(),
207    ));
208
209    if let Some(content_length) = req
210        .content_length()
211        .and_then(|len| i64::try_from(len).ok())
212        .filter(|len| *len > 0)
213    {
214        attributes.push(KeyValue::new(
215            HTTP_REQUEST_BODY_SIZE,
216            content_length.to_string(),
217        ));
218    }
219
220    if let Some(user_agent) = req
221        .header_typed::<UserAgent>()
222        .as_ref()
223        .map(UserAgent::as_str)
224    {
225        attributes.push(KeyValue::new(USER_AGENT_ORIGINAL, user_agent.to_string()));
226    }
227
228    attributes
229}