actix_web_opentelemetry/middleware/
metrics.rs

1//! # Metrics Middleware
2
3use actix_http::{
4    body::{BodySize, MessageBody},
5    header::CONTENT_LENGTH,
6};
7use actix_web::dev;
8use futures_util::future::{self, FutureExt as _, LocalBoxFuture};
9use opentelemetry::{
10    global,
11    metrics::{Histogram, Meter, MeterProvider, UpDownCounter},
12    KeyValue,
13};
14use std::borrow::Cow;
15use std::{sync::Arc, time::SystemTime};
16
17use super::get_scope;
18use crate::util::metrics_attributes_from_request;
19use crate::RouteFormatter;
20
// Follows the experimental semantic conventions for HTTP server metrics:
// https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-metrics.md#http-server
23use opentelemetry_semantic_conventions::trace::HTTP_RESPONSE_STATUS_CODE;
24
// Instrument names registered by `Metrics::new`.

// Histogram of inbound request durations.
const HTTP_SERVER_DURATION: &str = "http.server.duration";
// Up/down counter of requests currently in flight.
const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests";
// Histogram of request message sizes.
const HTTP_SERVER_REQUEST_SIZE: &str = "http.server.request.size";
// Histogram of response message sizes.
const HTTP_SERVER_RESPONSE_SIZE: &str = "http.server.response.size";
29
/// Records http server metrics
///
/// Bundles the instruments the middleware writes to for every request.
///
/// See the [spec] for details.
///
/// [spec]: https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-metrics.md#http-server
#[derive(Clone, Debug)]
struct Metrics {
    /// Duration of inbound HTTP requests, recorded in seconds.
    http_server_duration: Histogram<f64>,
    /// Number of HTTP requests that are currently in flight.
    http_server_active_requests: UpDownCounter<i64>,
    /// Size of HTTP request messages, recorded in bytes.
    http_server_request_size: Histogram<u64>,
    /// Size of HTTP response messages, recorded in bytes.
    http_server_response_size: Histogram<u64>,
}
42
43impl Metrics {
44    /// Create a new [`RequestMetrics`]
45    fn new(meter: Meter) -> Self {
46        let http_server_duration = meter
47            .f64_histogram(HTTP_SERVER_DURATION)
48            .with_description("Measures the duration of inbound HTTP requests.")
49            .with_unit("s")
50            .build();
51
52        let http_server_active_requests = meter
53            .i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
54            .with_description(
55                "Measures the number of concurrent HTTP requests that are currently in-flight.",
56            )
57            .build();
58
59        let http_server_request_size = meter
60            .u64_histogram(HTTP_SERVER_REQUEST_SIZE)
61            .with_description("Measures the size of HTTP request messages (compressed).")
62            .with_unit("By")
63            .build();
64
65        let http_server_response_size = meter
66            .u64_histogram(HTTP_SERVER_RESPONSE_SIZE)
67            .with_description("Measures the size of HTTP response messages (compressed).")
68            .with_unit("By")
69            .build();
70
71        Metrics {
72            http_server_active_requests,
73            http_server_duration,
74            http_server_request_size,
75            http_server_response_size,
76        }
77    }
78}
79
/// Signature of the function that turns a request into metric attributes.
///
/// The second argument is the route label computed by the middleware: the request's match
/// pattern (after the optional route formatter), or `"default"` when no pattern matched.
type MetricsAttrsFromReqFn = fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>;
81
/// Builder for [`RequestMetrics`]
///
/// Every field is optional; [`RequestMetricsBuilder::build`] fills in defaults for the meter
/// and the attribute function when they were not set.
#[derive(Clone, Debug, Default)]
pub struct RequestMetricsBuilder {
    /// Optional formatter applied to the matched route pattern before it is used as an attribute.
    route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
    /// Meter used to create the instruments; the global meter provider is used when `None`.
    meter: Option<Meter>,
    /// Custom attribute function; `metrics_attributes_from_request` is used when `None`.
    metric_attrs_from_req: Option<MetricsAttrsFromReqFn>,
}
89
90impl RequestMetricsBuilder {
91    /// Create a new `RequestMetricsBuilder`
92    pub fn new() -> Self {
93        Self::default()
94    }
95
96    /// Add a route formatter to customize metrics match patterns
97    pub fn with_route_formatter<R>(mut self, route_formatter: R) -> Self
98    where
99        R: RouteFormatter + Send + Sync + 'static,
100    {
101        self.route_formatter = Some(Arc::new(route_formatter));
102        self
103    }
104
105    /// Set the meter provider this middleware should use to construct meters
106    pub fn with_meter_provider(mut self, meter_provider: impl MeterProvider) -> Self {
107        self.meter = Some(meter_provider.meter_with_scope(get_scope()));
108        self
109    }
110
111    /// Set a metric attrs function that the middleware will use to create metric attributes
112    pub fn with_metric_attrs_from_req(
113        mut self,
114        metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
115    ) -> Self {
116        self.metric_attrs_from_req = Some(metric_attrs_from_req);
117        self
118    }
119
120    /// Build the `RequestMetrics` middleware
121    pub fn build(self) -> RequestMetrics {
122        let meter = self
123            .meter
124            .unwrap_or_else(|| global::meter_provider().meter_with_scope(get_scope()));
125
126        RequestMetrics {
127            route_formatter: self.route_formatter,
128            metrics: Arc::new(Metrics::new(meter)),
129            metric_attrs_from_req: self
130                .metric_attrs_from_req
131                .unwrap_or(metrics_attributes_from_request),
132        }
133    }
134}
135
136/// Request metrics tracking
137///
138/// # Examples
139///
140/// ```no_run
141/// use actix_web::{dev, http, web, App, HttpRequest, HttpServer};
142/// use actix_web_opentelemetry::{PrometheusMetricsHandler, RequestMetrics, RequestTracing};
143/// use opentelemetry::global;
144/// use opentelemetry_sdk::metrics::SdkMeterProvider;
145///
146/// #[actix_web::main]
147/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
148///     // Configure prometheus or your preferred metrics service
149///     let registry = prometheus::Registry::new();
150///     let exporter = opentelemetry_prometheus::exporter()
151///         .with_registry(registry.clone())
152///         .build()?;
153///
154///     // set up your meter provider with your exporter(s)
155///     let provider = SdkMeterProvider::builder()
156///         .with_reader(exporter)
157///         .build();
158///     global::set_meter_provider(provider);
159///
160///     // Run actix server, metrics are now available at http://localhost:8080/metrics
161///     HttpServer::new(move || {
162///         App::new()
163///             .wrap(RequestTracing::new())
164///             .wrap(RequestMetrics::default())
165///             .route("/metrics", web::get().to(PrometheusMetricsHandler::new(registry.clone())))
166///         })
167///         .bind("localhost:8080")?
168///         .run()
169///         .await?;
170///
171///     Ok(())
172/// }
173/// ```
174#[derive(Clone, Debug)]
175pub struct RequestMetrics {
176    route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
177    metrics: Arc<Metrics>,
178    metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
179}
180
181impl RequestMetrics {
182    /// Create a builder to configure this middleware
183    pub fn builder() -> RequestMetricsBuilder {
184        RequestMetricsBuilder::new()
185    }
186}
187
188impl Default for RequestMetrics {
189    fn default() -> Self {
190        RequestMetrics::builder().build()
191    }
192}
193
194impl<S, B> dev::Transform<S, dev::ServiceRequest> for RequestMetrics
195where
196    S: dev::Service<
197        dev::ServiceRequest,
198        Response = dev::ServiceResponse<B>,
199        Error = actix_web::Error,
200    >,
201    S::Future: 'static,
202    B: MessageBody + 'static,
203{
204    type Response = dev::ServiceResponse<B>;
205    type Error = actix_web::Error;
206    type Transform = RequestMetricsMiddleware<S>;
207    type InitError = ();
208    type Future = future::Ready<Result<Self::Transform, Self::InitError>>;
209
210    fn new_transform(&self, service: S) -> Self::Future {
211        let service = RequestMetricsMiddleware {
212            service,
213            metrics: self.metrics.clone(),
214            route_formatter: self.route_formatter.clone(),
215            metric_attrs_from_req: self.metric_attrs_from_req,
216        };
217
218        future::ok(service)
219    }
220}
221
222/// Request metrics middleware
223#[allow(missing_debug_implementations)]
224pub struct RequestMetricsMiddleware<S> {
225    service: S,
226    metrics: Arc<Metrics>,
227    route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
228    metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
229}
230
231impl<S, B> dev::Service<dev::ServiceRequest> for RequestMetricsMiddleware<S>
232where
233    S: dev::Service<
234        dev::ServiceRequest,
235        Response = dev::ServiceResponse<B>,
236        Error = actix_web::Error,
237    >,
238    S::Future: 'static,
239    B: MessageBody + 'static,
240{
241    type Response = dev::ServiceResponse<B>;
242    type Error = actix_web::Error;
243    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
244
245    dev::forward_ready!(service);
246
247    fn call(&self, req: dev::ServiceRequest) -> Self::Future {
248        let timer = SystemTime::now();
249
250        let mut http_target = req
251            .match_pattern()
252            .map(Cow::Owned)
253            .unwrap_or(Cow::Borrowed("default"));
254
255        if let Some(formatter) = &self.route_formatter {
256            http_target = Cow::Owned(formatter.format(&http_target));
257        }
258
259        let mut attributes = (self.metric_attrs_from_req)(&req, http_target);
260        self.metrics.http_server_active_requests.add(1, &attributes);
261
262        let content_length = req
263            .headers()
264            .get(CONTENT_LENGTH)
265            .and_then(|len| len.to_str().ok().and_then(|s| s.parse().ok()))
266            .unwrap_or(0);
267        self.metrics
268            .http_server_request_size
269            .record(content_length, &attributes);
270
271        let request_metrics = self.metrics.clone();
272        Box::pin(self.service.call(req).map(move |res| {
273            request_metrics
274                .http_server_active_requests
275                .add(-1, &attributes);
276
277            // Ignore actix errors for metrics
278            if let Ok(res) = res {
279                attributes.push(KeyValue::new(
280                    HTTP_RESPONSE_STATUS_CODE,
281                    res.status().as_u16() as i64,
282                ));
283                let response_size = match res.response().body().size() {
284                    BodySize::Sized(size) => size,
285                    _ => 0,
286                };
287                request_metrics
288                    .http_server_response_size
289                    .record(response_size, &attributes);
290
291                request_metrics.http_server_duration.record(
292                    timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
293                    &attributes,
294                );
295
296                Ok(res)
297            } else {
298                res
299            }
300        }))
301    }
302}
303
#[cfg(feature = "metrics-prometheus")]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics-prometheus")))]
pub(crate) mod prometheus {
    use actix_web::{dev, http::StatusCode};
    use futures_util::future::{self, LocalBoxFuture};
    use opentelemetry_sdk::metrics::MetricError;
    use prometheus::{Encoder, Registry, TextEncoder};

