stygian_graph/adapters/graphql_rate_limit.rs
1//! Sliding-window request-count rate limiter for GraphQL API targets.
2//!
3//! Limits outgoing requests to `max_requests` in any rolling `window` duration.
//! Call [`rate_limit_acquire`] before each request: if the window is already
//! full it sleeps until the oldest in-window request expires, and it records
//! the current timestamp only once a slot has been granted.
7//!
8//! A server-returned `Retry-After` value can be applied via
9//! [`rate_limit_retry_after`], which imposes a hard block until the indicated
10//! instant irrespective of the sliding window state.
11//!
12//! Operates in parallel with [`graphql_throttle`](crate::adapters::graphql_throttle)
13//! (leaky-bucket cost throttle). Both can be active simultaneously.
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use tokio::sync::Mutex;
20
21/// Re-export — canonical definition lives in the ports layer.
22pub use crate::ports::graphql_plugin::RateLimitConfig;
23
24// ─────────────────────────────────────────────────────────────────────────────
25// RequestWindow
26// ─────────────────────────────────────────────────────────────────────────────
27
28/// Mutable inner state — a sliding window of request timestamps plus an
29/// optional hard block set by a server-returned `Retry-After` header.
30#[derive(Debug)]
31struct RequestWindow {
32 timestamps: VecDeque<Instant>,
33 config: RateLimitConfig,
34 /// Hard block until this instant, set by [`record_retry_after`].
35 blocked_until: Option<Instant>,
36}
37
38impl RequestWindow {
39 fn new(config: &RateLimitConfig) -> Self {
40 Self {
41 timestamps: VecDeque::with_capacity(config.max_requests as usize),
42 config: config.clone(),
43 blocked_until: None,
44 }
45 }
46
47 /// Try to acquire a request slot.
48 ///
49 /// Prunes expired entries, then:
50 /// - Returns `None` if a slot is available (timestamp is recorded immediately).
51 /// - Returns `Some(wait)` with the duration the caller must sleep before
52 /// retrying (capped at `config.max_delay_ms`).
53 fn acquire(&mut self) -> Option<Duration> {
54 let now = Instant::now();
55 let window = self.config.window;
56 let max_delay = Duration::from_millis(self.config.max_delay_ms);
57
58 // Check hard block imposed by a previous Retry-After response.
59 if let Some(until) = self.blocked_until {
60 if until > now {
61 let wait = until.duration_since(now);
62 return Some(wait.min(max_delay));
63 }
64 self.blocked_until = None;
65 }
66
67 // Drop timestamps that have rolled out of the window.
68 while self
69 .timestamps
70 .front()
71 .is_some_and(|t| now.duration_since(*t) >= window)
72 {
73 self.timestamps.pop_front();
74 }
75
76 if self.timestamps.len() < self.config.max_requests as usize {
77 // Slot available — record and permit.
78 self.timestamps.push_back(now);
79 None
80 } else {
81 // Window is full; compute wait until the oldest entry rolls out.
82 let &oldest = self.timestamps.front()?;
83 let elapsed = now.duration_since(oldest);
84 let wait = window.saturating_sub(elapsed);
85 Some(wait.min(max_delay))
86 }
87 }
88
89 /// Set a hard block for `secs` seconds to honour a server-returned
90 /// `Retry-After` interval.
91 ///
92 /// A shorter `secs` value will never override a longer existing block.
93 fn record_retry_after(&mut self, secs: u64) {
94 let until = Instant::now() + Duration::from_secs(secs);
95 match self.blocked_until {
96 // Keep the later of the two blocks.
97 Some(existing) if existing >= until => {}
98 _ => self.blocked_until = Some(until),
99 }
100 }
101}
102
103// ─────────────────────────────────────────────────────────────────────────────
104// RequestRateLimit
105// ─────────────────────────────────────────────────────────────────────────────
106
107/// Shareable, cheaply-cloneable handle to a per-plugin sliding-window limiter.
108///
109/// # Example
110///
111/// ```rust
112/// use stygian_graph::adapters::graphql_rate_limit::{
113/// RateLimitConfig, RequestRateLimit, rate_limit_acquire,
114/// };
115///
116/// # async fn example() {
117/// let rl = RequestRateLimit::new(RateLimitConfig::default());
118/// rate_limit_acquire(&rl).await;
119/// // … send the request …
120/// # }
121/// ```
122#[derive(Clone, Debug)]
123pub struct RequestRateLimit {
124 inner: Arc<Mutex<RequestWindow>>,
125 config: RateLimitConfig,
126}
127
128impl RequestRateLimit {
129 /// Create a new `RequestRateLimit` from a [`RateLimitConfig`].
130 ///
131 /// # Example
132 ///
133 /// ```rust
134 /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
135 ///
136 /// let rl = RequestRateLimit::new(RateLimitConfig::default());
137 /// ```
138 #[must_use]
139 pub fn new(config: RateLimitConfig) -> Self {
140 let window = RequestWindow::new(&config);
141 Self {
142 inner: Arc::new(Mutex::new(window)),
143 config,
144 }
145 }
146
147 /// Return the [`RateLimitConfig`] this limiter was initialised from.
148 ///
149 /// # Example
150 ///
151 /// ```rust
152 /// use std::time::Duration;
153 /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
154 ///
155 /// let cfg = RateLimitConfig { max_requests: 50, ..Default::default() };
156 /// let rl = RequestRateLimit::new(cfg.clone());
157 /// assert_eq!(rl.config().max_requests, 50);
158 /// ```
159 #[must_use]
160 pub const fn config(&self) -> &RateLimitConfig {
161 &self.config
162 }
163}
164
165// ─────────────────────────────────────────────────────────────────────────────
166// Public API
167// ─────────────────────────────────────────────────────────────────────────────
168
169/// Sleep until a request slot is available within the rolling window, then
170/// record the slot.
171///
172/// If the window is full the function sleeps until the oldest in-window entry
173/// expires (capped at `config.max_delay_ms`). The `Mutex` guard is dropped
174/// before every `.await` call to preserve `Send` bounds.
175///
176/// # Example
177///
178/// ```rust
179/// use stygian_graph::adapters::graphql_rate_limit::{
180/// RateLimitConfig, RequestRateLimit, rate_limit_acquire,
181/// };
182/// use std::time::Duration;
183///
184/// # async fn example() {
185/// let rl = RequestRateLimit::new(RateLimitConfig {
186/// max_requests: 2,
187/// ..Default::default()
188/// });
189/// rate_limit_acquire(&rl).await;
190/// rate_limit_acquire(&rl).await;
191/// // Third call blocks until the window rolls forward.
192/// # }
193/// ```
194pub async fn rate_limit_acquire(rl: &RequestRateLimit) {
195 loop {
196 let delay = {
197 let mut guard = rl.inner.lock().await;
198 guard.acquire()
199 };
200 match delay {
201 None => return,
202 Some(d) => {
203 tracing::debug!(
204 delay_ms = d.as_millis(),
205 "rate limiter: window full, sleeping"
206 );
207 tokio::time::sleep(d).await;
208 }
209 }
210 }
211}
212
213/// Record a server-returned `Retry-After` delay.
214///
215/// Call this when the upstream API responds with HTTP 429 and a `Retry-After`
216/// header. Subsequent calls to [`rate_limit_acquire`] will block for at least
217/// `secs` seconds. A shorter `secs` value will never shorten an existing block.
218///
219/// # Example
220///
221/// ```rust
222/// use stygian_graph::adapters::graphql_rate_limit::{
223/// RateLimitConfig, RequestRateLimit, rate_limit_retry_after,
224/// };
225///
226/// # async fn example() {
227/// let rl = RequestRateLimit::new(RateLimitConfig::default());
228/// rate_limit_retry_after(&rl, 30).await;
229/// # }
230/// ```
231pub async fn rate_limit_retry_after(rl: &RequestRateLimit, retry_after_secs: u64) {
232 let mut guard = rl.inner.lock().await;
233 guard.record_retry_after(retry_after_secs);
234}
235
/// Interpret a `Retry-After` header value as a whole number of seconds.
///
/// Surrounding whitespace is ignored. Returns `None` when the remaining text
/// does not parse as a `u64`; the HTTP-date form of the header is
/// intentionally not supported.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::parse_retry_after;
///
/// assert_eq!(parse_retry_after("30"), Some(30));
/// assert_eq!(parse_retry_after("not-a-number"), None);
/// ```
#[must_use]
pub fn parse_retry_after(value: &str) -> Option<u64> {
    let seconds = value.trim();
    seconds.parse().ok()
}
253
254// ─────────────────────────────────────────────────────────────────────────────
255// Tests
256// ─────────────────────────────────────────────────────────────────────────────
257
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Config with the given request budget and window length (in seconds)
    /// and a 60-second cap on any single wait.
    fn cfg(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        let window = Duration::from_secs(window_secs);
        RateLimitConfig {
            max_requests,
            window,
            max_delay_ms: 60_000,
        }
    }

    // 1. Window allows exactly max_requests without blocking.
    #[test]
    fn window_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg(3, 60));
        for slot in 1..=3 {
            assert!(w.acquire().is_none(), "slot {}", slot);
        }
        assert!(w.acquire().is_some(), "4th request must be blocked");
    }

    // 2. After the window expires the slot becomes available again.
    #[test]
    fn window_resets_after_expiry() {
        let short_window = RateLimitConfig {
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
        };
        let mut w = RequestWindow::new(&short_window);
        assert!(w.acquire().is_none(), "first request");
        // Sleep well past the 10 ms window so the first timestamp is pruned.
        std::thread::sleep(Duration::from_millis(25));
        assert!(w.acquire().is_none(), "window should have expired");
    }

    // 3. Timestamps are recorded immediately so concurrent callers see a
    //    reduced slot count without waiting for the request to complete.
    #[test]
    fn timestamps_recorded_immediately() {
        let mut w = RequestWindow::new(&cfg(2, 60));
        for expected_len in 1..=2 {
            let _ = w.acquire();
            assert_eq!(w.timestamps.len(), expected_len);
        }
    }

    // 4. record_retry_after blocks subsequent acquire calls.
    #[test]
    fn retry_after_blocks_further_requests() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(30);
        let outcome = w.acquire();
        assert!(outcome.is_some(), "Retry-After must block the next request");
    }

    // 5. A shorter Retry-After must not reduce an existing longer block.
    #[test]
    fn retry_after_does_not_shorten_existing_block() {
        let mut w = RequestWindow::new(&cfg(100, 60));

        w.record_retry_after(60);
        let until_before = w.blocked_until.unwrap();

        w.record_retry_after(1);
        let until_after = w.blocked_until.unwrap();

        assert!(
            until_after >= until_before,
            "shorter retry-after must not override the longer block"
        );
    }

    // 6. parse_retry_after handles valid integers and rejects garbage.
    #[test]
    fn parse_retry_after_parses_integers() {
        let cases: [(&str, Option<u64>); 5] = [
            ("42", Some(42)),
            ("0", Some(0)),
            ("not-a-number", None),
            ("", None),
            ("  30  ", Some(30)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_retry_after(input), expected);
        }
    }
}
339}