1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
use std::time::Duration;
/// What to do with stream when it is delayed, returned in callback on_stream_delayed
pub struct StreamDelayedInfo {
/// Number of element counted from the start of the stream
pub element_no: u64,
/// Current delay (in seconds)
pub current_delay: f64,
/// Total stream delay in seconds up to now (in seconds)
pub total_delay: f64,
}
/// What to do with stream when it is delayed, returned in callback on_stream_delayed
pub enum StreamBehavior {
///Do not add delay to the stream, stream will try to catch up
Continue,
///Add delay to the stream so it won't try to catch up
Delay(f64),
///Stop stream (no longer producing elements and terminates without error)
Stop,
}
/// Options to be used with rate_limit stream extension method
pub struct RateLimitOptions<'a> {
pub(crate) interval: Option<Duration>,
pub(crate) min_interval: Option<Duration>,
pub(crate) allowed_slippage_sec: Option<f64>,
pub(crate) on_stream_delayed: Option<Box<dyn FnMut(StreamDelayedInfo) -> StreamBehavior + 'a>>,
}
impl<'a> RateLimitOptions<'a> {
/// By default stream is not changed at all
pub fn empty() -> Self {
Self {
interval: None,
min_interval: None,
allowed_slippage_sec: None,
on_stream_delayed: None,
}
}
///Set targeted interval between items
pub fn with_interval(mut self, interval: Duration) -> Self {
self.interval = Some(interval);
self
}
///Set targeted interval between items (in seconds)
pub fn with_interval_sec(mut self, interval: f64) -> Self {
self.interval = Some(Duration::from_secs_f64(interval));
self
}
///Set minimum interval between items
pub fn with_min_interval(mut self, min_interval: Duration) -> Self {
self.min_interval = Some(min_interval);
self
}
///Set minimum interval between items
pub fn with_min_interval_sec(mut self, min_interval: f64) -> Self {
self.min_interval = Some(Duration::from_secs_f64(min_interval));
self
}
///Set slippage - default slippage if not set (10 times interval + 0.02 sec) <br>
///if stream is currently delayed more than this then on_stream_delayed is called
pub fn with_allowed_slippage_sec(mut self, allowed_slippage_sec: f64) -> Self {
self.allowed_slippage_sec = Some(allowed_slippage_sec);
self
}
///Set callback called when stream is delayed more than allowed_slippage_sec <br/>
///return StreamBehavior::Delay to add permanent delay to stream <br/>
///return StreamBehavior::Stop to stop stream (terminate without error) <br/>
///return StreamBehavior::Continue to continue stream (trying catching up) <br/>
///First argument is current delay, second is permanent delay already in the stream
pub fn on_stream_delayed(
mut self,
on_stream_delayed: impl FnMut(StreamDelayedInfo) -> StreamBehavior + 'a,
) -> Self {
self.on_stream_delayed = Some(Box::new(on_stream_delayed));
self
}
}