stream_rate_limiter/
options.rs

1use std::time::Duration;
2
3/// What to do with stream when it is delayed, returned in callback on_stream_delayed
4pub struct StreamDelayedInfo {
5    /// Number of element counted from the start of the stream
6    pub element_no: u64,
7    /// Current delay (in seconds)
8    pub current_delay: f64,
9    /// Total stream delay in seconds up to now (in seconds)
10    pub total_delay: f64,
11}
12
13/// What to do with stream when it is delayed, returned in callback on_stream_delayed
14pub enum StreamBehavior {
15    ///Do not add delay to the stream, stream will try to catch up
16    Continue,
17    ///Add delay to the stream so it won't try to catch up
18    Delay(f64),
19    ///Stop stream (no longer producing elements and terminates without error)
20    Stop,
21}
22
23/// Options to be used with rate_limit stream extension method
24pub struct RateLimitOptions<'a> {
25    pub(crate) interval: Option<Duration>,
26    pub(crate) min_interval: Option<Duration>,
27    pub(crate) allowed_slippage_sec: Option<f64>,
28    pub(crate) on_stream_delayed:
29        Option<Box<dyn FnMut(StreamDelayedInfo) -> StreamBehavior + Send + 'a>>,
30}
31
32impl<'a> RateLimitOptions<'a> {
33    /// By default stream is not changed at all
34    pub fn empty() -> Self {
35        Self {
36            interval: None,
37            min_interval: None,
38            allowed_slippage_sec: None,
39            on_stream_delayed: None,
40        }
41    }
42
43    ///Set targeted interval between items
44    pub fn with_interval(mut self, interval: Duration) -> Self {
45        self.interval = Some(interval);
46        self
47    }
48
49    ///Set targeted interval between items (in seconds)
50    pub fn with_interval_sec(mut self, interval: f64) -> Self {
51        self.interval = Some(Duration::from_secs_f64(interval));
52        self
53    }
54
55    ///Set minimum interval between items
56    pub fn with_min_interval(mut self, min_interval: Duration) -> Self {
57        self.min_interval = Some(min_interval);
58        self
59    }
60
61    ///Set minimum interval between items
62    pub fn with_min_interval_sec(mut self, min_interval: f64) -> Self {
63        self.min_interval = Some(Duration::from_secs_f64(min_interval));
64        self
65    }
66
67    ///Set slippage - default slippage if not set (10 times interval + 0.02 sec) <br>
68    ///if stream is currently delayed more than this then on_stream_delayed is called
69    pub fn with_allowed_slippage_sec(mut self, allowed_slippage_sec: f64) -> Self {
70        self.allowed_slippage_sec = Some(allowed_slippage_sec);
71        self
72    }
73
74    ///Set callback called when stream is delayed more than allowed_slippage_sec <br/>
75    ///return StreamBehavior::Delay to add permanent delay to stream <br/>
76    ///return StreamBehavior::Stop to stop stream (terminate without error) <br/>
77    ///return StreamBehavior::Continue to continue stream (trying catching up) <br/>
78    ///First argument is current delay, second is permanent delay already in the stream
79    pub fn on_stream_delayed(
80        mut self,
81        on_stream_delayed: impl FnMut(StreamDelayedInfo) -> StreamBehavior + 'a + std::marker::Send,
82    ) -> Self {
83        self.on_stream_delayed = Some(Box::new(on_stream_delayed));
84        self
85    }
86}