trazaeo 0.5.7

Open-source provenance SDK and specification for verifiable EO and climate data workflows
Documentation
use crate::error::{TrazaeoError, TrazaeoResult};
use std::collections::HashSet;

/// Retry configuration: an attempt budget plus the two parameters of a
/// multiplicative backoff schedule.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Upper bound on how many times `retry_with_backoff` invokes the
    /// operation. The operation is still invoked once when this is 0.
    /// `compute_backoff_schedule` yields `max_attempts - 1` delays (none when
    /// this is 0 or 1).
    pub max_attempts: u32,
    /// First delay of the schedule built by `compute_backoff_schedule`, in
    /// milliseconds.
    pub initial_backoff_ms: u64,
    /// Factor applied (with saturation) to one delay to obtain the next one in
    /// the schedule built by `compute_backoff_schedule`.
    pub backoff_multiplier: u64,
}

impl Default for RetryPolicy {
    /// Returns a policy of 3 attempts, a 100 ms initial backoff and a
    /// multiplier of 2.
    fn default() -> Self {
        let (max_attempts, initial_backoff_ms, backoff_multiplier) = (3, 100, 2);
        Self {
            max_attempts,
            initial_backoff_ms,
            backoff_multiplier,
        }
    }
}

/// Runs `op` repeatedly until it succeeds or the attempt budget is used up.
///
/// `op` receives the 1-based attempt number. It is always invoked at least
/// once, even when `policy.max_attempts` is 0.
///
/// Only `policy.max_attempts` is read here. This function inserts no delay
/// between attempts and does not consult `initial_backoff_ms` or
/// `backoff_multiplier`; any waiting has to be done by the caller (for
/// example inside `op`).
///
/// # Errors
///
/// Returns the error produced by the final attempt. Errors from earlier
/// attempts are dropped.
pub fn retry_with_backoff<T, E, F>(policy: RetryPolicy, mut op: F) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    // A budget of 0 still results in one call, exactly like a budget of 1.
    let final_attempt = policy.max_attempts.max(1);

    // Any attempt before the final one may fail; its error is discarded and
    // the next attempt runs.
    for attempt in 1..final_attempt {
        if let Ok(value) = op(attempt) {
            return Ok(value);
        }
    }

    // The outcome of the final attempt is handed back unchanged.
    op(final_attempt)
}

/// Builds the list of backoff delays described by `policy`.
///
/// The result holds `policy.max_attempts - 1` entries, and is empty when
/// `max_attempts` is 0 or 1. The first entry is `policy.initial_backoff_ms`;
/// every following entry is the previous one multiplied by
/// `policy.backoff_multiplier`, clamped at `u64::MAX` instead of overflowing.
pub fn compute_backoff_schedule(policy: RetryPolicy) -> Vec<u64> {
    (1..policy.max_attempts)
        .scan(policy.initial_backoff_ms, |next_delay, _| {
            // Emit the current delay, then advance the state for the next slot.
            let delay = *next_delay;
            *next_delay = delay.saturating_mul(policy.backoff_multiplier);
            Some(delay)
        })
        .collect()
}

/// Checks that `sequence_numbers` is strictly increasing.
///
/// Empty and single-element slices are accepted. Equal neighbours count as a
/// violation, because each value must be greater than the one before it.
///
/// # Errors
///
/// Returns the error built by `TrazaeoError::invalid_input` for the first
/// adjacent pair whose second value is not greater than its first. The message
/// reports both values as `prev` and `current`.
pub fn enforce_monotonic_sequence(sequence_numbers: &[u64]) -> TrazaeoResult<()> {
    // Scan adjacent pairs and stop at the first one that fails to increase.
    let violation = sequence_numbers
        .windows(2)
        .map(|pair| (pair[0], pair[1]))
        .find(|&(prev, current)| current <= prev);

    match violation {
        None => Ok(()),
        Some((prev, current)) => Err(TrazaeoError::invalid_input(
            "enforce monotonic sequence",
            format!("non-monotonic event ordering detected: prev={prev}, current={current}"),
        )),
    }
}

/// Records which identifiers have already been handled, using a `HashSet`
/// held in memory.
#[derive(Debug, Default)]
pub struct InMemoryIdempotencyStore {
    /// Identifiers recorded through `mark_processed` or `process_once`.
    processed_ids: HashSet<String>,
}

impl InMemoryIdempotencyStore {
    /// Reports whether `id` has already been recorded in this store.
    pub fn has_processed(&self, id: &str) -> bool {
        let recorded = &self.processed_ids;
        recorded.contains(id)
    }

    /// Records `id` as processed. Recording the same id again changes nothing.
    pub fn mark_processed(&mut self, id: &str) {
        let owned_id = String::from(id);
        self.processed_ids.insert(owned_id);
    }

    /// Runs `op` only when `id` has not been recorded yet.
    ///
    /// Returns `None` without calling `op` if `id` is already recorded.
    /// Otherwise calls `op`, records `id` once `op` has returned, and returns
    /// the value wrapped in `Some`.
    pub fn process_once<T, F>(&mut self, id: &str, op: F) -> Option<T>
    where
        F: FnOnce() -> T,
    {
        let is_new = !self.processed_ids.contains(id);
        is_new.then(|| {
            // Keep the order: run the operation first, record the id second.
            let outcome = op();
            self.processed_ids.insert(id.to_owned());
            outcome
        })
    }
}

