rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::time::{Duration, Instant};

/// Shape of a rolling window: how many buckets it keeps and how wide each one is.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WindowConfig {
    /// How many buckets the rolling window retains.
    pub buckets: usize,
    /// Span of time covered by a single bucket.
    pub bucket_duration: Duration,
}

impl Default for WindowConfig {
    /// Returns a configuration of ten one-second buckets.
    fn default() -> Self {
        let bucket_duration = Duration::from_secs(1);
        let buckets = 10;
        Self {
            buckets,
            bucket_duration,
        }
    }
}

/// Outcome recorded into a rolling window.
///
/// Exactly one variant is supplied per recorded operation; the window keeps a
/// separate counter for each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowOutcome {
    /// The protected operation completed successfully.
    Success,
    /// The protected operation reached the backend and failed.
    Failure,
    /// The protection layer rejected the operation before backend execution.
    Drop,
}

/// Aggregated view over the live buckets in a rolling window.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct WindowSnapshot {
    /// Successful operation count.
    pub successes: u64,
    /// Failed operation count.
    pub failures: u64,
    /// Dropped operation count.
    pub drops: u64,
    /// Sum of the latency samples aggregated into this snapshot.
    pub latency_sum: Duration,
    /// Number of samples contributing to [`Self::latency_sum`].
    pub latency_samples: u64,
}

impl WindowSnapshot {
    /// Total number of recorded outcomes (successes, failures and drops).
    ///
    /// Saturates at `u64::MAX` instead of overflowing.
    pub fn total(&self) -> u64 {
        self.successes
            .saturating_add(self.failures)
            .saturating_add(self.drops)
    }

    /// Failure ratio in the range `[0.0, 1.0]`.
    ///
    /// Computed as failures over successes plus failures; drops are not part
    /// of the denominator.
    pub fn failure_ratio(&self) -> f64 {
        ratio(self.failures, self.successes.saturating_add(self.failures))
    }

    /// Drop ratio in the range `[0.0, 1.0]`, computed over [`Self::total`].
    pub fn drop_ratio(&self) -> f64 {
        ratio(self.drops, self.total())
    }

    /// Average latency for samples recorded with latency.
    ///
    /// Returns `None` when no latency sample was recorded. The mean is
    /// computed with integer arithmetic and rounded to the nearest
    /// nanosecond, so it is exact for every representable sum and never
    /// panics.
    pub fn average_latency(&self) -> Option<Duration> {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.latency_samples == 0 {
            return None;
        }

        let samples = u128::from(self.latency_samples);
        // Adding half the divisor before dividing rounds to nearest; the sum
        // cannot overflow `u128` because a `Duration` holds fewer than 2^94 ns.
        let mean_nanos = (self.latency_sum.as_nanos() + samples / 2) / samples;
        // The mean never exceeds the sum, so its whole seconds fit in `u64`;
        // the clamp just makes the narrowing cast visibly lossless.
        let secs = (mean_nanos / NANOS_PER_SEC).min(u128::from(u64::MAX)) as u64;
        // A remainder modulo one second is below 10^9 and always fits in `u32`.
        let subsec_nanos = (mean_nanos % NANOS_PER_SEC) as u32;
        Some(Duration::new(secs, subsec_nanos))
    }
}

/// Fixed-bucket sliding window for resilience decisions.
///
/// Time elapsed since the window was created is split into consecutive
/// intervals of `bucket_duration`, each identified by an increasing
/// *generation* number. Generation `g` is stored in slot `g % buckets`, so a
/// slot is recycled once the generation it held has aged out of the window.
#[derive(Debug, Clone)]
pub struct RollingWindow {
    /// Instant from which generations are counted.
    anchor: Instant,
    /// Sanitized configuration (at least one bucket, non-zero bucket duration).
    config: WindowConfig,
    /// Ring of buckets, indexed by `generation % config.buckets`.
    buckets: Vec<WindowBucket>,
}

impl RollingWindow {
    /// Creates a rolling window from configuration.
    ///
    /// Degenerate values are sanitized rather than rejected: a bucket count of
    /// zero becomes one, and a zero `bucket_duration` becomes one millisecond.
    pub fn new(config: WindowConfig) -> Self {
        let config = WindowConfig {
            buckets: config.buckets.max(1),
            bucket_duration: if config.bucket_duration.is_zero() {
                Duration::from_millis(1)
            } else {
                config.bucket_duration
            },
        };
        Self {
            anchor: Instant::now(),
            config,
            buckets: vec![WindowBucket::default(); config.buckets],
        }
    }

    /// Records an outcome at the current instant.
    pub fn record(&mut self, outcome: WindowOutcome) {
        self.record_at(outcome, Instant::now());
    }

    /// Records an outcome with a latency sample at the current instant.
    pub fn record_with_latency(&mut self, outcome: WindowOutcome, latency: Duration) {
        self.record_at_with_latency(outcome, latency, Instant::now());
    }

