tower/hedge/
latency.rs

1use pin_project_lite::pin_project;
2use std::time::Duration;
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{ready, Context, Poll},
7};
8use tokio::time::Instant;
9use tower_service::Service;
10
11/// Record is the interface for accepting request latency measurements.  When
12/// a request completes, record is called with the elapsed duration between
13/// when the service was called and when the future completed.
14pub trait Record {
15    fn record(&mut self, latency: Duration);
16}
17
18/// Latency is a middleware that measures request latency and records it to the
19/// provided Record instance.
20#[derive(Clone, Debug)]
21pub struct Latency<R, S> {
22    rec: R,
23    service: S,
24}
25
26pin_project! {
27    #[derive(Debug)]
28    pub struct ResponseFuture<R, F> {
29        start: Instant,
30        rec: R,
31        #[pin]
32        inner: F,
33    }
34}
35
36impl<S, R> Latency<R, S>
37where
38    R: Record + Clone,
39{
40    pub const fn new<Request>(rec: R, service: S) -> Self
41    where
42        S: Service<Request>,
43        S::Error: Into<crate::BoxError>,
44    {
45        Latency { rec, service }
46    }
47}
48
49impl<S, R, Request> Service<Request> for Latency<R, S>
50where
51    S: Service<Request>,
52    S::Error: Into<crate::BoxError>,
53    R: Record + Clone,
54{
55    type Response = S::Response;
56    type Error = crate::BoxError;
57    type Future = ResponseFuture<R, S::Future>;
58
59    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
60        self.service.poll_ready(cx).map_err(Into::into)
61    }
62
63    fn call(&mut self, request: Request) -> Self::Future {
64        ResponseFuture {
65            start: Instant::now(),
66            rec: self.rec.clone(),
67            inner: self.service.call(request),
68        }
69    }
70}
71
72impl<R, F, T, E> Future for ResponseFuture<R, F>
73where
74    R: Record,
75    F: Future<Output = Result<T, E>>,
76    E: Into<crate::BoxError>,
77{
78    type Output = Result<T, crate::BoxError>;
79
80    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81        let this = self.project();
82
83        let rsp = ready!(this.inner.poll(cx)).map_err(Into::into)?;
84        let duration = Instant::now().saturating_duration_since(*this.start);
85        this.rec.record(duration);
86        Poll::Ready(Ok(rsp))
87    }
88}