spring_opentelemetry/
metrics.rs

1//! Middleware that adds metrics to a [`Service`] that handles HTTP requests.
2//! refs: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
3
4use crate::util::http as http_util;
5use http::{Request, Response};
6use http_body::Body;
7use opentelemetry::{
8    metrics::{Histogram, Meter, UpDownCounter},
9    KeyValue,
10};
11use opentelemetry_semantic_conventions::{
12    attribute::{HTTP_REQUEST_METHOD, SERVER_ADDRESS},
13    metric::{
14        HTTP_CLIENT_ACTIVE_REQUESTS, HTTP_CLIENT_REQUEST_BODY_SIZE, HTTP_CLIENT_REQUEST_DURATION,
15        HTTP_CLIENT_RESPONSE_BODY_SIZE, HTTP_SERVER_ACTIVE_REQUESTS, HTTP_SERVER_REQUEST_BODY_SIZE,
16        HTTP_SERVER_REQUEST_DURATION, HTTP_SERVER_RESPONSE_BODY_SIZE,
17    },
18    trace::{
19        ERROR_TYPE, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE, NETWORK_PROTOCOL_NAME,
20        NETWORK_PROTOCOL_VERSION, SERVER_PORT,
21    },
22};
23use pin_project::pin_project;
24use std::{
25    fmt::Display,
26    future::Future,
27    pin::Pin,
28    sync::Arc,
29    task::{ready, Context, Poll},
30    time::Instant,
31};
32use tower_layer::Layer;
33use tower_service::Service;
34
35#[derive(Debug)]
36struct MetricsRecord {
37    request_duration: Histogram<f64>,
38    active_requests: UpDownCounter<i64>,
39    request_body_size: Histogram<u64>,
40    response_body_size: Histogram<u64>,
41}
42
43impl MetricsRecord {
44    fn server(meter: &Meter) -> Self {
45        Self {
46            request_duration: meter
47                .f64_histogram(HTTP_SERVER_REQUEST_DURATION)
48                .with_description("Duration of HTTP server requests")
49                .with_unit("s")
50                .with_boundaries(vec![
51                    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
52                ])
53                .build(),
54            active_requests: meter
55                .i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
56                .with_description("Number of active HTTP server requests")
57                .with_unit("{request}")
58                .build(),
59            request_body_size: meter
60                .u64_histogram(HTTP_SERVER_REQUEST_BODY_SIZE)
61                .with_description("Size of HTTP server request body")
62                .with_unit("By")
63                .build(),
64            response_body_size: meter
65                .u64_histogram(HTTP_SERVER_RESPONSE_BODY_SIZE)
66                .with_description("Size of HTTP server response body")
67                .with_unit("By")
68                .build(),
69        }
70    }
71
72    fn client(meter: &Meter) -> Self {
73        Self {
74            request_duration: meter
75                .f64_histogram(HTTP_CLIENT_REQUEST_DURATION)
76                .with_description("Duration of HTTP client requests")
77                .with_unit("s")
78                .with_boundaries(vec![
79                    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
80                ])
81                .build(),
82            request_body_size: meter
83                .u64_histogram(HTTP_CLIENT_REQUEST_BODY_SIZE)
84                .with_description("Size of HTTP client request body")
85                .with_unit("By")
86                .build(),
87            response_body_size: meter
88                .u64_histogram(HTTP_CLIENT_RESPONSE_BODY_SIZE)
89                .with_description("Size of HTTP client response body")
90                .with_unit("By")
91                .build(),
92            active_requests: meter
93                .i64_up_down_counter(HTTP_CLIENT_ACTIVE_REQUESTS)
94                .with_description("Number of active HTTP client requests")
95                .with_unit("{request}")
96                .build(),
97        }
98    }
99}
100
101/// [`Layer`] that adds tracing to a [`Service`] that handles HTTP requests.
102#[derive(Clone, Debug)]
103pub struct HttpLayer {
104    record: Arc<MetricsRecord>,
105}
106
107impl HttpLayer {
108    /// Metrics are recorded from server side.
109    pub fn server(meter: &Meter) -> Self {
110        let record = MetricsRecord::server(meter);
111        Self {
112            record: Arc::new(record),
113        }
114    }
115
116    /// Metrics are recorded from client side.
117    pub fn client(meter: &Meter) -> Self {
118        let record = MetricsRecord::client(meter);
119        Self {
120            record: Arc::new(record),
121        }
122    }
123}
124
125impl<S> Layer<S> for HttpLayer {
126    type Service = Http<S>;
127
128    fn layer(&self, inner: S) -> Self::Service {
129        Http {
130            inner,
131            record: Arc::clone(&self.record),
132        }
133    }
134}
135
136/// Middleware that adds tracing to a [`Service`] that handles HTTP requests.
137#[derive(Clone, Debug)]
138pub struct Http<S> {
139    inner: S,
140    record: Arc<MetricsRecord>,
141}
142
143impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Http<S>
144where
145    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
146    S::Error: Display,
147    ReqBody: Body,
148    ResBody: Body,
149{
150    type Response = S::Response;
151    type Error = S::Error;
152    type Future = ResponseFuture<S::Future>;
153
154    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155        self.inner.poll_ready(cx)
156    }
157
158    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
159        let state = ResponseMetricState::new(&req);
160        let record = Arc::clone(&self.record);
161        let inner = self.inner.call(req);
162
163        record
164            .active_requests
165            .add(1, state.active_requests_attributes());
166
167        ResponseFuture {
168            inner,
169            record,
170            state,
171        }
172    }
173}
174
175/// Response future for [`Http`].
176#[pin_project]
177pub struct ResponseFuture<F> {
178    #[pin]
179    inner: F,
180    record: Arc<MetricsRecord>,
181    state: ResponseMetricState,
182}
183
184impl<F, ResBody, E> Future for ResponseFuture<F>
185where
186    F: Future<Output = Result<Response<ResBody>, E>>,
187    ResBody: Body,
188    E: Display,
189{
190    type Output = Result<Response<ResBody>, E>;
191
192    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193        let this = self.project();
194
195        let inner_response = ready!(this.inner.poll(cx));
196        let duration = this.state.elapsed_seconds();
197
198        this.state.push_response_attributes(&inner_response);
199
200        this.record
201            .request_duration
202            .record(duration, this.state.attributes());
203
204        this.record
205            .active_requests
206            .add(-1, this.state.active_requests_attributes());
207
208        if let Some(request_body_size) = this.state.request_body_size {
209            this.record
210                .request_body_size
211                .record(request_body_size, this.state.attributes());
212        }
213
214        if let Ok(response) = inner_response.as_ref() {
215            if let Some(response_size) = http_util::http_response_size(response) {
216                this.record
217                    .response_body_size
218                    .record(response_size, this.state.attributes());
219            }
220        }
221
222        Poll::Ready(inner_response)
223    }
224}
225
226struct ResponseMetricState {
227    start: Instant,
228    /// The size of the request body.
229    request_body_size: Option<u64>,
230    /// Attributes to add to the metrics.
231    attributes: Vec<KeyValue>,
232    /// The number of attributes that are used for only for active requests counter.
233    active_requests_attributes: usize,
234}
235
236impl ResponseMetricState {
237    fn new<B: Body>(req: &Request<B>) -> Self {
238        let start = Instant::now();
239
240        let request_body_size = http_util::http_request_size(req);
241
242        let active_requests_attributes;
243        let attributes = {
244            let mut attributes = vec![];
245
246            let http_method = http_util::http_method(req.method());
247            attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, http_method));
248
249            if let Some(server_address) = req.uri().host() {
250                attributes.push(KeyValue::new(SERVER_ADDRESS, server_address.to_string()));
251            }
252
253            if let Some(server_port) = req.uri().port_u16() {
254                attributes.push(KeyValue::new(SERVER_PORT, server_port as i64));
255            }
256
257            active_requests_attributes = attributes.len();
258
259            attributes.push(KeyValue::new(NETWORK_PROTOCOL_NAME, "http"));
260
261            if let Some(http_version) = http_util::http_version(req.version()) {
262                attributes.push(KeyValue::new(NETWORK_PROTOCOL_VERSION, http_version));
263            }
264
265            if let Some(http_route) = http_util::http_route(req) {
266                attributes.push(KeyValue::new(HTTP_ROUTE, http_route.to_string()));
267            }
268
269            attributes
270        };
271
272        Self {
273            start,
274            request_body_size,
275            attributes,
276            active_requests_attributes,
277        }
278    }
279
280    fn push_response_attributes<B, E>(&mut self, res: &Result<Response<B>, E>)
281    where
282        E: Display,
283    {
284        match res {
285            Ok(response) => {
286                self.attributes.push(KeyValue::new(
287                    HTTP_RESPONSE_STATUS_CODE,
288                    response.status().as_u16() as i64,
289                ));
290            }
291            Err(err) => {
292                self.attributes
293                    .push(KeyValue::new(ERROR_TYPE, err.to_string()));
294            }
295        }
296    }
297
298    /// Returns the elapsed time since the request was created in seconds.
299    fn elapsed_seconds(&self) -> f64 {
300        self.start.elapsed().as_secs_f64()
301    }
302
303    /// Return the attributes for each metric.
304    fn attributes(&self) -> &[KeyValue] {
305        &self.attributes[..]
306    }
307
308    /// Returns the attributes used for active requests counter.
309    fn active_requests_attributes(&self) -> &[KeyValue] {
310        &self.attributes[..self.active_requests_attributes]
311    }
312}