use crate::config_bag::value::Value;
use crate::config_bag::{ItemIter, Storable, Store, StoreReplace};
use std::fmt;
use std::str::FromStr;
use std::time::Duration;
/// Retry modes that can be parsed from a string.
///
/// This list is printed in the [`RetryModeParseError`] message, so it must name every
/// mode the `FromStr` implementation for [`RetryMode`] accepts.
const VALID_RETRY_MODES: &[RetryMode] = &[RetryMode::Standard, RetryMode::Adaptive];
/// Broad category assigned to an error.
///
/// Carried by [`RetryKind::Error`] and returned by
/// [`ProvideErrorKind::retryable_error_kind`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorKind {
    /// A transient error.
    TransientError,
    /// A throttling error.
    ThrottlingError,
    /// A server error.
    ServerError,
    /// A client error.
    ClientError,
}
impl fmt::Display for ErrorKind {
    /// Writes the lowercase, human-readable name of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::TransientError => "transient error",
            Self::ThrottlingError => "throttling error",
            Self::ServerError => "server error",
            Self::ClientError => "client error",
        };
        // `write_str` emits the label verbatim, exactly like an argument-free `write!`.
        f.write_str(label)
    }
}
/// Implemented by error types that can describe themselves for retry classification.
pub trait ProvideErrorKind {
/// Returns the [`ErrorKind`] this error should be treated as for retry purposes, or
/// `None` when the error provides no such classification.
fn retryable_error_kind(&self) -> Option<ErrorKind>;
/// Returns a string code for this error, if it has one.
fn code(&self) -> Option<&str>;
}
/// Result of classifying an outcome for retry purposes.
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum RetryKind {
    /// An error occurred; carries its [`ErrorKind`] classification.
    Error(ErrorKind),
    /// Carries an explicit [`Duration`] (presumably the delay before the next attempt;
    /// confirm against the code that consumes this variant).
    Explicit(Duration),
    /// The failure is not retryable.
    UnretryableFailure,
    /// A retry is unnecessary.
    Unnecessary,
}
/// Retry strategy selector.
///
/// Parsed from a string by its `FromStr` implementation, which accepts `standard` and
/// `adaptive` in any ASCII casing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RetryMode {
    /// The standard retry mode; used by `RetryConfigBuilder::build` when no mode was set.
    Standard,
    /// The adaptive retry mode.
    Adaptive,
}
impl FromStr for RetryMode {
    type Err = RetryModeParseError;

    /// Parses a retry mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// # Errors
    ///
    /// Returns a [`RetryModeParseError`] built from the trimmed input when it is neither
    /// `standard` nor `adaptive`.
    fn from_str(string: &str) -> Result<Self, Self::Err> {
        // Only ASCII names need to be recognised, so an ASCII-only comparison suffices.
        const NAMED_MODES: [(&str, RetryMode); 2] = [
            ("standard", RetryMode::Standard),
            ("adaptive", RetryMode::Adaptive),
        ];

        let candidate = string.trim();
        NAMED_MODES
            .iter()
            .find(|(name, _)| candidate.eq_ignore_ascii_case(name))
            .map(|&(_, mode)| mode)
            .ok_or_else(|| RetryModeParseError::new(candidate))
    }
}
/// Error returned when a string cannot be parsed as a [`RetryMode`].
#[derive(Debug)]
pub struct RetryModeParseError {
// Despite its name, this holds the input that failed to parse: the `FromStr` impl for
// `RetryMode` passes the trimmed string here, and `Display` prints it as that string.
message: String,
}
impl RetryModeParseError {
    /// Creates a parse error that stores `message` (converted into a `String`).
    pub(super) fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}
