multipart_write/stream/
assembled.rs

1use crate::MultipartWrite;
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10    /// Stream for [`assembled`].
11    ///
12    /// [`assembled`]: super::MultipartStreamExt::assembled
13    #[must_use = "futures do nothing unless polled"]
14    pub struct Assembled<St: Stream, Wr, F> {
15        #[pin]
16        stream: St,
17        #[pin]
18        writer: Wr,
19        buffered: Option<St::Item>,
20        f: F,
21        state: State,
22        empty: bool,
23        is_terminated: bool,
24    }
25}
26
27impl<St: Stream, Wr, F> Assembled<St, Wr, F> {
28    pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
29        Self {
30            stream,
31            writer,
32            buffered: None,
33            f,
34            state: State::PollNext,
35            empty: true,
36            is_terminated: false,
37        }
38    }
39}
40
41impl<St, Wr, T, F> FusedStream for Assembled<St, Wr, F>
42where
43    St: Stream,
44    Wr: MultipartWrite<St::Item, Output = Option<T>>,
45    F: FnMut(&Wr::Ret) -> bool,
46{
47    fn is_terminated(&self) -> bool {
48        self.is_terminated
49    }
50}
51
52impl<St, Wr, T, F> Stream for Assembled<St, Wr, F>
53where
54    St: Stream,
55    Wr: MultipartWrite<St::Item, Output = Option<T>>,
56    F: FnMut(&Wr::Ret) -> bool,
57{
58    type Item = Result<T, Wr::Error>;
59
60    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61        let mut this = self.project();
62
63        loop {
64            // Try to send anything in the buffer first.
65            if this.buffered.is_some() {
66                if this.writer.as_mut().poll_ready(cx)?.is_ready() {
67                    let it = this.buffered.take().unwrap();
68                    let ret = this.writer.as_mut().start_send(it)?;
69                    *this.empty = false;
70                    // Check if we should complete according to `F`.
71                    if (this.f)(&ret) {
72                        // `false` since we don't have to shut down the stream
73                        // after this poll_complete.
74                        *this.state = State::PollComplete(false);
75                    } else {
76                        *this.state = State::PollNext;
77                    }
78                } else {
79                    // Writer isn't ready so poll_flush until it is.
80                    match this.writer.as_mut().poll_flush(cx)? {
81                        Poll::Ready(()) => continue,
82                        Poll::Pending => return Poll::Pending,
83                    }
84                }
85            }
86
87            match *this.state {
88                State::PollNext => match ready!(this.stream.as_mut().poll_next(cx)) {
89                    Some(it) => *this.buffered = Some(it),
90                    _ => {
91                        if *this.empty {
92                            // We just completed, so short circuit and end here.
93                            *this.is_terminated = true;
94                            return Poll::Ready(None);
95                        }
96                        // The penultimate state when the writer has something to
97                        // complete first.
98                        *this.state = State::PollComplete(true);
99                    }
100                },
101                State::PollComplete(last) => {
102                    match ready!(this.writer.as_mut().poll_complete(cx))? {
103                        Some(out) => {
104                            if last {
105                                *this.state = State::Terminated;
106                            } else {
107                                *this.empty = true;
108                                *this.state = State::PollNext;
109                            }
110                            return Poll::Ready(Some(Ok(out)));
111                        }
112                        _ => {
113                            // Inner writer is `None` now, so end the stream.
114                            *this.is_terminated = true;
115                            return Poll::Ready(None);
116                        }
117                    }
118                }
119                State::Terminated => {
120                    *this.is_terminated = true;
121                    return Poll::Ready(None);
122                }
123            }
124        }
125    }
126}
127
128impl<St, Wr, F> Debug for Assembled<St, Wr, F>
129where
130    St: Stream + Debug,
131    St::Item: Debug,
132    St: Debug,
133    Wr: Debug,
134{
135    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136        f.debug_struct("Assembled")
137            .field("stream", &self.stream)
138            .field("writer", &self.writer)
139            .field("buffered", &self.buffered)
140            .field("f", &"FnMut(&Wr::Ret) -> bool")
141            .field("state", &self.state)
142            .field("is_terminated", &self.is_terminated)
143            .finish()
144    }
145}
146
/// Step that `Assembled::poll_next` performs once its buffer has been handled.
#[derive(Debug, Clone, Copy)]
enum State {
    /// Pull the next item from the inner stream.
    PollNext,
    /// Poll the writer to completion.  The flag is `true` when this is the
    /// final completion, after which the stream moves to `Terminated`.
    PollComplete(bool),
    /// Nothing is left to do; the stream reports its end.
    Terminated,
}