qubit_retry/executor/retry.rs
/*******************************************************************************
 *
 * Copyright (c) 2025 - 2026 Haixing Hu.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! Retry execution.
//!
//! A [`Retry`] owns validated retry options and lifecycle listeners. The
//! operation success type is introduced by each `run` call, while the error type
//! is bound by the retry policy.
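//!
//! # Examples
//!
//! A minimal sketch of the basic flow. It assumes [`Retry`] is re-exported at
//! the crate root and that the default options allow at least three attempts;
//! the counting operation is illustrative only.
//!
//! ```rust,ignore
//! use qubit_retry::Retry;
//!
//! let retry: Retry<String> = Retry::builder().build().expect("default options are valid");
//! let mut calls = 0;
//! let value = retry
//!     .run(|| {
//!         calls += 1;
//!         if calls < 3 { Err("transient".to_string()) } else { Ok(calls) }
//!     })
//!     .expect("operation eventually succeeds");
//! assert_eq!(value, 3);
//! ```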

use qubit_error::BoxError;
use qubit_function::{
    BiConsumer,
    BiFunction,
    Consumer,
};
use std::fmt;
#[cfg(feature = "tokio")]
use std::future::Future;
use std::panic;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread::JoinHandle;
use std::time::{
    Duration,
    Instant,
};

#[cfg(feature = "tokio")]
use super::async_attempt::AsyncAttempt;
#[cfg(feature = "tokio")]
use super::async_value_operation::AsyncValueOperation;
use super::attempt_cancel_token::AttemptCancelToken;
use super::blocking_attempt_message::BlockingAttemptMessage;
use super::retry_flow_action::RetryFlowAction;
use super::sync_attempt::SyncAttempt;
use super::sync_value_operation::SyncValueOperation;
use crate::event::{
    RetryContextParts,
    RetryListeners,
};
use crate::{
    AttemptExecutorError,
    AttemptFailure,
    AttemptFailureDecision,
    AttemptPanic,
    AttemptTimeoutPolicy,
    AttemptTimeoutSource,
    RetryAfterHint,
    RetryBuilder,
    RetryConfigError,
    RetryContext,
    RetryError,
    RetryErrorReason,
    RetryOptions,
};

const WORKER_DISCONNECTED_MESSAGE: &str = "retry worker thread stopped without sending a result";
const WORKER_SPAWN_FAILED_MESSAGE: &str = "failed to spawn retry worker thread";

/// Builds an executor attempt failure from a static message.
macro_rules! exec_fail {
    ($message:expr) => {
        AttemptFailure::Executor(AttemptExecutorError::new($message))
    };
}

/// Retry policy and executor bound to an operation error type.
///
/// The generic parameter `E` is the caller's operation error type. Cloning a
/// retry policy shares all registered functors through reference-counted
/// `qubit_function` wrappers.
#[derive(Clone)]
pub struct Retry<E = BoxError> {
    /// Validated retry limits and backoff settings.
    options: RetryOptions,
    /// Optional retry-after hint extractor.
    retry_after_hint: Option<RetryAfterHint<E>>,
    /// Whether listener panics should be isolated.
    isolate_listener_panics: bool,
    /// Lifecycle listeners.
    listeners: RetryListeners<E>,
}

/// Effective timeout selected for a single attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct EffectiveAttemptTimeout {
    /// Timeout duration actually enforced for the attempt.
    duration: Option<Duration>,
    /// Source that selected the effective timeout.
    source: Option<AttemptTimeoutSource>,
}

impl EffectiveAttemptTimeout {
    /// Creates an effective attempt timeout.
    ///
    /// # Parameters
    /// - `duration`: Timeout duration enforced for the attempt.
    /// - `source`: Source that selected the timeout.
    ///
    /// # Returns
    /// A timeout descriptor for one attempt.
    #[inline]
    fn new(duration: Option<Duration>, source: Option<AttemptTimeoutSource>) -> Self {
        Self { duration, source }
    }

    /// Returns the elapsed-budget reason represented by a timeout failure.
    ///
    /// # Parameters
    /// - `failure`: Failure produced by the attempt.
    ///
    /// # Returns
    /// `Some(RetryErrorReason)` when the attempt timed out because an elapsed
    /// budget selected the effective timeout.
    #[inline]
    fn elapsed_timeout_reason<E>(&self, failure: &AttemptFailure<E>) -> Option<RetryErrorReason> {
        if !matches!(failure, AttemptFailure::Timeout) {
            return None;
        }
        match self.source {
            Some(AttemptTimeoutSource::MaxOperationElapsed) => {
                Some(RetryErrorReason::MaxOperationElapsedExceeded)
            }
            Some(AttemptTimeoutSource::MaxTotalElapsed) => {
                Some(RetryErrorReason::MaxTotalElapsedExceeded)
            }
            Some(AttemptTimeoutSource::Configured) | None => None,
        }
    }
}

/// Result and cleanup status returned from one blocking worker attempt.
struct BlockingAttemptOutcome<T, E> {
    /// Attempt result after timeout handling.
    result: Result<T, AttemptFailure<E>>,
    /// Worker threads not observed to exit before cancellation grace ended.
    unreaped_worker_count: u32,
}

impl<T, E> BlockingAttemptOutcome<T, E> {
    /// Creates a worker-attempt outcome.
    ///
    /// # Parameters
    /// - `result`: Attempt result exposed to the retry flow.
    /// - `unreaped_worker_count`: Count of worker threads not observed to exit.
    ///
    /// # Returns
    /// A blocking-attempt outcome.
    #[inline]
    fn new(result: Result<T, AttemptFailure<E>>, unreaped_worker_count: u32) -> Self {
        Self {
            result,
            unreaped_worker_count,
        }
    }
}

#[allow(clippy::result_large_err)]
impl<E> Retry<E> {
    /// Creates a retry builder.
    ///
    /// # Returns
    /// A [`RetryBuilder`] configured with defaults.
    #[inline]
    pub fn builder() -> RetryBuilder<E> {
        RetryBuilder::new()
    }

    /// Creates a retry policy from options.
    ///
    /// # Parameters
    /// - `options`: Retry options to validate and install.
    ///
    /// # Returns
    /// A retry policy using the default listener set.
    ///
    /// # Errors
    /// Returns [`RetryConfigError`] if the options are invalid.
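    ///
    /// # Examples
    ///
    /// A minimal sketch; it assumes `RetryOptions` provides a `Default`
    /// implementation, which is not shown in this module.
    ///
    /// ```rust,ignore
    /// let retry: Retry<String> = Retry::from_options(RetryOptions::default())
    ///     .expect("default options are valid");
    /// ```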
    pub fn from_options(options: RetryOptions) -> Result<Self, RetryConfigError> {
        Self::builder().options(options).build()
    }

    /// Returns the immutable options used by this retry policy.
    ///
    /// # Returns
    /// Shared retry options.
    #[inline]
    pub fn options(&self) -> &RetryOptions {
        &self.options
    }

    /// Runs a synchronous operation with retry.
    ///
    /// # Parameters
    /// - `operation`: Operation called once per attempt until it succeeds or the
    ///   retry flow stops.
    ///
    /// # Returns
    /// `Ok(T)` with the operation value, or [`RetryError`] when retrying stops.
    ///
    /// # Panics
    /// Propagates operation panics and listener panics unless listener panic
    /// isolation is enabled.
    ///
    /// # Blocking
    /// Blocks the current thread with `std::thread::sleep` between attempts when
    /// a non-zero retry delay is selected.
    ///
    /// # Elapsed Budget
    /// `max_operation_elapsed` counts only user operation execution time.
    /// `max_total_elapsed` counts monotonic retry-flow time, including
    /// operation execution, retry sleep, retry-after sleep, and retry
    /// control-path listener time. This synchronous mode cannot interrupt an
    /// already-running operation; it checks budgets before attempts and after
    /// failed attempts. If `attempt_timeout` is configured, this method returns
    /// [`RetryErrorReason::UnsupportedOperation`] because timeout enforcement
    /// requires worker-thread or async execution.
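    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming no `attempt_timeout` is configured; the
    /// fallible `read_config()` helper is hypothetical.
    ///
    /// ```rust,ignore
    /// let retry: Retry<std::io::Error> = Retry::builder().build()?;
    /// match retry.run(|| read_config()) {
    ///     Ok(config) => println!("loaded: {config:?}"),
    ///     Err(error) => eprintln!("retrying stopped: {error}"),
    /// }
    /// ```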
    pub fn run<T, F>(&self, mut operation: F) -> Result<T, RetryError<E>>
    where
        F: FnMut() -> Result<T, E>,
    {
        if self.options.attempt_timeout().is_some() {
            let attempt_timeout = self.attempt_timeout_duration();
            return Err(self.emit_error(RetryError::new(
                RetryErrorReason::UnsupportedOperation,
                None,
                RetryContext::from_parts(RetryContextParts {
                    attempt: 0,
                    max_attempts: self.options.max_attempts.get(),
                    max_operation_elapsed: self.options.max_operation_elapsed,
                    max_total_elapsed: self.options.max_total_elapsed,
                    operation_elapsed: Duration::ZERO,
                    total_elapsed: Duration::ZERO,
                    attempt_elapsed: Duration::ZERO,
                    attempt_timeout,
                })
                .with_attempt_timeout_source(Some(AttemptTimeoutSource::Configured)),
            )));
        }
        let mut operation = SyncValueOperation::new(&mut operation);
        self.run_sync_operation(&mut operation)
            .map(|()| operation.into_value())
    }

    /// Runs a blocking operation with retry inside worker-thread attempts.
    ///
    /// Each attempt runs on a worker thread. Worker panics are captured as
    /// [`AttemptFailure::Panic`]. Worker-spawn failures are reported as
    /// [`AttemptFailure::Executor`]. If the effective timeout expires, the retry
    /// executor stops waiting and marks the attempt's [`AttemptCancelToken`] as
    /// cancelled. It then waits up to [`RetryOptions::worker_cancel_grace`] for
    /// the worker to exit. Configured attempt-timeout expirations continue
    /// according to [`AttemptTimeoutPolicy`] only when the worker exits within
    /// that grace period; otherwise the retry flow stops with
    /// [`RetryErrorReason::WorkerStillRunning`]. Elapsed-budget expirations stop
    /// with [`RetryErrorReason::MaxOperationElapsedExceeded`] or
    /// [`RetryErrorReason::MaxTotalElapsedExceeded`].
    ///
    /// # Parameters
    /// - `operation`: Thread-safe operation called once per attempt. It receives
    ///   a cooperative cancellation token for that attempt.
    ///
    /// # Returns
    /// `Ok(T)` with the operation value, or [`RetryError`] when retrying stops.
    ///
    /// # Panics
    /// Does not propagate operation panics. Listener panic behavior follows this
    /// retry policy's listener isolation setting.
    ///
    /// # Blocking
    /// Blocks the current thread while waiting for each worker result or timeout
    /// and while sleeping between retry attempts.
    ///
    /// # Elapsed Budget
    /// `max_operation_elapsed` counts only user operation execution time.
    /// `max_total_elapsed` counts monotonic retry-flow time. Worker attempts use
    /// the shortest of configured attempt timeout, remaining
    /// max-operation-elapsed budget, and remaining max-total-elapsed budget as
    /// their effective timeout.
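    ///
    /// # Examples
    ///
    /// A minimal sketch; `fetch_blocking()` is a hypothetical blocking call.
    /// Short operations may ignore the cancellation token, at the cost of not
    /// reacting to cooperative cancellation.
    ///
    /// ```rust,ignore
    /// let retry: Retry<std::io::Error> = Retry::builder().build()?;
    /// let bytes = retry.run_in_worker(|_cancel| fetch_blocking())?;
    /// ```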
    pub fn run_in_worker<T, F>(&self, operation: F) -> Result<T, RetryError<E>>
    where
        T: Send + 'static,
        E: Send + 'static,
        F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
    {
        let operation = Arc::new(operation);
        let flow_started_at = Instant::now();
        let mut operation_elapsed = Duration::ZERO;
        let mut attempts = 0;
        let mut last_failure = None;

        loop {
            let total_elapsed = flow_started_at.elapsed();
            let attempt_timeout = self.effective_attempt_timeout(operation_elapsed, total_elapsed);
            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
                return Err(self.emit_error(self.elapsed_error(
                    reason,
                    operation_elapsed,
                    total_elapsed,
                    attempts,
                    last_failure.take(),
                    attempt_timeout,
                )));
            }

            attempts += 1;
            let attempt_timeout =
                self.effective_attempt_timeout(operation_elapsed, flow_started_at.elapsed());
            let before_context = self
                .context(
                    operation_elapsed,
                    flow_started_at.elapsed(),
                    attempts,
                    Duration::ZERO,
                    attempt_timeout.duration,
                )
                .with_attempt_timeout_source(attempt_timeout.source);
            self.emit_before_attempt(&before_context);
            let total_elapsed = flow_started_at.elapsed();
            let attempt_timeout = self.effective_attempt_timeout(operation_elapsed, total_elapsed);
            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
                return Err(self.emit_error(self.elapsed_error(
                    reason,
                    operation_elapsed,
                    total_elapsed,
                    attempts,
                    last_failure.take(),
                    attempt_timeout,
                )));
            }

            let attempt_start = Instant::now();
            let outcome =
                self.call_blocking_attempt(Arc::clone(&operation), attempt_timeout.duration);
            let attempt_elapsed = attempt_start.elapsed();
            operation_elapsed = add_elapsed(operation_elapsed, attempt_elapsed);
            let context = self
                .context(
                    operation_elapsed,
                    flow_started_at.elapsed(),
                    attempts,
                    attempt_elapsed,
                    attempt_timeout.duration,
                )
                .with_attempt_timeout_source(attempt_timeout.source)
                .with_unreaped_worker_count(outcome.unreaped_worker_count);
            match outcome.result {
                Ok(value) => {
                    self.emit_attempt_success(&context);
                    return Ok(value);
                }
                Err(failure) => {
                    if let Some(reason) = attempt_timeout.elapsed_timeout_reason(&failure) {
                        return Err(self.emit_error(RetryError::new(
                            reason,
                            Some(failure),
                            context,
                        )));
                    }
                    let retry_block_reason = (context.unreaped_worker_count() > 0)
                        .then_some(RetryErrorReason::WorkerStillRunning);
                    match self.handle_failure(
                        attempts,
                        failure,
                        context,
                        retry_block_reason,
                        flow_started_at,
                    ) {
                        RetryFlowAction::Retry { delay, failure } => {
                            if !delay.is_zero() {
                                std::thread::sleep(delay);
                            }
                            last_failure = Some(failure);
                        }
                        RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
                    }
                }
            }
        }
    }

    /// Runs a blocking operation with retry and per-attempt timeout isolation.
    ///
    /// This method is a compatibility alias for [`Retry::run_in_worker`].
    /// Attempts run on worker threads even when no timeout is configured, so
    /// worker panics are reported as [`AttemptFailure::Panic`] instead of
    /// unwinding through the caller. Worker-spawn failures are reported as
    /// [`AttemptFailure::Executor`].
    ///
    /// # Parameters
    /// - `operation`: Thread-safe operation called once per attempt. It receives
    ///   a cooperative cancellation token for that attempt.
    ///
    /// # Returns
    /// `Ok(T)` with the operation value, or [`RetryError`] when retrying stops.
    ///
    /// # Panics
    /// Does not propagate operation panics. Listener panic behavior follows this
    /// retry policy's listener isolation setting.
    ///
    /// # Blocking
    /// Blocks the current thread while waiting for each worker result or timeout
    /// and while sleeping between retry attempts.
    ///
    /// # Elapsed Budget
    /// `max_operation_elapsed` counts only user operation execution time.
    /// `max_total_elapsed` counts monotonic retry-flow time. Worker attempts use
    /// the shortest of configured attempt timeout, remaining
    /// max-operation-elapsed budget, and remaining max-total-elapsed budget as
    /// their effective timeout.
    #[inline]
    pub fn run_blocking_with_timeout<T, F>(&self, operation: F) -> Result<T, RetryError<E>>
    where
        T: Send + 'static,
        E: Send + 'static,
        F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
    {
        self.run_in_worker(operation)
    }

    /// Runs a synchronous value-erased operation with retry.
    ///
    /// # Parameters
    /// - `operation`: Operation adapter called once per attempt.
    ///
    /// # Returns
    /// `Ok(())` after a successful attempt, or [`RetryError`] when retrying stops.
    fn run_sync_operation(&self, operation: &mut dyn SyncAttempt<E>) -> Result<(), RetryError<E>> {
        let flow_started_at = Instant::now();
        let mut operation_elapsed = Duration::ZERO;
        let mut attempts = 0;
        let mut last_failure = None;

        loop {
            let total_elapsed = flow_started_at.elapsed();
            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
                return Err(self.emit_error(self.elapsed_error(
                    reason,
                    operation_elapsed,
                    total_elapsed,
                    attempts,
                    last_failure.take(),
                    EffectiveAttemptTimeout::new(None, None),
                )));
            }

            attempts += 1;
            let before_context = self.context(
                operation_elapsed,
                flow_started_at.elapsed(),
                attempts,
                Duration::ZERO,
                None,
            );
            self.emit_before_attempt(&before_context);
            let total_elapsed = flow_started_at.elapsed();
            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
                return Err(self.emit_error(self.elapsed_error(
                    reason,
                    operation_elapsed,
                    total_elapsed,
                    attempts,
                    last_failure.take(),
                    EffectiveAttemptTimeout::new(None, None),
                )));
            }

            let attempt_start = Instant::now();
            match operation.call() {
                Ok(()) => {
                    let attempt_elapsed = attempt_start.elapsed();
                    operation_elapsed = add_elapsed(operation_elapsed, attempt_elapsed);
                    let context = self.context(
                        operation_elapsed,
                        flow_started_at.elapsed(),
                        attempts,
                        attempt_elapsed,
                        None,
                    );
                    self.emit_attempt_success(&context);
                    return Ok(());
                }
                Err(failure) => {
                    let attempt_elapsed = attempt_start.elapsed();
                    operation_elapsed = add_elapsed(operation_elapsed, attempt_elapsed);
                    let context = self.context(
                        operation_elapsed,
                        flow_started_at.elapsed(),
                        attempts,
                        attempt_elapsed,
                        None,
                    );
                    match self.handle_failure(attempts, failure, context, None, flow_started_at) {
                        RetryFlowAction::Retry { delay, failure } => {
                            if !delay.is_zero() {
                                std::thread::sleep(delay);
                            }
                            last_failure = Some(failure);
                        }
                        RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
                    }
                }
            }
        }
    }

    /// Runs an asynchronous operation with retry.
    ///
    /// # Parameters
    /// - `operation`: Factory returning a fresh future for each attempt.
    ///
    /// # Returns
    /// `Ok(T)` with the operation value, or [`RetryError`] when retrying stops.
    ///
    /// # Panics
    /// Propagates operation panics from the current async task. They are not
    /// converted to [`AttemptFailure::Panic`] because `run_async` does not
    /// create an isolation boundary. Listener panics are propagated unless
    /// listener panic isolation is enabled. Tokio may panic if timer APIs are
    /// used outside a runtime with a time driver.
    ///
    /// # Elapsed Budget
    /// `max_operation_elapsed` counts only user operation execution time.
    /// `max_total_elapsed` counts monotonic retry-flow time. Async attempts use
    /// the shortest of configured attempt timeout, remaining
    /// max-operation-elapsed budget, and remaining max-total-elapsed budget as
    /// their effective timeout.
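    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a Tokio runtime with an active time driver;
    /// the async `fetch()` helper is hypothetical.
    ///
    /// ```rust,ignore
    /// let retry: Retry<std::io::Error> = Retry::builder().build()?;
    /// let body = retry.run_async(|| fetch()).await?;
    /// ```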
    #[cfg(feature = "tokio")]
    pub async fn run_async<T, F, Fut>(&self, mut operation: F) -> Result<T, RetryError<E>>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut operation = AsyncValueOperation::new(&mut operation);
        self.run_async_operation(&mut operation)
            .await
            .map(|()| operation.into_value())
    }

    /// Runs an asynchronous value-erased operation with retry.
    ///
    /// # Parameters
    /// - `operation`: Async operation adapter called once per attempt.
    ///
    /// # Returns
    /// `Ok(())` after a successful attempt, or [`RetryError`] when retrying stops.
    #[cfg(feature = "tokio")]
    async fn run_async_operation(
        &self,
        operation: &mut dyn AsyncAttempt<E>,
    ) -> Result<(), RetryError<E>> {
        let flow_started_at = Instant::now();
        let mut operation_elapsed = Duration::ZERO;
        let mut attempts = 0;
        let mut last_failure = None;

        loop {
            let total_elapsed = flow_started_at.elapsed();
            let attempt_timeout = self.effective_attempt_timeout(operation_elapsed, total_elapsed);
            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
                return Err(self.emit_error(self.elapsed_error(
                    reason,
                    operation_elapsed,
                    total_elapsed,
                    attempts,
                    last_failure.take(),
                    attempt_timeout,
                )));
            }

            attempts += 1;
            let attempt_timeout =
                self.effective_attempt_timeout(operation_elapsed, flow_started_at.elapsed());
            let before_context = self
                .context(
                    operation_elapsed,
                    flow_started_at.elapsed(),
                    attempts,
                    Duration::ZERO,
                    attempt_timeout.duration,
                )
                .with_attempt_timeout_source(attempt_timeout.source);
            self.emit_before_attempt(&before_context);
            let total_elapsed = flow_started_at.elapsed();
            let attempt_timeout = self.effective_attempt_timeout(operation_elapsed, total_elapsed);
            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
                return Err(self.emit_error(self.elapsed_error(
                    reason,
                    operation_elapsed,
                    total_elapsed,
                    attempts,
                    last_failure.take(),
                    attempt_timeout,
                )));
            }

            let attempt_start = Instant::now();
            let result = if let Some(timeout) = attempt_timeout.duration {
                match tokio::time::timeout(timeout, operation.call()).await {
                    Ok(result) => result,
                    Err(_) => Err(AttemptFailure::Timeout),
                }
            } else {
                operation.call().await
            };

            let attempt_elapsed = attempt_start.elapsed();
            operation_elapsed = add_elapsed(operation_elapsed, attempt_elapsed);
            let context = self
                .context(
                    operation_elapsed,
                    flow_started_at.elapsed(),
                    attempts,
                    attempt_elapsed,
                    attempt_timeout.duration,
                )
                .with_attempt_timeout_source(attempt_timeout.source);
            match result {
                Ok(()) => {
                    self.emit_attempt_success(&context);
                    return Ok(());
                }
                Err(failure) => {
                    if let Some(reason) = attempt_timeout.elapsed_timeout_reason(&failure) {
                        return Err(self.emit_error(RetryError::new(
                            reason,
                            Some(failure),
                            context,
                        )));
                    }
                    match self.handle_failure(attempts, failure, context, None, flow_started_at) {
                        RetryFlowAction::Retry { delay, failure } => {
                            sleep_async(delay).await;
                            last_failure = Some(failure);
                        }
                        RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
                    }
                }
            }
        }
    }

    /// Creates a retry policy from validated parts.
    ///
    /// # Parameters
    /// - `options`: Retry options.
    /// - `retry_after_hint`: Optional hint extractor.
    /// - `isolate_listener_panics`: Whether listener panics are isolated.
    /// - `listeners`: Lifecycle listeners.
    ///
    /// # Returns
    /// A retry policy.
    pub(super) fn new(
        options: RetryOptions,
        retry_after_hint: Option<RetryAfterHint<E>>,
        isolate_listener_panics: bool,
        listeners: RetryListeners<E>,
    ) -> Self {
        Self {
            options,
            retry_after_hint,
            isolate_listener_panics,
            listeners,
        }
    }

    /// Builds a context snapshot.
    ///
    /// # Parameters
    /// - `operation_elapsed`: Cumulative user operation time consumed by this flow.
    /// - `total_elapsed`: Total monotonic time consumed by this flow.
    /// - `attempt`: Current attempt number.
    /// - `attempt_elapsed`: Elapsed time in the current attempt.
    /// - `attempt_timeout`: Effective timeout configured for the current attempt.
    ///
    /// # Returns
    /// A retry context.
    fn context(
        &self,
        operation_elapsed: Duration,
        total_elapsed: Duration,
        attempt: u32,
        attempt_elapsed: Duration,
        attempt_timeout: Option<Duration>,
    ) -> RetryContext {
        RetryContext::from_parts(RetryContextParts {
            attempt,
            max_attempts: self.options.max_attempts.get(),
            max_operation_elapsed: self.options.max_operation_elapsed,
            max_total_elapsed: self.options.max_total_elapsed,
            operation_elapsed,
            total_elapsed,
            attempt_elapsed,
            attempt_timeout,
        })
    }

    /// Returns the configured attempt-timeout duration.
    ///
    /// # Returns
    /// `Some(Duration)` when per-attempt timeout is configured.
    #[inline]
    fn attempt_timeout_duration(&self) -> Option<Duration> {
        self.options
            .attempt_timeout()
            .map(|attempt_timeout| attempt_timeout.timeout())
    }

    /// Returns the effective timeout used by the next attempt.
    ///
    /// # Parameters
    /// - `operation_elapsed`: Cumulative user operation time consumed so far.
    /// - `total_elapsed`: Total monotonic retry-flow time consumed so far.
    ///
    /// # Returns
    /// The shortest of the configured attempt timeout, remaining
    /// max-operation-elapsed budget, and remaining max-total-elapsed budget,
    /// including the source that selected it. A configured timeout wins ties so
    /// its timeout policy remains observable.
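    ///
    /// For example, with a configured 5 s attempt timeout and a 30 s
    /// max-operation-elapsed budget of which 27 s is already consumed, the
    /// effective timeout is 3 s and its source is
    /// [`AttemptTimeoutSource::MaxOperationElapsed`].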
    fn effective_attempt_timeout(
        &self,
        operation_elapsed: Duration,
        total_elapsed: Duration,
    ) -> EffectiveAttemptTimeout {
        let candidates = [
            self.attempt_timeout_duration()
                .map(|duration| (duration, AttemptTimeoutSource::Configured)),
            self.remaining_operation_elapsed(operation_elapsed)
                .map(|duration| (duration, AttemptTimeoutSource::MaxOperationElapsed)),
            self.remaining_total_elapsed(total_elapsed)
                .map(|duration| (duration, AttemptTimeoutSource::MaxTotalElapsed)),
        ];
        let selected = candidates
            .into_iter()
            .flatten()
            .min_by(|left, right| left.0.cmp(&right.0).then_with(|| left.1.cmp(&right.1)));
        match selected {
            Some((duration, source)) => EffectiveAttemptTimeout::new(Some(duration), Some(source)),
            None => EffectiveAttemptTimeout::new(None, None),
        }
    }

    /// Returns remaining user operation time before the max-operation-elapsed budget is exhausted.
    ///
    /// # Parameters
    /// - `operation_elapsed`: Cumulative user operation time consumed so far.
    ///
    /// # Returns
    /// `Some(Duration)` when max elapsed is configured, or `None` when unlimited.
    #[inline]
    fn remaining_operation_elapsed(&self, operation_elapsed: Duration) -> Option<Duration> {
        self.options
            .max_operation_elapsed
            .map(|max_operation_elapsed| max_operation_elapsed.saturating_sub(operation_elapsed))
    }

    /// Returns remaining total retry-flow time before the max-total-elapsed budget is exhausted.
    ///
    /// # Parameters
    /// - `total_elapsed`: Total monotonic retry-flow time consumed so far.
    ///
    /// # Returns
    /// `Some(Duration)` when max total elapsed is configured, or `None` when unlimited.
    #[inline]
    fn remaining_total_elapsed(&self, total_elapsed: Duration) -> Option<Duration> {
        self.options
            .max_total_elapsed
            .map(|max_total_elapsed| max_total_elapsed.saturating_sub(total_elapsed))
    }

    /// Runs one blocking attempt on a worker thread.
    ///
    /// # Parameters
    /// - `operation`: Shared blocking operation.
    /// - `attempt_timeout`: Effective timeout for this attempt, if any.
    ///
    /// # Returns
    /// The operation value on success, or an attempt failure.
    ///
    /// # Panics
    /// Converts worker panics into [`AttemptFailure::Panic`] and worker-spawn
    /// failures into [`AttemptFailure::Executor`].
    fn call_blocking_attempt<T, F>(
        &self,
        operation: Arc<F>,
        attempt_timeout: Option<Duration>,
    ) -> BlockingAttemptOutcome<T, E>
    where
        T: Send + 'static,
        E: Send + 'static,
        F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
    {
        let token = AttemptCancelToken::new();
        let (sender, receiver) = mpsc::sync_channel(1);
        let worker_token = token.clone();
        let worker = std::thread::Builder::new()
            .name("qubit-retry-worker".to_string())
            .spawn(move || {
                let result =
                    panic::catch_unwind(panic::AssertUnwindSafe(|| operation(worker_token)));
                let message = match result {
                    Ok(result) => BlockingAttemptMessage::Result(result),
                    Err(payload) => {
                        BlockingAttemptMessage::Panic(AttemptPanic::from_payload(payload))
                    }
                };
                let _ = sender.send(message);
            });
        let worker = match worker {
            Ok(worker) => worker,
            Err(_) => {
                return BlockingAttemptOutcome::new(
                    Err(exec_fail!(WORKER_SPAWN_FAILED_MESSAGE)),
                    0,
                );
            }
        };

        match attempt_timeout {
            Some(attempt_timeout) => {
                let message = receiver.recv_timeout(attempt_timeout);
                self.worker_timeout_message_to_attempt_outcome(message, receiver, worker, &token)
            }
            None => {
                let result = worker_recv_message_to_attempt_result(receiver.recv());
                join_finished_worker(worker);
                BlockingAttemptOutcome::new(result, 0)
            }
        }
    }

    /// Handles one failed attempt.
    ///
    /// # Parameters
    /// - `attempts`: Attempts executed so far.
    /// - `failure`: Attempt failure.
    /// - `context`: Context captured after the failed attempt.
    /// - `retry_block_reason`: Terminal reason that blocks retrying when present.
    /// - `flow_started_at`: Start instant of the retry flow, used to refresh the
    ///   context's total elapsed time.
    ///
    /// # Returns
    /// A retry action selected from listeners and configured limits.
    fn handle_failure(
        &self,
        attempts: u32,
        failure: AttemptFailure<E>,
        context: RetryContext,
        retry_block_reason: Option<RetryErrorReason>,
        flow_started_at: Instant,
    ) -> RetryFlowAction<E> {
        let hint = self
            .retry_after_hint
            .as_ref()
            .and_then(|hint| self.invoke_listener(|| hint.apply(&failure, &context)));
        let context = context
            .with_retry_after_hint(hint)
            .with_total_elapsed(flow_started_at.elapsed());

        let decision =
            self.resolve_failure_decision(self.failure_decision(&failure, &context), &failure);
        let context = context.with_total_elapsed(flow_started_at.elapsed());
        if decision == AttemptFailureDecision::Abort {
            return RetryFlowAction::Finished(RetryError::new(
                RetryErrorReason::Aborted,
                Some(failure),
                context,
            ));
        }

        let max_attempts = self.options.max_attempts.get();
        if attempts >= max_attempts {
            return RetryFlowAction::Finished(RetryError::new(
                RetryErrorReason::AttemptsExceeded,
                Some(failure),
                context,
            ));
        }

        if let Some(reason) =
            self.elapsed_error_reason(context.operation_elapsed(), context.total_elapsed())
        {
            return RetryFlowAction::Finished(RetryError::new(reason, Some(failure), context));
        }

        if let Some(reason) = retry_block_reason {
            return RetryFlowAction::Finished(RetryError::new(reason, Some(failure), context));
        }

        let delay = self.retry_delay(decision, attempts, hint);
        let context = context
            .with_total_elapsed(flow_started_at.elapsed())
            .with_next_delay(delay);
        if self.retry_sleep_exhausts_total_elapsed(context.total_elapsed(), delay) {
            return RetryFlowAction::Finished(RetryError::new(
                RetryErrorReason::MaxTotalElapsedExceeded,
                Some(failure),
                context,
            ));
        }
        self.emit_retry_scheduled(&failure, &context);
        let context = context.with_total_elapsed(flow_started_at.elapsed());
        if let Some(reason) =
            self.elapsed_error_reason(context.operation_elapsed(), context.total_elapsed())
        {
            return RetryFlowAction::Finished(RetryError::new(reason, Some(failure), context));
        }
        if self.retry_sleep_exhausts_total_elapsed(context.total_elapsed(), delay) {
            return RetryFlowAction::Finished(RetryError::new(
                RetryErrorReason::MaxTotalElapsedExceeded,
                Some(failure),
                context,
            ));
        }
        RetryFlowAction::Retry { delay, failure }
    }

    /// Resolves all failure listeners into one decision.
    ///
    /// # Parameters
    /// - `failure`: Attempt failure.
    /// - `context`: Failure context.
    ///
    /// # Returns
    /// Last non-default listener decision, or [`AttemptFailureDecision::UseDefault`].
    fn failure_decision(
        &self,
        failure: &AttemptFailure<E>,
        context: &RetryContext,
    ) -> AttemptFailureDecision {
        let mut decision = AttemptFailureDecision::UseDefault;
        for listener in &self.listeners.failure {
            let current = self.invoke_listener(|| listener.apply(failure, context));
            if current != AttemptFailureDecision::UseDefault {
                decision = current;
            }
        }
        decision
    }

    /// Resolves the effective failure decision after applying timeout policy.
    ///
    /// # Parameters
    /// - `decision`: Decision returned by failure listeners.
    /// - `failure`: Attempt failure being handled.
    ///
    /// # Returns
    /// A concrete decision for timeout failures when listeners used the default.
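    ///
    /// For example, an [`AttemptFailure::Timeout`] under
    /// [`AttemptTimeoutPolicy::Retry`] resolves to
    /// [`AttemptFailureDecision::Retry`], while panic and executor failures
    /// resolve to [`AttemptFailureDecision::Abort`] when listeners return the
    /// default.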
    fn resolve_failure_decision(
        &self,
        decision: AttemptFailureDecision,
        failure: &AttemptFailure<E>,
    ) -> AttemptFailureDecision {
        if decision != AttemptFailureDecision::UseDefault {
            return decision;
        }
        if matches!(failure, AttemptFailure::Timeout)
            && let Some(attempt_timeout) = self.options.attempt_timeout()
        {
            match attempt_timeout.policy() {
                AttemptTimeoutPolicy::Retry => AttemptFailureDecision::Retry,
                AttemptTimeoutPolicy::Abort => AttemptFailureDecision::Abort,
            }
        } else if matches!(
            failure,
            AttemptFailure::Panic(_) | AttemptFailure::Executor(_)
        ) {
            AttemptFailureDecision::Abort
        } else {
            AttemptFailureDecision::UseDefault
        }
    }

    /// Selects the delay used before the next retry.
    ///
    /// # Parameters
    /// - `decision`: Failure decision.
    /// - `attempts`: Attempts executed so far.
    /// - `hint`: Optional retry-after hint.
    ///
    /// # Returns
    /// Delay before the next retry.
    fn retry_delay(
        &self,
        decision: AttemptFailureDecision,
        attempts: u32,
        hint: Option<Duration>,
    ) -> Duration {
        match decision {
            AttemptFailureDecision::RetryAfter(delay) => delay,
            AttemptFailureDecision::UseDefault => hint.unwrap_or_else(|| {
                self.options
                    .jitter
                    .delay_for_attempt(&self.options.delay, attempts)
            }),
            AttemptFailureDecision::Retry | AttemptFailureDecision::Abort => self
                .options
                .jitter
                .delay_for_attempt(&self.options.delay, attempts),
        }
    }

    /// Builds an elapsed-budget error.
    ///
    /// # Parameters
    /// - `reason`: Elapsed-budget reason selected by the caller.
    /// - `operation_elapsed`: Cumulative user operation time consumed by this flow.
    /// - `total_elapsed`: Total monotonic retry-flow time consumed by this flow.
    /// - `attempts`: Attempts executed so far.
    /// - `last_failure`: Last observed failure, if any.
    /// - `attempt_timeout`: Timeout visible in the terminal context.
    ///
    /// # Returns
    /// A retry error preserving the terminal context.
    fn elapsed_error(
        &self,
        reason: RetryErrorReason,
        operation_elapsed: Duration,
        total_elapsed: Duration,
        attempts: u32,
        last_failure: Option<AttemptFailure<E>>,
        attempt_timeout: EffectiveAttemptTimeout,
    ) -> RetryError<E> {
        RetryError::new(
            reason,
            last_failure,
            self.context(
                operation_elapsed,
                total_elapsed,
                attempts,
                Duration::ZERO,
                attempt_timeout.duration,
            )
            .with_attempt_timeout_source(attempt_timeout.source),
        )
    }

    /// Returns the first elapsed-budget reason that is exhausted.
    ///
    /// # Parameters
    /// - `operation_elapsed`: Cumulative user operation time consumed by this flow.
    /// - `total_elapsed`: Total monotonic retry-flow time consumed by this flow.
    ///
    /// # Returns
    /// `Some(RetryErrorReason)` when an elapsed budget has been exhausted.
    #[inline]
    fn elapsed_error_reason(
        &self,
        operation_elapsed: Duration,
        total_elapsed: Duration,
    ) -> Option<RetryErrorReason> {
        if self
            .options
            .max_operation_elapsed
            .is_some_and(|max_operation_elapsed| operation_elapsed >= max_operation_elapsed)
        {
            Some(RetryErrorReason::MaxOperationElapsedExceeded)
        } else if self
            .options
            .max_total_elapsed
            .is_some_and(|max_total_elapsed| total_elapsed >= max_total_elapsed)
        {
            Some(RetryErrorReason::MaxTotalElapsedExceeded)
        } else {
            None
        }
    }

    /// Returns whether a selected retry sleep would consume the remaining total budget.
    ///
    /// # Parameters
    /// - `total_elapsed`: Total monotonic retry-flow time consumed before sleep.
    /// - `delay`: Selected retry delay.
    ///
    /// # Returns
    /// `true` when the delay should not be slept because no budget would remain
    /// for the next attempt.
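    ///
    /// For example, with a 10 s `max_total_elapsed` budget, 9 s already
    /// consumed, and a 2 s selected delay, the sleep is skipped and the flow
    /// stops with [`RetryErrorReason::MaxTotalElapsedExceeded`].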
    #[inline]
    fn retry_sleep_exhausts_total_elapsed(&self, total_elapsed: Duration, delay: Duration) -> bool {
        if delay.is_zero() {
            return false;
        }
        let Some(max_total_elapsed) = self.options.max_total_elapsed else {
            return false;
        };
        total_elapsed.saturating_add(delay) >= max_total_elapsed
    }

    /// Emits before-attempt listeners.
    ///
    /// # Parameters
    /// - `context`: Context passed to listeners.
    fn emit_before_attempt(&self, context: &RetryContext) {
        for listener in &self.listeners.before_attempt {
            self.invoke_listener(|| {
                listener.accept(context);
            });
        }
    }

    /// Emits attempt-success listeners.
    ///
    /// # Parameters
    /// - `context`: Context passed to listeners.
    fn emit_attempt_success(&self, context: &RetryContext) {
        for listener in &self.listeners.attempt_success {
            self.invoke_listener(|| {
                listener.accept(context);
            });
        }
    }

    /// Emits retry-scheduled listeners.
    ///
    /// # Parameters
    /// - `failure`: Failure that caused the retry to be scheduled.
    /// - `context`: Context carrying the selected next delay.
    fn emit_retry_scheduled(&self, failure: &AttemptFailure<E>, context: &RetryContext) {
        for listener in &self.listeners.retry_scheduled {
            self.invoke_listener(|| {
                listener.accept(failure, context);
            });
        }
    }

    /// Emits terminal error listeners and returns the same error.
    ///
    /// # Parameters
    /// - `error`: Terminal retry error.
    ///
    /// # Returns
    /// The same error after listeners have been invoked.
    fn emit_error(&self, error: RetryError<E>) -> RetryError<E> {
        for listener in &self.listeners.error {
            self.invoke_listener(|| {
                listener.accept(&error, error.context());
            });
        }
        error
    }

    /// Converts a timeout-aware worker receive into an attempt outcome.
    ///
    /// # Parameters
    /// - `message`: Initial receive result with the attempt timeout applied.
    /// - `receiver`: Receiver used to observe worker exit during cancellation grace.
    /// - `worker`: Worker thread handle for joining finished workers.
    /// - `token`: Cancellation token to mark when the receive timed out.
    ///
    /// # Returns
    /// Attempt result plus the number of worker threads not observed to exit.
    fn worker_timeout_message_to_attempt_outcome<T>(
        &self,
        message: Result<BlockingAttemptMessage<T, E>, mpsc::RecvTimeoutError>,
        receiver: mpsc::Receiver<BlockingAttemptMessage<T, E>>,
        worker: JoinHandle<()>,
        token: &AttemptCancelToken,
    ) -> BlockingAttemptOutcome<T, E>
    where
        T: Send + 'static,
        E: Send + 'static,
    {
        match message {
            Ok(message) => {
                let result = worker_message_to_attempt_result(message);
                join_finished_worker(worker);
                BlockingAttemptOutcome::new(result, 0)
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                token.cancel();
                let worker_exited =
                    wait_for_cancelled_worker(&receiver, worker, self.options.worker_cancel_grace);
                let unreaped_worker_count = if worker_exited { 0 } else { 1 };
                BlockingAttemptOutcome::new(Err(AttemptFailure::Timeout), unreaped_worker_count)
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                join_finished_worker(worker);
                BlockingAttemptOutcome::new(Err(exec_fail!(WORKER_DISCONNECTED_MESSAGE)), 0)
            }
        }
    }

    /// Invokes a listener and optionally isolates panics.
    ///
    /// # Parameters
    /// - `call`: Listener invocation closure.
    ///
    /// # Returns
    /// The listener return value, or `Default::default()` when an isolated panic
    /// occurs.
    fn invoke_listener<R>(&self, call: impl FnOnce() -> R) -> R
    where
        R: Default,
    {
        if self.isolate_listener_panics {
            std::panic::catch_unwind(std::panic::AssertUnwindSafe(call)).unwrap_or_default()
        } else {
            call()
        }
    }
}

impl<E> fmt::Debug for Retry<E> {
    /// Formats the retry policy without exposing callbacks.
    ///
    /// # Parameters
    /// - `f`: Formatter.
    ///
    /// # Returns
    /// Formatter result.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Retry")
            .field("options", &self.options)
            .finish_non_exhaustive()
    }
}

/// Converts a worker message into an attempt result.
///
/// # Parameters
/// - `message`: Message received from the worker thread.
///
/// # Returns
/// The operation value on success, or an attempt failure.
fn worker_message_to_attempt_result<T, E>(
    message: BlockingAttemptMessage<T, E>,
) -> Result<T, AttemptFailure<E>> {
    match message {
        BlockingAttemptMessage::Result(result) => result.map_err(AttemptFailure::Error),
        BlockingAttemptMessage::Panic(panic) => Err(AttemptFailure::Panic(panic)),
    }
}

/// Converts a blocking receive result into an attempt result.
///
/// # Parameters
/// - `message`: Result returned by `Receiver::recv`.
///
/// # Returns
/// The operation value on success, or an attempt failure.
fn worker_recv_message_to_attempt_result<T, E>(
    message: Result<BlockingAttemptMessage<T, E>, mpsc::RecvError>,
) -> Result<T, AttemptFailure<E>> {
    match message {
        Ok(message) => worker_message_to_attempt_result(message),
        Err(_) => Err(exec_fail!(WORKER_DISCONNECTED_MESSAGE)),
    }
}

/// Waits briefly for a cancelled worker to exit.
///
/// # Parameters
/// - `receiver`: Worker result receiver used only to observe whether the worker
///   sent or disconnected.
/// - `worker`: Worker thread handle, joined when exit is observed.
/// - `grace`: Maximum time to wait after cancellation. Zero performs only a
///   non-blocking check.
///
/// # Returns
/// `true` when the worker was observed to exit before the grace period ended,
/// otherwise `false`. When this returns `false`, the worker handle is dropped and
/// the thread may continue running detached.
fn wait_for_cancelled_worker<T, E>(
    receiver: &mpsc::Receiver<BlockingAttemptMessage<T, E>>,
    worker: JoinHandle<()>,
    grace: Duration,
) -> bool {
    let exited = if grace.is_zero() {
        match receiver.try_recv() {
            Ok(_) | Err(mpsc::TryRecvError::Disconnected) => true,
            Err(mpsc::TryRecvError::Empty) => false,
        }
    } else {
        match receiver.recv_timeout(grace) {
            Ok(_) | Err(mpsc::RecvTimeoutError::Disconnected) => true,
            Err(mpsc::RecvTimeoutError::Timeout) => false,
        }
    };
    if exited {
        join_finished_worker(worker);
    }
    exited
}

/// Joins a worker thread that has already been observed to finish.
///
/// # Parameters
/// - `worker`: Worker thread handle.
///
/// # Returns
/// This function returns nothing.
fn join_finished_worker(worker: JoinHandle<()>) {
    let _ = worker.join();
}

/// Adds one attempt duration to the cumulative user-operation elapsed time.
///
/// # Parameters
/// - `operation_elapsed`: Cumulative elapsed time before the attempt.
/// - `attempt_elapsed`: Elapsed time consumed by the current attempt.
///
/// # Returns
/// The summed elapsed time, saturated at [`Duration::MAX`] on overflow.
fn add_elapsed(operation_elapsed: Duration, attempt_elapsed: Duration) -> Duration {
    operation_elapsed.saturating_add(attempt_elapsed)
}

/// Sleeps asynchronously when the delay is non-zero.
///
/// # Parameters
/// - `delay`: Delay to sleep.
///
/// # Returns
/// This function returns after the sleep completes.
#[cfg(feature = "tokio")]
async fn sleep_async(delay: Duration) {
    if !delay.is_zero() {
        tokio::time::sleep(delay).await;
    }
}