    /// Prometheus request metrics service
    ///
    /// An actix-web handler that renders the metrics gathered from a Prometheus [`Registry`]
    /// with the [`TextEncoder`].
    #[derive(Clone, Debug)]
    pub struct PrometheusMetricsHandler {
        prometheus_registry: Registry,
    }

    impl PrometheusMetricsHandler {
        /// Build a route to serve Prometheus metrics
        pub fn new(registry: Registry) -> Self {
            Self {
                prometheus_registry: registry,
            }
        }
    }

    impl PrometheusMetricsHandler {
        /// Gather the registry's metric families and encode them as text.
        ///
        /// Encoding failures are logged rather than propagated; whatever was written to the
        /// buffer before the failure is still returned. Output that is not valid UTF-8 yields
        /// an empty string.
        fn metrics(&self) -> String {
            let encoder = TextEncoder::new();
            let metric_families = self.prometheus_registry.gather();
            let mut buf = Vec::new();
            if let Err(err) = encoder.encode(&metric_families[..], &mut buf) {
                tracing::error!(
                    name: "encode_failure",
                    target: env!("CARGO_PKG_NAME"),
                    name = "encode_failure",
                    error = MetricError::Other(err.to_string()).to_string(),
                    ""
                );
            }

            String::from_utf8(buf).unwrap_or_default()
        }
    }

    impl dev::Handler<actix_web::HttpRequest> for PrometheusMetricsHandler {
        type Output = Result<actix_web::HttpResponse<String>, actix_web::error::Error>;
        type Future = LocalBoxFuture<'static, Self::Output>;

        /// Respond with `200 OK` and the encoded metrics as the body.
        ///
        /// The `Content-Type` header is set to the encoder's format type so scrapers can
        /// identify the exposition format instead of having to guess it.
        fn call(&self, _req: actix_web::HttpRequest) -> Self::Future {
            let encoder = TextEncoder::new();
            let response = actix_web::HttpResponse::build(StatusCode::OK)
                .content_type(encoder.format_type())
                .message_body(self.metrics());

            Box::pin(future::ready(response))
        }
    }
}