    /// Returns an aggregate over all live buckets.
    pub fn snapshot(&self) -> WindowSnapshot {
        self.snapshot_at(Instant::now())
    }

    /// Returns the maximum successful operations observed in one live bucket.
    pub fn max_successes_per_bucket(&self) -> u64 {
        self.max_successes_per_bucket_at(Instant::now())
    }

    /// Returns the minimum average latency observed in one live bucket.
    pub fn min_average_latency(&self) -> Option<Duration> {
        self.min_average_latency_at(Instant::now())
    }

    /// Records an outcome as if it happened at `now`.
    pub(crate) fn record_at(&mut self, outcome: WindowOutcome, now: Instant) {
        self.record_at_inner(outcome, None, now);
    }

    /// Records an outcome and a latency sample as if they happened at `now`.
    pub(crate) fn record_at_with_latency(
        &mut self,
        outcome: WindowOutcome,
        latency: Duration,
        now: Instant,
    ) {
        self.record_at_inner(outcome, Some(latency), now);
    }

    /// Aggregates every bucket that is live at `now`.
    ///
    /// Counters and the latency sum saturate instead of overflowing.
    pub(crate) fn snapshot_at(&self, now: Instant) -> WindowSnapshot {
        self.live_buckets(now)
            .fold(WindowSnapshot::default(), |mut snapshot, bucket| {
                snapshot.successes = snapshot.successes.saturating_add(bucket.successes);
                snapshot.failures = snapshot.failures.saturating_add(bucket.failures);
                snapshot.drops = snapshot.drops.saturating_add(bucket.drops);
                snapshot.latency_sum = snapshot.latency_sum.saturating_add(bucket.latency_sum);
                snapshot.latency_samples = snapshot
                    .latency_samples
                    .saturating_add(bucket.latency_samples);
                snapshot
            })
    }

    /// Largest per-bucket success count among buckets live at `now`, or `0`
    /// when no bucket is live.
    pub(crate) fn max_successes_per_bucket_at(&self, now: Instant) -> u64 {
        self.live_buckets(now)
            .map(|bucket| bucket.successes)
            .max()
            .unwrap_or_default()
    }

    /// Smallest per-bucket average latency among buckets live at `now`.
    ///
    /// Buckets without latency samples are skipped; returns `None` when no
    /// live bucket has a sample.
    pub(crate) fn min_average_latency_at(&self, now: Instant) -> Option<Duration> {
        self.live_buckets(now)
            .filter_map(WindowBucket::average_latency)
            .min()
    }

    /// Iterates over the buckets that are live for the window ending at `now`.
    fn live_buckets(&self, now: Instant) -> impl Iterator<Item = &WindowBucket> + '_ {
        let current_generation = self.generation(now);
        let bucket_count = self.config.buckets as u64;
        self.buckets
            .iter()
            .filter(move |bucket| bucket.is_live(current_generation, bucket_count))
    }

    /// Stores one outcome (and optional latency sample) in the bucket for `now`.
    fn record_at_inner(&mut self, outcome: WindowOutcome, latency: Option<Duration>, now: Instant) {
        let generation = self.generation(now);
        // Reduce in `u64` before narrowing so the slot does not depend on the
        // pointer width; the result is below the bucket count, so it fits.
        let index = (generation % self.config.buckets as u64) as usize;
        let bucket = &mut self.buckets[index];
        match bucket.generation {
            // Slot already tracks this generation: keep accumulating.
            Some(held) if held == generation => {}
            // Slot was recycled for a newer generation, so this sample is at
            // least one full window old. Discard it rather than wiping the
            // newer data.
            Some(held) if held > generation => return,
            // Slot is empty or holds an expired generation: recycle it.
            _ => {
                *bucket = WindowBucket {
                    generation: Some(generation),
                    ..WindowBucket::default()
                };
            }
        }

        bucket.record(outcome);
        if let Some(latency) = latency {
            // Saturate so an extreme sample cannot panic the recorder.
            bucket.latency_sum = bucket.latency_sum.saturating_add(latency);
            bucket.latency_samples = bucket.latency_samples.saturating_add(1);
        }
    }

    /// Maps `now` to the number of whole bucket widths elapsed since `anchor`.
    ///
    /// Instants earlier than `anchor` map to generation `0`.
    fn generation(&self, now: Instant) -> u64 {
        let elapsed = now.saturating_duration_since(self.anchor);
        let width = self.config.bucket_duration.as_nanos().max(1);
        // Clamp before narrowing so an out-of-range quotient saturates
        // instead of wrapping.
        (elapsed.as_nanos() / width).min(u128::from(u64::MAX)) as u64
    }
}

