multipart_write/stream/
assembled.rs

1use crate::FusedMultipartWrite;
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10    /// Stream for [`assembled`].
11    ///
12    /// [`assembled`]: super::MultipartStreamExt::assembled
13    #[must_use = "futures do nothing unless polled"]
14    pub struct Assembled<St: Stream, Wr, F> {
15        #[pin]
16        stream: St,
17        #[pin]
18        writer: Wr,
19        buffered: Option<St::Item>,
20        f: F,
21        state: State,
22        empty: bool,
23        is_terminated: bool,
24    }
25}
26
27impl<St: Stream, Wr, F> Assembled<St, Wr, F> {
28    pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
29        Self {
30            stream,
31            writer,
32            buffered: None,
33            f,
34            state: State::PollNext,
35            empty: true,
36            is_terminated: false,
37        }
38    }
39}
40
41impl<St, Wr, F> FusedStream for Assembled<St, Wr, F>
42where
43    St: Stream,
44    Wr: FusedMultipartWrite<St::Item>,
45    F: FnMut(&Wr::Ret) -> bool,
46{
47    fn is_terminated(&self) -> bool {
48        self.is_terminated
49    }
50}
51
52impl<St, Wr, F> Stream for Assembled<St, Wr, F>
53where
54    St: Stream,
55    Wr: FusedMultipartWrite<St::Item>,
56    F: FnMut(&Wr::Ret) -> bool,
57{
58    type Item = Result<Wr::Output, Wr::Error>;
59
60    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61        let mut this = self.project();
62
63        loop {
64            // Try to send anything in the buffer first.
65            if this.buffered.is_some() {
66                match this.writer.as_mut().poll_ready(cx)? {
67                    Poll::Pending => return Poll::Pending,
68                    Poll::Ready(()) => {
69                        let it = this.buffered.take().unwrap();
70                        let ret = this.writer.as_mut().start_send(it)?;
71                        *this.empty = false;
72                        // Check if we should complete according to `F`.
73                        if (this.f)(&ret) {
74                            // `poll_complete` not followed by stream shutdown:
75                            // the state is `PollComplete(true)` only when the
76                            // stream stopped producing but we have to do one
77                            // final call to `poll_complete`.
78                            *this.state = State::PollComplete(false);
79                        } else {
80                            *this.state = State::PollNext;
81                        }
82                    }
83                }
84            }
85
86            match *this.state {
87                State::PollNext => match ready!(this.stream.as_mut().poll_next(cx)) {
88                    Some(it) => *this.buffered = Some(it),
89                    _ => {
90                        if *this.empty {
91                            // We just completed, so short circuit and end here.
92                            *this.is_terminated = true;
93                            return Poll::Ready(None);
94                        }
95                        // The penultimate state when the writer has something to
96                        // complete first.
97                        *this.state = State::PollComplete(true);
98                    }
99                },
100                State::PollComplete(last) => {
101                    let out = ready!(this.writer.as_mut().poll_complete(cx));
102                    // Upstream stopped producing in the last iteration, or the
103                    // writer indicates through its `FusedMultipartWrite` impl
104                    // that it cannot be polled anymore, so set the state to
105                    // `Terminated` to end the stream the next time we're polled.
106                    if last || this.writer.is_terminated() {
107                        *this.state = State::Terminated;
108                    } else {
109                        *this.empty = true;
110                        *this.state = State::PollNext;
111                    }
112                    return Poll::Ready(Some(out));
113                }
114                State::Terminated => {
115                    *this.is_terminated = true;
116                    return Poll::Ready(None);
117                }
118            }
119        }
120    }
121}
122
123impl<St, Wr, F> Debug for Assembled<St, Wr, F>
124where
125    St: Stream + Debug,
126    St::Item: Debug,
127    St: Debug,
128    Wr: Debug,
129{
130    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
131        f.debug_struct("Assembled")
132            .field("stream", &self.stream)
133            .field("writer", &self.writer)
134            .field("buffered", &self.buffered)
135            .field("f", &"FnMut(&Wr::Ret) -> bool")
136            .field("state", &self.state)
137            .field("empty", &self.empty)
138            .field("is_terminated", &self.is_terminated)
139            .finish()
140    }
141}
142
/// Step that `Assembled::poll_next` performs next.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// Pull the next item from the upstream.
    PollNext,
    /// Complete the writer and yield its output. The flag is `true` when the
    /// upstream has ended and this is the final completion.
    PollComplete(bool),
    /// Yield `None`; nothing else will be polled.
    Terminated,
}