hpx 2.4.24

High Performance HTTP Client
Documentation
//! Middleware for retrying requests.

mod classify;
mod scope;

use std::{error::Error as StdError, future::Ready, sync::Arc, time::Duration};

use http::{Request, Response};
use tower::retry::{
    Policy,
    budget::{Budget, TpsBudget},
};

pub(crate) use self::{
    classify::{Action, Classifier, ClassifyFn, ReqRep},
    scope::{ScopeFn, Scoped},
};
use super::super::core::body::Incoming;
use crate::{Body, error::BoxError, retry};

/// A retry policy for HTTP requests.
///
/// Built from a `retry::Policy` through `RetryPolicy::new` and driven through its
/// `tower::retry::Policy` implementation.
#[derive(Clone)]
pub struct RetryPolicy {
    /// Optional retry budget; `None` when the source policy carries no budget.
    /// Held behind an `Arc`, so clones of this policy refer to the same budget.
    budget: Option<Arc<TpsBudget>>,
    /// Classifies each request/result pair as a success or as retryable.
    classifier: Classifier,
    /// Bound that `retry_cnt` is compared against before a request is cloned.
    max_retries_per_request: u32,
    /// Counter advanced by `clone_request`; starts at zero.
    retry_cnt: u32,
    /// Scope consulted, once `retry_cnt` is non-zero, to decide whether a request may be cloned.
    scope: Scoped,
}

impl RetryPolicy {
    /// `ttl` argument handed to `TpsBudget::new` when a budget is configured.
    const BUDGET_TTL: Duration = Duration::from_secs(10);

    /// `min_per_sec` argument handed to `TpsBudget::new` when a budget is configured.
    const BUDGET_MIN_PER_SEC: u32 = 10;

    /// Builds a `RetryPolicy` from the user-facing `retry::Policy`.
    ///
    /// The classifier, scope and per-request retry limit are taken over as-is. If the
    /// source policy specifies a budget ratio, a shared `TpsBudget` is created from it;
    /// otherwise the policy runs without a budget. The retry counter starts at zero.
    #[inline]
    pub fn new(policy: retry::Policy) -> Self {
        let retry::Policy {
            budget,
            classifier,
            max_retries_per_request,
            scope,
        } = policy;

        let budget = budget.map(|ratio| {
            Arc::new(TpsBudget::new(
                Self::BUDGET_TTL,
                Self::BUDGET_MIN_PER_SEC,
                ratio,
            ))
        });

        Self {
            budget,
            classifier,
            max_retries_per_request,
            retry_cnt: 0,
            scope,
        }
    }
}

/// Request type the retry policy operates on.
type Req = Request<Body>;

/// Response type the retry policy inspects.
type Res = Response<Incoming>;

/// Produces an independent copy of `req`.
///
/// Returns `None` when the request body cannot be cloned. Otherwise the copy carries
/// the same method, URI, HTTP version, headers and extensions as `req`.
pub(crate) fn clone_http_request(req: &Req) -> Option<Req> {
    req.body().try_clone().map(|body| {
        let mut copy = http::Request::new(body);
        copy.method_mut().clone_from(req.method());
        copy.uri_mut().clone_from(req.uri());
        *copy.version_mut() = req.version();
        copy.headers_mut().clone_from(req.headers());
        copy.extensions_mut().clone_from(req.extensions());
        copy
    })
}

impl Policy<Req, Res, BoxError> for RetryPolicy {
    type Future = Ready<()>;

    /// Classifies the outcome of an attempt and decides whether it should be retried.
    ///
    /// On `Action::Success` a deposit is made into the budget (when one is configured)
    /// and `None` is returned. On `Action::Retryable` an already-completed future is
    /// returned, signalling that the request may be retried immediately.
    fn retry(&mut self, req: &mut Req, result: &mut Result<Res, BoxError>) -> Option<Self::Future> {
        match self.classifier.classify(req, result) {
            Action::Success => {
                trace!(
                    "Request successful, no retry needed: {} {}",
                    req.method(),
                    req.uri()
                );

                if let Some(ref budget) = self.budget {
                    budget.deposit();
                    trace!("Token deposited back to retry budget");
                }
                None
            }
            Action::Retryable => {
                trace!(
                    "Retrying request ({} attempts so far): {} {} - {}",
                    self.retry_cnt,
                    req.method(),
                    req.uri(),
                    match result {
                        Ok(res) => format!("HTTP {}", res.status()),
                        Err(e) => format!("Error: {}", e),
                    }
                );

                Some(std::future::ready(()))
            }
        }
    }

