streamies/try_streamies/
try_ready_result.rs

1use core::pin::Pin;
2use core::task::Context;
3use core::task::Poll;
4
5use futures::stream::FusedStream;
6use futures::Stream;
7use futures::TryStream;
8use pin_project_lite::pin_project;
9
10pin_project! {
11    /// Stream for the [`merge_round_robin`](crate::Streamies::merge_round_robin) method.
12    #[derive(Debug)]
13    #[must_use = "streams do nothing unless polled"]
14    pub struct TryReadyChunksResult<St> where St: TryStream{
15        #[pin]
16        stream: St,
17        cap: usize,
18        error: Option<St::Error>
19    }
20}
21
22impl<St> TryReadyChunksResult<St>
23where
24    St: TryStream,
25{
26    pub(super) fn new(stream: St, cap: usize) -> Self {
27        Self {
28            stream,
29            cap,
30            error: None,
31        }
32    }
33}
34
35impl<St> FusedStream for TryReadyChunksResult<St>
36where
37    St: FusedStream + TryStream + Stream<Item = Result<St::Ok, St::Error>>,
38{
39    fn is_terminated(&self) -> bool {
40        self.stream.is_terminated()
41    }
42}
43
44impl<St> Stream for TryReadyChunksResult<St>
45where
46    St: TryStream + Stream<Item = Result<St::Ok, St::Error>>, // Stream bound for the TryStream to have a Result item
47{
48    type Item = Result<Vec<St::Ok>, St::Error>;
49
50    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        let mut this = self.project();
52
53        // Return the error of the previous poll
54        if let Some(err) = this.error.take() {
55            return Poll::Ready(Some(Err(err)));
56        }
57
58        let mut items = Vec::new();
59
60        loop {
61            match this.stream.as_mut().poll_next(cx) {
62                // Flush all collected data if underlying stream doesn't contain
63                // more ready values
64                Poll::Pending => {
65                    return if items.is_empty() {
66                        Poll::Pending
67                    } else {
68                        Poll::Ready(Some(Ok(items)))
69                    }
70                }
71
72                // Push the ready item into the buffer and check whether it is full.
73                // If so, replace our buffer with a new and empty one and return
74                // the full one.
75                Poll::Ready(Some(Ok(item))) => {
76                    if items.is_empty() {
77                        items.reserve(*this.cap);
78                    }
79                    items.push(item);
80                    if items.len() >= *this.cap {
81                        return Poll::Ready(Some(Ok(items)));
82                    }
83                }
84
85                // Found an error! If we got items, we store it for next poll, and return our items
86                // Or else we return the error directly
87                Poll::Ready(Some(Err(item))) => {
88                    if items.is_empty() {
89                        return Poll::Ready(Some(Err(item)));
90                    }
91                    let _ = this.error.insert(item); // The previous error should be yielded earlier
92
93                    return Poll::Ready(Some(Ok(items)));
94                }
95
96                // Got a None. The stream is finished, so if we got values, we return them
97                Poll::Ready(None) => {
98                    if items.is_empty() {
99                        return Poll::Ready(None);
100                    }
101
102                    return Poll::Ready(Some(Ok(items)));
103                }
104            }
105        }
106    }
107
108    fn size_hint(&self) -> (usize, Option<usize>) {
109        if self.error.is_some() {
110            let (lower, upper) = self.stream.size_hint();
111            (lower + 1, upper.map(|v| v + 1))
112        } else {
113            self.stream.size_hint()
114        }
115    }
116}