1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures::Stream;
5
6pub struct DistinctUntilChangedStream<T>
14where
15 T: Stream,
16 T::Item: Clone,
17{
18 source: Pin<Box<T>>,
19 previous: Option<T::Item>,
20}
21
22impl<T> Stream for DistinctUntilChangedStream<T>
23where
24 T: Stream,
25 T::Item: PartialEq + Clone,
26{
27 type Item = T::Item;
28
29 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30 let this = unsafe { self.get_unchecked_mut() };
31 loop {
32 match this.source.as_mut().poll_next(cx) {
33 Poll::Ready(Some(item)) => {
34 match &this.previous {
35 Some(previous) if previous == &item => {
36 continue;
38 }
39 _ => {
40 this.previous = Some(item.clone());
42 return Poll::Ready(Some(item));
43 }
44 }
45 }
46 Poll::Ready(None) => {
47 return Poll::Ready(None);
48 }
49 Poll::Pending => {
50 return Poll::Pending;
51 }
52 }
53 }
54 }
55}
56
57pub struct DistinctUntilChangedByStream<T, F>
64where
65 T: Stream,
66 T::Item: Clone,
67{
68 source: Pin<Box<T>>,
69 compare: F,
70 previous: Option<T::Item>,
71}
72
73impl<T, F> Stream for DistinctUntilChangedByStream<T, F>
74where
75 T: Stream,
76 T::Item: Clone,
77 F: FnMut(&T::Item, &T::Item) -> bool,
78{
79 type Item = T::Item;
80
81 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82 let this = unsafe { self.get_unchecked_mut() };
83 loop {
84 match this.source.as_mut().poll_next(cx) {
85 Poll::Ready(Some(item)) => {
86 match &this.previous {
87 Some(previous) if (this.compare)(previous, &item) => {
88 continue;
90 }
91 _ => {
92 this.previous = Some(item.clone());
94 return Poll::Ready(Some(item));
95 }
96 }
97 }
98 Poll::Ready(None) => {
99 return Poll::Ready(None);
100 }
101 Poll::Pending => {
102 return Poll::Pending;
103 }
104 }
105 }
106 }
107}
108
109pub trait StreamDistinctUntilChangedExt: Stream + Sized {
111 fn distinct_until_changed(self) -> DistinctUntilChangedStream<Self>
132 where
133 Self::Item: PartialEq + Clone,
134 {
135 DistinctUntilChangedStream {
136 source: Box::pin(self),
137 previous: None,
138 }
139 }
140
141 fn distinct_until_changed_by<F>(self, compare: F) -> DistinctUntilChangedByStream<Self, F>
165 where
166 Self::Item: Clone,
167 F: FnMut(&Self::Item, &Self::Item) -> bool,
168 {
169 DistinctUntilChangedByStream {
170 source: Box::pin(self),
171 compare,
172 previous: None,
173 }
174 }
175}
176
177impl<T: Stream + Sized> StreamDistinctUntilChangedExt for T {}
178
179#[cfg(test)]
180mod tests {
181 use std::pin::Pin;
182 use std::task::{Context, Poll};
183
184 use futures::StreamExt;
185
186 use super::StreamDistinctUntilChangedExt;
187
188 struct MpscStream<T>(tokio::sync::mpsc::UnboundedReceiver<T>);
189
190 impl<T> futures::Stream for MpscStream<T> {
191 type Item = T;
192
193 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
194 self.0.poll_recv(cx)
195 }
196 }
197
198 #[tokio::test]
199 async fn distinct_until_changed_filters_consecutive_duplicates() {
200 let mut stream = futures::stream::iter([1, 1, 2, 2, 3, 1, 1]).distinct_until_changed();
201
202 assert_eq!(stream.next().await, Some(1));
203 assert_eq!(stream.next().await, Some(2));
204 assert_eq!(stream.next().await, Some(3));
205 assert_eq!(stream.next().await, Some(1));
206 assert_eq!(stream.next().await, None);
207 }
208
209 #[tokio::test]
210 async fn distinct_until_changed_emits_first_value() {
211 let mut stream = futures::stream::iter([1, 2, 3]).distinct_until_changed();
212
213 assert_eq!(stream.next().await, Some(1));
214 assert_eq!(stream.next().await, Some(2));
215 assert_eq!(stream.next().await, Some(3));
216 assert_eq!(stream.next().await, None);
217 }
218
219 #[tokio::test]
220 async fn distinct_until_changed_handles_all_duplicates() {
221 let mut stream = futures::stream::iter([1, 1, 1, 1]).distinct_until_changed();
222
223 assert_eq!(stream.next().await, Some(1));
224 assert_eq!(stream.next().await, None);
225 }
226
227 #[tokio::test]
228 async fn distinct_until_changed_handles_empty_stream() {
229 let mut stream = futures::stream::iter([] as [i32; 0]).distinct_until_changed();
230
231 assert_eq!(stream.next().await, None);
232 }
233
234 #[tokio::test]
235 async fn distinct_until_changed_handles_single_value() {
236 let mut stream = futures::stream::iter([42]).distinct_until_changed();
237
238 assert_eq!(stream.next().await, Some(42));
239 assert_eq!(stream.next().await, None);
240 }
241
242 #[tokio::test]
243 async fn distinct_until_changed_works_with_mpsc() {
244 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
245
246 let mut stream = MpscStream(rx).distinct_until_changed();
247
248 tx.send(1).unwrap();
249 assert_eq!(stream.next().await, Some(1));
250
251 tx.send(1).unwrap();
252 tx.send(2).unwrap();
254 assert_eq!(stream.next().await, Some(2));
255
256 tx.send(2).unwrap();
257 tx.send(2).unwrap();
258 tx.send(3).unwrap();
259 assert_eq!(stream.next().await, Some(3));
260
261 drop(tx);
262 assert_eq!(stream.next().await, None);
263 }
264
265 #[tokio::test]
266 async fn distinct_until_changed_by_uses_custom_comparison() {
267 let mut stream =
268 futures::stream::iter([1, 2, 3, 4, 5]).distinct_until_changed_by(|a, b| a % 2 == b % 2);
269
270 assert_eq!(stream.next().await, Some(1)); assert_eq!(stream.next().await, Some(2)); assert_eq!(stream.next().await, Some(3)); assert_eq!(stream.next().await, Some(4)); assert_eq!(stream.next().await, Some(5)); assert_eq!(stream.next().await, None);
277 }
278
279 #[tokio::test]
280 async fn distinct_until_changed_by_filters_based_on_custom_logic() {
281 let mut stream = futures::stream::iter([1, 2, 3, 4, 5, 6])
282 .distinct_until_changed_by(|a, b| (a / 3) == (b / 3));
283
284 assert_eq!(stream.next().await, Some(1)); assert_eq!(stream.next().await, Some(3)); assert_eq!(stream.next().await, Some(6)); assert_eq!(stream.next().await, None);
289 }
290
291 #[tokio::test]
292 async fn distinct_until_changed_by_works_with_mpsc() {
293 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
294
295 let mut stream = MpscStream(rx).distinct_until_changed_by(|a, b| a == b);
296
297 tx.send(1).unwrap();
298 assert_eq!(stream.next().await, Some(1));
299
300 tx.send(1).unwrap();
301 tx.send(2).unwrap();
302 assert_eq!(stream.next().await, Some(2));
303
304 drop(tx);
305 assert_eq!(stream.next().await, None);
306 }
307}