skp_ratelimit/algorithm/
sliding_log.rs1use std::time::Duration;
7
8use crate::algorithm::{current_timestamp_ms, timestamp_to_instant, Algorithm};
9use crate::decision::{Decision, RateLimitInfo};
10use crate::error::Result;
11use crate::quota::Quota;
12use crate::storage::{Storage, StorageEntry};
13
/// Sliding-log rate limiting algorithm.
///
/// This is a stateless unit struct: the per-key log of request timestamps is
/// read from and written to the storage backend passed to each call, not kept
/// in the value itself.
#[derive(Clone, Debug, Default)]
pub struct SlidingLog;

impl SlidingLog {
    /// Creates a new `SlidingLog`.
    pub fn new() -> Self {
        SlidingLog
    }

    /// Returns the entries of `timestamps` that are at or after
    /// `window_start`, in their original order.
    ///
    /// The boundary is inclusive: a timestamp equal to `window_start` is kept.
    fn filter_window(&self, timestamps: &[u64], window_start: u64) -> Vec<u64> {
        let is_in_window = |stamp: &u64| *stamp >= window_start;
        timestamps.iter().copied().filter(is_in_window).collect()
    }
}
36
37impl Algorithm for SlidingLog {
38 fn name(&self) -> &'static str {
39 "sliding_log"
40 }
41
42 async fn check_and_record<S: Storage>(
43 &self,
44 storage: &S,
45 key: &str,
46 quota: &Quota,
47 ) -> Result<Decision> {
48 let now = current_timestamp_ms();
49 let window_ms = quota.window().as_millis() as u64;
50 let window_start = now.saturating_sub(window_ms);
51 let ttl = Duration::from_millis(window_ms * 2);
52 let limit = quota.max_requests();
53
54 let decision = storage
55 .execute_atomic(key, ttl, |entry| {
56 let mut timestamps = entry
57 .and_then(|e| e.timestamps)
58 .unwrap_or_default();
59
60 timestamps = self.filter_window(×tamps, window_start);
62 let current_count = timestamps.len() as u64;
63
64 if current_count < limit {
65 timestamps.push(now);
66 let new_entry = StorageEntry::with_timestamps(timestamps);
67
68 let remaining = limit - current_count - 1;
69 let reset_at = timestamp_to_instant(now + window_ms);
70 let info = RateLimitInfo::new(limit, remaining, reset_at, timestamp_to_instant(window_start))
71 .with_algorithm("sliding_log");
72
73 (new_entry, Decision::allowed(info))
74 } else {
75 let new_entry = StorageEntry::with_timestamps(timestamps.clone());
76
77 let oldest = timestamps.first().copied().unwrap_or(now);
79 let retry_ms = oldest + window_ms - now;
80 let reset_at = timestamp_to_instant(oldest + window_ms);
81
82 let info = RateLimitInfo::new(limit, 0, reset_at, timestamp_to_instant(window_start))
83 .with_algorithm("sliding_log")
84 .with_retry_after(Duration::from_millis(retry_ms));
85
86 (new_entry, Decision::denied(info))
87 }
88 })
89 .await?;
90
91 Ok(decision)
92 }
93
94 async fn check<S: Storage>(
95 &self,
96 storage: &S,
97 key: &str,
98 quota: &Quota,
99 ) -> Result<Decision> {
100 let now = current_timestamp_ms();
101 let window_ms = quota.window().as_millis() as u64;
102 let window_start = now.saturating_sub(window_ms);
103 let limit = quota.max_requests();
104
105 let entry = storage.get(key).await?;
106 let timestamps = entry
107 .and_then(|e| e.timestamps)
108 .unwrap_or_default();
109
110 let filtered = self.filter_window(×tamps, window_start);
111 let current_count = filtered.len() as u64;
112
113 let remaining = limit.saturating_sub(current_count);
114 let reset_at = if let Some(&oldest) = filtered.first() {
115 timestamp_to_instant(oldest + window_ms)
116 } else {
117 timestamp_to_instant(now + window_ms)
118 };
119
120 let info = RateLimitInfo::new(limit, remaining, reset_at, timestamp_to_instant(window_start))
121 .with_algorithm("sliding_log");
122
123 Ok(if current_count < limit {
124 Decision::allowed(info)
125 } else {
126 let oldest = filtered.first().copied().unwrap_or(now);
127 let retry_ms = oldest + window_ms - now;
128 Decision::denied(info.with_retry_after(Duration::from_millis(retry_ms)))
129 })
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::MemoryStorage;

    /// Five requests fit a five-per-minute quota; the sixth is denied.
    #[tokio::test]
    async fn test_sliding_log_basic() {
        let algorithm = SlidingLog::new();
        let storage = MemoryStorage::new();
        let quota = Quota::per_minute(5);

        for i in 1..=5 {
            let decision = algorithm
                .check_and_record(&storage, "user:1", &quota)
                .await
                .unwrap();
            assert!(decision.is_allowed(), "Request {} should be allowed", i);
        }

        let decision = algorithm
            .check_and_record(&storage, "user:1", &quota)
            .await
            .unwrap();
        assert!(decision.is_denied());
    }

    /// Capacity comes back once recorded requests fall out of the window.
    #[tokio::test]
    async fn test_sliding_log_precision() {
        let algorithm = SlidingLog::new();
        let storage = MemoryStorage::new();
        let quota = Quota::new(2, Duration::from_millis(200));

        let first = algorithm
            .check_and_record(&storage, "user:1", &quota)
            .await
            .unwrap();
        assert!(first.is_allowed(), "first request should be allowed");

        tokio::time::sleep(Duration::from_millis(50)).await;

        let second = algorithm
            .check_and_record(&storage, "user:1", &quota)
            .await
            .unwrap();
        assert!(second.is_allowed(), "second request should be allowed");

        // Both slots of the 200 ms window are now taken.
        let decision = algorithm
            .check_and_record(&storage, "user:1", &quota)
            .await
            .unwrap();
        assert!(decision.is_denied());

        // After a full window has elapsed the first request has expired.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let decision = algorithm
            .check_and_record(&storage, "user:1", &quota)
            .await
            .unwrap();
        assert!(decision.is_allowed());
    }
}