Skip to main content

streamx/
distinct_until_changed.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures::Stream;
5
6/// A stream that only emits values when they differ from the previous value.
7///
8/// This stream filters out consecutive duplicate values, only yielding items
9/// when they are different from the immediately preceding item.
10///
11/// This is useful when you want to skip duplicate consecutive values and only
12/// react to changes.
13pub struct DistinctUntilChangedStream<T>
14where
15  T: Stream,
16  T::Item: Clone,
17{
18  source: Pin<Box<T>>,
19  previous: Option<T::Item>,
20}
21
22impl<T> Stream for DistinctUntilChangedStream<T>
23where
24  T: Stream,
25  T::Item: PartialEq + Clone,
26{
27  type Item = T::Item;
28
29  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30    let this = unsafe { self.get_unchecked_mut() };
31    loop {
32      match this.source.as_mut().poll_next(cx) {
33        Poll::Ready(Some(item)) => {
34          match &this.previous {
35            Some(previous) if previous == &item => {
36              // Skip this duplicate value
37              continue;
38            }
39            _ => {
40              // This is a new value, emit it
41              this.previous = Some(item.clone());
42              return Poll::Ready(Some(item));
43            }
44          }
45        }
46        Poll::Ready(None) => {
47          return Poll::Ready(None);
48        }
49        Poll::Pending => {
50          return Poll::Pending;
51        }
52      }
53    }
54  }
55}
56
57/// A stream that only emits values when they differ from the previous value,
58/// using a custom comparison function.
59///
60/// This stream filters out consecutive duplicate values based on a custom
61/// comparison function, only yielding items when the comparison function
62/// returns false (indicating they are different).
63pub struct DistinctUntilChangedByStream<T, F>
64where
65  T: Stream,
66  T::Item: Clone,
67{
68  source: Pin<Box<T>>,
69  compare: F,
70  previous: Option<T::Item>,
71}
72
73impl<T, F> Stream for DistinctUntilChangedByStream<T, F>
74where
75  T: Stream,
76  T::Item: Clone,
77  F: FnMut(&T::Item, &T::Item) -> bool,
78{
79  type Item = T::Item;
80
81  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82    let this = unsafe { self.get_unchecked_mut() };
83    loop {
84      match this.source.as_mut().poll_next(cx) {
85        Poll::Ready(Some(item)) => {
86          match &this.previous {
87            Some(previous) if (this.compare)(previous, &item) => {
88              // Skip this duplicate value
89              continue;
90            }
91            _ => {
92              // This is a new value, emit it
93              this.previous = Some(item.clone());
94              return Poll::Ready(Some(item));
95            }
96          }
97        }
98        Poll::Ready(None) => {
99          return Poll::Ready(None);
100        }
101        Poll::Pending => {
102          return Poll::Pending;
103        }
104      }
105    }
106  }
107}
108
109/// Extension trait that adds `.distinct_until_changed()` to streams.
110pub trait StreamDistinctUntilChangedExt: Stream + Sized {
111  /// Wraps this stream so that only values different from the previous value are yielded.
112  ///
113  /// This filters out consecutive duplicate values, only emitting items when they
114  /// differ from the immediately preceding item. Uses `PartialEq` for comparison.
115  ///
116  /// # Example
117  ///
118  /// ```
119  /// use futures::StreamExt;
120  /// use futures::executor::block_on;
121  /// use streamx::StreamDistinctUntilChangedExt;
122  ///
123  /// let mut stream = futures::stream::iter([1, 1, 2, 2, 3, 1, 1]).distinct_until_changed();
124  ///
125  /// assert_eq!(block_on(async { stream.next().await }), Some(1));
126  /// assert_eq!(block_on(async { stream.next().await }), Some(2));
127  /// assert_eq!(block_on(async { stream.next().await }), Some(3));
128  /// assert_eq!(block_on(async { stream.next().await }), Some(1));
129  /// assert_eq!(block_on(async { stream.next().await }), None);
130  /// ```
131  fn distinct_until_changed(self) -> DistinctUntilChangedStream<Self>
132  where
133    Self::Item: PartialEq + Clone,
134  {
135    DistinctUntilChangedStream {
136      source: Box::pin(self),
137      previous: None,
138    }
139  }
140
141  /// Like `distinct_until_changed()`, but uses a custom comparison function.
142  ///
143  /// The comparison function should return `true` if two items are considered equal
144  /// (and the second should be skipped), or `false` if they are different (and the
145  /// second should be emitted).
146  ///
147  /// # Example
148  ///
149  /// ```
150  /// use futures::StreamExt;
151  /// use futures::executor::block_on;
152  /// use streamx::StreamDistinctUntilChangedExt;
153  ///
154  /// let mut stream = futures::stream::iter([1, 2, 3, 4, 5])
155  ///   .distinct_until_changed_by(|a, b| a % 2 == b % 2);
156  ///
157  /// // Only emits when parity changes
158  /// assert_eq!(block_on(async { stream.next().await }), Some(1));
159  /// assert_eq!(block_on(async { stream.next().await }), Some(2));
160  /// assert_eq!(block_on(async { stream.next().await }), Some(3));
161  /// assert_eq!(block_on(async { stream.next().await }), Some(4));
162  /// assert_eq!(block_on(async { stream.next().await }), Some(5));
163  /// ```
164  fn distinct_until_changed_by<F>(self, compare: F) -> DistinctUntilChangedByStream<Self, F>
165  where
166    Self::Item: Clone,
167    F: FnMut(&Self::Item, &Self::Item) -> bool,
168  {
169    DistinctUntilChangedByStream {
170      source: Box::pin(self),
171      compare,
172      previous: None,
173    }
174  }
175}
176
177impl<T: Stream + Sized> StreamDistinctUntilChangedExt for T {}
178
#[cfg(test)]
mod tests {
  use std::pin::Pin;
  use std::task::{Context, Poll};

