use std::time::{Duration, Instant};
use noxu_db::{Environment, Transaction};
use crate::error::{CollectionError, Result};
/// Retry policy: how many times a failed unit of work may be retried and how
/// long to wait between attempts.
#[derive(Debug, Clone, Copy)]
pub struct RetryConfig {
    /// Number of retries allowed after the initial attempt.
    pub max_retries: u32,
    /// Delay for the first retry (attempt 0); doubled for each later attempt.
    pub base_backoff: Duration,
    /// Ceiling applied to the exponential delay before jitter is added.
    pub max_backoff: Duration,
    /// Fraction of the capped delay used as a symmetric jitter window.
    ///
    /// Values `<= 0.0` (and NaN) disable jitter. Values above `1.0` are
    /// treated as `1.0` by [`RetryConfig::backoff_for`].
    pub jitter: f64,
}

impl RetryConfig {
    /// Built-in policy: 10 retries, 10 ms base delay, 1 s cap, 25% jitter.
    pub const DEFAULT: RetryConfig = RetryConfig {
        max_retries: 10,
        base_backoff: Duration::from_millis(10),
        max_backoff: Duration::from_secs(1),
        jitter: 0.25,
    };

    /// Computes the delay to wait before retry number `attempt` (zero-based).
    ///
    /// The delay is `base_backoff * 2^attempt`, capped at `max_backoff`, and
    /// then shifted by an offset in `[-window, +window]` where
    /// `window = capped * jitter`. The offset is selected deterministically
    /// from `nanos`, so the same inputs always give the same delay.
    ///
    /// The effective jitter fraction is limited to `1.0`. Without that limit
    /// a huge or infinite `jitter` (the field is public, so the clamp in a
    /// builder can be bypassed) would make the window saturate and the span
    /// computation overflow.
    pub fn backoff_for(&self, attempt: u32, nanos: u64) -> Duration {
        // The exponent is limited to 20 so `1u32 << n` cannot overflow; the
        // multiplication itself saturates.
        let raw = self.base_backoff.saturating_mul(1u32 << attempt.min(20));
        let capped = raw.min(self.max_backoff);

        // NaN has no meaningful window, so it is treated like "no jitter".
        if self.jitter.is_nan() || self.jitter <= 0.0 {
            return capped;
        }
        let jitter = self.jitter.min(1.0);

        let nanos_capped = capped.as_nanos();
        let jitter_window = (nanos_capped as f64 * jitter).round() as i128;
        if jitter_window <= 0 {
            return capped;
        }

        // With `jitter <= 1.0` the window is at most about `nanos_capped`,
        // which is far below `i128::MAX / 2`, so this cannot overflow.
        let span = 2 * jitter_window + 1;
        let salted = (nanos as i128).rem_euclid(span);
        let offset = salted - jitter_window;
        let result_nanos =
            (nanos_capped as i128).saturating_add(offset).max(0) as u128;
        Duration::from_nanos(u64::try_from(result_nanos).unwrap_or(u64::MAX))
    }
}

