viz_core/middleware/otel/
metrics.rs

1//! Request metrics middleware with [`OpenTelemetry`].
2//!
3//! [`OpenTelemetry`]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/http-metrics.md
4
5use std::time::SystemTime;
6
7use http::uri::Scheme;
8use opentelemetry::{
9    metrics::{Histogram, Meter, UpDownCounter},
10    KeyValue,
11};
12use opentelemetry_semantic_conventions::trace::{
13    CLIENT_ADDRESS, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE,
14    NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, SERVER_PORT, URL_SCHEME,
15};
16
17use crate::{Handler, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, Transform};
18
// Instrument names handed to the meter when the middleware config is built.

/// Name of the up/down counter tracking in-flight requests.
const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests";

/// Name of the histogram recording how long inbound requests take.
const HTTP_SERVER_DURATION: &str = "http.server.duration";

/// Name of the histogram recording request message sizes.
const HTTP_SERVER_REQUEST_SIZE: &str = "http.server.request.size";

/// Name of the histogram recording response message sizes.
const HTTP_SERVER_RESPONSE_SIZE: &str = "http.server.response.size";
23
24/// Request metrics middleware config.
25#[derive(Clone, Debug)]
26pub struct Config {
27    active_requests: UpDownCounter<i64>,
28    duration: Histogram<f64>,
29    request_size: Histogram<u64>,
30    response_size: Histogram<u64>,
31}
32
33impl Config {
34    /// Creates a new Config
35    #[must_use]
36    pub fn new(meter: &Meter) -> Self {
37        let active_requests = meter
38            .i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
39            .with_description(
40                "Measures the number of concurrent HTTP requests that are currently in-flight.",
41            )
42            .with_unit("{request}")
43            .build();
44
45        let duration = meter
46            .f64_histogram(HTTP_SERVER_DURATION)
47            .with_description("Measures the duration of inbound HTTP requests.")
48            .with_unit("s")
49            .build();
50
51        let request_size = meter
52            .u64_histogram(HTTP_SERVER_REQUEST_SIZE)
53            .with_description("Measures the size of HTTP request messages (compressed).")
54            .with_unit("By")
55            .build();
56
57        let response_size = meter
58            .u64_histogram(HTTP_SERVER_RESPONSE_SIZE)
59            .with_description("Measures the size of HTTP request messages (compressed).")
60            .with_unit("By")
61            .build();
62
63        Self {
64            active_requests,
65            duration,
66            request_size,
67            response_size,
68        }
69    }
70}
71
72impl<H> Transform<H> for Config {
73    type Output = MetricsMiddleware<H>;
74
75    fn transform(&self, h: H) -> Self::Output {
76        MetricsMiddleware {
77            h,
78            active_requests: self.active_requests.clone(),
79            duration: self.duration.clone(),
80            request_size: self.request_size.clone(),
81            response_size: self.response_size.clone(),
82        }
83    }
84}
85
86/// Request metrics middleware with `OpenTelemetry`.
87#[derive(Debug, Clone)]
88pub struct MetricsMiddleware<H> {
89    h: H,
90    active_requests: UpDownCounter<i64>,
91    duration: Histogram<f64>,
92    request_size: Histogram<u64>,
93    response_size: Histogram<u64>,
94}
95
96#[crate::async_trait]
97impl<H, O> Handler<Request> for MetricsMiddleware<H>
98where
99    H: Handler<Request, Output = Result<O>>,
100    O: IntoResponse,
101{
102    type Output = Result<Response>;
103
104    async fn call(&self, req: Request) -> Self::Output {
105        let Self {
106            active_requests,
107            duration,
108            request_size,
109            response_size,
110            h,
111        } = self;
112
113        let timer = SystemTime::now();
114        let mut attributes = build_attributes(&req, req.route_info().pattern.as_str());
115
116        active_requests.add(1, &attributes);
117
118        request_size.record(req.content_length().unwrap_or(0), &attributes);
119
120        let resp = h
121            .call(req)
122            .await
123            .map(IntoResponse::into_response)
124            .map(|resp| {
125                active_requests.add(-1, &attributes);
126
127                attributes.push(KeyValue::new(
128                    HTTP_RESPONSE_STATUS_CODE,
129                    i64::from(resp.status().as_u16()),
130                ));
131
132                response_size.record(resp.content_length().unwrap_or(0), &attributes);
133
134                resp
135            });
136
137        duration.record(
138            timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
139            &attributes,
140        );
141
142        resp
143    }
144}
145
146fn build_attributes(req: &Request, http_route: &str) -> Vec<KeyValue> {
147    let mut attributes = Vec::with_capacity(5);
148    // <https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-spans.md#http-server>
149    attributes.push(KeyValue::new(HTTP_ROUTE, http_route.to_string()));
150
151    // <https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-spans.md#common-attributes>
152    attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()));
153    attributes.push(KeyValue::new(
154        NETWORK_PROTOCOL_VERSION,
155        format!("{:?}", req.version()),
156    ));
157
158    if let Some(remote_addr) = req.remote_addr() {
159        attributes.push(KeyValue::new(CLIENT_ADDRESS, remote_addr.to_string()));
160    }
161
162    let uri = req.uri();
163    if let Some(host) = uri.host() {
164        attributes.push(KeyValue::new(SERVER_ADDRESS, host.to_string()));
165    }
166    if let Some(port) = uri
167        .port_u16()
168        .map(i64::from)
169        .filter(|port| *port != 80 && *port != 443)
170    {
171        attributes.push(KeyValue::new(SERVER_PORT, port));
172    }
173
174    attributes.push(KeyValue::new(
175        URL_SCHEME,
176        uri.scheme().unwrap_or(&Scheme::HTTP).to_string(),
177    ));
178
179    attributes
180}