rama_http/layer/retry/
policy.rs

1use super::RetryBody;
2use crate::Request;
3use rama_core::Context;
4use std::future::Future;
5
/// A "retry policy" to classify if a request should be retried.
///
/// A policy has two jobs: producing a copy of the input before it is sent
/// to the inner service ([`Policy::clone_input`]), and deciding — once the
/// inner service has produced a result — whether that result is final or the
/// request has to be sent again ([`Policy::retry`]).
///
/// # Example
///
/// ```
/// use rama_core::Context;
/// use rama_http::Request;
/// use rama_http::layer::retry::{Policy, PolicyResult, RetryBody};
/// use std::sync::Arc;
/// use parking_lot::Mutex;
///
/// struct Attempts(Arc<Mutex<usize>>);
///
/// impl<S, R, E> Policy<S, R, E> for Attempts
///     where
///         S: Clone + Send + Sync + 'static,
///         R: Send + 'static,
///         E: Send + Sync + 'static,
/// {
///     async fn retry(&self, ctx: Context<S>, req: Request<RetryBody>, result: Result<R, E>) -> PolicyResult<S, R, E> {
///         match result {
///             Ok(_) => {
///                 // Treat all `Response`s as success,
///                 // so don't retry...
///                 PolicyResult::Abort(result)
///             },
///             Err(_) => {
///                 // Treat all errors as failures...
///                 // But we limit the number of attempts...
///                 let mut attempts = self.0.lock();
///                 if *attempts > 0 {
///                     // Try again!
///                     *attempts -= 1;
///                     PolicyResult::Retry { ctx, req }
///                 } else {
///                     // Used all our attempts, no retry...
///                     PolicyResult::Abort(result)
///                 }
///             }
///         }
///     }
///
///     fn clone_input(&self, ctx: &Context<S>, req: &Request<RetryBody>) -> Option<(Context<S>, Request<RetryBody>)> {
///         Some((ctx.clone(), req.clone()))
///     }
/// }
/// ```
pub trait Policy<S, R, E>: Send + Sync + 'static {
    /// Check the policy if a certain request should be retried.
    ///
    /// This method is passed the context and the request by value, together
    /// with either the [`Service::Response`] or [`Service::Error`] from the
    /// inner service.
    ///
    /// If the request should **not** be retried, resolve to
    /// [`PolicyResult::Abort`] holding the result that has to be returned to
    /// the caller.
    ///
    /// If the request *should* be retried, resolve to
    /// [`PolicyResult::Retry`] holding the context and request to pass to the
    /// inner service again. Because the decision is delivered through a
    /// future, a policy can wait before resolving — for example to sleep for
    /// a certain duration or to wait for some external condition to be met —
    /// or resolve right away, if the request should be retried immediately.
    ///
    /// ## Mutating Requests
    ///
    /// The policy MAY choose to mutate the `req`: if the request is mutated,
    /// the mutated request returned in [`PolicyResult::Retry`] is the one
    /// that will be sent to the inner service in the next retry.
    /// This can be helpful for use cases like tracking the retry count in a
    /// header.
    ///
    /// ## Mutating Results
    ///
    /// The policy MAY choose to mutate the result before returning it in
    /// [`PolicyResult::Abort`]. This enables the retry policy to convert a
    /// failure into a success and vice versa. For example, if the policy is
    /// used to poll while waiting for a state change, the policy can switch
    /// the result to emit a specific error when retries are exhausted.
    ///
    /// The policy can also record metadata on the request to include
    /// information about the number of retries required or to record that a
    /// failure failed after exhausting all retries.
    ///
    /// [`Service::Response`]: rama_core::Service::Response
    /// [`Service::Error`]: rama_core::Service::Error
    fn retry(
        &self,
        ctx: Context<S>,
        req: Request<RetryBody>,
        result: Result<R, E>,
    ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_;

    /// Tries to clone the context and request before they are passed to the
    /// inner service.
    ///
    /// Returns the cloned pair on success. If the input cannot be cloned,
    /// return [`None`]; in that case [`Policy::retry`] will not be called
    /// for this request.
    fn clone_input(
        &self,
        ctx: &Context<S>,
        req: &Request<RetryBody>,
    ) -> Option<(Context<S>, Request<RetryBody>)>;
}
104
105impl<P, S, R, E> Policy<S, R, E> for &'static P
106where
107    P: Policy<S, R, E>,
108{
109    fn retry(
110        &self,
111        ctx: Context<S>,
112        req: Request<RetryBody>,
113        result: Result<R, E>,
114    ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_ {
115        (**self).retry(ctx, req, result)
116    }
117
118    fn clone_input(
119        &self,
120        ctx: &Context<S>,
121        req: &Request<RetryBody>,
122    ) -> Option<(Context<S>, Request<RetryBody>)> {
123        (**self).clone_input(ctx, req)
124    }
125}
126
127impl<P, S, R, E> Policy<S, R, E> for std::sync::Arc<P>
128where
129    P: Policy<S, R, E>,
130{
131    fn retry(
132        &self,
133        ctx: Context<S>,
134        req: Request<RetryBody>,
135        result: Result<R, E>,
136    ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_ {
137        (**self).retry(ctx, req, result)
138    }
139
140    fn clone_input(
141        &self,
142        ctx: &Context<S>,
143        req: &Request<RetryBody>,
144    ) -> Option<(Context<S>, Request<RetryBody>)> {
145        (**self).clone_input(ctx, req)
146    }
147}
148
/// The full result of a retry [`Policy`].
///
/// Either the policy is done and a final result goes back to the caller
/// ([`PolicyResult::Abort`]), or the request has to be sent to the inner
/// service once more ([`PolicyResult::Retry`]).
pub enum PolicyResult<S, R, E> {
    /// The result should not be retried,
    /// and the result should be returned to the caller.
    Abort(Result<R, E>),
    /// The result should be retried,
    /// and the request should be passed to the inner service again.
    Retry {
        /// The context of the request.
        ctx: Context<S>,
        /// The request to be retried, with the above context.
        req: Request<RetryBody>,
    },
}
163
164impl<S, R, E> std::fmt::Debug for PolicyResult<S, R, E>
165where
166    S: std::fmt::Debug,
167    R: std::fmt::Debug,
168    E: std::fmt::Debug,
169{
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        match self {
172            PolicyResult::Abort(err) => write!(f, "PolicyResult::Abort({:?})", err),
173            PolicyResult::Retry { ctx, req } => write!(
174                f,
175                "PolicyResult::Retry {{ ctx: {:?}, req: {:?} }}",
176                ctx, req
177            ),
178        }
179    }
180}
181
// Generates a `Policy` implementation for one either-style enum from
// `rama_core::combinators`.
//
// `$id` is the name of the enum; each `$param` is used both as a generic type
// parameter and as the name of the variant wrapping a value of that type.
// Every variant must hold a policy, and each call is forwarded to whichever
// policy the value currently holds.
macro_rules! impl_retry_policy_either {
    ($id:ident, $($param:ident),+ $(,)?) => {
        impl<$($param),+, State, Response, Error> Policy<State, Response, Error> for rama_core::combinators::$id<$($param),+>
        where
            $($param: Policy<State, Response, Error>),+,
            State: Clone + Send + Sync + 'static,
            Response: Send + 'static,
            Error: Send + Sync + 'static,
        {
            // The request type is spelled `Request<RetryBody>`, exactly as in the
            // `Policy` trait declaration, so the signatures read the same everywhere.
            async fn retry(
                &self,
                ctx: Context<State>,
                req: Request<RetryBody>,
                result: Result<Response, Error>,
            ) -> PolicyResult<State, Response, Error> {
                match self {
                    $(
                        rama_core::combinators::$id::$param(policy) => policy.retry(ctx, req, result).await,
                    )+
                }
            }

            fn clone_input(
                &self,
                ctx: &Context<State>,
                req: &Request<RetryBody>,
            ) -> Option<(Context<State>, Request<RetryBody>)> {
                match self {
                    $(
                        rama_core::combinators::$id::$param(policy) => policy.clone_input(ctx, req),
                    )+
                }
            }
        }
    };
}
218
// Hand `impl_retry_policy_either!` to rama_core's `impl_either!` helper, which
// presumably expands it once per `Either*` combinator — confirm in rama_core.
rama_core::combinators::impl_either!(impl_retry_policy_either);