use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;
use crate::sink::{SinkError, Tile, TileSink};
/// Returns a seed that is generated once and then stays fixed for the life of
/// the process.
///
/// The first call hashes the unit value with a freshly created
/// [`std::hash::RandomState`]; every subsequent call (from any thread) reads
/// the cached result.
fn process_seed() -> u64 {
    use std::hash::{BuildHasher, RandomState};

    static SEED: OnceLock<u64> = OnceLock::new();

    let cached = SEED.get_or_init(|| {
        let hasher_factory = RandomState::new();
        hasher_factory.hash_one(())
    });
    *cached
}
/// Produces a pseudo-random jitter value in `0..max_nanos`.
///
/// Returns `0` without touching `jitter_tick` when `max_nanos` is zero.
/// Otherwise `jitter_tick` is incremented by one and its previous value is
/// mixed with the process-wide seed, so consecutive calls on the same counter
/// yield different values.
fn sample_jitter(max_nanos: u64, jitter_tick: &AtomicU64) -> u64 {
    // Odd 64-bit multiplier used to spread consecutive tick values apart.
    const TICK_STRIDE: u64 = 0x9E3779B97F4A7C15;

    if max_nanos == 0 {
        return 0;
    }

    let tick = jitter_tick.fetch_add(1, Ordering::Relaxed);
    let seeded = process_seed().wrapping_add(tick.wrapping_mul(TICK_STRIDE));

    // Three shift/xor rounds scramble the bits before reducing into range.
    let round1 = seeded ^ (seeded << 13);
    let round2 = round1 ^ (round1 >> 7);
    let round3 = round2 ^ (round2 << 17);

    round3 % max_nanos
}
/// Configuration for retrying a failed tile write with exponential backoff.
///
/// Build one with [`RetryPolicy::default`], [`RetryPolicy::new`] or
/// [`RetryPolicy::fail_fast`] and adjust it with the `with_*` methods.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct RetryPolicy {
/// Number of additional write attempts made after the first one fails;
/// `0` means the first error is returned immediately.
pub max_retries: u32,
/// Delay used before the first retry (attempt index `0`).
pub initial_backoff: Duration,
/// Factor the delay is multiplied by for each subsequent retry.
pub multiplier: f32,
/// Upper bound applied to the computed delay (before jitter is added).
pub max_backoff: Duration,
/// When `true`, each sleep is lengthened by a pseudo-random amount smaller
/// than half of the computed delay.
pub jitter: bool,
}
impl Default for RetryPolicy {
    /// Default policy: 3 retries, 50 ms initial delay doubling each time,
    /// capped at 5 s, with jitter enabled.
    fn default() -> Self {
        let initial_backoff = Duration::from_millis(50);
        let max_backoff = Duration::from_secs(5);

        Self {
            max_retries: 3,
            initial_backoff,
            multiplier: 2.0,
            max_backoff,
            jitter: true,
        }
    }
}
impl RetryPolicy {
    /// Creates a policy with the given retry count and initial delay; every
    /// other field takes its [`Default`] value.
    pub fn new(max_retries: u32, initial_backoff: Duration) -> Self {
        Self::default()
            .with_max_retries(max_retries)
            .with_initial_backoff(initial_backoff)
    }

    /// Creates a policy that never retries (`max_retries == 0`); every other
    /// field takes its [`Default`] value.
    pub fn fail_fast() -> Self {
        Self::default().with_max_retries(0)
    }

    /// Shorthand for [`RetryPolicy::with_max_retries`].
    pub fn with_max(self, n: u32) -> Self {
        Self {
            max_retries: n,
            ..self
        }
    }

    /// Returns the policy with `max_retries` replaced by `n`.
    pub fn with_max_retries(self, n: u32) -> Self {
        Self {
            max_retries: n,
            ..self
        }
    }

    /// Returns the policy with `initial_backoff` replaced by `d`.
    pub fn with_initial_backoff(self, d: Duration) -> Self {
        Self {
            initial_backoff: d,
            ..self
        }
    }

    /// Returns the policy with `multiplier` replaced by `m`.
    pub fn with_multiplier(self, m: f32) -> Self {
        Self {
            multiplier: m,
            ..self
        }
    }

    /// Returns the policy with `max_backoff` replaced by `d`.
    pub fn with_max_backoff(self, d: Duration) -> Self {
        Self {
            max_backoff: d,
            ..self
        }
    }