/// Counters accumulated for a single generation of the rolling window.
#[derive(Debug, Clone, Default)]
struct WindowBucket {
    /// Generation whose data this bucket holds; `None` until first written.
    generation: Option<u64>,
    /// Outcomes recorded as [`WindowOutcome::Success`].
    successes: u64,
    /// Outcomes recorded as [`WindowOutcome::Failure`].
    failures: u64,
    /// Outcomes recorded as [`WindowOutcome::Drop`].
    drops: u64,
    /// Sum of the latency samples recorded into this bucket.
    latency_sum: Duration,
    /// Number of samples contributing to `latency_sum`.
    latency_samples: u64,
}

impl WindowBucket {
    /// Increments the counter that matches `outcome`.
    fn record(&mut self, outcome: WindowOutcome) {
        match outcome {
            WindowOutcome::Success => self.successes += 1,
            WindowOutcome::Failure => self.failures += 1,
            WindowOutcome::Drop => self.drops += 1,
        }
    }

    /// Returns whether this bucket lies within `bucket_count` generations of
    /// `current_generation`.
    ///
    /// A bucket that was never written is not live. Because the distance uses
    /// a saturating subtraction, a bucket whose generation is newer than
    /// `current_generation` counts as live.
    fn is_live(&self, current_generation: u64, bucket_count: u64) -> bool {
        self.generation
            .is_some_and(|generation| current_generation.saturating_sub(generation) < bucket_count)
    }

    /// Mean of the latency samples in this bucket, or `None` without samples.
    ///
    /// The mean is computed with integer arithmetic and rounded to the
    /// nearest nanosecond, so it is exact for every representable sum and
    /// never panics.
    fn average_latency(&self) -> Option<Duration> {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.latency_samples == 0 {
            return None;
        }

        let samples = u128::from(self.latency_samples);
        // Adding half the divisor before dividing rounds to nearest; the sum
        // cannot overflow `u128` because a `Duration` holds fewer than 2^94 ns.
        let mean_nanos = (self.latency_sum.as_nanos() + samples / 2) / samples;
        // The mean never exceeds the sum, so its whole seconds fit in `u64`;
        // the clamp just makes the narrowing cast visibly lossless.
        let secs = (mean_nanos / NANOS_PER_SEC).min(u128::from(u64::MAX)) as u64;
        // A remainder modulo one second is below 10^9 and always fits in `u32`.
        let subsec_nanos = (mean_nanos % NANOS_PER_SEC) as u32;
        Some(Duration::new(secs, subsec_nanos))
    }
}

/// Divides `part` by `total` as floating point, yielding `0.0` for an empty total.
fn ratio(part: u64, total: u64) -> f64 {
    match total {
        // Nothing recorded: report zero rather than dividing by zero.
        0 => 0.0,
        denominator => part as f64 / denominator as f64,
    }
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::{RollingWindow, WindowConfig, WindowOutcome};

    /// Builds the two-bucket, 10 ms-per-bucket window used by the rotation tests.
    fn two_bucket_window() -> RollingWindow {
        RollingWindow::new(WindowConfig {
            buckets: 2,
            bucket_duration: Duration::from_millis(10),
        })
    }

    #[test]
    fn rolling_window_aggregates_live_buckets() {
        let mut window = two_bucket_window();
        let start = Instant::now();
        let one_bucket_later = start + Duration::from_millis(10);
        let two_buckets_later = start + Duration::from_millis(20);

        window.record_at(WindowOutcome::Success, start);
        window.record_at(WindowOutcome::Failure, one_bucket_later);
        window.record_at(WindowOutcome::Drop, two_buckets_later);

        // The third record reuses the slot that held the success, so only the
        // two most recent outcomes are still in the window.
        let snapshot = window.snapshot_at(two_buckets_later);
        assert_eq!(snapshot.successes, 0);
        assert_eq!(snapshot.failures, 1);
        assert_eq!(snapshot.drops, 1);
        assert_eq!(snapshot.total(), 2);
    }

    #[test]
    fn rolling_window_tracks_average_latency() {
        let mut window = RollingWindow::new(WindowConfig::default());
        let at = Instant::now();

        for millis in [10, 30] {
            let latency = Duration::from_millis(millis);
            window.record_at_with_latency(WindowOutcome::Success, latency, at);
        }

        let average = window.snapshot_at(at).average_latency();
        assert_eq!(average, Some(Duration::from_millis(20)));
    }

    #[test]
    fn rolling_window_reports_max_pass_and_min_latency() {
        let mut window = two_bucket_window();
        let start = Instant::now();
        let next_bucket = start + Duration::from_millis(10);

        // Two slow successes share the first bucket; one fast success lands in the next.
        let samples = [(40, start), (20, start), (5, next_bucket)];
        for (millis, at) in samples {
            let latency = Duration::from_millis(millis);
            window.record_at_with_latency(WindowOutcome::Success, latency, at);
        }

        assert_eq!(window.max_successes_per_bucket_at(start), 2);
        let fastest = window.min_average_latency_at(next_bucket);
        assert_eq!(fastest, Some(Duration::from_millis(5)));
    }
}