rama_http/layer/retry/policy.rs
1use super::RetryBody;
2use crate::Request;
3use rama_core::Context;
4
5/// A "retry policy" to classify if a request should be retried.
6///
7/// # Example
8///
9/// ```
10/// use rama_core::Context;
11/// use rama_http::Request;
12/// use rama_http::layer::retry::{Policy, PolicyResult, RetryBody};
13/// use std::sync::Arc;
14/// use parking_lot::Mutex;
15///
16/// struct Attempts(Arc<Mutex<usize>>);
17///
18/// impl<S, R, E> Policy<S, R, E> for Attempts
19/// where
20/// S: Clone + Send + Sync + 'static,
21/// R: Send + 'static,
22/// E: Send + Sync + 'static,
23/// {
24/// async fn retry(&self, ctx: Context<S>, req: Request<RetryBody>, result: Result<R, E>) -> PolicyResult<S, R, E> {
25/// match result {
26/// Ok(_) => {
27/// // Treat all `Response`s as success,
28/// // so don't retry...
29/// PolicyResult::Abort(result)
30/// },
31/// Err(_) => {
32/// // Treat all errors as failures...
33/// // But we limit the number of attempts...
34/// let mut attempts = self.0.lock();
35/// if *attempts > 0 {
36/// // Try again!
37/// *attempts -= 1;
38/// PolicyResult::Retry { ctx, req }
39/// } else {
40/// // Used all our attempts, no retry...
41/// PolicyResult::Abort(result)
42/// }
43/// }
44/// }
45/// }
46///
47/// fn clone_input(&self, ctx: &Context<S>, req: &Request<RetryBody>) -> Option<(Context<S>, Request<RetryBody>)> {
48/// Some((ctx.clone(), req.clone()))
49/// }
50/// }
51/// ```
52pub trait Policy<S, R, E>: Send + Sync + 'static {
53 /// Check the policy if a certain request should be retried.
54 ///
55 /// This method is passed a reference to the original request, and either
56 /// the [`Service::Response`] or [`Service::Error`] from the inner service.
57 ///
58 /// If the request should **not** be retried, return `None`.
59 ///
60 /// If the request *should* be retried, return `Some` future that will delay
61 /// the next retry of the request. This can be used to sleep for a certain
62 /// duration, to wait for some external condition to be met before retrying,
63 /// or resolve right away, if the request should be retried immediately.
64 ///
65 /// ## Mutating Requests
66 ///
67 /// The policy MAY chose to mutate the `req`: if the request is mutated, the
68 /// mutated request will be sent to the inner service in the next retry.
69 /// This can be helpful for use cases like tracking the retry count in a
70 /// header.
71 ///
72 /// ## Mutating Results
73 ///
74 /// The policy MAY chose to mutate the result. This enables the retry
75 /// policy to convert a failure into a success and vice versa. For example,
76 /// if the policy is used to poll while waiting for a state change, the
77 /// policy can switch the result to emit a specific error when retries are
78 /// exhausted.
79 ///
80 /// The policy can also record metadata on the request to include
81 /// information about the number of retries required or to record that a
82 /// failure failed after exhausting all retries.
83 ///
84 /// [`Service::Response`]: rama_core::Service::Response
85 /// [`Service::Error`]: rama_core::Service::Error
86 fn retry(
87 &self,
88 ctx: Context<S>,
89 req: Request<RetryBody>,
90 result: Result<R, E>,
91 ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_;
92
93 /// Tries to clone a request before being passed to the inner service.
94 ///
95 /// If the request cannot be cloned, return [`None`]. Moreover, the retry
96 /// function will not be called if the [`None`] is returned.
97 fn clone_input(
98 &self,
99 ctx: &Context<S>,
100 req: &Request<RetryBody>,
101 ) -> Option<(Context<S>, Request<RetryBody>)>;
102}
103
104impl<P, S, R, E> Policy<S, R, E> for &'static P
105where
106 P: Policy<S, R, E>,
107{
108 fn retry(
109 &self,
110 ctx: Context<S>,
111 req: Request<RetryBody>,
112 result: Result<R, E>,
113 ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_ {
114 (**self).retry(ctx, req, result)
115 }
116
117 fn clone_input(
118 &self,
119 ctx: &Context<S>,
120 req: &Request<RetryBody>,
121 ) -> Option<(Context<S>, Request<RetryBody>)> {
122 (**self).clone_input(ctx, req)
123 }
124}
125
126impl<P, S, R, E> Policy<S, R, E> for std::sync::Arc<P>
127where
128 P: Policy<S, R, E>,
129{
130 fn retry(
131 &self,
132 ctx: Context<S>,
133 req: Request<RetryBody>,
134 result: Result<R, E>,
135 ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_ {
136 (**self).retry(ctx, req, result)
137 }
138
139 fn clone_input(
140 &self,
141 ctx: &Context<S>,
142 req: &Request<RetryBody>,
143 ) -> Option<(Context<S>, Request<RetryBody>)> {
144 (**self).clone_input(ctx, req)
145 }
146}
147
148/// The full result of a limit policy.
149pub enum PolicyResult<S, R, E> {
150 /// The result should not be retried,
151 /// and the result should be returned to the caller.
152 Abort(Result<R, E>),
153 /// The result should be retried,
154 /// and the request should be passed to the inner service again.
155 Retry {
156 /// The context of the request.
157 ctx: Context<S>,
158 /// The request to be retried, with the above context.
159 req: Request<RetryBody>,
160 },
161}
162
163impl<S, R, E> std::fmt::Debug for PolicyResult<S, R, E>
164where
165 S: std::fmt::Debug,
166 R: std::fmt::Debug,
167 E: std::fmt::Debug,
168{
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 match self {
171 PolicyResult::Abort(err) => write!(f, "PolicyResult::Abort({:?})", err),
172 PolicyResult::Retry { ctx, req } => write!(
173 f,
174 "PolicyResult::Retry {{ ctx: {:?}, req: {:?} }}",
175 ctx, req
176 ),
177 }
178 }
179}
180
181macro_rules! impl_retry_policy_either {
182 ($id:ident, $($param:ident),+ $(,)?) => {
183 impl<$($param),+, State, Response, Error> Policy<State, Response, Error> for rama_core::combinators::$id<$($param),+>
184 where
185 $($param: Policy<State, Response, Error>),+,
186 State: Clone + Send + Sync + 'static,
187 Response: Send + 'static,
188 Error: Send + 'static,
189 {
190 async fn retry(
191 &self,
192 ctx: Context<State>,
193 req: rama_http_types::Request<RetryBody>,
194 result: Result<Response, Error>,
195 ) -> PolicyResult<State, Response, Error> {
196 match self {
197 $(
198 rama_core::combinators::$id::$param(policy) => policy.retry(ctx, req, result).await,
199 )+
200 }
201 }
202
203 fn clone_input(
204 &self,
205 ctx: &Context<State>,
206 req: &rama_http_types::Request<RetryBody>,
207 ) -> Option<(Context<State>, rama_http_types::Request<RetryBody>)> {
208 match self {
209 $(
210 rama_core::combinators::$id::$param(policy) => policy.clone_input(ctx, req),
211 )+
212 }
213 }
214 }
215 };
216}
217
218rama_core::combinators::impl_either!(impl_retry_policy_either);