impl fmt::Display for RetryModeParseError {
    /// Writes the rejected string followed by the pretty-printed list of valid retry modes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rejected = &self.message;
        f.write_fmt(format_args!(
            "error parsing string '{}' as RetryMode, valid options are: {:#?}",
            rejected, VALID_RETRY_MODES
        ))
    }
}
// Marks the parse error as a standard error type; no trait methods are overridden, so the
// trait's default implementations apply.
impl std::error::Error for RetryModeParseError {}
/// Builder for [`RetryConfig`].
///
/// Every field starts out unset (`None`); `build` substitutes a default for each field
/// that is still unset.
#[derive(Clone, Debug, Default, PartialEq)]
#[non_exhaustive]
pub struct RetryConfigBuilder {
    /// Retry mode, if set.
    mode: Option<RetryMode>,
    /// Maximum number of attempts, if set.
    max_attempts: Option<u32>,
    /// Initial backoff duration, if set.
    initial_backoff: Option<Duration>,
    /// Maximum backoff duration, if set.
    max_backoff: Option<Duration>,
    /// Reconnect mode, if set.
    reconnect_mode: Option<ReconnectMode>,
}
impl RetryConfigBuilder {
    /// Creates a builder with every field unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the retry mode, consuming and returning the builder.
    pub fn mode(mut self, mode: RetryMode) -> Self {
        self.mode = Some(mode);
        self
    }

    /// Sets or clears the retry mode in place.
    pub fn set_mode(&mut self, retry_mode: Option<RetryMode>) -> &mut Self {
        self.mode = retry_mode;
        self
    }

    /// Sets the reconnect mode, consuming and returning the builder.
    pub fn reconnect_mode(mut self, reconnect_mode: ReconnectMode) -> Self {
        self.reconnect_mode = Some(reconnect_mode);
        self
    }

    /// Sets or clears the reconnect mode in place.
    pub fn set_reconnect_mode(&mut self, reconnect_mode: Option<ReconnectMode>) -> &mut Self {
        self.reconnect_mode = reconnect_mode;
        self
    }

