tower_http/timeout/
service.rs

1use crate::timeout::body::TimeoutBody;
2use http::{Request, Response, StatusCode};
3use pin_project_lite::pin_project;
4use std::{
5    future::Future,
6    pin::Pin,
7    task::{ready, Context, Poll},
8    time::Duration,
9};
10use tokio::time::Sleep;
11use tower_layer::Layer;
12use tower_service::Service;
13
/// A [`Layer`] that wraps services in the [`Timeout`] middleware, which applies a timeout to
/// requests.
///
/// See the [module docs](super) for an example.
#[derive(Clone, Copy, Debug)]
pub struct TimeoutLayer {
    // Duration passed on to every `Timeout` service created by this layer.
    timeout: Duration,
}
21
22impl TimeoutLayer {
23    /// Creates a new [`TimeoutLayer`].
24    pub fn new(timeout: Duration) -> Self {
25        TimeoutLayer { timeout }
26    }
27}
28
29impl<S> Layer<S> for TimeoutLayer {
30    type Service = Timeout<S>;
31
32    fn layer(&self, inner: S) -> Self::Service {
33        Timeout::new(inner, self.timeout)
34    }
35}
36
/// Middleware that applies a timeout to requests.
///
/// When the wrapped service has not produced a response by the time the timeout elapses, a
/// `408 Request Timeout` response is returned instead.
///
/// See the [module docs](super) for an example.
#[derive(Clone, Copy, Debug)]
pub struct Timeout<S> {
    // The wrapped service that actually handles requests.
    inner: S,
    // Duration of the timer started for every call.
    timeout: Duration,
}
48
49impl<S> Timeout<S> {
50    /// Creates a new [`Timeout`].
51    pub fn new(inner: S, timeout: Duration) -> Self {
52        Self { inner, timeout }
53    }
54
55    define_inner_service_accessors!();
56
57    /// Returns a new [`Layer`] that wraps services with a `Timeout` middleware.
58    ///
59    /// [`Layer`]: tower_layer::Layer
60    pub fn layer(timeout: Duration) -> TimeoutLayer {
61        TimeoutLayer::new(timeout)
62    }
63}
64
65impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Timeout<S>
66where
67    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
68    ResBody: Default,
69{
70    type Response = S::Response;
71    type Error = S::Error;
72    type Future = ResponseFuture<S::Future>;
73
74    #[inline]
75    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76        self.inner.poll_ready(cx)
77    }
78
79    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
80        let sleep = tokio::time::sleep(self.timeout);
81        ResponseFuture {
82            inner: self.inner.call(req),
83            sleep,
84        }
85    }
86}
87
88pin_project! {
89    /// Response future for [`Timeout`].
90    pub struct ResponseFuture<F> {
91        #[pin]
92        inner: F,
93        #[pin]
94        sleep: Sleep,
95    }
96}
97
98impl<F, B, E> Future for ResponseFuture<F>
99where
100    F: Future<Output = Result<Response<B>, E>>,
101    B: Default,
102{
103    type Output = Result<Response<B>, E>;
104
105    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106        let this = self.project();
107
108        if this.sleep.poll(cx).is_ready() {
109            let mut res = Response::new(B::default());
110            *res.status_mut() = StatusCode::REQUEST_TIMEOUT;
111            return Poll::Ready(Ok(res));
112        }
113
114        this.inner.poll(cx)
115    }
116}
117
/// A [`Layer`] producing [`RequestBodyTimeout`] services, which wrap the request body in a
/// [`TimeoutBody`].
#[derive(Debug, Clone)]
pub struct RequestBodyTimeoutLayer {
    // Duration passed to each `TimeoutBody` created by the wrapped services.
    timeout: Duration,
}
123
124impl RequestBodyTimeoutLayer {
125    /// Creates a new [`RequestBodyTimeoutLayer`].
126    pub fn new(timeout: Duration) -> Self {
127        Self { timeout }
128    }
129}
130
131impl<S> Layer<S> for RequestBodyTimeoutLayer {
132    type Service = RequestBodyTimeout<S>;
133
134    fn layer(&self, inner: S) -> Self::Service {
135        RequestBodyTimeout::new(inner, self.timeout)
136    }
137}
138
/// Middleware that wraps the request body in a [`TimeoutBody`] before handing the request to
/// the inner service.
#[derive(Debug, Clone)]
pub struct RequestBodyTimeout<S> {
    // The wrapped service that receives the request with the wrapped body.
    inner: S,
    // Duration passed to each `TimeoutBody` that is created.
    timeout: Duration,
}
145
146impl<S> RequestBodyTimeout<S> {
147    /// Creates a new [`RequestBodyTimeout`].
148    pub fn new(service: S, timeout: Duration) -> Self {
149        Self {
150            inner: service,
151            timeout,
152        }
153    }
154
155    /// Returns a new [`Layer`] that wraps services with a [`RequestBodyTimeoutLayer`] middleware.
156    ///
157    /// [`Layer`]: tower_layer::Layer
158    pub fn layer(timeout: Duration) -> RequestBodyTimeoutLayer {
159        RequestBodyTimeoutLayer::new(timeout)
160    }
161
162    define_inner_service_accessors!();
163}
164
165impl<S, ReqBody> Service<Request<ReqBody>> for RequestBodyTimeout<S>
166where
167    S: Service<Request<TimeoutBody<ReqBody>>>,
168{
169    type Response = S::Response;
170    type Error = S::Error;
171    type Future = S::Future;
172
173    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174        self.inner.poll_ready(cx)
175    }
176
177    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
178        let req = req.map(|body| TimeoutBody::new(self.timeout, body));
179        self.inner.call(req)
180    }
181}
182
/// A [`Layer`] producing [`ResponseBodyTimeout`] services, which wrap the response body in a
/// [`TimeoutBody`].
#[derive(Clone, Debug)]
pub struct ResponseBodyTimeoutLayer {
    // Duration passed to each `TimeoutBody` created by the wrapped services.
    timeout: Duration,
}
188
189impl ResponseBodyTimeoutLayer {
190    /// Creates a new [`ResponseBodyTimeoutLayer`].
191    pub fn new(timeout: Duration) -> Self {
192        Self { timeout }
193    }
194}
195
196impl<S> Layer<S> for ResponseBodyTimeoutLayer {
197    type Service = ResponseBodyTimeout<S>;
198
199    fn layer(&self, inner: S) -> Self::Service {
200        ResponseBodyTimeout::new(inner, self.timeout)
201    }
202}
203
/// Middleware that wraps the body of the inner service's response in a [`TimeoutBody`].
#[derive(Clone, Debug)]
pub struct ResponseBodyTimeout<S> {
    // The wrapped service whose response bodies get wrapped.
    inner: S,
    // Duration passed to each `TimeoutBody` that is created.
    timeout: Duration,
}
210
211impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for ResponseBodyTimeout<S>
212where
213    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
214{
215    type Response = Response<TimeoutBody<ResBody>>;
216    type Error = S::Error;
217    type Future = ResponseBodyTimeoutFuture<S::Future>;
218
219    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220        self.inner.poll_ready(cx)
221    }
222
223    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
224        ResponseBodyTimeoutFuture {
225            inner: self.inner.call(req),
226            timeout: self.timeout,
227        }
228    }
229}
230
231impl<S> ResponseBodyTimeout<S> {
232    /// Creates a new [`ResponseBodyTimeout`].
233    pub fn new(service: S, timeout: Duration) -> Self {
234        Self {
235            inner: service,
236            timeout,
237        }
238    }
239
240    /// Returns a new [`Layer`] that wraps services with a [`ResponseBodyTimeoutLayer`] middleware.
241    ///
242    /// [`Layer`]: tower_layer::Layer
243    pub fn layer(timeout: Duration) -> ResponseBodyTimeoutLayer {
244        ResponseBodyTimeoutLayer::new(timeout)
245    }
246
247    define_inner_service_accessors!();
248}
249
250pin_project! {
251    /// Response future for [`ResponseBodyTimeout`].
252    pub struct ResponseBodyTimeoutFuture<Fut> {
253        #[pin]
254        inner: Fut,
255        timeout: Duration,
256    }
257}
258
259impl<Fut, ResBody, E> Future for ResponseBodyTimeoutFuture<Fut>
260where
261    Fut: Future<Output = Result<Response<ResBody>, E>>,
262{
263    type Output = Result<Response<TimeoutBody<ResBody>>, E>;
264
265    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
266        let timeout = self.timeout;
267        let this = self.project();
268        let res = ready!(this.inner.poll(cx))?;
269        Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body))))
270    }
271}