futures_rx/stream_ext/
materialize.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{stream::FusedStream, Stream};
7use pin_project_lite::pin_project;
8
9use crate::Notification;
10
11pin_project! {
12 #[must_use = "streams do nothing unless polled"]
14 pub struct Materialize<S: Stream> {
15 #[pin]
16 stream: S,
17 did_complete: bool,
18 }
19}
20
21impl<S: Stream> Materialize<S> {
22 pub(crate) fn new(stream: S) -> Self {
23 Self {
24 stream,
25 did_complete: false,
26 }
27 }
28}
29
30impl<S: FusedStream> FusedStream for Materialize<S> {
31 fn is_terminated(&self) -> bool {
32 self.stream.is_terminated()
33 }
34}
35
36impl<S: Stream> Stream for Materialize<S> {
37 type Item = Notification<S::Item>;
38
39 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40 let mut this = self.project();
41
42 match this.stream.as_mut().poll_next(cx) {
43 Poll::Ready(Some(event)) => Poll::Ready(Some(Notification::Next(event))),
44 Poll::Ready(None) => {
45 if *this.did_complete {
46 Poll::Ready(None)
47 } else {
48 *this.did_complete = true;
49 Poll::Ready(Some(Notification::Complete))
50 }
51 }
52 Poll::Pending => Poll::Pending,
53 }
54 }
55
56 fn size_hint(&self) -> (usize, Option<usize>) {
57 let (a, b) = self.stream.size_hint();
58
59 (a + 1, b.map(|it| it + 1))
60 }
61}
62
63#[cfg(test)]
64mod test {
65 use futures::{executor::block_on, stream, StreamExt};
66
67 use crate::{Notification, RxExt};
68
69 #[test]
70 fn smoke() {
71 block_on(async {
72 let stream = stream::iter(1..=2);
73 let all_events = stream.materialize().collect::<Vec<_>>().await;
74
75 assert_eq!(
76 all_events,
77 [
78 Notification::Next(1),
79 Notification::Next(2),
80 Notification::Complete
81 ]
82 );
83 });
84 }
85}