1use crate::op_prelude::*;
2use std::collections::hash_map::RandomState;
3use std::collections::HashSet;
4use std::hash::{BuildHasher, Hash, Hasher};
5
6pin_project! {
7 #[must_use = "streams do nothing unless polled"]
9 pub struct TryDedupStream<S> {
10 #[pin]
11 src: S,
12 size_hint: (usize, Option<usize>),
13 known: HashSet<u64>,
14 hasher: RandomState,
15 }
16}
17
18impl<S> Stream for TryDedupStream<S>
19where
20 S: TryStream,
21 S::Ok: Hash,
22{
23 type Item = Result<S::Ok, S::Error>;
24
25 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26 let mut this = self.project();
27 Poll::Ready(loop {
28 match ready!(this.src.as_mut().try_poll_next(cx)) {
29 Some(Ok(v)) => if this.known.insert(hash(&*this.hasher, &v)) {
30 break Some(Ok(v));
31 }
32 other => break other,
33 }
34 })
35 }
36
37 fn size_hint(&self) -> (usize, Option<usize>) {
38 self.size_hint
39 }
40}
41
42impl<S> FusedStream for TryDedupStream<S>
43where
44 S: TryStream + FusedStream,
45 S::Ok: Hash,
46{
47 delegate_fused!(src);
48}
49
50#[cfg(feature = "sink")]
51impl<S, Item, E> Sink<Item> for TryDedupStream<S>
52where
53 S: Sink<Item, Error=E> + TryStream,
54 S::Ok: Hash
55{
56 delegate_sink!(src, E, Item);
57}
58
59impl<S> TryDedupStream<S>
60where
61 S: TryStream,
62 S::Ok: Hash,
63{
64 pub(crate) fn new(src: S) -> Self {
66 let size_hint = src.size_hint();
67 Self {
68 src,
69 size_hint,
70 hasher: RandomState::default(),
71 known: HashSet::default(),
72 }
73 }
74}
75
76pin_project! {
77 #[must_use = "streams do nothing unless polled"]
79 pub struct DedupStream<S> {
80 #[pin]
81 src: S,
82 size_hint: (usize, Option<usize>),
83 known: HashSet<u64>,
84 hasher: RandomState,
85 }
86}
87
88impl<S> Stream for DedupStream<S>
89where
90 S: Stream,
91 S::Item: Hash,
92{
93 type Item = S::Item;
94
95 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96 let mut this = self.project();
97 Poll::Ready(loop {
98 if let Some(next) = ready!(this.src.as_mut().poll_next(cx)) {
99 if this.known.insert(hash(&*this.hasher, &next)) {
100 break Some(next);
101 }
102 } else {
103 break None;
104 }
105 })
106 }
107
108 fn size_hint(&self) -> (usize, Option<usize>) {
109 self.size_hint
110 }
111}
112
113impl<S> FusedStream for DedupStream<S>
114where
115 S: Stream + FusedStream,
116 S::Item: Hash
117{
118 delegate_fused!(src);
119}
120
121#[cfg(feature = "sink")]
122impl<S, Item> Sink<Item> for DedupStream<S>
123where
124 S: Sink<Item> + Stream,
125 S::Item: Hash
126{
127 delegate_sink!(src, S::Error, Item);
128}
129
130impl<S> DedupStream<S>
131where
132 S: Stream,
133{
134 pub(crate) fn new(src: S) -> Self {
136 let size_hint = src.size_hint();
137 Self {
138 src,
139 size_hint,
140 hasher: RandomState::default(),
141 known: HashSet::default(),
142 }
143 }
144}
145
/// Computes the 64-bit hash of `value` with a fresh hasher built from `hasher`.
///
/// A new hasher is built on every call, so the result depends only on the
/// `RandomState` and on `value`.
fn hash<H: Hash>(hasher: &RandomState, value: &H) -> u64 {
    let mut state = BuildHasher::build_hasher(hasher);
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
154
#[cfg(test)]
mod tests {
    use super::TryDedupStream;
    use futures::executor::block_on;
    use futures::TryStreamExt;

    /// Each repeated `Ok` value is yielded once, after which the stream ends.
    #[test]
    fn test_dedup_simple() {
        let input: Vec<Result<&str, ()>> = vec![
            Ok("hello"),
            Ok("hello"),
            Ok("world!"),
            Ok("world!"),
            Ok("123 123!"),
            Ok("123 123!"),
        ];
        let mut deduped = TryDedupStream::new(futures::stream::iter(input));

        for expected in ["hello", "world!", "123 123!"].iter().copied() {
            assert_eq!(block_on(deduped.try_next()), Ok(Some(expected)));
        }
        assert_eq!(block_on(deduped.try_next()), Ok(None));
    }

    /// Duplicates ahead of an error are dropped and the error itself is forwarded.
    #[test]
    fn test_dedup_err() {
        let input: Vec<Result<&str, ()>> = vec![
            Ok("hello"),
            Ok("hello"),
            Ok("abc z"),
            Err(()),
            Ok("abc"),
        ];
        let mut deduped = TryDedupStream::new(futures::stream::iter(input));

        for expected in ["hello", "abc z"].iter().copied() {
            assert_eq!(block_on(deduped.try_next()), Ok(Some(expected)));
        }
        assert_eq!(block_on(deduped.try_next()), Err(()));
    }
}