// multipart_write/stream/collect_completed.rs
use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures::future::{FusedFuture, Future};
4use futures::ready;
5use futures::stream::{Fuse, Stream, StreamExt};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9#[derive(Debug)]
11#[must_use = "futures do nothing unless polled"]
12#[pin_project::pin_project(project = CollectCompletedProj)]
13pub struct CollectCompleted<St, Wr, Part> {
14 #[pin]
15 writer: Option<Wr>,
16 #[pin]
17 stream: Fuse<St>,
18 buffered: Option<Part>,
19}
20
21impl<St: Stream, Wr, Part> CollectCompleted<St, Wr, Part> {
22 pub(super) fn new(stream: St, writer: Wr) -> Self {
23 Self {
24 writer: Some(writer),
25 stream: stream.fuse(),
26 buffered: None,
27 }
28 }
29}
30
31impl<St, Wr, Part> FusedFuture for CollectCompleted<St, Wr, Part>
32where
33 Wr: FusedMultipartWrite<Part>,
34 St: Stream<Item = Result<Part, Wr::Error>>,
35{
36 fn is_terminated(&self) -> bool {
37 self.writer.as_ref().is_none_or(|wr| wr.is_terminated())
38 }
39}
40
41impl<St, Wr, Part> Future for CollectCompleted<St, Wr, Part>
42where
43 Wr: MultipartWrite<Part>,
44 St: Stream<Item = Result<Part, Wr::Error>>,
45{
46 type Output = Result<Wr::Output, Wr::Error>;
47
48 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49 let CollectCompletedProj {
50 mut writer,
51 mut stream,
52 buffered,
53 } = self.project();
54 let mut wr = writer
55 .as_mut()
56 .as_pin_mut()
57 .expect("polled `CollectCompleted` after completion");
58
59 loop {
60 if buffered.is_some() {
61 ready!(wr.as_mut().poll_ready(cx))?;
62 let _ = wr.as_mut().start_send(buffered.take().unwrap())?;
63 }
64
65 match stream.as_mut().poll_next(cx)? {
66 Poll::Ready(Some(it)) => {
67 *buffered = Some(it);
68 }
69 Poll::Ready(None) => {
70 let output = ready!(wr.poll_complete(cx))?;
71 writer.set(None);
72 return Poll::Ready(Ok(output));
73 }
74 Poll::Pending => {
75 ready!(wr.poll_flush(cx))?;
76 return Poll::Pending;
77 }
78 }
79 }
80 }
81}