deltalake_catalog_unity/client/
retry.rs1use super::backoff::BackoffConfig;
4use deltalake_core::DataCatalogError;
5use reqwest::StatusCode;
6use reqwest_retry::policies::ExponentialBackoff;
7use std::time::Duration;
8
9#[derive(Debug)]
11pub struct RetryError {
12 retries: usize,
13 message: String,
14 source: Option<reqwest::Error>,
15}
16
17impl std::fmt::Display for RetryError {
18 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19 write!(
20 f,
21 "response error \"{}\", after {} retries",
22 self.message, self.retries
23 )?;
24 if let Some(source) = &self.source {
25 write!(f, ": {source}")?;
26 }
27 Ok(())
28 }
29}
30
31impl std::error::Error for RetryError {
32 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
33 self.source.as_ref().map(|e| e as _)
34 }
35}
36
37impl RetryError {
38 pub fn status(&self) -> Option<StatusCode> {
40 self.source.as_ref().and_then(|e| e.status())
41 }
42}
43
44impl From<RetryError> for std::io::Error {
45 fn from(err: RetryError) -> Self {
46 use std::io::ErrorKind;
47 match (&err.source, err.status()) {
48 (Some(source), _) if source.is_builder() || source.is_request() => {
49 Self::new(ErrorKind::InvalidInput, err)
50 }
51 (_, Some(StatusCode::NOT_FOUND)) => Self::new(ErrorKind::NotFound, err),
52 (_, Some(StatusCode::BAD_REQUEST)) => Self::new(ErrorKind::InvalidInput, err),
53 (Some(source), None) if source.is_timeout() => Self::new(ErrorKind::TimedOut, err),
54 (Some(source), None) if source.is_connect() => Self::new(ErrorKind::NotConnected, err),
55 _ => Self::other(err),
56 }
57 }
58}
59
60impl From<RetryError> for DataCatalogError {
61 fn from(value: RetryError) -> Self {
62 DataCatalogError::Generic {
63 catalog: "",
64 source: Box::new(value),
65 }
66 }
67}
68
69pub type Result<T, E = RetryError> = std::result::Result<T, E>;
71
72#[derive(Debug, Clone)]
78pub struct RetryConfig {
79 pub backoff: BackoffConfig,
81
82 pub max_retries: usize,
86
87 pub retry_timeout: Duration,
99}
100
101impl Default for RetryConfig {
102 fn default() -> Self {
103 Self {
104 backoff: Default::default(),
105 max_retries: 10,
106 retry_timeout: Duration::from_secs(3 * 60),
107 }
108 }
109}
110
111impl From<&RetryConfig> for ExponentialBackoff {
112 fn from(val: &RetryConfig) -> ExponentialBackoff {
113 ExponentialBackoff::builder()
114 .retry_bounds(val.backoff.init_backoff, val.backoff.max_backoff)
115 .base(val.backoff.base as u32)
116 .build_with_max_retries(val.max_retries as u32)
117 }
118}