Skip to main content

qubit_retry/executor/
retry.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Retry execution.
11//!
12//! A [`Retry`] owns validated retry options and lifecycle listeners. The
13//! operation success type is introduced by each `run` call, while the error type
14//! is bound by the retry policy.
15
16use qubit_error::BoxError;
17use qubit_function::{
18    BiConsumer,
19    BiFunction,
20    Consumer,
21};
22use std::fmt;
23#[cfg(feature = "tokio")]
24use std::future::Future;
25use std::panic;
26use std::sync::Arc;
27use std::sync::mpsc;
28use std::thread::JoinHandle;
29use std::time::{
30    Duration,
31    Instant,
32};
33
34#[cfg(feature = "tokio")]
35use super::async_attempt::AsyncAttempt;
36#[cfg(feature = "tokio")]
37use super::async_value_operation::AsyncValueOperation;
38use super::attempt_cancel_token::AttemptCancelToken;
39use super::blocking_attempt_message::BlockingAttemptMessage;
40use super::retry_flow_action::RetryFlowAction;
41use super::sync_attempt::SyncAttempt;
42use super::sync_value_operation::SyncValueOperation;
43use crate::event::{
44    RetryContextParts,
45    RetryListeners,
46};
47use crate::{
48    AttemptExecutorError,
49    AttemptFailure,
50    AttemptFailureDecision,
51    AttemptPanic,
52    AttemptTimeoutPolicy,
53    AttemptTimeoutSource,
54    RetryAfterHint,
55    RetryBuilder,
56    RetryConfigError,
57    RetryContext,
58    RetryError,
59    RetryErrorReason,
60    RetryOptions,
61};
62
63const WORKER_DISCONNECTED_MESSAGE: &str = "retry worker thread stopped without sending a result";
64const WORKER_SPAWN_FAILED_MESSAGE: &str = "failed to spawn retry worker thread";
65
/// Wraps a static message in an executor-level attempt failure.
///
/// The expansion feeds the message expression to `AttemptExecutorError::new`
/// and places the resulting error inside `AttemptFailure::Executor`. Both
/// names are resolved at the invocation site.
macro_rules! exec_fail {
    ($text:expr) => {
        AttemptFailure::Executor(AttemptExecutorError::new($text))
    };
}
72
/// Retry policy and executor bound to an operation error type.
///
/// The generic parameter `E` is the caller's operation error type; it defaults
/// to [`BoxError`] when not specified. Cloning a retry policy shares all
/// registered functors through reference-counted `rs-function` wrappers.
///
/// Instances are obtained through [`Retry::builder`] or
/// [`Retry::from_options`].
#[derive(Clone)]
pub struct Retry<E = BoxError> {
    /// Validated retry limits and backoff settings.
    ///
    /// The executor reads the attempt limit, both elapsed budgets, and the
    /// optional per-attempt timeout from here.
    options: RetryOptions,
    /// Optional retry-after hint extractor.
    retry_after_hint: Option<RetryAfterHint<E>>,
    /// Whether listener panics should be isolated.
    isolate_listener_panics: bool,
    /// Lifecycle listeners.
    listeners: RetryListeners<E>,
}
89
/// Effective timeout selected for a single attempt.
///
/// `Retry::effective_attempt_timeout` leaves both fields `None` when neither a
/// configured attempt timeout nor an elapsed budget bounds the attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct EffectiveAttemptTimeout {
    /// Timeout duration actually enforced for the attempt.
    duration: Option<Duration>,
    /// Source that selected the effective timeout.
    ///
    /// Used to tell a configured attempt timeout apart from an exhausted
    /// elapsed budget when an attempt times out.
    source: Option<AttemptTimeoutSource>,
}
98
99impl EffectiveAttemptTimeout {
100    /// Creates an effective attempt timeout.
101    ///
102    /// # Parameters
103    /// - `duration`: Timeout duration enforced for the attempt.
104    /// - `source`: Source that selected the timeout.
105    ///
106    /// # Returns
107    /// A timeout descriptor for one attempt.
108    #[inline]
109    fn new(duration: Option<Duration>, source: Option<AttemptTimeoutSource>) -> Self {
110        Self { duration, source }
111    }
112
113    /// Returns the elapsed-budget reason represented by a timeout failure.
114    ///
115    /// # Parameters
116    /// - `failure`: Failure produced by the attempt.
117    ///
118    /// # Returns
119    /// `Some(RetryErrorReason)` when the attempt timed out because an elapsed
120    /// budget selected the effective timeout.
121    #[inline]
122    fn elapsed_timeout_reason<E>(&self, failure: &AttemptFailure<E>) -> Option<RetryErrorReason> {
123        if !matches!(failure, AttemptFailure::Timeout) {
124            return None;
125        }
126        match self.source {
127            Some(AttemptTimeoutSource::MaxOperationElapsed) => {
128                Some(RetryErrorReason::MaxOperationElapsedExceeded)
129            }
130            Some(AttemptTimeoutSource::MaxTotalElapsed) => {
131                Some(RetryErrorReason::MaxTotalElapsedExceeded)
132            }
133            Some(AttemptTimeoutSource::Configured) | None => None,
134        }
135    }
136}
137
/// Result and cleanup status returned from one blocking worker attempt.
///
/// `T` is the operation success type and `E` the operation error type.
struct BlockingAttemptOutcome<T, E> {
    /// Attempt result after timeout handling.
    result: Result<T, AttemptFailure<E>>,
    /// Worker threads not observed to exit before cancellation grace ended.
    ///
    /// The worker retry loop copies this count into the attempt's
    /// [`RetryContext`].
    unreaped_worker_count: u32,
}
145
146impl<T, E> BlockingAttemptOutcome<T, E> {
147    /// Creates a worker-attempt outcome.
148    ///
149    /// # Parameters
150    /// - `result`: Attempt result exposed to the retry flow.
151    /// - `unreaped_worker_count`: Count of worker threads not observed to exit.
152    ///
153    /// # Returns
154    /// A blocking-attempt outcome.
155    #[inline]
156    fn new(result: Result<T, AttemptFailure<E>>, unreaped_worker_count: u32) -> Self {
157        Self {
158            result,
159            unreaped_worker_count,
160        }
161    }
162}
163
/// Construction, option access, and retry execution entry points for [`Retry`].
165#[allow(clippy::result_large_err)]
166impl<E> Retry<E> {
167    /// Creates a retry builder.
168    ///
169    /// # Returns
170    /// A [`RetryBuilder`] configured with defaults.
171    #[inline]
172    pub fn builder() -> RetryBuilder<E> {
173        RetryBuilder::new()
174    }
175
176    /// Creates a retry policy from options.
177    ///
178    /// # Parameters
179    /// - `options`: Retry options to validate and install.
180    ///
181    /// # Returns
182    /// A retry policy using the default listener set.
183    ///
184    /// # Errors
185    /// Returns [`RetryConfigError`] if the options are invalid.
186    pub fn from_options(options: RetryOptions) -> Result<Self, RetryConfigError> {
187        Self::builder().options(options).build()
188    }
189
190    /// Returns the immutable options used by this retry policy.
191    ///
192    /// # Returns
193    /// Shared retry options.
194    #[inline]
195    pub fn options(&self) -> &RetryOptions {
196        &self.options
197    }
198
199    /// Runs a synchronous operation with retry.
200    ///
201    /// # Parameters
202    /// - `operation`: Operation called once per attempt until it succeeds or the
203    ///   retry flow stops.
204    ///
205    /// # Returns
206    /// `Ok(T)` with the operation value, or [`RetryError`] when retrying stops.
207    ///
208    /// # Panics
209    /// Propagates operation panics and listener panics unless listener panic
210    /// isolation is enabled.
211    ///
212    /// # Blocking
213    /// Blocks the current thread with `std::thread::sleep` between attempts when
214    /// a non-zero retry delay is selected.
215    ///
216    /// # Elapsed Budget
217    /// `max_operation_elapsed` counts only user operation execution time.
218    /// `max_total_elapsed` counts monotonic retry-flow time, including
219    /// operation execution, retry sleep, retry-after sleep, and retry
220    /// control-path listener time. This synchronous mode cannot interrupt an
221    /// already-running operation; it checks budgets before attempts and after
222    /// failed attempts. If `attempt_timeout` is configured, this method returns
223    /// [`RetryErrorReason::UnsupportedOperation`] because timeout enforcement
224    /// requires worker-thread or async execution.
225    pub fn run<T, F>(&self, mut operation: F) -> Result<T, RetryError<E>>
226    where
227        F: FnMut() -> Result<T, E>,
228    {
229        if self.options.attempt_timeout().is_some() {
230            let attempt_timeout = self.attempt_timeout_duration();
231            return Err(self.emit_error(RetryError::new(
232                RetryErrorReason::UnsupportedOperation,
233                None,
234                RetryContext::from_parts(RetryContextParts {
235                    attempt: 0,
236                    max_attempts: self.options.max_attempts.get(),
237                    max_operation_elapsed: self.options.max_operation_elapsed,
238                    max_total_elapsed: self.options.max_total_elapsed,
239                    operation_elapsed: Duration::ZERO,
240                    total_elapsed: Duration::ZERO,
241                    attempt_elapsed: Duration::ZERO,
242                    attempt_timeout,
243                })
244                .with_attempt_timeout_source(Some(AttemptTimeoutSource::Configured)),
245            )));
246        }
247        let mut operation = SyncValueOperation::new(&mut operation);
248        self.run_sync_operation(&mut operation)
249            .map(|()| operation.into_value())
250    }
251
252    /// Runs a blocking operation with retry inside worker-thread attempts.
253    ///
254    /// Each attempt runs on a worker thread. Worker panics are captured as
255    /// [`AttemptFailure::Panic`]. Worker-spawn failures are reported as
256    /// [`AttemptFailure::Executor`]. If the effective timeout expires, the retry
257    /// executor stops waiting and marks the attempt's [`AttemptCancelToken`] as
258    /// cancelled. It then waits up to [`RetryOptions::worker_cancel_grace`] for
259    /// the worker to exit. Configured attempt-timeout expirations continue
260    /// according to [`AttemptTimeoutPolicy`] only when the worker exits within
261    /// that grace period; otherwise the retry flow stops with
262    /// [`RetryErrorReason::WorkerStillRunning`]. Elapsed-budget expirations stop
263    /// with [`RetryErrorReason::MaxOperationElapsedExceeded`] or
264    /// [`RetryErrorReason::MaxTotalElapsedExceeded`].
265    ///
266    /// # Parameters
267    /// - `operation`: Thread-safe operation called once per attempt. It receives
268    ///   a cooperative cancellation token for that attempt.
269    ///
270    /// # Returns
271    /// `Ok(T)` with the operation value, or [`RetryError`] when retrying stops.
272    ///
273    /// # Panics
274    /// Does not propagate operation panics. Listener panic behavior follows this
275    /// retry policy's listener isolation setting.
276    ///
277    /// # Blocking
278    /// Blocks the current thread while waiting for each worker result or timeout
279    /// and while sleeping between retry attempts.
280    ///
281    /// # Elapsed Budget
282    /// `max_operation_elapsed` counts only user operation execution time.
283    /// `max_total_elapsed` counts monotonic retry-flow time. Worker attempts use
284    /// the shortest of configured attempt timeout, remaining
285    /// max-operation-elapsed budget, and remaining max-total-elapsed budget as
286    /// their effective timeout.
287    pub fn run_in_worker<T, F>(&self, operation: F) -> Result<T, RetryError<E>>
288    where
289        T: Send + 'static,
290        E: Send + 'static,
291        F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
292    {
293        let operation = Arc::new(operation);
294        let flow_started_at = Instant::now();
295        let mut operation_elapsed = Duration::ZERO;
296        let mut attempts = 0;
297        let mut last_failure = None;
298
299        loop {
300            let total_elapsed = flow_started_at.elapsed();
301            let attempt_timeout = self.effective_attempt_timeout(operation_elapsed, total_elapsed);
302            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
303                return Err(self.emit_error(self.elapsed_error(
304                    reason,
305                    operation_elapsed,
306                    total_elapsed,
307                    attempts,
308                    last_failure.take(),
309                    attempt_timeout,
310                )));
311            }
312
313            attempts += 1;
314            let attempt_timeout =
315                self.effective_attempt_timeout(operation_elapsed, flow_started_at.elapsed());
316            let before_context = self
317                .context(
318                    operation_elapsed,
319                    flow_started_at.elapsed(),
320                    attempts,
321                    Duration::ZERO,
322                    attempt_timeout.duration,
323                )
324                .with_attempt_timeout_source(attempt_timeout.source);
325            self.emit_before_attempt(&before_context);
326            let total_elapsed = flow_started_at.elapsed();
327            let attempt_timeout = self.effective_attempt_timeout(operation_elapsed, total_elapsed);
328            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
329                return Err(self.emit_error(self.elapsed_error(
330                    reason,
331                    operation_elapsed,
332                    total_elapsed,
333                    attempts,
334                    last_failure.take(),
335                    attempt_timeout,
336                )));
337            }
338
339            let attempt_start = Instant::now();
340            let outcome =
341                self.call_blocking_attempt(Arc::clone(&operation), attempt_timeout.duration);
342            let attempt_elapsed = attempt_start.elapsed();
343            operation_elapsed = add_elapsed(operation_elapsed, attempt_elapsed);
344            let context = self
345                .context(
346                    operation_elapsed,
347                    flow_started_at.elapsed(),
348                    attempts,
349                    attempt_elapsed,
350                    attempt_timeout.duration,
351                )
352                .with_attempt_timeout_source(attempt_timeout.source)
353                .with_unreaped_worker_count(outcome.unreaped_worker_count);
354            match outcome.result {
355                Ok(value) => {
356                    self.emit_attempt_success(&context);
357                    return Ok(value);
358                }
359                Err(failure) => {
360                    if let Some(reason) = attempt_timeout.elapsed_timeout_reason(&failure) {
361                        return Err(self.emit_error(RetryError::new(
362                            reason,
363                            Some(failure),
364                            context,
365                        )));
366                    }
367                    let retry_block_reason = (context.unreaped_worker_count() > 0)
368                        .then_some(RetryErrorReason::WorkerStillRunning);
369                    match self.handle_failure(
370                        attempts,
371                        failure,
372                        context,
373                        retry_block_reason,
374                        flow_started_at,
375                    ) {
376                        RetryFlowAction::Retry { delay, failure } => {
377                            if !delay.is_zero() {
378                                std::thread::sleep(delay);
379                            }
380                            last_failure = Some(failure);
381                        }
382                        RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
383                    }
384                }
385            }
386        }
387    }
388
389    /// Runs a blocking operation with retry and per-attempt timeout isolation.
390    ///
391    /// This method is a compatibility alias for [`Retry::run_in_worker`]. It
392    /// also runs attempts in worker threads when no timeout is configured, so
393    /// worker panics are reported as [`AttemptFailure::Panic`] instead of
394    /// unwinding through the caller. Worker-spawn failures are reported as
395    /// [`AttemptFailure::Executor`].
396    ///
397    /// # Parameters
398    /// - `operation`: Thread-safe operation called once per attempt. It receives
399    ///   a cooperative cancellation token for that attempt.
400    ///
401    /// # Returns
402    /// `Ok(T)` with the operation value, or [`RetryError`] when retrying stops.
403    ///
404    /// # Panics
405    /// Does not propagate operation panics. Listener panic behavior follows this
406    /// retry policy's listener isolation setting.
407    ///
408    /// # Blocking
409    /// Blocks the current thread while waiting for each worker result or timeout
410    /// and while sleeping between retry attempts.
411    ///
412    /// # Elapsed Budget
413    /// `max_operation_elapsed` counts only user operation execution time.
414    /// `max_total_elapsed` counts monotonic retry-flow time. Worker attempts use
415    /// the shortest of configured attempt timeout, remaining
416    /// max-operation-elapsed budget, and remaining max-total-elapsed budget as
417    /// their effective timeout.
418    #[inline]
419    pub fn run_blocking_with_timeout<T, F>(&self, operation: F) -> Result<T, RetryError<E>>
420    where
421        T: Send + 'static,
422        E: Send + 'static,
423        F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
424    {
425        self.run_in_worker(operation)
426    }
427
428    /// Runs a synchronous value-erased operation with retry.
429    ///
430    /// # Parameters
431    /// - `operation`: Operation adapter called once per attempt.
432    ///
433    /// # Returns
434    /// `Ok(())` after a successful attempt, or [`RetryError`] when retrying stops.
435    fn run_sync_operation(&self, operation: &mut dyn SyncAttempt<E>) -> Result<(), RetryError<E>> {
436        let flow_started_at = Instant::now();
437        let mut operation_elapsed = Duration::ZERO;
438        let mut attempts = 0;
439        let mut last_failure = None;
440
441        loop {
442            let total_elapsed = flow_started_at.elapsed();
443            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
444                return Err(self.emit_error(self.elapsed_error(
445                    reason,
446                    operation_elapsed,
447                    total_elapsed,
448                    attempts,
449                    last_failure.take(),
450                    EffectiveAttemptTimeout::new(None, None),
451                )));
452            }
453
454            attempts += 1;
455            let before_context = self.context(
456                operation_elapsed,
457                flow_started_at.elapsed(),
458                attempts,
459                Duration::ZERO,
460                None,
461            );
462            self.emit_before_attempt(&before_context);
463            let total_elapsed = flow_started_at.elapsed();
464            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
465                return Err(self.emit_error(self.elapsed_error(
466                    reason,
467                    operation_elapsed,
468                    total_elapsed,
469                    attempts,
470                    last_failure.take(),
471                    EffectiveAttemptTimeout::new(None, None),
472                )));
473            }
474
475            let attempt_start = Instant::now();
476            match operation.call() {
477                Ok(()) => {
478                    let attempt_elapsed = attempt_start.elapsed();
479                    operation_elapsed = add_elapsed(operation_elapsed, attempt_elapsed);
480                    let context = self.context(
481                        operation_elapsed,
482                        flow_started_at.elapsed(),
483                        attempts,
484                        attempt_elapsed,
485                        None,
486                    );
487                    self.emit_attempt_success(&context);
488                    return Ok(());
489                }
490                Err(failure) => {
491                    let attempt_elapsed = attempt_start.elapsed();
492                    operation_elapsed = add_elapsed(operation_elapsed, attempt_elapsed);
493                    let context = self.context(
494                        operation_elapsed,
495                        flow_started_at.elapsed(),
496                        attempts,
497                        attempt_elapsed,
498                        None,
499                    );
500                    match self.handle_failure(attempts, failure, context, None, flow_started_at) {
501                        RetryFlowAction::Retry { delay, failure } => {
502                            if !delay.is_zero() {
503                                std::thread::sleep(delay);
504                            }
505                            last_failure = Some(failure);
506                        }
507                        RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
508                    }
509                }
510            }
511        }
512    }
513
514    /// Runs an asynchronous operation with retry.
515    ///
516    /// # Parameters
517    /// - `operation`: Factory returning a fresh future for each attempt.
518    ///
519    /// # Returns
520    /// `Ok(T)` with the operation value, or [`RetryError`] when retrying stops.
521    ///
522    /// # Panics
523    /// Propagates operation panics from the current async task. They are not
524    /// converted to [`AttemptFailure::Panic`] because `run_async` does not
525    /// create an isolation boundary. Listener panics are propagated unless
526    /// listener panic isolation is enabled. Tokio may panic if timer APIs are
527    /// used outside a runtime with a time driver.
528    ///
529    /// # Elapsed Budget
530    /// `max_operation_elapsed` counts only user operation execution time.
531    /// `max_total_elapsed` counts monotonic retry-flow time. Async attempts use
532    /// the shortest of configured attempt timeout, remaining
533    /// max-operation-elapsed budget, and remaining max-total-elapsed budget as
534    /// their effective timeout.
535    #[cfg(feature = "tokio")]
536    pub async fn run_async<T, F, Fut>(&self, mut operation: F) -> Result<T, RetryError<E>>
537    where
538        F: FnMut() -> Fut,
539        Fut: Future<Output = Result<T, E>>,
540    {
541        let mut operation = AsyncValueOperation::new(&mut operation);
542        self.run_async_operation(&mut operation)
543            .await
544            .map(|()| operation.into_value())
545    }
546
547    /// Runs an asynchronous value-erased operation with retry.
548    ///
549    /// # Parameters
550    /// - `operation`: Async operation adapter called once per attempt.
551    ///
552    /// # Returns
553    /// `Ok(())` after a successful attempt, or [`RetryError`] when retrying stops.
554    #[cfg(feature = "tokio")]
555    async fn run_async_operation(
556        &self,
557        operation: &mut dyn AsyncAttempt<E>,
558    ) -> Result<(), RetryError<E>> {
559        let flow_started_at = Instant::now();
560        let mut operation_elapsed = Duration::ZERO;
561        let mut attempts = 0;
562        let mut last_failure = None;
563
564        loop {
565            let total_elapsed = flow_started_at.elapsed();
566            let attempt_timeout = self.effective_attempt_timeout(operation_elapsed, total_elapsed);
567            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
568                return Err(self.emit_error(self.elapsed_error(
569                    reason,
570                    operation_elapsed,
571                    total_elapsed,
572                    attempts,
573                    last_failure.take(),
574                    attempt_timeout,
575                )));
576            }
577
578            attempts += 1;
579            let attempt_timeout =
580                self.effective_attempt_timeout(operation_elapsed, flow_started_at.elapsed());
581            let before_context = self
582                .context(
583                    operation_elapsed,
584                    flow_started_at.elapsed(),
585                    attempts,
586                    Duration::ZERO,
587                    attempt_timeout.duration,
588                )
589                .with_attempt_timeout_source(attempt_timeout.source);
590            self.emit_before_attempt(&before_context);
591            let total_elapsed = flow_started_at.elapsed();
592            let attempt_timeout = self.effective_attempt_timeout(operation_elapsed, total_elapsed);
593            if let Some(reason) = self.elapsed_error_reason(operation_elapsed, total_elapsed) {
594                return Err(self.emit_error(self.elapsed_error(
595                    reason,
596                    operation_elapsed,
597                    total_elapsed,
598                    attempts,
599                    last_failure.take(),
600                    attempt_timeout,
601                )));
602            }
603
604            let attempt_start = Instant::now();
605            let result = if let Some(timeout) = attempt_timeout.duration {
606                match tokio::time::timeout(timeout, operation.call()).await {
607                    Ok(result) => result,
608                    Err(_) => Err(AttemptFailure::Timeout),
609                }
610            } else {
611                operation.call().await
612            };
613
614            let attempt_elapsed = attempt_start.elapsed();
615            operation_elapsed = add_elapsed(operation_elapsed, attempt_elapsed);
616            let context = self
617                .context(
618                    operation_elapsed,
619                    flow_started_at.elapsed(),
620                    attempts,
621                    attempt_elapsed,
622                    attempt_timeout.duration,
623                )
624                .with_attempt_timeout_source(attempt_timeout.source);
625            match result {
626                Ok(()) => {
627                    self.emit_attempt_success(&context);
628                    return Ok(());
629                }
630                Err(failure) => {
631                    if let Some(reason) = attempt_timeout.elapsed_timeout_reason(&failure) {
632                        return Err(self.emit_error(RetryError::new(
633                            reason,
634                            Some(failure),
635                            context,
636                        )));
637                    }
638                    match self.handle_failure(attempts, failure, context, None, flow_started_at) {
639                        RetryFlowAction::Retry { delay, failure } => {
640                            sleep_async(delay).await;
641                            last_failure = Some(failure);
642                        }
643                        RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
644                    }
645                }
646            }
647        }
648    }
649
650    /// Creates a retry policy from validated parts.
651    ///
652    /// # Parameters
653    /// - `options`: Retry options.
654    /// - `retry_after_hint`: Optional hint extractor.
655    /// - `isolate_listener_panics`: Whether listener panics are isolated.
656    /// - `listeners`: Lifecycle listeners.
657    ///
658    /// # Returns
659    /// A retry policy.
660    pub(super) fn new(
661        options: RetryOptions,
662        retry_after_hint: Option<RetryAfterHint<E>>,
663        isolate_listener_panics: bool,
664        listeners: RetryListeners<E>,
665    ) -> Self {
666        Self {
667            options,
668            retry_after_hint,
669            isolate_listener_panics,
670            listeners,
671        }
672    }
673
674    /// Builds a context snapshot.
675    ///
676    /// # Parameters
677    /// - `operation_elapsed`: Cumulative user operation time consumed by this flow.
678    /// - `total_elapsed`: Total monotonic time consumed by this flow.
679    /// - `attempt`: Current attempt number.
680    /// - `attempt_elapsed`: Elapsed time in the current attempt.
681    /// - `attempt_timeout`: Effective timeout configured for the current attempt.
682    ///
683    /// # Returns
684    /// A retry context.
685    fn context(
686        &self,
687        operation_elapsed: Duration,
688        total_elapsed: Duration,
689        attempt: u32,
690        attempt_elapsed: Duration,
691        attempt_timeout: Option<Duration>,
692    ) -> RetryContext {
693        RetryContext::from_parts(RetryContextParts {
694            attempt,
695            max_attempts: self.options.max_attempts.get(),
696            max_operation_elapsed: self.options.max_operation_elapsed,
697            max_total_elapsed: self.options.max_total_elapsed,
698            operation_elapsed,
699            total_elapsed,
700            attempt_elapsed,
701            attempt_timeout,
702        })
703    }
704
705    /// Returns the configured attempt-timeout duration.
706    ///
707    /// # Returns
708    /// `Some(Duration)` when per-attempt timeout is configured.
709    #[inline]
710    fn attempt_timeout_duration(&self) -> Option<Duration> {
711        self.options
712            .attempt_timeout()
713            .map(|attempt_timeout| attempt_timeout.timeout())
714    }
715
716    /// Returns the effective timeout used by the next attempt.
717    ///
718    /// # Parameters
719    /// - `operation_elapsed`: Cumulative user operation time consumed so far.
720    /// - `total_elapsed`: Total monotonic retry-flow time consumed so far.
721    ///
722    /// # Returns
723    /// The shortest of the configured attempt timeout, remaining
724    /// max-operation-elapsed budget, and remaining max-total-elapsed budget,
725    /// including the source that selected it. A configured timeout wins ties so
726    /// its timeout policy remains observable.
727    fn effective_attempt_timeout(
728        &self,
729        operation_elapsed: Duration,
730        total_elapsed: Duration,
731    ) -> EffectiveAttemptTimeout {
732        let candidates = [
733            self.attempt_timeout_duration()
734                .map(|duration| (duration, AttemptTimeoutSource::Configured)),
735            self.remaining_operation_elapsed(operation_elapsed)
736                .map(|duration| (duration, AttemptTimeoutSource::MaxOperationElapsed)),
737            self.remaining_total_elapsed(total_elapsed)
738                .map(|duration| (duration, AttemptTimeoutSource::MaxTotalElapsed)),
739        ];
740        let selected = candidates
741            .into_iter()
742            .flatten()
743            .min_by(|left, right| left.0.cmp(&right.0).then_with(|| left.1.cmp(&right.1)));
744        match selected {
745            Some((duration, source)) => EffectiveAttemptTimeout::new(Some(duration), Some(source)),
746            None => EffectiveAttemptTimeout::new(None, None),
747        }
748    }
749
750    /// Returns remaining user operation time before the max-operation-elapsed budget is exhausted.
751    ///
752    /// # Parameters
753    /// - `operation_elapsed`: Cumulative user operation time consumed so far.
754    ///
755    /// # Returns
756    /// `Some(Duration)` when max elapsed is configured, or `None` when unlimited.
757    #[inline]
758    fn remaining_operation_elapsed(&self, operation_elapsed: Duration) -> Option<Duration> {
759        self.options
760            .max_operation_elapsed
761            .map(|max_operation_elapsed| max_operation_elapsed.saturating_sub(operation_elapsed))
762    }
763
764    /// Returns remaining total retry-flow time before the max-total-elapsed budget is exhausted.
765    ///
766    /// # Parameters
767    /// - `total_elapsed`: Total monotonic retry-flow time consumed so far.
768    ///
769    /// # Returns
770    /// `Some(Duration)` when max total elapsed is configured, or `None` when unlimited.
771    #[inline]
772    fn remaining_total_elapsed(&self, total_elapsed: Duration) -> Option<Duration> {
773        self.options
774            .max_total_elapsed
775            .map(|max_total_elapsed| max_total_elapsed.saturating_sub(total_elapsed))
776    }
777
778    /// Runs one blocking attempt on a worker thread.
779    ///
780    /// # Parameters
781    /// - `operation`: Shared blocking operation.
782    /// - `attempt_timeout`: Effective timeout for this attempt, if any.
783    ///
784    /// # Returns
785    /// The operation value on success, or an attempt failure.
786    ///
787    /// # Panics
788    /// Converts worker panics into [`AttemptFailure::Panic`] and worker-spawn
789    /// failures into [`AttemptFailure::Executor`].
790    fn call_blocking_attempt<T, F>(
791        &self,
792        operation: Arc<F>,
793        attempt_timeout: Option<Duration>,
794    ) -> BlockingAttemptOutcome<T, E>
795    where
796        T: Send + 'static,
797        E: Send + 'static,
798        F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
799    {
800        let token = AttemptCancelToken::new();
801        let (sender, receiver) = mpsc::sync_channel(1);
802        let worker_token = token.clone();
803        let worker = std::thread::Builder::new()
804            .name("qubit-retry-worker".to_string())
805            .spawn(move || {
806                let result =
807                    panic::catch_unwind(panic::AssertUnwindSafe(|| operation(worker_token)));
808                let message = match result {
809                    Ok(result) => BlockingAttemptMessage::Result(result),
810                    Err(payload) => {
811                        BlockingAttemptMessage::Panic(AttemptPanic::from_payload(payload))
812                    }
813                };
814                let _ = sender.send(message);
815            });
816        let worker = match worker {
817            Ok(worker) => worker,
818            Err(_) => {
819                return BlockingAttemptOutcome::new(
820                    Err(exec_fail!(WORKER_SPAWN_FAILED_MESSAGE)),
821                    0,
822                );
823            }
824        };
825
826        match attempt_timeout {
827            Some(attempt_timeout) => {
828                let message = receiver.recv_timeout(attempt_timeout);
829                self.worker_timeout_message_to_attempt_outcome(message, receiver, worker, &token)
830            }
831            None => {
832                let result = worker_recv_message_to_attempt_result(receiver.recv());
833                join_finished_worker(worker);
834                BlockingAttemptOutcome::new(result, 0)
835            }
836        }
837    }
838
839    /// Handles one failed attempt.
840    ///
841    /// # Parameters
842    /// - `attempts`: Attempts executed so far.
843    /// - `failure`: Attempt failure.
844    /// - `context`: Context captured after the failed attempt.
845    ///
846    /// # Returns
847    /// A retry action selected from listeners and configured limits.
848    fn handle_failure(
849        &self,
850        attempts: u32,
851        failure: AttemptFailure<E>,
852        context: RetryContext,
853        retry_block_reason: Option<RetryErrorReason>,
854        flow_started_at: Instant,
855    ) -> RetryFlowAction<E> {
856        let hint = self
857            .retry_after_hint
858            .as_ref()
859            .and_then(|hint| self.invoke_listener(|| hint.apply(&failure, &context)));
860        let context = context
861            .with_retry_after_hint(hint)
862            .with_total_elapsed(flow_started_at.elapsed());
863
864        let decision =
865            self.resolve_failure_decision(self.failure_decision(&failure, &context), &failure);
866        let context = context.with_total_elapsed(flow_started_at.elapsed());
867        if decision == AttemptFailureDecision::Abort {
868            return RetryFlowAction::Finished(RetryError::new(
869                RetryErrorReason::Aborted,
870                Some(failure),
871                context,
872            ));
873        }
874
875        let max_attempts = self.options.max_attempts.get();
876        if attempts >= max_attempts {
877            return RetryFlowAction::Finished(RetryError::new(
878                RetryErrorReason::AttemptsExceeded,
879                Some(failure),
880                context,
881            ));
882        }
883
884        if let Some(reason) =
885            self.elapsed_error_reason(context.operation_elapsed(), context.total_elapsed())
886        {
887            return RetryFlowAction::Finished(RetryError::new(reason, Some(failure), context));
888        }
889
890        if let Some(reason) = retry_block_reason {
891            return RetryFlowAction::Finished(RetryError::new(reason, Some(failure), context));
892        }
893
894        let delay = self.retry_delay(decision, attempts, hint);
895        let context = context
896            .with_total_elapsed(flow_started_at.elapsed())
897            .with_next_delay(delay);
898        if self.retry_sleep_exhausts_total_elapsed(context.total_elapsed(), delay) {
899            return RetryFlowAction::Finished(RetryError::new(
900                RetryErrorReason::MaxTotalElapsedExceeded,
901                Some(failure),
902                context,
903            ));
904        }
905        self.emit_retry_scheduled(&failure, &context);
906        let context = context.with_total_elapsed(flow_started_at.elapsed());
907        if let Some(reason) =
908            self.elapsed_error_reason(context.operation_elapsed(), context.total_elapsed())
909        {
910            return RetryFlowAction::Finished(RetryError::new(reason, Some(failure), context));
911        }
912        if self.retry_sleep_exhausts_total_elapsed(context.total_elapsed(), delay) {
913            return RetryFlowAction::Finished(RetryError::new(
914                RetryErrorReason::MaxTotalElapsedExceeded,
915                Some(failure),
916                context,
917            ));
918        }
919        RetryFlowAction::Retry { delay, failure }
920    }
921
922    /// Resolves all failure listeners into one decision.
923    ///
924    /// # Parameters
925    /// - `failure`: Attempt failure.
926    /// - `context`: Failure context.
927    ///
928    /// # Returns
929    /// Last non-default listener decision, or [`AttemptFailureDecision::UseDefault`].
930    fn failure_decision(
931        &self,
932        failure: &AttemptFailure<E>,
933        context: &RetryContext,
934    ) -> AttemptFailureDecision {
935        let mut decision = AttemptFailureDecision::UseDefault;
936        for listener in &self.listeners.failure {
937            let current = self.invoke_listener(|| listener.apply(failure, context));
938            if current != AttemptFailureDecision::UseDefault {
939                decision = current;
940            }
941        }
942        decision
943    }
944
945    /// Resolves the effective failure decision after applying timeout policy.
946    ///
947    /// # Parameters
948    /// - `decision`: Decision returned by failure listeners.
949    /// - `failure`: Attempt failure being handled.
950    ///
951    /// # Returns
952    /// A concrete decision for timeout failures when listeners used the default.
953    fn resolve_failure_decision(
954        &self,
955        decision: AttemptFailureDecision,
956        failure: &AttemptFailure<E>,
957    ) -> AttemptFailureDecision {
958        if decision != AttemptFailureDecision::UseDefault {
959            return decision;
960        }
961        if matches!(failure, AttemptFailure::Timeout)
962            && let Some(attempt_timeout) = self.options.attempt_timeout()
963        {
964            match attempt_timeout.policy() {
965                AttemptTimeoutPolicy::Retry => AttemptFailureDecision::Retry,
966                AttemptTimeoutPolicy::Abort => AttemptFailureDecision::Abort,
967            }
968        } else if matches!(
969            failure,
970            AttemptFailure::Panic(_) | AttemptFailure::Executor(_)
971        ) {
972            AttemptFailureDecision::Abort
973        } else {
974            AttemptFailureDecision::UseDefault
975        }
976    }
977
978    /// Selects the delay used before the next retry.
979    ///
980    /// # Parameters
981    /// - `decision`: Failure decision.
982    /// - `attempts`: Attempts executed so far.
983    /// - `hint`: Optional retry-after hint.
984    ///
985    /// # Returns
986    /// Delay before the next retry.
987    fn retry_delay(
988        &self,
989        decision: AttemptFailureDecision,
990        attempts: u32,
991        hint: Option<Duration>,
992    ) -> Duration {
993        match decision {
994            AttemptFailureDecision::RetryAfter(delay) => delay,
995            AttemptFailureDecision::UseDefault => hint.unwrap_or_else(|| {
996                self.options
997                    .jitter
998                    .delay_for_attempt(&self.options.delay, attempts)
999            }),
1000            AttemptFailureDecision::Retry | AttemptFailureDecision::Abort => self
1001                .options
1002                .jitter
1003                .delay_for_attempt(&self.options.delay, attempts),
1004        }
1005    }
1006
1007    /// Builds an elapsed-budget error.
1008    ///
1009    /// # Parameters
1010    /// - `reason`: Elapsed-budget reason selected by the caller.
1011    /// - `operation_elapsed`: Cumulative user operation time consumed by this flow.
1012    /// - `total_elapsed`: Total monotonic retry-flow time consumed by this flow.
1013    /// - `attempts`: Attempts executed so far.
1014    /// - `last_failure`: Last observed failure, if any.
1015    /// - `attempt_timeout`: Timeout visible in the terminal context.
1016    ///
1017    /// # Returns
1018    /// A retry error preserving the terminal context.
1019    fn elapsed_error(
1020        &self,
1021        reason: RetryErrorReason,
1022        operation_elapsed: Duration,
1023        total_elapsed: Duration,
1024        attempts: u32,
1025        last_failure: Option<AttemptFailure<E>>,
1026        attempt_timeout: EffectiveAttemptTimeout,
1027    ) -> RetryError<E> {
1028        RetryError::new(
1029            reason,
1030            last_failure,
1031            self.context(
1032                operation_elapsed,
1033                total_elapsed,
1034                attempts,
1035                Duration::ZERO,
1036                attempt_timeout.duration,
1037            )
1038            .with_attempt_timeout_source(attempt_timeout.source),
1039        )
1040    }
1041
1042    /// Returns the first elapsed-budget reason that is exhausted.
1043    ///
1044    /// # Parameters
1045    /// - `operation_elapsed`: Cumulative user operation time consumed by this flow.
1046    /// - `total_elapsed`: Total monotonic retry-flow time consumed by this flow.
1047    ///
1048    /// # Returns
1049    /// `Some(RetryErrorReason)` when an elapsed budget has been exhausted.
1050    #[inline]
1051    fn elapsed_error_reason(
1052        &self,
1053        operation_elapsed: Duration,
1054        total_elapsed: Duration,
1055    ) -> Option<RetryErrorReason> {
1056        if self
1057            .options
1058            .max_operation_elapsed
1059            .is_some_and(|max_operation_elapsed| operation_elapsed >= max_operation_elapsed)
1060        {
1061            Some(RetryErrorReason::MaxOperationElapsedExceeded)
1062        } else if self
1063            .options
1064            .max_total_elapsed
1065            .is_some_and(|max_total_elapsed| total_elapsed >= max_total_elapsed)
1066        {
1067            Some(RetryErrorReason::MaxTotalElapsedExceeded)
1068        } else {
1069            None
1070        }
1071    }
1072
1073    /// Returns whether a selected retry sleep would consume the remaining total budget.
1074    ///
1075    /// # Parameters
1076    /// - `total_elapsed`: Total monotonic retry-flow time consumed before sleep.
1077    /// - `delay`: Selected retry delay.
1078    ///
1079    /// # Returns
1080    /// `true` when the delay should not be slept because no budget would remain
1081    /// for the next attempt.
1082    #[inline]
1083    fn retry_sleep_exhausts_total_elapsed(&self, total_elapsed: Duration, delay: Duration) -> bool {
1084        if delay.is_zero() {
1085            return false;
1086        }
1087        let Some(max_total_elapsed) = self.options.max_total_elapsed else {
1088            return false;
1089        };
1090        total_elapsed.saturating_add(delay) >= max_total_elapsed
1091    }
1092
1093    /// Emits before-attempt listeners.
1094    ///
1095    /// # Parameters
1096    /// - `context`: Context passed to listeners.
1097    fn emit_before_attempt(&self, context: &RetryContext) {
1098        for listener in &self.listeners.before_attempt {
1099            self.invoke_listener(|| {
1100                listener.accept(context);
1101            });
1102        }
1103    }
1104
1105    /// Emits attempt-success listeners.
1106    ///
1107    /// # Parameters
1108    /// - `context`: Context passed to listeners.
1109    fn emit_attempt_success(&self, context: &RetryContext) {
1110        for listener in &self.listeners.attempt_success {
1111            self.invoke_listener(|| {
1112                listener.accept(context);
1113            });
1114        }
1115    }
1116
1117    /// Emits retry-scheduled listeners.
1118    ///
1119    /// # Parameters
1120    /// - `failure`: Failure that caused the retry to be scheduled.
1121    /// - `context`: Context carrying the selected next delay.
1122    fn emit_retry_scheduled(&self, failure: &AttemptFailure<E>, context: &RetryContext) {
1123        for listener in &self.listeners.retry_scheduled {
1124            self.invoke_listener(|| {
1125                listener.accept(failure, context);
1126            });
1127        }
1128    }
1129
1130    /// Emits terminal error listeners and returns the same error.
1131    ///
1132    /// # Parameters
1133    /// - `error`: Terminal retry error.
1134    ///
1135    /// # Returns
1136    /// The same error after listeners have been invoked.
1137    fn emit_error(&self, error: RetryError<E>) -> RetryError<E> {
1138        for listener in &self.listeners.error {
1139            self.invoke_listener(|| {
1140                listener.accept(&error, error.context());
1141            });
1142        }
1143        error
1144    }
1145
1146    /// Converts a timeout-aware worker receive into an attempt outcome.
1147    ///
1148    /// # Parameters
1149    /// - `message`: Initial receive result with the attempt timeout applied.
1150    /// - `receiver`: Receiver used to observe worker exit during cancellation grace.
1151    /// - `worker`: Worker thread handle for joining finished workers.
1152    /// - `token`: Cancellation token to mark when the receive timed out.
1153    ///
1154    /// # Returns
1155    /// Attempt result plus the number of worker threads not observed to exit.
1156    fn worker_timeout_message_to_attempt_outcome<T>(
1157        &self,
1158        message: Result<BlockingAttemptMessage<T, E>, mpsc::RecvTimeoutError>,
1159        receiver: mpsc::Receiver<BlockingAttemptMessage<T, E>>,
1160        worker: JoinHandle<()>,
1161        token: &AttemptCancelToken,
1162    ) -> BlockingAttemptOutcome<T, E>
1163    where
1164        T: Send + 'static,
1165        E: Send + 'static,
1166    {
1167        match message {
1168            Ok(message) => {
1169                let result = worker_message_to_attempt_result(message);
1170                join_finished_worker(worker);
1171                BlockingAttemptOutcome::new(result, 0)
1172            }
1173            Err(mpsc::RecvTimeoutError::Timeout) => {
1174                token.cancel();
1175                let worker_exited =
1176                    wait_for_cancelled_worker(&receiver, worker, self.options.worker_cancel_grace);
1177                let unreaped_worker_count = if worker_exited { 0 } else { 1 };
1178                BlockingAttemptOutcome::new(Err(AttemptFailure::Timeout), unreaped_worker_count)
1179            }
1180            Err(mpsc::RecvTimeoutError::Disconnected) => {
1181                join_finished_worker(worker);
1182                BlockingAttemptOutcome::new(Err(exec_fail!(WORKER_DISCONNECTED_MESSAGE)), 0)
1183            }
1184        }
1185    }
1186
1187    /// Invokes a listener and optionally isolates panics.
1188    ///
1189    /// # Parameters
1190    /// - `call`: Listener invocation closure.
1191    ///
1192    /// # Returns
1193    /// The listener return value, or `Default::default()` when an isolated panic
1194    /// occurs.
1195    fn invoke_listener<R>(&self, call: impl FnOnce() -> R) -> R
1196    where
1197        R: Default,
1198    {
1199        if self.isolate_listener_panics {
1200            std::panic::catch_unwind(std::panic::AssertUnwindSafe(call)).unwrap_or_default()
1201        } else {
1202            call()
1203        }
1204    }
1205}
1206
1207impl<E> fmt::Debug for Retry<E> {
1208    /// Formats the retry policy without exposing callbacks.
1209    ///
1210    /// # Parameters
1211    /// - `f`: Formatter.
1212    ///
1213    /// # Returns
1214    /// Formatter result.
1215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1216        f.debug_struct("Retry")
1217            .field("options", &self.options)
1218            .finish_non_exhaustive()
1219    }
1220}
1221
1222/// Converts a worker message into an attempt result.
1223///
1224/// # Parameters
1225/// - `message`: Message received from the worker thread.
1226///
1227/// # Returns
1228/// The operation value on success, or an attempt failure.
1229fn worker_message_to_attempt_result<T, E>(
1230    message: BlockingAttemptMessage<T, E>,
1231) -> Result<T, AttemptFailure<E>> {
1232    match message {
1233        BlockingAttemptMessage::Result(result) => result.map_err(AttemptFailure::Error),
1234        BlockingAttemptMessage::Panic(panic) => Err(AttemptFailure::Panic(panic)),
1235    }
1236}
1237
1238/// Converts a blocking receive result into an attempt result.
1239///
1240/// # Parameters
1241/// - `message`: Result returned by `Receiver::recv`.
1242///
1243/// # Returns
1244/// The operation value on success, or an attempt failure.
1245fn worker_recv_message_to_attempt_result<T, E>(
1246    message: Result<BlockingAttemptMessage<T, E>, mpsc::RecvError>,
1247) -> Result<T, AttemptFailure<E>> {
1248    match message {
1249        Ok(message) => worker_message_to_attempt_result(message),
1250        Err(_) => Err(exec_fail!(WORKER_DISCONNECTED_MESSAGE)),
1251    }
1252}
1253
1254/// Waits briefly for a cancelled worker to exit.
1255///
1256/// # Parameters
1257/// - `receiver`: Worker result receiver used only to observe whether the worker
1258///   sent or disconnected.
1259/// - `worker`: Worker thread handle, joined when exit is observed.
1260/// - `grace`: Maximum time to wait after cancellation. Zero performs only a
1261///   non-blocking check.
1262///
1263/// # Returns
1264/// `true` when the worker was observed to exit before the grace period ended,
1265/// otherwise `false`. When this returns `false`, the worker handle is dropped and
1266/// the thread may continue running detached.
1267fn wait_for_cancelled_worker<T, E>(
1268    receiver: &mpsc::Receiver<BlockingAttemptMessage<T, E>>,
1269    worker: JoinHandle<()>,
1270    grace: Duration,
1271) -> bool {
1272    let exited = if grace.is_zero() {
1273        match receiver.try_recv() {
1274            Ok(_) | Err(mpsc::TryRecvError::Disconnected) => true,
1275            Err(mpsc::TryRecvError::Empty) => false,
1276        }
1277    } else {
1278        match receiver.recv_timeout(grace) {
1279            Ok(_) | Err(mpsc::RecvTimeoutError::Disconnected) => true,
1280            Err(mpsc::RecvTimeoutError::Timeout) => false,
1281        }
1282    };
1283    if exited {
1284        join_finished_worker(worker);
1285    }
1286    exited
1287}
1288
/// Reaps a worker thread that is already known to have finished.
///
/// The join result is discarded, so a panic payload carried by the thread is
/// dropped rather than propagated.
///
/// # Parameters
/// - `worker`: Worker thread handle.
///
/// # Returns
/// This function returns nothing.
fn join_finished_worker(worker: JoinHandle<()>) {
    drop(worker.join());
}
1299
/// Accumulates one attempt's duration into the running user-operation total.
///
/// # Parameters
/// - `operation_elapsed`: Cumulative elapsed time before the attempt.
/// - `attempt_elapsed`: Elapsed time consumed by the current attempt.
///
/// # Returns
/// The summed elapsed time, saturated at [`Duration::MAX`] on overflow.
fn add_elapsed(operation_elapsed: Duration, attempt_elapsed: Duration) -> Duration {
    operation_elapsed
        .checked_add(attempt_elapsed)
        .unwrap_or(Duration::MAX)
}
1311
/// Awaits a Tokio sleep for `delay`, skipping the timer for a zero delay.
///
/// # Parameters
/// - `delay`: Delay to sleep.
///
/// # Returns
/// This function returns after the sleep completes, or immediately when
/// `delay` is zero.
#[cfg(feature = "tokio")]
async fn sleep_async(delay: Duration) {
    if delay.is_zero() {
        return;
    }
    tokio::time::sleep(delay).await;
}