Skip to main content

gr/http/
throttle.rs

1//! Throttle module provides different strategies to throttle requests based on
2//! flow control headers or provided delays.
3
4use std::thread;
5
6use rand::RngExt;
7
8use crate::{
9    api_defaults::{DEFAULT_JITTER_MAX_MILLISECONDS, DEFAULT_JITTER_MIN_MILLISECONDS},
10    io::FlowControlHeaders,
11    log_debug, log_info,
12    time::{self, Milliseconds, Seconds},
13};
14
15/// Throttle strategy
16pub trait ThrottleStrategy {
17    /// Throttle the request based on optional flow control headers.
18    /// Implementers might use the headers to adjust the throttling or ignore
19    /// them altogether. Ex. strategies could be a fixed delay, random, or based
20    /// on rate limiting headers.
21    fn throttle(&self, flow_control_headers: Option<&FlowControlHeaders>);
22    /// Throttle for specific amount of time.
23    fn throttle_for(&self, delay: Milliseconds) {
24        log_info!("Throttling for : {} ms", delay);
25        thread::sleep(std::time::Duration::from_millis(*delay));
26    }
27    /// Return strategy type
28    fn strategy(&self) -> ThrottleStrategyType;
29}
30
31#[derive(Clone, Debug, PartialEq)]
32pub enum ThrottleStrategyType {
33    PreFixed,
34    DynamicFixed,
35    Random,
36    AutoRate,
37    NoThrottle,
38}
39
40/// Dynamically throttles for the amount of time specified in the throttle_for
41/// method using the default trait implementation. As opposed to the PreFixed,
42/// which takes a fixed delay in the constructor and throttles for that amount
43/// of time every time.
44pub struct DynamicFixed;
45
46impl ThrottleStrategy for DynamicFixed {
47    fn throttle(&self, _flow_control_headers: Option<&FlowControlHeaders>) {}
48    fn strategy(&self) -> ThrottleStrategyType {
49        ThrottleStrategyType::DynamicFixed
50    }
51}
52
53pub struct PreFixed {
54    delay: Milliseconds,
55}
56
57impl PreFixed {
58    pub fn new(delay: Milliseconds) -> Self {
59        Self { delay }
60    }
61}
62
63impl ThrottleStrategy for PreFixed {
64    fn throttle(&self, _flow_control_headers: Option<&FlowControlHeaders>) {
65        log_info!("Throttling for: {} ms", self.delay);
66        thread::sleep(std::time::Duration::from_millis(*self.delay));
67    }
68    fn strategy(&self) -> ThrottleStrategyType {
69        ThrottleStrategyType::PreFixed
70    }
71}
72
73pub struct Random {
74    delay_min: Milliseconds,
75    delay_max: Milliseconds,
76}
77
78impl Random {
79    pub fn new(delay_min: Milliseconds, delay_max: Milliseconds) -> Self {
80        Self {
81            delay_min,
82            delay_max,
83        }
84    }
85}
86
87impl ThrottleStrategy for Random {
88    fn throttle(&self, _flow_control_headers: Option<&FlowControlHeaders>) {
89        log_info!(
90            "Throttling between: {} ms and {} ms",
91            self.delay_min,
92            self.delay_max
93        );
94        let mut rng = rand::rng();
95        let wait_time = rng.random_range(*self.delay_min..=*self.delay_max);
96        log_info!("Sleeping for {} milliseconds", wait_time);
97        thread::sleep(std::time::Duration::from_millis(wait_time));
98    }
99    fn strategy(&self) -> ThrottleStrategyType {
100        ThrottleStrategyType::Random
101    }
102}
103
104#[derive(Default)]
105pub struct NoThrottle;
106
107impl NoThrottle {
108    pub fn new() -> Self {
109        Self {}
110    }
111}
112
113impl ThrottleStrategy for NoThrottle {
114    fn throttle(&self, _flow_control_headers: Option<&FlowControlHeaders>) {
115        log_info!("No throttling enabled");
116    }
117    fn strategy(&self) -> ThrottleStrategyType {
118        ThrottleStrategyType::NoThrottle
119    }
120}
121
122/// AutoRate implements an automatic throttling algorithm that limits the
123/// rate of requests based on flow control headers from the HTTP response plus a
124/// fixed random delay to avoid being predictable and too fast for the server.
125/// Inspiration ref: https://en.wikipedia.org/wiki/Leaky_bucket
126pub struct AutoRate {
127    /// Max interval milliseconds added to the automatic throttle. In order to
128    /// avoid predictability, the minimum range is 1 second. The jitter is the
129    /// max interval added to the automatic throttle. (1, jitter) milliseconds.
130    jitter_max: Milliseconds,
131    jitter_min: Milliseconds,
132    now: fn() -> Seconds,
133}
134
135impl Default for AutoRate {
136    fn default() -> Self {
137        Self {
138            jitter_max: Milliseconds::from(DEFAULT_JITTER_MAX_MILLISECONDS),
139            jitter_min: Milliseconds::from(DEFAULT_JITTER_MIN_MILLISECONDS),
140            now: time::now_epoch_seconds,
141        }
142    }
143}
144
145impl ThrottleStrategy for AutoRate {
146    fn throttle(&self, flow_control_headers: Option<&FlowControlHeaders>) {
147        if let Some(headers) = flow_control_headers {
148            let rate_limit_headers = headers.get_rate_limit_header();
149            match *rate_limit_headers {
150                Some(headers) => {
151                    // In order to avoid rate limited, we need to space the
152                    // requests evenly using: time to ratelimit-reset
153                    // (secs)/ratelimit-remaining (requests).
154                    let now = *(self.now)();
155                    log_debug!("Current epoch: {}", now);
156                    log_debug!("Rate limit reset: {}", headers.reset);
157                    let time_to_reset = headers.reset.saturating_sub(now);
158                    log_debug!("Time to reset: {}", time_to_reset);
159                    log_debug!("Remaining requests: {}", headers.remaining);
160                    let delay = time_to_reset / headers.remaining as u64;
161                    // Avoid predictability and being too fast. We could end up
162                    // being too fast when the amount of remaining requests
163                    // is high and the reset time is low. We additionally
164                    // wait in between jitter_min and jitter_max milliseconds.
165                    let additional_delay =
166                        rand::rng().random_range(*self.jitter_min..=*self.jitter_max);
167                    let total_delay = delay + additional_delay;
168                    log_info!("AutoRate throttling enabled");
169                    self.throttle_for(Milliseconds::from(total_delay));
170                }
171                None => {
172                    // When the response has status 304 Not Modified, we don't get
173                    // any rate limiting headers. In this case, we just throttle
174                    // randomly between the min and max jitter.
175                    let rand_delay_jitter =
176                        rand::rng().random_range(*self.jitter_min..=*self.jitter_max);
177                    log_info!("AutoRate throttling enabled");
178                    self.throttle_for(Milliseconds::from(rand_delay_jitter));
179                }
180            }
181        }
182    }
183    fn strategy(&self) -> ThrottleStrategyType {
184        ThrottleStrategyType::AutoRate
185    }
186}