tower_http_metrics/server/
future.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5    time::Instant,
6};
7
8use futures_util::ready;
9use http::Response as HttpResponse;
10use metrics::{increment_counter, increment_gauge};
11use pin_project::pin_project;
12
13use crate::server::body::InstrumentedBody;
14use crate::{HTTP_SERVER_REQUESTS_PENDING, HTTP_SERVER_REQUESTS_TOTAL};
15
16#[pin_project]
17pub struct InstrumentedFuture<F> {
18    #[pin]
19    inner: F,
20    start: Instant,
21    method: &'static str,
22}
23
24impl<F> InstrumentedFuture<F> {
25    pub(crate) fn new(inner: F, method: &'static str) -> Self {
26        let labels = [("method", method)];
27        increment_counter!(HTTP_SERVER_REQUESTS_TOTAL, &labels);
28        increment_gauge!(HTTP_SERVER_REQUESTS_PENDING, 1.0, &labels);
29
30        Self {
31            inner,
32            start: Instant::now(),
33            method,
34        }
35    }
36}
37
38impl<F, B, E> Future for InstrumentedFuture<F>
39where
40    F: Future<Output = Result<HttpResponse<B>, E>>,
41{
42    type Output = Result<HttpResponse<InstrumentedBody<B>>, E>;
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        let this = self.project();
46        match ready!(this.inner.poll(cx)) {
47            Ok(res) => {
48                let status_code = res.status().as_u16();
49                Poll::Ready(Ok(res.map(|b| {
50                    InstrumentedBody::new(b, *this.start, this.method, status_code)
51                })))
52            }
53            Err(err) => Poll::Ready(Err(err)),
54        }
55    }
56}