jstream_ext/
dedup.rs

1use crate::op_prelude::*;
2use std::collections::hash_map::RandomState;
3use std::collections::HashSet;
4use std::hash::{BuildHasher, Hash, Hasher};
5
6pin_project! {
7    /// Stream for the [`try_dedup`](super::ext::JTryStreamExt::try_dedup) method
8    #[must_use = "streams do nothing unless polled"]
9    pub struct TryDedupStream<S> {
10        #[pin]
11        src: S,
12        size_hint: (usize, Option<usize>),
13        known: HashSet<u64>,
14        hasher: RandomState,
15    }
16}
17
18impl<S> Stream for TryDedupStream<S>
19where
20    S: TryStream,
21    S::Ok: Hash,
22{
23    type Item = Result<S::Ok, S::Error>;
24
25    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26        let mut this = self.project();
27        Poll::Ready(loop {
28            match ready!(this.src.as_mut().try_poll_next(cx)) {
29                Some(Ok(v)) => if this.known.insert(hash(&*this.hasher, &v)) {
30                    break Some(Ok(v));
31                }
32                other => break other,
33            }
34        })
35    }
36
37    fn size_hint(&self) -> (usize, Option<usize>) {
38        self.size_hint
39    }
40}
41
42impl<S> FusedStream for TryDedupStream<S>
43where
44    S: TryStream + FusedStream,
45    S::Ok: Hash,
46{
47    delegate_fused!(src);
48}
49
50#[cfg(feature = "sink")]
51impl<S, Item, E> Sink<Item> for TryDedupStream<S>
52where
53    S: Sink<Item, Error=E> + TryStream,
54    S::Ok: Hash
55{
56    delegate_sink!(src, E, Item);
57}
58
59impl<S> TryDedupStream<S>
60where
61    S: TryStream,
62    S::Ok: Hash,
63{
64    //noinspection DuplicatedCode
65    pub(crate) fn new(src: S) -> Self {
66        let size_hint = src.size_hint();
67        Self {
68            src,
69            size_hint,
70            hasher: RandomState::default(),
71            known: HashSet::default(),
72        }
73    }
74}
75
76pin_project! {
77    /// Stream for the [`dedup`](super::ext::JStreamExt::dedup) method
78    #[must_use = "streams do nothing unless polled"]
79    pub struct DedupStream<S> {
80        #[pin]
81        src: S,
82        size_hint: (usize, Option<usize>),
83        known: HashSet<u64>,
84        hasher: RandomState,
85    }
86}
87
88impl<S> Stream for DedupStream<S>
89where
90    S: Stream,
91    S::Item: Hash,
92{
93    type Item = S::Item;
94
95    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96        let mut this = self.project();
97        Poll::Ready(loop {
98            if let Some(next) = ready!(this.src.as_mut().poll_next(cx)) {
99                if this.known.insert(hash(&*this.hasher, &next)) {
100                    break Some(next);
101                }
102            } else {
103                break None;
104            }
105        })
106    }
107
108    fn size_hint(&self) -> (usize, Option<usize>) {
109        self.size_hint
110    }
111}
112
113impl<S> FusedStream for DedupStream<S>
114where
115    S: Stream + FusedStream,
116    S::Item: Hash
117{
118    delegate_fused!(src);
119}
120
121#[cfg(feature = "sink")]
122impl<S, Item> Sink<Item> for DedupStream<S>
123where
124    S: Sink<Item> + Stream,
125    S::Item: Hash
126{
127    delegate_sink!(src, S::Error, Item);
128}
129
130impl<S> DedupStream<S>
131where
132    S: Stream,
133{
134    //noinspection DuplicatedCode
135    pub(crate) fn new(src: S) -> Self {
136        let size_hint = src.size_hint();
137        Self {
138            src,
139            size_hint,
140            hasher: RandomState::default(),
141            known: HashSet::default(),
142        }
143    }
144}
145
/// Hashes `value` with a freshly built hasher from `builder` and returns the 64-bit digest.
///
/// A new hasher is created on every call, so hashing the same value with the same
/// `RandomState` always yields the same result.
fn hash<H>(builder: &RandomState, value: &H) -> u64
where
    H: Hash,
{
    let mut state = builder.build_hasher();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
154
#[cfg(test)]
mod tests {
    use super::{DedupStream, TryDedupStream};
    use futures::executor::block_on;
    use futures::{StreamExt, TryStreamExt};

    #[test]
    fn test_dedup_simple() {
        let src: Vec<Result<&str, ()>> = vec![
            Ok("hello"),
            Ok("hello"),
            Ok("world!"),
            Ok("world!"),
            Ok("123 123!"),
            Ok("123 123!"),
        ];

        let mut raised = TryDedupStream::new(futures::stream::iter(src));
        assert_eq!(block_on(raised.try_next()), Ok(Some("hello")));
        assert_eq!(block_on(raised.try_next()), Ok(Some("world!")));
        assert_eq!(block_on(raised.try_next()), Ok(Some("123 123!")));
        assert_eq!(block_on(raised.try_next()), Ok(None));
    }

    #[test]
    fn test_dedup_err() {
        let src: Vec<Result<&str, ()>> =
            vec![Ok("hello"), Ok("hello"), Ok("abc z"), Err(()), Ok("abc")];
        let mut raised = TryDedupStream::new(futures::stream::iter(src));
        assert_eq!(block_on(raised.try_next()), Ok(Some("hello")));
        assert_eq!(block_on(raised.try_next()), Ok(Some("abc z")));
        assert_eq!(block_on(raised.try_next()), Err(()));
        // An error does not terminate the stream; polling continues with the next item.
        assert_eq!(block_on(raised.try_next()), Ok(Some("abc")));
        assert_eq!(block_on(raised.try_next()), Ok(None));
    }

    #[test]
    fn test_dedup_errors_are_never_deduplicated() {
        let src: Vec<Result<u8, &str>> = vec![Err("e"), Ok(1), Err("e"), Ok(1), Ok(2)];
        let mut raised = TryDedupStream::new(futures::stream::iter(src));
        assert_eq!(block_on(raised.try_next()), Err("e"));
        assert_eq!(block_on(raised.try_next()), Ok(Some(1)));
        assert_eq!(block_on(raised.try_next()), Err("e"));
        assert_eq!(block_on(raised.try_next()), Ok(Some(2)));
        assert_eq!(block_on(raised.try_next()), Ok(None));
    }

    #[test]
    fn test_dedup_stream_non_consecutive() {
        // Deduplication is global, not limited to adjacent repeats.
        let src = vec![1, 2, 1, 3, 2, 3, 4];
        let raised = DedupStream::new(futures::stream::iter(src));
        let out: Vec<i32> = block_on(raised.collect());
        assert_eq!(out, vec![1, 2, 3, 4]);
    }
}