Skip to main content

stygian_graph/adapters/
graphql_rate_limit.rs

1//! Request-count rate limiter for GraphQL API targets with pluggable algorithms.
2//!
3//! Two strategies are supported via [`RateLimitStrategy`]:
4//!
5//! - **[`SlidingWindow`](RateLimitStrategy::SlidingWindow)** — limits outgoing requests to
6//!   `max_requests` in any rolling `window` duration.  Before each request
7//!   [`rate_limit_acquire`] is called; it records the current timestamp and sleeps
8//!   until the oldest in-window request expires if the window is already full.
9//!
10//! - **[`TokenBucket`](RateLimitStrategy::TokenBucket)** — refills tokens at a steady rate
11//!   (`max_requests / window`); short bursts are absorbed by the bucket capacity before
12//!   the rate is enforced.  Computes the exact wait time required to accumulate the next
13//!   token instead of sleeping speculatively.
14//!
15//! A server-returned `Retry-After` value can be applied via
16//! [`rate_limit_retry_after`], which imposes a hard block until the indicated
17//! instant irrespective of the active algorithm.
18//!
19//! Operates in parallel with [`graphql_throttle`](crate::adapters::graphql_throttle)
20//! (leaky-bucket cost throttle).  Both can be active simultaneously.
21
22use std::collections::VecDeque;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use tokio::sync::Mutex;
27
28/// Re-export — canonical definition lives in the ports layer.
29pub use crate::ports::graphql_plugin::RateLimitConfig;
30pub use crate::ports::graphql_plugin::RateLimitStrategy;
31
32// ─────────────────────────────────────────────────────────────────────────────
33// WindowState — per-strategy mutable inner state
34// ─────────────────────────────────────────────────────────────────────────────
35
36/// Per-strategy mutable state held inside [`RequestWindow`].
/// Per-strategy mutable state held inside [`RequestWindow`].
///
/// Exactly one variant is constructed per limiter, chosen from
/// [`RateLimitConfig`]'s strategy field in [`RequestWindow::new`].
#[derive(Debug)]
enum WindowState {
    /// A rolling `VecDeque` of request timestamps for the sliding-window
    /// algorithm — oldest entry at the front; entries are evicted once they
    /// fall outside the configured window.
    Sliding { timestamps: VecDeque<Instant> },
    /// Fractional token balance and the instant of the last refill
    /// computation for the token-bucket algorithm (tokens accrue at
    /// `max_requests / window` per second).
    TokenBucket { tokens: f64, last_refill: Instant },
}
44
45// ─────────────────────────────────────────────────────────────────────────────
46// RequestWindow
47// ─────────────────────────────────────────────────────────────────────────────
48
49/// Mutable inner state — algorithm-specific window data plus an optional hard
50/// block set by a server-returned `Retry-After` header.
/// Mutable inner state — algorithm-specific window data plus an optional hard
/// block set by a server-returned `Retry-After` header.
#[derive(Debug)]
struct RequestWindow {
    /// Algorithm-specific counters (sliding window or token bucket).
    state: WindowState,
    /// Owned copy of the limiter configuration taken at construction time.
    config: RateLimitConfig,
    /// Hard block until this instant, set by [`record_retry_after`];
    /// cleared lazily by `acquire` once the instant has passed.
    blocked_until: Option<Instant>,
}
58
59impl RequestWindow {
60    fn new(config: &RateLimitConfig) -> Self {
61        let state = match config.strategy {
62            RateLimitStrategy::SlidingWindow => WindowState::Sliding {
63                timestamps: VecDeque::with_capacity(config.max_requests as usize),
64            },
65            RateLimitStrategy::TokenBucket => WindowState::TokenBucket {
66                tokens: f64::from(config.max_requests),
67                last_refill: Instant::now(),
68            },
69        };
70        Self {
71            state,
72            config: config.clone(),
73            blocked_until: None,
74        }
75    }
76
77    /// Try to acquire a request slot.
78    ///
79    /// - Returns `None` if a slot is available (the slot is claimed immediately).
80    /// - Returns `Some(wait)` with the duration the caller must sleep before
81    ///   retrying (capped at `config.max_delay_ms`).
82    fn acquire(&mut self) -> Option<Duration> {
83        let now = Instant::now();
84        let max_delay = Duration::from_millis(self.config.max_delay_ms);
85
86        // Check hard block imposed by a previous Retry-After response.
87        if let Some(until) = self.blocked_until {
88            if until > now {
89                let wait = until.duration_since(now);
90                return Some(wait.min(max_delay));
91            }
92            self.blocked_until = None;
93        }
94
95        match &mut self.state {
96            WindowState::Sliding { timestamps } => {
97                let window = self.config.window;
98                // Drop timestamps that have rolled out of the window.
99                while timestamps
100                    .front()
101                    .is_some_and(|t| now.duration_since(*t) >= window)
102                {
103                    timestamps.pop_front();
104                }
105
106                if timestamps.len() < self.config.max_requests as usize {
107                    // Slot available — record and permit.
108                    timestamps.push_back(now);
109                    None
110                } else {
111                    // Window is full; compute wait until the oldest entry rolls out.
112                    let &oldest = timestamps.front()?;
113                    let elapsed = now.duration_since(oldest);
114                    let wait = window.saturating_sub(elapsed);
115                    Some(wait.min(max_delay))
116                }
117            }
118
119            WindowState::TokenBucket {
120                tokens,
121                last_refill,
122            } => {
123                // Refill tokens proportional to elapsed time.
124                let elapsed = now.duration_since(*last_refill);
125                let rate = f64::from(self.config.max_requests) / self.config.window.as_secs_f64();
126                let refill = elapsed.as_secs_f64() * rate;
127                *tokens = (*tokens + refill).min(f64::from(self.config.max_requests));
128                *last_refill = now;
129
130                if *tokens >= 1.0 {
131                    *tokens -= 1.0;
132                    None
133                } else {
134                    // Compute exact wait until 1 token has accumulated.
135                    let wait_secs = (1.0 - *tokens) / rate;
136                    let wait = Duration::from_secs_f64(wait_secs);
137                    Some(wait.min(max_delay))
138                }
139            }
140        }
141    }
142
143    /// Set a hard block for `secs` seconds to honour a server-returned
144    /// `Retry-After` interval.
145    ///
146    /// A shorter `secs` value will never override a longer existing block.
147    fn record_retry_after(&mut self, secs: u64) {
148        let until = Instant::now() + Duration::from_secs(secs);
149        match self.blocked_until {
150            // Keep the later of the two blocks.
151            Some(existing) if existing >= until => {}
152            _ => self.blocked_until = Some(until),
153        }
154    }
155}
156
157// ─────────────────────────────────────────────────────────────────────────────
158// RequestRateLimit
159// ─────────────────────────────────────────────────────────────────────────────
160
/// Shareable, cheaply-cloneable handle to a per-plugin request-rate limiter
/// (sliding-window or token-bucket, per the configured [`RateLimitStrategy`]).
///
/// Clones share one limiter: the mutable window state lives behind an
/// `Arc<Mutex<…>>`, so cloning only bumps the refcount.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::{
///     RateLimitConfig, RequestRateLimit, rate_limit_acquire,
/// };
///
/// # async fn example() {
/// let rl = RequestRateLimit::new(RateLimitConfig::default());
/// rate_limit_acquire(&rl).await;
/// // … send the request …
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct RequestRateLimit {
    /// Shared mutable window state; locked briefly by the free functions
    /// in this module.
    inner: Arc<Mutex<RequestWindow>>,
    /// Immutable copy of the config, exposed via [`RequestRateLimit::config`].
    config: RateLimitConfig,
}
181
182impl RequestRateLimit {
183    /// Create a new `RequestRateLimit` from a [`RateLimitConfig`].
184    ///
185    /// # Example
186    ///
187    /// ```rust
188    /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
189    ///
190    /// let rl = RequestRateLimit::new(RateLimitConfig::default());
191    /// ```
192    #[must_use]
193    pub fn new(config: RateLimitConfig) -> Self {
194        let window = RequestWindow::new(&config);
195        Self {
196            inner: Arc::new(Mutex::new(window)),
197            config,
198        }
199    }
200
201    /// Return the [`RateLimitConfig`] this limiter was initialised from.
202    ///
203    /// # Example
204    ///
205    /// ```rust
206    /// use std::time::Duration;
207    /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
208    ///
209    /// let cfg = RateLimitConfig { max_requests: 50, ..Default::default() };
210    /// let rl = RequestRateLimit::new(cfg.clone());
211    /// assert_eq!(rl.config().max_requests, 50);
212    /// ```
213    #[must_use]
214    pub const fn config(&self) -> &RateLimitConfig {
215        &self.config
216    }
217}
218
219// ─────────────────────────────────────────────────────────────────────────────
220// Public API
221// ─────────────────────────────────────────────────────────────────────────────
222
223/// Sleep until a request slot is available within the rolling window, then
224/// record the slot.
225///
226/// If the window is full the function sleeps until the oldest in-window entry
227/// expires (capped at `config.max_delay_ms`).  The `Mutex` guard is dropped
228/// before every `.await` call to preserve `Send` bounds.
229///
230/// # Example
231///
232/// ```rust
233/// use stygian_graph::adapters::graphql_rate_limit::{
234///     RateLimitConfig, RequestRateLimit, rate_limit_acquire,
235/// };
236/// use std::time::Duration;
237///
238/// # async fn example() {
239/// let rl = RequestRateLimit::new(RateLimitConfig {
240///     max_requests: 2,
241///     ..Default::default()
242/// });
243/// rate_limit_acquire(&rl).await;
244/// rate_limit_acquire(&rl).await;
245/// // Third call blocks until the window rolls forward.
246/// # }
247/// ```
248pub async fn rate_limit_acquire(rl: &RequestRateLimit) {
249    loop {
250        let delay = {
251            let mut guard = rl.inner.lock().await;
252            guard.acquire()
253        };
254        match delay {
255            None => return,
256            Some(d) => {
257                tracing::debug!(
258                    delay_ms = d.as_millis(),
259                    "rate limiter: window full, sleeping"
260                );
261                tokio::time::sleep(d).await;
262            }
263        }
264    }
265}
266
267/// Record a server-returned `Retry-After` delay.
268///
269/// Call this when the upstream API responds with HTTP 429 and a `Retry-After`
270/// header.  Subsequent calls to [`rate_limit_acquire`] will block for at least
271/// `secs` seconds.  A shorter `secs` value will never shorten an existing block.
272///
273/// # Example
274///
275/// ```rust
276/// use stygian_graph::adapters::graphql_rate_limit::{
277///     RateLimitConfig, RequestRateLimit, rate_limit_retry_after,
278/// };
279///
280/// # async fn example() {
281/// let rl = RequestRateLimit::new(RateLimitConfig::default());
282/// rate_limit_retry_after(&rl, 30).await;
283/// # }
284/// ```
285pub async fn rate_limit_retry_after(rl: &RequestRateLimit, retry_after_secs: u64) {
286    let mut guard = rl.inner.lock().await;
287    guard.record_retry_after(retry_after_secs);
288}
289
/// Parse an integer `Retry-After` value from a header string.
///
/// Returns `None` unless the value (after trimming surrounding whitespace)
/// parses as a non-negative integer number of seconds.  The HTTP-date form
/// of `Retry-After` is intentionally not supported.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::parse_retry_after;
///
/// assert_eq!(parse_retry_after("30"), Some(30));
/// assert_eq!(parse_retry_after("not-a-number"), None);
/// ```
#[must_use]
pub fn parse_retry_after(value: &str) -> Option<u64> {
    match value.trim().parse::<u64>() {
        Ok(secs) => Some(secs),
        Err(_) => None,
    }
}
307
308// ─────────────────────────────────────────────────────────────────────────────
309// Tests
310// ─────────────────────────────────────────────────────────────────────────────
311
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    //! Synchronous unit tests that drive `RequestWindow` directly, so no
    //! async runtime is required.
    //!
    //! NOTE(review): tests 2 and 8 depend on real `thread::sleep` timing and
    //! could be flaky on a heavily loaded CI machine.

    use super::*;

    /// Sliding-window config helper with a generous 60 s delay cap.
    fn cfg(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        RateLimitConfig {
            max_requests,
            window: Duration::from_secs(window_secs),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::SlidingWindow,
        }
    }

    /// Token-bucket config helper with a generous 60 s delay cap.
    fn cfg_bucket(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        RateLimitConfig {
            max_requests,
            window: Duration::from_secs(window_secs),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        }
    }

    // 1. Window allows exactly max_requests without blocking.
    #[test]
    fn window_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg(3, 60));
        assert!(w.acquire().is_none(), "slot 1");
        assert!(w.acquire().is_none(), "slot 2");
        assert!(w.acquire().is_none(), "slot 3");
        assert!(w.acquire().is_some(), "4th request must be blocked");
    }

    // 2. After the window expires the slot becomes available again.
    //    (Timing-sensitive: 25 ms sleep vs. a 10 ms window.)
    #[test]
    fn window_resets_after_expiry() {
        let mut w = RequestWindow::new(&RateLimitConfig {
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::SlidingWindow,
        });
        assert!(w.acquire().is_none(), "first request");
        std::thread::sleep(Duration::from_millis(25));
        assert!(w.acquire().is_none(), "window should have expired");
    }

    // 3. Timestamps are recorded immediately so concurrent callers see a
    //    reduced slot count without waiting for the request to complete.
    #[test]
    fn timestamps_recorded_immediately() {
        let mut w = RequestWindow::new(&cfg(2, 60));
        w.acquire();
        w.acquire();
        // After two acquisitions the window is full.
        assert!(w.acquire().is_some(), "third request must be blocked");
    }

    // 4. record_retry_after blocks subsequent acquire calls.
    #[test]
    fn retry_after_blocks_further_requests() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(30);
        assert!(
            w.acquire().is_some(),
            "Retry-After must block the next request"
        );
    }

    // 5. A shorter Retry-After must not reduce an existing longer block.
    #[test]
    fn retry_after_does_not_shorten_existing_block() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(60);
        let until_before = w.blocked_until.unwrap();
        w.record_retry_after(1);
        let until_after = w.blocked_until.unwrap();
        assert!(
            until_after >= until_before,
            "shorter retry-after must not override the longer block"
        );
    }

    // 6. parse_retry_after handles valid integers and rejects garbage.
    #[test]
    fn parse_retry_after_parses_integers() {
        assert_eq!(parse_retry_after("42"), Some(42));
        assert_eq!(parse_retry_after("0"), Some(0));
        assert_eq!(parse_retry_after("not-a-number"), None);
        assert_eq!(parse_retry_after(""), None);
        assert_eq!(parse_retry_after("  30  "), Some(30));
    }

    // ── Token-bucket strategy tests ──────────────────────────────────────────

    // 7. Token bucket allows up to max_requests immediately (full bucket).
    #[test]
    fn token_bucket_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg_bucket(3, 60));
        assert!(w.acquire().is_none(), "token 1");
        assert!(w.acquire().is_none(), "token 2");
        assert!(w.acquire().is_none(), "token 3");
        assert!(
            w.acquire().is_some(),
            "4th request must be blocked — bucket empty"
        );
    }

    // 8. Token bucket refills over time.
    //    (Timing-sensitive: 20 ms sleep vs. a 10 ms-per-token refill rate.)
    #[test]
    fn token_bucket_refills_after_delay() {
        let mut w = RequestWindow::new(&RateLimitConfig {
            // 1 token per 10 ms
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        });
        assert!(w.acquire().is_none(), "first request consumes the token");
        assert!(w.acquire().is_some(), "bucket empty — must block");
        std::thread::sleep(Duration::from_millis(20));
        assert!(w.acquire().is_none(), "bucket should have refilled");
    }

    // 9. Token bucket also respects Retry-After hard blocks.
    #[test]
    fn token_bucket_respects_retry_after() {
        let mut w = RequestWindow::new(&cfg_bucket(100, 60));
        w.record_retry_after(30);
        assert!(
            w.acquire().is_some(),
            "Retry-After must block even with tokens available"
        );
    }

    // 10. Token bucket wait duration is proportional to the deficit.
    #[test]
    fn token_bucket_wait_is_proportional() {
        // 60 requests / 60 s = 1 token/s.  After draining the bucket the wait
        // for a single token should be ≈ 1 s.
        let mut w = RequestWindow::new(&RateLimitConfig {
            max_requests: 1,
            window: Duration::from_secs(1),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        });
        w.acquire(); // consume the only token
        let wait = w.acquire().unwrap();
        // Wait should be close to 1 s; allow generous tolerance for slow CI.
        assert!(
            wait <= Duration::from_secs(1),
            "wait {wait:?} should not exceed 1 s"
        );
        assert!(
            wait >= Duration::from_millis(800),
            "wait {wait:?} should be close to 1 s"
        );
    }
}
470}