1use std::str::FromStr;
15use std::time::Duration;
16
17use qubit_config::{
18 ConfigReader,
19 ConfigResult,
20};
21
22use super::attempt_timeout_option::AttemptTimeoutOption;
23use super::attempt_timeout_policy::AttemptTimeoutPolicy;
24use super::retry_delay::RetryDelay;
25use super::retry_jitter::RetryJitter;
26use super::retry_options::RetryOptions;
27
28use crate::RetryConfigError;
29use crate::constants::{
30 DEFAULT_RETRY_EXPONENTIAL_INITIAL_DELAY_MILLIS,
31 DEFAULT_RETRY_EXPONENTIAL_MAX_DELAY_MILLIS,
32 DEFAULT_RETRY_EXPONENTIAL_MULTIPLIER,
33 DEFAULT_RETRY_JITTER_FACTOR,
34 DEFAULT_RETRY_RANDOM_MAX_DELAY_MILLIS,
35 DEFAULT_RETRY_RANDOM_MIN_DELAY_MILLIS,
36 KEY_ATTEMPT_TIMEOUT_MILLIS,
37 KEY_ATTEMPT_TIMEOUT_POLICY,
38 KEY_DELAY,
39 KEY_DELAY_STRATEGY,
40 KEY_EXPONENTIAL_INITIAL_DELAY_MILLIS,
41 KEY_EXPONENTIAL_MAX_DELAY_MILLIS,
42 KEY_EXPONENTIAL_MULTIPLIER,
43 KEY_FIXED_DELAY_MILLIS,
44 KEY_JITTER_FACTOR,
45 KEY_MAX_ATTEMPTS,
46 KEY_MAX_OPERATION_ELAPSED_MILLIS,
47 KEY_MAX_OPERATION_ELAPSED_UNLIMITED,
48 KEY_MAX_TOTAL_ELAPSED_MILLIS,
49 KEY_MAX_TOTAL_ELAPSED_UNLIMITED,
50 KEY_RANDOM_MAX_DELAY_MILLIS,
51 KEY_RANDOM_MIN_DELAY_MILLIS,
52 KEY_WORKER_CANCEL_GRACE_MILLIS,
53};
54
/// Raw, optional retry settings as read from a configuration source.
///
/// Every field is `None` when its key was not supplied. The values are kept
/// unvalidated here; `to_options` merges them with a default `RetryOptions`
/// and performs validation.
#[derive(Debug, Clone, PartialEq)]
pub struct RetryConfigValues {
    /// Overrides the default maximum attempt count when present.
    pub max_attempts: Option<u32>,

    /// Per-operation elapsed limit in milliseconds; ignored when
    /// `max_operation_elapsed_unlimited` is `Some(true)`.
    pub max_operation_elapsed_millis: Option<u64>,

    /// When `Some(true)`, the resolved per-operation elapsed limit is `None`,
    /// regardless of the millisecond value or the default.
    pub max_operation_elapsed_unlimited: Option<bool>,

    /// Total elapsed limit in milliseconds; ignored when
    /// `max_total_elapsed_unlimited` is `Some(true)`.
    pub max_total_elapsed_millis: Option<u64>,

    /// When `Some(true)`, the resolved total elapsed limit is `None`,
    /// regardless of the millisecond value or the default.
    pub max_total_elapsed_unlimited: Option<bool>,

    /// Attempt timeout in milliseconds.
    pub attempt_timeout_millis: Option<u64>,

    /// Textual attempt-timeout policy, parsed via `AttemptTimeoutPolicy::from_str`.
    pub attempt_timeout_policy: Option<String>,

    /// Worker cancellation grace period in milliseconds.
    pub worker_cancel_grace_millis: Option<u64>,

    /// Delay strategy name; takes precedence over `delay_strategy`.
    pub delay: Option<String>,

    /// Alternative delay strategy name, consulted only when `delay` is absent.
    pub delay_strategy: Option<String>,