    /// Sets the maximum number of attempts, consuming and returning the builder.
    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
        self.max_attempts = Some(max_attempts);
        self
    }

    /// Sets or clears the maximum number of attempts in place.
    pub fn set_max_attempts(&mut self, max_attempts: Option<u32>) -> &mut Self {
        self.max_attempts = max_attempts;
        self
    }

    /// Sets the initial backoff, consuming and returning the builder.
    pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
        self.initial_backoff = Some(initial_backoff);
        self
    }

    /// Sets or clears the initial backoff in place.
    pub fn set_initial_backoff(&mut self, initial_backoff: Option<Duration>) -> &mut Self {
        self.initial_backoff = initial_backoff;
        self
    }

    /// Sets the maximum backoff, consuming and returning the builder.
    pub fn max_backoff(mut self, max_backoff: Duration) -> Self {
        self.max_backoff = Some(max_backoff);
        self
    }

    /// Sets or clears the maximum backoff in place.
    pub fn set_max_backoff(&mut self, max_backoff: Option<Duration>) -> &mut Self {
        self.max_backoff = max_backoff;
        self
    }

    /// Merges two builders field by field: a value set on `self` wins, and `other` only
    /// supplies values for fields that `self` left unset.
    pub fn take_unset_from(self, other: Self) -> Self {
        let Self {
            mode,
            max_attempts,
            initial_backoff,
            max_backoff,
            reconnect_mode,
        } = self;
        Self {
            mode: mode.or(other.mode),
            max_attempts: max_attempts.or(other.max_attempts),
            initial_backoff: initial_backoff.or(other.initial_backoff),
            max_backoff: max_backoff.or(other.max_backoff),
            reconnect_mode: reconnect_mode.or(other.reconnect_mode),
        }
    }

    /// Builds a [`RetryConfig`], filling unset fields with defaults: standard mode,
    /// 3 max attempts, 1 second initial backoff, 20 second max backoff, and
    /// reconnect-on-transient-error. The static-exponential-base flag is always `false`
    /// and no retry spec is attached.
    pub fn build(self) -> RetryConfig {
        let Self {
            mode,
            max_attempts,
            initial_backoff,
            max_backoff,
            reconnect_mode,
        } = self;
        RetryConfig {
            mode: mode.unwrap_or(RetryMode::Standard),
            max_attempts: max_attempts.unwrap_or(3),
            initial_backoff: initial_backoff.unwrap_or(Duration::from_secs(1)),
            max_backoff: max_backoff.unwrap_or(Duration::from_secs(20)),
            reconnect_mode: reconnect_mode.unwrap_or(ReconnectMode::ReconnectOnTransientError),
            use_static_exponential_base: false,
            retry_spec: None,
        }
    }
}
/// Retry configuration.
///
/// Construct one with `RetryConfig::standard`, `RetryConfig::adaptive`,
/// `RetryConfig::disabled`, or `RetryConfigBuilder::build`.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct RetryConfig {
    /// Retry mode.
    mode: RetryMode,
    /// Maximum number of attempts; `has_retry` reports `true` only when this exceeds 1.
    max_attempts: u32,
    /// Initial backoff duration.
    initial_backoff: Duration,
    /// Maximum backoff duration.
    max_backoff: Duration,
    /// Reconnect mode.
    reconnect_mode: ReconnectMode,
    /// Flag that can only be changed through a setter gated on the `test-util` feature.
    use_static_exponential_base: bool,
    /// Optional retry spec; `None` unless one was attached or inherited during a merge.
    retry_spec: Option<RetrySpec>,
}
// Uses the `StoreReplace` storer for `RetryConfig` values kept in a config bag.
impl Storable for RetryConfig {
    type Storer = StoreReplace<Self>;
}
/// Connection-reuse policy that is part of a retry configuration.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ReconnectMode {
    /// Reconnect when a transient error occurs; this is the default used by
    /// `RetryConfigBuilder::build` and the `RetryConfig` constructors.
    ReconnectOnTransientError,
    /// Reuse all connections.
    ReuseAllConnections,
}
// Uses the `StoreReplace` storer for `ReconnectMode` values kept in a config bag.
impl Storable for ReconnectMode {
    type Storer = StoreReplace<Self>;
}
/// Version tag of a [`RetrySpec`].
///
/// The derived ordering follows declaration order, so `V2_0 < V2_1`;
/// `RetrySpec::is_at_least` relies on that ordering.
#[doc(hidden)]
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RetrySpecVersion {
    /// Version 2.0.
    V2_0,
    /// Version 2.1.
    V2_1,
}
/// Versioned retry parameters that can be attached to a [`RetryConfig`].
#[doc(hidden)]
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct RetrySpec {
    /// Spec version this value was created for.
    version: RetrySpecVersion,
    /// Initial backoff associated with non-throttling errors (per the field name).
    non_throttling_initial_backoff: Duration,
    /// Long-polling flag; `None` means "not set", which the getter reports as `false`.
    long_polling: Option<bool>,
}
impl RetrySpec {
    /// Alias for [`RetrySpecVersion::V2_0`].
    pub const V2_0: RetrySpecVersion = RetrySpecVersion::V2_0;
    /// Alias for [`RetrySpecVersion::V2_1`].
    pub const V2_1: RetrySpecVersion = RetrySpecVersion::V2_1;

    /// Builds a spec for `version` with the given backoff and long polling left unset.
    fn unset_long_polling(version: RetrySpecVersion, backoff: Duration) -> Self {
        Self {
            version,
            non_throttling_initial_backoff: backoff,
            long_polling: None,
        }
    }

    /// Creates a version 2.0 spec: 1 second non-throttling initial backoff, long polling unset.
    pub fn v2_0() -> Self {
        Self::unset_long_polling(Self::V2_0, Duration::from_secs(1))
    }

    /// Creates a version 2.1 spec: 50 millisecond non-throttling initial backoff, long
    /// polling unset.
    pub fn v2_1() -> Self {
        Self::unset_long_polling(Self::V2_1, Duration::from_millis(50))
    }

    /// Returns `true` when this spec's version is `version` or newer.
    pub fn is_at_least(&self, version: RetrySpecVersion) -> bool {
        version <= self.version
    }

