Skip to main content

throttle_net/
provider.rs

1//! Parse rate-limit response headers and reconcile a limiter with the server.
2//!
3//! Downstreams advertise their own view of your remaining budget in response
4//! headers — and every major provider spells it differently. [`HeaderProfile`]
5//! captures one provider's convention; a set of ready-made profiles
6//! ([`HeaderProfile::OPENAI`], [`HeaderProfile::GITHUB`], …) covers the common
7//! ones, and [`parse`](HeaderProfile::parse) turns a header set into a normalized
8//! [`RateLimitInfo`].
9//!
10//! Parsing is defensive: unrecognized or malformed values are dropped, never a
11//! panic. [`RateLimitInfo::sync_requests`] then reconciles a [`Throttle`] with the
12//! server's reported remaining count — only ever *reducing* the local budget, so
13//! synchronization can never raise it above the hard limit.
14
15use core::time::Duration;
16use std::time::{SystemTime, UNIX_EPOCH};
17
18use clock_lib::Clock;
19
20use crate::retry_after::parse_retry_after_at;
21use crate::throttle::Throttle;
22use crate::timeutil::civil_to_unix;
23
/// The server-reported state of a single metered dimension.
///
/// Every field is optional: a field is `None` when the response did not carry
/// a usable value for it.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Window {
    /// Maximum number of units allowed per window.
    pub limit: Option<u64>,
    /// Units still available in the current window.
    pub remaining: Option<u64>,
    /// How long until the current window resets.
    pub reset: Option<Duration>,
}
35
36/// A normalized view of the rate-limit headers on a response.
37///
38/// Providers that meter requests and tokens separately (the LLM APIs) populate
39/// both [`requests`](Self::requests) and [`tokens`](Self::tokens); single-limit
40/// providers populate only `requests`. [`retry_after`](Self::retry_after) carries
41/// a `Retry-After` when present.
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
43pub struct RateLimitInfo {
44    /// The request-count window, if the response carried one.
45    pub requests: Option<Window>,
46    /// The token window, if the response carried one (LLM providers).
47    pub tokens: Option<Window>,
48    /// The `Retry-After` delay, if present.
49    pub retry_after: Option<Duration>,
50}
51
52impl RateLimitInfo {
53    /// Reconciles `throttle` with the server's reported requests-remaining,
54    /// draining tokens so the local available count does not exceed it.
55    ///
56    /// This only ever *reduces* the local budget — it never adds tokens — so it
57    /// corrects client/server drift without ever raising the throttle above its
58    /// hard capacity. Returns the number of tokens drained.
59    ///
60    /// # Examples
61    ///
62    /// ```
63    /// use throttle_net::Throttle;
64    /// use throttle_net::provider::{RateLimitInfo, Window};
65    ///
66    /// let throttle = Throttle::per_second(100); // locally believes 100 are free
67    /// let info = RateLimitInfo {
68    ///     requests: Some(Window { remaining: Some(10), ..Window::default() }),
69    ///     ..RateLimitInfo::default()
70    /// };
71    /// let drained = info.sync_requests(&throttle);
72    /// assert_eq!(drained, 90);
73    /// assert_eq!(throttle.available(), 10); // now matches the server
74    /// ```
75    pub fn sync_requests<C: Clock + Clone>(&self, throttle: &Throttle<C>) -> u32 {
76        drain_to(throttle, self.requests.and_then(|w| w.remaining))
77    }
78
79    /// Reconciles `throttle` with the server's reported tokens-remaining, the same
80    /// way as [`sync_requests`](Self::sync_requests). Returns the tokens drained.
81    pub fn sync_tokens<C: Clock + Clone>(&self, throttle: &Throttle<C>) -> u32 {
82        drain_to(throttle, self.tokens.and_then(|w| w.remaining))
83    }
84}
85
86/// Drains `throttle` down to `remaining`, never adding. Returns the count drained.
87fn drain_to<C: Clock + Clone>(throttle: &Throttle<C>, remaining: Option<u64>) -> u32 {
88    let Some(remaining) = remaining else {
89        return 0;
90    };
91    let remaining = u32::try_from(remaining).unwrap_or(u32::MAX);
92    let available = throttle.available();
93    if remaining >= available {
94        return 0;
95    }
96    let excess = available - remaining;
97    if throttle.try_acquire_with_cost(excess) {
98        excess
99    } else {
100        0
101    }
102}
103
/// The encoding a provider uses for a window's "reset" header value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ResetFormat {
    /// Whole seconds remaining until the reset (the IETF `RateLimit` draft).
    DeltaSeconds,
    /// A duration string such as `1s`, `6m0s`, or `100ms` (OpenAI).
    DurationString,
    /// An absolute Unix timestamp, in seconds (GitHub).
    UnixSeconds,
    /// An absolute RFC 3339 / ISO 8601 instant (Anthropic).
    Rfc3339,
}
116
/// Header names carrying the limit, remaining, and reset values of one metered
/// dimension.
#[derive(Clone, Copy, Debug)]
struct Triple {
    limit: &'static str,
    remaining: &'static str,
    reset: &'static str,
}
124
125/// One provider's header convention.
126///
127/// Use a built-in profile ([`HeaderProfile::OPENAI`] and friends) and call
128/// [`parse`](Self::parse).
129#[derive(Debug, Clone, Copy)]
130pub struct HeaderProfile {
131    requests: Option<Triple>,
132    tokens: Option<Triple>,
133    retry_after: Option<&'static str>,
134    reset: ResetFormat,
135}
136
137impl HeaderProfile {
138    /// OpenAI: `x-ratelimit-{limit,remaining,reset}-{requests,tokens}`, reset as a
139    /// duration string, plus `retry-after`.
140    pub const OPENAI: Self = Self {
141        requests: Some(Triple {
142            limit: "x-ratelimit-limit-requests",
143            remaining: "x-ratelimit-remaining-requests",
144            reset: "x-ratelimit-reset-requests",
145        }),
146        tokens: Some(Triple {
147            limit: "x-ratelimit-limit-tokens",
148            remaining: "x-ratelimit-remaining-tokens",
149            reset: "x-ratelimit-reset-tokens",
150        }),
151        retry_after: Some("retry-after"),
152        reset: ResetFormat::DurationString,
153    };
154
155    /// Anthropic: `anthropic-ratelimit-{requests,tokens}-{limit,remaining,reset}`,
156    /// reset as an RFC 3339 instant, plus `retry-after`.
157    pub const ANTHROPIC: Self = Self {
158        requests: Some(Triple {
159            limit: "anthropic-ratelimit-requests-limit",
160            remaining: "anthropic-ratelimit-requests-remaining",
161            reset: "anthropic-ratelimit-requests-reset",
162        }),
163        tokens: Some(Triple {
164            limit: "anthropic-ratelimit-tokens-limit",
165            remaining: "anthropic-ratelimit-tokens-remaining",
166            reset: "anthropic-ratelimit-tokens-reset",
167        }),
168        retry_after: Some("retry-after"),
169        reset: ResetFormat::Rfc3339,
170    };
171
172    /// GitHub: `x-ratelimit-{limit,remaining,reset}`, reset as an absolute Unix
173    /// timestamp, plus `retry-after`.
174    pub const GITHUB: Self = Self {
175        requests: Some(Triple {
176            limit: "x-ratelimit-limit",
177            remaining: "x-ratelimit-remaining",
178            reset: "x-ratelimit-reset",
179        }),
180        tokens: None,
181        retry_after: Some("retry-after"),
182        reset: ResetFormat::UnixSeconds,
183    };
184
185    /// The IETF `RateLimit` header draft: `RateLimit-{Limit,Remaining,Reset}`,
186    /// reset as delta-seconds, plus `Retry-After`. A reasonable default for
187    /// standards-compliant or unknown APIs.
188    pub const RFC: Self = Self {
189        requests: Some(Triple {
190            limit: "ratelimit-limit",
191            remaining: "ratelimit-remaining",
192            reset: "ratelimit-reset",
193        }),
194        tokens: None,
195        retry_after: Some("retry-after"),
196        reset: ResetFormat::DeltaSeconds,
197    };
198
199    /// Stripe: no standard rate-limit headers; it signals back-off with
200    /// `Retry-After` on a 429.
201    pub const STRIPE: Self = Self {
202        requests: None,
203        tokens: None,
204        retry_after: Some("retry-after"),
205        reset: ResetFormat::DeltaSeconds,
206    };
207
208    /// AWS: like Stripe, back-off is signalled with `Retry-After`.
209    pub const AWS: Self = Self {
210        requests: None,
211        tokens: None,
212        retry_after: Some("retry-after"),
213        reset: ResetFormat::DeltaSeconds,
214    };
215
216    /// Parses `headers` into a [`RateLimitInfo`], using the system clock to
217    /// resolve absolute reset timestamps.
218    ///
219    /// `headers` is a slice of `(name, value)` pairs; lookups are
220    /// case-insensitive.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use throttle_net::provider::HeaderProfile;
226    ///
227    /// let headers = [
228    ///     ("x-ratelimit-limit-requests", "100"),
229    ///     ("x-ratelimit-remaining-requests", "42"),
230    ///     ("x-ratelimit-reset-requests", "1s"),
231    /// ];
232    /// let info = HeaderProfile::OPENAI.parse(&headers);
233    /// let requests = info.requests.unwrap();
234    /// assert_eq!(requests.limit, Some(100));
235    /// assert_eq!(requests.remaining, Some(42));
236    /// ```
237    #[must_use]
238    pub fn parse(&self, headers: &[(&str, &str)]) -> RateLimitInfo {
239        self.parse_at(headers, current_unix_secs())
240    }
241
242    /// Parses `headers` relative to an explicit current time (Unix seconds), for
243    /// deterministic tests and absolute-timestamp providers.
244    #[must_use]
245    pub fn parse_at(&self, headers: &[(&str, &str)], now_unix_secs: i64) -> RateLimitInfo {
246        RateLimitInfo {
247            requests: self
248                .requests
249                .and_then(|t| self.window(headers, &t, now_unix_secs)),
250            tokens: self
251                .tokens
252                .and_then(|t| self.window(headers, &t, now_unix_secs)),
253            retry_after: self
254                .retry_after
255                .and_then(|name| header(headers, name))
256                .and_then(|value| parse_retry_after_at(value, now_unix_secs)),
257        }
258    }
259
260    /// Builds a [`Window`] from a triple, or `None` if none of its headers are
261    /// present.
262    fn window(&self, headers: &[(&str, &str)], triple: &Triple, now: i64) -> Option<Window> {
263        let limit = header(headers, triple.limit).and_then(parse_u64);
264        let remaining = header(headers, triple.remaining).and_then(parse_u64);
265        let reset = header(headers, triple.reset).and_then(|v| self.parse_reset(v, now));
266        if limit.is_none() && remaining.is_none() && reset.is_none() {
267            return None;
268        }
269        Some(Window {
270            limit,
271            remaining,
272            reset,
273        })
274    }
275
276    /// Parses a reset value into a time-until-reset, per this profile's format.
277    fn parse_reset(&self, value: &str, now: i64) -> Option<Duration> {
278        match self.reset {
279            ResetFormat::DeltaSeconds => value.trim().parse::<u64>().ok().map(Duration::from_secs),
280            ResetFormat::DurationString => parse_duration_string(value.trim()),
281            ResetFormat::UnixSeconds => value
282                .trim()
283                .parse::<i64>()
284                .ok()
285                // Saturating: an extreme reset timestamp or `now` must not overflow
286                // the subtraction (a past reset already clamps to zero below).
287                .map(|at| Duration::from_secs(u64::try_from(at.saturating_sub(now)).unwrap_or(0))),
288            ResetFormat::Rfc3339 => parse_rfc3339(value.trim())
289                .map(|at| Duration::from_secs(u64::try_from(at.saturating_sub(now)).unwrap_or(0))),
290        }
291    }
292}
293
/// Looks up the value of the first header whose name equals `name`, ignoring
/// ASCII case. Returns `None` when no header matches.
fn header<'a>(headers: &'a [(&str, &str)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .find_map(|&(key, value)| key.eq_ignore_ascii_case(name).then_some(value))
}
301
/// Parses `value` as a non-negative integer after trimming surrounding
/// whitespace; anything that is not a valid `u64` yields `None`.
fn parse_u64(value: &str) -> Option<u64> {
    let trimmed = value.trim();
    match trimmed.parse::<u64>() {
        Ok(number) => Some(number),
        Err(_) => None,
    }
}
306
/// Parses a Go-style duration string (`1s`, `6m0s`, `100ms`, `1h2m3s`,
/// `59.982s`, `1m21.504s`), or a bare integer as seconds, into a [`Duration`].
///
/// The input is a sequence of `<number><unit>` segments whose values are
/// summed (saturating):
///
/// * `<number>` is a run of ASCII digits, optionally followed by `.` and at
///   least one more digit.
/// * `<unit>` is one of `ns`, `us` / `µs` / `μs`, `ms`, `s`, `m`, `h`.
///
/// Fractions are truncated to whole nanoseconds. The caller is expected to
/// have trimmed surrounding whitespace.
///
/// Returns `None` for empty input, a segment with no leading digit, a `.` with
/// no digits after it, a missing or unknown unit, or an integer part that does
/// not fit in a `u64`.
fn parse_duration_string(value: &str) -> Option<Duration> {
    // Fraction digits past this point contribute less than a nanosecond even
    // for the largest unit (hours), so they are ignored. The cap also keeps the
    // fraction arithmetic comfortably inside `u128`.
    const MAX_FRACTION_DIGITS: usize = 15;

    if value.is_empty() {
        return None;
    }
    // A bare number is treated as seconds.
    if let Ok(secs) = value.parse::<u64>() {
        return Some(Duration::from_secs(secs));
    }

    let mut total = Duration::ZERO;
    let mut rest = value;
    while !rest.is_empty() {
        // Integer part: every segment must start with at least one digit.
        let whole_len = rest.bytes().take_while(u8::is_ascii_digit).count();
        if whole_len == 0 {
            return None;
        }
        let (whole_digits, after_whole) = rest.split_at(whole_len);
        let whole: u64 = whole_digits.parse().ok()?;

        // Optional decimal fraction, as produced by Go's duration formatting
        // (`1.5s`, `2m3.25s`).
        let (fraction_digits, after_number) = match after_whole.strip_prefix('.') {
            Some(tail) => {
                let fraction_len = tail.bytes().take_while(u8::is_ascii_digit).count();
                if fraction_len == 0 {
                    return None; // a dot must be followed by digits
                }
                tail.split_at(fraction_len)
            }
            None => ("", after_whole),
        };

        // Unit: the run of letters up to the start of the next segment. Taking
        // the whole run means `ms` is never mistaken for `m` followed by `s`.
        let unit_len = after_number
            .find(|c: char| !c.is_alphabetic())
            .unwrap_or(after_number.len());
        let (unit, remainder) = after_number.split_at(unit_len);

        // The whole-number contribution, and the unit's size in nanoseconds
        // (used to scale the fraction).
        let (whole_part, unit_nanos): (Duration, u128) = match unit {
            "ns" => (Duration::from_nanos(whole), 1),
            // ASCII `us`, U+00B5 MICRO SIGN, and U+03BC GREEK SMALL LETTER MU.
            "us" | "\u{b5}s" | "\u{3bc}s" => (Duration::from_micros(whole), 1_000),
            "ms" => (Duration::from_millis(whole), 1_000_000),
            "s" => (Duration::from_secs(whole), 1_000_000_000),
            "m" => (
                Duration::from_secs(whole.saturating_mul(60)),
                60_000_000_000,
            ),
            "h" => (
                Duration::from_secs(whole.saturating_mul(3600)),
                3_600_000_000_000,
            ),
            _ => return None,
        };

        let kept = fraction_digits.get(..fraction_digits.len().min(MAX_FRACTION_DIGITS))?;
        let fraction_part = if kept.is_empty() {
            Duration::ZERO
        } else {
            // fraction = numerator / 10^digits of one unit, in nanoseconds.
            let numerator: u128 = kept.parse().ok()?;
            let denominator = 10_u128.pow(u32::try_from(kept.len()).ok()?);
            let nanos = numerator * unit_nanos / denominator;
            Duration::from_nanos(u64::try_from(nanos).ok()?)
        };

        total = total.saturating_add(whole_part).saturating_add(fraction_part);
        rest = remainder;
    }
    // `value` was non-empty, so at least one complete segment was consumed.
    Some(total)
}
350
351/// Parses an RFC 3339 / ISO 8601 UTC instant (`2026-01-01T00:00:00Z`, with an
352/// optional fractional second) into Unix seconds. Only the `Z` (UTC) form is
353/// accepted; other offsets return `None`.
354fn parse_rfc3339(value: &str) -> Option<i64> {
355    let (date, rest) = value.split_once('T')?;
356    // Strip the zone: require a trailing 'Z'.
357    let time = rest.strip_suffix('Z').or_else(|| rest.strip_suffix('z'))?;
358    // Drop any fractional-second part.
359    let time = time.split('.').next()?;
360
361    let mut d = date.split('-');
362    let year = d.next()?.parse::<i64>().ok()?;
363    let month = d.next()?.parse::<u32>().ok()?;
364    let day = d.next()?.parse::<u32>().ok()?;
365    if d.next().is_some() {
366        return None;
367    }
368
369    let mut t = time.split(':');
370    let h = t.next()?.parse::<u32>().ok()?;
371    let m = t.next()?.parse::<u32>().ok()?;
372    let s = t.next()?.parse::<u32>().ok()?;
373    if t.next().is_some() {
374        return None;
375    }
376
377    civil_to_unix(year, month, day, h, m, s)
378}
379
/// Returns the system clock's current time as whole Unix seconds.
///
/// A clock set before the Unix epoch yields `0`; a second count too large for
/// `i64` saturates to `i64::MAX`.
fn current_unix_secs() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
386
// Unit tests for header parsing (one test per built-in profile family, plus
// the malformed/extreme-value cases) and for throttle synchronization.
#[cfg(test)]
mod tests {
    // Tests unwrap parsed windows directly: a panic there is the failure signal.
    #![allow(clippy::unwrap_used)]

