use crate::error::{TrazaeoError, TrazaeoResult};
use std::collections::HashSet;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
pub max_attempts: u32,
pub initial_backoff_ms: u64,
pub backoff_multiplier: u64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_attempts: 3,
initial_backoff_ms: 100,
backoff_multiplier: 2,
}
}
}
pub fn retry_with_backoff<T, E, F>(policy: RetryPolicy, mut op: F) -> Result<T, E>
where
F: FnMut(u32) -> Result<T, E>,
{
let mut attempt = 1;
loop {
match op(attempt) {
Ok(value) => return Ok(value),
Err(err) => {
if attempt >= policy.max_attempts {
return Err(err);
}
attempt += 1;
}
}
}
}
pub fn compute_backoff_schedule(policy: RetryPolicy) -> Vec<u64> {
let mut schedule = Vec::new();
let mut backoff = policy.initial_backoff_ms;
for _ in 1..policy.max_attempts {
schedule.push(backoff);
backoff = backoff.saturating_mul(policy.backoff_multiplier);
}
schedule
}
pub fn enforce_monotonic_sequence(sequence_numbers: &[u64]) -> TrazaeoResult<()> {
if sequence_numbers.is_empty() {
return Ok(());
}
let mut prev = sequence_numbers[0];
for current in sequence_numbers.iter().copied().skip(1) {
if current <= prev {
return Err(TrazaeoError::invalid_input(
"enforce monotonic sequence",
format!("non-monotonic event ordering detected: prev={prev}, current={current}"),
));
}
prev = current;
}
Ok(())
}
#[derive(Debug, Default)]
pub struct InMemoryIdempotencyStore {
processed_ids: HashSet<String>,
}
impl InMemoryIdempotencyStore {
pub fn has_processed(&self, id: &str) -> bool {
self.processed_ids.contains(id)
}
pub fn mark_processed(&mut self, id: &str) {
self.processed_ids.insert(id.to_string());
}
pub fn process_once<T, F>(&mut self, id: &str, op: F) -> Option<T>
where
F: FnOnce() -> T,
{
if self.has_processed(id) {
return None;
}
let result = op();
self.mark_processed(id);
Some(result)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn retry_with_backoff_eventually_succeeds() {
let policy = RetryPolicy {
max_attempts: 4,
initial_backoff_ms: 10,
backoff_multiplier: 2,
};
let mut calls = 0;
let result: Result<u32, &'static str> = retry_with_backoff(policy, |_attempt| {
calls += 1;
if calls < 3 {
Err("temporary")
} else {
Ok(42)
}
});
assert_eq!(result.expect("must succeed"), 42);
assert_eq!(calls, 3);
}
#[test]
fn retry_with_backoff_returns_last_error() {
let policy = RetryPolicy {
max_attempts: 2,
initial_backoff_ms: 10,
backoff_multiplier: 2,
};
let mut calls = 0;
let result: Result<u32, &'static str> = retry_with_backoff(policy, |_attempt| {
calls += 1;
Err("permanent")
});
assert_eq!(result.expect_err("must fail"), "permanent");
assert_eq!(calls, 2);
}
#[test]
fn compute_backoff_schedule_matches_policy() {
let schedule = compute_backoff_schedule(RetryPolicy {
max_attempts: 4,
initial_backoff_ms: 50,
backoff_multiplier: 3,
});
assert_eq!(schedule, vec![50, 150, 450]);
}
#[test]
fn idempotency_store_processes_once_per_id() {
let mut store = InMemoryIdempotencyStore::default();
let first = store.process_once("evt-1", || 7);
let second = store.process_once("evt-1", || 8);
assert_eq!(first, Some(7));
assert_eq!(second, None);
assert!(store.has_processed("evt-1"));
}
#[test]
fn enforce_monotonic_sequence_accepts_strictly_increasing_events() {
assert!(enforce_monotonic_sequence(&[1, 2, 3, 9]).is_ok());
}
#[test]
fn enforce_monotonic_sequence_rejects_reordered_events() {
let err = enforce_monotonic_sequence(&[1, 3, 2]).expect_err("must detect reorder");
assert!(err.to_string().contains("non-monotonic event ordering"));
}
}