multipart_write/stream/
assembled.rs1use crate::FusedMultipartWrite;
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
14 pub struct Assembled<St: Stream, Wr, F> {
15 #[pin]
16 stream: St,
17 #[pin]
18 writer: Wr,
19 buffered: Option<St::Item>,
20 f: F,
21 state: State,
22 empty: bool,
23 is_terminated: bool,
24 }
25}
26
27impl<St: Stream, Wr, F> Assembled<St, Wr, F> {
28 pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
29 Self {
30 stream,
31 writer,
32 buffered: None,
33 f,
34 state: State::PollNext,
35 empty: true,
36 is_terminated: false,
37 }
38 }
39}
40
41impl<St, Wr, F> FusedStream for Assembled<St, Wr, F>
42where
43 St: Stream,
44 Wr: FusedMultipartWrite<St::Item>,
45 F: FnMut(&Wr::Ret) -> bool,
46{
47 fn is_terminated(&self) -> bool {
48 self.is_terminated
49 }
50}
51
52impl<St, Wr, F> Stream for Assembled<St, Wr, F>
53where
54 St: Stream,
55 Wr: FusedMultipartWrite<St::Item>,
56 F: FnMut(&Wr::Ret) -> bool,
57{
58 type Item = Result<Wr::Output, Wr::Error>;
59
60 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61 let mut this = self.project();
62
63 loop {
64 if this.buffered.is_some() {
66 match this.writer.as_mut().poll_ready(cx)? {
67 Poll::Pending => return Poll::Pending,
68 Poll::Ready(()) => {
69 let it = this.buffered.take().unwrap();
70 let ret = this.writer.as_mut().start_send(it)?;
71 *this.empty = false;
72 if (this.f)(&ret) {
74 *this.state = State::PollComplete(false);
79 } else {
80 *this.state = State::PollNext;
81 }
82 }
83 }
84 }
85
86 match *this.state {
87 State::PollNext => match ready!(this.stream.as_mut().poll_next(cx)) {
88 Some(it) => *this.buffered = Some(it),
89 _ => {
90 if *this.empty {
91 *this.is_terminated = true;
93 return Poll::Ready(None);
94 }
95 *this.state = State::PollComplete(true);
98 }
99 },
100 State::PollComplete(last) => {
101 let out = ready!(this.writer.as_mut().poll_complete(cx));
102 if last || this.writer.is_terminated() {
107 *this.state = State::Terminated;
108 } else {
109 *this.empty = true;
110 *this.state = State::PollNext;
111 }
112 return Poll::Ready(Some(out));
113 }
114 State::Terminated => {
115 *this.is_terminated = true;
116 return Poll::Ready(None);
117 }
118 }
119 }
120 }
121}
122
123impl<St, Wr, F> Debug for Assembled<St, Wr, F>
124where
125 St: Stream + Debug,
126 St::Item: Debug,
127 St: Debug,
128 Wr: Debug,
129{
130 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
131 f.debug_struct("Assembled")
132 .field("stream", &self.stream)
133 .field("writer", &self.writer)
134 .field("buffered", &self.buffered)
135 .field("f", &"FnMut(&Wr::Ret) -> bool")
136 .field("state", &self.state)
137 .field("empty", &self.empty)
138 .field("is_terminated", &self.is_terminated)
139 .finish()
140 }
141}
142
143#[derive(Debug, Clone, Copy, PartialEq, Eq)]
144enum State {
145 PollNext,
146 PollComplete(bool),
147 Terminated,
148}