1mod debounce_time;
2mod dinstinct_until_changed;
3mod distinct;
4mod state;
5
6use std::{fmt, time::Duration};
7use tokio_stream::Stream;
8
9pub use debounce_time::DebounceTime;
10pub use dinstinct_until_changed::DistinctUntilChanged;
11use distinct::Distinct;
12
13pub trait StreamOps: Stream + Sized {
14 fn debounce_time(self, timeout: Duration) -> DebounceTime<Self>;
15 fn distinct(self) -> Distinct<Self>
16 where
17 Self::Item: ItemKey;
18 fn distinct_until_changed(self) -> DistinctUntilChanged<Self>;
19}
20
/// Implemented by values that can hand out a key describing themselves.
///
/// `StreamOps::distinct` requires the stream's items to implement this trait.
pub trait ItemKey {
    /// The key type: it must support equality comparison, hashing and `Debug` formatting.
    type Key: Eq + PartialEq + std::hash::Hash + fmt::Debug;

    /// Returns the key for this value. The key is returned by value, so the caller owns it.
    fn key(&self) -> Self::Key;
}
25
26impl<S> StreamOps for S
27where
28 S: Stream + Sized,
29{
30 fn debounce_time(self, timeout: Duration) -> DebounceTime<Self> {
31 DebounceTime::new(self, timeout)
32 }
33
34 fn distinct(self) -> Distinct<Self>
35 where
36 Self::Item: ItemKey,
37 {
38 Distinct::new(self)
39 }
40
41 fn distinct_until_changed(self) -> DistinctUntilChanged<Self> {
42 DistinctUntilChanged::new(self)
43 }
44}
45
/// Test-only helpers; the module is compiled only under `#[cfg(test)]`.
#[cfg(test)]
pub mod test_utils {
    use std::time::Duration;

    use futures::stream::StreamExt as _;
    use tokio::time::interval;
    use tokio_stream::{wrappers::IntervalStream, Stream};

    /// Builds a stream that yields an arithmetic progression, one item per interval tick.
    ///
    /// The item at zero-based position `k` is `start + k * step`, so the sequence is
    /// `start`, `start + step`, `start + 2 * step`, and so on. Items are paced by a
    /// `tokio::time::interval` created with `duration`.
    // NOTE(review): tokio documents that `interval` panics on a zero period and that
    // its first tick completes immediately — keep `duration` non-zero in tests.
    pub fn interval_value(
        duration: Duration,
        start: usize,
        step: usize,
    ) -> impl Stream<Item = usize> {
        let ticks = IntervalStream::new(interval(duration));

        // Only the position of each tick matters; the tick's `Instant` is discarded.
        ticks
            .enumerate()
            .map(move |(index, _tick)| start + index * step)
    }
}