    use super::{HeaderProfile, RateLimitInfo, Window, parse_duration_string, parse_rfc3339};
    use crate::throttle::Throttle;
    use core::time::Duration;

    // OpenAI: both the request and the token window are populated, and the
    // reset values are duration strings.
    #[test]
    fn test_openai_recorded_headers() {
        // A representative OpenAI response header set.
        let headers = [
            ("x-ratelimit-limit-requests", "5000"),
            ("x-ratelimit-remaining-requests", "4999"),
            ("x-ratelimit-reset-requests", "12ms"),
            ("x-ratelimit-limit-tokens", "160000"),
            ("x-ratelimit-remaining-tokens", "159952"),
            ("x-ratelimit-reset-tokens", "6m0s"),
        ];
        // `now` is irrelevant for duration-string resets, so 0 is fine.
        let info = HeaderProfile::OPENAI.parse_at(&headers, 0);
        let req = info.requests.unwrap();
        assert_eq!(req.limit, Some(5000));
        assert_eq!(req.remaining, Some(4999));
        assert_eq!(req.reset, Some(Duration::from_millis(12)));
        let tok = info.tokens.unwrap();
        assert_eq!(tok.limit, Some(160_000));
        assert_eq!(tok.remaining, Some(159_952));
        assert_eq!(tok.reset, Some(Duration::from_secs(360)));
    }

