stream_operators/
distinct.rs1use crate::{state::State, ItemKey};
2use pin_project_lite::pin_project;
3use std::{
4 collections::HashSet,
5 pin::Pin,
6 task::{ready, Context, Poll},
7};
8use tokio_stream::Stream;
9
10pin_project! {
11 #[derive(Debug)]
12 pub struct Distinct<S> where S: Stream, S::Item: ItemKey {
13 #[pin]
14 stream: S,
15 items: HashSet<<<S as Stream>::Item as ItemKey>::Key>,
16 state: State,
17 }
18}
19
20impl<S> Distinct<S>
21where
22 S: Stream,
23 S::Item: ItemKey,
24{
25 pub fn new(stream: S) -> Self {
26 Self {
27 stream,
28 items: HashSet::default(),
29 state: State::HasNext,
30 }
31 }
32}
33
34impl<S> Stream for Distinct<S>
35where
36 S: Stream,
37 S::Item: ItemKey,
38{
39 type Item = S::Item;
40
41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 let this = self.project();
43 if *this.state == State::HasNext {
44 let next = ready!(this.stream.poll_next(cx));
45 if let Some(next) = next {
46 let key = next.key();
47 if !this.items.contains(&key) {
48 this.items.insert(key);
49 return Poll::Ready(Some(next));
50 }
51 } else {
52 *this.state = State::HasNone;
53 }
54 }
55
56 match this.state {
57 State::HasNext => {
58 cx.waker().wake_by_ref();
59 Poll::Pending
60 }
61 State::HasNone => {
62 *this.state = State::Done;
63 Poll::Ready(None)
64 }
65 State::Done => panic!("poll_next called after completion"),
66 }
67 }
68}
69
70macro_rules! impl_item_key {
71 ($($t:ty),*) => {
72 $(
73 impl ItemKey for $t {
74 type Key = Self;
75 fn key(&self) -> Self::Key {
76 self.clone()
77 }
78 }
79 )*
80 };
81}
82
83impl_item_key!(u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize);
84impl_item_key!(String);
85
86#[cfg(test)]
87mod tests {
88 use crate::{ItemKey, StreamOps};
89 use tokio_stream::{iter, StreamExt};
90
91 #[derive(Debug, Clone, PartialEq, Eq)]
92 struct Todo {
93 id: u32,
94 title: String,
95 }
96
97 impl Todo {
98 fn new(id: u32, title: &str) -> Self {
99 Self {
100 id,
101 title: title.to_string(),
102 }
103 }
104 }
105
106 impl ItemKey for Todo {
107 type Key = u32;
108 fn key(&self) -> Self::Key {
109 self.id
110 }
111 }
112
113 #[tokio::test]
114 async fn distinct_should_work() {
115 let mut stream = iter(vec![1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1]).distinct();
116 assert_eq!(stream.next().await, Some(1));
117 assert_eq!(stream.next().await, Some(2));
118 assert_eq!(stream.next().await, Some(3));
119 assert_eq!(stream.next().await, Some(4));
120 assert_eq!(stream.next().await, None);
121 }
122
123 #[tokio::test]
124 async fn distinct_empty_should_work() {
125 let mut stream = iter(1..1).distinct();
126 assert_eq!(stream.next().await, None);
127 }
128
129 #[tokio::test]
130 async fn distinct_one_should_work() {
131 let mut stream = iter(vec![1]).distinct();
132 assert_eq!(stream.next().await, Some(1));
133 assert_eq!(stream.next().await, None);
134 }
135
136 #[tokio::test]
137 async fn distinct_with_todos_should_work() {
138 let mut stream = iter(vec![
139 Todo::new(1, "Buy milk"),
140 Todo::new(2, "Buy eggs"),
141 Todo::new(1, "Buy more milk"),
142 Todo::new(3, "Buy bread"),
143 Todo::new(2, "Buy more eggs"),
144 ])
145 .distinct();
146
147 assert_eq!(stream.next().await, Some(Todo::new(1, "Buy milk")));
148 assert_eq!(stream.next().await, Some(Todo::new(2, "Buy eggs")));
149 assert_eq!(stream.next().await, Some(Todo::new(3, "Buy bread")));
150 assert_eq!(stream.next().await, None);
151 }
152}