    /// Replaces the non-throttling initial backoff.
    pub fn with_non_throttling_initial_backoff(self, duration: Duration) -> Self {
        Self {
            non_throttling_initial_backoff: duration,
            ..self
        }
    }

    /// Returns the non-throttling initial backoff.
    pub fn non_throttling_initial_backoff(&self) -> Duration {
        self.non_throttling_initial_backoff
    }

    /// Explicitly sets the long-polling flag.
    pub fn with_long_polling(self, long_polling: bool) -> Self {
        Self {
            long_polling: Some(long_polling),
            ..self
        }
    }

    /// Returns the long-polling flag, treating "unset" as `false`.
    pub fn long_polling(&self) -> bool {
        matches!(self.long_polling, Some(true))
    }

    /// Copies `other`'s long-polling flag if this spec has none set; the version and
    /// backoff are never inherited.
    fn take_defaults_from(&mut self, other: &RetrySpec) {
        self.long_polling = self.long_polling.or(other.long_polling);
    }
}
impl RetryConfig {
    /// Creates a config in standard mode: 3 max attempts, 1 second initial backoff,
    /// 20 second max backoff, reconnect on transient errors, and no retry spec.
    pub fn standard() -> Self {
        Self {
            mode: RetryMode::Standard,
            max_attempts: 3,
            initial_backoff: Duration::from_secs(1),
            max_backoff: Duration::from_secs(20),
            reconnect_mode: ReconnectMode::ReconnectOnTransientError,
            use_static_exponential_base: false,
            retry_spec: None,
        }
    }

    /// Creates a config identical to [`RetryConfig::standard`] except that the mode is
    /// [`RetryMode::Adaptive`].
    pub fn adaptive() -> Self {
        Self {
            mode: RetryMode::Adaptive,
            ..Self::standard()
        }
    }

    /// Creates a standard config limited to a single attempt, so `has_retry` is `false`.
    pub fn disabled() -> Self {
        Self {
            max_attempts: 1,
            ..Self::standard()
        }
    }

    /// Replaces the retry mode.
    pub fn with_retry_mode(self, retry_mode: RetryMode) -> Self {
        Self {
            mode: retry_mode,
            ..self
        }
    }

    /// Replaces the maximum number of attempts.
    pub fn with_max_attempts(self, max_attempts: u32) -> Self {
        Self {
            max_attempts,
            ..self
        }
    }

    /// Replaces the reconnect mode.
    pub fn with_reconnect_mode(self, reconnect_mode: ReconnectMode) -> Self {
        Self {
            reconnect_mode,
            ..self
        }
    }

    /// Replaces the initial backoff.
    pub fn with_initial_backoff(self, initial_backoff: Duration) -> Self {
        Self {
            initial_backoff,
            ..self
        }
    }

    /// Replaces the maximum backoff.
    pub fn with_max_backoff(self, max_backoff: Duration) -> Self {
        Self {
            max_backoff,
            ..self
        }
    }

    /// Replaces the static-exponential-base flag; only compiled with the `test-util` feature.
    #[cfg(feature = "test-util")]
    pub fn with_use_static_exponential_base(self, use_static_exponential_base: bool) -> Self {
        Self {
            use_static_exponential_base,
            ..self
        }
    }

    /// Attaches a retry spec, replacing any existing one.
    #[doc(hidden)]
    pub fn with_retry_spec(self, retry_spec: RetrySpec) -> Self {
        Self {
            retry_spec: Some(retry_spec),
            ..self
        }
    }

    /// Returns the retry mode.
    pub fn mode(&self) -> RetryMode {
        self.mode
    }

    /// Returns the reconnect mode.
    pub fn reconnect_mode(&self) -> ReconnectMode {
        self.reconnect_mode
    }

    /// Returns the maximum number of attempts.
    pub fn max_attempts(&self) -> u32 {
        self.max_attempts
    }

    /// Returns the initial backoff.
    pub fn initial_backoff(&self) -> Duration {
        self.initial_backoff
    }

