Skip to main content

metriki_tower/
http.rs

1use std::sync::Arc;
2use std::task::{Context, Poll};
3
4use derive_builder::Builder;
5use futures::{FutureExt, TryFutureExt};
6use hyper::{Body, Request, Response};
7use metriki_core::metrics::TimerContextArc;
8use metriki_core::MetricsRegistry;
9use tower_layer::Layer;
10use tower_service::Service;
11
12use crate::common::ResultFuture;
13
14/// The Tower service designed for metering hyper stack
15///
16/// Current provided metrics:
17///
18/// * Timer all requests: `metric_name.all`
19/// * Timers by request method: eg, `metric_name.GET`
20/// * Meters by response status code family: eg, `metric_name.2xx`
21/// * Inflight request counter: `metric_name.inflight`
22/// * Meter for unhandled error: `metric_name.error`
23///
24#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
25#[derive(Debug, Clone)]
26pub struct HyperMetricsService<S> {
27    registry: Arc<MetricsRegistry>,
28    base_metric_name: String,
29    inner: S,
30}
31
32// A sample data structure of hyper request
33//
34// Request {
35//     method: GET,
36//     uri: /,
37//     version: HTTP/1.1,
38//     headers: {
39//         "host": "localhost:3000",
40//         "user-agent": "curl/7.78.0",
41//         "accept": "*/*",
42//     },
43//     body: Body(
44//         Empty,
45//     ),
46// }
47
48impl<S, RespBody> Service<Request<Body>> for HyperMetricsService<S>
49where
50    S: Service<Request<Body>, Response = Response<RespBody>> + Send,
51    S::Future: Send + 'static,
52{
53    type Response = Response<RespBody>;
54    type Error = S::Error;
55    type Future = ResultFuture<Self::Response, Self::Error>;
56
57    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58        self.inner.poll_ready(cx)
59    }
60
61    fn call(&mut self, req: Request<Body>) -> Self::Future {
62        let registry = self.registry.clone();
63        let name = Arc::new(self.base_metric_name.clone());
64
65        let request_timer = registry.timer(&format!("{}.all", name));
66        let method_timer = registry.timer(&format!("{}.{}", name, req.method().as_str()));
67        let request_timer_ctx = TimerContextArc::start(request_timer);
68        let method_timer_ctx = TimerContextArc::start(method_timer);
69
70        registry.counter(&format!("{}.inflight", name)).inc(1);
71
72        // this is bad :(
73        let inner_registry_err = registry.clone();
74        let inner_name_err = name.clone();
75
76        let f = self
77            .inner
78            .call(req)
79            .map(move |resp| {
80                // timers
81                request_timer_ctx.stop();
82                method_timer_ctx.stop();
83
84                // inflight request counter
85                registry.counter(&format!("{}.inflight", name)).dec(1);
86
87                if let Ok(ref resp) = resp {
88                    // meters by status code family, 2xx, 3xx, 4xx and 5xx
89                    let status_family = resp.status().as_u16() / 100;
90                    registry
91                        .meter(&format!("{}.{}xx", name, status_family))
92                        .mark();
93                }
94
95                resp
96            })
97            .map_err(move |e| {
98                // error meter
99                inner_registry_err
100                    .meter(&format!("{}.error", inner_name_err))
101                    .mark();
102
103                // inflight request counter
104                inner_registry_err
105                    .counter(&format!("{}.inflight", inner_name_err))
106                    .dec(1);
107
108                e
109            });
110
111        Box::pin(f)
112    }
113}
114
115/// Tower layer to create [`HyperMetricsService`]
116///
117/// Use [`HyperMetricsLayerBuilder`] to create.
118#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
119#[derive(Builder, Debug, Clone)]
120pub struct HyperMetricsLayer {
121    registry: Arc<MetricsRegistry>,
122    #[builder(setter(into), default = "\"requests\".to_owned()")]
123    base_metric_name: String,
124}
125
126impl<S> Layer<S> for HyperMetricsLayer {
127    type Service = HyperMetricsService<S>;
128
129    fn layer(&self, service: S) -> Self::Service {
130        HyperMetricsService {
131            registry: self.registry.clone(),
132            inner: service,
133            base_metric_name: self.base_metric_name.clone(),
134        }
135    }
136}