multipart_write/stream/
assembled.rs1use crate::FusedMultipartWrite;
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
14 pub struct Assembled<St: Stream, Wr, F> {
15 #[pin]
16 stream: St,
17 #[pin]
18 writer: Wr,
19 buffered: Option<St::Item>,
20 f: F,
21 state: State,
22 empty: bool,
23 is_terminated: bool,
24 }
25}
26
27impl<St: Stream, Wr, F> Assembled<St, Wr, F> {
28 pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
29 Self {
30 stream,
31 writer,
32 buffered: None,
33 f,
34 state: State::PollNext,
35 empty: true,
36 is_terminated: false,
37 }
38 }
39}
40
41impl<St, Wr, F> FusedStream for Assembled<St, Wr, F>
42where
43 St: Stream,
44 Wr: FusedMultipartWrite<St::Item>,
45 F: FnMut(&Wr::Ret) -> bool,
46{
47 fn is_terminated(&self) -> bool {
48 self.is_terminated
49 }
50}
51
52impl<St, Wr, F> Stream for Assembled<St, Wr, F>
53where
54 St: Stream,
55 Wr: FusedMultipartWrite<St::Item>,
56 F: FnMut(&Wr::Ret) -> bool,
57{
58 type Item = Result<Wr::Output, Wr::Error>;
59
60 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61 let mut this = self.project();
62
63 loop {
64 if this.buffered.is_some() {
66 match this.writer.as_mut().poll_ready(cx)? {
67 Poll::Pending => return Poll::Pending,
68 Poll::Ready(()) => {
69 let it = this.buffered.take().unwrap();
70 let ret = this.writer.as_mut().start_send(it)?;
71 *this.empty = false;
72 if (this.f)(&ret) {
74 *this.state = State::PollComplete(false);
77 } else {
78 *this.state = State::PollNext;
79 }
80 }
81 }
82 }
83
84 match *this.state {
85 State::PollNext => match ready!(this.stream.as_mut().poll_next(cx)) {
86 Some(it) => *this.buffered = Some(it),
87 _ => {
88 if *this.empty {
89 *this.is_terminated = true;
91 return Poll::Ready(None);
92 }
93 *this.state = State::PollComplete(true);
96 }
97 },
98 State::PollComplete(last) => {
99 let out = ready!(this.writer.as_mut().poll_complete(cx));
100 if last || this.writer.is_terminated() {
106 *this.state = State::Terminated;
107 } else {
108 *this.empty = true;
109 *this.state = State::PollNext;
110 }
111 return Poll::Ready(Some(out));
112 }
113 State::Terminated => {
114 *this.is_terminated = true;
115 return Poll::Ready(None);
116 }
117 }
118 }
119 }
120}
121
122impl<St, Wr, F> Debug for Assembled<St, Wr, F>
123where
124 St: Stream + Debug,
125 St::Item: Debug,
126 St: Debug,
127 Wr: Debug,
128{
129 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
130 f.debug_struct("Assembled")
131 .field("stream", &self.stream)
132 .field("writer", &self.writer)
133 .field("buffered", &self.buffered)
134 .field("f", &"FnMut(&Wr::Ret) -> bool")
135 .field("state", &self.state)
136 .field("empty", &self.empty)
137 .field("is_terminated", &self.is_terminated)
138 .finish()
139 }
140}
141
142#[derive(Debug, Clone, Copy, PartialEq, Eq)]
143enum State {
144 PollNext,
145 PollComplete(bool),
146 Terminated,
147}