actix_web_opentelemetry/middleware/
metrics.rs1use actix_http::{
4 body::{BodySize, MessageBody},
5 header::CONTENT_LENGTH,
6};
7use actix_web::dev;
8use futures_util::future::{self, FutureExt as _, LocalBoxFuture};
9use opentelemetry::{
10 global,
11 metrics::{Histogram, Meter, MeterProvider, UpDownCounter},
12 KeyValue,
13};
14use std::borrow::Cow;
15use std::{sync::Arc, time::SystemTime};
16
17use super::get_scope;
18use crate::util::metrics_attributes_from_request;
19use crate::RouteFormatter;
20
21use opentelemetry_semantic_conventions::trace::HTTP_RESPONSE_STATUS_CODE;
24
/// Instrument name of the histogram that records inbound HTTP request durations.
const HTTP_SERVER_DURATION: &str = "http.server.duration";
/// Instrument name of the up-down counter that tracks in-flight HTTP requests.
const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests";
/// Instrument name of the histogram that records HTTP request message sizes.
const HTTP_SERVER_REQUEST_SIZE: &str = "http.server.request.size";
/// Instrument name of the histogram that records HTTP response message sizes.
const HTTP_SERVER_RESPONSE_SIZE: &str = "http.server.response.size";
29
30#[derive(Clone, Debug)]
36struct Metrics {
37 http_server_duration: Histogram<f64>,
38 http_server_active_requests: UpDownCounter<i64>,
39 http_server_request_size: Histogram<u64>,
40 http_server_response_size: Histogram<u64>,
41}
42
43impl Metrics {
44 fn new(meter: Meter) -> Self {
46 let http_server_duration = meter
47 .f64_histogram(HTTP_SERVER_DURATION)
48 .with_description("Measures the duration of inbound HTTP requests.")
49 .with_unit("s")
50 .build();
51
52 let http_server_active_requests = meter
53 .i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
54 .with_description(
55 "Measures the number of concurrent HTTP requests that are currently in-flight.",
56 )
57 .build();
58
59 let http_server_request_size = meter
60 .u64_histogram(HTTP_SERVER_REQUEST_SIZE)
61 .with_description("Measures the size of HTTP request messages (compressed).")
62 .with_unit("By")
63 .build();
64
65 let http_server_response_size = meter
66 .u64_histogram(HTTP_SERVER_RESPONSE_SIZE)
67 .with_description("Measures the size of HTTP response messages (compressed).")
68 .with_unit("By")
69 .build();
70
71 Metrics {
72 http_server_active_requests,
73 http_server_duration,
74 http_server_request_size,
75 http_server_response_size,
76 }
77 }
78}
79
80type MetricsAttrsFromReqFn = fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>;
81
82#[derive(Clone, Debug, Default)]
84pub struct RequestMetricsBuilder {
85 route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
86 meter: Option<Meter>,
87 metric_attrs_from_req: Option<MetricsAttrsFromReqFn>,
88}
89
90impl RequestMetricsBuilder {
91 pub fn new() -> Self {
93 Self::default()
94 }
95
96 pub fn with_route_formatter<R>(mut self, route_formatter: R) -> Self
98 where
99 R: RouteFormatter + Send + Sync + 'static,
100 {
101 self.route_formatter = Some(Arc::new(route_formatter));
102 self
103 }
104
105 pub fn with_meter_provider(mut self, meter_provider: impl MeterProvider) -> Self {
107 self.meter = Some(meter_provider.meter_with_scope(get_scope()));
108 self
109 }
110
111 pub fn with_metric_attrs_from_req(
113 mut self,
114 metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
115 ) -> Self {
116 self.metric_attrs_from_req = Some(metric_attrs_from_req);
117 self
118 }
119
120 pub fn build(self) -> RequestMetrics {
122 let meter = self
123 .meter
124 .unwrap_or_else(|| global::meter_provider().meter_with_scope(get_scope()));
125
126 RequestMetrics {
127 route_formatter: self.route_formatter,
128 metrics: Arc::new(Metrics::new(meter)),
129 metric_attrs_from_req: self
130 .metric_attrs_from_req
131 .unwrap_or(metrics_attributes_from_request),
132 }
133 }
134}
135
136#[derive(Clone, Debug)]
175pub struct RequestMetrics {
176 route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
177 metrics: Arc<Metrics>,
178 metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
179}
180
181impl RequestMetrics {
182 pub fn builder() -> RequestMetricsBuilder {
184 RequestMetricsBuilder::new()
185 }
186}
187
188impl Default for RequestMetrics {
189 fn default() -> Self {
190 RequestMetrics::builder().build()
191 }
192}
193
194impl<S, B> dev::Transform<S, dev::ServiceRequest> for RequestMetrics
195where
196 S: dev::Service<
197 dev::ServiceRequest,
198 Response = dev::ServiceResponse<B>,
199 Error = actix_web::Error,
200 >,
201 S::Future: 'static,
202 B: MessageBody + 'static,
203{
204 type Response = dev::ServiceResponse<B>;
205 type Error = actix_web::Error;
206 type Transform = RequestMetricsMiddleware<S>;
207 type InitError = ();
208 type Future = future::Ready<Result<Self::Transform, Self::InitError>>;
209
210 fn new_transform(&self, service: S) -> Self::Future {
211 let service = RequestMetricsMiddleware {
212 service,
213 metrics: self.metrics.clone(),
214 route_formatter: self.route_formatter.clone(),
215 metric_attrs_from_req: self.metric_attrs_from_req,
216 };
217
218 future::ok(service)
219 }
220}
221
222#[allow(missing_debug_implementations)]
224pub struct RequestMetricsMiddleware<S> {
225 service: S,
226 metrics: Arc<Metrics>,
227 route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
228 metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
229}
230
231impl<S, B> dev::Service<dev::ServiceRequest> for RequestMetricsMiddleware<S>
232where
233 S: dev::Service<
234 dev::ServiceRequest,
235 Response = dev::ServiceResponse<B>,
236 Error = actix_web::Error,
237 >,
238 S::Future: 'static,
239 B: MessageBody + 'static,
240{
241 type Response = dev::ServiceResponse<B>;
242 type Error = actix_web::Error;
243 type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
244
245 dev::forward_ready!(service);
246
247 fn call(&self, req: dev::ServiceRequest) -> Self::Future {
248 let timer = SystemTime::now();
249
250 let mut http_target = req
251 .match_pattern()
252 .map(Cow::Owned)
253 .unwrap_or(Cow::Borrowed("default"));
254
255 if let Some(formatter) = &self.route_formatter {
256 http_target = Cow::Owned(formatter.format(&http_target));
257 }
258
259 let mut attributes = (self.metric_attrs_from_req)(&req, http_target);
260 self.metrics.http_server_active_requests.add(1, &attributes);
261
262 let content_length = req
263 .headers()
264 .get(CONTENT_LENGTH)
265 .and_then(|len| len.to_str().ok().and_then(|s| s.parse().ok()))
266 .unwrap_or(0);
267 self.metrics
268 .http_server_request_size
269 .record(content_length, &attributes);
270
271 let request_metrics = self.metrics.clone();
272 Box::pin(self.service.call(req).map(move |res| {
273 request_metrics
274 .http_server_active_requests
275 .add(-1, &attributes);
276
277 if let Ok(res) = res {
279 attributes.push(KeyValue::new(
280 HTTP_RESPONSE_STATUS_CODE,
281 res.status().as_u16() as i64,
282 ));
283 let response_size = match res.response().body().size() {
284 BodySize::Sized(size) => size,
285 _ => 0,
286 };
287 request_metrics
288 .http_server_response_size
289 .record(response_size, &attributes);
290
291 request_metrics.http_server_duration.record(
292 timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
293 &attributes,
294 );
295
296 Ok(res)
297 } else {
298 res
299 }
300 }))
301 }
302}
303
#[cfg(feature = "metrics-prometheus")]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics-prometheus")))]
pub(crate) mod prometheus {
    use actix_web::{
        dev,
        http::{
            header::{HeaderValue, CONTENT_TYPE},
            StatusCode,
        },
    };
    use futures_util::future::{self, LocalBoxFuture};
    use opentelemetry_sdk::metrics::MetricError;
    use prometheus::{Encoder, Registry, TextEncoder, TEXT_FORMAT};

