use std::future::Future;
use std::time::Duration;
use crate::storage::error::Result;
const DELAYS: &[Duration] = &[
Duration::from_millis(100),
Duration::from_millis(300),
Duration::from_secs(1),
];
#[allow(
clippy::redundant_pub_crate,
reason = "pub(crate) is the intended surface — adapters reach it via `use crate::storage::retry::with_transient_retry`. `pub(super)` would only reach `storage`."
)]
pub(crate) async fn with_transient_retry<T, F, Fut>(label: &'static str, mut op: F) -> Result<T>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T>>,
{
let mut attempt = 0usize;
loop {
match op().await {
Ok(v) => return Ok(v),
Err(e) if e.is_transient_conflict() && attempt < DELAYS.len() => {
tracing::warn!(
label,
attempt,
delay_ms = DELAYS[attempt].as_millis(),
err = %e,
"storage: transient conflict; retrying"
);
tokio::time::sleep(DELAYS[attempt]).await;
attempt += 1;
}
Err(e) => return Err(e),
}
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
reason = "unit tests crash loudly on setup failure"
)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use super::*;
use crate::storage::error::StorageError;
#[tokio::test(start_paused = true)]
async fn ok_returns_without_retry() {
let calls = AtomicUsize::new(0);
let r: Result<i32> = with_transient_retry("test", || {
calls.fetch_add(1, Ordering::SeqCst);
async { Ok(7) }
})
.await;
assert_eq!(r.unwrap(), 7);
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[tokio::test(start_paused = true)]
async fn transient_retries_until_success() {
let calls = AtomicUsize::new(0);
let r: Result<i32> = with_transient_retry("test", || {
let n = calls.fetch_add(1, Ordering::SeqCst);
async move {
if n < 2 {
Err(StorageError::Conflict("locked".to_owned()))
} else {
Ok(42)
}
}
})
.await;
assert_eq!(r.unwrap(), 42);
assert_eq!(
calls.load(Ordering::SeqCst),
3,
"two transients + one success = 3 calls"
);
}
#[tokio::test(start_paused = true)]
async fn transient_past_last_delay_surfaces_error() {
let calls = AtomicUsize::new(0);
let r: Result<i32> = with_transient_retry("test", || {
calls.fetch_add(1, Ordering::SeqCst);
async { Err(StorageError::Conflict("locked".to_owned())) }
})
.await;
assert!(r.is_err(), "exhausted retries must surface the error");
assert_eq!(calls.load(Ordering::SeqCst), 1 + DELAYS.len());
}
#[tokio::test(start_paused = true)]
async fn non_transient_returns_immediately() {
let calls = AtomicUsize::new(0);
let r: Result<i32> = with_transient_retry("test", || {
calls.fetch_add(1, Ordering::SeqCst);
async { Err(StorageError::NotFound("missing".to_owned())) }
})
.await;
assert!(r.is_err());
assert_eq!(
calls.load(Ordering::SeqCst),
1,
"non-transient must not retry"
);
}
}