mod classify;
mod scope;
use std::{error::Error as StdError, future::Ready, sync::Arc, time::Duration};
use http::{Request, Response};
use tower::retry::{
Policy,
budget::{Budget, TpsBudget},
};
pub(crate) use self::{
classify::{Action, Classifier, ClassifyFn, ReqRep},
scope::{ScopeFn, Scoped},
};
use super::super::core::body::Incoming;
use crate::{Body, error::BoxError, retry};
#[derive(Clone)]
pub struct RetryPolicy {
budget: Option<Arc<TpsBudget>>,
classifier: Classifier,
max_retries_per_request: u32,
retry_cnt: u32,
scope: Scoped,
}
impl RetryPolicy {
#[inline]
pub fn new(policy: retry::Policy) -> Self {
Self {
budget: policy
.budget
.map(|budget| Arc::new(TpsBudget::new(Duration::from_secs(10), 10, budget))),
classifier: policy.classifier,
max_retries_per_request: policy.max_retries_per_request,
retry_cnt: 0,
scope: policy.scope,
}
}
}
type Req = Request<Body>;
type Res = Response<Incoming>;
pub(crate) fn clone_http_request(req: &Req) -> Option<Req> {
let body = req.body().try_clone()?;
let mut new = http::Request::new(body);
*new.method_mut() = req.method().clone();
*new.uri_mut() = req.uri().clone();
*new.version_mut() = req.version();
*new.headers_mut() = req.headers().clone();
*new.extensions_mut() = req.extensions().clone();
Some(new)
}
impl Policy<Req, Res, BoxError> for RetryPolicy {
type Future = Ready<()>;
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, BoxError>) -> Option<Self::Future> {
match self.classifier.classify(req, result) {
Action::Success => {
trace!(
"Request successful, no retry needed: {} {}",
req.method(),
req.uri()
);
if let Some(ref budget) = self.budget {
budget.deposit();
trace!("Token deposited back to retry budget");
}
None
}
Action::Retryable => {
trace!(
"Retrying request ({} attempts so far): {} {} - {}",
self.retry_cnt,
req.method(),
req.uri(),
match result {
Ok(res) => format!("HTTP {}", res.status()),
Err(e) => format!("Error: {}", e),
}
);
Some(std::future::ready(()))
}
}
}
fn clone_request(&mut self, req: &Req) -> Option<Req> {
if self.retry_cnt > 0 && !self.scope.applies_to(req) {
trace!("not in scope, not retrying");
return None;
}
if self.retry_cnt >= self.max_retries_per_request {
trace!("max_retries_per_request hit");
return None;
}
if !self.budget.as_ref().map(|b| b.withdraw()).unwrap_or(true) {
debug!(
"Request is retryable but retry budget exhausted: {} {}",
req.method(),
req.uri()
);
return None;
}
self.retry_cnt += 1;
match clone_http_request(req) {
Some(cloned) => Some(cloned),
None => {
if let Some(ref budget) = self.budget {
budget.deposit();
}
None
}
}
}
}
fn is_retryable_error(err: &(dyn StdError + 'static)) -> bool {
let mut source = Some(err);
while let Some(err) = source {
#[cfg(feature = "http2")]
if let Some(h2_err) = err.downcast_ref::<http2::Error>() {
if h2_err.is_go_away()
&& h2_err.is_remote()
&& h2_err.reason() == Some(http2::Reason::NO_ERROR)
{
return true;
}
if h2_err.is_reset()
&& h2_err.is_remote()
&& h2_err.reason() == Some(http2::Reason::REFUSED_STREAM)
{
return true;
}
}
source = err.source();
}
false
}
#[cfg(test)]
mod tests {
use http::Request;
use http_body_util::BodyExt;
use tower::retry::Policy as _;
use super::RetryPolicy;
use crate::{Body, retry};
fn make_retry_policy(max_retries: u32) -> RetryPolicy {
let policy = retry::Policy {
budget: Some(0.2),
classifier: crate::client::layer::retry::Classifier::Never,
max_retries_per_request: max_retries,
scope: crate::client::layer::retry::Scoped::Unscoped,
};
RetryPolicy::new(policy)
}
#[test]
fn budget_preserved_when_clone_fails() {
let mut policy = make_retry_policy(2);
let body: Body = http_body_util::Empty::new()
.map_err(|e| -> crate::error::BoxError { e.into() })
.boxed()
.into();
let req = Request::new(body);
let result = policy.clone_request(&req);
assert!(result.is_none(), "non-clonable body should return None");
let clonable_req = Request::new(Body::from("hello"));
let result2 = policy.clone_request(&clonable_req);
assert!(
result2.is_some(),
"budget should still have tokens after failed clone"
);
}
}