    /// Returns the policy with jitter switched on or off.
    pub fn with_jitter(self, enabled: bool) -> Self {
        Self {
            jitter: enabled,
            ..self
        }
    }
}
/// Selects how a failed sink write should be handled.
///
/// The variants are only declared here; the code that interprets them is not
/// part of this file, so the per-variant notes below describe the apparent
/// intent and should be confirmed against the consumer.
#[derive(Debug, Clone, PartialEq, Default)]
#[non_exhaustive]
pub enum FailurePolicy {
/// The default variant. Presumably: do not retry, surface the first error.
#[default]
FailFast,
/// Carries the [`RetryPolicy`] to apply. Presumably: retry, then report the
/// failure once retries are exhausted — TODO confirm with the consumer.
RetryThenFail(RetryPolicy),
/// Carries the [`RetryPolicy`] to apply. Presumably: retry, then skip the
/// tile once retries are exhausted — TODO confirm with the consumer.
RetryThenSkip(RetryPolicy),
}
/// Computes the un-jittered delay to wait before retry number `attempt`
/// (zero-based).
///
/// The delay is `initial_backoff * multiplier^attempt`, rounded to the nearest
/// nanosecond and clamped to the range `0..=max_backoff`:
///
/// * a result that is not finite (overflow or NaN) or that reaches the cap
///   yields exactly `policy.max_backoff`;
/// * a negative result (possible with a negative multiplier) yields
///   [`Duration::ZERO`].
///
/// `attempt` values above `i32::MAX` are treated as `i32::MAX` rather than
/// wrapping to a negative exponent.
pub fn compute_backoff(policy: &RetryPolicy, attempt: u32) -> Duration {
    // `powi` takes an `i32`; a plain `as` cast would wrap huge attempt numbers
    // into negative exponents and *shrink* the delay, so saturate instead.
    let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);

    let base_nanos = policy.initial_backoff.as_nanos() as f64;
    let scaled = base_nanos * f64::from(policy.multiplier).powi(exponent);
    let max_nanos = policy.max_backoff.as_nanos() as f64;

    // Return the cap itself rather than round-tripping it through `f64`, which
    // loses precision for very large durations and can even round above the
    // largest representable `Duration`.
    if !scaled.is_finite() || scaled >= max_nanos {
        return policy.max_backoff;
    }
    if scaled <= 0.0 {
        return Duration::ZERO;
    }

    // The float-to-int cast saturates; the final `min` guards against the
    // `f64` image of the cap having been rounded upwards.
    duration_from_nanos_u128(scaled.round() as u128).min(policy.max_backoff)
}
/// Converts a nanosecond count into a [`Duration`].
///
/// Counts too large for a `Duration` (whole seconds exceeding `u64::MAX`)
/// saturate to [`Duration::MAX`] instead of silently wrapping.
fn duration_from_nanos_u128(nanos: u128) -> Duration {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    match u64::try_from(nanos / NANOS_PER_SEC) {
        Ok(secs) => {
            // The remainder is always below 1_000_000_000, so it fits in `u32`.
            let subsec_nanos = (nanos % NANOS_PER_SEC) as u32;
            Duration::new(secs, subsec_nanos)
        }
        Err(_) => Duration::MAX,
    }
}
/// A [`TileSink`] wrapper that retries failed `write_tile` calls on the
/// wrapped sink according to a [`RetryPolicy`], and keeps counters of the
/// retries and skips it has observed.
pub struct RetryingSink<S: TileSink> {
// The wrapped sink that all calls are forwarded to.
inner: S,
// Retry count, backoff and jitter settings.
policy: RetryPolicy,
// Incremented once for every retry attempt made by this wrapper.
retry_count: AtomicU64,
// Incremented each time a skip is recorded on this wrapper.
skipped_due_to_failure: AtomicU64,
// Counter fed into the jitter generator so successive sleeps differ.
jitter_tick: AtomicU64,
}
impl<S: TileSink> RetryingSink<S> {
    /// Wraps `inner` so that failed writes are retried according to `policy`.
    /// All counters start at zero.
    pub fn new(inner: S, policy: RetryPolicy) -> Self {
        Self {
            inner,
            policy,
            retry_count: AtomicU64::new(0),
            skipped_due_to_failure: AtomicU64::new(0),
            jitter_tick: AtomicU64::new(0),
        }
    }

    /// Number of retry attempts this wrapper has made so far (not counting
    /// the initial attempt of each write).
    pub fn retry_count(&self) -> u64 {
        self.retry_count.load(Ordering::Relaxed)
    }

    /// Number of skips recorded on this wrapper so far.
    pub fn skipped_due_to_failure(&self) -> u64 {
        self.skipped_due_to_failure.load(Ordering::Relaxed)
    }

    /// Borrows the wrapped sink.
    pub fn inner(&self) -> &S {
        &self.inner
    }

    /// Records one skip on this wrapper's own counter only; the wrapped sink
    /// is not notified.
    #[doc(hidden)]
    pub fn note_skipped(&self) {
        self.skipped_due_to_failure.fetch_add(1, Ordering::Relaxed);
    }

