rama_core/layer/timeout/
mod.rs1use super::{LayerErrorFn, LayerErrorStatic, MakeLayerError};
7use crate::{Context, Service};
8use rama_utils::macros::define_inner_service_accessors;
9use std::{fmt, time::Duration};
10
11mod error;
12#[doc(inline)]
13pub use error::Elapsed;
14
15mod layer;
16#[doc(inline)]
17pub use layer::TimeoutLayer;
18
19pub struct Timeout<S, F> {
21 inner: S,
22 into_error: F,
23 timeout: Duration,
24}
25
26impl<S, F> Timeout<S, F> {
27 define_inner_service_accessors!();
28}
29
30impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for Timeout<S, F> {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 f.debug_struct("Timeout")
33 .field("inner", &self.inner)
34 .field("into_error", &self.into_error)
35 .field("timeout", &self.timeout)
36 .finish()
37 }
38}
39
40impl<S, F> Clone for Timeout<S, F>
41where
42 S: Clone,
43 F: Clone,
44{
45 fn clone(&self) -> Self {
46 Self {
47 inner: self.inner.clone(),
48 into_error: self.into_error.clone(),
49 timeout: self.timeout,
50 }
51 }
52}
53
54impl<S> Timeout<S, LayerErrorStatic<Elapsed>> {
57 pub fn new(inner: S, timeout: Duration) -> Self {
59 Self::with_error(inner, timeout, error::Elapsed::new(timeout))
60 }
61}
62
63impl<S, E> Timeout<S, LayerErrorStatic<E>> {
64 pub fn with_error(inner: S, timeout: Duration, error: E) -> Self
67 where
68 E: Clone + Send + Sync + 'static,
69 {
70 Self {
71 inner,
72 timeout,
73 into_error: LayerErrorStatic::new(error),
74 }
75 }
76}
77
78impl<S, F> Timeout<S, LayerErrorFn<F>> {
79 pub fn with_error_fn<E>(inner: S, timeout: Duration, error_fn: F) -> Self
82 where
83 F: FnOnce() -> E + Clone + Send + Sync + 'static,
84 E: Send + 'static,
85 {
86 Self {
87 inner,
88 timeout,
89 into_error: LayerErrorFn::new(error_fn),
90 }
91 }
92}
93
94impl<S, F> Timeout<S, F>
95where
96 F: MakeLayerError,
97{
98 pub(crate) fn with(inner: S, timeout: Duration, into_error: F) -> Self {
101 Self {
102 inner,
103 timeout,
104 into_error,
105 }
106 }
107}
108
109impl<T, F, S, Request, E> Service<S, Request> for Timeout<T, F>
110where
111 Request: Send + 'static,
112 S: Clone + Send + Sync + 'static,
113 F: MakeLayerError<Error = E>,
114 E: Into<T::Error> + Send + 'static,
115 T: Service<S, Request>,
116{
117 type Response = T::Response;
118 type Error = T::Error;
119
120 async fn serve(
121 &self,
122 ctx: Context<S>,
123 request: Request,
124 ) -> Result<Self::Response, Self::Error> {
125 tokio::select! {
126 res = self.inner.serve(ctx, request) => res,
127 _ = tokio::time::sleep(self.timeout) => Err(self.into_error.make_layer_error().into()),
128 }
129 }
130}