1use std::{
4 fmt::Display,
5 future::Future,
6 pin::Pin,
7 sync::Arc,
8 task::{ready, Context, Poll},
9 time::Instant,
10};
11
12use http::{Request, Response};
13use http_body::Body;
14use opentelemetry::{
15 metrics::{Histogram, Meter, UpDownCounter},
16 KeyValue,
17};
18use pin_project::pin_project;
19use tower_layer::Layer;
20use tower_service::Service;
21
22use crate::util;
23
/// Which end of an HTTP exchange the metrics are recorded for.
///
/// The side decides which instrument family (`http.client.*` or `http.server.*`) is
/// created and which request-attribute extractor is applied to each request.
#[derive(Debug, Clone, Copy)]
enum MetricSide {
    /// Requests sent by this process.
    Client,
    /// Requests received by this process.
    Server,
}
32
33#[derive(Debug)]
34struct MetricsRecord {
35 side: MetricSide,
36 request_duration: Histogram<f64>,
37 active_requests: UpDownCounter<i64>,
38 request_body_size: Histogram<u64>,
39 response_body_size: Histogram<u64>,
40}
41
42impl MetricsRecord {
43 fn server(meter: &Meter) -> Self {
44 Self {
45 side: MetricSide::Server,
46 request_duration: meter
47 .f64_histogram("http.server.request.duration")
48 .with_description("Duration of HTTP server requests")
49 .with_unit("s")
50 .with_boundaries(vec![
51 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
52 ])
53 .build(),
54 active_requests: meter
55 .i64_up_down_counter("http.server.active_requests")
56 .with_description("Number of active HTTP server requests")
57 .with_unit("{request}")
58 .build(),
59 request_body_size: meter
60 .u64_histogram("http.server.request.body.size")
61 .with_description("Size of HTTP server request body")
62 .with_unit("By")
63 .build(),
64 response_body_size: meter
65 .u64_histogram("http.server.response.body.size")
66 .with_description("Size of HTTP server response body")
67 .with_unit("By")
68 .build(),
69 }
70 }
71
72 fn client(meter: &Meter) -> Self {
73 Self {
74 side: MetricSide::Client,
75 request_duration: meter
76 .f64_histogram("http.client.request.duration")
77 .with_description("Duration of HTTP client requests")
78 .with_unit("s")
79 .with_boundaries(vec![
80 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
81 ])
82 .build(),
83 request_body_size: meter
84 .u64_histogram("http.client.request.body.size")
85 .with_description("Size of HTTP client request body")
86 .with_unit("By")
87 .build(),
88 response_body_size: meter
89 .u64_histogram("http.client.response.body.size")
90 .with_description("Size of HTTP client response body")
91 .with_unit("By")
92 .build(),
93 active_requests: meter
94 .i64_up_down_counter("http.client.active_requests")
95 .with_description("Number of active HTTP client requests")
96 .with_unit("{request}")
97 .build(),
98 }
99 }
100}
101
102#[derive(Clone, Debug)]
104pub struct HttpLayer {
105 record: Arc<MetricsRecord>,
106}
107
108impl HttpLayer {
109 pub fn server(meter: &Meter) -> Self {
111 let record = MetricsRecord::server(meter);
112 Self {
113 record: Arc::new(record),
114 }
115 }
116
117 pub fn client(meter: &Meter) -> Self {
119 let record = MetricsRecord::client(meter);
120 Self {
121 record: Arc::new(record),
122 }
123 }
124}
125
126impl<S> Layer<S> for HttpLayer {
127 type Service = Http<S>;
128
129 fn layer(&self, inner: S) -> Self::Service {
130 Http {
131 inner,
132 record: Arc::clone(&self.record),
133 }
134 }
135}
136
137#[derive(Clone, Debug)]
139pub struct Http<S> {
140 inner: S,
141 record: Arc<MetricsRecord>,
142}
143
144impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Http<S>
145where
146 S: Service<Request<ReqBody>, Response = Response<ResBody>>,
147 S::Error: Display,
148 ReqBody: Body,
149 ResBody: Body,
150{
151 type Response = S::Response;
152 type Error = S::Error;
153 type Future = ResponseFuture<S::Future>;
154
155 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156 self.inner.poll_ready(cx)
157 }
158
159 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
160 let side = self.record.side;
161 let state = ResponseMetricState::new(side, &req);
162 let record = Arc::clone(&self.record);
163 let inner = self.inner.call(req);
164
165 record
166 .active_requests
167 .add(1, state.active_requests_attributes());
168
169 ResponseFuture {
170 inner,
171 record,
172 state,
173 }
174 }
175}
176
177#[pin_project]
179pub struct ResponseFuture<F> {
180 #[pin]
181 inner: F,
182 record: Arc<MetricsRecord>,
183 state: ResponseMetricState,
184}
185
186impl<F, ResBody, E> Future for ResponseFuture<F>
187where
188 F: Future<Output = Result<Response<ResBody>, E>>,
189 ResBody: Body,
190 E: Display,
191{
192 type Output = Result<Response<ResBody>, E>;
193
194 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195 let this = self.project();
196
197 let inner_response = ready!(this.inner.poll(cx));
198 let duration = this.state.elapsed_seconds();
199
200 this.state.push_response_attributes(&inner_response);
201
202 this.record
203 .request_duration
204 .record(duration, this.state.attributes());
205
206 this.record
207 .active_requests
208 .add(-1, this.state.active_requests_attributes());
209
210 if let Some(request_body_size) = this.state.request_body_size {
211 this.record
212 .request_body_size
213 .record(request_body_size, this.state.attributes());
214 }
215
216 if let Ok(response) = inner_response.as_ref() {
217 if let Some(response_size) = util::http_response_size(response) {
218 this.record
219 .response_body_size
220 .record(response_size, this.state.attributes());
221 }
222 }
223
224 Poll::Ready(inner_response)
225 }
226}
227
228struct ResponseMetricState {
229 start: Instant,
230 request_body_size: Option<u64>,
232 attributes: Vec<KeyValue>,
234 active_requests_attributes: usize,
236}
237
238impl ResponseMetricState {
239 fn new<B: Body>(side: MetricSide, req: &Request<B>) -> Self {
240 let start = Instant::now();
241
242 let request_body_size = util::http_request_size(req);
243
244 let active_requests_attributes;
245 let attributes = {
246 let mut attributes = vec![];
247
248 let http_method = util::http_method(req.method());
249 attributes.push(KeyValue::new("http.request.method", http_method));
250
251 if let Some(server_address) = req.uri().host() {
252 attributes.push(KeyValue::new("server.address", server_address.to_string()));
253 }
254
255 if let Some(server_port) = req.uri().port_u16() {
256 attributes.push(KeyValue::new("server.port", server_port as i64));
257 }
258
259 match side {
260 MetricSide::Client => {
262 let util::HttpRequestAttributes {
263 url_scheme,
264 server_address,
265 server_port,
266 } = util::HttpRequestAttributes::from_sent_request(req);
267
268 if let Some(server_address) = server_address {
269 attributes
270 .push(KeyValue::new("server.address", server_address.to_string()));
271 }
272 if let Some(server_port) = server_port {
273 attributes.push(KeyValue::new("server.port", server_port.to_string()));
274 }
275 if let Some(url_scheme) = url_scheme {
276 attributes.push(KeyValue::new("url.scheme", url_scheme.to_string()));
277 }
278 }
279 MetricSide::Server => {
280 let util::HttpRequestAttributes {
281 url_scheme,
282 server_address,
283 server_port,
284 } = util::HttpRequestAttributes::from_recv_request(req);
285
286 if let Some(server_address) = server_address {
287 attributes
288 .push(KeyValue::new("server.address", server_address.to_string()));
289 }
290 if let Some(server_port) = server_port {
291 attributes.push(KeyValue::new("server.port", server_port.to_string()));
292 }
293 if let Some(url_scheme) = url_scheme {
294 attributes.push(KeyValue::new("url.scheme", url_scheme.to_string()));
295 }
296 }
297 };
298
299 active_requests_attributes = attributes.len();
300
301 attributes.push(KeyValue::new("network.protocol.name", "http"));
302
303 if let Some(http_version) = util::http_version(req.version()) {
304 attributes.push(KeyValue::new("network.protocol.version", http_version));
305 }
306
307 if let Some(http_route) = util::http_route(req) {
308 attributes.push(KeyValue::new("http.route", http_route.to_string()));
309 }
310
311 attributes
312 };
313
314 Self {
315 start,
316 request_body_size,
317 attributes,
318 active_requests_attributes,
319 }
320 }
321
322 fn push_response_attributes<B, E>(&mut self, res: &Result<Response<B>, E>)
323 where
324 E: Display,
325 {
326 match res {
327 Ok(response) => {
328 self.attributes.push(KeyValue::new(
329 "http.response.status_code",
330 response.status().as_u16() as i64,
331 ));
332 }
333 Err(err) => {
334 self.attributes
335 .push(KeyValue::new("error.type", err.to_string()));
336 }
337 }
338 }
339
340 fn elapsed_seconds(&self) -> f64 {
342 self.start.elapsed().as_secs_f64()
343 }
344
345 fn attributes(&self) -> &[KeyValue] {
347 &self.attributes[..]
348 }
349
350 fn active_requests_attributes(&self) -> &[KeyValue] {
352 &self.attributes[..self.active_requests_attributes]
353 }
354}