tower_otel/metrics/
http.rs

1//! Middleware that adds metrics to a [`Service`] that handles HTTP requests.
2
3use std::{
4    fmt::Display,
5    future::Future,
6    pin::Pin,
7    sync::Arc,
8    task::{ready, Context, Poll},
9    time::Instant,
10};
11
12use http::{Request, Response};
13use http_body::Body;
14use opentelemetry::{
15    metrics::{Histogram, Meter, UpDownCounter},
16    KeyValue,
17};
18use pin_project::pin_project;
19use tower_layer::Layer;
20use tower_service::Service;
21
22use crate::util;
23
/// The side from which metrics are recorded.
///
/// The side is stored in `MetricsRecord` and decides how the request attributes are
/// extracted when a request enters the middleware.
#[derive(Clone, Copy, Debug)]
enum MetricSide {
    /// The metrics describe a request sent to some remote service.
    Client,
    /// The metrics describe the server-side handling of a request.
    Server,
}
32
/// The instruments used to record HTTP metrics for one side (client or server).
#[derive(Debug)]
struct MetricsRecord {
    /// The side these instruments were created for.
    side: MetricSide,
    /// Histogram of request durations, in seconds.
    request_duration: Histogram<f64>,
    /// Up-down counter of the requests currently in flight.
    active_requests: UpDownCounter<i64>,
    /// Histogram of request body sizes, in bytes.
    request_body_size: Histogram<u64>,
    /// Histogram of response body sizes, in bytes.
    response_body_size: Histogram<u64>,
}
41
42impl MetricsRecord {
43    fn server(meter: &Meter) -> Self {
44        Self {
45            side: MetricSide::Server,
46            request_duration: meter
47                .f64_histogram("http.server.request.duration")
48                .with_description("Duration of HTTP server requests")
49                .with_unit("s")
50                .with_boundaries(vec![
51                    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
52                ])
53                .build(),
54            active_requests: meter
55                .i64_up_down_counter("http.server.active_requests")
56                .with_description("Number of active HTTP server requests")
57                .with_unit("{request}")
58                .build(),
59            request_body_size: meter
60                .u64_histogram("http.server.request.body.size")
61                .with_description("Size of HTTP server request body")
62                .with_unit("By")
63                .build(),
64            response_body_size: meter
65                .u64_histogram("http.server.response.body.size")
66                .with_description("Size of HTTP server response body")
67                .with_unit("By")
68                .build(),
69        }
70    }
71
72    fn client(meter: &Meter) -> Self {
73        Self {
74            side: MetricSide::Client,
75            request_duration: meter
76                .f64_histogram("http.client.request.duration")
77                .with_description("Duration of HTTP client requests")
78                .with_unit("s")
79                .with_boundaries(vec![
80                    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
81                ])
82                .build(),
83            request_body_size: meter
84                .u64_histogram("http.client.request.body.size")
85                .with_description("Size of HTTP client request body")
86                .with_unit("By")
87                .build(),
88            response_body_size: meter
89                .u64_histogram("http.client.response.body.size")
90                .with_description("Size of HTTP client response body")
91                .with_unit("By")
92                .build(),
93            active_requests: meter
94                .i64_up_down_counter("http.client.active_requests")
95                .with_description("Number of active HTTP client requests")
96                .with_unit("{request}")
97                .build(),
98        }
99    }
100}
101
/// [`Layer`] that adds metrics to a [`Service`] that handles HTTP requests.
#[derive(Clone, Debug)]
pub struct HttpLayer {
    /// Instruments shared (through the `Arc`) with every [`Http`] service built by this layer.
    record: Arc<MetricsRecord>,
}
107
108impl HttpLayer {
109    /// Metrics are recorded from server side.
110    pub fn server(meter: &Meter) -> Self {
111        let record = MetricsRecord::server(meter);
112        Self {
113            record: Arc::new(record),
114        }
115    }
116
117    /// Metrics are recorded from client side.
118    pub fn client(meter: &Meter) -> Self {
119        let record = MetricsRecord::client(meter);
120        Self {
121            record: Arc::new(record),
122        }
123    }
124}
125
126impl<S> Layer<S> for HttpLayer {
127    type Service = Http<S>;
128
129    fn layer(&self, inner: S) -> Self::Service {
130        Http {
131            inner,
132            record: Arc::clone(&self.record),
133        }
134    }
135}
136
/// Middleware that adds metrics to a [`Service`] that handles HTTP requests.
#[derive(Clone, Debug)]
pub struct Http<S> {
    /// The wrapped service that actually handles the request.
    inner: S,
    /// Instruments used to record the metrics of every request.
    record: Arc<MetricsRecord>,
}
143
144impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Http<S>
145where
146    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
147    S::Error: Display,
148    ReqBody: Body,
149    ResBody: Body,
150{
151    type Response = S::Response;
152    type Error = S::Error;
153    type Future = ResponseFuture<S::Future>;
154
155    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156        self.inner.poll_ready(cx)
157    }
158
159    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
160        let side = self.record.side;
161        let state = ResponseMetricState::new(side, &req);
162        let record = Arc::clone(&self.record);
163        let inner = self.inner.call(req);
164
165        record
166            .active_requests
167            .add(1, state.active_requests_attributes());
168
169        ResponseFuture {
170            inner,
171            record,
172            state,
173        }
174    }
175}
176
177/// Response future for [`Http`].
178#[pin_project]
179pub struct ResponseFuture<F> {
180    #[pin]
181    inner: F,
182    record: Arc<MetricsRecord>,
183    state: ResponseMetricState,
184}
185
186impl<F, ResBody, E> Future for ResponseFuture<F>
187where
188    F: Future<Output = Result<Response<ResBody>, E>>,
189    ResBody: Body,
190    E: Display,
191{
192    type Output = Result<Response<ResBody>, E>;
193
194    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195        let this = self.project();
196
197        let inner_response = ready!(this.inner.poll(cx));
198        let duration = this.state.elapsed_seconds();
199
200        this.state.push_response_attributes(&inner_response);
201
202        this.record
203            .request_duration
204            .record(duration, this.state.attributes());
205
206        this.record
207            .active_requests
208            .add(-1, this.state.active_requests_attributes());
209
210        if let Some(request_body_size) = this.state.request_body_size {
211            this.record
212                .request_body_size
213                .record(request_body_size, this.state.attributes());
214        }
215
216        if let Ok(response) = inner_response.as_ref() {
217            if let Some(response_size) = util::http_response_size(response) {
218                this.record
219                    .response_body_size
220                    .record(response_size, this.state.attributes());
221            }
222        }
223
224        Poll::Ready(inner_response)
225    }
226}
227
/// Per-request state collected when the request enters the middleware and used to record
/// the metrics once the response is available.
struct ResponseMetricState {
    /// The instant at which this state was created; the request duration is measured from it.
    start: Instant,
    /// The size of the request body.
    request_body_size: Option<u64>,
    /// Attributes to add to the metrics.
    attributes: Vec<KeyValue>,
    /// The number of leading entries of `attributes` that are used for the active requests
    /// counter.
    active_requests_attributes: usize,
}
237
238impl ResponseMetricState {
239    fn new<B: Body>(side: MetricSide, req: &Request<B>) -> Self {
240        let start = Instant::now();
241
242        let request_body_size = util::http_request_size(req);
243
244        let active_requests_attributes;
245        let attributes = {
246            let mut attributes = vec![];
247
248            let http_method = util::http_method(req.method());
249            attributes.push(KeyValue::new("http.request.method", http_method));
250
251            if let Some(server_address) = req.uri().host() {
252                attributes.push(KeyValue::new("server.address", server_address.to_string()));
253            }
254
255            if let Some(server_port) = req.uri().port_u16() {
256                attributes.push(KeyValue::new("server.port", server_port as i64));
257            }
258
259            match side {
260                // For client side the protocol is the URL.
261                MetricSide::Client => {
262                    let util::HttpRequestAttributes {
263                        url_scheme,
264                        server_address,
265                        server_port,
266                    } = util::HttpRequestAttributes::from_sent_request(req);
267
268                    if let Some(server_address) = server_address {
269                        attributes
270                            .push(KeyValue::new("server.address", server_address.to_string()));
271                    }
272                    if let Some(server_port) = server_port {
273                        attributes.push(KeyValue::new("server.port", server_port.to_string()));
274                    }
275                    if let Some(url_scheme) = url_scheme {
276                        attributes.push(KeyValue::new("url.scheme", url_scheme.to_string()));
277                    }
278                }
279                MetricSide::Server => {
280                    let util::HttpRequestAttributes {
281                        url_scheme,
282                        server_address,
283                        server_port,
284                    } = util::HttpRequestAttributes::from_recv_request(req);
285
286                    if let Some(server_address) = server_address {
287                        attributes
288                            .push(KeyValue::new("server.address", server_address.to_string()));
289                    }
290                    if let Some(server_port) = server_port {
291                        attributes.push(KeyValue::new("server.port", server_port.to_string()));
292                    }
293                    if let Some(url_scheme) = url_scheme {
294                        attributes.push(KeyValue::new("url.scheme", url_scheme.to_string()));
295                    }
296                }
297            };
298
299            active_requests_attributes = attributes.len();
300
301            attributes.push(KeyValue::new("network.protocol.name", "http"));
302
303            if let Some(http_version) = util::http_version(req.version()) {
304                attributes.push(KeyValue::new("network.protocol.version", http_version));
305            }
306
307            if let Some(http_route) = util::http_route(req) {
308                attributes.push(KeyValue::new("http.route", http_route.to_string()));
309            }
310
311            attributes
312        };
313
314        Self {
315            start,
316            request_body_size,
317            attributes,
318            active_requests_attributes,
319        }
320    }
321
322    fn push_response_attributes<B, E>(&mut self, res: &Result<Response<B>, E>)
323    where
324        E: Display,
325    {
326        match res {
327            Ok(response) => {
328                self.attributes.push(KeyValue::new(
329                    "http.response.status_code",
330                    response.status().as_u16() as i64,
331                ));
332            }
333            Err(err) => {
334                self.attributes
335                    .push(KeyValue::new("error.type", err.to_string()));
336            }
337        }
338    }
339
340    /// Returns the elapsed time since the request was created in seconds.
341    fn elapsed_seconds(&self) -> f64 {
342        self.start.elapsed().as_secs_f64()
343    }
344
345    /// Return the attributes for each metric.
346    fn attributes(&self) -> &[KeyValue] {
347        &self.attributes[..]
348    }
349
350    /// Returns the attributes used for active requests counter.
351    fn active_requests_attributes(&self) -> &[KeyValue] {
352        &self.attributes[..self.active_requests_attributes]
353    }
354}