futures_rx/stream_ext/
materialize.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{stream::FusedStream, Stream};
7use pin_project_lite::pin_project;
8
9use crate::Notification;
10
11pin_project! {
12    /// Stream for the [`start_with`](RxStreamExt::start_with) method.
13    #[must_use = "streams do nothing unless polled"]
14    pub struct Materialize<S: Stream> {
15        #[pin]
16        stream: S,
17        did_complete: bool,
18    }
19}
20
21impl<S: Stream> Materialize<S> {
22    pub(crate) fn new(stream: S) -> Self {
23        Self {
24            stream,
25            did_complete: false,
26        }
27    }
28}
29
30impl<S: FusedStream> FusedStream for Materialize<S> {
31    fn is_terminated(&self) -> bool {
32        self.stream.is_terminated()
33    }
34}
35
36impl<S: Stream> Stream for Materialize<S> {
37    type Item = Notification<S::Item>;
38
39    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40        let mut this = self.project();
41
42        match this.stream.as_mut().poll_next(cx) {
43            Poll::Ready(Some(event)) => Poll::Ready(Some(Notification::Next(event))),
44            Poll::Ready(None) => {
45                if *this.did_complete {
46                    Poll::Ready(None)
47                } else {
48                    *this.did_complete = true;
49                    Poll::Ready(Some(Notification::Complete))
50                }
51            }
52            Poll::Pending => Poll::Pending,
53        }
54    }
55
56    fn size_hint(&self) -> (usize, Option<usize>) {
57        let (a, b) = self.stream.size_hint();
58
59        (a + 1, b.map(|it| it + 1))
60    }
61}
62
#[cfg(test)]
mod test {
    use futures::{executor::block_on, stream, StreamExt};

    use crate::{Notification, RxExt};

    /// A two-item stream materializes into two `Next` notifications followed
    /// by a single `Complete`.
    #[test]
    fn smoke() {
        let expected = [
            Notification::Next(1),
            Notification::Next(2),
            Notification::Complete,
        ];

        let source = stream::iter(1..=2);
        let actual = block_on(source.materialize().collect::<Vec<_>>());

        assert_eq!(actual, expected);
    }
}