multipart_write/stream/
write_complete.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::future::{FusedFuture, Future};
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use std::fmt::{self, Debug, Formatter};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10pin_project_lite::pin_project! {
11 #[must_use = "futures do nothing unless polled"]
13 pub struct WriteComplete<St: Stream, Wr> {
14 #[pin]
15 writer: Wr,
16 #[pin]
17 stream: Option<St>,
18 buffered: Option<St::Item>,
19 is_terminated: bool,
20 }
21}
22
23impl<St: Stream, Wr> WriteComplete<St, Wr> {
24 pub(super) fn new(stream: St, writer: Wr) -> Self {
25 Self {
26 writer,
27 stream: Some(stream),
28 buffered: None,
29 is_terminated: false,
30 }
31 }
32}
33
34impl<St, Wr> FusedFuture for WriteComplete<St, Wr>
35where
36 Wr: FusedMultipartWrite<St::Item>,
37 St: FusedStream,
38{
39 fn is_terminated(&self) -> bool {
40 self.is_terminated
41 }
42}
43
44impl<St, Wr> Future for WriteComplete<St, Wr>
45where
46 Wr: MultipartWrite<St::Item>,
47 St: Stream,
48{
49 type Output = Result<Wr::Output, Wr::Error>;
50
51 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52 let mut this = self.project();
53
54 loop {
55 if this.buffered.is_some() {
56 let Poll::Ready(res) = this.writer.as_mut().poll_ready(cx) else {
57 ready!(this.writer.poll_flush(cx))?;
58 return Poll::Pending;
59 };
60 match res {
61 Err(e) => return Poll::Ready(Err(e)),
62 Ok(()) => {
63 let _ = this
64 .writer
65 .as_mut()
66 .start_send(this.buffered.take().unwrap())?;
67 }
68 }
69 }
70
71 let Some(mut st) = this.stream.as_mut().as_pin_mut() else {
72 let output = ready!(this.writer.as_mut().poll_complete(cx));
73 *this.is_terminated = true;
74 return Poll::Ready(output);
75 };
76
77 match st.as_mut().poll_next(cx) {
78 Poll::Pending => {
79 ready!(this.writer.poll_flush(cx))?;
80 return Poll::Pending;
81 }
82 Poll::Ready(Some(it)) => *this.buffered = Some(it),
83 Poll::Ready(None) => {
84 this.stream.set(None);
87 match this.writer.as_mut().poll_flush(cx) {
88 Poll::Pending => return Poll::Pending,
89 Poll::Ready(Ok(())) => {
90 let output = ready!(this.writer.as_mut().poll_complete(cx));
91 *this.is_terminated = true;
92 return Poll::Ready(output);
93 }
94 Poll::Ready(Err(e)) => {
95 *this.is_terminated = true;
96 return Poll::Ready(Err(e));
97 }
98 }
99 }
100 }
101 }
102 }
103}
104
105impl<St, Wr> Debug for WriteComplete<St, Wr>
106where
107 St: Stream + Debug,
108 St::Item: Debug,
109 Wr: Debug,
110{
111 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
112 f.debug_struct("WriteComplete")
113 .field("writer", &self.writer)
114 .field("stream", &self.stream)
115 .field("buffered", &self.buffered)
116 .field("is_terminated", &self.is_terminated)
117 .finish()
118 }
119}