deltalake_catalog_unity/client/
retry.rs

1//! A shared HTTP client implementation incorporating retries
2
3use super::backoff::BackoffConfig;
4use deltalake_core::DataCatalogError;
5use reqwest::StatusCode;
6use reqwest_retry::policies::ExponentialBackoff;
7use std::time::Duration;
8
/// Error returned when a request still fails after the retry logic has given up.
///
/// The `Display` output contains the message, the retry count and, when
/// present, the underlying `reqwest` error.
#[derive(Debug)]
pub struct RetryError {
    /// Retry count reported in the `Display` output ("after N retries")
    retries: usize,
    /// Text of the response error, quoted in the `Display` output
    message: String,
    /// Underlying `reqwest` error, if any; exposed through
    /// `std::error::Error::source` and used to derive the HTTP status
    source: Option<reqwest::Error>,
}
16
17impl std::fmt::Display for RetryError {
18    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19        write!(
20            f,
21            "response error \"{}\", after {} retries",
22            self.message, self.retries
23        )?;
24        if let Some(source) = &self.source {
25            write!(f, ": {source}")?;
26        }
27        Ok(())
28    }
29}
30
31impl std::error::Error for RetryError {
32    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
33        self.source.as_ref().map(|e| e as _)
34    }
35}
36
37impl RetryError {
38    /// Returns the status code associated with this error if any
39    pub fn status(&self) -> Option<StatusCode> {
40        self.source.as_ref().and_then(|e| e.status())
41    }
42}
43
44impl From<RetryError> for std::io::Error {
45    fn from(err: RetryError) -> Self {
46        use std::io::ErrorKind;
47        match (&err.source, err.status()) {
48            (Some(source), _) if source.is_builder() || source.is_request() => {
49                Self::new(ErrorKind::InvalidInput, err)
50            }
51            (_, Some(StatusCode::NOT_FOUND)) => Self::new(ErrorKind::NotFound, err),
52            (_, Some(StatusCode::BAD_REQUEST)) => Self::new(ErrorKind::InvalidInput, err),
53            (Some(source), None) if source.is_timeout() => Self::new(ErrorKind::TimedOut, err),
54            (Some(source), None) if source.is_connect() => Self::new(ErrorKind::NotConnected, err),
55            _ => Self::other(err),
56        }
57    }
58}
59
60impl From<RetryError> for DataCatalogError {
61    fn from(value: RetryError) -> Self {
62        DataCatalogError::Generic {
63            catalog: "",
64            source: Box::new(value),
65        }
66    }
67}
68
/// Result alias for retried HTTP requests; the error type defaults to [`RetryError`]
pub type Result<T, E = RetryError> = std::result::Result<T, E>;
71
/// Configuration for how to respond to server errors.
///
/// By default, failed requests are retried up to some limit, using
/// exponential backoff with jitter. See [`BackoffConfig`] for more
/// information.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// The backoff configuration
    pub backoff: BackoffConfig,

    /// The maximum number of times to retry a request
    ///
    /// Set to 0 to disable retries. Defaults to 10.
    pub max_retries: usize,

    /// The maximum length of time from the initial request
    /// after which no further retries will be attempted
    ///
    /// This not only bounds the length of time before a server
    /// error will be surfaced to the application, but also bounds
    /// the length of time a request's credentials must remain valid.
    ///
    /// As requests are retried without renewing credentials or
    /// regenerating request payloads, this number should be kept
    /// below 5 minutes to avoid errors due to expired credentials
    /// and/or request payloads. Defaults to 3 minutes.
    pub retry_timeout: Duration,
}
100
101impl Default for RetryConfig {
102    fn default() -> Self {
103        Self {
104            backoff: Default::default(),
105            max_retries: 10,
106            retry_timeout: Duration::from_secs(3 * 60),
107        }
108    }
109}
110
111impl From<&RetryConfig> for ExponentialBackoff {
112    fn from(val: &RetryConfig) -> ExponentialBackoff {
113        ExponentialBackoff::builder()
114            .retry_bounds(val.backoff.init_backoff, val.backoff.max_backoff)
115            .base(val.backoff.base as u32)
116            .build_with_max_retries(val.max_retries as u32)
117    }
118}