tokio_stream_util/try_stream/ext/
try_concat.rs1use core::future::Future;
2use core::mem;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use futures_core::future::FusedFuture;
6
7use super::{FusedStream, TryStream};
8
9#[derive(Debug)]
11#[must_use = "futures do nothing unless you `.await` or poll them"]
12pub struct TryConcat<St: TryStream> {
13 stream: St,
14 accum: Option<St::Ok>,
15}
16
17impl<St: TryStream + Unpin> Unpin for TryConcat<St> {}
18
19impl<St> TryConcat<St>
20where
21 St: TryStream,
22 St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
23{
24 pub(super) fn new(stream: St) -> Self {
25 Self {
26 stream,
27 accum: None,
28 }
29 }
30}
31
32impl<St> FusedFuture for TryConcat<St>
33where
34 St: TryStream + FusedStream,
35 St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
36{
37 fn is_terminated(&self) -> bool {
38 self.stream.is_terminated() && self.accum.is_none()
39 }
40}
41
42impl<St> Future for TryConcat<St>
43where
44 St: TryStream,
45 St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
46{
47 type Output = Result<St::Ok, St::Error>;
48
49 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50 let this = unsafe { self.get_unchecked_mut() };
51 let mut stream = unsafe { Pin::new_unchecked(&mut this.stream) };
52
53 loop {
54 match stream.as_mut().try_poll_next(cx) {
55 Poll::Ready(Some(Ok(x))) => {
56 if let Some(a) = &mut this.accum {
57 a.extend(x);
58 } else {
59 this.accum = Some(x);
60 }
61 }
62 Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
63 Poll::Ready(None) => {
64 return Poll::Ready(Ok(mem::take(&mut this.accum).unwrap_or_default()))
65 }
66 Poll::Pending => return Poll::Pending,
67 }
68 }
69 }
70}