lading_throttle/
stable.rs

1//! Stable throttle
2//!
3//! This throttle refills capacity at a steady rate.
4
5use std::num::NonZeroU32;
6
7use super::{Clock, RealClock};
8
// An 'interval' is the period after which all counters reset. The throttle
// makes no claims on units, but consider a user who intends to produce 1 MB/s:
// the 'interval' is one second and each tick corresponds to one microsecond,
// which averages out to 1 byte per microsecond.
13const INTERVAL_TICKS: u64 = 1_000_000;
14
15/// Errors produced by [`Stable`].
16#[derive(thiserror::Error, Debug, Clone, Copy)]
17pub enum Error {
18    /// Requested capacity is greater than maximum allowed capacity.
19    #[error("Capacity")]
20    Capacity,
21}
22
23#[derive(Debug)]
24/// A throttle type.
25///
26/// This throttle is stable in that it will steadily refill units at a known
27/// rate and does not inspect the target in any way.
28pub struct Stable<C = RealClock> {
29    valve: Valve,
30    /// The clock that `Stable` will use.
31    clock: C,
32}
33
34impl<C> Stable<C>
35where
36    C: Clock + Send + Sync,
37{
38    #[inline]
39    pub(crate) async fn wait(&mut self) -> Result<(), Error> {
40        // SAFETY: 1_u32 is a non-zero u32.
41        let one = unsafe { NonZeroU32::new_unchecked(1_u32) };
42        self.wait_for(one).await
43    }
44
45    pub(crate) async fn wait_for(&mut self, request: NonZeroU32) -> Result<(), Error> {
46        loop {
47            let slop: u64 = self
48                .valve
49                .request(self.clock.ticks_elapsed(), request.get())?;
50            if slop == 0 {
51                break;
52            }
53            self.clock.wait(slop).await;
54        }
55        Ok(())
56    }
57
58    pub(crate) fn with_clock(maximum_capacity: NonZeroU32, clock: C) -> Self {
59        Self {
60            valve: Valve::new(maximum_capacity),
61            clock,
62        }
63    }
64}
65
/// The synchronous core of `Stable`, kept free of async so that proof claims
/// can be made about it. The mechanical analogy is loose, but picture a poppet
/// valve sitting inside the stable throttle.
#[derive(Debug)]
struct Valve {
    /// Upper bound on `capacity`; a refill never adds capacity beyond this.
    maximum_capacity: u32,
    /// Capacity currently available. Every granted request draws this down
    /// and every interval roll-over restores it to `maximum_capacity`.
    capacity: u32,
    /// Index of the most recently observed interval, that is, the absolute
    /// tick count divided by `INTERVAL_TICKS`.
    interval: u64,
}
80
81impl Valve {
82    /// Create a new `Valve` instance with a maximum capacity, given in
83    /// tick-units.
84    fn new(maximum_capacity: NonZeroU32) -> Self {
85        let maximum_capacity = maximum_capacity.get();
86        Self {
87            capacity: maximum_capacity,
88            maximum_capacity,
89            interval: 0,
90        }
91    }
92
93    /// For a given `capacity_request` and an amount of `ticks_elapsed` since
94    /// the last call return how long a caller would have to wait -- in ticks --
95    /// before the valve will have sufficient spare capacity to be open.
96    ///
97    /// Note that `ticks_elapsed` must be an absolute value.
98    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
99    fn request(&mut self, ticks_elapsed: u64, capacity_request: u32) -> Result<u64, Error> {
100        // Okay, here's the idea. We have bucket that fills every INTERVAL_TICKS
101        // seconds up and requests draw down on that bucket. When it's empty, we
102        // return the number of ticks until the next interval roll-over. Callers
103        // are expected to wait although nothing forces them to. Capacity is
104        // only drawn on when it is immediately available.
105        if capacity_request == 0 {
106            return Ok(0);
107        }
108
109        // Fast bail-out. There's no way for this to ever be satisfied and is a
110        // bug on the part of the caller, arguably.
111        if capacity_request > self.maximum_capacity {
112            return Err(Error::Capacity);
113        }
114
115        let current_interval = ticks_elapsed / INTERVAL_TICKS;
116        if current_interval > self.interval {
117            // We have rolled forward into a new interval. At this point the
118            // capacity is reset to maximum -- no matter how deep we are into
119            // the interval -- and we record the new interval index.
120            self.capacity = self.maximum_capacity;
121            self.interval = current_interval;
122        }
123
124        // If the capacity is greater or equal to the request we deduct the
125        // request from capacity and return 0 slop, signaling to the user that
126        // their request is a success. Else, we calculate how long the caller
127        // should wait until the interval rolls over. The capacity will never
128        // increase in this interval so they will have to call again later.
129        if capacity_request <= self.capacity {
130            self.capacity -= capacity_request;
131            Ok(0)
132        } else {
133            Ok(INTERVAL_TICKS - (ticks_elapsed % INTERVAL_TICKS))
134        }
135    }
136}
137
#[cfg(test)]
mod test {
    use std::num::NonZeroU32;

    use proptest::{collection, prelude::*};

    use crate::stable::{Valve, INTERVAL_TICKS};