    // Anthropic: the absolute RFC 3339 reset is converted to a delta from `now`.
    #[test]
    fn test_anthropic_recorded_headers_rfc3339_reset() {
        // Anthropic reports an absolute RFC 3339 reset instant.
        let headers = [
            ("anthropic-ratelimit-requests-limit", "50"),
            ("anthropic-ratelimit-requests-remaining", "49"),
            ("anthropic-ratelimit-requests-reset", "2026-01-01T00:01:00Z"),
            ("anthropic-ratelimit-tokens-limit", "40000"),
            ("anthropic-ratelimit-tokens-remaining", "39000"),
            ("anthropic-ratelimit-tokens-reset", "2026-01-01T00:01:00Z"),
        ];
        // now = 2026-01-01T00:00:00Z = 1_767_225_600; reset is 60s later.
        let info = HeaderProfile::ANTHROPIC.parse_at(&headers, 1_767_225_600);
        assert_eq!(info.requests.unwrap().remaining, Some(49));
        assert_eq!(info.requests.unwrap().reset, Some(Duration::from_secs(60)));
        assert_eq!(info.tokens.unwrap().remaining, Some(39000));
    }

    // GitHub: mixed-case header names still match, the Unix-timestamp reset
    // becomes a delta, and headers outside the profile are ignored.
    #[test]
    fn test_github_recorded_headers_unix_reset() {
        let headers = [
            ("X-RateLimit-Limit", "60"),
            ("X-RateLimit-Remaining", "57"),
            ("X-RateLimit-Reset", "1767225660"), // 2026-01-01T00:01:00Z
            ("X-RateLimit-Used", "3"),
        ];
        // Case-insensitive header names; now 60s before the reset.
        let info = HeaderProfile::GITHUB.parse_at(&headers, 1_767_225_600);
        let req = info.requests.unwrap();
        assert_eq!(req.limit, Some(60));
        assert_eq!(req.remaining, Some(57));
        assert_eq!(req.reset, Some(Duration::from_secs(60)));
        assert!(info.tokens.is_none());
    }

