// skp_ratelimit/algorithm/sliding_window.rs
use std::time::Duration;
4
5use crate::algorithm::{current_timestamp_ms, timestamp_to_instant, Algorithm};
6use crate::decision::{Decision, RateLimitInfo};
7use crate::error::Result;
8use crate::quota::Quota;
9use crate::storage::{Storage, StorageEntry};
10
/// Sliding-window rate limiting algorithm.
///
/// The request rate is estimated from two adjacent fixed windows: the count
/// in the current window plus the count of the previous window, weighted by
/// the fraction of the current window that has not yet elapsed.
///
/// The type itself holds no state; all counters live in the storage backend
/// passed to the [`Algorithm`] methods, so copies of it are interchangeable.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SlidingWindow;
17
18impl SlidingWindow {
19 pub fn new() -> Self {
21 Self
22 }
23
24 fn window_start(&self, now: u64, window_ms: u64) -> u64 {
26 (now / window_ms) * window_ms
27 }
28
29 fn weighted_count(&self, current: u64, previous: u64, window_progress: f64) -> f64 {
31 current as f64 + (previous as f64 * (1.0 - window_progress))
32 }
33}
34
35impl Algorithm for SlidingWindow {
36 fn name(&self) -> &'static str {
37 "sliding_window"
38 }
39
40 async fn check_and_record<S: Storage>(
41 &self,
42 storage: &S,
43 key: &str,
44 quota: &Quota,
45 ) -> Result<Decision> {
46 let now = current_timestamp_ms();
47 let window_ms = quota.window().as_millis() as u64;
48 let window_start = self.window_start(now, window_ms);
49 let ttl = Duration::from_millis(window_ms * 2);
50 let limit = quota.max_requests();
51
52 let decision = storage
53 .execute_atomic(key, ttl, |entry| {
54 let (current_count, prev_count, entry_window) = match &entry {
55 Some(e) if e.window_start == window_start => {
56 (e.count, e.prev_count.unwrap_or(0), window_start)
57 }
58 Some(e) if e.window_start == window_start.saturating_sub(window_ms) => {
59 (0, e.count, window_start)
61 }
62 _ => (0, 0, window_start),
63 };
64
65 let window_progress = (now - window_start) as f64 / window_ms as f64;
66 let weighted = self.weighted_count(current_count, prev_count, window_progress);
67
68 if weighted < limit as f64 {
69 let new_entry = StorageEntry::new(current_count + 1, entry_window)
70 .set_prev_count(prev_count)
71 .set_last_update(now);
72
73 let remaining = (limit as f64 - weighted - 1.0).max(0.0) as u64;
74 let reset_at = timestamp_to_instant(window_start + window_ms);
75 let info = RateLimitInfo::new(limit, remaining, reset_at, timestamp_to_instant(window_start))
76 .with_algorithm("sliding_window");
77
78 (new_entry, Decision::allowed(info))
79 } else {
80 let new_entry = entry.unwrap_or_else(|| StorageEntry::new(current_count, window_start));
81
82 let reset_at = timestamp_to_instant(window_start + window_ms);
83 let retry_after = Duration::from_millis(window_start + window_ms - now);
84 let info = RateLimitInfo::new(limit, 0, reset_at, timestamp_to_instant(window_start))
85 .with_algorithm("sliding_window")
86 .with_retry_after(retry_after);
87
88 (new_entry, Decision::denied(info))
89 }
90 })
91 .await?;
92
93 Ok(decision)
94 }
95
96 async fn check<S: Storage>(
97 &self,
98 storage: &S,
99 key: &str,
100 quota: &Quota,
101 ) -> Result<Decision> {
102 let now = current_timestamp_ms();
103 let window_ms = quota.window().as_millis() as u64;
104 let window_start = self.window_start(now, window_ms);
105 let limit = quota.max_requests();
106
107 let entry = storage.get(key).await?;
108
109 let (current_count, prev_count) = match &entry {
110 Some(e) if e.window_start == window_start => {
111 (e.count, e.prev_count.unwrap_or(0))
112 }
113 Some(e) if e.window_start == window_start.saturating_sub(window_ms) => {
114 (0, e.count)
115 }
116 _ => (0, 0),
117 };
118
119 let window_progress = (now - window_start) as f64 / window_ms as f64;
120 let weighted = self.weighted_count(current_count, prev_count, window_progress);
121
122 let remaining = (limit as f64 - weighted).max(0.0) as u64;
123 let reset_at = timestamp_to_instant(window_start + window_ms);
124 let info = RateLimitInfo::new(limit, remaining, reset_at, timestamp_to_instant(window_start))
125 .with_algorithm("sliding_window");
126
127 Ok(if weighted < limit as f64 {
128 Decision::allowed(info)
129 } else {
130 let retry_after = Duration::from_millis(window_start + window_ms - now);
131 Decision::denied(info.with_retry_after(retry_after))
132 })
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::MemoryStorage;

    /// With a quota of five requests per minute, the first five requests for
    /// a key are allowed and the sixth is denied.
    ///
    /// NOTE(review): the test uses the real clock, so a run that straddles a
    /// window boundary may see different weighted counts — confirm whether a
    /// controllable clock is available for tests.
    #[tokio::test]
    async fn test_sliding_window_basic() {
        let algorithm = SlidingWindow::new();
        let storage = MemoryStorage::new();
        let quota = Quota::per_minute(5);

        for i in 1..=5 {
            let decision = algorithm
                .check_and_record(&storage, "user:1", &quota)
                .await
                .unwrap();
            assert!(decision.is_allowed(), "Request {} should be allowed", i);
        }

        let decision = algorithm
            .check_and_record(&storage, "user:1", &quota)
            .await
            .unwrap();
        assert!(decision.is_denied());
    }
}