error_rail/async_ext/
tokio_ext.rs

1//! Tokio-specific async extensions.
2//!
3//! This module provides Tokio-optimized utilities that leverage Tokio's
4//! runtime features like `task_local!` and `tokio::time::sleep`.
5//!
6//! # Feature Flag
7//!
8//! Requires the `ecosystem` feature:
9//!
10//! ```toml
11//! [dependencies]
12//! error-rail = { version = "0.8", features = ["ecosystem"] }
13//! ```
14
15use core::future::Future;
16use core::time::Duration;
17
18use crate::traits::TransientError;
19use crate::types::{BoxedComposableError, BoxedComposableResult, ComposableError};
20
21use super::retry::{retry_with_policy, RetryPolicy};
22
23/// Retries an async operation using Tokio's sleep, returning an unboxed error.
24///
25/// This is a convenience wrapper around [`retry_with_policy`] that uses
26/// `tokio::time::sleep` for delays, eliminating the need to pass a sleep function.
27///
28/// For a boxed version, use [`retry_transient`].
29///
30/// # Arguments
31///
32/// * `operation` - A closure that returns the future to retry
33/// * `policy` - The retry policy to use
34///
35/// # Example
36///
37/// ```rust,ignore
38/// use error_rail::async_ext::{retry_transient_unboxed, ExponentialBackoff};
39///
40/// #[tokio::main]
41/// async fn main() {
42///     let result = retry_transient_unboxed(
43///         || fetch_data(),
44///         ExponentialBackoff::default(),
45///     ).await;
46/// }
47/// ```
48pub async fn retry_transient_unboxed<F, Fut, T, E, P>(
49    operation: F,
50    policy: P,
51) -> Result<T, ComposableError<E>>
52where
53    F: FnMut() -> Fut,
54    Fut: Future<Output = Result<T, E>>,
55    E: TransientError,
56    P: RetryPolicy,
57{
58    retry_with_policy(operation, policy, tokio::time::sleep).await
59}
60
61/// Retries an async operation using Tokio's sleep, returning a boxed error.
62///
63/// This is a convenience wrapper around [`retry_transient_unboxed`] that boxes
64/// the error for reduced stack size.
65///
66/// # Arguments
67///
68/// * `operation` - A closure that returns the future to retry
69/// * `policy` - The retry policy to use
70///
71/// # Example
72///
73/// ```rust,ignore
74/// use error_rail::async_ext::{retry_transient, ExponentialBackoff};
75///
76/// #[tokio::main]
77/// async fn main() {
78///     let result = retry_transient(
79///         || fetch_data(),
80///         ExponentialBackoff::default(),
81///     ).await;
82/// }
83/// ```
84pub async fn retry_transient<F, Fut, T, E, P>(
85    operation: F,
86    policy: P,
87) -> BoxedComposableResult<T, E>
88where
89    F: FnMut() -> Fut,
90    Fut: Future<Output = Result<T, E>>,
91    E: TransientError,
92    P: RetryPolicy,
93{
94    retry_transient_unboxed(operation, policy)
95        .await
96        .map_err(Box::new)
97}
98
99/// Retries an async operation with a simple count limit using Tokio's sleep.
100///
101/// Uses exponential backoff with sensible defaults.
102///
103/// # Arguments
104///
105/// * `operation` - A closure that returns the future to retry
106/// * `max_attempts` - Maximum number of retry attempts
107///
108/// # Example
109///
110/// ```rust,ignore
111/// use error_rail::async_ext::retry_transient_n;
112///
113/// let result = retry_transient_n(|| fetch_data(), 3).await;
114/// ```
115pub async fn retry_transient_n<F, Fut, T, E>(
116    operation: F,
117    max_attempts: u32,
118) -> BoxedComposableResult<T, E>
119where
120    F: FnMut() -> Fut,
121    Fut: Future<Output = Result<T, E>>,
122    E: TransientError,
123{
124    use super::retry::ExponentialBackoff;
125
126    let policy = ExponentialBackoff::new().with_max_attempts(max_attempts);
127    retry_transient(operation, policy).await
128}
129
130/// Result type for timeout operations that can fail with either
131/// the inner error or a timeout.
132#[derive(Debug)]
133pub enum TimeoutResult<T, E> {
134    /// Operation completed successfully.
135    Ok(T),
136    /// Operation failed with an error.
137    Err(BoxedComposableError<E>),
138    /// Operation timed out.
139    Timeout(Duration),
140}
141
142impl<T, E> TimeoutResult<T, E> {
143    /// Returns `true` if the result is `Ok`.
144    pub fn is_ok(&self) -> bool {
145        matches!(self, Self::Ok(_))
146    }
147
148    /// Returns `true` if the result is `Err`.
149    pub fn is_err(&self) -> bool {
150        matches!(self, Self::Err(_))
151    }
152
153    /// Returns `true` if the operation timed out.
154    pub fn is_timeout(&self) -> bool {
155        matches!(self, Self::Timeout(_))
156    }
157
158    /// Converts to a standard `Result`, treating timeout as an error message.
159    pub fn into_result(self) -> BoxedComposableResult<T, E>
160    where
161        E: From<TimeoutError>,
162    {
163        match self {
164            Self::Ok(v) => Ok(v),
165            Self::Err(e) => Err(e),
166            Self::Timeout(d) => Err(Box::new(ComposableError::new(E::from(TimeoutError(d))))),
167        }
168    }
169}
170
/// Error value describing an operation that ran out of time.
///
/// The wrapped `Duration` is public and is printed by the `Display` impl.
#[derive(Debug, Clone)]
pub struct TimeoutError(pub Duration);

impl core::fmt::Display for TimeoutError {
    /// Writes `operation timed out after <duration>`, rendering the duration
    /// with its `Debug` format (for example `5s` or `250ms`).
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(limit) = self;
        f.write_fmt(format_args!("operation timed out after {:?}", limit))
    }
}

// `std::error::Error` is only implemented when the `std` feature is enabled.
#[cfg(feature = "std")]
impl std::error::Error for TimeoutError {}
183
184/// Executes an async operation with a timeout, returning a `TimeoutResult`.
185///
186/// Unlike panic-on-timeout helpers, this function doesn't panic on timeout but
187/// returns a structured result that the caller can handle.
188///
189/// # Example
190///
191/// ```rust,ignore
192/// use error_rail::async_ext::try_with_timeout;
193/// use std::time::Duration;
194///
195/// match try_with_timeout(Duration::from_secs(5), fetch_data()).await {
196///     TimeoutResult::Ok(data) => println!("Got data: {:?}", data),
197///     TimeoutResult::Err(e) => println!("Error: {}", e.error_chain()),
198///     TimeoutResult::Timeout(d) => println!("Timed out after {:?}", d),
199/// }
200/// ```
201pub async fn try_with_timeout<T, E, Fut>(duration: Duration, future: Fut) -> TimeoutResult<T, E>
202where
203    Fut: Future<Output = Result<T, E>>,
204{
205    match tokio::time::timeout(duration, future).await {
206        Ok(Ok(value)) => TimeoutResult::Ok(value),
207        Ok(Err(e)) => TimeoutResult::Err(Box::new(ComposableError::new(e))),
208        Err(_elapsed) => TimeoutResult::Timeout(duration),
209    }
210}