tower_http/timeout/
service.rs1use crate::timeout::body::TimeoutBody;
2use http::{Request, Response, StatusCode};
3use pin_project_lite::pin_project;
4use std::{
5 future::Future,
6 pin::Pin,
7 task::{ready, Context, Poll},
8 time::Duration,
9};
10use tokio::time::Sleep;
11use tower_layer::Layer;
12use tower_service::Service;
13
14#[derive(Debug, Clone, Copy)]
18pub struct TimeoutLayer {
19 timeout: Duration,
20}
21
22impl TimeoutLayer {
23 pub fn new(timeout: Duration) -> Self {
25 TimeoutLayer { timeout }
26 }
27}
28
29impl<S> Layer<S> for TimeoutLayer {
30 type Service = Timeout<S>;
31
32 fn layer(&self, inner: S) -> Self::Service {
33 Timeout::new(inner, self.timeout)
34 }
35}
36
37#[derive(Debug, Clone, Copy)]
44pub struct Timeout<S> {
45 inner: S,
46 timeout: Duration,
47}
48
49impl<S> Timeout<S> {
50 pub fn new(inner: S, timeout: Duration) -> Self {
52 Self { inner, timeout }
53 }
54
55 define_inner_service_accessors!();
56
57 pub fn layer(timeout: Duration) -> TimeoutLayer {
61 TimeoutLayer::new(timeout)
62 }
63}
64
65impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Timeout<S>
66where
67 S: Service<Request<ReqBody>, Response = Response<ResBody>>,
68 ResBody: Default,
69{
70 type Response = S::Response;
71 type Error = S::Error;
72 type Future = ResponseFuture<S::Future>;
73
74 #[inline]
75 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76 self.inner.poll_ready(cx)
77 }
78
79 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
80 let sleep = tokio::time::sleep(self.timeout);
81 ResponseFuture {
82 inner: self.inner.call(req),
83 sleep,
84 }
85 }
86}
87
88pin_project! {
89 pub struct ResponseFuture<F> {
91 #[pin]
92 inner: F,
93 #[pin]
94 sleep: Sleep,
95 }
96}
97
98impl<F, B, E> Future for ResponseFuture<F>
99where
100 F: Future<Output = Result<Response<B>, E>>,
101 B: Default,
102{
103 type Output = Result<Response<B>, E>;
104
105 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106 let this = self.project();
107
108 if this.sleep.poll(cx).is_ready() {
109 let mut res = Response::new(B::default());
110 *res.status_mut() = StatusCode::REQUEST_TIMEOUT;
111 return Poll::Ready(Ok(res));
112 }
113
114 this.inner.poll(cx)
115 }
116}
117
118#[derive(Clone, Debug)]
120pub struct RequestBodyTimeoutLayer {
121 timeout: Duration,
122}
123
124impl RequestBodyTimeoutLayer {
125 pub fn new(timeout: Duration) -> Self {
127 Self { timeout }
128 }
129}
130
131impl<S> Layer<S> for RequestBodyTimeoutLayer {
132 type Service = RequestBodyTimeout<S>;
133
134 fn layer(&self, inner: S) -> Self::Service {
135 RequestBodyTimeout::new(inner, self.timeout)
136 }
137}
138
139#[derive(Clone, Debug)]
141pub struct RequestBodyTimeout<S> {
142 inner: S,
143 timeout: Duration,
144}
145
146impl<S> RequestBodyTimeout<S> {
147 pub fn new(service: S, timeout: Duration) -> Self {
149 Self {
150 inner: service,
151 timeout,
152 }
153 }
154
155 pub fn layer(timeout: Duration) -> RequestBodyTimeoutLayer {
159 RequestBodyTimeoutLayer::new(timeout)
160 }
161
162 define_inner_service_accessors!();
163}
164
165impl<S, ReqBody> Service<Request<ReqBody>> for RequestBodyTimeout<S>
166where
167 S: Service<Request<TimeoutBody<ReqBody>>>,
168{
169 type Response = S::Response;
170 type Error = S::Error;
171 type Future = S::Future;
172
173 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174 self.inner.poll_ready(cx)
175 }
176
177 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
178 let req = req.map(|body| TimeoutBody::new(self.timeout, body));
179 self.inner.call(req)
180 }
181}
182
183#[derive(Clone)]
185pub struct ResponseBodyTimeoutLayer {
186 timeout: Duration,
187}
188
189impl ResponseBodyTimeoutLayer {
190 pub fn new(timeout: Duration) -> Self {
192 Self { timeout }
193 }
194}
195
196impl<S> Layer<S> for ResponseBodyTimeoutLayer {
197 type Service = ResponseBodyTimeout<S>;
198
199 fn layer(&self, inner: S) -> Self::Service {
200 ResponseBodyTimeout::new(inner, self.timeout)
201 }
202}
203
204#[derive(Clone)]
206pub struct ResponseBodyTimeout<S> {
207 inner: S,
208 timeout: Duration,
209}
210
211impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for ResponseBodyTimeout<S>
212where
213 S: Service<Request<ReqBody>, Response = Response<ResBody>>,
214{
215 type Response = Response<TimeoutBody<ResBody>>;
216 type Error = S::Error;
217 type Future = ResponseBodyTimeoutFuture<S::Future>;
218
219 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220 self.inner.poll_ready(cx)
221 }
222
223 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
224 ResponseBodyTimeoutFuture {
225 inner: self.inner.call(req),
226 timeout: self.timeout,
227 }
228 }
229}
230
231impl<S> ResponseBodyTimeout<S> {
232 pub fn new(service: S, timeout: Duration) -> Self {
234 Self {
235 inner: service,
236 timeout,
237 }
238 }
239
240 pub fn layer(timeout: Duration) -> ResponseBodyTimeoutLayer {
244 ResponseBodyTimeoutLayer::new(timeout)
245 }
246
247 define_inner_service_accessors!();
248}
249
250pin_project! {
251 pub struct ResponseBodyTimeoutFuture<Fut> {
253 #[pin]
254 inner: Fut,
255 timeout: Duration,
256 }
257}
258
259impl<Fut, ResBody, E> Future for ResponseBodyTimeoutFuture<Fut>
260where
261 Fut: Future<Output = Result<Response<ResBody>, E>>,
262{
263 type Output = Result<Response<TimeoutBody<ResBody>>, E>;
264
265 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
266 let timeout = self.timeout;
267 let this = self.project();
268 let res = ready!(this.inner.poll(cx))?;
269 Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body))))
270 }
271}