tower 0.4.11

Tower is a library of modular and reusable components for building robust clients and servers.
Documentation
//! A retry "budget" for allowing only a certain amount of retries over time.

use std::{
    fmt,
    sync::{
        atomic::{AtomicIsize, Ordering},
        Mutex,
    },
    time::Duration,
};
use tokio::time::Instant;

/// Represents a "budget" for retrying requests.
///
/// This is useful for limiting the amount of retries a service can perform
/// over a period of time, or per a certain number of requests attempted.
pub struct Budget {
    bucket: Bucket,
    deposit_amount: isize,
    withdraw_amount: isize,
}

/// Indicates that it is not currently allowed to "withdraw" another retry
/// from the [`Budget`].
#[derive(Debug)]
pub struct Overdrawn {
    _inner: (),
}

#[derive(Debug)]
struct Bucket {
    generation: Mutex<Generation>,
    /// Initial budget allowed for every second.
    reserve: isize,
    /// Slots of a the TTL divided evenly.
    slots: Box<[AtomicIsize]>,
    /// The amount of time represented by each slot.
    window: Duration,
    /// The changers for the current slot to be commited
    /// after the slot expires.
    writer: AtomicIsize,
}

#[derive(Debug)]
struct Generation {
    /// Slot index of the last generation.
    index: usize,
    /// The timestamp since the last generation expired.
    time: Instant,
}

// ===== impl Budget =====

impl Budget {
    /// Create a [`Budget`] that allows for a certain percent of the total
    /// requests to be retried.
    ///
    /// - The `ttl` is the duration of how long a single `deposit` should be
    ///   considered. Must be between 1 and 60 seconds.
    /// - The `min_per_sec` is the minimum rate of retries allowed to accomodate
    ///   clients that have just started issuing requests, or clients that do
    ///   not issue many requests per window.
    /// - The `retry_percent` is the percentage of calls to `deposit` that can
    ///   be retried. This is in addition to any retries allowed for via
    ///   `min_per_sec`. Must be between 0 and 1000.
    ///
    ///   As an example, if `0.1` is used, then for every 10 calls to `deposit`,
    ///   1 retry will be allowed. If `2.0` is used, then every `deposit`
    ///   allows for 2 retries.
    pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
        // assertions taken from finagle
        assert!(ttl >= Duration::from_secs(1));
        assert!(ttl <= Duration::from_secs(60));
        assert!(retry_percent >= 0.0);
        assert!(retry_percent <= 1000.0);
        assert!(min_per_sec < ::std::i32::MAX as u32);

        let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
            // If there is no percent, then you gain nothing from deposits.
            // Withdrawals can only be made against the reserve, over time.
            (0, 1)
        } else if retry_percent <= 1.0 {
            (1, (1.0 / retry_percent) as isize)
        } else {
            // Support for when retry_percent is between 1.0 and 1000.0,
            // meaning for every deposit D, D*retry_percent withdrawals
            // can be made.
            (1000, (1000.0 / retry_percent) as isize)
        };
        let reserve = (min_per_sec as isize)
            .saturating_mul(ttl.as_secs() as isize) // ttl is between 1 and 60 seconds
            .saturating_mul(withdraw_amount);

        // AtomicIsize isn't clone, so the slots need to be built in a loop...
        let windows = 10u32;
        let mut slots = Vec::with_capacity(windows as usize);
        for _ in 0..windows {
            slots.push(AtomicIsize::new(0));
        }

        Budget {
            bucket: Bucket {
                generation: Mutex::new(Generation {
                    index: 0,
                    time: Instant::now(),
                }),
                reserve,
                slots: slots.into_boxed_slice(),
                window: ttl / windows,
                writer: AtomicIsize::new(0),
            },
            deposit_amount,
            withdraw_amount,
        }
    }

    /// Store a "deposit" in the budget, which will be used to permit future
    /// withdrawals.
    pub fn deposit(&self) {
        self.bucket.put(self.deposit_amount);
    }

    /// Check whether there is enough "balance" in the budget to issue a new
    /// retry.
    ///
    /// If there is not enough, an `Err(Overdrawn)` is returned.
    pub fn withdraw(&self) -> Result<(), Overdrawn> {
        if self.bucket.try_get(self.withdraw_amount) {
            Ok(())
        } else {
            Err(Overdrawn { _inner: () })
        }
    }
}

impl Default for Budget {
    fn default() -> Budget {
        Budget::new(Duration::from_secs(10), 10, 0.2)
    }
}

impl fmt::Debug for Budget {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Budget")
            .field("deposit", &self.deposit_amount)
            .field("withdraw", &self.withdraw_amount)
            .field("balance", &self.bucket.sum())
            .finish()
    }
}

// ===== impl Bucket =====

impl Bucket {
    fn put(&self, amt: isize) {
        self.expire();
        self.writer.fetch_add(amt, Ordering::SeqCst);
    }

    fn try_get(&self, amt: isize) -> bool {
        debug_assert!(amt >= 0);

        self.expire();

        let sum = self.sum();
        if sum >= amt {
            self.writer.fetch_add(-amt, Ordering::SeqCst);
            true
        } else {
            false
        }
    }

    fn expire(&self) {
        let mut gen = self.generation.lock().expect("generation lock");

        let now = Instant::now();
        let diff = now - gen.time;
        if diff < self.window {
            // not expired yet
            return;
        }

        let to_commit = self.writer.swap(0, Ordering::SeqCst);
        self.slots[gen.index].store(to_commit, Ordering::SeqCst);

        let mut diff = diff;
        let mut idx = (gen.index + 1) % self.slots.len();
        while diff > self.window {
            self.slots[idx].store(0, Ordering::SeqCst);
            diff -= self.window;
            idx = (idx + 1) % self.slots.len();
        }

        gen.index = idx;
        gen.time = now;
    }

    fn sum(&self) -> isize {
        let current = self.writer.load(Ordering::SeqCst);
        let windowed_sum: isize = self
            .slots
            .iter()
            .map(|slot| slot.load(Ordering::SeqCst))
            // fold() is used instead of sum() to determine overflow behavior
            .fold(0, isize::saturating_add);

        current
            .saturating_add(windowed_sum)
            .saturating_add(self.reserve)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time;

    #[test]
    fn empty() {
        let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
        bgt.withdraw().unwrap_err();
    }

    #[tokio::test]
    async fn leaky() {
        time::pause();

        let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
        bgt.deposit();

        time::advance(Duration::from_secs(3)).await;

        bgt.withdraw().unwrap_err();
    }

    #[tokio::test]
    async fn slots() {
        time::pause();

        let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
        bgt.deposit();
        bgt.deposit();
        time::advance(Duration::from_millis(901)).await;
        // 900ms later, the deposit should still be valid
        bgt.withdraw().unwrap();

        // blank slate
        time::advance(Duration::from_millis(2001)).await;

        bgt.deposit();
        time::advance(Duration::from_millis(301)).await;
        bgt.deposit();
        time::advance(Duration::from_millis(801)).await;
        bgt.deposit();

        // the first deposit is expired, but the 2nd should still be valid,
        // combining with the 3rd
        bgt.withdraw().unwrap();
    }

    #[tokio::test]
    async fn reserve() {
        let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
        bgt.withdraw().unwrap();
        bgt.withdraw().unwrap();
        bgt.withdraw().unwrap();
        bgt.withdraw().unwrap();
        bgt.withdraw().unwrap();

        bgt.withdraw().unwrap_err();
    }
}