use super::backoff::BackoffConfig;
use deltalake_core::DataCatalogError;
use reqwest::StatusCode;
use reqwest_retry::policies::ExponentialBackoff;
use std::time::Duration;
/// Error carrying a response-error message, a retry count, and an optional
/// underlying `reqwest` error.
#[derive(Debug)]
pub struct RetryError {
/// Retry count reported in the `Display` output ("after N retries").
retries: usize,
/// Error description, printed quoted in the `Display` output.
message: String,
/// Underlying `reqwest` error, if any; returned by `Error::source` and
/// used to look up the HTTP status in `status()`.
source: Option<reqwest::Error>,
}
impl std::fmt::Display for RetryError {
    /// Writes `response error "<message>", after <retries> retries`, followed by
    /// `: <source>` when an underlying error is present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "response error \"{}\", after {} retries",
            self.message, self.retries
        )?;
        // The underlying cause is optional; append it only when we have one.
        match &self.source {
            Some(source) => write!(f, ": {source}"),
            None => Ok(()),
        }
    }
}
impl std::error::Error for RetryError {
    /// Exposes the wrapped `reqwest` error (if any) as the error's cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let inner = self.source.as_ref()?;
        Some(inner as &(dyn std::error::Error + 'static))
    }
}
impl RetryError {
    /// Returns the HTTP status code attached to the underlying error.
    ///
    /// Yields `None` when there is no underlying error or when that error
    /// carries no status.
    pub fn status(&self) -> Option<StatusCode> {
        self.source.as_ref()?.status()
    }
}
/// Maps a [`RetryError`] onto the closest matching [`std::io::ErrorKind`],
/// keeping the original error as the `io::Error` payload.
///
/// Precedence (first match wins):
/// 1. no status + timeout                -> `TimedOut`
/// 2. no status + connect failure        -> `NotConnected`
/// 3. builder / request error            -> `InvalidInput`
/// 4. HTTP 404                           -> `NotFound`
/// 5. HTTP 400                           -> `InvalidInput`
/// 6. anything else                      -> `Other`
impl From<RetryError> for std::io::Error {
    fn from(err: RetryError) -> Self {
        use std::io::ErrorKind;
        let kind = match (&err.source, err.status()) {
            // Timeout and connect checks must come before the generic
            // `is_request()` check: reqwest can classify send-phase timeouts
            // and connection failures as request errors as well, which would
            // otherwise shadow these arms and report them as `InvalidInput`.
            (Some(source), None) if source.is_timeout() => ErrorKind::TimedOut,
            (Some(source), None) if source.is_connect() => ErrorKind::NotConnected,
            (Some(source), _) if source.is_builder() || source.is_request() => {
                ErrorKind::InvalidInput
            }
            (_, Some(StatusCode::NOT_FOUND)) => ErrorKind::NotFound,
            (_, Some(StatusCode::BAD_REQUEST)) => ErrorKind::InvalidInput,
            _ => ErrorKind::Other,
        };
        // `kind` is `Copy`, so the borrow of `err` taken by the match has
        // ended and `err` can be moved into the new error.
        Self::new(kind, err)
    }
}
impl From<RetryError> for DataCatalogError {
fn from(value: RetryError) -> Self {
DataCatalogError::Generic {
catalog: "",
source: Box::new(value),
}
}
}
/// `Result` alias whose error type defaults to [`RetryError`].
pub type Result<T, E = RetryError> = std::result::Result<T, E>;
/// Retry settings; convertible into an `ExponentialBackoff` policy via `From<&RetryConfig>`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// Backoff parameters (`init_backoff`, `max_backoff`, `base`) fed to the backoff policy builder.
pub backoff: BackoffConfig,
/// Maximum number of retries handed to the backoff policy (default: 10).
pub max_retries: usize,
/// Default: 3 minutes.
/// NOTE(review): presumably the overall time budget for retrying a request;
/// this field is not read in the visible part of the file — confirm against callers.
pub retry_timeout: Duration,
}
impl Default for RetryConfig {
    /// Default settings: default backoff parameters, 10 retries, and a
    /// 3-minute retry timeout.
    fn default() -> Self {
        let backoff = Default::default();
        let retry_timeout = Duration::from_secs(3 * 60);
        Self {
            backoff,
            max_retries: 10,
            retry_timeout,
        }
    }
}
/// Builds an `ExponentialBackoff` retry policy from a [`RetryConfig`].
///
/// The policy uses `backoff.init_backoff` / `backoff.max_backoff` as its retry
/// bounds, `backoff.base` as its base, and `max_retries` as the retry limit.
impl From<&RetryConfig> for ExponentialBackoff {
    fn from(val: &RetryConfig) -> ExponentialBackoff {
        // Saturate rather than truncate: with a plain `as u32` cast, a
        // `usize` above `u32::MAX` would wrap (e.g. `1 << 32` becomes 0
        // retries) on 64-bit targets.
        let max_retries = u32::try_from(val.max_retries).unwrap_or(u32::MAX);
        ExponentialBackoff::builder()
            .retry_bounds(val.backoff.init_backoff, val.backoff.max_backoff)
            // NOTE(review): the type of `base` is declared elsewhere; if it is
            // a float, this cast drops the fractional part — confirm intended.
            .base(val.backoff.base as u32)
            .build_with_max_retries(max_retries)
    }
}