    /// Request handler that serves the metrics gathered from a Prometheus [`Registry`],
    /// encoded with [`TextEncoder`].
    #[derive(Clone, Debug)]
    pub struct PrometheusMetricsHandler {
        /// Registry whose metric families are gathered on every request.
        prometheus_registry: Registry,
    }

    impl PrometheusMetricsHandler {
        /// Creates a handler that exposes the metrics held by `registry`.
        pub fn new(registry: Registry) -> Self {
            Self {
                prometheus_registry: registry,
            }
        }

        /// Gathers the registry and encodes it as text.
        ///
        /// Best effort: an encoding failure is logged and whatever was written to the
        /// buffer so far is still returned; a buffer that is not valid UTF-8 yields an
        /// empty string.
        fn metrics(&self) -> String {
            let encoder = TextEncoder::new();
            let metric_families = self.prometheus_registry.gather();
            let mut buf = Vec::new();
            if let Err(err) = encoder.encode(&metric_families[..], &mut buf) {
                tracing::error!(
                    name: "encode_failure",
                    target: env!("CARGO_PKG_NAME"),
                    name = "encode_failure",
                    error = MetricError::Other(err.to_string()).to_string(),
                    ""
                );
            }

            String::from_utf8(buf).unwrap_or_default()
        }
    }

    impl dev::Handler<actix_web::HttpRequest> for PrometheusMetricsHandler {
        type Output = Result<actix_web::HttpResponse<String>, actix_web::error::Error>;
        type Future = LocalBoxFuture<'static, Self::Output>;

        /// Responds with `200 OK` and the encoded metrics as the body.
        fn call(&self, _req: actix_web::HttpRequest) -> Self::Future {
            let mut response =
                actix_web::HttpResponse::with_body(StatusCode::OK, self.metrics());
            // Advertise the text exposition format so scrapers do not have to guess
            // how to parse the body.
            response
                .headers_mut()
                .insert(CONTENT_TYPE, HeaderValue::from_static(TEXT_FORMAT));

            Box::pin(future::ok(response))
        }
    }
}