  use futures::StreamExt;

  use super::StreamDistinctUntilChangedExt;

  /// Thin `Stream` wrapper around a tokio unbounded receiver, used to feed
  /// items into the adapters incrementally.
  struct MpscStream<T>(tokio::sync::mpsc::UnboundedReceiver<T>);

  impl<T> futures::Stream for MpscStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      let receiver = &mut self.get_mut().0;
      receiver.poll_recv(cx)
    }
  }

  #[tokio::test]
  async fn distinct_until_changed_filters_consecutive_duplicates() {
    let emitted: Vec<i32> = futures::stream::iter([1, 1, 2, 2, 3, 1, 1])
      .distinct_until_changed()
      .collect()
      .await;

    // Each run collapses to one element; the trailing 1s count as a new run.
    assert_eq!(emitted, vec![1, 2, 3, 1]);
  }

  #[tokio::test]
  async fn distinct_until_changed_emits_first_value() {
    let emitted: Vec<i32> = futures::stream::iter([1, 2, 3])
      .distinct_until_changed()
      .collect()
      .await;

    assert_eq!(emitted, vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn distinct_until_changed_handles_all_duplicates() {
    let emitted: Vec<i32> = futures::stream::iter([1, 1, 1, 1])
      .distinct_until_changed()
      .collect()
      .await;

    assert_eq!(emitted, vec![1]);
  }

  #[tokio::test]
  async fn distinct_until_changed_handles_empty_stream() {
    let emitted: Vec<i32> = futures::stream::iter([] as [i32; 0])
      .distinct_until_changed()
      .collect()
      .await;

    assert_eq!(emitted, Vec::<i32>::new());
  }

  #[tokio::test]
  async fn distinct_until_changed_handles_single_value() {
    let emitted: Vec<i32> = futures::stream::iter([42])
      .distinct_until_changed()
      .collect()
      .await;

    assert_eq!(emitted, vec![42]);
  }

  #[tokio::test]
  async fn distinct_until_changed_works_with_mpsc() {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<u32>();
    let mut stream = MpscStream(receiver).distinct_until_changed();

    // First value is always emitted.
    sender.send(1).unwrap();
    assert_eq!(stream.next().await, Some(1));

    // The repeated 1 is swallowed; the 2 comes through.
    for value in [1, 2] {
      sender.send(value).unwrap();
    }
    assert_eq!(stream.next().await, Some(2));

    // Two repeated 2s are swallowed; the 3 comes through.
    for value in [2, 2, 3] {
      sender.send(value).unwrap();
    }
    assert_eq!(stream.next().await, Some(3));

    // Closing the channel ends the stream.
    drop(sender);
    assert_eq!(stream.next().await, None);
  }

  #[tokio::test]
  async fn distinct_until_changed_by_uses_custom_comparison() {
    let same_parity = |a: &i32, b: &i32| a % 2 == b % 2;

    let emitted: Vec<i32> = futures::stream::iter([1, 2, 3, 4, 5])
      .distinct_until_changed_by(same_parity)
      .collect()
      .await;

    // Parity alternates on every element, so nothing is filtered out.
    assert_eq!(emitted, vec![1, 2, 3, 4, 5]);
  }

  #[tokio::test]
  async fn distinct_until_changed_by_filters_based_on_custom_logic() {
    let same_bucket = |a: &i32, b: &i32| (a / 3) == (b / 3);

    let emitted: Vec<i32> = futures::stream::iter([1, 2, 3, 4, 5, 6])
      .distinct_until_changed_by(same_bucket)
      .collect()
      .await;

    // Buckets by `value / 3`: 1 -> 0, 3 -> 1, 6 -> 2; the rest repeat a bucket.
    assert_eq!(emitted, vec![1, 3, 6]);
  }

  #[tokio::test]
  async fn distinct_until_changed_by_works_with_mpsc() {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<u32>();
    let mut stream = MpscStream(receiver).distinct_until_changed_by(|a, b| a == b);

    sender.send(1).unwrap();
    assert_eq!(stream.next().await, Some(1));

    // The repeated 1 is swallowed; the 2 comes through.
    for value in [1, 2] {
      sender.send(value).unwrap();
    }
    assert_eq!(stream.next().await, Some(2));

    // Closing the channel ends the stream.
    drop(sender);
    assert_eq!(stream.next().await, None);
  }
}