    /// Delay in milliseconds for the fixed strategy.
    pub fixed_delay_millis: Option<u64>,

    /// Lower bound in milliseconds for the random strategy.
    pub random_min_delay_millis: Option<u64>,

    /// Upper bound in milliseconds for the random strategy.
    pub random_max_delay_millis: Option<u64>,

    /// Initial delay in milliseconds for the exponential strategy.
    pub exponential_initial_delay_millis: Option<u64>,

    /// Maximum delay in milliseconds for the exponential strategy.
    pub exponential_max_delay_millis: Option<u64>,

    /// Multiplier for the exponential strategy.
    pub exponential_multiplier: Option<f64>,

    /// Jitter factor; a value equal to `DEFAULT_RETRY_JITTER_FACTOR` resolves
    /// to `RetryJitter::None`.
    pub jitter_factor: Option<f64>,
}
102
103impl RetryConfigValues {
104 pub(crate) fn new<R>(config: &R) -> ConfigResult<Self>
119 where
120 R: ConfigReader + ?Sized,
121 {
122 Ok(Self {
123 max_attempts: config.get_optional(KEY_MAX_ATTEMPTS)?,
124 max_operation_elapsed_millis: config.get_optional(KEY_MAX_OPERATION_ELAPSED_MILLIS)?,
125 max_operation_elapsed_unlimited: config
126 .get_optional(KEY_MAX_OPERATION_ELAPSED_UNLIMITED)?,
127 max_total_elapsed_millis: config.get_optional(KEY_MAX_TOTAL_ELAPSED_MILLIS)?,
128 max_total_elapsed_unlimited: config.get_optional(KEY_MAX_TOTAL_ELAPSED_UNLIMITED)?,
129 attempt_timeout_millis: config.get_optional(KEY_ATTEMPT_TIMEOUT_MILLIS)?,
130 attempt_timeout_policy: config.get_optional_string(KEY_ATTEMPT_TIMEOUT_POLICY)?,
131 worker_cancel_grace_millis: config.get_optional(KEY_WORKER_CANCEL_GRACE_MILLIS)?,
132 delay: config.get_optional_string(KEY_DELAY)?,
133 delay_strategy: config.get_optional_string(KEY_DELAY_STRATEGY)?,
134 fixed_delay_millis: config.get_optional(KEY_FIXED_DELAY_MILLIS)?,
135 random_min_delay_millis: config.get_optional(KEY_RANDOM_MIN_DELAY_MILLIS)?,
136 random_max_delay_millis: config.get_optional(KEY_RANDOM_MAX_DELAY_MILLIS)?,
137 exponential_initial_delay_millis: config
138 .get_optional(KEY_EXPONENTIAL_INITIAL_DELAY_MILLIS)?,
139 exponential_max_delay_millis: config.get_optional(KEY_EXPONENTIAL_MAX_DELAY_MILLIS)?,
140 exponential_multiplier: config.get_optional(KEY_EXPONENTIAL_MULTIPLIER)?,
141 jitter_factor: config.get_optional(KEY_JITTER_FACTOR)?,
142 })
143 }
144
145 pub fn to_options(&self, default: &RetryOptions) -> Result<RetryOptions, RetryConfigError> {
157 let max_attempts = self.max_attempts.unwrap_or(default.max_attempts());
158 let max_operation_elapsed = self.get_max_operation_elapsed(default);
159 let max_total_elapsed = self.get_max_total_elapsed(default);
160 let attempt_timeout = self.get_attempt_timeout(default)?;
161 let worker_cancel_grace = self.get_worker_cancel_grace(default);
162 let delay = self.get_delay(default)?;
163 let jitter = self.get_jitter(default);
164 let mut options = RetryOptions::new_with_attempt_timeout(
165 max_attempts,
166 max_operation_elapsed,
167 max_total_elapsed,
168 delay,
169 jitter,
170 attempt_timeout,
171 )?;
172 options.worker_cancel_grace = worker_cancel_grace;
173 options.validate()?;
174 Ok(options)
175 }
176
177 fn get_max_operation_elapsed(&self, default: &RetryOptions) -> Option<Duration> {
190 if self.max_operation_elapsed_unlimited.unwrap_or(false) {
191 return None;
192 }
193 match self.max_operation_elapsed_millis {
194 Some(millis) => Some(Duration::from_millis(millis)),
195 None => default.max_operation_elapsed(),
196 }
197 }
198
199 fn get_max_total_elapsed(&self, default: &RetryOptions) -> Option<Duration> {
212 if self.max_total_elapsed_unlimited.unwrap_or(false) {
213 return None;
214 }
215 match self.max_total_elapsed_millis {
216 Some(millis) => Some(Duration::from_millis(millis)),
217 None => default.max_total_elapsed(),
218 }
219 }
220
221 fn get_attempt_timeout(
234 &self,
235 default: &RetryOptions,
236 ) -> Result<Option<AttemptTimeoutOption>, RetryConfigError> {
237 let default_attempt_timeout = default.attempt_timeout();
238 let policy = self
239 .attempt_timeout_policy
240 .as_deref()
241 .map(parse_attempt_timeout_policy)
242 .transpose()?;
243
244 match self.attempt_timeout_millis {
245 Some(timeout_millis) => {
246 let policy = policy
247 .or_else(|| {
248 default_attempt_timeout.map(|attempt_timeout| attempt_timeout.policy())
249 })
250 .unwrap_or_default();
251 Ok(Some(AttemptTimeoutOption::new(
252 Duration::from_millis(timeout_millis),
253 policy,
254 )))
255 }
256 None => {
257 if let Some(policy) = policy {
258 let Some(default_attempt_timeout) = default_attempt_timeout else {
259 return Err(RetryConfigError::invalid_value(
260 KEY_ATTEMPT_TIMEOUT_POLICY,
261 "attempt_timeout_policy requires attempt_timeout_millis when the default has no attempt timeout",
262 ));
263 };
264 Ok(Some(default_attempt_timeout.with_policy(policy)))
265 } else {
266 Ok(default_attempt_timeout)
267 }
268 }
269 }
270 }
271
272 fn get_worker_cancel_grace(&self, default: &RetryOptions) -> Duration {
284 self.worker_cancel_grace_millis
285 .map(Duration::from_millis)
286 .unwrap_or_else(|| default.worker_cancel_grace())
287 }
288
289 fn get_delay(&self, default: &RetryOptions) -> Result<RetryDelay, RetryConfigError> {
302 let strategy = self
303 .delay
304 .as_deref()
305 .map(|value| (KEY_DELAY, value))
306 .or_else(|| {
307 self.delay_strategy
308 .as_deref()
309 .map(|value| (KEY_DELAY_STRATEGY, value))
310 })
311 .map(|(key, value)| (key, value.trim().to_ascii_lowercase()));
312 match strategy {
313 None => Ok(self
314 .get_implicit_delay()
315 .unwrap_or_else(|| default.delay().clone())),
316 Some((_, strategy)) if strategy == "none" => Ok(RetryDelay::None),
317 Some((_, strategy)) if strategy == "fixed" => {
318 let Some(fixed_delay_millis) = self.fixed_delay_millis else {
319 return Err(RetryConfigError::invalid_value(
320 KEY_FIXED_DELAY_MILLIS,
321 "fixed delay strategy requires fixed_delay_millis",
322 ));
323 };
324 Ok(RetryDelay::fixed(Duration::from_millis(fixed_delay_millis)))
325 }
326 Some((_, strategy)) if strategy == "random" => Ok(RetryDelay::random(
327 Duration::from_millis(self.random_min_delay_millis.ok_or_else(|| {
328 RetryConfigError::invalid_value(
329 KEY_RANDOM_MIN_DELAY_MILLIS,
330 "random delay strategy requires random_min_delay_millis",
331 )
332 })?),
333 Duration::from_millis(self.random_max_delay_millis.ok_or_else(|| {
334 RetryConfigError::invalid_value(
335 KEY_RANDOM_MAX_DELAY_MILLIS,
336 "random delay strategy requires random_max_delay_millis",
337 )
338 })?),
339 )),
340 Some((_, strategy))
341 if strategy == "exponential" || strategy == "exponential_backoff" =>
342 {
343 let initial_delay = self.exponential_initial_delay_millis.ok_or_else(|| {
344 RetryConfigError::invalid_value(
345 KEY_EXPONENTIAL_INITIAL_DELAY_MILLIS,
346 "exponential delay strategy requires exponential_initial_delay_millis",
347 )
348 })?;
349 let max_delay = self.exponential_max_delay_millis.ok_or_else(|| {
350 RetryConfigError::invalid_value(
351 KEY_EXPONENTIAL_MAX_DELAY_MILLIS,
352 "exponential delay strategy requires exponential_max_delay_millis",
353 )
354 })?;
355 let multiplier = self.exponential_multiplier.ok_or_else(|| {
356 RetryConfigError::invalid_value(
357 KEY_EXPONENTIAL_MULTIPLIER,
358 "exponential delay strategy requires exponential_multiplier",
359 )
360 })?;
361 Ok(RetryDelay::exponential(
362 Duration::from_millis(initial_delay),
363 Duration::from_millis(max_delay),
364 multiplier,
365 ))
366 }
367 Some((key, other)) => Err(RetryConfigError::invalid_value(
368 key,
369 format!("unsupported delay strategy '{other}'"),
370 )),
371 }
372 }
373
374 fn get_implicit_delay(&self) -> Option<RetryDelay> {
386 if let Some(millis) = self.fixed_delay_millis {
387 return Some(RetryDelay::fixed(Duration::from_millis(millis)));
388 }
389 if self.random_min_delay_millis.is_some() || self.random_max_delay_millis.is_some() {
390 return Some(RetryDelay::random(
391 Duration::from_millis(
392 self.random_min_delay_millis
393 .unwrap_or(DEFAULT_RETRY_RANDOM_MIN_DELAY_MILLIS),
394 ),
395 Duration::from_millis(
396 self.random_max_delay_millis
397 .unwrap_or(DEFAULT_RETRY_RANDOM_MAX_DELAY_MILLIS),
398 ),
399 ));
400 }
401 if self.exponential_initial_delay_millis.is_some()
402 || self.exponential_max_delay_millis.is_some()
403 || self.exponential_multiplier.is_some()
404 {
405 return Some(RetryDelay::exponential(
406 Duration::from_millis(
407 self.exponential_initial_delay_millis
408 .unwrap_or(DEFAULT_RETRY_EXPONENTIAL_INITIAL_DELAY_MILLIS),
409 ),
410 Duration::from_millis(
411 self.exponential_max_delay_millis
412 .unwrap_or(DEFAULT_RETRY_EXPONENTIAL_MAX_DELAY_MILLIS),
413 ),
414 self.exponential_multiplier
415 .unwrap_or(DEFAULT_RETRY_EXPONENTIAL_MULTIPLIER),
416 ));
417 }
418 None
419 }
420
421 fn get_jitter(&self, default: &RetryOptions) -> RetryJitter {
434 match self.jitter_factor {
435 Some(factor) if factor == DEFAULT_RETRY_JITTER_FACTOR => RetryJitter::None,
436 None => default.jitter(),
437 Some(factor) => RetryJitter::Factor(factor),
438 }
439 }
440}
441
442fn parse_attempt_timeout_policy(value: &str) -> Result<AttemptTimeoutPolicy, RetryConfigError> {
453 AttemptTimeoutPolicy::from_str(value)
454 .map_err(|message| RetryConfigError::invalid_value(KEY_ATTEMPT_TIMEOUT_POLICY, message))
455}