#![cfg(feature = "async")]
use super::*;
use crate::effect::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// Error type used by the conditional-retry tests to tell retryable
/// failures apart from non-retryable ones.
#[derive(Clone, Debug, PartialEq)]
enum RetryTestError {
    /// A failure that the tests' retry predicate accepts.
    Transient,
    /// A failure that the tests' retry predicate rejects.
    Permanent,
}
/// Checks that `retry` keeps re-running the effect until it succeeds.
///
/// The effect under test fails on its first two executions and succeeds on
/// the third, so the test expects an `Ok` result carrying `"success"` and
/// exactly three recorded executions.
#[tokio::test]
async fn test_retry_succeeds_on_third_attempt() {
    // Shared counter so the test can observe how many times the effect ran.
    let attempts = Arc::new(AtomicU32::new(0));

    let effect = retry(
        {
            let attempts = attempts.clone();
            move || {
                let attempts = attempts.clone();
                from_async(move |_: &()| {
                    let attempts = attempts.clone();
                    async move {
                        // `fetch_add` yields the previous count, so runs 0 and 1 fail.
                        let n = attempts.fetch_add(1, Ordering::SeqCst);
                        if n < 2 {
                            Err("transient failure")
                        } else {
                            Ok("success")
                        }
                    }
                })
            }
        },
        RetryPolicy::constant(Duration::from_millis(1)).with_max_retries(5),
    );

    let result = effect.execute(&()).await;
    assert!(result.is_ok());
    // Read the success payload through `into_value()`, the same accessor
    // `test_retry_preserves_success_value` uses on a successful retry result;
    // `final_error` is the field name used on the exhausted (failure) side.
    assert_eq!(result.unwrap().into_value(), "success");
    assert_eq!(attempts.load(Ordering::SeqCst), 3);
}
/// Checks that an effect which always fails surfaces an exhaustion error
/// that carries both the attempt count and the last underlying error.
#[tokio::test]
async fn test_retry_exhausted_returns_final_error() {
    let policy = RetryPolicy::constant(Duration::from_millis(1)).with_max_retries(3);
    let effect = retry(|| fail::<(), _, ()>("always fails"), policy);

    let outcome = effect.execute(&()).await;
    assert!(outcome.is_err());

    let exhausted = outcome.unwrap_err();
    // Four attempts with a retry cap of three: presumably one initial run
    // plus three retries.
    assert_eq!(exhausted.attempts, 4);
    assert_eq!(exhausted.final_error, "always fails");
}
/// Checks that `retry_if` does not retry when the predicate rejects the error.
///
/// The effect always fails with `RetryTestError::Permanent` while the
/// predicate only accepts `Transient`, so the test expects the raw error back
/// after a single execution.
#[tokio::test]
async fn test_retry_if_skips_non_retryable_errors() {
    let run_count = Arc::new(AtomicU32::new(0));
    let policy = RetryPolicy::constant(Duration::from_millis(1)).with_max_retries(5);

    let effect = retry_if(
        {
            let run_count = Arc::clone(&run_count);
            move || {
                let run_count = Arc::clone(&run_count);
                from_async(move |_: &()| {
                    let run_count = Arc::clone(&run_count);
                    async move {
                        // Record the execution, then fail with a non-retryable error.
                        run_count.fetch_add(1, Ordering::SeqCst);
                        Err::<(), _>(RetryTestError::Permanent)
                    }
                })
            }
        },
        policy,
        |err| matches!(err, RetryTestError::Transient),
    );

    let outcome = effect.execute(&()).await;
    assert_eq!(outcome, Err(RetryTestError::Permanent));
    assert_eq!(run_count.load(Ordering::SeqCst), 1);
}
/// Checks that `retry_if` retries while the predicate accepts the error.
///
/// The effect fails twice with `RetryTestError::Transient` and then succeeds,
/// so the test expects `Ok("success")` after three executions.
#[tokio::test]
async fn test_retry_if_retries_transient_errors() {
    let run_count = Arc::new(AtomicU32::new(0));
    let policy = RetryPolicy::constant(Duration::from_millis(1)).with_max_retries(5);

    let effect = retry_if(
        {
            let run_count = Arc::clone(&run_count);
            move || {
                let run_count = Arc::clone(&run_count);
                from_async(move |_: &()| {
                    let run_count = Arc::clone(&run_count);
                    async move {
                        // Previous count of 0 or 1 means this is one of the first two runs.
                        let previous = run_count.fetch_add(1, Ordering::SeqCst);
                        if previous < 2 {
                            Err::<&str, _>(RetryTestError::Transient)
                        } else {
                            Ok("success")
                        }
                    }
                })
            }
        },
        policy,
        |err| matches!(err, RetryTestError::Transient),
    );

    let outcome = effect.execute(&()).await;
    assert_eq!(outcome, Ok("success"));
    assert_eq!(run_count.load(Ordering::SeqCst), 3);
}
/// Checks that `retry_with_hooks` invokes the supplied hook.
///
/// The effect fails twice before succeeding; the test expects three
/// executions and two hook invocations.
#[tokio::test]
async fn test_retry_with_hooks_calls_hook() {
    let run_count = Arc::new(AtomicU32::new(0));
    let hook_count = Arc::new(AtomicU32::new(0));
    let policy = RetryPolicy::constant(Duration::from_millis(1)).with_max_retries(5);

    let effect = retry_with_hooks(
        {
            let run_count = Arc::clone(&run_count);
            move || {
                let run_count = Arc::clone(&run_count);
                from_async(move |_: &()| {
                    let run_count = Arc::clone(&run_count);
                    async move {
                        let previous = run_count.fetch_add(1, Ordering::SeqCst);
                        if previous < 2 {
                            Err("transient")
                        } else {
                            Ok("success")
                        }
                    }
                })
            }
        },
        policy,
        {
            let hook_count = Arc::clone(&hook_count);
            // The hook ignores the event contents and only counts invocations.
            move |_event: &RetryEvent<'_, &str>| {
                hook_count.fetch_add(1, Ordering::SeqCst);
            }
        },
    );

    let outcome = effect.execute(&()).await;
    assert!(outcome.is_ok());
    assert_eq!(run_count.load(Ordering::SeqCst), 3);
    assert_eq!(hook_count.load(Ordering::SeqCst), 2);
}
/// Checks that `with_timeout` reports a timeout when the wrapped effect
/// sleeps (10 s) far longer than the allowed duration (10 ms).
#[tokio::test]
async fn test_timeout_triggers_correctly() {
    let slow = from_async(|_: &()| async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        Ok::<_, String>(42)
    });
    let effect = with_timeout(slow, Duration::from_millis(10));

    let outcome = effect.execute(&()).await;
    assert!(outcome.is_err());

    let err = outcome.unwrap_err();
    assert!(err.is_timeout());
}
/// Checks that `with_timeout` returns the wrapped effect's success value
/// unchanged when the effect finishes within the limit.
#[tokio::test]
async fn test_timeout_passes_through_success() {
    let immediate = from_async(|_: &()| async { Ok::<_, String>(42) });
    let effect = with_timeout(immediate, Duration::from_secs(1));

    let outcome = effect.execute(&()).await;
    assert_eq!(outcome, Ok(42));
}
#[tokio::test]
async fn test_timeout_passes_through_inner_error() {
let effect = with_timeout(
from_async(|_: &()| async { Err::<i32, _>("inner error") }),
Duration::from_secs(1),
);
let result = effect.execute(&()).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.is_inner());
assert_eq!(err.into_inner(), Some("inner error"));
}
/// Checks that `retry` and `with_timeout` compose, with the timeout applied
/// to each attempt built by the factory.
///
/// The first two executions sleep for 100 ms against a 10 ms timeout; the
/// third returns immediately. The test expects success after three executions.
#[tokio::test]
async fn test_retry_with_timeout_per_attempt() {
    let run_count = Arc::new(AtomicU32::new(0));
    let policy = RetryPolicy::constant(Duration::from_millis(1)).with_max_retries(5);

    let effect = retry(
        {
            let run_count = Arc::clone(&run_count);
            move || {
                let run_count = Arc::clone(&run_count);
                let attempt = from_async(move |_: &()| {
                    let run_count = Arc::clone(&run_count);
                    async move {
                        let previous = run_count.fetch_add(1, Ordering::SeqCst);
                        if previous < 2 {
                            // Outlast the 10 ms timeout on the first two runs.
                            tokio::time::sleep(Duration::from_millis(100)).await;
                        }
                        Ok::<_, String>("success")
                    }
                });
                // Render the timeout wrapper's error as a `String` for `retry`.
                with_timeout(attempt, Duration::from_millis(10))
                    .map_err(|timeout_err| format!("{}", timeout_err))
            }
        },
        policy,
    );

    let outcome = effect.execute(&()).await;
    assert!(outcome.is_ok());
    assert_eq!(run_count.load(Ordering::SeqCst), 3);
}
/// Checks that a value from an effect that succeeds immediately comes
/// through `retry` intact and is readable via `into_value`.
#[tokio::test]
async fn test_retry_preserves_success_value() {
    let policy = RetryPolicy::constant(Duration::from_millis(1)).with_max_retries(3);
    let effect = retry(|| pure::<_, String, ()>(42), policy);

    let outcome = effect.execute(&()).await;
    assert!(outcome.is_ok());

    let value = outcome.unwrap().into_value();
    assert_eq!(value, 42);
}
/// Checks that an exponential policy actually spends time waiting between
/// attempts.
///
/// The effect fails three times before succeeding, with a 10 ms base delay.
/// The test only asserts a lower bound of 50 ms on the total elapsed time.
#[tokio::test]
async fn test_exponential_backoff_timing() {
    // The clock starts before the effect is built, as well as before it runs.
    let started_at = std::time::Instant::now();
    let run_count = Arc::new(AtomicU32::new(0));

    let effect = retry(
        {
            let run_count = Arc::clone(&run_count);
            move || {
                let run_count = Arc::clone(&run_count);
                from_async(move |_: &()| {
                    let run_count = Arc::clone(&run_count);
                    async move {
                        let previous = run_count.fetch_add(1, Ordering::SeqCst);
                        if previous < 3 {
                            Err("retry")
                        } else {
                            Ok("done")
                        }
                    }
                })
            }
        },
        RetryPolicy::exponential(Duration::from_millis(10)).with_max_retries(5),
    );

    // Only the timing matters here; the result itself is not inspected.
    let _ = effect.execute(&()).await;

    let elapsed = started_at.elapsed();
    let minimum = Duration::from_millis(50);
    assert!(
        elapsed >= minimum,
        "Expected at least 50ms, got {:?}",
        elapsed
    );
}
/// Checks that a retried effect can read from a non-unit environment.
///
/// The environment supplies how many executions should fail; with
/// `fail_count: 2` the test expects success after three executions.
#[tokio::test]
async fn test_retry_with_environment() {
    /// Environment telling the effect how many initial executions must fail.
    #[derive(Clone)]
    struct Env {
        fail_count: u32,
    }

    let run_count = Arc::new(AtomicU32::new(0));
    let policy = RetryPolicy::constant(Duration::from_millis(1)).with_max_retries(5);

    let effect = retry(
        {
            let run_count = Arc::clone(&run_count);
            move || {
                let run_count = Arc::clone(&run_count);
                from_async(move |ctx: &Env| {
                    let run_count = Arc::clone(&run_count);
                    // Copy the threshold out so the future does not borrow `ctx`.
                    let threshold = ctx.fail_count;
                    async move {
                        let previous = run_count.fetch_add(1, Ordering::SeqCst);
                        if previous < threshold {
                            Err("retry")
                        } else {
                            Ok("success")
                        }
                    }
                })
            }
        },
        policy,
    );

    let env = Env { fail_count: 2 };
    let outcome = effect.execute(&env).await;
    assert!(outcome.is_ok());
    assert_eq!(run_count.load(Ordering::SeqCst), 3);
}