qubit_retry/executor/retry_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Retry builder.
11//!
12//! The builder collects retry options, attempt listeners, failure listeners, and
13//! terminal error listeners before producing a validated [`Retry`].
14
15use std::time::Duration;
16
17use qubit_error::BoxError;
18use qubit_function::{
19 BiConsumer,
20 BiFunction,
21 BiPredicate,
22 Consumer,
23};
24
25use crate::constants::KEY_MAX_ATTEMPTS;
26use crate::event::RetryListeners;
27use crate::{
28 AttemptFailure,
29 AttemptFailureDecision,
30 AttemptTimeoutOption,
31 AttemptTimeoutPolicy,
32 Retry,
33 RetryAfterHint,
34 RetryConfigError,
35 RetryContext,
36 RetryDelay,
37 RetryError,
38 RetryJitter,
39 RetryOptions,
40};
41
42/// Builder for [`Retry`].
43///
44/// The generic parameter `E` is the operation error type preserved inside
45/// [`AttemptFailure::Error`]. Failure listeners may observe failures, override
46/// the retry decision, or return [`AttemptFailureDecision::UseDefault`] to let
47/// the policy decide from configured limits and delay strategy.
48pub struct RetryBuilder<E = BoxError> {
49 /// Retry limits, delay strategy, jitter, and elapsed budgets.
50 options: RetryOptions,
51 /// Pending policy used when timeout duration is configured later.
52 pending_attempt_timeout_policy: AttemptTimeoutPolicy,
53 /// Optional retry-after hint extractor.
54 retry_after_hint: Option<RetryAfterHint<E>>,
55 /// Lifecycle listeners registered on the builder.
56 listeners: RetryListeners<E>,
57 /// Whether listener panics should be isolated.
58 isolate_listener_panics: bool,
59 /// Stored validation error when max attempts is configured as zero.
60 max_attempts_error: Option<RetryConfigError>,
61}
62
63impl<E> RetryBuilder<E> {
64 /// Creates a builder with default options and no listeners.
65 ///
66 /// # Returns
67 /// A retry builder using [`RetryOptions::default`].
68 #[inline]
69 pub fn new() -> Self {
70 Self {
71 options: RetryOptions::default(),
72 pending_attempt_timeout_policy: AttemptTimeoutPolicy::default(),
73 retry_after_hint: None,
74 listeners: RetryListeners::default(),
75 isolate_listener_panics: false,
76 max_attempts_error: None,
77 }
78 }
79
80 /// Replaces all retry options.
81 ///
82 /// # Parameters
83 /// - `options`: Retry option snapshot.
84 ///
85 /// # Returns
86 /// The updated builder.
87 #[inline]
88 pub fn options(mut self, options: RetryOptions) -> Self {
89 self.pending_attempt_timeout_policy = options
90 .attempt_timeout()
91 .map(|attempt_timeout| attempt_timeout.policy())
92 .unwrap_or_default();
93 self.options = options;
94 self.max_attempts_error = None;
95 self
96 }
97
98 /// Sets the maximum total attempts, including the initial attempt.
99 ///
100 /// # Parameters
101 /// - `max_attempts`: Maximum attempts. Zero is recorded as a build error.
102 ///
103 /// # Returns
104 /// The updated builder.
105 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
106 if let Some(max_attempts) = std::num::NonZeroU32::new(max_attempts) {
107 self.options.max_attempts = max_attempts;
108 self.max_attempts_error = None;
109 } else {
110 self.max_attempts_error = Some(RetryConfigError::invalid_value(
111 KEY_MAX_ATTEMPTS,
112 "max_attempts must be greater than zero",
113 ));
114 }
115 self
116 }
117
118 /// Sets the maximum retry count after the initial attempt.
119 ///
120 /// # Parameters
121 /// - `max_retries`: Number of retries after the first attempt.
122 ///
123 /// # Returns
124 /// The updated builder.
125 #[inline]
126 pub fn max_retries(self, max_retries: u32) -> Self {
127 self.max_attempts(max_retries.saturating_add(1))
128 }
129
130 /// Sets the maximum cumulative user operation time.
131 ///
132 /// # Parameters
133 /// - `max_operation_elapsed`: Optional cumulative user operation time budget.
134 ///
135 /// # Returns
136 /// The updated builder.
137 #[inline]
138 pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
139 self.options.max_operation_elapsed = max_operation_elapsed;
140 self
141 }
142
143 /// Sets the maximum total monotonic retry-flow elapsed time.
144 ///
145 /// # Parameters
146 /// - `max_total_elapsed`: Optional total retry-flow time budget. Operation
147 /// execution, retry sleeps, retry-after sleeps, and retry control-path
148 /// listener time are included.
149 ///
150 /// # Returns
151 /// The updated builder.
152 #[inline]
153 pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
154 self.options.max_total_elapsed = max_total_elapsed;
155 self
156 }
157
158 /// Sets the retry delay strategy.
159 ///
160 /// # Parameters
161 /// - `delay`: Base delay strategy used between attempts.
162 ///
163 /// # Returns
164 /// The updated builder.
165 #[inline]
166 pub fn delay(mut self, delay: RetryDelay) -> Self {
167 self.options.delay = delay;
168 self
169 }
170
171 /// Configures immediate retries with no sleep.
172 ///
173 /// # Returns
174 /// The updated builder.
175 #[inline]
176 pub fn no_delay(self) -> Self {
177 self.delay(RetryDelay::none())
178 }
179
180 /// Configures a fixed retry delay.
181 ///
182 /// # Parameters
183 /// - `delay`: Delay slept before each retry.
184 ///
185 /// # Returns
186 /// The updated builder.
187 #[inline]
188 pub fn fixed_delay(self, delay: Duration) -> Self {
189 self.delay(RetryDelay::fixed(delay))
190 }
191
192 /// Configures a random retry delay range.
193 ///
194 /// # Parameters
195 /// - `min`: Inclusive lower delay bound.
196 /// - `max`: Inclusive upper delay bound.
197 ///
198 /// # Returns
199 /// The updated builder.
200 #[inline]
201 pub fn random_delay(self, min: Duration, max: Duration) -> Self {
202 self.delay(RetryDelay::random(min, max))
203 }
204
205 /// Configures exponential backoff with the default multiplier `2.0`.
206 ///
207 /// # Parameters
208 /// - `initial`: First retry delay.
209 /// - `max`: Maximum retry delay.
210 ///
211 /// # Returns
212 /// The updated builder.
213 #[inline]
214 pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
215 self.exponential_backoff_with_multiplier(initial, max, 2.0)
216 }
217
218 /// Configures exponential backoff with a custom multiplier.
219 ///
220 /// # Parameters
221 /// - `initial`: First retry delay.
222 /// - `max`: Maximum retry delay.
223 /// - `multiplier`: Multiplier applied after each failed attempt.
224 ///
225 /// # Returns
226 /// The updated builder.
227 #[inline]
228 pub fn exponential_backoff_with_multiplier(
229 self,
230 initial: Duration,
231 max: Duration,
232 multiplier: f64,
233 ) -> Self {
234 self.delay(RetryDelay::exponential(initial, max, multiplier))
235 }
236
237 /// Sets the jitter strategy.
238 ///
239 /// # Parameters
240 /// - `jitter`: Jitter strategy applied to base delays.
241 ///
242 /// # Returns
243 /// The updated builder.
244 #[inline]
245 pub fn jitter(mut self, jitter: RetryJitter) -> Self {
246 self.options.jitter = jitter;
247 self
248 }
249
250 /// Sets relative jitter by factor.
251 ///
252 /// # Parameters
253 /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
254 ///
255 /// # Returns
256 /// The updated builder.
257 #[inline]
258 pub fn jitter_factor(self, factor: f64) -> Self {
259 self.jitter(RetryJitter::factor(factor))
260 }
261
262 /// Sets a per-attempt timeout.
263 ///
264 /// # Parameters
265 /// - `attempt_timeout`: Timeout applied by `run_async`, `run_in_worker`,
266 /// and `run_blocking_with_timeout`. `None` disables per-attempt timeout.
267 ///
268 /// # Returns
269 /// The updated builder.
270 #[inline]
271 pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
272 if let Some(timeout) = attempt_timeout {
273 self.options.attempt_timeout = Some(AttemptTimeoutOption::new(
274 timeout,
275 self.pending_attempt_timeout_policy,
276 ));
277 } else {
278 self.pending_attempt_timeout_policy = AttemptTimeoutPolicy::default();
279 self.options.attempt_timeout = None;
280 }
281 self
282 }
283
284 /// Sets the complete per-attempt timeout option.
285 ///
286 /// # Parameters
287 /// - `attempt_timeout`: Timeout option. `None` disables per-attempt timeout.
288 ///
289 /// # Returns
290 /// The updated builder.
291 #[inline]
292 pub fn attempt_timeout_option(mut self, attempt_timeout: Option<AttemptTimeoutOption>) -> Self {
293 if let Some(attempt_timeout) = attempt_timeout {
294 self.pending_attempt_timeout_policy = attempt_timeout.policy();
295 } else {
296 self.pending_attempt_timeout_policy = AttemptTimeoutPolicy::default();
297 }
298 self.options.attempt_timeout = attempt_timeout;
299 self
300 }
301
302 /// Sets the policy used when an attempt times out.
303 ///
304 /// If a timeout duration is already configured, this updates the complete
305 /// timeout option. Otherwise the policy is kept and applied when
306 /// [`RetryBuilder::attempt_timeout`] is called later.
307 ///
308 /// # Parameters
309 /// - `policy`: Timeout policy to use.
310 ///
311 /// # Returns
312 /// The updated builder.
313 #[inline]
314 pub fn attempt_timeout_policy(mut self, policy: AttemptTimeoutPolicy) -> Self {
315 self.pending_attempt_timeout_policy = policy;
316 self.options.attempt_timeout = self
317 .options
318 .attempt_timeout
319 .map(|attempt_timeout| attempt_timeout.with_policy(policy));
320 self
321 }
322
323 /// Sets how long worker-thread execution waits after cancelling a timed-out worker.
324 ///
325 /// # Parameters
326 /// - `grace`: Duration to wait after the attempt timeout fires and the
327 /// cooperative cancellation token is marked as cancelled. Use zero to skip
328 /// the grace wait.
329 ///
330 /// # Returns
331 /// The updated builder.
332 #[inline]
333 pub fn worker_cancel_grace(mut self, grace: Duration) -> Self {
334 self.options.worker_cancel_grace = grace;
335 self
336 }
337
338 /// Extracts an optional retry-after hint from each failure.
339 ///
340 /// # Parameters
341 /// - `hint`: Function that inspects a failure and context before failure
342 /// listeners run.
343 ///
344 /// # Returns
345 /// The updated builder.
346 pub fn retry_after_hint<H>(mut self, hint: H) -> Self
347 where
348 H: BiFunction<AttemptFailure<E>, RetryContext, Option<Duration>> + Send + Sync + 'static,
349 {
350 self.retry_after_hint = Some(hint.into_arc());
351 self
352 }
353
354 /// Extracts an optional retry-after hint from operation errors.
355 ///
356 /// # Parameters
357 /// - `hint`: Function returning a delay hint for application errors.
358 ///
359 /// # Returns
360 /// The updated builder.
361 pub fn retry_after_from_error<H>(self, hint: H) -> Self
362 where
363 H: Fn(&E) -> Option<Duration> + Send + Sync + 'static,
364 {
365 self.retry_after_hint(
366 move |failure: &AttemptFailure<E>, _context: &RetryContext| {
367 failure.as_error().and_then(&hint)
368 },
369 )
370 }
371
372 /// Registers a listener invoked before every attempt.
373 ///
374 /// # Parameters
375 /// - `listener`: Listener receiving the retry context.
376 ///
377 /// # Returns
378 /// The updated builder.
379 pub fn before_attempt<C>(mut self, listener: C) -> Self
380 where
381 C: Consumer<RetryContext> + Send + Sync + 'static,
382 {
383 self.listeners.before_attempt.push(listener.into_arc());
384 self
385 }
386
387 /// Registers a listener invoked when an attempt succeeds.
388 ///
389 /// # Parameters
390 /// - `listener`: Listener receiving the success context.
391 ///
392 /// # Returns
393 /// The updated builder.
394 pub fn on_success<C>(mut self, listener: C) -> Self
395 where
396 C: Consumer<RetryContext> + Send + Sync + 'static,
397 {
398 self.listeners.attempt_success.push(listener.into_arc());
399 self
400 }
401
402 /// Registers a listener invoked after each attempt failure.
403 ///
404 /// # Parameters
405 /// - `listener`: Listener returning a retry failure decision.
406 ///
407 /// # Returns
408 /// The updated builder.
409 pub fn on_failure<F>(mut self, listener: F) -> Self
410 where
411 F: BiFunction<AttemptFailure<E>, RetryContext, AttemptFailureDecision>
412 + Send
413 + Sync
414 + 'static,
415 {
416 self.listeners.failure.push(listener.into_arc());
417 self
418 }
419
420 /// Registers a listener invoked after a retry delay has been selected.
421 ///
422 /// The listener receives the failed attempt and a context whose
423 /// [`RetryContext::next_delay`] contains the delay that will be slept before
424 /// the next attempt. The listener is observational and cannot change the
425 /// retry decision.
426 ///
427 /// # Parameters
428 /// - `listener`: Listener receiving the failure and scheduled-retry context.
429 ///
430 /// # Returns
431 /// The updated builder.
432 pub fn on_retry<C>(mut self, listener: C) -> Self
433 where
434 C: BiConsumer<AttemptFailure<E>, RetryContext> + Send + Sync + 'static,
435 {
436 self.listeners.retry_scheduled.push(listener.into_arc());
437 self
438 }
439
440 /// Registers an error-only predicate where `true` means retry.
441 ///
442 /// # Parameters
443 /// - `predicate`: Predicate applied only to [`AttemptFailure::Error`].
444 ///
445 /// # Returns
446 /// The updated builder.
447 pub fn retry_if_error<P>(self, predicate: P) -> Self
448 where
449 P: BiPredicate<E, RetryContext> + Send + Sync + 'static,
450 {
451 self.on_failure(
452 move |failure: &AttemptFailure<E>, context: &RetryContext| match failure {
453 AttemptFailure::Error(error) => {
454 if predicate.test(error, context) {
455 AttemptFailureDecision::Retry
456 } else {
457 AttemptFailureDecision::Abort
458 }
459 }
460 AttemptFailure::Timeout
461 | AttemptFailure::Panic(_)
462 | AttemptFailure::Executor(_) => AttemptFailureDecision::UseDefault,
463 },
464 )
465 }
466
467 /// Registers a listener invoked when the retry flow returns [`RetryError`].
468 ///
469 /// # Parameters
470 /// - `listener`: Observational listener that cannot resume the retry flow.
471 ///
472 /// # Returns
473 /// The updated builder.
474 pub fn on_error<C>(mut self, listener: C) -> Self
475 where
476 C: BiConsumer<RetryError<E>, RetryContext> + Send + Sync + 'static,
477 {
478 self.listeners.error.push(listener.into_arc());
479 self
480 }
481
482 /// Aborts the retry flow when a configured per-attempt timeout expires.
483 ///
484 /// Max-elapsed effective timeouts are not controlled by this policy and stop
485 /// with [`crate::RetryErrorReason::MaxOperationElapsedExceeded`].
486 ///
487 /// # Returns
488 /// The updated builder.
489 pub fn abort_on_timeout(self) -> Self {
490 self.attempt_timeout_policy(AttemptTimeoutPolicy::Abort)
491 }
492
493 /// Retries configured per-attempt timeouts while limits allow it.
494 ///
495 /// Max-elapsed effective timeouts are not controlled by this policy and stop
496 /// with [`crate::RetryErrorReason::MaxOperationElapsedExceeded`].
497 ///
498 /// # Returns
499 /// The updated builder.
500 pub fn retry_on_timeout(self) -> Self {
501 self.attempt_timeout_policy(AttemptTimeoutPolicy::Retry)
502 }
503
504 /// Enables panic isolation for all registered listeners.
505 ///
506 /// # Returns
507 /// The updated builder.
508 #[inline]
509 pub fn isolate_listener_panics(mut self) -> Self {
510 self.isolate_listener_panics = true;
511 self
512 }
513
514 /// Builds and validates the retry policy.
515 ///
516 /// # Returns
517 /// A validated [`Retry`].
518 ///
519 /// # Errors
520 /// Returns [`RetryConfigError`] when options are invalid.
521 pub fn build(self) -> Result<Retry<E>, RetryConfigError> {
522 if let Some(error) = self.max_attempts_error {
523 return Err(error);
524 }
525 self.options.validate()?;
526 Ok(Retry::new(
527 self.options,
528 self.retry_after_hint,
529 self.isolate_listener_panics,
530 self.listeners,
531 ))
532 }
533}
534
535impl<E> Default for RetryBuilder<E> {
536 /// Creates a default retry builder.
537 ///
538 /// # Returns
539 /// A builder equivalent to [`RetryBuilder::new`].
540 #[inline]
541 fn default() -> Self {
542 Self::new()
543 }
544}