multipart_write/stream/
assembled.rs1use crate::MultipartWrite;
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
14 pub struct Assembled<St: Stream, Wr, F> {
15 #[pin]
16 stream: St,
17 #[pin]
18 writer: Wr,
19 buffered: Option<St::Item>,
20 f: F,
21 state: State,
22 empty: bool,
23 is_terminated: bool,
24 }
25}
26
27impl<St: Stream, Wr, F> Assembled<St, Wr, F> {
28 pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
29 Self {
30 stream,
31 writer,
32 buffered: None,
33 f,
34 state: State::PollNext,
35 empty: true,
36 is_terminated: false,
37 }
38 }
39}
40
41impl<St, Wr, T, F> FusedStream for Assembled<St, Wr, F>
42where
43 St: Stream,
44 Wr: MultipartWrite<St::Item, Output = Option<T>>,
45 F: FnMut(&Wr::Ret) -> bool,
46{
47 fn is_terminated(&self) -> bool {
48 self.is_terminated
49 }
50}
51
52impl<St, Wr, T, F> Stream for Assembled<St, Wr, F>
53where
54 St: Stream,
55 Wr: MultipartWrite<St::Item, Output = Option<T>>,
56 F: FnMut(&Wr::Ret) -> bool,
57{
58 type Item = Result<T, Wr::Error>;
59
60 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61 let mut this = self.project();
62
63 loop {
64 if this.buffered.is_some() {
66 if this.writer.as_mut().poll_ready(cx)?.is_ready() {
67 let it = this.buffered.take().unwrap();
68 let ret = this.writer.as_mut().start_send(it)?;
69 *this.empty = false;
70 if (this.f)(&ret) {
72 *this.state = State::PollComplete(false);
75 } else {
76 *this.state = State::PollNext;
77 }
78 } else {
79 match this.writer.as_mut().poll_flush(cx)? {
81 Poll::Ready(()) => continue,
82 Poll::Pending => return Poll::Pending,
83 }
84 }
85 }
86
87 match *this.state {
88 State::PollNext => match ready!(this.stream.as_mut().poll_next(cx)) {
89 Some(it) => *this.buffered = Some(it),
90 _ => {
91 if *this.empty {
92 *this.is_terminated = true;
94 return Poll::Ready(None);
95 }
96 *this.state = State::PollComplete(true);
99 }
100 },
101 State::PollComplete(last) => {
102 match ready!(this.writer.as_mut().poll_complete(cx))? {
103 Some(out) => {
104 if last {
105 *this.state = State::Terminated;
106 } else {
107 *this.empty = true;
108 *this.state = State::PollNext;
109 }
110 return Poll::Ready(Some(Ok(out)));
111 }
112 _ => {
113 *this.is_terminated = true;
115 return Poll::Ready(None);
116 }
117 }
118 }
119 State::Terminated => {
120 *this.is_terminated = true;
121 return Poll::Ready(None);
122 }
123 }
124 }
125 }
126}
127
128impl<St, Wr, F> Debug for Assembled<St, Wr, F>
129where
130 St: Stream + Debug,
131 St::Item: Debug,
132 St: Debug,
133 Wr: Debug,
134{
135 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136 f.debug_struct("Assembled")
137 .field("stream", &self.stream)
138 .field("writer", &self.writer)
139 .field("buffered", &self.buffered)
140 .field("f", &"FnMut(&Wr::Ret) -> bool")
141 .field("state", &self.state)
142 .field("is_terminated", &self.is_terminated)
143 .finish()
144 }
145}
146
/// Step that `Assembled::poll_next` performs once no item is waiting to be
/// sent to the writer.
#[derive(Clone, Copy, Debug)]
enum State {
    /// Pull the next item from the inner stream.
    PollNext,
    /// Poll the writer for completion. The flag is `true` when the inner
    /// stream has already ended, i.e. this is the final completion.
    PollComplete(bool),
    /// The final output has been yielded; only `None` remains to be reported.
    Terminated,
}