multipart_write/stream/
try_complete_when.rs1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::ready;
6use futures_core::stream::{FusedStream, Stream};
7
8use crate::FusedMultipartWrite;
9
10pin_project_lite::pin_project! {
11 #[must_use = "futures do nothing unless polled"]
15 pub struct TryCompleteWhen<St: Stream, Wr, F> {
16 #[pin]
17 stream: St,
18 #[pin]
19 writer: Wr,
20 buffered: Option<St::Item>,
21 f: F,
22 state: State,
23 empty: bool,
24 is_terminated: bool,
25 }
26}
27
28impl<St: Stream, Wr, F> TryCompleteWhen<St, Wr, F> {
29 pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
30 Self {
31 stream,
32 writer,
33 buffered: None,
34 f,
35 state: State::PollNext,
36 empty: true,
37 is_terminated: false,
38 }
39 }
40}
41
42impl<St, Wr, F> FusedStream for TryCompleteWhen<St, Wr, F>
43where
44 St: Stream,
45 Wr: FusedMultipartWrite<St::Item>,
46 F: FnMut(Wr::Recv) -> bool,
47{
48 fn is_terminated(&self) -> bool {
49 self.is_terminated
50 }
51}
52
53impl<St, Wr, F> Stream for TryCompleteWhen<St, Wr, F>
54where
55 St: Stream,
56 Wr: FusedMultipartWrite<St::Item>,
57 F: FnMut(Wr::Recv) -> bool,
58{
59 type Item = Result<Wr::Output, Wr::Error>;
60
61 fn poll_next(
62 self: Pin<&mut Self>,
63 cx: &mut Context<'_>,
64 ) -> Poll<Option<Self::Item>> {
65 let mut this = self.project();
66
67 loop {
68 if this.buffered.is_some() {
70 match this.writer.as_mut().poll_ready(cx)? {
71 Poll::Pending => return Poll::Pending,
72 Poll::Ready(()) => {
73 let it = this.buffered.take().unwrap();
74 let ret = this.writer.as_mut().start_send(it)?;
75 *this.empty = false;
76 if (this.f)(ret) {
78 *this.state = State::PollComplete(false);
84 } else {
85 *this.state = State::PollNext;
86 }
87 },
88 }
89 }
90
91 match *this.state {
92 State::PollNext => {
93 match ready!(this.stream.as_mut().poll_next(cx)) {
94 Some(it) => *this.buffered = Some(it),
95 _ => {
96 if *this.empty {
99 *this.is_terminated = true;
100 return Poll::Ready(None);
101 }
102 *this.state = State::PollComplete(true);
105 },
106 }
107 },
108 State::PollComplete(last) => {
109 let out = ready!(this.writer.as_mut().poll_complete(cx));
110 if last || this.writer.is_terminated() {
115 *this.state = State::Terminated;
116 } else {
117 *this.empty = true;
120 *this.state = State::PollNext;
121 }
122 return Poll::Ready(Some(out));
123 },
124 State::Terminated => {
125 *this.is_terminated = true;
126 return Poll::Ready(None);
127 },
128 }
129 }
130 }
131}
132
133impl<St, Wr, F> Debug for TryCompleteWhen<St, Wr, F>
134where
135 St: Stream + Debug,
136 St::Item: Debug,
137 St: Debug,
138 Wr: Debug,
139{
140 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
141 f.debug_struct("TryCompleteWhen")
142 .field("stream", &self.stream)
143 .field("writer", &self.writer)
144 .field("buffered", &self.buffered)
145 .field("state", &self.state)
146 .field("empty", &self.empty)
147 .field("is_terminated", &self.is_terminated)
148 .finish()
149 }
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153enum State {
154 PollNext,
155 PollComplete(bool),
156 Terminated,
157}