futures_rx/stream_ext/
pairwise.rs1use std::{
2 pin::Pin,
3 rc::Rc,
4 task::{Context, Poll},
5};
6
7use futures::{
8 stream::{Fuse, FusedStream},
9 Stream, StreamExt,
10};
11use pin_project_lite::pin_project;
12
13use crate::EventLite;
14
15pin_project! {
16 #[must_use = "streams do nothing unless polled"]
18 pub struct Pairwise<S: Stream> {
19 #[pin]
20 stream: Fuse<S>,
21 previous: Option<Rc<S::Item>>,
22 }
23}
24
25impl<S: Stream> Pairwise<S> {
26 pub(crate) fn new(stream: S) -> Self {
27 Self {
28 stream: stream.fuse(),
29 previous: None,
30 }
31 }
32}
33
34impl<S> FusedStream for Pairwise<S>
35where
36 S: FusedStream,
37{
38 fn is_terminated(&self) -> bool {
39 self.stream.is_terminated()
40 }
41}
42
43impl<S> Stream for Pairwise<S>
44where
45 S: Stream,
46{
47 type Item = (S::Item, EventLite<S::Item>);
48
49 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 let mut this = self.project();
51
52 match this.stream.as_mut().poll_next(cx) {
53 Poll::Ready(Some(event)) => {
54 let next = Rc::new(event);
55
56 if let Some(prev) = this.previous.replace(Rc::clone(&next)) {
57 if let Ok(prev) = Rc::try_unwrap(prev) {
58 Poll::Ready(Some((prev, EventLite(next))))
59 } else {
60 unreachable!()
61 }
62 } else {
63 cx.waker().wake_by_ref();
64
65 Poll::Pending
66 }
67 }
68 Poll::Ready(None) => Poll::Ready(None),
69 Poll::Pending => Poll::Pending,
70 }
71 }
72
73 fn size_hint(&self) -> (usize, Option<usize>) {
74 let (a, b) = self.stream.size_hint();
75 let lower = if a > 0 { a - 1 } else { 0 };
76
77 (lower, b.map(|it| if it > 0 { it - 1 } else { 0 }))
78 }
79}
80
#[cfg(test)]
mod test {
    use futures::{executor::block_on, stream, StreamExt};

    use crate::RxExt;

    /// Checks that a finite stream of `0..=5` is turned into the sequence of
    /// adjacent `(previous, next)` pairs.
    #[test]
    fn smoke() {
        // Dereference each `next` immediately so only plain values are
        // collected.
        let collected = stream::iter(0..=5)
            .pairwise()
            .map(|(prev, next)| (prev, *next))
            .collect::<Vec<_>>();

        let all_events = block_on(collected);

        assert_eq!(all_events, [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]);
    }
}