impl Default for RetryConfig {
    /// Returns [`RetryConfig::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
pub struct TransactionRunner<'env> {
env: &'env Environment,
retry: RetryConfig,
}
impl<'env> TransactionRunner<'env> {
pub fn new(env: &'env Environment) -> Self {
TransactionRunner { env, retry: RetryConfig::DEFAULT }
}
pub fn environment(&self) -> &'env Environment {
self.env
}
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
self.retry.max_retries = max_retries;
self
}
pub fn with_base_backoff(mut self, base: Duration) -> Self {
self.retry.base_backoff = base;
self
}
pub fn with_max_backoff(mut self, max: Duration) -> Self {
self.retry.max_backoff = max;
self
}
pub fn with_jitter(mut self, jitter: f64) -> Self {
self.retry.jitter = jitter.clamp(0.0, 1.0);
self
}
pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
self.retry = config;
self
}
pub fn get_max_retries(&self) -> u32 {
self.retry.max_retries
}
pub fn retry_config(&self) -> RetryConfig {
self.retry
}
pub fn run<F, R>(&self, mut f: F) -> Result<R>
where
F: FnMut(&Transaction) -> Result<R>,
{
self.run_with_sleep(&mut f, std::thread::sleep)
}
pub fn run_with_sleep<F, R, S>(&self, f: &mut F, mut sleep: S) -> Result<R>
where
F: FnMut(&Transaction) -> Result<R>,
S: FnMut(Duration),
{
let mut attempt: u32 = 0;
loop {
let txn = self.env.begin_transaction(None)?;
match f(&txn) {
Ok(value) => {
txn.commit().map_err(CollectionError::DatabaseError)?;
return Ok(value);
}
Err(err) => {
let _ = txn.abort();
if attempt >= self.retry.max_retries || !is_retryable(&err)
{
return Err(err);
}
let nanos = jitter_salt();
let sleep_dur = self.retry.backoff_for(attempt, nanos);
sleep(sleep_dur);
attempt = attempt.saturating_add(1);
}
}
}
}
pub fn run_without_txn<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce() -> Result<R>,
{
f()
}
}
/// Reports whether `err` should trigger a retry: only a
/// `CollectionError::DatabaseError` whose inner error says it is retryable
/// qualifies; every other variant is final.
fn is_retryable(err: &CollectionError) -> bool {
    matches!(
        err,
        CollectionError::DatabaseError(db_err) if db_err.is_retryable()
    )
}
/// Produces a cheap, non-cryptographic 64-bit salt used to pick a jitter
/// offset.
///
/// The raw clock readings alone occupy only the low bits (a sub-second
/// nanosecond count is below 2^30), which biases `salt % span` towards small
/// values once the jitter span exceeds roughly one second. The readings are
/// therefore passed through the SplitMix64 finalizer so that every output
/// bit depends on the input.
fn jitter_salt() -> u64 {
    // Time between taking an `Instant` and reading it back: tiny, but it adds
    // a little scheduling noise to the wall-clock value.
    let tick = Instant::now().elapsed().as_nanos() as u64;
    // Truncation to 64 bits is intentional; only the bit pattern matters.
    // A clock set before the epoch contributes 0.
    let wall = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0);

    // SplitMix64 finalizer.
    let mut z = (tick ^ wall).wrapping_add(0x9E37_79B9_7F4A_7C15);
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use noxu_db::{DatabaseConfig, EnvironmentConfig, NoxuError};
    use tempfile::TempDir;

    /// Opens a transactional environment in a fresh temporary directory.
    /// The `TempDir` is returned so the directory outlives the environment.
    fn setup_env() -> (TempDir, Environment) {
        let td = TempDir::new().unwrap();
        let env = Environment::open(
            EnvironmentConfig::new(td.path().to_path_buf())
                .with_allow_create(true)
                .with_transactional(true),
        )
        .unwrap();
        (td, env)
    }

    /// Pins the values of `RetryConfig::DEFAULT`.
    #[test]
    fn defaults_match_wave_2b_spec() {
        let cfg = RetryConfig::DEFAULT;
        assert_eq!(cfg.max_retries, 10);
        assert_eq!(cfg.base_backoff, Duration::from_millis(10));
        assert_eq!(cfg.max_backoff, Duration::from_secs(1));
        assert!((cfg.jitter - 0.25).abs() < f64::EPSILON);
    }

    /// Without jitter the delay doubles per attempt until it hits the cap.
    #[test]
    fn backoff_grows_then_caps() {
        let cfg = RetryConfig {
            max_retries: 100,
            base_backoff: Duration::from_millis(10),
            max_backoff: Duration::from_millis(80),
            jitter: 0.0,
        };
        assert_eq!(cfg.backoff_for(0, 0), Duration::from_millis(10));
        assert_eq!(cfg.backoff_for(1, 0), Duration::from_millis(20));
        assert_eq!(cfg.backoff_for(2, 0), Duration::from_millis(40));
        assert_eq!(cfg.backoff_for(3, 0), Duration::from_millis(80));
        assert_eq!(cfg.backoff_for(10, 0), Duration::from_millis(80));
        assert_eq!(cfg.backoff_for(50, 0), Duration::from_millis(80));
    }

    /// With 25% jitter on a 100 ms delay every result stays in 75..=125 ms.
    #[test]
    fn backoff_jitter_within_bounds() {
        let cfg = RetryConfig {
            max_retries: 5,
            base_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(10),
            jitter: 0.25,
        };
        for nanos in 0u64..1000 {
            let d = cfg.backoff_for(0, nanos);
            assert!(d >= Duration::from_millis(75), "got {:?}", d);
            assert!(d <= Duration::from_millis(125), "got {:?}", d);
        }
    }

    /// A closure returning `Ok` yields its value from `run`.
    #[test]
    fn run_success_commits() {
        let (_td, env) = setup_env();
        let runner = TransactionRunner::new(&env);
        let result = runner.run(|_txn| Ok(42));
        assert_eq!(result.unwrap(), 42);
    }

    /// A non-retryable error from the closure is returned as-is.
    #[test]
    fn run_aborts_on_error() {
        let (_td, env) = setup_env();
        let runner = TransactionRunner::new(&env);
        let result: Result<()> = runner
            .run(|_txn| Err(CollectionError::IllegalState("nope".to_string())));
        assert!(matches!(result, Err(CollectionError::IllegalState(_))));
    }

    /// Two deadlocks followed by success: three calls, two sleeps.
    #[test]
    fn run_retries_on_deadlock_with_synthetic_sleep() {
        let (_td, env) = setup_env();
        let runner = TransactionRunner::new(&env).with_max_retries(3);
        let calls = Arc::new(AtomicU32::new(0));
        let sleeps = Arc::new(AtomicU32::new(0));
        let calls_clone = Arc::clone(&calls);
        let sleeps_clone = Arc::clone(&sleeps);
        let mut closure = move |_t: &Transaction| -> Result<&'static str> {
            let n = calls_clone.fetch_add(1, Ordering::SeqCst);
            if n < 2 {
                Err(CollectionError::DatabaseError(NoxuError::DeadlockDetected))
            } else {
                Ok("ok")
            }
        };
        // Counting sleep stub keeps the test instantaneous.
        let result = runner.run_with_sleep(&mut closure, |_d| {
            sleeps_clone.fetch_add(1, Ordering::SeqCst);
        });
        assert_eq!(result.unwrap(), "ok");
        assert_eq!(calls.load(Ordering::SeqCst), 3);
        assert_eq!(sleeps.load(Ordering::SeqCst), 2);
    }

    /// With a budget of 2 retries a permanently deadlocking closure is
    /// called three times (initial attempt + 2 retries) before giving up.
    #[test]
    fn run_exhausts_retry_budget() {
        let (_td, env) = setup_env();
        let runner = TransactionRunner::new(&env).with_max_retries(2);
        let calls = Arc::new(AtomicU32::new(0));
        let calls_clone = Arc::clone(&calls);
        let mut closure = move |_t: &Transaction| -> Result<()> {
            calls_clone.fetch_add(1, Ordering::SeqCst);
            Err(CollectionError::DatabaseError(NoxuError::DeadlockDetected))
        };
        let result = runner.run_with_sleep(&mut closure, |_d| {});
        assert!(matches!(
            result,
            Err(CollectionError::DatabaseError(NoxuError::DeadlockDetected)),
        ));
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// A non-retryable error stops after the first call even with budget left.
    #[test]
    fn run_does_not_retry_non_retryable_errors() {
        let (_td, env) = setup_env();
        let runner = TransactionRunner::new(&env).with_max_retries(5);
        let calls = Arc::new(AtomicU32::new(0));
        let calls_clone = Arc::clone(&calls);
        let mut closure = move |_t: &Transaction| -> Result<()> {
            calls_clone.fetch_add(1, Ordering::SeqCst);
            Err(CollectionError::IllegalState("non-retryable".to_string()))
        };
        let result = runner.run_with_sleep(&mut closure, |_d| {});
        assert!(matches!(result, Err(CollectionError::IllegalState(_))));
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// Writes made through the transaction are readable after `run` returns.
    #[test]
    fn run_writes_through_typed_storedmap() {
        use noxu_bind::{IntBinding, StringBinding};
        use crate::stored_map::StoredMap;
        let (_td, env) = setup_env();
        let db = env
            .open_database(
                None,
                "runner_map",
                &DatabaseConfig::new().with_allow_create(true),
            )
            .unwrap();
        let map: StoredMap<'_, i32, String, _, _> =
            StoredMap::new(&db, IntBinding, StringBinding);
        let runner = TransactionRunner::new(&env);
        runner
            .run(|txn| {
                map.put(Some(txn), &1, &"alpha".to_string())?;
                map.put(Some(txn), &2, &"beta".to_string())?;
                Ok(())
            })
            .unwrap();
        assert_eq!(map.get(None, &1).unwrap(), Some("alpha".to_string()));
        assert_eq!(map.get(None, &2).unwrap(), Some("beta".to_string()));
    }

    /// A write followed by an error in the same closure is not visible
    /// afterwards.
    #[test]
    fn run_aborts_storedmap_writes_on_error() {
        use noxu_bind::{IntBinding, StringBinding};
        use crate::stored_map::StoredMap;
        let (_td, env) = setup_env();
        let db = env
            .open_database(
                None,
                "runner_abort",
                &DatabaseConfig::new().with_allow_create(true),
            )
            .unwrap();
        let map: StoredMap<'_, i32, String, _, _> =
            StoredMap::new(&db, IntBinding, StringBinding);
        let runner = TransactionRunner::new(&env);
        let result: Result<()> = runner.run(|txn| {
            map.put(Some(txn), &1, &"set".to_string())?;
            Err(CollectionError::IllegalState("rollback".to_string()))
        });
        assert!(result.is_err());
        assert_eq!(map.get(None, &1).unwrap(), None);
    }

    /// `run_without_txn` hands back the closure's result unchanged.
    #[test]
    fn run_without_txn_passes_through() {
        let (_td, env) = setup_env();
        let runner = TransactionRunner::new(&env);
        let r = runner.run_without_txn(|| Ok::<i32, CollectionError>(7));
        assert_eq!(r.unwrap(), 7);
    }

    /// `with_jitter` clamps at both ends of `[0.0, 1.0]`.
    #[test]
    fn with_jitter_clamps_to_unit_range() {
        let (_td, env) = setup_env();

        // Upper bound: checked on its own, since a chained call would
        // overwrite the value before it could be observed.
        let high = TransactionRunner::new(&env).with_jitter(2.0);
        assert!((high.retry_config().jitter - 1.0).abs() < f64::EPSILON);

        // Lower bound, also exercising that a later call replaces an
        // earlier one.
        let runner =
            TransactionRunner::new(&env).with_jitter(2.0).with_jitter(-1.0);
        assert!((runner.retry_config().jitter - 0.0).abs() < f64::EPSILON);
    }

    /// Only database errors that report themselves retryable are retried.
    #[test]
    fn classification_helper_matches_db_retryable() {
        assert!(is_retryable(&CollectionError::DatabaseError(
            NoxuError::DeadlockDetected
        )));
        assert!(is_retryable(&CollectionError::DatabaseError(
            NoxuError::LockConflict("x".to_string())
        )));
        assert!(!is_retryable(&CollectionError::IllegalState("x".to_string())));
        assert!(!is_retryable(&CollectionError::ReadOnly));
    }
}