1use crate::util::http as http_util;
5use http::{Request, Response};
6use http_body::Body;
7use opentelemetry::{
8 metrics::{Histogram, Meter, UpDownCounter},
9 KeyValue,
10};
11use opentelemetry_semantic_conventions::{
12 attribute::{HTTP_REQUEST_METHOD, SERVER_ADDRESS},
13 metric::{
14 HTTP_CLIENT_ACTIVE_REQUESTS, HTTP_CLIENT_REQUEST_BODY_SIZE, HTTP_CLIENT_REQUEST_DURATION,
15 HTTP_CLIENT_RESPONSE_BODY_SIZE, HTTP_SERVER_ACTIVE_REQUESTS, HTTP_SERVER_REQUEST_BODY_SIZE,
16 HTTP_SERVER_REQUEST_DURATION, HTTP_SERVER_RESPONSE_BODY_SIZE,
17 },
18 trace::{
19 ERROR_TYPE, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE, NETWORK_PROTOCOL_NAME,
20 NETWORK_PROTOCOL_VERSION, SERVER_PORT,
21 },
22};
23use pin_project::pin_project;
24use std::{
25 fmt::Display,
26 future::Future,
27 pin::Pin,
28 sync::Arc,
29 task::{ready, Context, Poll},
30 time::Instant,
31};
32use tower_layer::Layer;
33use tower_service::Service;
34
35#[derive(Debug)]
36struct MetricsRecord {
37 request_duration: Histogram<f64>,
38 active_requests: UpDownCounter<i64>,
39 request_body_size: Histogram<u64>,
40 response_body_size: Histogram<u64>,
41}
42
43impl MetricsRecord {
44 fn server(meter: &Meter) -> Self {
45 Self {
46 request_duration: meter
47 .f64_histogram(HTTP_SERVER_REQUEST_DURATION)
48 .with_description("Duration of HTTP server requests")
49 .with_unit("s")
50 .with_boundaries(vec![
51 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
52 ])
53 .build(),
54 active_requests: meter
55 .i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
56 .with_description("Number of active HTTP server requests")
57 .with_unit("{request}")
58 .build(),
59 request_body_size: meter
60 .u64_histogram(HTTP_SERVER_REQUEST_BODY_SIZE)
61 .with_description("Size of HTTP server request body")
62 .with_unit("By")
63 .build(),
64 response_body_size: meter
65 .u64_histogram(HTTP_SERVER_RESPONSE_BODY_SIZE)
66 .with_description("Size of HTTP server response body")
67 .with_unit("By")
68 .build(),
69 }
70 }
71
72 fn client(meter: &Meter) -> Self {
73 Self {
74 request_duration: meter
75 .f64_histogram(HTTP_CLIENT_REQUEST_DURATION)
76 .with_description("Duration of HTTP client requests")
77 .with_unit("s")
78 .with_boundaries(vec![
79 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
80 ])
81 .build(),
82 request_body_size: meter
83 .u64_histogram(HTTP_CLIENT_REQUEST_BODY_SIZE)
84 .with_description("Size of HTTP client request body")
85 .with_unit("By")
86 .build(),
87 response_body_size: meter
88 .u64_histogram(HTTP_CLIENT_RESPONSE_BODY_SIZE)
89 .with_description("Size of HTTP client response body")
90 .with_unit("By")
91 .build(),
92 active_requests: meter
93 .i64_up_down_counter(HTTP_CLIENT_ACTIVE_REQUESTS)
94 .with_description("Number of active HTTP client requests")
95 .with_unit("{request}")
96 .build(),
97 }
98 }
99}
100
101#[derive(Clone, Debug)]
103pub struct HttpLayer {
104 record: Arc<MetricsRecord>,
105}
106
107impl HttpLayer {
108 pub fn server(meter: &Meter) -> Self {
110 let record = MetricsRecord::server(meter);
111 Self {
112 record: Arc::new(record),
113 }
114 }
115
116 pub fn client(meter: &Meter) -> Self {
118 let record = MetricsRecord::client(meter);
119 Self {
120 record: Arc::new(record),
121 }
122 }
123}
124
125impl<S> Layer<S> for HttpLayer {
126 type Service = Http<S>;
127
128 fn layer(&self, inner: S) -> Self::Service {
129 Http {
130 inner,
131 record: Arc::clone(&self.record),
132 }
133 }
134}
135
136#[derive(Clone, Debug)]
138pub struct Http<S> {
139 inner: S,
140 record: Arc<MetricsRecord>,
141}
142
143impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Http<S>
144where
145 S: Service<Request<ReqBody>, Response = Response<ResBody>>,
146 S::Error: Display,
147 ReqBody: Body,
148 ResBody: Body,
149{
150 type Response = S::Response;
151 type Error = S::Error;
152 type Future = ResponseFuture<S::Future>;
153
154 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155 self.inner.poll_ready(cx)
156 }
157
158 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
159 let state = ResponseMetricState::new(&req);
160 let record = Arc::clone(&self.record);
161 let inner = self.inner.call(req);
162
163 record
164 .active_requests
165 .add(1, state.active_requests_attributes());
166
167 ResponseFuture {
168 inner,
169 record,
170 state,
171 }
172 }
173}
174
175#[pin_project]
177pub struct ResponseFuture<F> {
178 #[pin]
179 inner: F,
180 record: Arc<MetricsRecord>,
181 state: ResponseMetricState,
182}
183
184impl<F, ResBody, E> Future for ResponseFuture<F>
185where
186 F: Future<Output = Result<Response<ResBody>, E>>,
187 ResBody: Body,
188 E: Display,
189{
190 type Output = Result<Response<ResBody>, E>;
191
192 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193 let this = self.project();
194
195 let inner_response = ready!(this.inner.poll(cx));
196 let duration = this.state.elapsed_seconds();
197
198 this.state.push_response_attributes(&inner_response);
199
200 this.record
201 .request_duration
202 .record(duration, this.state.attributes());
203
204 this.record
205 .active_requests
206 .add(-1, this.state.active_requests_attributes());
207
208 if let Some(request_body_size) = this.state.request_body_size {
209 this.record
210 .request_body_size
211 .record(request_body_size, this.state.attributes());
212 }
213
214 if let Ok(response) = inner_response.as_ref() {
215 if let Some(response_size) = http_util::http_response_size(response) {
216 this.record
217 .response_body_size
218 .record(response_size, this.state.attributes());
219 }
220 }
221
222 Poll::Ready(inner_response)
223 }
224}
225
226struct ResponseMetricState {
227 start: Instant,
228 request_body_size: Option<u64>,
230 attributes: Vec<KeyValue>,
232 active_requests_attributes: usize,
234}
235
236impl ResponseMetricState {
237 fn new<B: Body>(req: &Request<B>) -> Self {
238 let start = Instant::now();
239
240 let request_body_size = http_util::http_request_size(req);
241
242 let active_requests_attributes;
243 let attributes = {
244 let mut attributes = vec![];
245
246 let http_method = http_util::http_method(req.method());
247 attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, http_method));
248
249 if let Some(server_address) = req.uri().host() {
250 attributes.push(KeyValue::new(SERVER_ADDRESS, server_address.to_string()));
251 }
252
253 if let Some(server_port) = req.uri().port_u16() {
254 attributes.push(KeyValue::new(SERVER_PORT, server_port as i64));
255 }
256
257 active_requests_attributes = attributes.len();
258
259 attributes.push(KeyValue::new(NETWORK_PROTOCOL_NAME, "http"));
260
261 if let Some(http_version) = http_util::http_version(req.version()) {
262 attributes.push(KeyValue::new(NETWORK_PROTOCOL_VERSION, http_version));
263 }
264
265 if let Some(http_route) = http_util::http_route(req) {
266 attributes.push(KeyValue::new(HTTP_ROUTE, http_route.to_string()));
267 }
268
269 attributes
270 };
271
272 Self {
273 start,
274 request_body_size,
275 attributes,
276 active_requests_attributes,
277 }
278 }
279
280 fn push_response_attributes<B, E>(&mut self, res: &Result<Response<B>, E>)
281 where
282 E: Display,
283 {
284 match res {
285 Ok(response) => {
286 self.attributes.push(KeyValue::new(
287 HTTP_RESPONSE_STATUS_CODE,
288 response.status().as_u16() as i64,
289 ));
290 }
291 Err(err) => {
292 self.attributes
293 .push(KeyValue::new(ERROR_TYPE, err.to_string()));
294 }
295 }
296 }
297
298 fn elapsed_seconds(&self) -> f64 {
300 self.start.elapsed().as_secs_f64()
301 }
302
303 fn attributes(&self) -> &[KeyValue] {
305 &self.attributes[..]
306 }
307
308 fn active_requests_attributes(&self) -> &[KeyValue] {
310 &self.attributes[..self.active_requests_attributes]
311 }
312}