stream_operators/
lib.rs

1mod debounce_time;
2mod dinstinct_until_changed;
3mod distinct;
4mod state;
5
6use std::{fmt, time::Duration};
7use tokio_stream::Stream;
8
9pub use debounce_time::DebounceTime;
10pub use dinstinct_until_changed::DistinctUntilChanged;
11use distinct::Distinct;
12
13pub trait StreamOps: Stream + Sized {
14    fn debounce_time(self, timeout: Duration) -> DebounceTime<Self>;
15    fn distinct(self) -> Distinct<Self>
16    where
17        Self::Item: ItemKey;
18    fn distinct_until_changed(self) -> DistinctUntilChanged<Self>;
19}
20
21pub trait ItemKey {
22    type Key: fmt::Debug + PartialEq + Eq + std::hash::Hash;
23    fn key(&self) -> Self::Key;
24}
25
26impl<S> StreamOps for S
27where
28    S: Stream + Sized,
29{
30    fn debounce_time(self, timeout: Duration) -> DebounceTime<Self> {
31        DebounceTime::new(self, timeout)
32    }
33
34    fn distinct(self) -> Distinct<Self>
35    where
36        Self::Item: ItemKey,
37    {
38        Distinct::new(self)
39    }
40
41    fn distinct_until_changed(self) -> DistinctUntilChanged<Self> {
42        DistinctUntilChanged::new(self)
43    }
44}
45
46#[cfg(test)]
47pub mod test_utils {
48    use futures::stream::StreamExt as _;
49    use std::time::Duration;
50    use tokio::time::interval;
51    use tokio_stream::{wrappers::IntervalStream, Stream};
52
53    pub fn interval_value(
54        duration: Duration,
55        start: usize,
56        step: usize,
57    ) -> impl Stream<Item = usize> {
58        IntervalStream::new(interval(duration))
59            .enumerate()
60            .map(move |(i, _)| start + i * step)
61    }
62}