stygian_graph/adapters/graphql_rate_limit.rs
1//! Request-count rate limiter for GraphQL API targets with pluggable algorithms.
2//!
3//! Two strategies are supported via [`RateLimitStrategy`]:
4//!
5//! - **[`SlidingWindow`](RateLimitStrategy::SlidingWindow)** — limits outgoing requests to
6//! `max_requests` in any rolling `window` duration. Before each request
7//! [`rate_limit_acquire`] is called; it records the current timestamp and sleeps
8//! until the oldest in-window request expires if the window is already full.
9//!
10//! - **[`TokenBucket`](RateLimitStrategy::TokenBucket)** — refills tokens at a steady rate
11//! (`max_requests / window`); short bursts are absorbed by the bucket capacity before
12//! the rate is enforced. Computes the exact wait time required to accumulate the next
13//! token instead of sleeping speculatively.
14//!
15//! A server-returned `Retry-After` value can be applied via
16//! [`rate_limit_retry_after`], which imposes a hard block until the indicated
17//! instant irrespective of the active algorithm.
18//!
19//! Operates in parallel with [`graphql_throttle`](crate::adapters::graphql_throttle)
20//! (leaky-bucket cost throttle). Both can be active simultaneously.
21
22use std::collections::VecDeque;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use tokio::sync::Mutex;
27
28/// Re-export — canonical definition lives in the ports layer.
29pub use crate::ports::graphql_plugin::RateLimitConfig;
30pub use crate::ports::graphql_plugin::RateLimitStrategy;
31
32// ─────────────────────────────────────────────────────────────────────────────
33// WindowState — per-strategy mutable inner state
34// ─────────────────────────────────────────────────────────────────────────────
35
/// Per-strategy mutable state held inside [`RequestWindow`].
///
/// Exactly one variant is ever constructed per window; [`RequestWindow::new`]
/// selects it from the configured [`RateLimitStrategy`].
#[derive(Debug)]
enum WindowState {
    /// Sliding-window algorithm: a rolling `VecDeque` of admission
    /// timestamps, oldest at the front. Entries older than `config.window`
    /// are pruned on each `acquire` call.
    Sliding { timestamps: VecDeque<Instant> },
    /// Token-bucket algorithm: current (fractional) token count plus the
    /// instant of the last refill, from which elapsed-time refill is computed.
    TokenBucket { tokens: f64, last_refill: Instant },
}
44
45// ─────────────────────────────────────────────────────────────────────────────
46// RequestWindow
47// ─────────────────────────────────────────────────────────────────────────────
48
/// Mutable inner state — algorithm-specific window data plus an optional hard
/// block set by a server-returned `Retry-After` header.
#[derive(Debug)]
struct RequestWindow {
    /// Algorithm-specific counters/timestamps (see [`WindowState`]).
    state: WindowState,
    /// Owned copy of the configuration this window was built from.
    config: RateLimitConfig,
    /// Hard block until this instant, set by [`record_retry_after`].
    /// Checked before either algorithm runs; cleared once it has elapsed.
    blocked_until: Option<Instant>,
}
58
59impl RequestWindow {
60 fn new(config: &RateLimitConfig) -> Self {
61 let state = match config.strategy {
62 RateLimitStrategy::SlidingWindow => WindowState::Sliding {
63 timestamps: VecDeque::with_capacity(config.max_requests as usize),
64 },
65 RateLimitStrategy::TokenBucket => WindowState::TokenBucket {
66 tokens: f64::from(config.max_requests),
67 last_refill: Instant::now(),
68 },
69 };
70 Self {
71 state,
72 config: config.clone(),
73 blocked_until: None,
74 }
75 }
76
77 /// Try to acquire a request slot.
78 ///
79 /// - Returns `None` if a slot is available (the slot is claimed immediately).
80 /// - Returns `Some(wait)` with the duration the caller must sleep before
81 /// retrying (capped at `config.max_delay_ms`).
82 fn acquire(&mut self) -> Option<Duration> {
83 let now = Instant::now();
84 let max_delay = Duration::from_millis(self.config.max_delay_ms);
85
86 // Check hard block imposed by a previous Retry-After response.
87 if let Some(until) = self.blocked_until {
88 if until > now {
89 let wait = until.duration_since(now);
90 return Some(wait.min(max_delay));
91 }
92 self.blocked_until = None;
93 }
94
95 match &mut self.state {
96 WindowState::Sliding { timestamps } => {
97 let window = self.config.window;
98 // Drop timestamps that have rolled out of the window.
99 while timestamps
100 .front()
101 .is_some_and(|t| now.duration_since(*t) >= window)
102 {
103 timestamps.pop_front();
104 }
105
106 if timestamps.len() < self.config.max_requests as usize {
107 // Slot available — record and permit.
108 timestamps.push_back(now);
109 None
110 } else {
111 // Window is full; compute wait until the oldest entry rolls out.
112 let &oldest = timestamps.front()?;
113 let elapsed = now.duration_since(oldest);
114 let wait = window.saturating_sub(elapsed);
115 Some(wait.min(max_delay))
116 }
117 }
118
119 WindowState::TokenBucket {
120 tokens,
121 last_refill,
122 } => {
123 // Refill tokens proportional to elapsed time.
124 let elapsed = now.duration_since(*last_refill);
125 let rate = f64::from(self.config.max_requests) / self.config.window.as_secs_f64();
126 let refill = elapsed.as_secs_f64() * rate;
127 *tokens = (*tokens + refill).min(f64::from(self.config.max_requests));
128 *last_refill = now;
129
130 if *tokens >= 1.0 {
131 *tokens -= 1.0;
132 None
133 } else {
134 // Compute exact wait until 1 token has accumulated.
135 let wait_secs = (1.0 - *tokens) / rate;
136 let wait = Duration::from_secs_f64(wait_secs);
137 Some(wait.min(max_delay))
138 }
139 }
140 }
141 }
142
143 /// Set a hard block for `secs` seconds to honour a server-returned
144 /// `Retry-After` interval.
145 ///
146 /// A shorter `secs` value will never override a longer existing block.
147 fn record_retry_after(&mut self, secs: u64) {
148 let until = Instant::now() + Duration::from_secs(secs);
149 match self.blocked_until {
150 // Keep the later of the two blocks.
151 Some(existing) if existing >= until => {}
152 _ => self.blocked_until = Some(until),
153 }
154 }
155}
156
157// ─────────────────────────────────────────────────────────────────────────────
158// RequestRateLimit
159// ─────────────────────────────────────────────────────────────────────────────
160
/// Shareable, cheaply-cloneable handle to a per-plugin request rate limiter.
///
/// The active algorithm — sliding window or token bucket — is selected by the
/// [`RateLimitStrategy`] carried in the supplied [`RateLimitConfig`]. Clones
/// share the same underlying window state through an `Arc<Mutex<_>>`.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::{
///     RateLimitConfig, RequestRateLimit, rate_limit_acquire,
/// };
///
/// # async fn example() {
/// let rl = RequestRateLimit::new(RateLimitConfig::default());
/// rate_limit_acquire(&rl).await;
/// // … send the request …
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct RequestRateLimit {
    /// Shared mutable window state; all clones lock the same window.
    inner: Arc<Mutex<RequestWindow>>,
    /// Immutable copy of the config, exposed via [`RequestRateLimit::config`].
    config: RateLimitConfig,
}
181
182impl RequestRateLimit {
183 /// Create a new `RequestRateLimit` from a [`RateLimitConfig`].
184 ///
185 /// # Example
186 ///
187 /// ```rust
188 /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
189 ///
190 /// let rl = RequestRateLimit::new(RateLimitConfig::default());
191 /// ```
192 #[must_use]
193 pub fn new(config: RateLimitConfig) -> Self {
194 let window = RequestWindow::new(&config);
195 Self {
196 inner: Arc::new(Mutex::new(window)),
197 config,
198 }
199 }
200
201 /// Return the [`RateLimitConfig`] this limiter was initialised from.
202 ///
203 /// # Example
204 ///
205 /// ```rust
206 /// use std::time::Duration;
207 /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
208 ///
209 /// let cfg = RateLimitConfig { max_requests: 50, ..Default::default() };
210 /// let rl = RequestRateLimit::new(cfg.clone());
211 /// assert_eq!(rl.config().max_requests, 50);
212 /// ```
213 #[must_use]
214 pub const fn config(&self) -> &RateLimitConfig {
215 &self.config
216 }
217}
218
219// ─────────────────────────────────────────────────────────────────────────────
220// Public API
221// ─────────────────────────────────────────────────────────────────────────────
222
223/// Sleep until a request slot is available within the rolling window, then
224/// record the slot.
225///
226/// If the window is full the function sleeps until the oldest in-window entry
227/// expires (capped at `config.max_delay_ms`). The `Mutex` guard is dropped
228/// before every `.await` call to preserve `Send` bounds.
229///
230/// # Example
231///
232/// ```rust
233/// use stygian_graph::adapters::graphql_rate_limit::{
234/// RateLimitConfig, RequestRateLimit, rate_limit_acquire,
235/// };
236/// use std::time::Duration;
237///
238/// # async fn example() {
239/// let rl = RequestRateLimit::new(RateLimitConfig {
240/// max_requests: 2,
241/// ..Default::default()
242/// });
243/// rate_limit_acquire(&rl).await;
244/// rate_limit_acquire(&rl).await;
245/// // Third call blocks until the window rolls forward.
246/// # }
247/// ```
248pub async fn rate_limit_acquire(rl: &RequestRateLimit) {
249 loop {
250 let delay = {
251 let mut guard = rl.inner.lock().await;
252 guard.acquire()
253 };
254 match delay {
255 None => return,
256 Some(d) => {
257 tracing::debug!(
258 delay_ms = d.as_millis(),
259 "rate limiter: window full, sleeping"
260 );
261 tokio::time::sleep(d).await;
262 }
263 }
264 }
265}
266
267/// Record a server-returned `Retry-After` delay.
268///
269/// Call this when the upstream API responds with HTTP 429 and a `Retry-After`
270/// header. Subsequent calls to [`rate_limit_acquire`] will block for at least
271/// `secs` seconds. A shorter `secs` value will never shorten an existing block.
272///
273/// # Example
274///
275/// ```rust
276/// use stygian_graph::adapters::graphql_rate_limit::{
277/// RateLimitConfig, RequestRateLimit, rate_limit_retry_after,
278/// };
279///
280/// # async fn example() {
281/// let rl = RequestRateLimit::new(RateLimitConfig::default());
282/// rate_limit_retry_after(&rl, 30).await;
283/// # }
284/// ```
285pub async fn rate_limit_retry_after(rl: &RequestRateLimit, retry_after_secs: u64) {
286 let mut guard = rl.inner.lock().await;
287 guard.record_retry_after(retry_after_secs);
288}
289
/// Parse an integer `Retry-After` value from a header string.
///
/// Returns `None` if the value cannot be parsed as a non-negative integer.
/// HTTP-date format is intentionally not supported.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::parse_retry_after;
///
/// assert_eq!(parse_retry_after("30"), Some(30));
/// assert_eq!(parse_retry_after("not-a-number"), None);
/// ```
#[must_use]
pub fn parse_retry_after(value: &str) -> Option<u64> {
    // Surrounding whitespace is tolerated; anything non-numeric is rejected.
    match value.trim().parse() {
        Ok(secs) => Some(secs),
        Err(_) => None,
    }
}
307
308// ─────────────────────────────────────────────────────────────────────────────
309// Tests
310// ─────────────────────────────────────────────────────────────────────────────
311
#[cfg(test)]
// `unwrap` is acceptable here: each test establishes the value's presence
// immediately before unwrapping it.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Sliding-window config with a generous 60 s cap on suggested waits.
    fn cfg(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        RateLimitConfig {
            max_requests,
            window: Duration::from_secs(window_secs),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::SlidingWindow,
        }
    }

    /// Token-bucket config, otherwise identical to [`cfg`].
    fn cfg_bucket(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        RateLimitConfig {
            max_requests,
            window: Duration::from_secs(window_secs),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        }
    }

    // 1. Window allows exactly max_requests without blocking.
    #[test]
    fn window_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg(3, 60));
        assert!(w.acquire().is_none(), "slot 1");
        assert!(w.acquire().is_none(), "slot 2");
        assert!(w.acquire().is_none(), "slot 3");
        assert!(w.acquire().is_some(), "4th request must be blocked");
    }

    // 2. After the window expires the slot becomes available again.
    //    Uses a real sleep (25 ms > the 10 ms window) — keep durations small.
    #[test]
    fn window_resets_after_expiry() {
        let mut w = RequestWindow::new(&RateLimitConfig {
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::SlidingWindow,
        });
        assert!(w.acquire().is_none(), "first request");
        std::thread::sleep(Duration::from_millis(25));
        assert!(w.acquire().is_none(), "window should have expired");
    }

    // 3. Timestamps are recorded immediately so concurrent callers see a
    //    reduced slot count without waiting for the request to complete.
    #[test]
    fn timestamps_recorded_immediately() {
        let mut w = RequestWindow::new(&cfg(2, 60));
        w.acquire();
        w.acquire();
        // After two acquisitions the window is full.
        assert!(w.acquire().is_some(), "third request must be blocked");
    }

    // 4. record_retry_after blocks subsequent acquire calls even though the
    //    window itself has plenty of capacity.
    #[test]
    fn retry_after_blocks_further_requests() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(30);
        assert!(
            w.acquire().is_some(),
            "Retry-After must block the next request"
        );
    }

    // 5. A shorter Retry-After must not reduce an existing longer block.
    #[test]
    fn retry_after_does_not_shorten_existing_block() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(60);
        let until_before = w.blocked_until.unwrap();
        w.record_retry_after(1);
        let until_after = w.blocked_until.unwrap();
        assert!(
            until_after >= until_before,
            "shorter retry-after must not override the longer block"
        );
    }

    // 6. parse_retry_after handles valid integers (with surrounding
    //    whitespace) and rejects garbage.
    #[test]
    fn parse_retry_after_parses_integers() {
        assert_eq!(parse_retry_after("42"), Some(42));
        assert_eq!(parse_retry_after("0"), Some(0));
        assert_eq!(parse_retry_after("not-a-number"), None);
        assert_eq!(parse_retry_after(""), None);
        assert_eq!(parse_retry_after(" 30 "), Some(30));
    }

    // ── Token-bucket strategy tests ──────────────────────────────────────────

    // 7. Token bucket allows up to max_requests immediately (full bucket).
    #[test]
    fn token_bucket_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg_bucket(3, 60));
        assert!(w.acquire().is_none(), "token 1");
        assert!(w.acquire().is_none(), "token 2");
        assert!(w.acquire().is_none(), "token 3");
        assert!(
            w.acquire().is_some(),
            "4th request must be blocked — bucket empty"
        );
    }

    // 8. Token bucket refills over time (real 20 ms sleep ≥ two refill
    //    periods, so at least one full token accumulates).
    #[test]
    fn token_bucket_refills_after_delay() {
        let mut w = RequestWindow::new(&RateLimitConfig {
            // 1 token per 10 ms
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        });
        assert!(w.acquire().is_none(), "first request consumes the token");
        assert!(w.acquire().is_some(), "bucket empty — must block");
        std::thread::sleep(Duration::from_millis(20));
        assert!(w.acquire().is_none(), "bucket should have refilled");
    }

    // 9. Token bucket also respects Retry-After hard blocks.
    #[test]
    fn token_bucket_respects_retry_after() {
        let mut w = RequestWindow::new(&cfg_bucket(100, 60));
        w.record_retry_after(30);
        assert!(
            w.acquire().is_some(),
            "Retry-After must block even with tokens available"
        );
    }

    // 10. Token bucket wait duration is proportional to the deficit.
    #[test]
    fn token_bucket_wait_is_proportional() {
        // 60 requests / 60 s = 1 token/s. After draining the bucket the wait
        // for a single token should be ≈ 1 s.
        let mut w = RequestWindow::new(&RateLimitConfig {
            max_requests: 1,
            window: Duration::from_secs(1),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        });
        w.acquire(); // consume the only token
        let wait = w.acquire().unwrap();
        // Wait should be close to 1 s; allow generous tolerance for slow CI.
        assert!(
            wait <= Duration::from_secs(1),
            "wait {wait:?} should not exceed 1 s"
        );
        assert!(
            wait >= Duration::from_millis(800),
            "wait {wait:?} should be close to 1 s"
        );
    }
}