Skip to main content

datapress_core/
timeout.rs

//! Tiny per-request timeout middleware.
//!
//! When a request takes longer than `Timeout::duration` to produce a
//! response, the handler future is dropped and the client gets a
//! `504 Gateway Timeout`. Cancellation only takes effect at an `.await`
//! point: work the handler is in the middle of keeps running until it
//! next yields — that's a property of cooperative scheduling, not of
//! this middleware.
8
9use std::future::{Future, ready};
10use std::pin::Pin;
11use std::time::Duration;
12
13use actix_web::{
14    Error, HttpResponse,
15    body::{BoxBody, EitherBody},
16    dev::{Service, ServiceRequest, ServiceResponse, Transform, forward_ready},
17    error::InternalError,
18};
19
/// Middleware factory that bounds how long each request may take.
///
/// When the wrapped service does not produce a response within the
/// configured duration, the request is answered with
/// `504 Gateway Timeout` instead. The per-worker middleware instance
/// built from this factory is [`TimeoutMiddleware`].
#[derive(Clone, Debug)]
pub struct Timeout {
    /// Maximum time the wrapped service is given to produce a response.
    duration: Duration,
}

impl Timeout {
    /// Creates a timeout factory that allows each request `duration` to
    /// produce its response.
    pub fn new(duration: Duration) -> Self {
        Self { duration }
    }
}
30
31impl<S, B> Transform<S, ServiceRequest> for Timeout
32where
33    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
34    S::Future: 'static,
35    B: 'static,
36{
37    type Response = ServiceResponse<EitherBody<B, BoxBody>>;
38    type Error = Error;
39    type Transform = TimeoutMiddleware<S>;
40    type InitError = ();
41    type Future = std::future::Ready<Result<Self::Transform, Self::InitError>>;
42
43    fn new_transform(&self, service: S) -> Self::Future {
44        ready(Ok(TimeoutMiddleware {
45            service,
46            duration: self.duration,
47        }))
48    }
49}
50
/// Service wrapper that enforces a deadline on every call to the inner
/// service.
///
/// `Debug` is available whenever the wrapped service `S` is itself
/// `Debug`.
#[derive(Debug)]
pub struct TimeoutMiddleware<S> {
    /// The next service in the chain; each request is forwarded to it.
    service: S,
    /// Maximum time the inner service is given to produce a response.
    duration: Duration,
}
55
56impl<S, B> Service<ServiceRequest> for TimeoutMiddleware<S>
57where
58    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
59    S::Future: 'static,
60    B: 'static,
61{
62    type Response = ServiceResponse<EitherBody<B, BoxBody>>;
63    type Error = Error;
64    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
65
66    forward_ready!(service);
67
68    fn call(&self, req: ServiceRequest) -> Self::Future {
69        let duration = self.duration;
70        // We must NOT clone the inner `HttpRequest` before handing the
71        // `ServiceRequest` off to the next service: actix's routing layer
72        // calls `match_info_mut` (-> `Rc::get_mut().unwrap()`) on the
73        // request, which panics if any other clone of the inner Rc is
74        // still alive. So we capture just the bits we need for logging
75        // by value, and on timeout return an `InternalError` carrying a
76        // pre-built 504 response — actix renders it without ever needing
77        // the original request.
78        let method = req.method().clone();
79        let path = req.path().to_owned();
80        let fut = self.service.call(req);
81        Box::pin(async move {
82            match tokio::time::timeout(duration, fut).await {
83                Ok(Ok(resp)) => Ok(resp.map_into_left_body()),
84                Ok(Err(e)) => Err(e),
85                Err(_) => {
86                    log::warn!(
87                        "request {method} {path} exceeded timeout of {} ms",
88                        duration.as_millis(),
89                    );
90                    let resp = HttpResponse::GatewayTimeout()
91                        .content_type("application/json")
92                        .body(r#"{"error":"request timed out"}"#);
93                    Err(InternalError::from_response("", resp).into())
94                }
95            }
96        })
97    }
98}