tokio_stream_util/try_stream/ext/
try_concat.rs

1use core::future::Future;
2use core::mem;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use futures_core::future::FusedFuture;
6
7use super::{FusedStream, TryStream};
8
9/// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
10#[derive(Debug)]
11#[must_use = "futures do nothing unless you `.await` or poll them"]
12pub struct TryConcat<St: TryStream> {
13    stream: St,
14    accum: Option<St::Ok>,
15}
16
17impl<St: TryStream + Unpin> Unpin for TryConcat<St> {}
18
19impl<St> TryConcat<St>
20where
21    St: TryStream,
22    St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
23{
24    pub(super) fn new(stream: St) -> Self {
25        Self {
26            stream,
27            accum: None,
28        }
29    }
30}
31
32impl<St> FusedFuture for TryConcat<St>
33where
34    St: TryStream + FusedStream,
35    St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
36{
37    fn is_terminated(&self) -> bool {
38        self.stream.is_terminated() && self.accum.is_none()
39    }
40}
41
42impl<St> Future for TryConcat<St>
43where
44    St: TryStream,
45    St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
46{
47    type Output = Result<St::Ok, St::Error>;
48
49    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50        let this = unsafe { self.get_unchecked_mut() };
51        let mut stream = unsafe { Pin::new_unchecked(&mut this.stream) };
52
53        loop {
54            match stream.as_mut().try_poll_next(cx) {
55                Poll::Ready(Some(Ok(x))) => {
56                    if let Some(a) = &mut this.accum {
57                        a.extend(x);
58                    } else {
59                        this.accum = Some(x);
60                    }
61                }
62                Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
63                Poll::Ready(None) => {
64                    return Poll::Ready(Ok(mem::take(&mut this.accum).unwrap_or_default()))
65                }
66                Poll::Pending => return Poll::Pending,
67            }
68        }
69    }
70}