    /// Borrows the retry policy in use.
    pub fn policy(&self) -> &RetryPolicy {
        &self.policy
    }

    /// Blocks the current thread for the backoff belonging to the zero-based
    /// retry `attempt`.
    ///
    /// With jitter enabled, a pseudo-random amount smaller than half of the
    /// base delay is added on top. A total of zero skips the sleep entirely.
    fn backoff_sleep(&self, attempt: u32) {
        let base = compute_backoff(&self.policy, attempt);
        let total = if self.policy.jitter {
            // Saturate instead of truncating: a plain `as u64` would wrap the
            // bound for extremely large backoffs.
            let max_jitter_nanos = u64::try_from((base / 2).as_nanos()).unwrap_or(u64::MAX);
            let jitter_nanos = sample_jitter(max_jitter_nanos, &self.jitter_tick);
            // `Duration + Duration` panics on overflow; clamp at the maximum
            // so a huge `max_backoff` cannot crash the writer.
            base.saturating_add(Duration::from_nanos(jitter_nanos))
        } else {
            base
        };
        if !total.is_zero() {
            thread::sleep(total);
        }
    }
}
impl<S: TileSink> TileSink for RetryingSink<S> {
    /// Writes `tile` to the wrapped sink, retrying on failure.
    ///
    /// After a failed first attempt, up to `policy.max_retries` further
    /// attempts are made, each preceded by a backoff sleep and counted in
    /// `retry_count`. Returns `Ok(())` as soon as any attempt succeeds;
    /// otherwise returns the error from the last attempt made.
    fn write_tile(&self, tile: &Tile) -> Result<(), SinkError> {
        let Err(mut last_err) = self.inner.write_tile(tile) else {
            return Ok(());
        };

        // With `max_retries == 0` the range is empty and the first error is
        // returned untouched, without sleeping.
        for attempt in 0..self.policy.max_retries {
            self.backoff_sleep(attempt);
            self.retry_count.fetch_add(1, Ordering::Relaxed);

            let Err(retry_err) = self.inner.write_tile(tile) else {
                return Ok(());
            };
            last_err = retry_err;
        }

        Err(last_err)
    }

    /// Forwards to the wrapped sink's `finish`; no retrying is applied.
    fn finish(&self) -> Result<(), SinkError> {
        let outcome = self.inner.finish();
        outcome
    }

    /// Forwards the engine configuration to the wrapped sink.
    fn record_engine_config(&self, config: &crate::engine::EngineConfig) {
        self.inner.record_engine_config(config)
    }

    /// This wrapper's retry count plus whatever the wrapped sink reports.
    fn sink_retry_count(&self) -> u64 {
        let own = self.retry_count.load(Ordering::Relaxed);
        let nested = self.inner.sink_retry_count();
        own + nested
    }

    /// This wrapper's skip count plus whatever the wrapped sink reports.
    fn sink_skipped_due_to_failure(&self) -> u64 {
        let own = self.skipped_due_to_failure.load(Ordering::Relaxed);
        let nested = self.inner.sink_skipped_due_to_failure();
        own + nested
    }

    /// Records one skip on this wrapper and then notifies the wrapped sink.
    // NOTE(review): if the wrapped sink also counts skips, the sum reported
    // by `sink_skipped_due_to_failure` grows by two per call — confirm this
    // is intended.
    fn note_sink_skipped(&self) {
        let _previous = self.skipped_due_to_failure.fetch_add(1, Ordering::Relaxed);
        self.inner.note_sink_skipped()
    }