    /// Returns the maximum backoff.
    pub fn max_backoff(&self) -> Duration {
        self.max_backoff
    }

    /// Returns `true` when more than one attempt is allowed.
    pub fn has_retry(&self) -> bool {
        self.max_attempts > 1
    }

    /// Returns the static-exponential-base flag.
    pub fn use_static_exponential_base(&self) -> bool {
        self.use_static_exponential_base
    }

    /// Returns the attached retry spec, if any.
    #[doc(hidden)]
    pub fn retry_spec(&self) -> Option<&RetrySpec> {
        self.retry_spec.as_ref()
    }

    /// Inherits retry-spec data from `other`; no other field is touched.
    ///
    /// With no spec of its own, this config takes a clone of `other`'s spec. When both
    /// have one, this config's spec only fills in its own unset defaults from `other`'s.
    fn take_defaults_from(&mut self, other: &RetryConfig) {
        // Without a spec on `other` there is nothing to inherit in either case.
        if let Some(theirs) = other.retry_spec.as_ref() {
            match self.retry_spec.as_mut() {
                Some(mine) => mine.take_defaults_from(theirs),
                None => self.retry_spec = Some(theirs.clone()),
            }
        }
    }
}
/// Marker type whose `Store` implementation merges the `RetryConfig` values found across
/// config-bag layers instead of returning a single stored value.
#[doc(hidden)]
#[derive(Debug)]
pub struct MergeRetryConfig;
// `MergeRetryConfig` acts as its own storer; its `Store` impl supplies the merge logic.
impl Storable for MergeRetryConfig {
    type Storer = Self;
}
impl Store for MergeRetryConfig {
    type ReturnedType<'a> = RetryConfig;
    // Shares the stored representation of a plain `RetryConfig`, so values written with
    // `store_put(RetryConfig)` are visible to this merging loader.
    type StoredType = <StoreReplace<RetryConfig> as Store>::StoredType;

