multipart_write/stream/
try_send.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10    /// Stream for [`try_send`].
11    ///
12    /// [`try_send`]: super::MultipartStreamExt::try_send
13    #[must_use = "futures do nothing unless polled"]
14    pub struct TrySend<St: Stream, Wr> {
15        #[pin]
16        stream: Option<St>,
17        #[pin]
18        writer: Wr,
19        buffered: Option<St::Item>,
20        is_terminated: bool,
21    }
22}
23
24impl<St: Stream, Wr> TrySend<St, Wr> {
25    pub(super) fn new(stream: St, writer: Wr) -> Self {
26        Self {
27            stream: Some(stream),
28            writer,
29            buffered: None,
30            is_terminated: false,
31        }
32    }
33
34    pub(super) fn poll_complete(
35        self: Pin<&mut Self>,
36        cx: &mut Context<'_>,
37    ) -> Poll<Result<Wr::Output, Wr::Error>>
38    where
39        Wr: MultipartWrite<St::Item>,
40    {
41        let mut this = self.project();
42        if this.buffered.is_some() {
43            ready!(this.writer.as_mut().poll_ready(cx))?;
44            let _ = this
45                .writer
46                .as_mut()
47                .start_send(this.buffered.take().unwrap())?;
48        }
49        ready!(this.writer.as_mut().poll_flush(cx))?;
50        let out = ready!(this.writer.as_mut().poll_complete(cx));
51        Poll::Ready(out)
52    }
53}
54
55impl<St, Wr> FusedStream for TrySend<St, Wr>
56where
57    St: Stream,
58    Wr: FusedMultipartWrite<St::Item>,
59{
60    fn is_terminated(&self) -> bool {
61        self.writer.is_terminated() || self.is_terminated
62    }
63}
64
65impl<St, Wr> Stream for TrySend<St, Wr>
66where
67    St: Stream,
68    Wr: MultipartWrite<St::Item>,
69{
70    type Item = Result<Wr::Ret, Wr::Error>;
71
72    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73        let mut this = self.project();
74
75        loop {
76            if this.buffered.is_some() {
77                loop {
78                    match this.writer.as_mut().poll_ready(cx) {
79                        Poll::Ready(Ok(())) => break,
80                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
81                        Poll::Pending => match this.writer.as_mut().poll_flush(cx)? {
82                            Poll::Pending => return Poll::Pending,
83                            Poll::Ready(()) => {}
84                        },
85                    }
86                }
87
88                let ret = this
89                    .writer
90                    .as_mut()
91                    .start_send(this.buffered.take().unwrap())?;
92                return Poll::Ready(Some(Ok(ret)));
93            }
94
95            let Some(st) = this.stream.as_mut().as_pin_mut() else {
96                ready!(this.writer.as_mut().poll_flush(cx))?;
97                *this.is_terminated = true;
98                return Poll::Ready(None);
99            };
100
101            match ready!(st.poll_next(cx)) {
102                Some(it) => *this.buffered = Some(it),
103                None => this.stream.set(None),
104            }
105        }
106    }
107}
108
109impl<St: Stream, Wr> Debug for TrySend<St, Wr>
110where
111    St::Item: Debug,
112    St: Debug,
113    Wr: Debug,
114{
115    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
116        f.debug_struct("TrySend")
117            .field("stream", &self.stream)
118            .field("writer", &self.writer)
119            .field("buffered", &self.buffered)
120            .field("is_terminated", &self.is_terminated)
121            .finish()
122    }
123}