multipart_write/stream/
complete_when.rs

1use super::TrySend;
2use crate::{FusedMultipartWrite, MultipartWrite};
3
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use std::fmt::{self, Debug, Formatter};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10pin_project_lite::pin_project! {
11    /// Stream for [`complete_when`].
12    ///
13    /// [`complete_when`]: super::MultipartStreamExt::complete_when
14    #[must_use = "futures do nothing unless polled"]
15    pub struct CompleteWhen<St: Stream, Wr, F> {
16        #[pin]
17        inner: TrySend<St, Wr>,
18        f: F,
19        state: State,
20        is_empty: bool,
21    }
22}
23
24#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
25enum State {
26    PollComplete,
27    FinalPollComplete,
28    #[default]
29    PollNext,
30    Terminating,
31    Terminated,
32}
33
34impl State {
35    fn should_complete(&self) -> bool {
36        *self == Self::PollComplete || self.is_last_complete()
37    }
38
39    fn is_last_complete(&self) -> bool {
40        *self == Self::FinalPollComplete
41    }
42
43    fn is_terminated(&self) -> bool {
44        *self == Self::Terminated
45    }
46}
47
48impl<St: Stream, Wr, F> CompleteWhen<St, Wr, F> {
49    pub(super) fn new(inner: St, writer: Wr, f: F) -> Self {
50        Self {
51            inner: TrySend::new(inner, writer),
52            f,
53            state: State::PollNext,
54            is_empty: true,
55        }
56    }
57}
58
59impl<St, Wr, F> FusedStream for CompleteWhen<St, Wr, F>
60where
61    St: Stream,
62    Wr: FusedMultipartWrite<St::Item>,
63    F: FnMut(Wr::Ret) -> bool,
64{
65    fn is_terminated(&self) -> bool {
66        self.inner.is_terminated() || self.state.is_terminated()
67    }
68}
69
70impl<St, Wr, F> Stream for CompleteWhen<St, Wr, F>
71where
72    St: Stream,
73    Wr: MultipartWrite<St::Item>,
74    F: FnMut(Wr::Ret) -> bool,
75{
76    type Item = Result<Wr::Output, Wr::Error>;
77
78    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        let mut this = self.project();
80
81        // Last call to this function was the final `poll_complete` because the
82        // source stream stopped producing.  Since we've produced the last output
83        // and there is no more to write, we stop producing.
84        if *this.state == State::Terminating {
85            *this.state = State::Terminated;
86            return Poll::Ready(None);
87        }
88
89        loop {
90            if this.state.should_complete() {
91                let res = ready!(this.inner.as_mut().poll_complete(cx));
92                if this.state.is_last_complete() {
93                    *this.state = State::Terminating;
94                } else {
95                    *this.state = State::PollNext;
96                }
97                *this.is_empty = true;
98                return Poll::Ready(Some(res));
99            }
100
101            match ready!(this.inner.as_mut().poll_next(cx)) {
102                Some(Ok(ret)) => {
103                    if (this.f)(ret) {
104                        *this.state = State::PollComplete;
105                    }
106                    *this.is_empty = false;
107                }
108                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
109                None => {
110                    // If the very last thing we did was `poll_complete`, we
111                    // don't need to do it again since nothing has been written,
112                    // so shortcut and return `Poll::Ready(None)`.
113                    if *this.is_empty {
114                        *this.state = State::Terminated;
115                        return Poll::Ready(None);
116                    }
117                    *this.state = State::FinalPollComplete
118                }
119            }
120        }
121    }
122}
123
124impl<St: Stream, Wr, F> Debug for CompleteWhen<St, Wr, F>
125where
126    St: Debug,
127    St::Item: Debug,
128    Wr: Debug,
129{
130    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
131        f.debug_struct("CompleteWhen")
132            .field("inner", &self.inner)
133            .field("f", &"FnMut(Wr::Ret) -> bool")
134            .field("state", &self.state)
135            .finish()
136    }
137}