use std::str::FromStr;
use std::time::Duration;
use qubit_config::{ConfigReader, ConfigResult};
use super::attempt_timeout_option::AttemptTimeoutOption;
use super::attempt_timeout_policy::AttemptTimeoutPolicy;
use super::retry_delay::RetryDelay;
use super::retry_jitter::RetryJitter;
use super::retry_options::RetryOptions;
use crate::RetryConfigError;
use crate::constants::{
DEFAULT_RETRY_EXPONENTIAL_INITIAL_DELAY_MILLIS, DEFAULT_RETRY_EXPONENTIAL_MAX_DELAY_MILLIS,
DEFAULT_RETRY_EXPONENTIAL_MULTIPLIER, DEFAULT_RETRY_JITTER_FACTOR,
DEFAULT_RETRY_RANDOM_MAX_DELAY_MILLIS, DEFAULT_RETRY_RANDOM_MIN_DELAY_MILLIS,
KEY_ATTEMPT_TIMEOUT_MILLIS, KEY_ATTEMPT_TIMEOUT_POLICY, KEY_DELAY, KEY_DELAY_STRATEGY,
KEY_EXPONENTIAL_INITIAL_DELAY_MILLIS, KEY_EXPONENTIAL_MAX_DELAY_MILLIS,
KEY_EXPONENTIAL_MULTIPLIER, KEY_FIXED_DELAY_MILLIS, KEY_JITTER_FACTOR, KEY_MAX_ATTEMPTS,
KEY_MAX_OPERATION_ELAPSED_MILLIS, KEY_MAX_OPERATION_ELAPSED_UNLIMITED,
KEY_MAX_TOTAL_ELAPSED_MILLIS, KEY_MAX_TOTAL_ELAPSED_UNLIMITED, KEY_RANDOM_MAX_DELAY_MILLIS,
KEY_RANDOM_MIN_DELAY_MILLIS, KEY_WORKER_CANCEL_GRACE_MILLIS,
};
#[derive(Debug, Clone, PartialEq)]
pub struct RetryConfigValues {
pub max_attempts: Option<u32>,
pub max_operation_elapsed_millis: Option<u64>,
pub max_operation_elapsed_unlimited: Option<bool>,
pub max_total_elapsed_millis: Option<u64>,
pub max_total_elapsed_unlimited: Option<bool>,
pub attempt_timeout_millis: Option<u64>,
pub attempt_timeout_policy: Option<String>,
pub worker_cancel_grace_millis: Option<u64>,
pub delay: Option<String>,
pub delay_strategy: Option<String>,
pub fixed_delay_millis: Option<u64>,
pub random_min_delay_millis: Option<u64>,
pub random_max_delay_millis: Option<u64>,
pub exponential_initial_delay_millis: Option<u64>,
pub exponential_max_delay_millis: Option<u64>,
pub exponential_multiplier: Option<f64>,
pub jitter_factor: Option<f64>,
}
impl RetryConfigValues {
pub(crate) fn new<R>(config: &R) -> ConfigResult<Self>
where
R: ConfigReader + ?Sized,
{
Ok(Self {
max_attempts: config.get_optional(KEY_MAX_ATTEMPTS)?,
max_operation_elapsed_millis: config.get_optional(KEY_MAX_OPERATION_ELAPSED_MILLIS)?,
max_operation_elapsed_unlimited: config
.get_optional(KEY_MAX_OPERATION_ELAPSED_UNLIMITED)?,
max_total_elapsed_millis: config.get_optional(KEY_MAX_TOTAL_ELAPSED_MILLIS)?,
max_total_elapsed_unlimited: config.get_optional(KEY_MAX_TOTAL_ELAPSED_UNLIMITED)?,
attempt_timeout_millis: config.get_optional(KEY_ATTEMPT_TIMEOUT_MILLIS)?,
attempt_timeout_policy: config.get_optional_string(KEY_ATTEMPT_TIMEOUT_POLICY)?,
worker_cancel_grace_millis: config.get_optional(KEY_WORKER_CANCEL_GRACE_MILLIS)?,
delay: config.get_optional_string(KEY_DELAY)?,
delay_strategy: config.get_optional_string(KEY_DELAY_STRATEGY)?,
fixed_delay_millis: config.get_optional(KEY_FIXED_DELAY_MILLIS)?,
random_min_delay_millis: config.get_optional(KEY_RANDOM_MIN_DELAY_MILLIS)?,
random_max_delay_millis: config.get_optional(KEY_RANDOM_MAX_DELAY_MILLIS)?,
exponential_initial_delay_millis: config
.get_optional(KEY_EXPONENTIAL_INITIAL_DELAY_MILLIS)?,
exponential_max_delay_millis: config.get_optional(KEY_EXPONENTIAL_MAX_DELAY_MILLIS)?,
exponential_multiplier: config.get_optional(KEY_EXPONENTIAL_MULTIPLIER)?,
jitter_factor: config.get_optional(KEY_JITTER_FACTOR)?,
})
}
pub fn to_options(&self, default: &RetryOptions) -> Result<RetryOptions, RetryConfigError> {
let max_attempts = self.max_attempts.unwrap_or(default.max_attempts());
let max_operation_elapsed = self.get_max_operation_elapsed(default);
let max_total_elapsed = self.get_max_total_elapsed(default);
let attempt_timeout = self.get_attempt_timeout(default)?;
let worker_cancel_grace = self.get_worker_cancel_grace(default);
let delay = self.get_delay(default)?;
let jitter = self.get_jitter(default);
let mut options = RetryOptions::new_with_attempt_timeout(
max_attempts,
max_operation_elapsed,
max_total_elapsed,
delay,
jitter,
attempt_timeout,
)?;
options.worker_cancel_grace = worker_cancel_grace;
options.validate()?;
Ok(options)
}
fn get_max_operation_elapsed(&self, default: &RetryOptions) -> Option<Duration> {
if self.max_operation_elapsed_unlimited.unwrap_or(false) {
return None;
}
match self.max_operation_elapsed_millis {
Some(millis) => Some(Duration::from_millis(millis)),
None => default.max_operation_elapsed(),
}
}
fn get_max_total_elapsed(&self, default: &RetryOptions) -> Option<Duration> {
if self.max_total_elapsed_unlimited.unwrap_or(false) {
return None;
}
match self.max_total_elapsed_millis {
Some(millis) => Some(Duration::from_millis(millis)),
None => default.max_total_elapsed(),
}
}
fn get_attempt_timeout(
&self,
default: &RetryOptions,
) -> Result<Option<AttemptTimeoutOption>, RetryConfigError> {
let default_attempt_timeout = default.attempt_timeout();
let policy = self
.attempt_timeout_policy
.as_deref()
.map(parse_attempt_timeout_policy)
.transpose()?;
match self.attempt_timeout_millis {
Some(timeout_millis) => {
let policy = policy
.or_else(|| {
default_attempt_timeout.map(|attempt_timeout| attempt_timeout.policy())
})
.unwrap_or_default();
Ok(Some(AttemptTimeoutOption::new(
Duration::from_millis(timeout_millis),
policy,
)))
}
None => {
if let Some(policy) = policy {
let Some(default_attempt_timeout) = default_attempt_timeout else {
return Err(RetryConfigError::invalid_value(
KEY_ATTEMPT_TIMEOUT_POLICY,
"attempt_timeout_policy requires attempt_timeout_millis when the default has no attempt timeout",
));
};
Ok(Some(default_attempt_timeout.with_policy(policy)))
} else {
Ok(default_attempt_timeout)
}
}
}
}
fn get_worker_cancel_grace(&self, default: &RetryOptions) -> Duration {
self.worker_cancel_grace_millis
.map(Duration::from_millis)
.unwrap_or_else(|| default.worker_cancel_grace())
}
fn get_delay(&self, default: &RetryOptions) -> Result<RetryDelay, RetryConfigError> {
let strategy = self
.delay
.as_deref()
.map(|value| (KEY_DELAY, value))
.or_else(|| {
self.delay_strategy
.as_deref()
.map(|value| (KEY_DELAY_STRATEGY, value))
})
.map(|(key, value)| (key, value.trim().to_ascii_lowercase()));
match strategy {
None => Ok(self
.get_implicit_delay()
.unwrap_or_else(|| default.delay().clone())),
Some((_, strategy)) if strategy == "none" => Ok(RetryDelay::None),
Some((_, strategy)) if strategy == "fixed" => {
let Some(fixed_delay_millis) = self.fixed_delay_millis else {
return Err(RetryConfigError::invalid_value(
KEY_FIXED_DELAY_MILLIS,
"fixed delay strategy requires fixed_delay_millis",
));
};
Ok(RetryDelay::fixed(Duration::from_millis(fixed_delay_millis)))
}
Some((_, strategy)) if strategy == "random" => Ok(RetryDelay::random(
Duration::from_millis(self.random_min_delay_millis.ok_or_else(|| {
RetryConfigError::invalid_value(
KEY_RANDOM_MIN_DELAY_MILLIS,
"random delay strategy requires random_min_delay_millis",
)
})?),
Duration::from_millis(self.random_max_delay_millis.ok_or_else(|| {
RetryConfigError::invalid_value(
KEY_RANDOM_MAX_DELAY_MILLIS,
"random delay strategy requires random_max_delay_millis",
)
})?),
)),
Some((_, strategy))
if strategy == "exponential" || strategy == "exponential_backoff" =>
{
let initial_delay = self.exponential_initial_delay_millis.ok_or_else(|| {
RetryConfigError::invalid_value(
KEY_EXPONENTIAL_INITIAL_DELAY_MILLIS,
"exponential delay strategy requires exponential_initial_delay_millis",
)
})?;
let max_delay = self.exponential_max_delay_millis.ok_or_else(|| {
RetryConfigError::invalid_value(
KEY_EXPONENTIAL_MAX_DELAY_MILLIS,
"exponential delay strategy requires exponential_max_delay_millis",
)
})?;
let multiplier = self.exponential_multiplier.ok_or_else(|| {
RetryConfigError::invalid_value(
KEY_EXPONENTIAL_MULTIPLIER,
"exponential delay strategy requires exponential_multiplier",
)
})?;
Ok(RetryDelay::exponential(
Duration::from_millis(initial_delay),
Duration::from_millis(max_delay),
multiplier,
))
}
Some((key, other)) => Err(RetryConfigError::invalid_value(
key,
format!("unsupported delay strategy '{other}'"),
)),
}
}
fn get_implicit_delay(&self) -> Option<RetryDelay> {
if let Some(millis) = self.fixed_delay_millis {
return Some(RetryDelay::fixed(Duration::from_millis(millis)));
}
if self.random_min_delay_millis.is_some() || self.random_max_delay_millis.is_some() {
return Some(RetryDelay::random(
Duration::from_millis(
self.random_min_delay_millis
.unwrap_or(DEFAULT_RETRY_RANDOM_MIN_DELAY_MILLIS),
),
Duration::from_millis(
self.random_max_delay_millis
.unwrap_or(DEFAULT_RETRY_RANDOM_MAX_DELAY_MILLIS),
),
));
}
if self.exponential_initial_delay_millis.is_some()
|| self.exponential_max_delay_millis.is_some()
|| self.exponential_multiplier.is_some()
{
return Some(RetryDelay::exponential(
Duration::from_millis(
self.exponential_initial_delay_millis
.unwrap_or(DEFAULT_RETRY_EXPONENTIAL_INITIAL_DELAY_MILLIS),
),
Duration::from_millis(
self.exponential_max_delay_millis
.unwrap_or(DEFAULT_RETRY_EXPONENTIAL_MAX_DELAY_MILLIS),
),
self.exponential_multiplier
.unwrap_or(DEFAULT_RETRY_EXPONENTIAL_MULTIPLIER),
));
}
None
}
fn get_jitter(&self, default: &RetryOptions) -> RetryJitter {
match self.jitter_factor {
Some(factor) if factor == DEFAULT_RETRY_JITTER_FACTOR => RetryJitter::None,
None => default.jitter(),
Some(factor) => RetryJitter::Factor(factor),
}
}
}
fn parse_attempt_timeout_policy(value: &str) -> Result<AttemptTimeoutPolicy, RetryConfigError> {
AttemptTimeoutPolicy::from_str(value)
.map_err(|message| RetryConfigError::invalid_value(KEY_ATTEMPT_TIMEOUT_POLICY, message))
}