error_rail/async_ext/tokio_ext.rs
//! Tokio-specific async extensions.
//!
//! This module provides Tokio-backed utilities built on Tokio's timer
//! facilities, `tokio::time::sleep` and `tokio::time::timeout`.
//!
//! # Feature Flag
//!
//! Requires the `ecosystem` feature:
//!
//! ```toml
//! [dependencies]
//! error-rail = { version = "0.8", features = ["ecosystem"] }
//! ```
14
15use core::future::Future;
16use core::time::Duration;
17
18use crate::traits::TransientError;
19use crate::types::{BoxedComposableError, BoxedComposableResult, ComposableError};
20
21use super::retry::{retry_with_policy, ExponentialBackoff, RetryPolicy};
22
23/// Retries an async operation using Tokio's sleep, returning an unboxed error.
24///
25/// This is a convenience wrapper around [`retry_with_policy`] that uses
26/// `tokio::time::sleep` for delays, eliminating the need to pass a sleep function.
27///
28/// For a boxed version, use [`retry_transient`].
29///
30/// # Arguments
31///
32/// * `operation` - A closure that returns the future to retry
33/// * `policy` - The retry policy to use
34///
35/// # Example
36///
37/// ```rust,ignore
38/// use error_rail::async_ext::{retry_transient_unboxed, ExponentialBackoff};
39///
40/// #[tokio::main]
41/// async fn main() {
42/// let result = retry_transient_unboxed(
43/// || fetch_data(),
44/// ExponentialBackoff::default(),
45/// ).await;
46/// }
47/// ```
48#[inline]
49pub async fn retry_transient_unboxed<F, Fut, T, E, P>(
50 operation: F,
51 policy: P,
52) -> Result<T, ComposableError<E>>
53where
54 F: FnMut() -> Fut,
55 Fut: Future<Output = Result<T, E>>,
56 E: TransientError,
57 P: RetryPolicy,
58{
59 retry_with_policy(operation, policy, tokio::time::sleep).await
60}
61
62/// Retries an async operation using Tokio's sleep, returning a boxed error.
63///
64/// This is a convenience wrapper around [`retry_transient_unboxed`] that boxes
65/// the error for reduced stack size.
66///
67/// # Arguments
68///
69/// * `operation` - A closure that returns the future to retry
70/// * `policy` - The retry policy to use
71///
72/// # Example
73///
74/// ```rust,ignore
75/// use error_rail::async_ext::{retry_transient, ExponentialBackoff};
76///
77/// #[tokio::main]
78/// async fn main() {
79/// let result = retry_transient(
80/// || fetch_data(),
81/// ExponentialBackoff::default(),
82/// ).await;
83/// }
84/// ```
85#[inline]
86pub async fn retry_transient<F, Fut, T, E, P>(
87 operation: F,
88 policy: P,
89) -> BoxedComposableResult<T, E>
90where
91 F: FnMut() -> Fut,
92 Fut: Future<Output = Result<T, E>>,
93 E: TransientError,
94 P: RetryPolicy,
95{
96 match retry_transient_unboxed(operation, policy).await {
97 Ok(v) => Ok(v),
98 Err(e) => Err(Box::new(e)),
99 }
100}
101
102/// Retries an async operation with a simple count limit using Tokio's sleep.
103///
104/// Uses exponential backoff with sensible defaults.
105///
106/// # Arguments
107///
108/// * `operation` - A closure that returns the future to retry
109/// * `max_attempts` - Maximum number of retry attempts
110///
111/// # Example
112///
113/// ```rust,ignore
114/// use error_rail::async_ext::retry_transient_n;
115///
116/// let result = retry_transient_n(|| fetch_data(), 3).await;
117/// ```
118#[inline]
119pub async fn retry_transient_n<F, Fut, T, E>(
120 operation: F,
121 max_attempts: u32,
122) -> BoxedComposableResult<T, E>
123where
124 F: FnMut() -> Fut,
125 Fut: Future<Output = Result<T, E>>,
126 E: TransientError,
127{
128 retry_transient(operation, ExponentialBackoff::new().with_max_attempts(max_attempts)).await
129}
130
131/// Result type for timeout operations that can fail with either
132/// the inner error or a timeout.
133#[derive(Debug)]
134pub enum TimeoutResult<T, E> {
135 /// Operation completed successfully.
136 Ok(T),
137 /// Operation failed with an error.
138 Err(BoxedComposableError<E>),
139 /// Operation timed out.
140 Timeout(Duration),
141}
142
143impl<T, E> TimeoutResult<T, E> {
144 /// Returns `true` if the result is `Ok`.
145 #[inline]
146 pub const fn is_ok(&self) -> bool {
147 matches!(self, Self::Ok(_))
148 }
149
150 /// Returns `true` if the result is `Err`.
151 #[inline]
152 pub const fn is_err(&self) -> bool {
153 matches!(self, Self::Err(_))
154 }
155
156 /// Returns `true` if the operation timed out.
157 #[inline]
158 pub const fn is_timeout(&self) -> bool {
159 matches!(self, Self::Timeout(_))
160 }
161
162 /// Converts to a standard `Result`, treating timeout as an error message.
163 #[inline]
164 pub fn into_result(self) -> BoxedComposableResult<T, E>
165 where
166 E: From<TimeoutError>,
167 {
168 match self {
169 Self::Ok(v) => Ok(v),
170 Self::Err(e) => Err(e),
171 Self::Timeout(d) => Err(Box::new(ComposableError::new(E::from(TimeoutError(d))))),
172 }
173 }
174}
175
/// Error type representing a timeout.
///
/// Wraps the [`Duration`] associated with the timeout; within this module it
/// is built from the payload of `TimeoutResult::Timeout`.
///
/// Implements the common comparison and hashing traits so that timeouts can
/// be compared in assertions and stored in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TimeoutError(pub Duration);
179
180impl core::fmt::Display for TimeoutError {
181 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
182 write!(f, "operation timed out after {:?}", self.0)
183 }
184}
185
// Marks `TimeoutError` as a standard error type. The impl is empty, so all
// `Error` methods keep their defaults. Compiled only when the `std` feature
// is enabled, because the trait is referenced through `std::error`.
#[cfg(feature = "std")]
impl std::error::Error for TimeoutError {}
188
189/// Executes an async operation with a timeout, returning a `TimeoutResult`.
190///
191/// Unlike panic-on-timeout helpers, this function doesn't panic on timeout but
192/// returns a structured result that the caller can handle.
193///
194/// # Example
195///
196/// ```rust,ignore
197/// use error_rail::async_ext::try_with_timeout;
198/// use std::time::Duration;
199///
200/// match try_with_timeout(Duration::from_secs(5), fetch_data()).await {
201/// TimeoutResult::Ok(data) => println!("Got data: {:?}", data),
202/// TimeoutResult::Err(e) => println!("Error: {}", e.error_chain()),
203/// TimeoutResult::Timeout(d) => println!("Timed out after {:?}", d),
204/// }
205/// ```
206#[inline]
207pub async fn try_with_timeout<T, E, Fut>(duration: Duration, future: Fut) -> TimeoutResult<T, E>
208where
209 Fut: Future<Output = Result<T, E>>,
210{
211 match tokio::time::timeout(duration, future).await {
212 Ok(Ok(value)) => TimeoutResult::Ok(value),
213 Ok(Err(e)) => TimeoutResult::Err(Box::new(ComposableError::new(e))),
214 Err(_) => TimeoutResult::Timeout(duration),
215 }
216}