error_rail/async_ext/
tokio_ext.rs

1//! Tokio-specific async extensions.
2//!
3//! This module provides Tokio-optimized utilities that leverage Tokio's
4//! runtime features like `task_local!` and `tokio::time::sleep`.
5//!
6//! # Feature Flag
7//!
8//! Requires the `ecosystem` feature:
9//!
10//! ```toml
11//! [dependencies]
12//! error-rail = { version = "0.8", features = ["ecosystem"] }
13//! ```
14
15use core::future::Future;
16use core::time::Duration;
17
18use crate::traits::TransientError;
19use crate::types::{BoxedComposableError, BoxedComposableResult, ComposableError};
20
21use super::retry::{retry_with_policy, RetryPolicy};
22
23/// Retries an async operation using Tokio's sleep.
24///
25/// This is a convenience wrapper around [`retry_with_policy`] that uses
26/// `tokio::time::sleep` for delays, eliminating the need to pass a sleep function.
27///
28/// # Arguments
29///
30/// * `operation` - A closure that returns the future to retry
31/// * `policy` - The retry policy to use
32///
33/// # Example
34///
35/// ```rust,ignore
36/// use error_rail::async_ext::{retry_transient, ExponentialBackoff};
37///
38/// #[tokio::main]
39/// async fn main() {
40///     let result = retry_transient(
41///         || fetch_data(),
42///         ExponentialBackoff::default(),
43///     ).await;
44/// }
45/// ```
46pub async fn retry_transient<F, Fut, T, E, P>(
47    operation: F,
48    policy: P,
49) -> BoxedComposableResult<T, E>
50where
51    F: FnMut() -> Fut,
52    Fut: Future<Output = Result<T, E>>,
53    E: TransientError,
54    P: RetryPolicy,
55{
56    retry_with_policy(operation, policy, tokio::time::sleep)
57        .await
58        .map_err(Box::new)
59}
60
61/// Retries an async operation with a simple count limit using Tokio's sleep.
62///
63/// Uses exponential backoff with sensible defaults.
64///
65/// # Arguments
66///
67/// * `operation` - A closure that returns the future to retry
68/// * `max_attempts` - Maximum number of retry attempts
69///
70/// # Example
71///
72/// ```rust,ignore
73/// use error_rail::async_ext::retry_transient_n;
74///
75/// let result = retry_transient_n(|| fetch_data(), 3).await;
76/// ```
77pub async fn retry_transient_n<F, Fut, T, E>(
78    operation: F,
79    max_attempts: u32,
80) -> BoxedComposableResult<T, E>
81where
82    F: FnMut() -> Fut,
83    Fut: Future<Output = Result<T, E>>,
84    E: TransientError,
85{
86    use super::retry::ExponentialBackoff;
87
88    let policy = ExponentialBackoff::new().with_max_attempts(max_attempts);
89    retry_transient(operation, policy).await
90}
91
92/// Result type for timeout operations that can fail with either
93/// the inner error or a timeout.
94#[derive(Debug)]
95pub enum TimeoutResult<T, E> {
96    /// Operation completed successfully.
97    Ok(T),
98    /// Operation failed with an error.
99    Err(BoxedComposableError<E>),
100    /// Operation timed out.
101    Timeout(Duration),
102}
103
104impl<T, E> TimeoutResult<T, E> {
105    /// Returns `true` if the result is `Ok`.
106    pub fn is_ok(&self) -> bool {
107        matches!(self, Self::Ok(_))
108    }
109
110    /// Returns `true` if the result is `Err`.
111    pub fn is_err(&self) -> bool {
112        matches!(self, Self::Err(_))
113    }
114
115    /// Returns `true` if the operation timed out.
116    pub fn is_timeout(&self) -> bool {
117        matches!(self, Self::Timeout(_))
118    }
119
120    /// Converts to a standard `Result`, treating timeout as an error message.
121    pub fn into_result(self) -> BoxedComposableResult<T, E>
122    where
123        E: From<TimeoutError>,
124    {
125        match self {
126            Self::Ok(v) => Ok(v),
127            Self::Err(e) => Err(e),
128            Self::Timeout(d) => Err(Box::new(ComposableError::new(E::from(TimeoutError(d))))),
129        }
130    }
131}
132
/// Error type representing a timeout.
///
/// Wraps the [`Duration`] associated with the timeout. The type is comparable
/// and hashable so callers and tests can match on specific timeout values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TimeoutError(pub Duration);

impl core::fmt::Display for TimeoutError {
    /// Writes `operation timed out after <duration>`, with the duration
    /// rendered through its `Debug` representation.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "operation timed out after {:?}", self.0)
    }
}

// `std::error::Error` is only available when the crate is built with `std`.
#[cfg(feature = "std")]
impl std::error::Error for TimeoutError {}
145
146/// Executes an async operation with a timeout, returning a `TimeoutResult`.
147///
148/// Unlike panic-on-timeout helpers, this function doesn't panic on timeout but
149/// returns a structured result that the caller can handle.
150///
151/// # Example
152///
153/// ```rust,ignore
154/// use error_rail::async_ext::try_with_timeout;
155/// use std::time::Duration;
156///
157/// match try_with_timeout(Duration::from_secs(5), fetch_data()).await {
158///     TimeoutResult::Ok(data) => println!("Got data: {:?}", data),
159///     TimeoutResult::Err(e) => println!("Error: {}", e.error_chain()),
160///     TimeoutResult::Timeout(d) => println!("Timed out after {:?}", d),
161/// }
162/// ```
163pub async fn try_with_timeout<T, E, Fut>(duration: Duration, future: Fut) -> TimeoutResult<T, E>
164where
165    Fut: Future<Output = Result<T, E>>,
166{
167    match tokio::time::timeout(duration, future).await {
168        Ok(Ok(value)) => TimeoutResult::Ok(value),
169        Ok(Err(e)) => TimeoutResult::Err(Box::new(ComposableError::new(e))),
170        Err(_elapsed) => TimeoutResult::Timeout(duration),
171    }
172}