Skip to main content

stygian_graph/adapters/
graphql_rate_limit.rs

1//! Sliding-window request-count rate limiter for GraphQL API targets.
2//!
3//! Limits outgoing requests to `max_requests` in any rolling `window` duration.
4//! Before each request [`rate_limit_acquire`] is called; it records the current
5//! timestamp and sleeps until the oldest in-window request expires if the
6//! window is already full.
7//!
8//! A server-returned `Retry-After` value can be applied via
9//! [`rate_limit_retry_after`], which imposes a hard block until the indicated
10//! instant irrespective of the sliding window state.
11//!
12//! Operates in parallel with [`graphql_throttle`](crate::adapters::graphql_throttle)
13//! (leaky-bucket cost throttle).  Both can be active simultaneously.
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use tokio::sync::Mutex;
20
21/// Re-export — canonical definition lives in the ports layer.
22pub use crate::ports::graphql_plugin::RateLimitConfig;
23
24// ─────────────────────────────────────────────────────────────────────────────
25// RequestWindow
26// ─────────────────────────────────────────────────────────────────────────────
27
28/// Mutable inner state — a sliding window of request timestamps plus an
29/// optional hard block set by a server-returned `Retry-After` header.
30#[derive(Debug)]
31struct RequestWindow {
32    timestamps: VecDeque<Instant>,
33    config: RateLimitConfig,
34    /// Hard block until this instant, set by [`record_retry_after`].
35    blocked_until: Option<Instant>,
36}
37
38impl RequestWindow {
39    fn new(config: &RateLimitConfig) -> Self {
40        Self {
41            timestamps: VecDeque::with_capacity(config.max_requests as usize),
42            config: config.clone(),
43            blocked_until: None,
44        }
45    }
46
47    /// Try to acquire a request slot.
48    ///
49    /// Prunes expired entries, then:
50    /// - Returns `None` if a slot is available (timestamp is recorded immediately).
51    /// - Returns `Some(wait)` with the duration the caller must sleep before
52    ///   retrying (capped at `config.max_delay_ms`).
53    fn acquire(&mut self) -> Option<Duration> {
54        let now = Instant::now();
55        let window = self.config.window;
56        let max_delay = Duration::from_millis(self.config.max_delay_ms);
57
58        // Check hard block imposed by a previous Retry-After response.
59        if let Some(until) = self.blocked_until {
60            if until > now {
61                let wait = until.duration_since(now);
62                return Some(wait.min(max_delay));
63            }
64            self.blocked_until = None;
65        }
66
67        // Drop timestamps that have rolled out of the window.
68        while self
69            .timestamps
70            .front()
71            .is_some_and(|t| now.duration_since(*t) >= window)
72        {
73            self.timestamps.pop_front();
74        }
75
76        if self.timestamps.len() < self.config.max_requests as usize {
77            // Slot available — record and permit.
78            self.timestamps.push_back(now);
79            None
80        } else {
81            // Window is full; compute wait until the oldest entry rolls out.
82            let &oldest = self.timestamps.front()?;
83            let elapsed = now.duration_since(oldest);
84            let wait = window.saturating_sub(elapsed);
85            Some(wait.min(max_delay))
86        }
87    }
88
89    /// Set a hard block for `secs` seconds to honour a server-returned
90    /// `Retry-After` interval.
91    ///
92    /// A shorter `secs` value will never override a longer existing block.
93    fn record_retry_after(&mut self, secs: u64) {
94        let until = Instant::now() + Duration::from_secs(secs);
95        match self.blocked_until {
96            // Keep the later of the two blocks.
97            Some(existing) if existing >= until => {}
98            _ => self.blocked_until = Some(until),
99        }
100    }
101}
102
103// ─────────────────────────────────────────────────────────────────────────────
104// RequestRateLimit
105// ─────────────────────────────────────────────────────────────────────────────
106
107/// Shareable, cheaply-cloneable handle to a per-plugin sliding-window limiter.
108///
109/// # Example
110///
111/// ```rust
112/// use stygian_graph::adapters::graphql_rate_limit::{
113///     RateLimitConfig, RequestRateLimit, rate_limit_acquire,
114/// };
115///
116/// # async fn example() {
117/// let rl = RequestRateLimit::new(RateLimitConfig::default());
118/// rate_limit_acquire(&rl).await;
119/// // … send the request …
120/// # }
121/// ```
122#[derive(Clone, Debug)]
123pub struct RequestRateLimit {
124    inner: Arc<Mutex<RequestWindow>>,
125    config: RateLimitConfig,
126}
127
128impl RequestRateLimit {
129    /// Create a new `RequestRateLimit` from a [`RateLimitConfig`].
130    ///
131    /// # Example
132    ///
133    /// ```rust
134    /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
135    ///
136    /// let rl = RequestRateLimit::new(RateLimitConfig::default());
137    /// ```
138    #[must_use]
139    pub fn new(config: RateLimitConfig) -> Self {
140        let window = RequestWindow::new(&config);
141        Self {
142            inner: Arc::new(Mutex::new(window)),
143            config,
144        }
145    }
146
147    /// Return the [`RateLimitConfig`] this limiter was initialised from.
148    ///
149    /// # Example
150    ///
151    /// ```rust
152    /// use std::time::Duration;
153    /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
154    ///
155    /// let cfg = RateLimitConfig { max_requests: 50, ..Default::default() };
156    /// let rl = RequestRateLimit::new(cfg.clone());
157    /// assert_eq!(rl.config().max_requests, 50);
158    /// ```
159    #[must_use]
160    pub const fn config(&self) -> &RateLimitConfig {
161        &self.config
162    }
163}
164
165// ─────────────────────────────────────────────────────────────────────────────
166// Public API
167// ─────────────────────────────────────────────────────────────────────────────
168
169/// Sleep until a request slot is available within the rolling window, then
170/// record the slot.
171///
172/// If the window is full the function sleeps until the oldest in-window entry
173/// expires (capped at `config.max_delay_ms`).  The `Mutex` guard is dropped
174/// before every `.await` call to preserve `Send` bounds.
175///
176/// # Example
177///
178/// ```rust
179/// use stygian_graph::adapters::graphql_rate_limit::{
180///     RateLimitConfig, RequestRateLimit, rate_limit_acquire,
181/// };
182/// use std::time::Duration;
183///
184/// # async fn example() {
185/// let rl = RequestRateLimit::new(RateLimitConfig {
186///     max_requests: 2,
187///     ..Default::default()
188/// });
189/// rate_limit_acquire(&rl).await;
190/// rate_limit_acquire(&rl).await;
191/// // Third call blocks until the window rolls forward.
192/// # }
193/// ```
194pub async fn rate_limit_acquire(rl: &RequestRateLimit) {
195    loop {
196        let delay = {
197            let mut guard = rl.inner.lock().await;
198            guard.acquire()
199        };
200        match delay {
201            None => return,
202            Some(d) => {
203                tracing::debug!(
204                    delay_ms = d.as_millis(),
205                    "rate limiter: window full, sleeping"
206                );
207                tokio::time::sleep(d).await;
208            }
209        }
210    }
211}
212
213/// Record a server-returned `Retry-After` delay.
214///
215/// Call this when the upstream API responds with HTTP 429 and a `Retry-After`
216/// header.  Subsequent calls to [`rate_limit_acquire`] will block for at least
217/// `secs` seconds.  A shorter `secs` value will never shorten an existing block.
218///
219/// # Example
220///
221/// ```rust
222/// use stygian_graph::adapters::graphql_rate_limit::{
223///     RateLimitConfig, RequestRateLimit, rate_limit_retry_after,
224/// };
225///
226/// # async fn example() {
227/// let rl = RequestRateLimit::new(RateLimitConfig::default());
228/// rate_limit_retry_after(&rl, 30).await;
229/// # }
230/// ```
231pub async fn rate_limit_retry_after(rl: &RequestRateLimit, retry_after_secs: u64) {
232    let mut guard = rl.inner.lock().await;
233    guard.record_retry_after(retry_after_secs);
234}
235
236/// Parse an integer `Retry-After` value from a header string.
237///
238/// Returns `None` if the value cannot be parsed as a non-negative integer.
239/// HTTP-date format is intentionally not supported.
240///
241/// # Example
242///
243/// ```rust
244/// use stygian_graph::adapters::graphql_rate_limit::parse_retry_after;
245///
246/// assert_eq!(parse_retry_after("30"), Some(30));
247/// assert_eq!(parse_retry_after("not-a-number"), None);
248/// ```
249#[must_use]
250pub fn parse_retry_after(value: &str) -> Option<u64> {
251    value.trim().parse::<u64>().ok()
252}
253
254// ─────────────────────────────────────────────────────────────────────────────
255// Tests
256// ─────────────────────────────────────────────────────────────────────────────
257
258#[cfg(test)]
259#[allow(clippy::unwrap_used)]
260mod tests {
261    use super::*;
262
263    fn cfg(max_requests: u32, window_secs: u64) -> RateLimitConfig {
264        RateLimitConfig {
265            max_requests,
266            window: Duration::from_secs(window_secs),
267            max_delay_ms: 60_000,
268        }
269    }
270
271    // 1. Window allows exactly max_requests without blocking.
272    #[test]
273    fn window_allows_up_to_max() {
274        let mut w = RequestWindow::new(&cfg(3, 60));
275        assert!(w.acquire().is_none(), "slot 1");
276        assert!(w.acquire().is_none(), "slot 2");
277        assert!(w.acquire().is_none(), "slot 3");
278        assert!(w.acquire().is_some(), "4th request must be blocked");
279    }
280
281    // 2. After the window expires the slot becomes available again.
282    #[test]
283    fn window_resets_after_expiry() {
284        let mut w = RequestWindow::new(&RateLimitConfig {
285            max_requests: 1,
286            window: Duration::from_millis(10),
287            max_delay_ms: 60_000,
288        });
289        assert!(w.acquire().is_none(), "first request");
290        std::thread::sleep(Duration::from_millis(25));
291        assert!(w.acquire().is_none(), "window should have expired");
292    }
293
294    // 3. Timestamps are recorded immediately so concurrent callers see a
295    //    reduced slot count without waiting for the request to complete.
296    #[test]
297    fn timestamps_recorded_immediately() {
298        let mut w = RequestWindow::new(&cfg(2, 60));
299        w.acquire();
300        assert_eq!(w.timestamps.len(), 1);
301        w.acquire();
302        assert_eq!(w.timestamps.len(), 2);
303    }
304
305    // 4. record_retry_after blocks subsequent acquire calls.
306    #[test]
307    fn retry_after_blocks_further_requests() {
308        let mut w = RequestWindow::new(&cfg(100, 60));
309        w.record_retry_after(30);
310        assert!(
311            w.acquire().is_some(),
312            "Retry-After must block the next request"
313        );
314    }
315
316    // 5. A shorter Retry-After must not reduce an existing longer block.
317    #[test]
318    fn retry_after_does_not_shorten_existing_block() {
319        let mut w = RequestWindow::new(&cfg(100, 60));
320        w.record_retry_after(60);
321        let until_before = w.blocked_until.unwrap();
322        w.record_retry_after(1);
323        let until_after = w.blocked_until.unwrap();
324        assert!(
325            until_after >= until_before,
326            "shorter retry-after must not override the longer block"
327        );
328    }
329
330    // 6. parse_retry_after handles valid integers and rejects garbage.
331    #[test]
332    fn parse_retry_after_parses_integers() {
333        assert_eq!(parse_retry_after("42"), Some(42));
334        assert_eq!(parse_retry_after("0"), Some(0));
335        assert_eq!(parse_retry_after("not-a-number"), None);
336        assert_eq!(parse_retry_after(""), None);
337        assert_eq!(parse_retry_after("  30  "), Some(30));
338    }
339}