lading_throttle/
lib.rs

//! The lading throttle mechanism
//!
//! This library supports throttling mechanisms for the rest of the lading
//! project.
//!
//! ## Metrics
//!
//! The [stable] throttle emits metrics. See that module's documentation for
//! details. The all-out throttle does not emit any metrics.
//!
11
12#![deny(clippy::all)]
13#![deny(clippy::cargo)]
14#![deny(clippy::pedantic)]
15#![deny(clippy::print_stdout)]
16#![deny(clippy::print_stderr)]
17#![deny(clippy::dbg_macro)]
18#![deny(unused_extern_crates)]
19#![deny(unused_allocation)]
20#![deny(unused_assignments)]
21#![deny(unused_comparisons)]
22#![deny(unreachable_pub)]
23#![deny(missing_docs)]
24#![deny(missing_copy_implementations)]
25#![deny(missing_debug_implementations)]
26#![allow(clippy::cast_precision_loss)]
27#![allow(clippy::multiple_crate_versions)]
28
29use async_trait::async_trait;
30use serde::{Deserialize, Serialize};
31use std::num::NonZeroU32;
32use tokio::time::{self, Duration, Instant};
33
34pub mod stable;
35
36#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
37/// Configuration of this generator.
38#[serde(rename_all = "snake_case")]
39pub enum Config {
40    /// A throttle that allows the user to produce as fast as possible.
41    AllOut,
42    /// A throttle that attempts stable load
43    Stable,
44}
45
46impl Default for Config {
47    fn default() -> Self {
48        Self::Stable
49    }
50}
51
52/// Errors produced by [`Throttle`].
53#[derive(thiserror::Error, Debug, Clone, Copy)]
54pub enum Error {
55    /// Stable
56    #[error(transparent)]
57    Stable(#[from] stable::Error),
58}
59
60#[async_trait]
61/// The `Clock` used for every throttle
62pub trait Clock {
63    /// The number of ticks elapsed since last queried
64    fn ticks_elapsed(&self) -> u64;
65    /// Wait for `ticks` amount of time
66    async fn wait(&self, ticks: u64);
67}
68
69#[derive(Debug, Clone, Copy)]
70/// A clock that operates with respect to real-clock time.
71pub struct RealClock {
72    start: Instant,
73}
74
75impl Default for RealClock {
76    fn default() -> Self {
77        Self {
78            start: Instant::now(),
79        }
80    }
81}
82
83#[async_trait]
84impl Clock for RealClock {
85    /// Return the number of ticks since `Clock` was created.
86    ///
87    /// # Panics
88    ///
89    /// Function will panic if the number of ticks elapsed is greater than u64::MAX.
90    #[allow(clippy::cast_possible_truncation)]
91    fn ticks_elapsed(&self) -> u64 {
92        let now = Instant::now();
93        let ticks_since: u128 = now.duration_since(self.start).as_micros();
94        assert!(
95            ticks_since <= u128::from(u64::MAX),
96            "584,554 years elapsed since last call!"
97        );
98        ticks_since as u64
99    }
100
101    async fn wait(&self, ticks: u64) {
102        time::sleep(Duration::from_micros(ticks)).await;
103    }
104}
105
106/// The throttle mechanism
107#[derive(Debug)]
108pub enum Throttle<C = RealClock> {
109    /// Load that comes from this variant is stable with respect to the clock
110    Stable(stable::Stable<C>),
111    /// Load that comes from this variant is as fast as possible with respect to
112    /// the clock
113    AllOut,
114}
115
116impl Throttle<RealClock> {
117    /// Create a new instance of `Throttle` with a real-time clock
118    #[must_use]
119    pub fn new_with_config(config: Config, maximum_capacity: NonZeroU32) -> Self {
120        match config {
121            Config::Stable => Throttle::Stable(stable::Stable::with_clock(
122                maximum_capacity,
123                RealClock::default(),
124            )),
125            Config::AllOut => Throttle::AllOut,
126        }
127    }
128}
129
130impl<C> Throttle<C>
131where
132    C: Clock + Sync + Send,
133{
134    /// Wait for a single unit of capacity to be available, equivalent to
135    /// `wait_for` of 1.
136    ///
137    /// # Errors
138    ///
139    /// See documentation in `Error`
140    #[inline]
141    pub async fn wait(&mut self) -> Result<(), Error> {
142        match self {
143            Throttle::Stable(inner) => inner.wait().await?,
144            Throttle::AllOut => (),
145        }
146
147        Ok(())
148    }
149
150    /// Wait for `request` capacity to be available in the throttle
151    ///
152    /// # Errors
153    ///
154    /// See documentation in `Error`
155    #[inline]
156    pub async fn wait_for(&mut self, request: NonZeroU32) -> Result<(), Error> {
157        match self {
158            Throttle::Stable(inner) => inner.wait_for(request).await?,
159            Throttle::AllOut => (),
160        }
161
162        Ok(())
163    }
164}