multipart_write/stream/
assembled.rs

1use crate::FusedMultipartWrite;
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10    /// Stream for [`assembled`].
11    ///
12    /// [`assembled`]: super::MultipartStreamExt::assembled
13    #[must_use = "futures do nothing unless polled"]
14    pub struct Assembled<St: Stream, Wr, F> {
15        #[pin]
16        stream: St,
17        #[pin]
18        writer: Wr,
19        buffered: Option<St::Item>,
20        f: F,
21        state: State,
22        empty: bool,
23        is_terminated: bool,
24    }
25}
26
27impl<St: Stream, Wr, F> Assembled<St, Wr, F> {
28    pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
29        Self {
30            stream,
31            writer,
32            buffered: None,
33            f,
34            state: State::PollNext,
35            empty: true,
36            is_terminated: false,
37        }
38    }
39}
40
41impl<St, Wr, F> FusedStream for Assembled<St, Wr, F>
42where
43    St: Stream,
44    Wr: FusedMultipartWrite<St::Item>,
45    F: FnMut(&Wr::Ret) -> bool,
46{
47    fn is_terminated(&self) -> bool {
48        self.is_terminated
49    }
50}
51
52impl<St, Wr, F> Stream for Assembled<St, Wr, F>
53where
54    St: Stream,
55    Wr: FusedMultipartWrite<St::Item>,
56    F: FnMut(&Wr::Ret) -> bool,
57{
58    type Item = Result<Wr::Output, Wr::Error>;
59
60    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61        let mut this = self.project();
62
63        loop {
64            // Try to send anything in the buffer first.
65            if this.buffered.is_some() {
66                match this.writer.as_mut().poll_ready(cx)? {
67                    Poll::Pending => return Poll::Pending,
68                    Poll::Ready(()) => {
69                        let it = this.buffered.take().unwrap();
70                        let ret = this.writer.as_mut().start_send(it)?;
71                        *this.empty = false;
72                        // Check if we should complete according to `F`.
73                        if (this.f)(&ret) {
74                            // false since we don't have to shut down the stream
75                            // after the `poll_complete`.
76                            *this.state = State::PollComplete(false);
77                        } else {
78                            *this.state = State::PollNext;
79                        }
80                    }
81                }
82            }
83
84            match *this.state {
85                State::PollNext => match ready!(this.stream.as_mut().poll_next(cx)) {
86                    Some(it) => *this.buffered = Some(it),
87                    _ => {
88                        if *this.empty {
89                            // We just completed, so short circuit and end here.
90                            *this.is_terminated = true;
91                            return Poll::Ready(None);
92                        }
93                        // The penultimate state when the writer has something to
94                        // complete first.
95                        *this.state = State::PollComplete(true);
96                    }
97                },
98                State::PollComplete(last) => {
99                    let out = ready!(this.writer.as_mut().poll_complete(cx));
100                    // Upstream stopped producing in the last iteration, or the
101                    // writer indicates through its `FusedMultipartWrite` impl
102                    // that it cannot be polled anymore, so set the state to
103                    // `Terminated` to immediately produce `None` and end the
104                    // stream the next time we are polled.
105                    if last || this.writer.is_terminated() {
106                        *this.state = State::Terminated;
107                    } else {
108                        *this.empty = true;
109                        *this.state = State::PollNext;
110                    }
111                    return Poll::Ready(Some(out));
112                }
113                State::Terminated => {
114                    *this.is_terminated = true;
115                    return Poll::Ready(None);
116                }
117            }
118        }
119    }
120}
121
122impl<St, Wr, F> Debug for Assembled<St, Wr, F>
123where
124    St: Stream + Debug,
125    St::Item: Debug,
126    St: Debug,
127    Wr: Debug,
128{
129    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
130        f.debug_struct("Assembled")
131            .field("stream", &self.stream)
132            .field("writer", &self.writer)
133            .field("buffered", &self.buffered)
134            .field("f", &"FnMut(&Wr::Ret) -> bool")
135            .field("state", &self.state)
136            .field("empty", &self.empty)
137            .field("is_terminated", &self.is_terminated)
138            .finish()
139    }
140}
141
/// Step that `Assembled::poll_next` performs after handling the buffered item.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// Pull the next item from the upstream.
    PollNext,
    /// Complete the writer and yield its result. The flag is `true` when the
    /// upstream has ended, so the stream terminates after this completion.
    PollComplete(bool),
    /// Return `None` on every poll.
    Terminated,
}