multipart_write/stream/
try_send.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
pin_project_lite::pin_project! {
    /// Stream that pulls items from `stream`, hands each one to `writer` via
    /// `start_send`, and yields the result of that call.
    #[must_use = "futures do nothing unless polled"]
    pub struct TrySend<St: Stream, Wr> {
        // Source of items; replaced with `None` once it has yielded `None`.
        #[pin]
        stream: Option<St>,
        // Writer that receives the items.
        #[pin]
        writer: Wr,
        // Item taken from `stream` that has not been passed to `start_send` yet.
        buffered: Option<St::Item>,
        // Set once the source is exhausted and the closing flush has finished.
        is_terminated: bool,
    }
}
23
24impl<St: Stream, Wr> TrySend<St, Wr> {
25 pub(super) fn new(stream: St, writer: Wr) -> Self {
26 Self {
27 stream: Some(stream),
28 writer,
29 buffered: None,
30 is_terminated: false,
31 }
32 }
33
34 pub(super) fn poll_complete(
35 self: Pin<&mut Self>,
36 cx: &mut Context<'_>,
37 ) -> Poll<Result<Wr::Output, Wr::Error>>
38 where
39 Wr: MultipartWrite<St::Item>,
40 {
41 let mut this = self.project();
42 if this.buffered.is_some() {
43 ready!(this.writer.as_mut().poll_ready(cx))?;
44 let _ = this
45 .writer
46 .as_mut()
47 .start_send(this.buffered.take().unwrap())?;
48 }
49 ready!(this.writer.as_mut().poll_flush(cx))?;
50 let out = ready!(this.writer.as_mut().poll_complete(cx));
51 Poll::Ready(out)
52 }
53}
54
55impl<St, Wr> FusedStream for TrySend<St, Wr>
56where
57 St: Stream,
58 Wr: FusedMultipartWrite<St::Item>,
59{
60 fn is_terminated(&self) -> bool {
61 self.writer.is_terminated() || self.is_terminated
62 }
63}
64
65impl<St, Wr> Stream for TrySend<St, Wr>
66where
67 St: Stream,
68 Wr: MultipartWrite<St::Item>,
69{
70 type Item = Result<Wr::Ret, Wr::Error>;
71
72 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73 let mut this = self.project();
74
75 loop {
76 if this.buffered.is_some() {
77 loop {
78 match this.writer.as_mut().poll_ready(cx) {
79 Poll::Ready(Ok(())) => break,
80 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
81 Poll::Pending => match this.writer.as_mut().poll_flush(cx)? {
82 Poll::Pending => return Poll::Pending,
83 Poll::Ready(()) => {}
84 },
85 }
86 }
87
88 let ret = this
89 .writer
90 .as_mut()
91 .start_send(this.buffered.take().unwrap())?;
92 return Poll::Ready(Some(Ok(ret)));
93 }
94
95 let Some(st) = this.stream.as_mut().as_pin_mut() else {
96 ready!(this.writer.as_mut().poll_flush(cx))?;
97 *this.is_terminated = true;
98 return Poll::Ready(None);
99 };
100
101 match ready!(st.poll_next(cx)) {
102 Some(it) => *this.buffered = Some(it),
103 None => this.stream.set(None),
104 }
105 }
106 }
107}
108
109impl<St: Stream, Wr> Debug for TrySend<St, Wr>
110where
111 St::Item: Debug,
112 St: Debug,
113 Wr: Debug,
114{
115 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
116 f.debug_struct("TrySend")
117 .field("stream", &self.stream)
118 .field("writer", &self.writer)
119 .field("buffered", &self.buffered)
120 .field("is_terminated", &self.is_terminated)
121 .finish()
122 }
123}