    /// Hands out a copy of `req` that can be used for a later retry.
    ///
    /// Returns `None`, leaving both the budget and the retry counter untouched, when:
    /// - at least one clone was already handed out and the request is out of scope,
    /// - `max_retries_per_request` clones were already handed out,
    /// - the request cannot be cloned, or
    /// - the retry budget refuses a withdrawal.
    ///
    /// NOTE(review): `tower::retry::Retry` calls `clone_request` before the first
    /// attempt as well, so with that driver the budget is charged per clone rather
    /// than per actual retry — confirm this is intended.
    fn clone_request(&mut self, req: &Req) -> Option<Req> {
        if self.retry_cnt > 0 && !self.scope.applies_to(req) {
            trace!("not in scope, not retrying");
            return None;
        }

        if self.retry_cnt >= self.max_retries_per_request {
            trace!("max_retries_per_request hit");
            return None;
        }

        // Clone before touching the budget. `Budget::deposit` is not the inverse of
        // `Budget::withdraw` (a `TpsBudget` moves different amounts for each, and a
        // deposit is accounted as a successful request), so a failed clone must never
        // have withdrawn anything in the first place.
        let Some(cloned) = clone_http_request(req) else {
            trace!("request is not cloneable, not retrying");
            return None;
        };

        if let Some(budget) = &self.budget {
            if !budget.withdraw() {
                debug!(
                    "Request is retryable but retry budget exhausted: {} {}",
                    req.method(),
                    req.uri()
                );
                return None;
            }
        }

        // Count only clones that are actually handed out.
        self.retry_cnt += 1;
        Some(cloned)
    }
}

/// Reports whether an error chain contains an HTTP/2 failure that is safe to retry.
///
/// Starting at `err`, every error reachable through `source()` is inspected. With the
/// `http2` feature enabled, the chain is retryable if it contains an `http2::Error`
/// that was sent by the remote peer and is either a `GOAWAY` with `NO_ERROR` or a
/// stream reset with `REFUSED_STREAM`. Without that feature no check is compiled in
/// and the result is always `false`.
fn is_retryable_error(err: &(dyn StdError + 'static)) -> bool {
    let mut current = err;

    loop {
        #[cfg(feature = "http2")]
        if let Some(h2) = current.downcast_ref::<http2::Error>() {
            // The peer is shutting the connection down gracefully; a fresh connection
            // can take the request.
            let graceful_shutdown = h2.is_go_away()
                && h2.is_remote()
                && h2.reason() == Some(http2::Reason::NO_ERROR);

            // The server refused the stream, which is safe to retry.
            // https://www.rfc-editor.org/rfc/rfc9113.html#section-8.7-3.2
            let refused_stream = h2.is_reset()
                && h2.is_remote()
                && h2.reason() == Some(http2::Reason::REFUSED_STREAM);

            if graceful_shutdown || refused_stream {
                return true;
            }
        }

        // Step to the underlying cause; the chain is exhausted once there is none.
        match current.source() {
            Some(cause) => current = cause,
            None => return false,
        }
    }
}

#[cfg(test)]
mod tests {
    use http::Request;
    use http_body_util::BodyExt;
    use tower::retry::Policy as _;

    use super::{Classifier, RetryPolicy, Scoped};
    use crate::{Body, error::BoxError, retry};

    /// Builds an unscoped policy using the `Never` classifier, a budget ratio of `0.2`
    /// and the given per-request retry limit.
    fn make_retry_policy(max_retries: u32) -> RetryPolicy {
        RetryPolicy::new(retry::Policy {
            budget: Some(0.2),
            classifier: Classifier::Never,
            max_retries_per_request: max_retries,
            scope: Scoped::Unscoped,
        })
    }

    #[test]
    fn budget_preserved_when_clone_fails() {
        let mut policy = make_retry_policy(2);

        // A boxed (streaming) body cannot be cloned, so no copy can be produced.
        let streaming: Body = http_body_util::Empty::new()
            .map_err(|e| -> BoxError { e.into() })
            .boxed()
            .into();
        let first = policy.clone_request(&Request::new(streaming));
        assert!(first.is_none(), "non-clonable body should return None");

        // A clonable request must still be accepted afterwards, showing that the
        // failed attempt did not use up the budget.
        let second = policy.clone_request(&Request::new(Body::from("hello")));
        assert!(
            second.is_some(),
            "budget should still have tokens after failed clone"
        );
    }
}