multipart_write/stream/
try_send.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
14 pub struct TrySend<St: Stream, Wr> {
15 #[pin]
16 stream: Option<St>,
17 #[pin]
18 writer: Wr,
19 buffered: Option<St::Item>,
20 is_terminated: bool,
21 }
22}
23
24impl<St: Stream, Wr> TrySend<St, Wr> {
25 pub(super) fn new(stream: St, writer: Wr) -> Self {
26 Self {
27 stream: Some(stream),
28 writer,
29 buffered: None,
30 is_terminated: false,
31 }
32 }
33
34 pub(super) fn poll_complete(
35 self: Pin<&mut Self>,
36 cx: &mut Context<'_>,
37 ) -> Poll<Result<Wr::Output, Wr::Error>>
38 where
39 Wr: MultipartWrite<St::Item>,
40 {
41 let mut this = self.project();
42 if this.buffered.is_some() {
43 ready!(this.writer.as_mut().poll_ready(cx))?;
44 let _ = this
45 .writer
46 .as_mut()
47 .start_send(this.buffered.take().unwrap())?;
48 }
49 ready!(this.writer.as_mut().poll_flush(cx))?;
50 let out = ready!(this.writer.as_mut().poll_complete(cx));
51 Poll::Ready(out)
52 }
53}
54
55impl<St, Wr> FusedStream for TrySend<St, Wr>
56where
57 St: Stream,
58 Wr: FusedMultipartWrite<St::Item>,
59{
60 fn is_terminated(&self) -> bool {
61 self.writer.is_terminated() || self.is_terminated
62 }
63}
64
65impl<St, Wr> Stream for TrySend<St, Wr>
66where
67 St: Stream,
68 Wr: MultipartWrite<St::Item>,
69{
70 type Item = Result<Wr::Ret, Wr::Error>;
71
72 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73 let mut this = self.project();
74
75 loop {
76 if this.buffered.is_some() {
77 loop {
78 match this.writer.as_mut().poll_ready(cx) {
82 Poll::Ready(Ok(())) => break,
83 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
84 Poll::Pending => match this.writer.as_mut().poll_flush(cx)? {
85 Poll::Pending => return Poll::Pending,
86 Poll::Ready(()) => {}
87 },
88 }
89 }
90
91 let ret = this
92 .writer
93 .as_mut()
94 .start_send(this.buffered.take().unwrap())?;
95 return Poll::Ready(Some(Ok(ret)));
96 }
97
98 let Some(st) = this.stream.as_mut().as_pin_mut() else {
99 ready!(this.writer.as_mut().poll_flush(cx))?;
102 *this.is_terminated = true;
103 return Poll::Ready(None);
104 };
105
106 match ready!(st.poll_next(cx)) {
107 Some(it) => *this.buffered = Some(it),
108 None => this.stream.set(None),
109 }
110 }
111 }
112}
113
114impl<St: Stream, Wr> Debug for TrySend<St, Wr>
115where
116 St::Item: Debug,
117 St: Debug,
118 Wr: Debug,
119{
120 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
121 f.debug_struct("TrySend")
122 .field("stream", &self.stream)
123 .field("writer", &self.writer)
124 .field("buffered", &self.buffered)
125 .field("is_terminated", &self.is_terminated)
126 .finish()
127 }
128}