streamies/try_streamies/
try_ready_result.rs1use core::pin::Pin;
2use core::task::Context;
3use core::task::Poll;
4
5use futures::stream::FusedStream;
6use futures::Stream;
7use futures::TryStream;
8use pin_project_lite::pin_project;
9
10pin_project! {
11 #[derive(Debug)]
13 #[must_use = "streams do nothing unless polled"]
14 pub struct TryReadyChunksResult<St> where St: TryStream{
15 #[pin]
16 stream: St,
17 cap: usize,
18 error: Option<St::Error>
19 }
20}
21
22impl<St> TryReadyChunksResult<St>
23where
24 St: TryStream,
25{
26 pub(super) fn new(stream: St, cap: usize) -> Self {
27 Self {
28 stream,
29 cap,
30 error: None,
31 }
32 }
33}
34
35impl<St> FusedStream for TryReadyChunksResult<St>
36where
37 St: FusedStream + TryStream + Stream<Item = Result<St::Ok, St::Error>>,
38{
39 fn is_terminated(&self) -> bool {
40 self.stream.is_terminated()
41 }
42}
43
44impl<St> Stream for TryReadyChunksResult<St>
45where
46 St: TryStream + Stream<Item = Result<St::Ok, St::Error>>, {
48 type Item = Result<Vec<St::Ok>, St::Error>;
49
50 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 let mut this = self.project();
52
53 if let Some(err) = this.error.take() {
55 return Poll::Ready(Some(Err(err)));
56 }
57
58 let mut items = Vec::new();
59
60 loop {
61 match this.stream.as_mut().poll_next(cx) {
62 Poll::Pending => {
65 return if items.is_empty() {
66 Poll::Pending
67 } else {
68 Poll::Ready(Some(Ok(items)))
69 }
70 }
71
72 Poll::Ready(Some(Ok(item))) => {
76 if items.is_empty() {
77 items.reserve(*this.cap);
78 }
79 items.push(item);
80 if items.len() >= *this.cap {
81 return Poll::Ready(Some(Ok(items)));
82 }
83 }
84
85 Poll::Ready(Some(Err(item))) => {
88 if items.is_empty() {
89 return Poll::Ready(Some(Err(item)));
90 }
91 let _ = this.error.insert(item); return Poll::Ready(Some(Ok(items)));
94 }
95
96 Poll::Ready(None) => {
98 if items.is_empty() {
99 return Poll::Ready(None);
100 }
101
102 return Poll::Ready(Some(Ok(items)));
103 }
104 }
105 }
106 }
107
108 fn size_hint(&self) -> (usize, Option<usize>) {
109 if self.error.is_some() {
110 let (lower, upper) = self.stream.size_hint();
111 (lower + 1, upper.map(|v| v + 1))
112 } else {
113 self.stream.size_hint()
114 }
115 }
116}