1use core::time::Duration;
14use std::collections::VecDeque;
15use std::sync::{Mutex, MutexGuard, PoisonError};
16
17use clock_lib::{Clock, Monotonic, SystemClock};
18
19use crate::decision::Decision;
20#[cfg(feature = "runtime")]
21use crate::error::ThrottleError;
22use crate::limiter::Limiter;
23
24#[derive(Clone, Copy)]
27struct Grant {
28 at_ms: u64,
29 count: u32,
30}
31
32struct Log {
34 grants: VecDeque<Grant>,
36 used: u32,
38}
39
40pub struct SlidingWindowLog<C = SystemClock>
62where
63 C: Clock,
64{
65 limit: u32,
66 window: Duration,
67 log: Mutex<Log>,
68 clock: C,
69 epoch: Monotonic,
70}
71
72impl SlidingWindowLog<SystemClock> {
73 #[must_use]
88 pub fn new(limit: u32, window: Duration) -> Self {
89 Self::with_clock_inner(limit, window, SystemClock::new())
90 }
91
92 #[must_use]
103 pub fn per_second(rate: u32) -> Self {
104 Self::new(rate, Duration::from_secs(1))
105 }
106}
107
108impl<C> SlidingWindowLog<C>
109where
110 C: Clock + Clone,
111{
112 fn with_clock_inner(limit: u32, window: Duration, clock: C) -> Self {
113 let epoch = clock.now();
114 Self {
115 limit,
116 window,
117 log: Mutex::new(Log {
118 grants: VecDeque::new(),
119 used: 0,
120 }),
121 clock,
122 epoch,
123 }
124 }
125
126 #[must_use]
147 pub fn with_clock<C2>(self, clock: C2) -> SlidingWindowLog<C2>
148 where
149 C2: Clock + Clone,
150 {
151 SlidingWindowLog::with_clock_inner(self.limit, self.window, clock)
152 }
153
154 #[inline]
155 fn lock(&self) -> MutexGuard<'_, Log> {
156 self.log.lock().unwrap_or_else(PoisonError::into_inner)
157 }
158
159 #[inline]
160 fn now_ms(&self) -> u64 {
161 let elapsed = self.clock.now().saturating_duration_since(self.epoch);
162 u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
163 }
164
165 #[inline]
166 fn window_ms(&self) -> u64 {
167 u64::try_from(self.window.as_millis()).unwrap_or(u64::MAX)
168 }
169
170 fn prune(log: &mut Log, now_ms: u64, window_ms: u64) {
176 while let Some(front) = log.grants.front() {
177 if front.at_ms.saturating_add(window_ms) <= now_ms {
178 log.used = log.used.saturating_sub(front.count);
179 let _ = log.grants.pop_front();
180 } else {
181 break;
182 }
183 }
184 }
185
186 fn wait_for(log: &Log, now_ms: u64, window_ms: u64, needed: u32) -> Duration {
188 let mut freed = 0u32;
189 for grant in &log.grants {
190 freed = freed.saturating_add(grant.count);
191 if freed >= needed {
192 let ready_at = grant.at_ms.saturating_add(window_ms);
193 return Duration::from_millis(ready_at.saturating_sub(now_ms));
194 }
195 }
196 Duration::from_millis(window_ms)
198 }
199
200 fn decide(&self, cost: u32) -> Decision {
202 if cost > self.limit {
203 return Decision::Impossible;
204 }
205 if cost == 0 {
206 return Decision::Acquired;
207 }
208 let now_ms = self.now_ms();
209 let window_ms = self.window_ms();
210 let mut log = self.lock();
211 Self::prune(&mut log, now_ms, window_ms);
212 if log.used + cost <= self.limit {
213 log.used += cost;
214 log.grants.push_back(Grant {
215 at_ms: now_ms,
216 count: cost,
217 });
218 Decision::Acquired
219 } else {
220 let needed = log.used + cost - self.limit;
221 Decision::Retry {
222 after: Self::wait_for(&log, now_ms, window_ms, needed),
223 }
224 }
225 }
226
227 #[inline]
229 #[must_use]
230 pub fn try_acquire(&self) -> bool {
231 self.decide(1).is_acquired()
232 }
233
234 #[inline]
236 #[must_use]
237 pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
238 self.decide(cost).is_acquired()
239 }
240
241 #[must_use]
243 pub fn peek(&self, cost: u32) -> Decision {
244 if cost > self.limit {
245 return Decision::Impossible;
246 }
247 if cost == 0 {
248 return Decision::Acquired;
249 }
250 let now_ms = self.now_ms();
251 let window_ms = self.window_ms();
252 let mut log = self.lock();
253 Self::prune(&mut log, now_ms, window_ms);
254 if log.used + cost <= self.limit {
255 Decision::Acquired
256 } else {
257 let needed = log.used + cost - self.limit;
258 Decision::Retry {
259 after: Self::wait_for(&log, now_ms, window_ms, needed),
260 }
261 }
262 }
263
264 #[must_use]
266 pub fn available(&self) -> u32 {
267 let now_ms = self.now_ms();
268 let window_ms = self.window_ms();
269 let mut log = self.lock();
270 Self::prune(&mut log, now_ms, window_ms);
271 self.limit.saturating_sub(log.used)
272 }
273
274 #[inline]
276 #[must_use]
277 pub fn capacity(&self) -> u32 {
278 self.limit
279 }
280}
281
282#[cfg(feature = "runtime")]
283#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
284impl<C> SlidingWindowLog<C>
285where
286 C: Clock + Clone,
287{
288 pub async fn acquire(&self) -> Result<(), ThrottleError> {
294 self.acquire_with_cost(1).await
295 }
296
297 pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
304 loop {
305 match self.decide(cost) {
306 Decision::Acquired => return Ok(()),
307 Decision::Impossible => {
308 return Err(ThrottleError::CostExceedsCapacity {
309 cost,
310 capacity: self.limit,
311 });
312 }
313 Decision::Retry { after } => crate::rt::sleep(after).await,
314 }
315 }
316 }
317}
318
319impl<C> Limiter for SlidingWindowLog<C>
320where
321 C: Clock + Clone + Send + Sync,
322{
323 #[inline]
324 fn peek(&self, cost: u32) -> Decision {
325 SlidingWindowLog::peek(self, cost)
326 }
327
328 #[inline]
329 fn acquire_cost(&self, cost: u32) -> Decision {
330 self.decide(cost)
331 }
332
333 #[inline]
334 fn available(&self) -> u32 {
335 SlidingWindowLog::available(self)
336 }
337
338 #[inline]
339 fn capacity(&self) -> u32 {
340 self.limit
341 }
342}
343
344#[cfg(test)]
345mod tests {
346 #![allow(clippy::unwrap_used, clippy::expect_used)]
347
348 use super::SlidingWindowLog;
349 use crate::limiter::Limiter;
350 use clock_lib::ManualClock;
351 use core::time::Duration;
352 use std::sync::Arc;
353
354 fn assert_send_sync<T: Send + Sync>() {}
355
356 #[test]
357 fn test_is_send_sync() {
358 assert_send_sync::<SlidingWindowLog>();
359 }
360
361 #[test]
362 fn test_admits_up_to_limit_then_refuses() {
363 let limiter = SlidingWindowLog::new(3, Duration::from_secs(1));
364 assert!(limiter.try_acquire());
365 assert!(limiter.try_acquire());
366 assert!(limiter.try_acquire());
367 assert!(!limiter.try_acquire());
368 assert_eq!(limiter.available(), 0);
369 }
370
371 #[test]
372 fn test_window_slides_exactly() {
373 let clock = Arc::new(ManualClock::new());
374 let limiter = SlidingWindowLog::new(2, Duration::from_secs(1)).with_clock(clock.clone());
375
376 assert!(limiter.try_acquire()); clock.advance(Duration::from_millis(600));
378 assert!(limiter.try_acquire()); assert!(!limiter.try_acquire()); clock.advance(Duration::from_millis(401));
384 assert!(limiter.try_acquire());
385 assert!(!limiter.try_acquire());
386 }
387
388 #[test]
389 fn test_no_boundary_burst() {
390 let clock = Arc::new(ManualClock::new());
394 let limiter = SlidingWindowLog::new(3, Duration::from_secs(1)).with_clock(clock.clone());
395
396 clock.advance(Duration::from_millis(900));
397 for _ in 0..3 {
398 assert!(limiter.try_acquire()); }
400 clock.advance(Duration::from_millis(200)); assert!(!limiter.try_acquire()); }
403
404 #[test]
405 fn test_cost_aware_and_impossible() {
406 let limiter = SlidingWindowLog::new(5, Duration::from_secs(1));
407 assert!(limiter.try_acquire_with_cost(4));
408 assert!(!limiter.try_acquire_with_cost(4)); assert!(limiter.try_acquire_with_cost(1));
410 assert_eq!(
412 SlidingWindowLog::new(5, Duration::from_secs(1)).peek(9),
413 crate::Decision::Impossible
414 );
415 }
416
417 #[test]
418 fn test_peek_does_not_record() {
419 let limiter = SlidingWindowLog::new(2, Duration::from_secs(1));
420 assert!(limiter.peek(2).is_acquired());
421 assert_eq!(limiter.available(), 2); }
423
424 #[test]
425 fn test_retry_after_points_to_oldest_expiry() {
426 let clock = Arc::new(ManualClock::new());
427 let limiter = SlidingWindowLog::new(1, Duration::from_secs(1)).with_clock(clock.clone());
428 assert!(limiter.try_acquire()); let after = limiter
430 .peek(1)
431 .retry_after()
432 .expect("should suggest a wait");
433 assert_eq!(after, Duration::from_secs(1));
435 }
436
437 #[test]
438 fn test_works_as_a_limiter_trait_object() {
439 let limiter = SlidingWindowLog::new(2, Duration::from_secs(1));
440 let dyn_limiter: &dyn Limiter = &limiter;
441 assert_eq!(dyn_limiter.capacity(), 2);
442 assert!(dyn_limiter.acquire_cost(1).is_acquired());
443 assert_eq!(dyn_limiter.available(), 1);
444 }
445}