multipart_write/stream/
write_complete.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::future::{FusedFuture, Future};
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use std::fmt::{self, Debug, Formatter};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10pin_project_lite::pin_project! {
11    /// Future for [`write_complete`](super::MultipartStreamExt::write_complete).
12    #[must_use = "futures do nothing unless polled"]
13    pub struct WriteComplete<St: Stream, Wr> {
14        #[pin]
15        writer: Wr,
16        #[pin]
17        stream: Option<St>,
18        buffered: Option<St::Item>,
19        is_terminated: bool,
20    }
21}
22
23impl<St: Stream, Wr> WriteComplete<St, Wr> {
24    pub(super) fn new(stream: St, writer: Wr) -> Self {
25        Self {
26            writer,
27            stream: Some(stream),
28            buffered: None,
29            is_terminated: false,
30        }
31    }
32}
33
34impl<St, Wr> FusedFuture for WriteComplete<St, Wr>
35where
36    Wr: FusedMultipartWrite<St::Item>,
37    St: FusedStream,
38{
39    fn is_terminated(&self) -> bool {
40        self.is_terminated
41    }
42}
43
44impl<St, Wr> Future for WriteComplete<St, Wr>
45where
46    Wr: MultipartWrite<St::Item>,
47    St: Stream,
48{
49    type Output = Result<Wr::Output, Wr::Error>;
50
51    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52        let mut this = self.project();
53
54        loop {
55            if this.buffered.is_some() {
56                let Poll::Ready(res) = this.writer.as_mut().poll_ready(cx) else {
57                    ready!(this.writer.poll_flush(cx))?;
58                    return Poll::Pending;
59                };
60                match res {
61                    Err(e) => return Poll::Ready(Err(e)),
62                    Ok(()) => {
63                        let _ = this
64                            .writer
65                            .as_mut()
66                            .start_send(this.buffered.take().unwrap())?;
67                    }
68                }
69            }
70
71            let Some(mut st) = this.stream.as_mut().as_pin_mut() else {
72                let output = ready!(this.writer.as_mut().poll_complete(cx));
73                *this.is_terminated = true;
74                return Poll::Ready(output);
75            };
76
77            match st.as_mut().poll_next(cx) {
78                Poll::Pending => {
79                    ready!(this.writer.poll_flush(cx))?;
80                    return Poll::Pending;
81                }
82                Poll::Ready(Some(it)) => *this.buffered = Some(it),
83                Poll::Ready(None) => {
84                    // Close the stream and start the process to complete
85                    // the write/future.
86                    this.stream.set(None);
87                    match this.writer.as_mut().poll_flush(cx) {
88                        Poll::Pending => return Poll::Pending,
89                        Poll::Ready(Ok(())) => {
90                            let output = ready!(this.writer.as_mut().poll_complete(cx));
91                            *this.is_terminated = true;
92                            return Poll::Ready(output);
93                        }
94                        Poll::Ready(Err(e)) => {
95                            *this.is_terminated = true;
96                            return Poll::Ready(Err(e));
97                        }
98                    }
99                }
100            }
101        }
102    }
103}
104
105impl<St, Wr> Debug for WriteComplete<St, Wr>
106where
107    St: Stream + Debug,
108    St::Item: Debug,
109    Wr: Debug,
110{
111    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
112        f.debug_struct("WriteComplete")
113            .field("writer", &self.writer)
114            .field("stream", &self.stream)
115            .field("buffered", &self.buffered)
116            .field("is_terminated", &self.is_terminated)
117            .finish()
118    }
119}