prism3_retry/executor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025.
4 * 3-Prism Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! # Retry Executor
10//!
11//! Retry executor responsible for executing operations with retry strategies.
12//!
13//! # Author
14//!
15//! Haixing Hu
16
use std::error::Error;
use std::ops::ControlFlow;
use std::time::{Duration, Instant};

use prism3_function::readonly_consumer::ReadonlyConsumer;

use super::{
    AbortEvent, DefaultRetryConfig, FailureEvent, RetryBuilder, RetryConfig, RetryDecision,
    RetryError, RetryEvent, RetryReason, RetryResult, SuccessEvent,
};
26
27/// Retry executor
28///
29/// Responsible for executing operations with retry strategies. Automatically
30/// executes retry logic according to configured retry strategies, delay
31/// strategies, failure/abort conditions, and triggers event listeners at
32/// appropriate times.
33///
34/// # Generic Parameters
35///
36/// * `T` - The return value type of the operation
37/// * `C` - Retry configuration type, must implement `RetryConfig` trait,
38/// defaults to `DefaultRetryConfig`
39///
40/// # Core Features
41///
42/// - **Synchronous Retry**: `run()` method executes synchronous operations,
43/// using post-check mechanism for timeout detection
44/// - **Asynchronous Retry**: `run_async()` method executes asynchronous
45/// operations, using tokio::time::timeout for real timeout interruption
46/// - **Timeout Control**: Supports single operation timeout (operation_timeout)
47/// and overall timeout (max_duration)
48/// - **Event Listening**: Supports event callbacks for retry, success,
49/// failure, abort, etc.
50/// - **Flexible Configuration**: Supports multiple delay strategies, error
51/// type identification, result value judgment, etc.
52///
53/// # Timeout Control
54///
55/// The executor supports two levels of timeout control:
56///
57/// 1. **Single Operation Timeout (operation_timeout)**:
58/// - Controls the maximum execution time for each operation
59/// - Synchronous version (`run`): Checks if timeout occurred after
60/// operation completes (post-check mechanism)
61/// - Asynchronous version (`run_async`): Uses tokio::time::timeout to
62/// truly interrupt timeout operations
63///
64/// 2. **Overall Timeout (max_duration)**:
65/// - Controls the maximum total time for the entire retry process
66/// (including all retries and delays)
67/// - Applies to both synchronous and asynchronous versions
68///
69/// # Usage Examples
70///
71/// ## Synchronous Retry (Post-Check Timeout)
72///
73/// ```rust
74/// use prism3_retry::{RetryBuilder, RetryResult};
75/// use std::time::Duration;
76///
77/// let executor = RetryBuilder::<String>::new()
78/// .set_max_attempts(3)
79/// .set_operation_timeout(Some(Duration::from_secs(5)))
80/// .build();
81///
82/// // Use RetryResult type alias to simplify function signature
83/// let result: RetryResult<String> = executor.run(|| {
84/// // Can directly return any error type that implements Into<RetryError>
85/// // For example, using ? operator to handle io::Error will automatically
86/// // convert to RetryError
87/// std::thread::sleep(Duration::from_millis(100));
88/// Ok("SUCCESS".to_string())
89/// });
90/// ```
91///
92/// ## Asynchronous Retry (Real Timeout Interruption)
93///
94/// ```rust,no_run
95/// use prism3_retry::{RetryBuilder, RetryResult};
96/// use std::time::Duration;
97///
98/// # async fn example() {
99/// let executor = RetryBuilder::<String>::new()
100/// .set_max_attempts(3)
101/// .set_operation_timeout(Some(Duration::from_secs(5)))
102/// .build();
103///
104/// // Use RetryResult to make async function signature clearer
105/// let result: RetryResult<String> = executor.run_async(|| async {
106/// // Asynchronous operation, truly interrupted on timeout
107/// tokio::time::sleep(Duration::from_millis(100)).await;
108/// Ok("SUCCESS".to_string())
109/// }).await;
110/// # }
111/// ```
112///
113/// # Author
114///
115/// Haixing Hu
116pub struct RetryExecutor<T, C: RetryConfig = DefaultRetryConfig> {
117 builder: RetryBuilder<T, C>,
118}
119
120impl<T, C> RetryExecutor<T, C>
121where
122 T: Clone + PartialEq + Eq + std::hash::Hash + Send + Sync + 'static,
123 C: RetryConfig,
124{
125 /// Create retry executor
126 pub(crate) fn new(builder: RetryBuilder<T, C>) -> Self {
127 Self { builder }
128 }
129
130 // ==================== Private Helper Methods ====================
131
132 /// Check if maximum duration has been exceeded
133 ///
134 /// # Parameters
135 ///
136 /// * `start_time` - Start time
137 /// * `max_duration` - Maximum duration
138 /// * `attempt` - Current attempt count
139 ///
140 /// # Returns
141 ///
142 /// Returns Some(RetryError) if maximum duration exceeded, None otherwise
143 fn check_max_duration_exceeded(
144 &self,
145 start_time: Instant,
146 max_duration: Option<Duration>,
147 attempt: u32,
148 ) -> Option<RetryError> {
149 if let Some(max_dur) = max_duration {
150 let elapsed = start_time.elapsed();
151 if elapsed >= max_dur {
152 let failure_event = FailureEvent::builder()
153 .attempt_count(attempt)
154 .total_duration(elapsed)
155 .build();
156 if let Some(listener) = self.builder.failure_listener() {
157 listener.accept(&failure_event);
158 }
159 return Some(RetryError::max_duration_exceeded(elapsed, max_dur));
160 }
161 }
162 None
163 }
164
165 /// Check if single operation timeout occurred (post-check mechanism)
166 ///
167 /// # Parameters
168 ///
169 /// * `result` - Operation result
170 /// * `operation_duration` - Operation execution time
171 ///
172 /// # Returns
173 ///
174 /// Returns timeout error if timed out, otherwise returns original result
175 fn check_operation_timeout(
176 &self,
177 result: Result<T, Box<dyn Error + Send + Sync>>,
178 operation_duration: Duration,
179 ) -> Result<T, Box<dyn Error + Send + Sync>> {
180 if let Some(timeout) = self.builder.operation_timeout() {
181 if operation_duration > timeout {
182 return Err(
183 Box::new(RetryError::operation_timeout(operation_duration, timeout))
184 as Box<dyn Error + Send + Sync>,
185 );
186 }
187 }
188 result
189 }
190
191 /// Handle success case
192 ///
193 /// # Parameters
194 ///
195 /// * `value` - Successful result value
196 /// * `attempt` - Current attempt count
197 /// * `start_time` - Start time
198 ///
199 /// # Returns
200 ///
201 /// Returns success result
202 fn handle_success(&self, value: T, attempt: u32, start_time: Instant) -> RetryResult<T> {
203 let success_event = SuccessEvent::builder()
204 .result(value.clone())
205 .attempt_count(attempt)
206 .total_duration(start_time.elapsed())
207 .build();
208 if let Some(listener) = self.builder.success_listener() {
209 listener.accept(&success_event);
210 }
211 Ok(value)
212 }
213
214 /// Handle abort case
215 ///
216 /// # Parameters
217 ///
218 /// * `reason` - Abort reason
219 /// * `attempt` - Current attempt count
220 /// * `start_time` - Start time
221 ///
222 /// # Returns
223 ///
224 /// Returns abort error
225 fn handle_abort(
226 &self,
227 reason: super::AbortReason<T>,
228 attempt: u32,
229 start_time: Instant,
230 ) -> RetryResult<T> {
231 let abort_event = AbortEvent::builder()
232 .reason(reason)
233 .attempt_count(attempt)
234 .total_duration(start_time.elapsed())
235 .build();
236 if let Some(listener) = self.builder.abort_listener() {
237 listener.accept(&abort_event);
238 }
239 Err(RetryError::aborted("Operation aborted"))
240 }
241
242 /// Check if maximum attempts reached
243 ///
244 /// # Parameters
245 ///
246 /// * `attempt` - Current attempt count
247 /// * `max_attempts` - Maximum attempts
248 ///
249 /// # Returns
250 ///
251 /// Returns true if maximum attempts reached, false otherwise
252 fn check_max_attempts_exceeded(&self, attempt: u32, max_attempts: u32) -> bool {
253 attempt >= max_attempts
254 }
255
256 /// Handle maximum attempts exceeded case
257 ///
258 /// # Parameters
259 ///
260 /// * `attempt` - Current attempt count
261 /// * `max_attempts` - Maximum attempts
262 /// * `reason` - Retry reason
263 /// * `start_time` - Start time
264 ///
265 /// # Returns
266 ///
267 /// Returns maximum attempts exceeded error
268 fn handle_max_attempts_exceeded(
269 &self,
270 attempt: u32,
271 max_attempts: u32,
272 reason: RetryReason<T>,
273 start_time: Instant,
274 ) -> RetryError {
275 let failure_event = match reason {
276 RetryReason::Error(error) => FailureEvent::builder()
277 .last_error(Some(error))
278 .attempt_count(attempt)
279 .total_duration(start_time.elapsed())
280 .build(),
281 RetryReason::Result(result) => FailureEvent::builder()
282 .last_result(Some(result))
283 .attempt_count(attempt)
284 .total_duration(start_time.elapsed())
285 .build(),
286 };
287
288 if let Some(listener) = self.builder.failure_listener() {
289 listener.accept(&failure_event);
290 }
291
292 RetryError::max_attempts_exceeded(attempt, max_attempts)
293 }
294
295 /// Calculate delay duration
296 ///
297 /// # Parameters
298 ///
299 /// * `attempt` - Current attempt count
300 ///
301 /// # Returns
302 ///
303 /// Returns calculated delay duration
304 fn calculate_delay(&self, attempt: u32) -> Duration {
305 let delay_strategy = self.builder.delay_strategy();
306 let jitter_factor = self.builder.jitter_factor();
307 delay_strategy.calculate_delay(attempt, jitter_factor)
308 }
309
310 /// Create retry event
311 ///
312 /// # Parameters
313 ///
314 /// * `attempt` - Current attempt count
315 /// * `max_attempts` - Maximum attempts
316 /// * `reason` - Retry reason
317 /// * `delay` - Delay duration
318 /// * `start_time` - Start time
319 ///
320 /// # Returns
321 ///
322 /// Returns created retry event
323 fn create_retry_event(
324 &self,
325 attempt: u32,
326 max_attempts: u32,
327 reason: RetryReason<T>,
328 delay: Duration,
329 start_time: Instant,
330 ) -> RetryEvent<T> {
331 match reason {
332 RetryReason::Error(error) => RetryEvent::builder()
333 .attempt_count(attempt)
334 .max_attempts(max_attempts)
335 .last_error(Some(error))
336 .next_delay(delay)
337 .total_duration(start_time.elapsed())
338 .build(),
339 RetryReason::Result(result) => RetryEvent::builder()
340 .attempt_count(attempt)
341 .max_attempts(max_attempts)
342 .last_result(Some(result))
343 .next_delay(delay)
344 .total_duration(start_time.elapsed())
345 .build(),
346 }
347 }
348
349 /// Trigger retry event and wait for delay
350 ///
351 /// # Parameters
352 ///
353 /// * `retry_event` - Retry event
354 /// * `delay` - Delay duration
355 fn trigger_retry_and_wait(&self, retry_event: RetryEvent<T>, delay: Duration) {
356 if let Some(listener) = self.builder.retry_listener() {
357 listener.accept(&retry_event);
358 }
359
360 if delay > Duration::ZERO {
361 std::thread::sleep(delay);
362 }
363 }
364
365 /// Trigger retry event and wait for delay asynchronously
366 ///
367 /// # Parameters
368 ///
369 /// * `retry_event` - Retry event
370 /// * `delay` - Delay duration
371 async fn trigger_retry_and_wait_async(&self, retry_event: RetryEvent<T>, delay: Duration) {
372 if let Some(listener) = self.builder.retry_listener() {
373 listener.accept(&retry_event);
374 }
375
376 if delay > Duration::ZERO {
377 tokio::time::sleep(delay).await;
378 }
379 }
380
381 /// Execute single synchronous operation and get decision
382 ///
383 /// # Parameters
384 ///
385 /// * `operation` - Operation to execute
386 ///
387 /// # Returns
388 ///
389 /// Returns retry decision
390 fn execute_operation_and_get_decision<F>(&self, operation: &mut F) -> RetryDecision<T>
391 where
392 F: FnMut() -> Result<T, Box<dyn Error + Send + Sync>>,
393 {
394 let operation_start = Instant::now();
395 let result = operation();
396 let operation_duration = operation_start.elapsed();
397
398 // Check single operation timeout (post-check mechanism)
399 let result = self.check_operation_timeout(result, operation_duration);
400
401 // Get retry decision
402 self.builder.get_retry_decision(result)
403 }
404
405 /// Execute single asynchronous operation and get decision
406 ///
407 /// # Parameters
408 ///
409 /// * `operation` - Asynchronous operation to execute
410 ///
411 /// # Returns
412 ///
413 /// Returns retry decision
414 async fn execute_operation_async_and_get_decision<F, Fut>(
415 &self,
416 operation: &mut F,
417 ) -> RetryDecision<T>
418 where
419 F: FnMut() -> Fut,
420 Fut: std::future::Future<Output = Result<T, Box<dyn Error + Send + Sync>>>,
421 {
422 let operation_start = Instant::now();
423 let operation_timeout = self.builder.operation_timeout();
424
425 let result = if let Some(timeout_duration) = operation_timeout {
426 // With timeout limit, use tokio::time::timeout
427 match tokio::time::timeout(timeout_duration, operation()).await {
428 Ok(result) => result,
429 Err(_elapsed) => {
430 // Timed out, convert to error
431 let duration = operation_start.elapsed();
432 Err(
433 Box::new(RetryError::operation_timeout(duration, timeout_duration))
434 as Box<dyn Error + Send + Sync>,
435 )
436 }
437 }
438 } else {
439 // No timeout limit, execute directly
440 operation().await
441 };
442
443 // Get retry decision
444 self.builder.get_retry_decision(result)
445 }
446
447 /// Handle retry decision and return whether to continue
448 ///
449 /// # Parameters
450 ///
451 /// * `decision` - Retry decision
452 /// * `attempt` - Current attempt count
453 /// * `max_attempts` - Maximum attempts
454 /// * `start_time` - Start time
455 ///
456 /// # Returns
457 ///
458 /// - `Ok(Some(value))` - Operation succeeded, returns result value
459 /// - `Ok(None)` - Need to retry, returns None to continue loop
460 /// - `Err(error)` - Operation failed or aborted, returns error
461 fn handle_decision(
462 &self,
463 decision: RetryDecision<T>,
464 attempt: u32,
465 max_attempts: u32,
466 start_time: Instant,
467 ) -> Result<Option<T>, RetryError> {
468 match decision {
469 RetryDecision::Success(value) => {
470 self.handle_success(value.clone(), attempt, start_time)?;
471 Ok(Some(value))
472 }
473 RetryDecision::Retry(reason) => {
474 // Check if maximum retry count reached
475 if self.check_max_attempts_exceeded(attempt, max_attempts) {
476 let error = self.handle_max_attempts_exceeded(
477 attempt,
478 max_attempts,
479 reason,
480 start_time,
481 );
482 return Err(error);
483 }
484
485 // Calculate delay and create retry event
486 let delay = self.calculate_delay(attempt);
487 let retry_event =
488 self.create_retry_event(attempt, max_attempts, reason, delay, start_time);
489
490 // Return None and delay time to indicate retry needed
491 // Note: We need to return delay time, so need to adjust return type
492 // Or trigger event directly here
493 self.trigger_retry_and_wait(retry_event, delay);
494
495 Ok(None) // Return None to indicate need to continue retrying
496 }
497 RetryDecision::Abort(reason) => {
498 self.handle_abort(reason, attempt, start_time).map(|_| None) // Won't reach here as handle_abort always returns Err
499 }
500 }
501 }
502
503 /// Handle async retry decision and return whether to continue
504 ///
505 /// # Parameters
506 ///
507 /// * `decision` - Retry decision
508 /// * `attempt` - Current attempt count
509 /// * `max_attempts` - Maximum attempts
510 /// * `start_time` - Start time
511 ///
512 /// # Returns
513 ///
514 /// - `Ok(Some(value))` - Operation succeeded, returns result value
515 /// - `Ok(None)` - Need to retry, returns None to continue loop
516 /// - `Err(error)` - Operation failed or aborted, returns error
517 async fn handle_decision_async(
518 &self,
519 decision: RetryDecision<T>,
520 attempt: u32,
521 max_attempts: u32,
522 start_time: Instant,
523 ) -> Result<Option<T>, RetryError> {
524 match decision {
525 RetryDecision::Success(value) => {
526 self.handle_success(value.clone(), attempt, start_time)?;
527 Ok(Some(value))
528 }
529 RetryDecision::Retry(reason) => {
530 // Check if maximum retry count reached
531 if self.check_max_attempts_exceeded(attempt, max_attempts) {
532 let error = self.handle_max_attempts_exceeded(
533 attempt,
534 max_attempts,
535 reason,
536 start_time,
537 );
538 return Err(error);
539 }
540
541 // Calculate delay and create retry event
542 let delay = self.calculate_delay(attempt);
543 let retry_event =
544 self.create_retry_event(attempt, max_attempts, reason, delay, start_time);
545
546 // Trigger event and wait asynchronously
547 self.trigger_retry_and_wait_async(retry_event, delay).await;
548
549 Ok(None) // Return None to indicate need to continue retrying
550 }
551 RetryDecision::Abort(reason) => {
552 self.handle_abort(reason, attempt, start_time).map(|_| None) // Won't reach here as handle_abort always returns Err
553 }
554 }
555 }
556
557 // ==================== Public Methods ====================
558
559 /// Execute synchronous operation (with post-check timeout mechanism)
560 ///
561 /// Execute synchronous operation according to configured retry strategy,
562 /// until success, maximum retry count reached, or abort condition met.
563 ///
564 /// # Timeout Control
565 ///
566 /// This method uses **post-check mechanism** for timeout control:
567 /// - After operation completes, check if execution time exceeds
568 /// `operation_timeout`
569 /// - If timeout, convert result to `RetryError::OperationTimeout` error
570 /// and trigger retry
571 /// - Note: Cannot truly interrupt ongoing synchronous operation
572 ///
573 /// If you need to truly interrupt timeout operations, please use
574 /// `run_async()` method.
575 ///
576 /// # Parameters
577 ///
578 /// * `operation` - Operation to execute, returns
579 /// `Result<T, Box<dyn Error + Send + Sync>>`
580 ///
581 /// # Returns
582 ///
583 /// Returns operation result or error
584 ///
585 /// # Example
586 ///
587 /// ```rust
588 /// use prism3_retry::{RetryBuilder, RetryDelayStrategy, RetryResult};
589 /// use std::time::Duration;
590 ///
591 /// let executor = RetryBuilder::new()
592 /// .set_max_attempts(3)
593 /// .set_delay_strategy(RetryDelayStrategy::Fixed { delay: Duration::from_secs(1) })
594 /// .set_operation_timeout(Some(Duration::from_secs(5))) // Single operation post-check timeout
595 /// .build();
596 ///
597 /// // Use RetryResult to simplify function signature, leveraging From trait
598 /// // for automatic error conversion
599 /// let result: RetryResult<String> = executor.run(|| -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
600 /// // Can return any standard error type, will be automatically
601 /// // converted to RetryError
602 /// // Example: std::fs::File::open("file.txt")?;
603 /// // io::Error will be automatically converted to RetryError through
604 /// // From trait
605 /// Ok("SUCCESS".to_string())
606 /// });
607 ///
608 /// assert!(result.is_ok());
609 /// ```
610 pub fn run<F>(&self, mut operation: F) -> RetryResult<T>
611 where
612 F: FnMut() -> Result<T, Box<dyn Error + Send + Sync>>,
613 {
614 let start_time = Instant::now();
615 let max_attempts = self.builder.max_attempts();
616 let max_duration = self.builder.max_duration();
617 let mut attempt = 0;
618
619 loop {
620 attempt += 1;
621
622 // Check if maximum duration exceeded
623 if let Some(error) = self.check_max_duration_exceeded(start_time, max_duration, attempt)
624 {
625 return Err(error);
626 }
627
628 // Execute operation and get decision
629 let decision = self.execute_operation_and_get_decision(&mut operation);
630
631 // Handle decision
632 match self.handle_decision(decision, attempt, max_attempts, start_time)? {
633 Some(value) => return Ok(value), // Success, return result
634 None => continue, // Retry, continue to next iteration
635 }
636 }
637 }
638
639 /// Execute asynchronous operation (with real timeout interruption)
640 ///
641 /// Execute asynchronous operation according to configured retry strategy,
642 /// with single operation timeout control.
643 ///
644 /// # Timeout Control
645 ///
646 /// This method uses **tokio::time::timeout** for real timeout interruption:
647 /// - When operation execution time exceeds `operation_timeout`, the
648 /// operation will be truly interrupted (cancelled)
649 /// - After interruption, retry will be triggered (if there are remaining
650 /// retry attempts)
651 /// - Compared to the `run()` method's post-check mechanism, this approach
652 /// is more efficient and precise
653 ///
654 /// # Difference from Synchronous Version
655 ///
656 /// | Feature | `run()` Sync Version | `run_async()` Async Version |
657 /// |---------|---------------------|----------------------------|
658 /// | Timeout Mechanism | Post-check (check after operation completes) | Real interruption (tokio::time::timeout) |
659 /// | Can Interrupt Operation | ❌ Cannot | ✅ Can |
660 /// | Timeout Precision | Depends on operation completion | Precise to millisecond level |
661 /// | Applicable Scenario | Short synchronous operations | Long asynchronous operations |
662 ///
663 /// # Parameters
664 ///
665 /// * `operation` - Asynchronous operation to execute
666 ///
667 /// # Returns
668 ///
669 /// Returns operation result or error
670 ///
671 /// # Example
672 ///
673 /// ```rust,no_run
674 /// use prism3_retry::{RetryBuilder, RetryDelayStrategy, RetryResult};
675 /// use std::time::Duration;
676 ///
677 /// #[tokio::main]
678 /// async fn main() {
679 /// let executor = RetryBuilder::<String>::new()
680 /// .set_max_attempts(3)
681 /// .set_operation_timeout(Some(Duration::from_secs(5))) // Real timeout interruption
682 /// .set_max_duration(Some(Duration::from_secs(30))) // Overall timeout
683 /// .set_delay_strategy(RetryDelayStrategy::Fixed {
684 /// delay: Duration::from_secs(1)
685 /// })
686 /// .build();
687 ///
688 /// // Use RetryResult type alias to make code more concise
689 /// let result: RetryResult<String> = executor.run_async(|| async {
690 /// // Can also use ? operator in async operations, errors will be
691 /// // automatically converted
692 /// // Example: tokio::fs::read_to_string("file.txt").await?;
693 /// tokio::time::sleep(Duration::from_millis(100)).await;
694 /// Ok("SUCCESS".to_string())
695 /// }).await;
696 ///
697 /// assert!(result.is_ok());
698 /// }
699 /// ```
700 pub async fn run_async<F, Fut>(&self, mut operation: F) -> RetryResult<T>
701 where
702 F: FnMut() -> Fut,
703 Fut: std::future::Future<Output = Result<T, Box<dyn Error + Send + Sync>>>,
704 {
705 let start_time = Instant::now();
706 let max_attempts = self.builder.max_attempts();
707 let max_duration = self.builder.max_duration();
708 let mut attempt = 0;
709
710 loop {
711 attempt += 1;
712
713 // Check if maximum duration exceeded
714 if let Some(error) = self.check_max_duration_exceeded(start_time, max_duration, attempt)
715 {
716 return Err(error);
717 }
718
719 // Execute operation and get decision
720 let decision = self
721 .execute_operation_async_and_get_decision(&mut operation)
722 .await;
723
724 // Handle decision
725 match self
726 .handle_decision_async(decision, attempt, max_attempts, start_time)
727 .await?
728 {
729 Some(value) => return Ok(value), // Success, return result
730 None => continue, // Retry, continue to next iteration
731 }
732 }
733 }
734
735 /// Get builder configuration
736 pub fn config(&self) -> &RetryBuilder<T, C> {
737 &self.builder
738 }
739}