multipart_write/stream/
collect_completed.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures::future::{FusedFuture, Future};
4use futures::ready;
5use futures::stream::{Fuse, Stream, StreamExt};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9/// Future for [`collect_completed`](super::MultipartStreamExt::collect_completed).
10#[derive(Debug)]
11#[must_use = "futures do nothing unless polled"]
12#[pin_project::pin_project(project = CollectCompletedProj)]
13pub struct CollectCompleted<St, Wr, Part> {
14    #[pin]
15    writer: Option<Wr>,
16    #[pin]
17    stream: Fuse<St>,
18    buffered: Option<Part>,
19}
20
21impl<St: Stream, Wr, Part> CollectCompleted<St, Wr, Part> {
22    pub(super) fn new(stream: St, writer: Wr) -> Self {
23        Self {
24            writer: Some(writer),
25            stream: stream.fuse(),
26            buffered: None,
27        }
28    }
29}
30
31impl<St, Wr, Part> FusedFuture for CollectCompleted<St, Wr, Part>
32where
33    Wr: FusedMultipartWrite<Part>,
34    St: Stream<Item = Result<Part, Wr::Error>>,
35{
36    fn is_terminated(&self) -> bool {
37        self.writer.as_ref().is_none_or(|wr| wr.is_terminated())
38    }
39}
40
41impl<St, Wr, Part> Future for CollectCompleted<St, Wr, Part>
42where
43    Wr: MultipartWrite<Part>,
44    St: Stream<Item = Result<Part, Wr::Error>>,
45{
46    type Output = Result<Wr::Output, Wr::Error>;
47
48    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49        let CollectCompletedProj {
50            mut writer,
51            mut stream,
52            buffered,
53        } = self.project();
54        let mut wr = writer
55            .as_mut()
56            .as_pin_mut()
57            .expect("polled `CollectCompleted` after completion");
58
59        loop {
60            if buffered.is_some() {
61                ready!(wr.as_mut().poll_ready(cx))?;
62                let _ = wr.as_mut().start_send(buffered.take().unwrap())?;
63            }
64
65            match stream.as_mut().poll_next(cx)? {
66                Poll::Ready(Some(it)) => {
67                    *buffered = Some(it);
68                }
69                Poll::Ready(None) => {
70                    let output = ready!(wr.poll_complete(cx))?;
71                    writer.set(None);
72                    return Poll::Ready(Ok(output));
73                }
74                Poll::Pending => {
75                    ready!(wr.poll_flush(cx))?;
76                    return Poll::Pending;
77                }
78            }
79        }
80    }
81}