Skip to main content

multipart_write/stream/
complete_with.rs

1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::future::{FusedFuture, Future};
6use futures_core::ready;
7use futures_core::stream::Stream;
8
9use crate::MultipartWrite;
10
11pin_project_lite::pin_project! {
12    /// Future for [`complete_with`].
13    ///
14    /// [`complete_with`]: super::MultipartStreamExt::complete_with
15    #[must_use = "futures do nothing unless polled"]
16    pub struct CompleteWith<St: Stream, Wr> {
17        #[pin]
18        writer: Wr,
19        #[pin]
20        stream: Option<St>,
21        buffered: Option<St::Item>,
22        is_terminated: bool,
23    }
24}
25
26impl<St: Stream, Wr> CompleteWith<St, Wr> {
27    pub(super) fn new(stream: St, writer: Wr) -> Self {
28        Self {
29            writer,
30            stream: Some(stream),
31            buffered: None,
32            is_terminated: false,
33        }
34    }
35}
36
37impl<St, Wr> FusedFuture for CompleteWith<St, Wr>
38where
39    St: Stream,
40    Wr: MultipartWrite<St::Item>,
41{
42    fn is_terminated(&self) -> bool {
43        self.is_terminated
44    }
45}
46
47impl<St, Wr> Future for CompleteWith<St, Wr>
48where
49    St: Stream,
50    Wr: MultipartWrite<St::Item>,
51{
52    type Output = Result<Wr::Output, Wr::Error>;
53
54    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55        let mut this = self.project();
56
57        loop {
58            if this.buffered.is_some() {
59                match this.writer.as_mut().poll_ready(cx)? {
60                    Poll::Pending => return Poll::Pending,
61                    Poll::Ready(()) => {
62                        let _ = this
63                            .writer
64                            .as_mut()
65                            .start_send(this.buffered.take().unwrap())?;
66                    },
67                }
68            }
69
70            let Some(st) = this.stream.as_mut().as_pin_mut() else {
71                let out = ready!(this.writer.as_mut().poll_complete(cx));
72                *this.is_terminated = true;
73                return Poll::Ready(out);
74            };
75
76            match ready!(st.poll_next(cx)) {
77                Some(it) => *this.buffered = Some(it),
78                None => this.stream.set(None),
79            }
80        }
81    }
82}
83
84impl<St, Wr> Debug for CompleteWith<St, Wr>
85where
86    St: Stream + Debug,
87    St::Item: Debug,
88    Wr: Debug,
89{
90    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
91        f.debug_struct("CompleteWith")
92            .field("writer", &self.writer)
93            .field("stream", &self.stream)
94            .field("buffered", &self.buffered)
95            .field("is_terminated", &self.is_terminated)
96            .finish()
97    }
98}