    /// Returns the wrapped sink's checkpoint root, if it has one.
    fn checkpoint_root(&self) -> Option<&std::path::Path> {
        let root = self.inner.checkpoint_root();
        root
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pixel::PixelFormat;
    use crate::planner::TileCoord;
    use crate::raster::Raster;
    use crate::sink::MemorySink;
    use std::sync::atomic::AtomicU32;

    // Compile-time check that the wrapper stays `Send + Sync` for a
    // representative inner sink; never executed at runtime.
    const _: fn() = || {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<RetryingSink<MemorySink>>();
    };

    /// Builds a minimal 1x1 RGB tile at the origin of level 0.
    fn dummy_tile() -> Tile {
        let raster = Raster::new(1, 1, PixelFormat::Rgb8, vec![0, 0, 0]).unwrap();
        Tile {
            coord: TileCoord {
                level: 0,
                col: 0,
                row: 0,
            },
            raster,
            blank: false,
        }
    }

    /// Test sink that fails a fixed number of writes and then succeeds,
    /// counting every call it receives.
    struct CountingFailSink {
        /// Remaining number of writes that must fail.
        budget: AtomicU32,
        /// Total number of `write_tile` calls seen.
        calls: AtomicU64,
    }

    impl CountingFailSink {
        /// Creates a sink whose first `fail_times` writes fail.
        fn new(fail_times: u32) -> Self {
            Self {
                budget: AtomicU32::new(fail_times),
                calls: AtomicU64::new(0),
            }
        }
    }

    impl TileSink for CountingFailSink {
        fn write_tile(&self, _tile: &Tile) -> Result<(), SinkError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            // Atomically consume one unit of failure budget. A separate load
            // followed by a single compare-exchange would report success
            // whenever the exchange lost a race, even with budget remaining.
            let consumed = self
                .budget
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
                    left.checked_sub(1)
                })
                .is_ok();
            if consumed {
                Err(SinkError::Other("fail".into()))
            } else {
                Ok(())
            }
        }
    }

    #[test]
    fn default_policy_matches_spec() {
        let p = RetryPolicy::default();
        assert_eq!(p.max_retries, 3);
        assert_eq!(p.initial_backoff, Duration::from_millis(50));
        assert!((p.multiplier - 2.0).abs() < f32::EPSILON);
        assert_eq!(p.max_backoff, Duration::from_secs(5));
        assert!(p.jitter);
    }

    #[test]
    fn default_failure_policy_is_fail_fast() {
        assert_eq!(FailurePolicy::default(), FailurePolicy::FailFast);
    }

    #[test]
    fn compute_backoff_is_geometric() {
        let policy = RetryPolicy {
            max_retries: 10,
            initial_backoff: Duration::from_millis(10),
            multiplier: 2.0,
            max_backoff: Duration::from_secs(60),
            jitter: false,
        };
        assert_eq!(compute_backoff(&policy, 0), Duration::from_millis(10));
        assert_eq!(compute_backoff(&policy, 1), Duration::from_millis(20));
        assert_eq!(compute_backoff(&policy, 2), Duration::from_millis(40));
        assert_eq!(compute_backoff(&policy, 3), Duration::from_millis(80));
    }

    #[test]
    fn compute_backoff_is_capped() {
        let policy = RetryPolicy {
            max_retries: 10,
            initial_backoff: Duration::from_secs(1),
            multiplier: 10.0,
            max_backoff: Duration::from_secs(3),
            jitter: false,
        };
        assert_eq!(compute_backoff(&policy, 0), Duration::from_secs(1));
        assert_eq!(compute_backoff(&policy, 1), Duration::from_secs(3));
        assert_eq!(compute_backoff(&policy, 2), Duration::from_secs(3));
        assert_eq!(compute_backoff(&policy, 100), Duration::from_secs(3));
    }

    #[test]
    fn retries_until_success() {
        // Two failures followed by a success: three calls, two retries.
        let inner = CountingFailSink::new(2);
        let policy = RetryPolicy {
            max_retries: 5,
            initial_backoff: Duration::from_micros(1),
            multiplier: 1.0,
            max_backoff: Duration::from_millis(1),
            jitter: false,
        };
        let sink = RetryingSink::new(inner, policy);
        let tile = dummy_tile();
        sink.write_tile(&tile)
            .expect("should succeed after retries");
        assert_eq!(sink.retry_count(), 2);
        assert_eq!(sink.inner().calls.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn returns_last_error_when_exhausted() {
        // The sink never recovers: one initial call plus two retries.
        let inner = CountingFailSink::new(100);
        let policy = RetryPolicy {
            max_retries: 2,
            initial_backoff: Duration::from_micros(1),
            multiplier: 1.0,
            max_backoff: Duration::from_millis(1),
            jitter: false,
        };
        let sink = RetryingSink::new(inner, policy);
        let tile = dummy_tile();
        let err = sink.write_tile(&tile).unwrap_err();
        match err {
            SinkError::Other(msg) => assert_eq!(msg, "fail"),
            other => panic!("unexpected error: {other:?}"),
        }
        assert_eq!(sink.inner().calls.load(Ordering::SeqCst), 3);
        assert_eq!(sink.retry_count(), 2);
    }

    #[test]
    fn zero_retries_returns_first_error_immediately() {
        let inner = CountingFailSink::new(1);
        let policy = RetryPolicy {
            max_retries: 0,
            initial_backoff: Duration::from_micros(1),
            multiplier: 2.0,
            max_backoff: Duration::from_millis(1),
            jitter: false,
        };
        let sink = RetryingSink::new(inner, policy);
        let tile = dummy_tile();
        assert!(sink.write_tile(&tile).is_err());
        assert_eq!(sink.retry_count(), 0);
        assert_eq!(sink.inner().calls.load(Ordering::SeqCst), 1);
    }
}