// prism3_retry/executor.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025.
 *    3-Prism Co. Ltd.
 *
 *    All rights reserved.
 *
 ******************************************************************************/
//! # Retry Executor
//!
//! Retry executor responsible for executing operations with retry strategies.
//!
//! # Author
//!
//! Haixing Hu

use std::error::Error;
use std::time::{Duration, Instant};

use prism3_function::readonly_consumer::ReadonlyConsumer;

use super::{
    AbortEvent, DefaultRetryConfig, FailureEvent, RetryBuilder, RetryConfig, RetryDecision,
    RetryError, RetryEvent, RetryReason, RetryResult, SuccessEvent,
};
26
/// Retry executor
///
/// Responsible for executing operations with retry strategies. Automatically
/// executes retry logic according to configured retry strategies, delay
/// strategies, failure/abort conditions, and triggers event listeners at
/// appropriate times.
///
/// # Generic Parameters
///
/// * `T` - The return value type of the operation
/// * `C` - Retry configuration type, must implement `RetryConfig` trait,
///   defaults to `DefaultRetryConfig`
///
/// # Core Features
///
/// - **Synchronous Retry**: `run()` method executes synchronous operations,
///   using post-check mechanism for timeout detection
/// - **Asynchronous Retry**: `run_async()` method executes asynchronous
///   operations, using tokio::time::timeout for real timeout interruption
/// - **Timeout Control**: Supports single operation timeout (operation_timeout)
///   and overall timeout (max_duration)
/// - **Event Listening**: Supports event callbacks for retry, success,
///   failure, abort, etc.
/// - **Flexible Configuration**: Supports multiple delay strategies, error
///   type identification, result value judgment, etc.
///
/// # Timeout Control
///
/// The executor supports two levels of timeout control:
///
/// 1. **Single Operation Timeout (operation_timeout)**:
///    - Controls the maximum execution time for each operation
///    - Synchronous version (`run`): Checks if timeout occurred after
///      operation completes (post-check mechanism)
///    - Asynchronous version (`run_async`): Uses tokio::time::timeout to
///      truly interrupt timeout operations
///
/// 2. **Overall Timeout (max_duration)**:
///    - Controls the maximum total time for the entire retry process
///      (including all retries and delays)
///    - Applies to both synchronous and asynchronous versions
///
/// # Usage Examples
///
/// ## Synchronous Retry (Post-Check Timeout)
///
/// ```rust
/// use prism3_retry::{RetryBuilder, RetryResult};
/// use std::time::Duration;
///
/// let executor = RetryBuilder::<String>::new()
///     .set_max_attempts(3)
///     .set_operation_timeout(Some(Duration::from_secs(5)))
///     .build();
///
/// // Use RetryResult type alias to simplify function signature
/// let result: RetryResult<String> = executor.run(|| {
///     // Can directly return any error type that implements Into<RetryError>
///     // For example, using ? operator to handle io::Error will automatically
///     // convert to RetryError
///     std::thread::sleep(Duration::from_millis(100));
///     Ok("SUCCESS".to_string())
/// });
/// ```
///
/// ## Asynchronous Retry (Real Timeout Interruption)
///
/// ```rust,no_run
/// use prism3_retry::{RetryBuilder, RetryResult};
/// use std::time::Duration;
///
/// # async fn example() {
/// let executor = RetryBuilder::<String>::new()
///     .set_max_attempts(3)
///     .set_operation_timeout(Some(Duration::from_secs(5)))
///     .build();
///
/// // Use RetryResult to make async function signature clearer
/// let result: RetryResult<String> = executor.run_async(|| async {
///     // Asynchronous operation, truly interrupted on timeout
///     tokio::time::sleep(Duration::from_millis(100)).await;
///     Ok("SUCCESS".to_string())
/// }).await;
/// # }
/// ```
///
/// # Author
///
/// Haixing Hu
pub struct RetryExecutor<T, C: RetryConfig = DefaultRetryConfig> {
    // Holds the full retry configuration (strategies, timeouts, listeners);
    // all executor behavior is driven by this builder's settings.
    builder: RetryBuilder<T, C>,
}
119
120impl<T, C> RetryExecutor<T, C>
121where
122    T: Clone + PartialEq + Eq + std::hash::Hash + Send + Sync + 'static,
123    C: RetryConfig,
124{
125    /// Create retry executor
126    pub(crate) fn new(builder: RetryBuilder<T, C>) -> Self {
127        Self { builder }
128    }
129
130    // ==================== Private Helper Methods ====================
131
132    /// Check if maximum duration has been exceeded
133    ///
134    /// # Parameters
135    ///
136    /// * `start_time` - Start time
137    /// * `max_duration` - Maximum duration
138    /// * `attempt` - Current attempt count
139    ///
140    /// # Returns
141    ///
142    /// Returns Some(RetryError) if maximum duration exceeded, None otherwise
143    fn check_max_duration_exceeded(
144        &self,
145        start_time: Instant,
146        max_duration: Option<Duration>,
147        attempt: u32,
148    ) -> Option<RetryError> {
149        if let Some(max_dur) = max_duration {
150            let elapsed = start_time.elapsed();
151            if elapsed >= max_dur {
152                let failure_event = FailureEvent::builder()
153                    .attempt_count(attempt)
154                    .total_duration(elapsed)
155                    .build();
156                if let Some(listener) = self.builder.failure_listener() {
157                    listener.accept(&failure_event);
158                }
159                return Some(RetryError::max_duration_exceeded(elapsed, max_dur));
160            }
161        }
162        None
163    }
164
165    /// Check if single operation timeout occurred (post-check mechanism)
166    ///
167    /// # Parameters
168    ///
169    /// * `result` - Operation result
170    /// * `operation_duration` - Operation execution time
171    ///
172    /// # Returns
173    ///
174    /// Returns timeout error if timed out, otherwise returns original result
175    fn check_operation_timeout(
176        &self,
177        result: Result<T, Box<dyn Error + Send + Sync>>,
178        operation_duration: Duration,
179    ) -> Result<T, Box<dyn Error + Send + Sync>> {
180        if let Some(timeout) = self.builder.operation_timeout() {
181            if operation_duration > timeout {
182                return Err(
183                    Box::new(RetryError::operation_timeout(operation_duration, timeout))
184                        as Box<dyn Error + Send + Sync>,
185                );
186            }
187        }
188        result
189    }
190
191    /// Handle success case
192    ///
193    /// # Parameters
194    ///
195    /// * `value` - Successful result value
196    /// * `attempt` - Current attempt count
197    /// * `start_time` - Start time
198    ///
199    /// # Returns
200    ///
201    /// Returns success result
202    fn handle_success(&self, value: T, attempt: u32, start_time: Instant) -> RetryResult<T> {
203        let success_event = SuccessEvent::builder()
204            .result(value.clone())
205            .attempt_count(attempt)
206            .total_duration(start_time.elapsed())
207            .build();
208        if let Some(listener) = self.builder.success_listener() {
209            listener.accept(&success_event);
210        }
211        Ok(value)
212    }
213
214    /// Handle abort case
215    ///
216    /// # Parameters
217    ///
218    /// * `reason` - Abort reason
219    /// * `attempt` - Current attempt count
220    /// * `start_time` - Start time
221    ///
222    /// # Returns
223    ///
224    /// Returns abort error
225    fn handle_abort(
226        &self,
227        reason: super::AbortReason<T>,
228        attempt: u32,
229        start_time: Instant,
230    ) -> RetryResult<T> {
231        let abort_event = AbortEvent::builder()
232            .reason(reason)
233            .attempt_count(attempt)
234            .total_duration(start_time.elapsed())
235            .build();
236        if let Some(listener) = self.builder.abort_listener() {
237            listener.accept(&abort_event);
238        }
239        Err(RetryError::aborted("Operation aborted"))
240    }
241
242    /// Check if maximum attempts reached
243    ///
244    /// # Parameters
245    ///
246    /// * `attempt` - Current attempt count
247    /// * `max_attempts` - Maximum attempts
248    ///
249    /// # Returns
250    ///
251    /// Returns true if maximum attempts reached, false otherwise
252    fn check_max_attempts_exceeded(&self, attempt: u32, max_attempts: u32) -> bool {
253        attempt >= max_attempts
254    }
255
256    /// Handle maximum attempts exceeded case
257    ///
258    /// # Parameters
259    ///
260    /// * `attempt` - Current attempt count
261    /// * `max_attempts` - Maximum attempts
262    /// * `reason` - Retry reason
263    /// * `start_time` - Start time
264    ///
265    /// # Returns
266    ///
267    /// Returns maximum attempts exceeded error
268    fn handle_max_attempts_exceeded(
269        &self,
270        attempt: u32,
271        max_attempts: u32,
272        reason: RetryReason<T>,
273        start_time: Instant,
274    ) -> RetryError {
275        let failure_event = match reason {
276            RetryReason::Error(error) => FailureEvent::builder()
277                .last_error(Some(error))
278                .attempt_count(attempt)
279                .total_duration(start_time.elapsed())
280                .build(),
281            RetryReason::Result(result) => FailureEvent::builder()
282                .last_result(Some(result))
283                .attempt_count(attempt)
284                .total_duration(start_time.elapsed())
285                .build(),
286        };
287
288        if let Some(listener) = self.builder.failure_listener() {
289            listener.accept(&failure_event);
290        }
291
292        RetryError::max_attempts_exceeded(attempt, max_attempts)
293    }
294
295    /// Calculate delay duration
296    ///
297    /// # Parameters
298    ///
299    /// * `attempt` - Current attempt count
300    ///
301    /// # Returns
302    ///
303    /// Returns calculated delay duration
304    fn calculate_delay(&self, attempt: u32) -> Duration {
305        let delay_strategy = self.builder.delay_strategy();
306        let jitter_factor = self.builder.jitter_factor();
307        delay_strategy.calculate_delay(attempt, jitter_factor)
308    }
309
310    /// Create retry event
311    ///
312    /// # Parameters
313    ///
314    /// * `attempt` - Current attempt count
315    /// * `max_attempts` - Maximum attempts
316    /// * `reason` - Retry reason
317    /// * `delay` - Delay duration
318    /// * `start_time` - Start time
319    ///
320    /// # Returns
321    ///
322    /// Returns created retry event
323    fn create_retry_event(
324        &self,
325        attempt: u32,
326        max_attempts: u32,
327        reason: RetryReason<T>,
328        delay: Duration,
329        start_time: Instant,
330    ) -> RetryEvent<T> {
331        match reason {
332            RetryReason::Error(error) => RetryEvent::builder()
333                .attempt_count(attempt)
334                .max_attempts(max_attempts)
335                .last_error(Some(error))
336                .next_delay(delay)
337                .total_duration(start_time.elapsed())
338                .build(),
339            RetryReason::Result(result) => RetryEvent::builder()
340                .attempt_count(attempt)
341                .max_attempts(max_attempts)
342                .last_result(Some(result))
343                .next_delay(delay)
344                .total_duration(start_time.elapsed())
345                .build(),
346        }
347    }
348
349    /// Trigger retry event and wait for delay
350    ///
351    /// # Parameters
352    ///
353    /// * `retry_event` - Retry event
354    /// * `delay` - Delay duration
355    fn trigger_retry_and_wait(&self, retry_event: RetryEvent<T>, delay: Duration) {
356        if let Some(listener) = self.builder.retry_listener() {
357            listener.accept(&retry_event);
358        }
359
360        if delay > Duration::ZERO {
361            std::thread::sleep(delay);
362        }
363    }
364
365    /// Trigger retry event and wait for delay asynchronously
366    ///
367    /// # Parameters
368    ///
369    /// * `retry_event` - Retry event
370    /// * `delay` - Delay duration
371    async fn trigger_retry_and_wait_async(&self, retry_event: RetryEvent<T>, delay: Duration) {
372        if let Some(listener) = self.builder.retry_listener() {
373            listener.accept(&retry_event);
374        }
375
376        if delay > Duration::ZERO {
377            tokio::time::sleep(delay).await;
378        }
379    }
380
381    /// Execute single synchronous operation and get decision
382    ///
383    /// # Parameters
384    ///
385    /// * `operation` - Operation to execute
386    ///
387    /// # Returns
388    ///
389    /// Returns retry decision
390    fn execute_operation_and_get_decision<F>(&self, operation: &mut F) -> RetryDecision<T>
391    where
392        F: FnMut() -> Result<T, Box<dyn Error + Send + Sync>>,
393    {
394        let operation_start = Instant::now();
395        let result = operation();
396        let operation_duration = operation_start.elapsed();
397
398        // Check single operation timeout (post-check mechanism)
399        let result = self.check_operation_timeout(result, operation_duration);
400
401        // Get retry decision
402        self.builder.get_retry_decision(result)
403    }
404
405    /// Execute single asynchronous operation and get decision
406    ///
407    /// # Parameters
408    ///
409    /// * `operation` - Asynchronous operation to execute
410    ///
411    /// # Returns
412    ///
413    /// Returns retry decision
414    async fn execute_operation_async_and_get_decision<F, Fut>(
415        &self,
416        operation: &mut F,
417    ) -> RetryDecision<T>
418    where
419        F: FnMut() -> Fut,
420        Fut: std::future::Future<Output = Result<T, Box<dyn Error + Send + Sync>>>,
421    {
422        let operation_start = Instant::now();
423        let operation_timeout = self.builder.operation_timeout();
424
425        let result = if let Some(timeout_duration) = operation_timeout {
426            // With timeout limit, use tokio::time::timeout
427            match tokio::time::timeout(timeout_duration, operation()).await {
428                Ok(result) => result,
429                Err(_elapsed) => {
430                    // Timed out, convert to error
431                    let duration = operation_start.elapsed();
432                    Err(
433                        Box::new(RetryError::operation_timeout(duration, timeout_duration))
434                            as Box<dyn Error + Send + Sync>,
435                    )
436                }
437            }
438        } else {
439            // No timeout limit, execute directly
440            operation().await
441        };
442
443        // Get retry decision
444        self.builder.get_retry_decision(result)
445    }
446
447    /// Handle retry decision and return whether to continue
448    ///
449    /// # Parameters
450    ///
451    /// * `decision` - Retry decision
452    /// * `attempt` - Current attempt count
453    /// * `max_attempts` - Maximum attempts
454    /// * `start_time` - Start time
455    ///
456    /// # Returns
457    ///
458    /// - `Ok(Some(value))` - Operation succeeded, returns result value
459    /// - `Ok(None)` - Need to retry, returns None to continue loop
460    /// - `Err(error)` - Operation failed or aborted, returns error
461    fn handle_decision(
462        &self,
463        decision: RetryDecision<T>,
464        attempt: u32,
465        max_attempts: u32,
466        start_time: Instant,
467    ) -> Result<Option<T>, RetryError> {
468        match decision {
469            RetryDecision::Success(value) => {
470                self.handle_success(value.clone(), attempt, start_time)?;
471                Ok(Some(value))
472            }
473            RetryDecision::Retry(reason) => {
474                // Check if maximum retry count reached
475                if self.check_max_attempts_exceeded(attempt, max_attempts) {
476                    let error = self.handle_max_attempts_exceeded(
477                        attempt,
478                        max_attempts,
479                        reason,
480                        start_time,
481                    );
482                    return Err(error);
483                }
484
485                // Calculate delay and create retry event
486                let delay = self.calculate_delay(attempt);
487                let retry_event =
488                    self.create_retry_event(attempt, max_attempts, reason, delay, start_time);
489
490                // Return None and delay time to indicate retry needed
491                // Note: We need to return delay time, so need to adjust return type
492                // Or trigger event directly here
493                self.trigger_retry_and_wait(retry_event, delay);
494
495                Ok(None) // Return None to indicate need to continue retrying
496            }
497            RetryDecision::Abort(reason) => {
498                self.handle_abort(reason, attempt, start_time).map(|_| None) // Won't reach here as handle_abort always returns Err
499            }
500        }
501    }
502
503    /// Handle async retry decision and return whether to continue
504    ///
505    /// # Parameters
506    ///
507    /// * `decision` - Retry decision
508    /// * `attempt` - Current attempt count
509    /// * `max_attempts` - Maximum attempts
510    /// * `start_time` - Start time
511    ///
512    /// # Returns
513    ///
514    /// - `Ok(Some(value))` - Operation succeeded, returns result value
515    /// - `Ok(None)` - Need to retry, returns None to continue loop
516    /// - `Err(error)` - Operation failed or aborted, returns error
517    async fn handle_decision_async(
518        &self,
519        decision: RetryDecision<T>,
520        attempt: u32,
521        max_attempts: u32,
522        start_time: Instant,
523    ) -> Result<Option<T>, RetryError> {
524        match decision {
525            RetryDecision::Success(value) => {
526                self.handle_success(value.clone(), attempt, start_time)?;
527                Ok(Some(value))
528            }
529            RetryDecision::Retry(reason) => {
530                // Check if maximum retry count reached
531                if self.check_max_attempts_exceeded(attempt, max_attempts) {
532                    let error = self.handle_max_attempts_exceeded(
533                        attempt,
534                        max_attempts,
535                        reason,
536                        start_time,
537                    );
538                    return Err(error);
539                }
540
541                // Calculate delay and create retry event
542                let delay = self.calculate_delay(attempt);
543                let retry_event =
544                    self.create_retry_event(attempt, max_attempts, reason, delay, start_time);
545
546                // Trigger event and wait asynchronously
547                self.trigger_retry_and_wait_async(retry_event, delay).await;
548
549                Ok(None) // Return None to indicate need to continue retrying
550            }
551            RetryDecision::Abort(reason) => {
552                self.handle_abort(reason, attempt, start_time).map(|_| None) // Won't reach here as handle_abort always returns Err
553            }
554        }
555    }
556
557    // ==================== Public Methods ====================
558
559    /// Execute synchronous operation (with post-check timeout mechanism)
560    ///
561    /// Execute synchronous operation according to configured retry strategy,
562    /// until success, maximum retry count reached, or abort condition met.
563    ///
564    /// # Timeout Control
565    ///
566    /// This method uses **post-check mechanism** for timeout control:
567    /// - After operation completes, check if execution time exceeds
568    ///   `operation_timeout`
569    /// - If timeout, convert result to `RetryError::OperationTimeout` error
570    ///   and trigger retry
571    /// - Note: Cannot truly interrupt ongoing synchronous operation
572    ///
573    /// If you need to truly interrupt timeout operations, please use
574    /// `run_async()` method.
575    ///
576    /// # Parameters
577    ///
578    /// * `operation` - Operation to execute, returns
579    ///   `Result<T, Box<dyn Error + Send + Sync>>`
580    ///
581    /// # Returns
582    ///
583    /// Returns operation result or error
584    ///
585    /// # Example
586    ///
587    /// ```rust
588    /// use prism3_retry::{RetryBuilder, RetryDelayStrategy, RetryResult};
589    /// use std::time::Duration;
590    ///
591    /// let executor = RetryBuilder::new()
592    ///     .set_max_attempts(3)
593    ///     .set_delay_strategy(RetryDelayStrategy::Fixed { delay: Duration::from_secs(1) })
594    ///     .set_operation_timeout(Some(Duration::from_secs(5))) // Single operation post-check timeout
595    ///     .build();
596    ///
597    /// // Use RetryResult to simplify function signature, leveraging From trait
598    /// // for automatic error conversion
599    /// let result: RetryResult<String> = executor.run(|| -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
600    ///     // Can return any standard error type, will be automatically
601    ///     // converted to RetryError
602    ///     // Example: std::fs::File::open("file.txt")?;
603    ///     // io::Error will be automatically converted to RetryError through
604    ///     // From trait
605    ///     Ok("SUCCESS".to_string())
606    /// });
607    ///
608    /// assert!(result.is_ok());
609    /// ```
610    pub fn run<F>(&self, mut operation: F) -> RetryResult<T>
611    where
612        F: FnMut() -> Result<T, Box<dyn Error + Send + Sync>>,
613    {
614        let start_time = Instant::now();
615        let max_attempts = self.builder.max_attempts();
616        let max_duration = self.builder.max_duration();
617        let mut attempt = 0;
618
619        loop {
620            attempt += 1;
621
622            // Check if maximum duration exceeded
623            if let Some(error) = self.check_max_duration_exceeded(start_time, max_duration, attempt)
624            {
625                return Err(error);
626            }
627
628            // Execute operation and get decision
629            let decision = self.execute_operation_and_get_decision(&mut operation);
630
631            // Handle decision
632            match self.handle_decision(decision, attempt, max_attempts, start_time)? {
633                Some(value) => return Ok(value), // Success, return result
634                None => continue,                // Retry, continue to next iteration
635            }
636        }
637    }
638
639    /// Execute asynchronous operation (with real timeout interruption)
640    ///
641    /// Execute asynchronous operation according to configured retry strategy,
642    /// with single operation timeout control.
643    ///
644    /// # Timeout Control
645    ///
646    /// This method uses **tokio::time::timeout** for real timeout interruption:
647    /// - When operation execution time exceeds `operation_timeout`, the
648    ///   operation will be truly interrupted (cancelled)
649    /// - After interruption, retry will be triggered (if there are remaining
650    ///   retry attempts)
651    /// - Compared to the `run()` method's post-check mechanism, this approach
652    ///   is more efficient and precise
653    ///
654    /// # Difference from Synchronous Version
655    ///
656    /// | Feature | `run()` Sync Version | `run_async()` Async Version |
657    /// |---------|---------------------|----------------------------|
658    /// | Timeout Mechanism | Post-check (check after operation completes) | Real interruption (tokio::time::timeout) |
659    /// | Can Interrupt Operation | ❌ Cannot | ✅ Can |
660    /// | Timeout Precision | Depends on operation completion | Precise to millisecond level |
661    /// | Applicable Scenario | Short synchronous operations | Long asynchronous operations |
662    ///
663    /// # Parameters
664    ///
665    /// * `operation` - Asynchronous operation to execute
666    ///
667    /// # Returns
668    ///
669    /// Returns operation result or error
670    ///
671    /// # Example
672    ///
673    /// ```rust,no_run
674    /// use prism3_retry::{RetryBuilder, RetryDelayStrategy, RetryResult};
675    /// use std::time::Duration;
676    ///
677    /// #[tokio::main]
678    /// async fn main() {
679    ///     let executor = RetryBuilder::<String>::new()
680    ///         .set_max_attempts(3)
681    ///         .set_operation_timeout(Some(Duration::from_secs(5)))  // Real timeout interruption
682    ///         .set_max_duration(Some(Duration::from_secs(30)))      // Overall timeout
683    ///         .set_delay_strategy(RetryDelayStrategy::Fixed {
684    ///             delay: Duration::from_secs(1)
685    ///         })
686    ///         .build();
687    ///
688    ///     // Use RetryResult type alias to make code more concise
689    ///     let result: RetryResult<String> = executor.run_async(|| async {
690    ///         // Can also use ? operator in async operations, errors will be
691    ///         // automatically converted
692    ///         // Example: tokio::fs::read_to_string("file.txt").await?;
693    ///         tokio::time::sleep(Duration::from_millis(100)).await;
694    ///         Ok("SUCCESS".to_string())
695    ///     }).await;
696    ///
697    ///     assert!(result.is_ok());
698    /// }
699    /// ```
700    pub async fn run_async<F, Fut>(&self, mut operation: F) -> RetryResult<T>
701    where
702        F: FnMut() -> Fut,
703        Fut: std::future::Future<Output = Result<T, Box<dyn Error + Send + Sync>>>,
704    {
705        let start_time = Instant::now();
706        let max_attempts = self.builder.max_attempts();
707        let max_duration = self.builder.max_duration();
708        let mut attempt = 0;
709
710        loop {
711            attempt += 1;
712
713            // Check if maximum duration exceeded
714            if let Some(error) = self.check_max_duration_exceeded(start_time, max_duration, attempt)
715            {
716                return Err(error);
717            }
718
719            // Execute operation and get decision
720            let decision = self
721                .execute_operation_async_and_get_decision(&mut operation)
722                .await;
723
724            // Handle decision
725            match self
726                .handle_decision_async(decision, attempt, max_attempts, start_time)
727                .await?
728            {
729                Some(value) => return Ok(value), // Success, return result
730                None => continue,                // Retry, continue to next iteration
731            }
732        }
733    }
734
735    /// Get builder configuration
736    pub fn config(&self) -> &RetryBuilder<T, C> {
737        &self.builder
738    }
739}