    // IETF draft: the reset is plain delta-seconds, and `Retry-After` is
    // parsed alongside the window.
    #[test]
    fn test_rfc_draft_delta_seconds() {
        let headers = [
            ("RateLimit-Limit", "100"),
            ("RateLimit-Remaining", "0"),
            ("RateLimit-Reset", "30"),
            ("Retry-After", "30"),
        ];
        let info = HeaderProfile::RFC.parse_at(&headers, 0);
        let req = info.requests.unwrap();
        assert_eq!(req.remaining, Some(0));
        assert_eq!(req.reset, Some(Duration::from_secs(30)));
        assert_eq!(info.retry_after, Some(Duration::from_secs(30)));
    }

    // Stripe: the profile names no window headers, so only `Retry-After` is
    // reported.
    #[test]
    fn test_stripe_retry_after_only() {
        let headers = [("Retry-After", "5")];
        let info = HeaderProfile::STRIPE.parse_at(&headers, 0);
        assert!(info.requests.is_none());
        assert_eq!(info.retry_after, Some(Duration::from_secs(5)));
    }

    // An empty header set produces the all-`None` default.
    #[test]
    fn test_missing_headers_yield_none() {
        let info = HeaderProfile::OPENAI.parse_at(&[], 0);
        assert_eq!(info, RateLimitInfo::default());
    }

