futures_rx/stream_ext/
pairwise.rs

1use std::{
2    pin::Pin,
3    rc::Rc,
4    task::{Context, Poll},
5};
6
7use futures::{
8    stream::{Fuse, FusedStream},
9    Stream, StreamExt,
10};
11use pin_project_lite::pin_project;
12
13use crate::EventLite;
14
pin_project! {
    /// Stream for the [`pairwise`](RxStreamExt::pairwise) method.
    ///
    /// Yields every item of the wrapped stream together with the item that
    /// preceded it, so the first pair is produced once the wrapped stream has
    /// delivered two items.
    #[must_use = "streams do nothing unless polled"]
    pub struct Pairwise<S: Stream> {
        // The wrapped stream, fused so it can be polled again after it ends.
        #[pin]
        stream: Fuse<S>,
        // The most recently received item, kept until the following item
        // arrives; `None` until the wrapped stream has produced anything.
        previous: Option<Rc<S::Item>>,
    }
}
24
25impl<S: Stream> Pairwise<S> {
26    pub(crate) fn new(stream: S) -> Self {
27        Self {
28            stream: stream.fuse(),
29            previous: None,
30        }
31    }
32}
33
34impl<S> FusedStream for Pairwise<S>
35where
36    S: FusedStream,
37{
38    fn is_terminated(&self) -> bool {
39        self.stream.is_terminated()
40    }
41}
42
43impl<S> Stream for Pairwise<S>
44where
45    S: Stream,
46{
47    type Item = (S::Item, EventLite<S::Item>);
48
49    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        let mut this = self.project();
51
52        match this.stream.as_mut().poll_next(cx) {
53            Poll::Ready(Some(event)) => {
54                let next = Rc::new(event);
55
56                if let Some(prev) = this.previous.replace(Rc::clone(&next)) {
57                    if let Ok(prev) = Rc::try_unwrap(prev) {
58                        Poll::Ready(Some((prev, EventLite(next))))
59                    } else {
60                        unreachable!()
61                    }
62                } else {
63                    cx.waker().wake_by_ref();
64
65                    Poll::Pending
66                }
67            }
68            Poll::Ready(None) => Poll::Ready(None),
69            Poll::Pending => Poll::Pending,
70        }
71    }
72
73    fn size_hint(&self) -> (usize, Option<usize>) {
74        let (a, b) = self.stream.size_hint();
75        let lower = if a > 0 { a - 1 } else { 0 };
76
77        (lower, b.map(|it| if it > 0 { it - 1 } else { 0 }))
78    }
79}
80
#[cfg(test)]
mod test {
    use futures::{executor::block_on, stream, StreamExt};

    use crate::RxExt;

    /// Six consecutive integers must come out as five adjacent pairs.
    #[test]
    fn smoke() {
        // Dereference the shared handle immediately so plain integers are collected.
        let pairs = stream::iter(0..=5)
            .pairwise()
            .map(|(prev, next)| (prev, *next))
            .collect::<Vec<_>>();

        let all_events = block_on(pairs);

        assert_eq!(all_events, [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]);
    }
}