lading_throttle/
stable.rs1use std::num::NonZeroU32;
6
7use super::{Clock, RealClock};
8
9const INTERVAL_TICKS: u64 = 1_000_000;
14
15#[derive(thiserror::Error, Debug, Clone, Copy)]
17pub enum Error {
18 #[error("Capacity")]
20 Capacity,
21}
22
23#[derive(Debug)]
24pub struct Stable<C = RealClock> {
29 valve: Valve,
30 clock: C,
32}
33
34impl<C> Stable<C>
35where
36 C: Clock + Send + Sync,
37{
38 #[inline]
39 pub(crate) async fn wait(&mut self) -> Result<(), Error> {
40 let one = unsafe { NonZeroU32::new_unchecked(1_u32) };
42 self.wait_for(one).await
43 }
44
45 pub(crate) async fn wait_for(&mut self, request: NonZeroU32) -> Result<(), Error> {
46 loop {
47 let slop: u64 = self
48 .valve
49 .request(self.clock.ticks_elapsed(), request.get())?;
50 if slop == 0 {
51 break;
52 }
53 self.clock.wait(slop).await;
54 }
55 Ok(())
56 }
57
58 pub(crate) fn with_clock(maximum_capacity: NonZeroU32, clock: C) -> Self {
59 Self {
60 valve: Valve::new(maximum_capacity),
61 clock,
62 }
63 }
64}
65
66#[derive(Debug)]
70struct Valve {
71 maximum_capacity: u32,
74 capacity: u32,
77 interval: u64,
79}
80
81impl Valve {
82 fn new(maximum_capacity: NonZeroU32) -> Self {
85 let maximum_capacity = maximum_capacity.get();
86 Self {
87 capacity: maximum_capacity,
88 maximum_capacity,
89 interval: 0,
90 }
91 }
92
93 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
99 fn request(&mut self, ticks_elapsed: u64, capacity_request: u32) -> Result<u64, Error> {
100 if capacity_request == 0 {
106 return Ok(0);
107 }
108
109 if capacity_request > self.maximum_capacity {
112 return Err(Error::Capacity);
113 }
114
115 let current_interval = ticks_elapsed / INTERVAL_TICKS;
116 if current_interval > self.interval {
117 self.capacity = self.maximum_capacity;
121 self.interval = current_interval;
122 }
123
124 if capacity_request <= self.capacity {
130 self.capacity -= capacity_request;
131 Ok(0)
132 } else {
133 Ok(INTERVAL_TICKS - (ticks_elapsed % INTERVAL_TICKS))
134 }
135 }
136}
137
138#[cfg(test)]
139mod test {
140 use std::num::NonZeroU32;
141
142 use proptest::{collection, prelude::*};
143
144 use crate::stable::{Valve, INTERVAL_TICKS};
145
146 fn capacity_never_exceeds_max_in_interval_inner(
147 maximum_capacity: u32,
148 mut requests: Vec<NonZeroU32>,
149 ) -> Result<(), proptest::test_runner::TestCaseError> {
150 let mut valve = Valve::new(NonZeroU32::new(maximum_capacity).unwrap());
151 let maximum_capacity = u64::from(maximum_capacity);
152
153 let mut ticks_elapsed: u64 = 0;
154 let mut granted_requests: u64 = 0;
155 let mut interval: u64 = 0;
156
157 let mut slop = 0;
158 for request in requests.drain(..) {
159 ticks_elapsed += slop;
160
161 let current_interval = ticks_elapsed / INTERVAL_TICKS;
162 if interval < current_interval {
163 prop_assert!(granted_requests <= maximum_capacity,
166 "[interval-change] Granted requests {granted_requests} exceeded the maximum capacity of the valve, {maximum_capacity}");
167 granted_requests = 0;
168 interval = current_interval;
169 }
170
171 match valve.request(ticks_elapsed, request.get()) {
172 Ok(0) => {
173 granted_requests += u64::from(request.get());
175 slop = 0;
176 }
177 Ok(s) => {
178 slop = s;
183 }
184 Err(_) => {
185 }
187 }
188 prop_assert!(granted_requests <= maximum_capacity,
189 "[end] Granted requests {granted_requests} exceeded the maximum capacity of the valve, {maximum_capacity}");
190 }
191 Ok(())
192 }
193
194 #[test]
195 fn static_capacity_never_exceeds_max_in_interval() {
196 let maximum_capacity = 490301363u32;
197 let requests: Vec<NonZeroU32> = vec![
198 NonZeroU32::new(1).unwrap(),
199 NonZeroU32::new(490301363).unwrap(),
200 ];
201 capacity_never_exceeds_max_in_interval_inner(maximum_capacity, requests).unwrap()
202 }
203
204 fn cap_requests(max: u32) -> impl Strategy<Value = Vec<NonZeroU32>> {
205 collection::vec((1..max).prop_map(|i| NonZeroU32::new(i).unwrap()), 1..100)
206 }
207
208 proptest! {
211 #![proptest_config(ProptestConfig {
212 cases: 1_000_000,
213 max_shrink_iters: 1_000_000,
214 .. ProptestConfig::default()
215 })]
216 #[test]
217 fn capacity_never_exceeds_max_in_interval(
218 maximum_capacity in (1..u32::MAX),
219 requests in cap_requests(u16::MAX as u32)
220 ) {
221 capacity_never_exceeds_max_in_interval_inner(maximum_capacity, requests)?
222 }
223 }
224}
225
226#[cfg(kani)]
227mod verification {
228 use crate::stable::{Valve, INTERVAL_TICKS};
229 use std::num::NonZeroU32;
230
231 #[kani::proof]
234 #[kani::unwind(100)] fn capacity_never_exceeds_max_in_interval() {
236 let maximum_capacity: NonZeroU32 = kani::any();
237 let mut valve = Valve::new(maximum_capacity);
238 let maximum_capacity = maximum_capacity.get();
239
240 let mut ticks_elapsed: u64 = 0;
241 let mut granted_requests: u64 = 0;
242 let mut interval: u64 = 0;
243
244 let iters: usize = kani::any();
245 kani::assume(iters < 100);
246
247 let mut slop = 0;
248 for _ in 0..iters {
249 let request: NonZeroU32 = kani::any();
250 kani::assume(request.get() <= maximum_capacity);
251
252 ticks_elapsed += slop;
253
254 let current_interval = ticks_elapsed / INTERVAL_TICKS;
255 if interval < current_interval {
256 if granted_requests > u64::from(maximum_capacity) {
259 panic!("too many requests granted");
260 }
261 granted_requests = 0;
262 interval = current_interval;
263 }
264
265 match valve.request(ticks_elapsed, request.get()) {
266 Ok(0) => {
267 granted_requests += u64::from(request.get());
269 slop = 0;
270 }
271 Ok(s) => {
272 slop = s;
277 }
278 Err(_) => {
279 }
281 }
282 if granted_requests > u64::from(maximum_capacity) {
283 panic!("too many requests granted");
284 }
285 }
286 }
287}