Skip to main content

qubit_retry/executor/
retry_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
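//! Retry builder.
//!
//! The builder collects retry options (attempt limits, delay and jitter
//! strategy, elapsed budgets, and per-attempt timeout), an optional
//! retry-after hint extractor, and lifecycle listeners (before-attempt,
//! success, failure, retry-scheduled, and terminal error) before producing a
//! validated [`Retry`].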
14
15use std::time::Duration;
16
17use qubit_error::BoxError;
18use qubit_function::{
19    BiConsumer,
20    BiFunction,
21    BiPredicate,
22    Consumer,
23};
24
25use crate::constants::KEY_MAX_ATTEMPTS;
26use crate::event::RetryListeners;
27use crate::{
28    AttemptFailure,
29    AttemptFailureDecision,
30    AttemptTimeoutOption,
31    AttemptTimeoutPolicy,
32    Retry,
33    RetryAfterHint,
34    RetryConfigError,
35    RetryContext,
36    RetryDelay,
37    RetryError,
38    RetryJitter,
39    RetryOptions,
40};
41
42/// Builder for [`Retry`].
43///
44/// The generic parameter `E` is the operation error type preserved inside
45/// [`AttemptFailure::Error`]. Failure listeners may observe failures, override
46/// the retry decision, or return [`AttemptFailureDecision::UseDefault`] to let
47/// the policy decide from configured limits and delay strategy.
48pub struct RetryBuilder<E = BoxError> {
49    /// Retry limits, delay strategy, jitter, and elapsed budgets.
50    options: RetryOptions,
51    /// Pending policy used when timeout duration is configured later.
52    pending_attempt_timeout_policy: AttemptTimeoutPolicy,
53    /// Optional retry-after hint extractor.
54    retry_after_hint: Option<RetryAfterHint<E>>,
55    /// Lifecycle listeners registered on the builder.
56    listeners: RetryListeners<E>,
57    /// Whether listener panics should be isolated.
58    isolate_listener_panics: bool,
59    /// Stored validation error when max attempts is configured as zero.
60    max_attempts_error: Option<RetryConfigError>,
61}
62
63impl<E> RetryBuilder<E> {
64    /// Creates a builder with default options and no listeners.
65    ///
66    /// # Returns
67    /// A retry builder using [`RetryOptions::default`].
68    #[inline]
69    pub fn new() -> Self {
70        Self {
71            options: RetryOptions::default(),
72            pending_attempt_timeout_policy: AttemptTimeoutPolicy::default(),
73            retry_after_hint: None,
74            listeners: RetryListeners::default(),
75            isolate_listener_panics: false,
76            max_attempts_error: None,
77        }
78    }
79
80    /// Replaces all retry options.
81    ///
82    /// # Parameters
83    /// - `options`: Retry option snapshot.
84    ///
85    /// # Returns
86    /// The updated builder.
87    #[inline]
88    pub fn options(mut self, options: RetryOptions) -> Self {
89        self.pending_attempt_timeout_policy = options
90            .attempt_timeout()
91            .map(|attempt_timeout| attempt_timeout.policy())
92            .unwrap_or_default();
93        self.options = options;
94        self.max_attempts_error = None;
95        self
96    }
97
98    /// Sets the maximum total attempts, including the initial attempt.
99    ///
100    /// # Parameters
101    /// - `max_attempts`: Maximum attempts. Zero is recorded as a build error.
102    ///
103    /// # Returns
104    /// The updated builder.
105    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
106        if let Some(max_attempts) = std::num::NonZeroU32::new(max_attempts) {
107            self.options.max_attempts = max_attempts;
108            self.max_attempts_error = None;
109        } else {
110            self.max_attempts_error = Some(RetryConfigError::invalid_value(
111                KEY_MAX_ATTEMPTS,
112                "max_attempts must be greater than zero",
113            ));
114        }
115        self
116    }
117
118    /// Sets the maximum retry count after the initial attempt.
119    ///
120    /// # Parameters
121    /// - `max_retries`: Number of retries after the first attempt.
122    ///
123    /// # Returns
124    /// The updated builder.
125    #[inline]
126    pub fn max_retries(self, max_retries: u32) -> Self {
127        self.max_attempts(max_retries.saturating_add(1))
128    }
129
130    /// Sets the maximum cumulative user operation time.
131    ///
132    /// # Parameters
133    /// - `max_operation_elapsed`: Optional cumulative user operation time budget.
134    ///
135    /// # Returns
136    /// The updated builder.
137    #[inline]
138    pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
139        self.options.max_operation_elapsed = max_operation_elapsed;
140        self
141    }
142
143    /// Sets the maximum total monotonic retry-flow elapsed time.
144    ///
145    /// # Parameters
146    /// - `max_total_elapsed`: Optional total retry-flow time budget. Operation
147    ///   execution, retry sleeps, retry-after sleeps, and retry control-path
148    ///   listener time are included.
149    ///
150    /// # Returns
151    /// The updated builder.
152    #[inline]
153    pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
154        self.options.max_total_elapsed = max_total_elapsed;
155        self
156    }
157
158    /// Sets the retry delay strategy.
159    ///
160    /// # Parameters
161    /// - `delay`: Base delay strategy used between attempts.
162    ///
163    /// # Returns
164    /// The updated builder.
165    #[inline]
166    pub fn delay(mut self, delay: RetryDelay) -> Self {
167        self.options.delay = delay;
168        self
169    }
170
171    /// Configures immediate retries with no sleep.
172    ///
173    /// # Returns
174    /// The updated builder.
175    #[inline]
176    pub fn no_delay(self) -> Self {
177        self.delay(RetryDelay::none())
178    }
179
180    /// Configures a fixed retry delay.
181    ///
182    /// # Parameters
183    /// - `delay`: Delay slept before each retry.
184    ///
185    /// # Returns
186    /// The updated builder.
187    #[inline]
188    pub fn fixed_delay(self, delay: Duration) -> Self {
189        self.delay(RetryDelay::fixed(delay))
190    }
191
192    /// Configures a random retry delay range.
193    ///
194    /// # Parameters
195    /// - `min`: Inclusive lower delay bound.
196    /// - `max`: Inclusive upper delay bound.
197    ///
198    /// # Returns
199    /// The updated builder.
200    #[inline]
201    pub fn random_delay(self, min: Duration, max: Duration) -> Self {
202        self.delay(RetryDelay::random(min, max))
203    }
204
205    /// Configures exponential backoff with the default multiplier `2.0`.
206    ///
207    /// # Parameters
208    /// - `initial`: First retry delay.
209    /// - `max`: Maximum retry delay.
210    ///
211    /// # Returns
212    /// The updated builder.
213    #[inline]
214    pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
215        self.exponential_backoff_with_multiplier(initial, max, 2.0)
216    }
217
218    /// Configures exponential backoff with a custom multiplier.
219    ///
220    /// # Parameters
221    /// - `initial`: First retry delay.
222    /// - `max`: Maximum retry delay.
223    /// - `multiplier`: Multiplier applied after each failed attempt.
224    ///
225    /// # Returns
226    /// The updated builder.
227    #[inline]
228    pub fn exponential_backoff_with_multiplier(
229        self,
230        initial: Duration,
231        max: Duration,
232        multiplier: f64,
233    ) -> Self {
234        self.delay(RetryDelay::exponential(initial, max, multiplier))
235    }
236
237    /// Sets the jitter strategy.
238    ///
239    /// # Parameters
240    /// - `jitter`: Jitter strategy applied to base delays.
241    ///
242    /// # Returns
243    /// The updated builder.
244    #[inline]
245    pub fn jitter(mut self, jitter: RetryJitter) -> Self {
246        self.options.jitter = jitter;
247        self
248    }
249
250    /// Sets relative jitter by factor.
251    ///
252    /// # Parameters
253    /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
254    ///
255    /// # Returns
256    /// The updated builder.
257    #[inline]
258    pub fn jitter_factor(self, factor: f64) -> Self {
259        self.jitter(RetryJitter::factor(factor))
260    }
261
262    /// Sets a per-attempt timeout.
263    ///
264    /// # Parameters
265    /// - `attempt_timeout`: Timeout applied by `run_async`, `run_in_worker`,
266    ///   and `run_blocking_with_timeout`. `None` disables per-attempt timeout.
267    ///
268    /// # Returns
269    /// The updated builder.
270    #[inline]
271    pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
272        if let Some(timeout) = attempt_timeout {
273            self.options.attempt_timeout = Some(AttemptTimeoutOption::new(
274                timeout,
275                self.pending_attempt_timeout_policy,
276            ));
277        } else {
278            self.pending_attempt_timeout_policy = AttemptTimeoutPolicy::default();
279            self.options.attempt_timeout = None;
280        }
281        self
282    }
283
284    /// Sets the complete per-attempt timeout option.
285    ///
286    /// # Parameters
287    /// - `attempt_timeout`: Timeout option. `None` disables per-attempt timeout.
288    ///
289    /// # Returns
290    /// The updated builder.
291    #[inline]
292    pub fn attempt_timeout_option(mut self, attempt_timeout: Option<AttemptTimeoutOption>) -> Self {
293        if let Some(attempt_timeout) = attempt_timeout {
294            self.pending_attempt_timeout_policy = attempt_timeout.policy();
295        } else {
296            self.pending_attempt_timeout_policy = AttemptTimeoutPolicy::default();
297        }
298        self.options.attempt_timeout = attempt_timeout;
299        self
300    }
301
302    /// Sets the policy used when an attempt times out.
303    ///
304    /// If a timeout duration is already configured, this updates the complete
305    /// timeout option. Otherwise the policy is kept and applied when
306    /// [`RetryBuilder::attempt_timeout`] is called later.
307    ///
308    /// # Parameters
309    /// - `policy`: Timeout policy to use.
310    ///
311    /// # Returns
312    /// The updated builder.
313    #[inline]
314    pub fn attempt_timeout_policy(mut self, policy: AttemptTimeoutPolicy) -> Self {
315        self.pending_attempt_timeout_policy = policy;
316        self.options.attempt_timeout = self
317            .options
318            .attempt_timeout
319            .map(|attempt_timeout| attempt_timeout.with_policy(policy));
320        self
321    }
322
323    /// Sets how long worker-thread execution waits after cancelling a timed-out worker.
324    ///
325    /// # Parameters
326    /// - `grace`: Duration to wait after the attempt timeout fires and the
327    ///   cooperative cancellation token is marked as cancelled. Use zero to skip
328    ///   the grace wait.
329    ///
330    /// # Returns
331    /// The updated builder.
332    #[inline]
333    pub fn worker_cancel_grace(mut self, grace: Duration) -> Self {
334        self.options.worker_cancel_grace = grace;
335        self
336    }
337
338    /// Extracts an optional retry-after hint from each failure.
339    ///
340    /// # Parameters
341    /// - `hint`: Function that inspects a failure and context before failure
342    ///   listeners run.
343    ///
344    /// # Returns
345    /// The updated builder.
346    pub fn retry_after_hint<H>(mut self, hint: H) -> Self
347    where
348        H: BiFunction<AttemptFailure<E>, RetryContext, Option<Duration>> + Send + Sync + 'static,
349    {
350        self.retry_after_hint = Some(hint.into_arc());
351        self
352    }
353
354    /// Extracts an optional retry-after hint from operation errors.
355    ///
356    /// # Parameters
357    /// - `hint`: Function returning a delay hint for application errors.
358    ///
359    /// # Returns
360    /// The updated builder.
361    pub fn retry_after_from_error<H>(self, hint: H) -> Self
362    where
363        H: Fn(&E) -> Option<Duration> + Send + Sync + 'static,
364    {
365        self.retry_after_hint(
366            move |failure: &AttemptFailure<E>, _context: &RetryContext| {
367                failure.as_error().and_then(&hint)
368            },
369        )
370    }
371
372    /// Registers a listener invoked before every attempt.
373    ///
374    /// # Parameters
375    /// - `listener`: Listener receiving the retry context.
376    ///
377    /// # Returns
378    /// The updated builder.
379    pub fn before_attempt<C>(mut self, listener: C) -> Self
380    where
381        C: Consumer<RetryContext> + Send + Sync + 'static,
382    {
383        self.listeners.before_attempt.push(listener.into_arc());
384        self
385    }
386
387    /// Registers a listener invoked when an attempt succeeds.
388    ///
389    /// # Parameters
390    /// - `listener`: Listener receiving the success context.
391    ///
392    /// # Returns
393    /// The updated builder.
394    pub fn on_success<C>(mut self, listener: C) -> Self
395    where
396        C: Consumer<RetryContext> + Send + Sync + 'static,
397    {
398        self.listeners.attempt_success.push(listener.into_arc());
399        self
400    }
401
402    /// Registers a listener invoked after each attempt failure.
403    ///
404    /// # Parameters
405    /// - `listener`: Listener returning a retry failure decision.
406    ///
407    /// # Returns
408    /// The updated builder.
409    pub fn on_failure<F>(mut self, listener: F) -> Self
410    where
411        F: BiFunction<AttemptFailure<E>, RetryContext, AttemptFailureDecision>
412            + Send
413            + Sync
414            + 'static,
415    {
416        self.listeners.failure.push(listener.into_arc());
417        self
418    }
419
420    /// Registers a listener invoked after a retry delay has been selected.
421    ///
422    /// The listener receives the failed attempt and a context whose
423    /// [`RetryContext::next_delay`] contains the delay that will be slept before
424    /// the next attempt. The listener is observational and cannot change the
425    /// retry decision.
426    ///
427    /// # Parameters
428    /// - `listener`: Listener receiving the failure and scheduled-retry context.
429    ///
430    /// # Returns
431    /// The updated builder.
432    pub fn on_retry<C>(mut self, listener: C) -> Self
433    where
434        C: BiConsumer<AttemptFailure<E>, RetryContext> + Send + Sync + 'static,
435    {
436        self.listeners.retry_scheduled.push(listener.into_arc());
437        self
438    }
439
440    /// Registers an error-only predicate where `true` means retry.
441    ///
442    /// # Parameters
443    /// - `predicate`: Predicate applied only to [`AttemptFailure::Error`].
444    ///
445    /// # Returns
446    /// The updated builder.
447    pub fn retry_if_error<P>(self, predicate: P) -> Self
448    where
449        P: BiPredicate<E, RetryContext> + Send + Sync + 'static,
450    {
451        self.on_failure(
452            move |failure: &AttemptFailure<E>, context: &RetryContext| match failure {
453                AttemptFailure::Error(error) => {
454                    if predicate.test(error, context) {
455                        AttemptFailureDecision::Retry
456                    } else {
457                        AttemptFailureDecision::Abort
458                    }
459                }
460                AttemptFailure::Timeout
461                | AttemptFailure::Panic(_)
462                | AttemptFailure::Executor(_) => AttemptFailureDecision::UseDefault,
463            },
464        )
465    }
466
467    /// Registers a listener invoked when the retry flow returns [`RetryError`].
468    ///
469    /// # Parameters
470    /// - `listener`: Observational listener that cannot resume the retry flow.
471    ///
472    /// # Returns
473    /// The updated builder.
474    pub fn on_error<C>(mut self, listener: C) -> Self
475    where
476        C: BiConsumer<RetryError<E>, RetryContext> + Send + Sync + 'static,
477    {
478        self.listeners.error.push(listener.into_arc());
479        self
480    }
481
482    /// Aborts the retry flow when a configured per-attempt timeout expires.
483    ///
484    /// Max-elapsed effective timeouts are not controlled by this policy and stop
485    /// with [`crate::RetryErrorReason::MaxOperationElapsedExceeded`].
486    ///
487    /// # Returns
488    /// The updated builder.
489    pub fn abort_on_timeout(self) -> Self {
490        self.attempt_timeout_policy(AttemptTimeoutPolicy::Abort)
491    }
492
493    /// Retries configured per-attempt timeouts while limits allow it.
494    ///
495    /// Max-elapsed effective timeouts are not controlled by this policy and stop
496    /// with [`crate::RetryErrorReason::MaxOperationElapsedExceeded`].
497    ///
498    /// # Returns
499    /// The updated builder.
500    pub fn retry_on_timeout(self) -> Self {
501        self.attempt_timeout_policy(AttemptTimeoutPolicy::Retry)
502    }
503
504    /// Enables panic isolation for all registered listeners.
505    ///
506    /// # Returns
507    /// The updated builder.
508    #[inline]
509    pub fn isolate_listener_panics(mut self) -> Self {
510        self.isolate_listener_panics = true;
511        self
512    }
513
514    /// Builds and validates the retry policy.
515    ///
516    /// # Returns
517    /// A validated [`Retry`].
518    ///
519    /// # Errors
520    /// Returns [`RetryConfigError`] when options are invalid.
521    pub fn build(self) -> Result<Retry<E>, RetryConfigError> {
522        if let Some(error) = self.max_attempts_error {
523            return Err(error);
524        }
525        self.options.validate()?;
526        Ok(Retry::new(
527            self.options,
528            self.retry_after_hint,
529            self.isolate_listener_panics,
530            self.listeners,
531        ))
532    }
533}
534
535impl<E> Default for RetryBuilder<E> {
536    /// Creates a default retry builder.
537    ///
538    /// # Returns
539    /// A builder equivalent to [`RetryBuilder::new`].
540    #[inline]
541    fn default() -> Self {
542        Self::new()
543    }
544}