Skip to main content

multipart_write/stream/
try_complete_when.rs

1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::ready;
6use futures_core::stream::{FusedStream, Stream};
7
8use crate::FusedMultipartWrite;
9
10pin_project_lite::pin_project! {
11    /// Stream for [`try_complete_when`].
12    ///
13    /// [`try_complete_when`]: super::MultipartStreamExt::try_complete_when
14    #[must_use = "futures do nothing unless polled"]
15    pub struct TryCompleteWhen<St: Stream, Wr, F> {
16        #[pin]
17        stream: St,
18        #[pin]
19        writer: Wr,
20        buffered: Option<St::Item>,
21        f: F,
22        state: State,
23        empty: bool,
24        is_terminated: bool,
25    }
26}
27
28impl<St: Stream, Wr, F> TryCompleteWhen<St, Wr, F> {
29    pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
30        Self {
31            stream,
32            writer,
33            buffered: None,
34            f,
35            state: State::PollNext,
36            empty: true,
37            is_terminated: false,
38        }
39    }
40}
41
42impl<St, Wr, F> FusedStream for TryCompleteWhen<St, Wr, F>
43where
44    St: Stream,
45    Wr: FusedMultipartWrite<St::Item>,
46    F: FnMut(Wr::Recv) -> bool,
47{
48    fn is_terminated(&self) -> bool {
49        self.is_terminated
50    }
51}
52
53impl<St, Wr, F> Stream for TryCompleteWhen<St, Wr, F>
54where
55    St: Stream,
56    Wr: FusedMultipartWrite<St::Item>,
57    F: FnMut(Wr::Recv) -> bool,
58{
59    type Item = Result<Wr::Output, Wr::Error>;
60
61    fn poll_next(
62        self: Pin<&mut Self>,
63        cx: &mut Context<'_>,
64    ) -> Poll<Option<Self::Item>> {
65        let mut this = self.project();
66
67        loop {
68            // Try to send anything in the buffer first.
69            if this.buffered.is_some() {
70                match this.writer.as_mut().poll_ready(cx)? {
71                    Poll::Pending => return Poll::Pending,
72                    Poll::Ready(()) => {
73                        let it = this.buffered.take().unwrap();
74                        let ret = this.writer.as_mut().start_send(it)?;
75                        *this.empty = false;
76                        // Check if we should complete according to `F`.
77                        if (this.f)(ret) {
78                            // `poll_complete` not followed by stream shutdown:
79                            // the state is `PollComplete(true)` only when the
80                            // stream stopped producing but we have to do one
81                            // final call to `poll_complete` because it has had
82                            // something written to it
83                            *this.state = State::PollComplete(false);
84                        } else {
85                            *this.state = State::PollNext;
86                        }
87                    },
88                }
89            }
90
91            match *this.state {
92                State::PollNext => {
93                    match ready!(this.stream.as_mut().poll_next(cx)) {
94                        Some(it) => *this.buffered = Some(it),
95                        _ => {
96                            // No more stream and nothing written to this writer
97                            // means it's over.
98                            if *this.empty {
99                                *this.is_terminated = true;
100                                return Poll::Ready(None);
101                            }
102                            // The penultimate state when the writer has had
103                            // something written.
104                            *this.state = State::PollComplete(true);
105                        },
106                    }
107                },
108                State::PollComplete(last) => {
109                    let out = ready!(this.writer.as_mut().poll_complete(cx));
110                    // Upstream stopped producing in the last iteration, or the
111                    // writer now indicates that it cannot be polled anymore, so
112                    // set the state to `Terminated` to end on the next poll
113                    // after returning the last item.
114                    if last || this.writer.is_terminated() {
115                        *this.state = State::Terminated;
116                    } else {
117                        // Otherwise, we can just keep polling upstream to start
118                        // building a new writer utput.
119                        *this.empty = true;
120                        *this.state = State::PollNext;
121                    }
122                    return Poll::Ready(Some(out));
123                },
124                State::Terminated => {
125                    *this.is_terminated = true;
126                    return Poll::Ready(None);
127                },
128            }
129        }
130    }
131}
132
133impl<St, Wr, F> Debug for TryCompleteWhen<St, Wr, F>
134where
135    St: Stream + Debug,
136    St::Item: Debug,
137    St: Debug,
138    Wr: Debug,
139{
140    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
141        f.debug_struct("TryCompleteWhen")
142            .field("stream", &self.stream)
143            .field("writer", &self.writer)
144            .field("buffered", &self.buffered)
145            .field("state", &self.state)
146            .field("empty", &self.empty)
147            .field("is_terminated", &self.is_terminated)
148            .finish()
149    }
150}
151
/// Next step the `TryCompleteWhen` stream takes when polled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// Pull the next item from the upstream stream.
    PollNext,
    /// Poll the writer to completion. The flag is `true` when upstream has
    /// already finished, making this the final completion.
    PollComplete(bool),
    /// Nothing is left to do; the next poll ends the stream.
    Terminated,
}