multipart_write/stream/
try_send.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10    /// Stream for [`try_send`].
11    ///
12    /// [`try_send`]: super::MultipartStreamExt::try_send
13    #[must_use = "futures do nothing unless polled"]
14    pub struct TrySend<St: Stream, Wr> {
15        #[pin]
16        stream: Option<St>,
17        #[pin]
18        writer: Wr,
19        buffered: Option<St::Item>,
20        is_terminated: bool,
21    }
22}
23
24impl<St: Stream, Wr> TrySend<St, Wr> {
25    pub(super) fn new(stream: St, writer: Wr) -> Self {
26        Self {
27            stream: Some(stream),
28            writer,
29            buffered: None,
30            is_terminated: false,
31        }
32    }
33
34    pub(super) fn poll_complete(
35        self: Pin<&mut Self>,
36        cx: &mut Context<'_>,
37    ) -> Poll<Result<Wr::Output, Wr::Error>>
38    where
39        Wr: MultipartWrite<St::Item>,
40    {
41        let mut this = self.project();
42        if this.buffered.is_some() {
43            ready!(this.writer.as_mut().poll_ready(cx))?;
44            let _ = this
45                .writer
46                .as_mut()
47                .start_send(this.buffered.take().unwrap())?;
48        }
49        ready!(this.writer.as_mut().poll_flush(cx))?;
50        let out = ready!(this.writer.as_mut().poll_complete(cx));
51        Poll::Ready(out)
52    }
53}
54
55impl<St, Wr> FusedStream for TrySend<St, Wr>
56where
57    St: Stream,
58    Wr: FusedMultipartWrite<St::Item>,
59{
60    fn is_terminated(&self) -> bool {
61        self.writer.is_terminated() || self.is_terminated
62    }
63}
64
65impl<St, Wr> Stream for TrySend<St, Wr>
66where
67    St: Stream,
68    Wr: MultipartWrite<St::Item>,
69{
70    type Item = Result<Wr::Ret, Wr::Error>;
71
72    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73        let mut this = self.project();
74
75        loop {
76            if this.buffered.is_some() {
77                loop {
78                    // Poll the inner writer until ready in a loop because
79                    // `start_send` has to happen in the same call as getting a
80                    // `Poll::Ready(Ok(()))` from the writer.
81                    match this.writer.as_mut().poll_ready(cx) {
82                        Poll::Ready(Ok(())) => break,
83                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
84                        Poll::Pending => match this.writer.as_mut().poll_flush(cx)? {
85                            Poll::Pending => return Poll::Pending,
86                            Poll::Ready(()) => {}
87                        },
88                    }
89                }
90
91                let ret = this
92                    .writer
93                    .as_mut()
94                    .start_send(this.buffered.take().unwrap())?;
95                return Poll::Ready(Some(Ok(ret)));
96            }
97
98            let Some(st) = this.stream.as_mut().as_pin_mut() else {
99                // Nothing more to write since the stream is not producing and we
100                // have no buffered items to send.
101                ready!(this.writer.as_mut().poll_flush(cx))?;
102                *this.is_terminated = true;
103                return Poll::Ready(None);
104            };
105
106            match ready!(st.poll_next(cx)) {
107                Some(it) => *this.buffered = Some(it),
108                None => this.stream.set(None),
109            }
110        }
111    }
112}
113
114impl<St: Stream, Wr> Debug for TrySend<St, Wr>
115where
116    St::Item: Debug,
117    St: Debug,
118    Wr: Debug,
119{
120    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
121        f.debug_struct("TrySend")
122            .field("stream", &self.stream)
123            .field("writer", &self.writer)
124            .field("buffered", &self.buffered)
125            .field("is_terminated", &self.is_terminated)
126            .finish()
127    }
128}