stream_operators/
distinct.rs

1use crate::{state::State, ItemKey};
2use pin_project_lite::pin_project;
3use std::{
4    collections::HashSet,
5    pin::Pin,
6    task::{ready, Context, Poll},
7};
8use tokio_stream::Stream;
9
10pin_project! {
11    #[derive(Debug)]
12    pub struct Distinct<S> where S: Stream, S::Item: ItemKey {
13        #[pin]
14        stream: S,
15        items: HashSet<<<S as Stream>::Item as ItemKey>::Key>,
16        state: State,
17    }
18}
19
20impl<S> Distinct<S>
21where
22    S: Stream,
23    S::Item: ItemKey,
24{
25    pub fn new(stream: S) -> Self {
26        Self {
27            stream,
28            items: HashSet::default(),
29            state: State::HasNext,
30        }
31    }
32}
33
34impl<S> Stream for Distinct<S>
35where
36    S: Stream,
37    S::Item: ItemKey,
38{
39    type Item = S::Item;
40
41    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        let this = self.project();
43        if *this.state == State::HasNext {
44            let next = ready!(this.stream.poll_next(cx));
45            if let Some(next) = next {
46                let key = next.key();
47                if !this.items.contains(&key) {
48                    this.items.insert(key);
49                    return Poll::Ready(Some(next));
50                }
51            } else {
52                *this.state = State::HasNone;
53            }
54        }
55
56        match this.state {
57            State::HasNext => {
58                cx.waker().wake_by_ref();
59                Poll::Pending
60            }
61            State::HasNone => {
62                *this.state = State::Done;
63                Poll::Ready(None)
64            }
65            State::Done => panic!("poll_next called after completion"),
66        }
67    }
68}
69
/// Implements `ItemKey` for each listed type by using a clone of the value
/// itself as its key.
///
/// Takes a comma-separated list of types (no trailing comma).
macro_rules! impl_item_key {
    ($($ty:ty),*) => {
        $(
            impl ItemKey for $ty {
                type Key = $ty;

                fn key(&self) -> $ty {
                    Clone::clone(self)
                }
            }
        )*
    };
}
82
83impl_item_key!(u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize);
84impl_item_key!(String);
85
#[cfg(test)]
mod tests {
    use crate::{ItemKey, StreamOps};
    use tokio_stream::{iter, StreamExt};

    // Fixture keyed only by `id`, so two todos with different titles can
    // still count as duplicates of each other.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct Todo {
        id: u32,
        title: String,
    }

    impl Todo {
        fn new(id: u32, title: &str) -> Self {
            let title = title.to_string();
            Self { id, title }
        }
    }

    impl ItemKey for Todo {
        type Key = u32;

        fn key(&self) -> u32 {
            self.id
        }
    }

    // Repeated values, adjacent or not, are yielded only on first sight.
    #[tokio::test]
    async fn distinct_should_work() {
        let mut stream = iter(vec![1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1]).distinct();
        for expected in [1, 2, 3, 4] {
            assert_eq!(stream.next().await, Some(expected));
        }
        assert_eq!(stream.next().await, None);
    }

    // An empty upstream finishes immediately.
    #[tokio::test]
    async fn distinct_empty_should_work() {
        let mut stream = iter(1..1).distinct();
        assert_eq!(stream.next().await, None);
    }

    // A single item is passed through and then the stream ends.
    #[tokio::test]
    async fn distinct_one_should_work() {
        let mut stream = iter(vec![1]).distinct();
        assert_eq!(stream.next().await, Some(1));
        assert_eq!(stream.next().await, None);
    }

    // Uniqueness follows `ItemKey::key`, not full equality: the first todo
    // for each id wins even when a later one has a different title.
    #[tokio::test]
    async fn distinct_with_todos_should_work() {
        let todos = vec![
            Todo::new(1, "Buy milk"),
            Todo::new(2, "Buy eggs"),
            Todo::new(1, "Buy more milk"),
            Todo::new(3, "Buy bread"),
            Todo::new(2, "Buy more eggs"),
        ];
        let mut stream = iter(todos).distinct();

        for (id, title) in [(1, "Buy milk"), (2, "Buy eggs"), (3, "Buy bread")] {
            assert_eq!(stream.next().await, Some(Todo::new(id, title)));
        }
        assert_eq!(stream.next().await, None);
    }
}