rama_http/layer/retry/
policy.rs

1use super::RetryBody;
2use crate::Request;
3use rama_core::Context;
4
5/// A "retry policy" to classify if a request should be retried.
6///
7/// # Example
8///
9/// ```
10/// use rama_core::Context;
11/// use rama_http::Request;
12/// use rama_http::layer::retry::{Policy, PolicyResult, RetryBody};
13/// use std::sync::Arc;
14/// use parking_lot::Mutex;
15///
16/// struct Attempts(Arc<Mutex<usize>>);
17///
18/// impl<S, R, E> Policy<S, R, E> for Attempts
19///     where
20///         S: Clone + Send + Sync + 'static,
21///         R: Send + 'static,
22///         E: Send + Sync + 'static,
23/// {
24///     async fn retry(&self, ctx: Context<S>, req: Request<RetryBody>, result: Result<R, E>) -> PolicyResult<S, R, E> {
25///         match result {
26///             Ok(_) => {
27///                 // Treat all `Response`s as success,
28///                 // so don't retry...
29///                 PolicyResult::Abort(result)
30///             },
31///             Err(_) => {
32///                 // Treat all errors as failures...
33///                 // But we limit the number of attempts...
34///                 let mut attempts = self.0.lock();
35///                 if *attempts > 0 {
36///                     // Try again!
37///                     *attempts -= 1;
38///                     PolicyResult::Retry { ctx, req }
39///                 } else {
40///                     // Used all our attempts, no retry...
41///                     PolicyResult::Abort(result)
42///                 }
43///             }
44///         }
45///     }
46///
47///     fn clone_input(&self, ctx: &Context<S>, req: &Request<RetryBody>) -> Option<(Context<S>, Request<RetryBody>)> {
48///         Some((ctx.clone(), req.clone()))
49///     }
50/// }
51/// ```
52pub trait Policy<S, R, E>: Send + Sync + 'static {
53    /// Check the policy if a certain request should be retried.
54    ///
55    /// This method is passed a reference to the original request, and either
56    /// the [`Service::Response`] or [`Service::Error`] from the inner service.
57    ///
58    /// If the request should **not** be retried, return `None`.
59    ///
60    /// If the request *should* be retried, return `Some` future that will delay
61    /// the next retry of the request. This can be used to sleep for a certain
62    /// duration, to wait for some external condition to be met before retrying,
63    /// or resolve right away, if the request should be retried immediately.
64    ///
65    /// ## Mutating Requests
66    ///
67    /// The policy MAY chose to mutate the `req`: if the request is mutated, the
68    /// mutated request will be sent to the inner service in the next retry.
69    /// This can be helpful for use cases like tracking the retry count in a
70    /// header.
71    ///
72    /// ## Mutating Results
73    ///
74    /// The policy MAY chose to mutate the result. This enables the retry
75    /// policy to convert a failure into a success and vice versa. For example,
76    /// if the policy is used to poll while waiting for a state change, the
77    /// policy can switch the result to emit a specific error when retries are
78    /// exhausted.
79    ///
80    /// The policy can also record metadata on the request to include
81    /// information about the number of retries required or to record that a
82    /// failure failed after exhausting all retries.
83    ///
84    /// [`Service::Response`]: rama_core::Service::Response
85    /// [`Service::Error`]: rama_core::Service::Error
86    fn retry(
87        &self,
88        ctx: Context<S>,
89        req: Request<RetryBody>,
90        result: Result<R, E>,
91    ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_;
92
93    /// Tries to clone a request before being passed to the inner service.
94    ///
95    /// If the request cannot be cloned, return [`None`]. Moreover, the retry
96    /// function will not be called if the [`None`] is returned.
97    fn clone_input(
98        &self,
99        ctx: &Context<S>,
100        req: &Request<RetryBody>,
101    ) -> Option<(Context<S>, Request<RetryBody>)>;
102}
103
104impl<P, S, R, E> Policy<S, R, E> for &'static P
105where
106    P: Policy<S, R, E>,
107{
108    fn retry(
109        &self,
110        ctx: Context<S>,
111        req: Request<RetryBody>,
112        result: Result<R, E>,
113    ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_ {
114        (**self).retry(ctx, req, result)
115    }
116
117    fn clone_input(
118        &self,
119        ctx: &Context<S>,
120        req: &Request<RetryBody>,
121    ) -> Option<(Context<S>, Request<RetryBody>)> {
122        (**self).clone_input(ctx, req)
123    }
124}
125
126impl<P, S, R, E> Policy<S, R, E> for std::sync::Arc<P>
127where
128    P: Policy<S, R, E>,
129{
130    fn retry(
131        &self,
132        ctx: Context<S>,
133        req: Request<RetryBody>,
134        result: Result<R, E>,
135    ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_ {
136        (**self).retry(ctx, req, result)
137    }
138
139    fn clone_input(
140        &self,
141        ctx: &Context<S>,
142        req: &Request<RetryBody>,
143    ) -> Option<(Context<S>, Request<RetryBody>)> {
144        (**self).clone_input(ctx, req)
145    }
146}
147
148/// The full result of a limit policy.
149pub enum PolicyResult<S, R, E> {
150    /// The result should not be retried,
151    /// and the result should be returned to the caller.
152    Abort(Result<R, E>),
153    /// The result should be retried,
154    /// and the request should be passed to the inner service again.
155    Retry {
156        /// The context of the request.
157        ctx: Context<S>,
158        /// The request to be retried, with the above context.
159        req: Request<RetryBody>,
160    },
161}
162
163impl<S, R, E> std::fmt::Debug for PolicyResult<S, R, E>
164where
165    S: std::fmt::Debug,
166    R: std::fmt::Debug,
167    E: std::fmt::Debug,
168{
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        match self {
171            PolicyResult::Abort(err) => write!(f, "PolicyResult::Abort({:?})", err),
172            PolicyResult::Retry { ctx, req } => write!(
173                f,
174                "PolicyResult::Retry {{ ctx: {:?}, req: {:?} }}",
175                ctx, req
176            ),
177        }
178    }
179}
180
181macro_rules! impl_retry_policy_either {
182    ($id:ident, $($param:ident),+ $(,)?) => {
183        impl<$($param),+, State, Response, Error> Policy<State, Response, Error> for rama_core::combinators::$id<$($param),+>
184        where
185            $($param: Policy<State, Response, Error>),+,
186            State: Clone + Send + Sync + 'static,
187            Response: Send + 'static,
188            Error: Send + 'static,
189        {
190            async fn retry(
191                &self,
192                ctx: Context<State>,
193                req: rama_http_types::Request<RetryBody>,
194                result: Result<Response, Error>,
195            ) -> PolicyResult<State, Response, Error> {
196                match self {
197                    $(
198                        rama_core::combinators::$id::$param(policy) => policy.retry(ctx, req, result).await,
199                    )+
200                }
201            }
202
203            fn clone_input(
204                &self,
205                ctx: &Context<State>,
206                req: &rama_http_types::Request<RetryBody>,
207            ) -> Option<(Context<State>, rama_http_types::Request<RetryBody>)> {
208                match self {
209                    $(
210                        rama_core::combinators::$id::$param(policy) => policy.clone_input(ctx, req),
211                    )+
212                }
213            }
214        }
215    };
216}
217
218rama_core::combinators::impl_either!(impl_retry_policy_either);