rama_http/layer/retry/policy.rs
1use super::RetryBody;
2use crate::Request;
3use rama_core::Context;
4use std::future::Future;
5
6/// A "retry policy" to classify if a request should be retried.
7///
8/// # Example
9///
10/// ```
11/// use rama_core::Context;
12/// use rama_http::Request;
13/// use rama_http::layer::retry::{Policy, PolicyResult, RetryBody};
14/// use std::sync::Arc;
15/// use parking_lot::Mutex;
16///
17/// struct Attempts(Arc<Mutex<usize>>);
18///
19/// impl<S, R, E> Policy<S, R, E> for Attempts
20/// where
21/// S: Clone + Send + Sync + 'static,
22/// R: Send + 'static,
23/// E: Send + Sync + 'static,
24/// {
25/// async fn retry(&self, ctx: Context<S>, req: Request<RetryBody>, result: Result<R, E>) -> PolicyResult<S, R, E> {
26/// match result {
27/// Ok(_) => {
28/// // Treat all `Response`s as success,
29/// // so don't retry...
30/// PolicyResult::Abort(result)
31/// },
32/// Err(_) => {
33/// // Treat all errors as failures...
34/// // But we limit the number of attempts...
35/// let mut attempts = self.0.lock();
36/// if *attempts > 0 {
37/// // Try again!
38/// *attempts -= 1;
39/// PolicyResult::Retry { ctx, req }
40/// } else {
41/// // Used all our attempts, no retry...
42/// PolicyResult::Abort(result)
43/// }
44/// }
45/// }
46/// }
47///
48/// fn clone_input(&self, ctx: &Context<S>, req: &Request<RetryBody>) -> Option<(Context<S>, Request<RetryBody>)> {
49/// Some((ctx.clone(), req.clone()))
50/// }
51/// }
52/// ```
53pub trait Policy<S, R, E>: Send + Sync + 'static {
54 /// Check the policy if a certain request should be retried.
55 ///
56 /// This method is passed a reference to the original request, and either
57 /// the [`Service::Response`] or [`Service::Error`] from the inner service.
58 ///
59 /// If the request should **not** be retried, return `None`.
60 ///
61 /// If the request *should* be retried, return `Some` future that will delay
62 /// the next retry of the request. This can be used to sleep for a certain
63 /// duration, to wait for some external condition to be met before retrying,
64 /// or resolve right away, if the request should be retried immediately.
65 ///
66 /// ## Mutating Requests
67 ///
68 /// The policy MAY chose to mutate the `req`: if the request is mutated, the
69 /// mutated request will be sent to the inner service in the next retry.
70 /// This can be helpful for use cases like tracking the retry count in a
71 /// header.
72 ///
73 /// ## Mutating Results
74 ///
75 /// The policy MAY chose to mutate the result. This enables the retry
76 /// policy to convert a failure into a success and vice versa. For example,
77 /// if the policy is used to poll while waiting for a state change, the
78 /// policy can switch the result to emit a specific error when retries are
79 /// exhausted.
80 ///
81 /// The policy can also record metadata on the request to include
82 /// information about the number of retries required or to record that a
83 /// failure failed after exhausting all retries.
84 ///
85 /// [`Service::Response`]: rama_core::Service::Response
86 /// [`Service::Error`]: rama_core::Service::Error
87 fn retry(
88 &self,
89 ctx: Context<S>,
90 req: Request<RetryBody>,
91 result: Result<R, E>,
92 ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_;
93
94 /// Tries to clone a request before being passed to the inner service.
95 ///
96 /// If the request cannot be cloned, return [`None`]. Moreover, the retry
97 /// function will not be called if the [`None`] is returned.
98 fn clone_input(
99 &self,
100 ctx: &Context<S>,
101 req: &Request<RetryBody>,
102 ) -> Option<(Context<S>, Request<RetryBody>)>;
103}
104
105impl<P, S, R, E> Policy<S, R, E> for &'static P
106where
107 P: Policy<S, R, E>,
108{
109 fn retry(
110 &self,
111 ctx: Context<S>,
112 req: Request<RetryBody>,
113 result: Result<R, E>,
114 ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_ {
115 (**self).retry(ctx, req, result)
116 }
117
118 fn clone_input(
119 &self,
120 ctx: &Context<S>,
121 req: &Request<RetryBody>,
122 ) -> Option<(Context<S>, Request<RetryBody>)> {
123 (**self).clone_input(ctx, req)
124 }
125}
126
127impl<P, S, R, E> Policy<S, R, E> for std::sync::Arc<P>
128where
129 P: Policy<S, R, E>,
130{
131 fn retry(
132 &self,
133 ctx: Context<S>,
134 req: Request<RetryBody>,
135 result: Result<R, E>,
136 ) -> impl Future<Output = PolicyResult<S, R, E>> + Send + '_ {
137 (**self).retry(ctx, req, result)
138 }
139
140 fn clone_input(
141 &self,
142 ctx: &Context<S>,
143 req: &Request<RetryBody>,
144 ) -> Option<(Context<S>, Request<RetryBody>)> {
145 (**self).clone_input(ctx, req)
146 }
147}
148
149/// The full result of a limit policy.
150pub enum PolicyResult<S, R, E> {
151 /// The result should not be retried,
152 /// and the result should be returned to the caller.
153 Abort(Result<R, E>),
154 /// The result should be retried,
155 /// and the request should be passed to the inner service again.
156 Retry {
157 /// The context of the request.
158 ctx: Context<S>,
159 /// The request to be retried, with the above context.
160 req: Request<RetryBody>,
161 },
162}
163
164impl<S, R, E> std::fmt::Debug for PolicyResult<S, R, E>
165where
166 S: std::fmt::Debug,
167 R: std::fmt::Debug,
168 E: std::fmt::Debug,
169{
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 match self {
172 PolicyResult::Abort(err) => write!(f, "PolicyResult::Abort({:?})", err),
173 PolicyResult::Retry { ctx, req } => write!(
174 f,
175 "PolicyResult::Retry {{ ctx: {:?}, req: {:?} }}",
176 ctx, req
177 ),
178 }
179 }
180}
181
182macro_rules! impl_retry_policy_either {
183 ($id:ident, $($param:ident),+ $(,)?) => {
184 impl<$($param),+, State, Response, Error> Policy<State, Response, Error> for rama_core::combinators::$id<$($param),+>
185 where
186 $($param: Policy<State, Response, Error>),+,
187 State: Clone + Send + Sync + 'static,
188 Response: Send + 'static,
189 Error: Send + Sync + 'static,
190 {
191 async fn retry(
192 &self,
193 ctx: Context<State>,
194 req: http::Request<RetryBody>,
195 result: Result<Response, Error>,
196 ) -> PolicyResult<State, Response, Error> {
197 match self {
198 $(
199 rama_core::combinators::$id::$param(policy) => policy.retry(ctx, req, result).await,
200 )+
201 }
202 }
203
204 fn clone_input(
205 &self,
206 ctx: &Context<State>,
207 req: &http::Request<RetryBody>,
208 ) -> Option<(Context<State>, http::Request<RetryBody>)> {
209 match self {
210 $(
211 rama_core::combinators::$id::$param(policy) => policy.clone_input(ctx, req),
212 )+
213 }
214 }
215 }
216 };
217}
218
219rama_core::combinators::impl_either!(impl_retry_policy_either);