rama_core/layer/timeout/
mod.rs

1//! Middleware that applies a timeout to requests.
2//!
//! If the inner service does not produce a response within the specified
//! timeout, the request is aborted and an error is returned instead.
5
6use super::{LayerErrorFn, LayerErrorStatic, MakeLayerError};
7use crate::{Context, Service};
8use rama_utils::macros::define_inner_service_accessors;
9use std::{fmt, time::Duration};
10
11mod error;
12#[doc(inline)]
13pub use error::Elapsed;
14
15mod layer;
16#[doc(inline)]
17pub use layer::TimeoutLayer;
18
/// Applies a timeout to requests.
///
/// Wraps an inner service and races every call to it against a timer of
/// `timeout`. When the timer finishes first, `into_error` supplies the error
/// that is returned in place of a response.
pub struct Timeout<S, F> {
    /// The wrapped service that requests are forwarded to.
    inner: S,
    /// Produces the error returned when the timeout elapses.
    into_error: F,
    /// Maximum time the inner service is given to produce a result.
    timeout: Duration,
}
25
impl<S, F> Timeout<S, F> {
    // Accessor methods for the wrapped `inner` service are generated by this
    // `rama_utils` macro; its expansion is not visible from this file.
    define_inner_service_accessors!();
}
29
30impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for Timeout<S, F> {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.debug_struct("Timeout")
33            .field("inner", &self.inner)
34            .field("into_error", &self.into_error)
35            .field("timeout", &self.timeout)
36            .finish()
37    }
38}
39
40impl<S, F> Clone for Timeout<S, F>
41where
42    S: Clone,
43    F: Clone,
44{
45    fn clone(&self) -> Self {
46        Self {
47            inner: self.inner.clone(),
48            into_error: self.into_error.clone(),
49            timeout: self.timeout,
50        }
51    }
52}
53
54// ===== impl Timeout =====
55
56impl<S> Timeout<S, LayerErrorStatic<Elapsed>> {
57    /// Creates a new [`Timeout`]
58    pub fn new(inner: S, timeout: Duration) -> Self {
59        Self::with_error(inner, timeout, error::Elapsed::new(timeout))
60    }
61}
62
63impl<S, E> Timeout<S, LayerErrorStatic<E>> {
64    /// Creates a new [`Timeout`] with a custom error
65    /// value.
66    pub fn with_error(inner: S, timeout: Duration, error: E) -> Self
67    where
68        E: Clone + Send + Sync + 'static,
69    {
70        Self {
71            inner,
72            timeout,
73            into_error: LayerErrorStatic::new(error),
74        }
75    }
76}
77
78impl<S, F> Timeout<S, LayerErrorFn<F>> {
79    /// Creates a new [`Timeout`] with a custom error
80    /// function.
81    pub fn with_error_fn<E>(inner: S, timeout: Duration, error_fn: F) -> Self
82    where
83        F: FnOnce() -> E + Clone + Send + Sync + 'static,
84        E: Send + 'static,
85    {
86        Self {
87            inner,
88            timeout,
89            into_error: LayerErrorFn::new(error_fn),
90        }
91    }
92}
93
94impl<S, F> Timeout<S, F>
95where
96    F: MakeLayerError,
97{
98    /// Creates a new [`Timeout`] with a custom error
99    /// value.
100    pub(crate) fn with(inner: S, timeout: Duration, into_error: F) -> Self {
101        Self {
102            inner,
103            timeout,
104            into_error,
105        }
106    }
107}
108
109impl<T, F, S, Request, E> Service<S, Request> for Timeout<T, F>
110where
111    Request: Send + 'static,
112    S: Clone + Send + Sync + 'static,
113    F: MakeLayerError<Error = E>,
114    E: Into<T::Error> + Send + 'static,
115    T: Service<S, Request>,
116{
117    type Response = T::Response;
118    type Error = T::Error;
119
120    async fn serve(
121        &self,
122        ctx: Context<S>,
123        request: Request,
124    ) -> Result<Self::Response, Self::Error> {
125        tokio::select! {
126            res = self.inner.serve(ctx, request) => res,
127            _ = tokio::time::sleep(self.timeout) => Err(self.into_error.make_layer_error().into()),
128        }
129    }
130}