viz_core/middleware/otel/
metrics.rs1use std::time::SystemTime;
6
7use http::uri::Scheme;
8use opentelemetry::{
9 metrics::{Histogram, Meter, UpDownCounter},
10 KeyValue,
11};
12use opentelemetry_semantic_conventions::trace::{
13 CLIENT_ADDRESS, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE,
14 NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, SERVER_PORT, URL_SCHEME,
15};
16
17use crate::{Handler, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, Transform};
18
19const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests";
20const HTTP_SERVER_DURATION: &str = "http.server.duration";
21const HTTP_SERVER_REQUEST_SIZE: &str = "http.server.request.size";
22const HTTP_SERVER_RESPONSE_SIZE: &str = "http.server.response.size";
23
24#[derive(Clone, Debug)]
26pub struct Config {
27 active_requests: UpDownCounter<i64>,
28 duration: Histogram<f64>,
29 request_size: Histogram<u64>,
30 response_size: Histogram<u64>,
31}
32
33impl Config {
34 #[must_use]
36 pub fn new(meter: &Meter) -> Self {
37 let active_requests = meter
38 .i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
39 .with_description(
40 "Measures the number of concurrent HTTP requests that are currently in-flight.",
41 )
42 .with_unit("{request}")
43 .build();
44
45 let duration = meter
46 .f64_histogram(HTTP_SERVER_DURATION)
47 .with_description("Measures the duration of inbound HTTP requests.")
48 .with_unit("s")
49 .build();
50
51 let request_size = meter
52 .u64_histogram(HTTP_SERVER_REQUEST_SIZE)
53 .with_description("Measures the size of HTTP request messages (compressed).")
54 .with_unit("By")
55 .build();
56
57 let response_size = meter
58 .u64_histogram(HTTP_SERVER_RESPONSE_SIZE)
59 .with_description("Measures the size of HTTP request messages (compressed).")
60 .with_unit("By")
61 .build();
62
63 Self {
64 active_requests,
65 duration,
66 request_size,
67 response_size,
68 }
69 }
70}
71
72impl<H> Transform<H> for Config {
73 type Output = MetricsMiddleware<H>;
74
75 fn transform(&self, h: H) -> Self::Output {
76 MetricsMiddleware {
77 h,
78 active_requests: self.active_requests.clone(),
79 duration: self.duration.clone(),
80 request_size: self.request_size.clone(),
81 response_size: self.response_size.clone(),
82 }
83 }
84}
85
86#[derive(Debug, Clone)]
88pub struct MetricsMiddleware<H> {
89 h: H,
90 active_requests: UpDownCounter<i64>,
91 duration: Histogram<f64>,
92 request_size: Histogram<u64>,
93 response_size: Histogram<u64>,
94}
95
96#[crate::async_trait]
97impl<H, O> Handler<Request> for MetricsMiddleware<H>
98where
99 H: Handler<Request, Output = Result<O>>,
100 O: IntoResponse,
101{
102 type Output = Result<Response>;
103
104 async fn call(&self, req: Request) -> Self::Output {
105 let Self {
106 active_requests,
107 duration,
108 request_size,
109 response_size,
110 h,
111 } = self;
112
113 let timer = SystemTime::now();
114 let mut attributes = build_attributes(&req, req.route_info().pattern.as_str());
115
116 active_requests.add(1, &attributes);
117
118 request_size.record(req.content_length().unwrap_or(0), &attributes);
119
120 let resp = h
121 .call(req)
122 .await
123 .map(IntoResponse::into_response)
124 .map(|resp| {
125 active_requests.add(-1, &attributes);
126
127 attributes.push(KeyValue::new(
128 HTTP_RESPONSE_STATUS_CODE,
129 i64::from(resp.status().as_u16()),
130 ));
131
132 response_size.record(resp.content_length().unwrap_or(0), &attributes);
133
134 resp
135 });
136
137 duration.record(
138 timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
139 &attributes,
140 );
141
142 resp
143 }
144}
145
146fn build_attributes(req: &Request, http_route: &str) -> Vec<KeyValue> {
147 let mut attributes = Vec::with_capacity(5);
148 attributes.push(KeyValue::new(HTTP_ROUTE, http_route.to_string()));
150
151 attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()));
153 attributes.push(KeyValue::new(
154 NETWORK_PROTOCOL_VERSION,
155 format!("{:?}", req.version()),
156 ));
157
158 if let Some(remote_addr) = req.remote_addr() {
159 attributes.push(KeyValue::new(CLIENT_ADDRESS, remote_addr.to_string()));
160 }
161
162 let uri = req.uri();
163 if let Some(host) = uri.host() {
164 attributes.push(KeyValue::new(SERVER_ADDRESS, host.to_string()));
165 }
166 if let Some(port) = uri
167 .port_u16()
168 .map(i64::from)
169 .filter(|port| *port != 80 && *port != 443)
170 {
171 attributes.push(KeyValue::new(SERVER_PORT, port));
172 }
173
174 attributes.push(KeyValue::new(
175 URL_SCHEME,
176 uri.scheme().unwrap_or(&Scheme::HTTP).to_string(),
177 ));
178
179 attributes
180}