    /// Merges the `RetryConfig` entries yielded by `iter` into a single config.
    ///
    /// The first `Set` entry becomes the base. This file's tests show entries arrive
    /// highest-precedence first: an upper layer's `max_attempts` wins. Each later `Set`
    /// entry only supplies retry-spec defaults via `RetryConfig::take_defaults_from`.
    ///
    /// Explicitly-unset entries are handled by position:
    /// * before any `Set` entry, the base becomes `RetryConfig::disabled()`;
    /// * after a base exists, the unset entry ends the merge. The higher-precedence
    ///   config is kept as-is and nothing below the marker is consulted.
    ///
    /// Returns `RetryConfig::disabled()` when the iterator yields nothing.
    fn merge_iter(iter: ItemIter<'_, Self>) -> Self::ReturnedType<'_> {
        let mut result: Option<RetryConfig> = None;
        for rc in iter {
            match (result.as_mut(), rc) {
                (Some(result), Value::Set(rc)) => {
                    result.take_defaults_from(rc);
                }
                (None, Value::Set(rc)) => {
                    result = Some(rc.clone());
                }
                (None, Value::ExplicitlyUnset(_)) => {
                    result = Some(RetryConfig::disabled());
                }
                // A lower layer's "unset" must not clobber a config already taken from a
                // higher-precedence layer; treat it as the end of inheritable defaults.
                (Some(_), Value::ExplicitlyUnset(_)) => break,
            }
        }
        result.unwrap_or_else(RetryConfig::disabled)
    }
}
// Unit tests for builder merging, `RetryMode` parsing, and `MergeRetryConfig` layering.
#[cfg(test)]
mod tests {
use crate::retry::{RetryConfigBuilder, RetryMode};
use std::str::FromStr;
// Values set on the receiving builder must win over values from the `other` builder.
#[test]
fn retry_config_builder_merge_with_favors_self_values_over_other_values() {
let self_builder = RetryConfigBuilder::new()
.max_attempts(1)
.mode(RetryMode::Adaptive);
let other_builder = RetryConfigBuilder::new()
.max_attempts(5)
.mode(RetryMode::Standard);
let retry_config = self_builder.take_unset_from(other_builder).build();
assert_eq!(retry_config.max_attempts, 1);
assert_eq!(retry_config.mode, RetryMode::Adaptive);
}
// Parsing is ASCII case-insensitive.
#[test]
fn retry_mode_from_str_parses_valid_strings_regardless_of_casing() {
assert_eq!(
RetryMode::from_str("standard").ok(),
Some(RetryMode::Standard)
);
assert_eq!(
RetryMode::from_str("STANDARD").ok(),
Some(RetryMode::Standard)
);
assert_eq!(
RetryMode::from_str("StAnDaRd").ok(),
Some(RetryMode::Standard)
);
}
// Leading and trailing whitespace is trimmed before matching.
#[test]
fn retry_mode_from_str_ignores_whitespace_before_and_after() {
assert_eq!(
RetryMode::from_str(" standard ").ok(),
Some(RetryMode::Standard)
);
assert_eq!(
RetryMode::from_str(" STANDARD ").ok(),
Some(RetryMode::Standard)
);
assert_eq!(
RetryMode::from_str(" StAnDaRd ").ok(),
Some(RetryMode::Standard)
);
}
// Abbreviations, unknown names, and names with interior whitespace are rejected.
#[test]
fn retry_mode_from_str_wont_parse_invalid_strings() {
assert_eq!(RetryMode::from_str("std").ok(), None);
assert_eq!(RetryMode::from_str("aws").ok(), None);
assert_eq!(RetryMode::from_str("s t a n d a r d").ok(), None);
assert_eq!(RetryMode::from_str("a d a p t i v e").ok(), None);
}
// An upper layer without a retry spec keeps its own settings and inherits the lower
// layer's retry spec.
#[test]
fn merge_retry_config_preserves_retry_spec_from_lower_layer() {
use crate::config_bag::{ConfigBag, Layer};
use crate::retry::{MergeRetryConfig, RetryConfig, RetrySpec};
let mut lower = Layer::new("sdk_defaults");
lower.store_put(RetryConfig::standard().with_retry_spec(RetrySpec::v2_1()));
let mut upper = Layer::new("customer");
upper.store_put(RetryConfig::standard().with_max_attempts(5));
let bag = ConfigBag::of_layers(vec![lower, upper]);
let merged = bag.load::<MergeRetryConfig>();
assert_eq!(merged.max_attempts(), 5);
assert_eq!(merged.retry_spec(), Some(&RetrySpec::v2_1()));
}
// A retry spec set explicitly on the upper layer is not replaced by the lower layer's.
#[test]
fn merge_retry_config_customer_explicit_retry_spec_wins() {
use crate::config_bag::{ConfigBag, Layer};
use crate::retry::{MergeRetryConfig, RetryConfig, RetrySpec};
let mut lower = Layer::new("sdk_defaults");
lower.store_put(RetryConfig::standard().with_retry_spec(RetrySpec::v2_1()));
let mut upper = Layer::new("customer");
upper.store_put(RetryConfig::standard().with_retry_spec(RetrySpec::v2_0()));
let bag = ConfigBag::of_layers(vec![lower, upper]);
let merged = bag.load::<MergeRetryConfig>();
assert_eq!(merged.retry_spec(), Some(&RetrySpec::v2_0()));
}
// A long-polling flag set on the upper layer's retry spec survives the merge.
#[test]
fn merge_retry_config_long_polling_from_operation_layer() {
use crate::config_bag::{ConfigBag, Layer};
use crate::retry::{MergeRetryConfig, RetryConfig, RetrySpec};
let mut lower = Layer::new("sdk_defaults");
lower.store_put(RetryConfig::standard().with_retry_spec(RetrySpec::v2_1()));
let mut upper = Layer::new("operation");
upper.store_put(
RetryConfig::standard().with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
);
let bag = ConfigBag::of_layers(vec![lower, upper]);
let merged = bag.load::<MergeRetryConfig>();
assert!(merged.retry_spec().unwrap().long_polling());
}
}