stream_rate_limiter/options.rs
1use std::time::Duration;
2
3/// What to do with stream when it is delayed, returned in callback on_stream_delayed
4pub struct StreamDelayedInfo {
5 /// Number of element counted from the start of the stream
6 pub element_no: u64,
7 /// Current delay (in seconds)
8 pub current_delay: f64,
9 /// Total stream delay in seconds up to now (in seconds)
10 pub total_delay: f64,
11}
12
13/// What to do with stream when it is delayed, returned in callback on_stream_delayed
14pub enum StreamBehavior {
15 ///Do not add delay to the stream, stream will try to catch up
16 Continue,
17 ///Add delay to the stream so it won't try to catch up
18 Delay(f64),
19 ///Stop stream (no longer producing elements and terminates without error)
20 Stop,
21}
22
23/// Options to be used with rate_limit stream extension method
24pub struct RateLimitOptions<'a> {
25 pub(crate) interval: Option<Duration>,
26 pub(crate) min_interval: Option<Duration>,
27 pub(crate) allowed_slippage_sec: Option<f64>,
28 pub(crate) on_stream_delayed:
29 Option<Box<dyn FnMut(StreamDelayedInfo) -> StreamBehavior + Send + 'a>>,
30}
31
32impl<'a> RateLimitOptions<'a> {
33 /// By default stream is not changed at all
34 pub fn empty() -> Self {
35 Self {
36 interval: None,
37 min_interval: None,
38 allowed_slippage_sec: None,
39 on_stream_delayed: None,
40 }
41 }
42
43 ///Set targeted interval between items
44 pub fn with_interval(mut self, interval: Duration) -> Self {
45 self.interval = Some(interval);
46 self
47 }
48
49 ///Set targeted interval between items (in seconds)
50 pub fn with_interval_sec(mut self, interval: f64) -> Self {
51 self.interval = Some(Duration::from_secs_f64(interval));
52 self
53 }
54
55 ///Set minimum interval between items
56 pub fn with_min_interval(mut self, min_interval: Duration) -> Self {
57 self.min_interval = Some(min_interval);
58 self
59 }
60
61 ///Set minimum interval between items
62 pub fn with_min_interval_sec(mut self, min_interval: f64) -> Self {
63 self.min_interval = Some(Duration::from_secs_f64(min_interval));
64 self
65 }
66
67 ///Set slippage - default slippage if not set (10 times interval + 0.02 sec) <br>
68 ///if stream is currently delayed more than this then on_stream_delayed is called
69 pub fn with_allowed_slippage_sec(mut self, allowed_slippage_sec: f64) -> Self {
70 self.allowed_slippage_sec = Some(allowed_slippage_sec);
71 self
72 }
73
74 ///Set callback called when stream is delayed more than allowed_slippage_sec <br/>
75 ///return StreamBehavior::Delay to add permanent delay to stream <br/>
76 ///return StreamBehavior::Stop to stop stream (terminate without error) <br/>
77 ///return StreamBehavior::Continue to continue stream (trying catching up) <br/>
78 ///First argument is current delay, second is permanent delay already in the stream
79 pub fn on_stream_delayed(
80 mut self,
81 on_stream_delayed: impl FnMut(StreamDelayedInfo) -> StreamBehavior + 'a + std::marker::Send,
82 ) -> Self {
83 self.on_stream_delayed = Some(Box::new(on_stream_delayed));
84 self
85 }
86}