    #[test]
    fn test_extreme_reset_and_now_do_not_overflow() {
        // A reset timestamp and `now` at the i64 extremes must not overflow the
        // `at - now` subtraction (regression: the fuzz-smoke suite caught a panic
        // here). A past or unreachable reset clamps the time-until-reset to zero.
        let headers = [("x-ratelimit-reset", "9223372036854775807")]; // i64::MAX
        let info = HeaderProfile::GITHUB.parse_at(&headers, i64::MIN);
        assert!(
            info.requests.is_some(),
            "an extreme reset still parses a window"
        );

        let past = [("x-ratelimit-reset", "-9223372036854775808")]; // i64::MIN
        let info = HeaderProfile::GITHUB.parse_at(&past, i64::MAX);
        assert_eq!(info.requests.unwrap().reset, Some(Duration::ZERO));
    }

    // An unparseable field is dropped on its own; the valid field next to it
    // still comes through.
    #[test]
    fn test_malformed_values_are_dropped() {
        let headers = [
            ("x-ratelimit-limit-requests", "lots"),
            ("x-ratelimit-remaining-requests", "42"),
            ("x-ratelimit-reset-requests", "soon"),
        ];
        let info = HeaderProfile::OPENAI.parse_at(&headers, 0);
        let req = info.requests.unwrap();
        assert_eq!(req.limit, None); // "lots" dropped
        assert_eq!(req.remaining, Some(42));
        assert_eq!(req.reset, None); // "soon" dropped
    }

