loadwise-core 0.1.0

Core traits, strategies, and in-memory stores for loadwise
Documentation
//! Fixed-window time-bucketed counter for rate tracking.

use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::time::{Duration, Instant};

/// Fixed-window counter that divides time into buckets for rate tracking.
///
/// Divides the timeline into `resolution` buckets, each covering
/// `window_size / resolution` of time. Queries sum only non-expired buckets.
///
/// Uses atomic operations internally — lock-free and `Send + Sync`.
pub(crate) struct FixedWindowCounter {
    /// Per-bucket epoch (which logical bucket period this slot last belonged to).
    epochs: Box<[AtomicU64]>,
    /// Per-bucket accumulated count.
    counts: Box<[AtomicU64]>,
    /// Duration of a single bucket in nanoseconds.
    bucket_duration_ns: u64,
    /// Number of buckets (resolution).
    resolution: usize,
    /// Anchor point for elapsed-time calculations.
    start: Instant,
}

impl FixedWindowCounter {
    /// Creates a new counter with the given window size and resolution (bucket count).
    ///
    /// # Panics
    ///
    /// Panics if `resolution` is 0 or `window_size` is zero.
    pub(crate) fn new(window_size: Duration, resolution: usize) -> Self {
        assert!(resolution > 0, "resolution must be > 0");
        let window_ns = window_size.as_nanos() as u64;
        assert!(window_ns > 0, "window_size must be non-zero");

        let bucket_duration_ns = window_ns / resolution as u64;
        assert!(
            bucket_duration_ns > 0,
            "window_size ({window_ns} ns) too small for resolution {resolution}"
        );

        Self {
            epochs: (0..resolution).map(|_| AtomicU64::new(0)).collect(),
            counts: (0..resolution).map(|_| AtomicU64::new(0)).collect(),
            bucket_duration_ns,
            resolution,
            start: Instant::now(),
        }
    }

    /// Records `amount` into the current time bucket.
    ///
    /// If the bucket has rolled over to a new epoch, its count is reset
    /// atomically before adding the new amount.
    pub(crate) fn record(&self, amount: u64) {
        let elapsed_ns = self.start.elapsed().as_nanos() as u64;
        let current_epoch = elapsed_ns / self.bucket_duration_ns;
        let bucket_idx = (current_epoch as usize) % self.resolution;

        loop {
            let stored_epoch = self.epochs[bucket_idx].load(Relaxed);

            if stored_epoch == current_epoch {
                // Same epoch — just add.
                self.counts[bucket_idx].fetch_add(amount, Relaxed);
                return;
            }

            // Stale epoch — try to claim the bucket for the current epoch.
            match self.epochs[bucket_idx].compare_exchange(
                stored_epoch,
                current_epoch,
                Relaxed,
                Relaxed,
            ) {
                Ok(_) => {
                    // We won the CAS — reset then add. Using store(0) + fetch_add
                    // instead of store(amount) to avoid overwriting a concurrent
                    // fetch_add from a thread that saw the new epoch between our
                    // CAS and this write.
                    self.counts[bucket_idx].store(0, Relaxed);
                    self.counts[bucket_idx].fetch_add(amount, Relaxed);
                    return;
                }
                Err(_) => {
                    // Another thread updated the epoch — retry.
                    continue;
                }
            }
        }
    }

    /// Returns the sum of all bucket counts whose epochs fall within the
    /// current window.
    ///
    /// A bucket is considered current when
    /// `current_epoch - bucket_epoch < resolution`.
    pub(crate) fn sum(&self) -> u64 {
        let elapsed_ns = self.start.elapsed().as_nanos() as u64;
        let current_epoch = elapsed_ns / self.bucket_duration_ns;

        let mut total: u64 = 0;
        for i in 0..self.resolution {
            let epoch = self.epochs[i].load(Relaxed);
            if epoch <= current_epoch && current_epoch - epoch < self.resolution as u64 {
                total = total.saturating_add(self.counts[i].load(Relaxed));
            }
        }
        total
    }

    /// Returns how much capacity remains before hitting `limit`.
    ///
    /// Equivalent to `limit.saturating_sub(self.sum())`.
    pub(crate) fn remaining(&self, limit: u64) -> u64 {
        limit.saturating_sub(self.sum())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn record_and_sum() {
        let counter = FixedWindowCounter::new(Duration::from_secs(60), 6);
        counter.record(50);
        counter.record(30);
        assert_eq!(counter.sum(), 80);
    }

    #[test]
    fn remaining_calculation() {
        let counter = FixedWindowCounter::new(Duration::from_secs(60), 6);
        counter.record(200);
        assert_eq!(counter.remaining(1000), 800);
        assert_eq!(counter.remaining(100), 0);
    }

    #[test]
    fn expired_buckets_not_counted() {
        let counter = FixedWindowCounter::new(Duration::from_millis(50), 5);
        counter.record(42);
        assert_eq!(counter.sum(), 42);

        // Sleep past the entire window so all buckets expire.
        thread::sleep(Duration::from_millis(80));
        assert_eq!(counter.sum(), 0);
    }

    #[test]
    fn multiple_records_accumulate() {
        let counter = FixedWindowCounter::new(Duration::from_secs(10), 2);
        for _ in 0..100 {
            counter.record(1);
        }
        assert_eq!(counter.sum(), 100);
    }
}