use std::{num::NonZeroUsize, time::Duration};
use futures_core::Stream;
use futures_util::StreamExt as _;
mod distinct;
pub use self::distinct::{
distinct_until_changed, distinct_until_changed_err_result, distinct_until_changed_map,
distinct_until_changed_ok_result, filter_distinct_until_changed,
filter_distinct_until_changed_err_result, filter_distinct_until_changed_ok_result,
};
mod debounce;
pub use self::debounce::Debounced;
mod throttle;
pub use self::throttle::{ThrottleIntervalConfig, Throttled, Throttler};
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod tokio;
/// Identifies one of the two edges of an interval.
///
/// NOTE(review): presumably selects whether throttling acts at the start
/// (`Leading`) or at the end (`Trailing`) of each interval; the consuming
/// code lives in modules outside this view, so confirm there.
///
/// The derived ordering follows declaration order: `Leading < Trailing`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IntervalEdge {
/// The leading edge of an interval.
Leading,
/// The trailing edge of an interval.
Trailing,
}
/// A timer future that resolves to `()` and is constructed from a
/// [`Duration`].
///
/// This is the abstraction over the timer used by the stream adapters in
/// this crate (see the `Sleep` associated type of [`StreamExt`]).
pub trait Sleep: Future<Output = ()> + Sized {
/// Creates the timer future for the given `duration`.
///
/// NOTE(review): implementations are expected to complete once `duration`
/// has elapsed; that contract is implied by the name only, so confirm it
/// against the concrete implementations.
fn sleep(duration: Duration) -> Self;
}
/// Extension methods that add debouncing and throttling adapters to
/// [`Stream`]s.
///
/// Implementors pick the timer future through [`Self::Sleep`] and the
/// throttler used by [`throttle_interval`](Self::throttle_interval) through
/// [`Self::IntervalThrottler`].
pub trait StreamExt: Stream {
    /// Timer future used as the second type parameter of the [`Debounced`]
    /// adapter returned by [`debounce`](Self::debounce).
    type Sleep: Sleep;

    /// Throttler carried by the [`Throttled`] adapter returned from
    /// [`throttle_interval`](Self::throttle_interval).
    type IntervalThrottler: Throttler<<Self as Stream>::Item>;

    /// Wraps this stream in a [`Debounced`] adapter configured with `delay`.
    ///
    /// The stream and `delay` are handed to `Debounced::new` unchanged.
    fn debounce(self, delay: Duration) -> Debounced<Self, Self::Sleep>
    where
        Self: Sized,
    {
        Debounced::new(self, delay)
    }

    /// Wraps this stream in a [`Throttled`] adapter driven by a
    /// caller-supplied `throttler`.
    ///
    /// All three arguments are handed to `Throttled::new` unchanged.
    ///
    /// NOTE(review): `poll_next_max_ready_count` presumably caps how many
    /// ready items are processed per `poll_next` call; confirm in the
    /// `throttle` module.
    fn throttle<T>(
        self,
        throttler: T,
        poll_next_max_ready_count: NonZeroUsize,
    ) -> Throttled<Self, T>
    where
        Self: Sized,
        T: Throttler<Self::Item>,
    {
        Throttled::new(self, throttler, poll_next_max_ready_count)
    }

    /// Wraps this stream in a [`Throttled`] adapter whose throttler is
    /// [`Self::IntervalThrottler`], configured from `config`.
    ///
    /// This method has no default body; each implementor must provide it.
    fn throttle_interval(
        self,
        config: ThrottleIntervalConfig,
        poll_next_max_ready_count: NonZeroUsize,
    ) -> Throttled<Self, Self::IntervalThrottler>
    where
        Self: Sized;
}
/// Filters `stream` with an asynchronous predicate that carries mutable
/// state from one item to the next.
///
/// `filter_update_state_fn` is called for every item with a mutable
/// reference to the state (starting as `initial_state`) and a shared
/// reference to the item. It returns a future; the item is yielded by the
/// resulting stream only if that future resolves to `true`.
///
/// Because `G` is a single type independent of the argument lifetimes, the
/// returned future cannot borrow the state or the item.
fn filter_stateful<S, T, F, G>(
    stream: S,
    initial_state: T,
    mut filter_update_state_fn: F,
) -> impl Stream<Item = S::Item>
where
    S: Stream,
    F: FnMut(&mut T, &S::Item) -> G,
    G: Future<Output = bool>,
{
    // The state is moved into the predicate closure so that it lives exactly
    // as long as the filtered stream.
    let mut current_state = initial_state;
    let keep_item =
        move |candidate: &S::Item| filter_update_state_fn(&mut current_state, candidate);
    stream.filter(keep_item)
}
/// Synchronous counterpart of [`filter_stateful`].
///
/// `filter_update_state_fn` is called for every item with a mutable
/// reference to the state (starting as `initial_state`) and a shared
/// reference to the item. Its `bool` result is wrapped in an already
/// completed future and passed on to [`filter_stateful`].
fn filter_stateful_sync<S, T, F>(
    stream: S,
    initial_state: T,
    mut filter_update_state_fn: F,
) -> impl Stream<Item = S::Item>
where
    S: Stream,
    F: FnMut(&mut T, &S::Item) -> bool,
{
    // Adapt the plain predicate to the future-returning shape that
    // `filter_stateful` expects.
    let decide = move |state: &mut T, candidate: &S::Item| {
        let keep = filter_update_state_fn(state, candidate);
        std::future::ready(keep)
    };
    filter_stateful(stream, initial_state, decide)
}