    // Single-unit, multi-unit, and bare-integer duration strings, plus a
    // rejected non-duration.
    #[test]
    fn test_duration_string_parsing() {
        assert_eq!(parse_duration_string("1s"), Some(Duration::from_secs(1)));
        assert_eq!(
            parse_duration_string("6m0s"),
            Some(Duration::from_secs(360))
        );
        assert_eq!(
            parse_duration_string("100ms"),
            Some(Duration::from_millis(100))
        );
        assert_eq!(
            parse_duration_string("1h2m3s"),
            Some(Duration::from_secs(3723))
        );
        // A bare integer is interpreted as seconds.
        assert_eq!(parse_duration_string("30"), Some(Duration::from_secs(30)));
        assert_eq!(parse_duration_string("nope"), None);
    }

    #[test]
    fn test_rfc3339_parsing() {
        assert_eq!(parse_rfc3339("2026-01-01T00:00:00Z"), Some(1_767_225_600));
        // Fractional seconds are tolerated.
        assert_eq!(
            parse_rfc3339("2026-01-01T00:00:00.123Z"),
            Some(1_767_225_600)
        );
        // A non-UTC offset is refused.
        assert_eq!(parse_rfc3339("2026-01-01T00:00:00+02:00"), None);
        assert_eq!(parse_rfc3339("garbage"), None);
    }