    /// Replay `requests` against a fresh `Valve` of `maximum_capacity`,
    /// advancing a simulated clock by whatever wait the valve last reported,
    /// and check that the total granted within any single interval never
    /// exceeds `maximum_capacity`.
    fn capacity_never_exceeds_max_in_interval_inner(
        maximum_capacity: u32,
        requests: Vec<NonZeroU32>,
    ) -> Result<(), proptest::test_runner::TestCaseError> {
        let mut valve = Valve::new(NonZeroU32::new(maximum_capacity).unwrap());
        let maximum_capacity = u64::from(maximum_capacity);

        let mut ticks_elapsed: u64 = 0;
        let mut granted_requests: u64 = 0;
        let mut interval: u64 = 0;
        let mut pending_wait: u64 = 0;

        for request in requests {
            ticks_elapsed += pending_wait;

            let current_interval = ticks_elapsed / INTERVAL_TICKS;
            if current_interval > interval {
                // A new interval has begun: verify the tally of the interval
                // just finished, then start counting afresh.
                prop_assert!(granted_requests <= maximum_capacity,
                                     "[interval-change] Granted requests {granted_requests} exceeded the maximum capacity of the valve, {maximum_capacity}");
                granted_requests = 0;
                interval = current_interval;
            }

            // Errors are ignored intentionally; they leave the pending wait
            // untouched.
            if let Ok(wait) = valve.request(ticks_elapsed, request.get()) {
                if wait == 0 {
                    // The request went through right away.
                    granted_requests += u64::from(request.get());
                }
                // A non-zero wait means nothing was granted. We 'wait' by
                // carrying it into the next iteration, where we may or may
                // not make the same request of the valve.
                pending_wait = wait;
            }
            prop_assert!(granted_requests <= maximum_capacity,
                             "[end] Granted requests {granted_requests} exceeded the maximum capacity of the valve, {maximum_capacity}");
        }
        Ok(())
    }

    #[test]
    fn static_capacity_never_exceeds_max_in_interval() {
        let maximum_capacity: u32 = 490_301_363;
        let requests: Vec<NonZeroU32> = [1, maximum_capacity]
            .iter()
            .map(|&raw| NonZeroU32::new(raw).unwrap())
            .collect();
        capacity_never_exceeds_max_in_interval_inner(maximum_capacity, requests).unwrap();
    }

    /// Strategy producing between 1 and 99 non-zero requests, each strictly
    /// below `max`.
    fn cap_requests(max: u32) -> impl Strategy<Value = Vec<NonZeroU32>> {
        let single_request = (1..max).prop_map(|raw| NonZeroU32::new(raw).unwrap());
        collection::vec(single_request, 1..100)
    }

    // The sum of capacity requests must never exceed maximum_capacity in one
    // INTERVAL_TICKS.
    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 1_000_000,
            max_shrink_iters: 1_000_000,
            .. ProptestConfig::default()
        })]
        #[test]
        fn capacity_never_exceeds_max_in_interval(
            maximum_capacity in 1..u32::MAX,
            requests in cap_requests(u32::from(u16::MAX))
        ) {
            capacity_never_exceeds_max_in_interval_inner(maximum_capacity, requests)?;
        }
    }
}
225
#[cfg(kani)]
mod verification {
    use crate::stable::{Valve, INTERVAL_TICKS};
    use std::num::NonZeroU32;

    /// Proof harness: for any maximum capacity and any bounded sequence of
    /// in-range requests, the sum of capacity granted within one
    /// `INTERVAL_TICKS` never exceeds the maximum capacity.
    #[kani::proof]
    #[kani::unwind(100)] // must match `iters` below
    fn capacity_never_exceeds_max_in_interval() {
        let maximum_capacity: NonZeroU32 = kani::any();
        let mut valve = Valve::new(maximum_capacity);
        let maximum_capacity = maximum_capacity.get();
        let ceiling = u64::from(maximum_capacity);

        let mut ticks_elapsed: u64 = 0;
        let mut granted: u64 = 0;
        let mut observed_interval: u64 = 0;

        let iters: usize = kani::any();
        kani::assume(iters < 100);

        let mut pending_wait: u64 = 0;
        for _ in 0..iters {
            let request: NonZeroU32 = kani::any();
            kani::assume(request.get() <= maximum_capacity);

            ticks_elapsed += pending_wait;

            let current_interval = ticks_elapsed / INTERVAL_TICKS;
            if current_interval > observed_interval {
                // A new interval has begun: check the tally of the interval
                // just finished, then start counting afresh.
                assert!(granted <= ceiling, "too many requests granted");
                granted = 0;
                observed_interval = current_interval;
            }

            // Errors are ignored intentionally; they leave the pending wait
            // untouched.
            if let Ok(wait) = valve.request(ticks_elapsed, request.get()) {
                if wait == 0 {
                    // The request went through right away.
                    granted += u64::from(request.get());
                }
                // A non-zero wait means nothing was granted. We 'wait' by
                // carrying it into the next iteration, where we may or may
                // not make the same request of the valve.
                pending_wait = wait;
            }
            assert!(granted <= ceiling, "too many requests granted");
        }
    }
}