#[cfg(test)]
mod tests {
    // Example-based tests pin concrete behaviour of each public item; the
    // `proptest!` block below re-checks the same items against small reference
    // models on generated inputs.
    use super::*;
    use proptest::prelude::*;
    use std::collections::HashSet;

    /// Tests that `retry_with_backoff` stops at the first success: the
    /// operation fails twice, succeeds on the third call, and is not called
    /// again although four attempts are allowed.
    #[test]
    fn retry_with_backoff_eventually_succeeds() {
        let policy = RetryPolicy {
            max_attempts: 4,
            initial_backoff_ms: 10,
            backoff_multiplier: 2,
        };
        let mut calls = 0;
        let result: Result<u32, &'static str> = retry_with_backoff(policy, |_attempt| {
            calls += 1;
            if calls < 3 {
                Err("temporary")
            } else {
                Ok(42)
            }
        });
        assert_eq!(result.expect("must succeed"), 42);
        assert_eq!(calls, 3);
    }

    /// Tests that `retry_with_backoff` gives up after `max_attempts` calls and
    /// hands back the error from the operation.
    #[test]
    fn retry_with_backoff_returns_last_error() {
        let policy = RetryPolicy {
            max_attempts: 2,
            initial_backoff_ms: 10,
            backoff_multiplier: 2,
        };
        let mut calls = 0;
        let result: Result<u32, &'static str> = retry_with_backoff(policy, |_attempt| {
            calls += 1;
            Err("permanent")
        });
        assert_eq!(result.expect_err("must fail"), "permanent");
        assert_eq!(calls, 2);
    }

    /// Tests that `compute_backoff_schedule` yields `max_attempts - 1` delays,
    /// starting at the initial backoff and multiplying by 3 each step.
    #[test]
    fn compute_backoff_schedule_matches_policy() {
        let schedule = compute_backoff_schedule(RetryPolicy {
            max_attempts: 4,
            initial_backoff_ms: 50,
            backoff_multiplier: 3,
        });
        assert_eq!(schedule, vec![50, 150, 450]);
    }

    /// Tests that the idempotency store runs the operation for an id only on
    /// the first `process_once` call and returns `None` on the repeat.
    #[test]
    fn idempotency_store_processes_once_per_id() {
        let mut store = InMemoryIdempotencyStore::default();
        let first = store.process_once("evt-1", || 7);
        let second = store.process_once("evt-1", || 8);
        assert_eq!(first, Some(7));
        assert_eq!(second, None);
        assert!(store.has_processed("evt-1"));
    }

    /// Tests that `enforce_monotonic_sequence` accepts strictly increasing
    /// values, including a gap between neighbours.
    #[test]
    fn enforce_monotonic_sequence_accepts_strictly_increasing_events() {
        assert!(enforce_monotonic_sequence(&[1, 2, 3, 9]).is_ok());
    }

    /// Tests that `enforce_monotonic_sequence` rejects a sequence whose last
    /// value is lower than its predecessor, and that the error text names the
    /// ordering problem.
    #[test]
    fn enforce_monotonic_sequence_rejects_reordered_events() {
        let err = enforce_monotonic_sequence(&[1, 3, 2]).expect_err("must detect reorder");
        assert!(err.to_string().contains("non-monotonic event ordering"));
    }

    proptest! {
        // Oracle: a sequence is accepted exactly when every adjacent pair is
        // strictly increasing. Vectors of fewer than two elements have no
        // pairs, so they must be accepted.
        #[test]
        fn monotonic_sequence_matches_window_oracle(sequence in proptest::collection::vec(any::<u64>(), 0..64)) {
            let expected = sequence.windows(2).all(|window| window[0] < window[1]);
            prop_assert_eq!(enforce_monotonic_sequence(&sequence).is_ok(), expected);
        }

        // Oracle: an independent copy of the schedule loop. The input ranges
        // include `max_attempts` of 0 and 1 (empty schedule) and a multiplier
        // of 0.
        #[test]
        fn backoff_schedule_matches_saturating_model(
            max_attempts in 0u32..20,
            initial_backoff_ms in 0u64..1_000_000,
            backoff_multiplier in 0u64..10,
        ) {
            let policy = RetryPolicy {
                max_attempts,
                initial_backoff_ms,
                backoff_multiplier,
            };
            let mut expected = Vec::new();
            let mut backoff = initial_backoff_ms;
            for _ in 1..max_attempts {
                expected.push(backoff);
                backoff = backoff.saturating_mul(backoff_multiplier);
            }

            prop_assert_eq!(compute_backoff_schedule(policy), expected);
        }

        // Oracle: a plain `HashSet` tracking first occurrences. The id pattern
        // allows the empty string, and the generated list may repeat ids.
        #[test]
        fn idempotency_store_matches_first_seen_model(ids in proptest::collection::vec("[a-z0-9_-]{0,12}", 0..64)) {
            let mut store = InMemoryIdempotencyStore::default();
            let mut seen = HashSet::new();

            for (index, id) in ids.iter().enumerate() {
                // `insert` returns true only the first time this id appears.
                let first_seen = seen.insert(id.clone());
                let result = store.process_once(id, || index);
                if first_seen {
                    prop_assert_eq!(result, Some(index));
                } else {
                    prop_assert_eq!(result, None);
                }
                // Whether or not the operation ran, the id must now be recorded.
                prop_assert!(store.has_processed(id));
            }
        }
    }
}