    // Local budget above the server's count: the surplus is drained.
    #[test]
    fn test_sync_drains_to_server_remaining() {
        let throttle = Throttle::per_second(100); // locally 100 available
        let info = RateLimitInfo {
            requests: Some(Window {
                remaining: Some(10),
                ..Window::default()
            }),
            ..RateLimitInfo::default()
        };
        let drained = info.sync_requests(&throttle);
        assert_eq!(drained, 90);
        assert_eq!(throttle.available(), 10);
    }

    // Local budget below the server's count: sync leaves the throttle alone.
    #[test]
    fn test_sync_never_raises_above_hard_limit() {
        let throttle = Throttle::per_second(100);
        assert!(throttle.try_acquire_with_cost(95)); // local available now 5
        // Server claims 50 remaining — more than we have. Sync must NOT add.
        let info = RateLimitInfo {
            requests: Some(Window {
                remaining: Some(50),
                ..Window::default()
            }),
            ..RateLimitInfo::default()
        };
        let drained = info.sync_requests(&throttle);
        assert_eq!(drained, 0);
        assert_eq!(throttle.available(), 5); // unchanged; never exceeds capacity
        assert!(throttle.available() <= throttle.capacity());
    }

    // No requests window in the info at all: nothing is drained.
    #[test]
    fn test_sync_with_no_info_is_a_noop() {
        let throttle = Throttle::per_second(10);
        assert_eq!(RateLimitInfo::default().sync_requests(&throttle), 0);
        assert_eq!(throttle.available(), 10);
    }
}