1use pin_project_lite::pin_project;
2use std::time::Duration;
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{ready, Context, Poll},
7};
8use tokio::time::Instant;
9use tower_service::Service;
10
/// A sink for latency measurements.
///
/// In this file, [`ResponseFuture`] calls [`Record::record`] with the time
/// elapsed between the creation of the response future and its successful
/// completion.
pub trait Record {
    /// Accepts a single latency measurement.
    ///
    /// Takes `&mut self`, so an implementation may update its own state.
    fn record(&mut self, latency: Duration);
}
17
/// Middleware that measures how long an inner service takes to respond.
///
/// Each call to the wrapped service is timed, and the measurement is handed
/// to a clone of the recorder `R` once the response future resolves
/// successfully.
#[derive(Clone, Debug)]
pub struct Latency<R, S> {
    // Recorder template; cloned into every response future.
    rec: R,
    // The wrapped service whose responses are being timed.
    service: S,
}
25
pin_project! {
    /// Future returned by the [`Latency`] service.
    ///
    /// Wraps the inner service's future and reports the elapsed time to the
    /// recorder when the inner future completes with `Ok`.
    #[derive(Debug)]
    pub struct ResponseFuture<R, F> {
        // Timestamp taken when the request was dispatched.
        start: Instant,
        // Recorder that receives the measured latency.
        rec: R,
        // The inner service's response future; structurally pinned.
        #[pin]
        inner: F,
    }
}
35
36impl<S, R> Latency<R, S>
37where
38 R: Record + Clone,
39{
40 pub const fn new<Request>(rec: R, service: S) -> Self
41 where
42 S: Service<Request>,
43 S::Error: Into<crate::BoxError>,
44 {
45 Latency { rec, service }
46 }
47}
48
49impl<S, R, Request> Service<Request> for Latency<R, S>
50where
51 S: Service<Request>,
52 S::Error: Into<crate::BoxError>,
53 R: Record + Clone,
54{
55 type Response = S::Response;
56 type Error = crate::BoxError;
57 type Future = ResponseFuture<R, S::Future>;
58
59 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
60 self.service.poll_ready(cx).map_err(Into::into)
61 }
62
63 fn call(&mut self, request: Request) -> Self::Future {
64 ResponseFuture {
65 start: Instant::now(),
66 rec: self.rec.clone(),
67 inner: self.service.call(request),
68 }
69 }
70}
71
72impl<R, F, T, E> Future for ResponseFuture<R, F>
73where
74 R: Record,
75 F: Future<Output = Result<T, E>>,
76 E: Into<crate::BoxError>,
77{
78 type Output = Result<T, crate::BoxError>;
79
80 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81 let this = self.project();
82
83 let rsp = ready!(this.inner.poll(cx)).map_err(Into::into)?;
84 let duration = Instant::now().saturating_duration_since(*this.start);
85 this.rec.record(duration);
86 Poll::Ready(Ok(rsp))
87 }
88}