multipart_write/stream/
complete_with.rs1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::future::{FusedFuture, Future};
6use futures_core::ready;
7use futures_core::stream::Stream;
8
9use crate::MultipartWrite;
10
11pin_project_lite::pin_project! {
12 #[must_use = "futures do nothing unless polled"]
16 pub struct CompleteWith<St: Stream, Wr> {
17 #[pin]
18 writer: Wr,
19 #[pin]
20 stream: Option<St>,
21 buffered: Option<St::Item>,
22 is_terminated: bool,
23 }
24}
25
26impl<St: Stream, Wr> CompleteWith<St, Wr> {
27 pub(super) fn new(stream: St, writer: Wr) -> Self {
28 Self {
29 writer,
30 stream: Some(stream),
31 buffered: None,
32 is_terminated: false,
33 }
34 }
35}
36
37impl<St, Wr> FusedFuture for CompleteWith<St, Wr>
38where
39 St: Stream,
40 Wr: MultipartWrite<St::Item>,
41{
42 fn is_terminated(&self) -> bool {
43 self.is_terminated
44 }
45}
46
47impl<St, Wr> Future for CompleteWith<St, Wr>
48where
49 St: Stream,
50 Wr: MultipartWrite<St::Item>,
51{
52 type Output = Result<Wr::Output, Wr::Error>;
53
54 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55 let mut this = self.project();
56
57 loop {
58 if this.buffered.is_some() {
59 match this.writer.as_mut().poll_ready(cx)? {
60 Poll::Pending => return Poll::Pending,
61 Poll::Ready(()) => {
62 let _ = this
63 .writer
64 .as_mut()
65 .start_send(this.buffered.take().unwrap())?;
66 },
67 }
68 }
69
70 let Some(st) = this.stream.as_mut().as_pin_mut() else {
71 let out = ready!(this.writer.as_mut().poll_complete(cx));
72 *this.is_terminated = true;
73 return Poll::Ready(out);
74 };
75
76 match ready!(st.poll_next(cx)) {
77 Some(it) => *this.buffered = Some(it),
78 None => this.stream.set(None),
79 }
80 }
81 }
82}
83
84impl<St, Wr> Debug for CompleteWith<St, Wr>
85where
86 St: Stream + Debug,
87 St::Item: Debug,
88 Wr: Debug,
89{
90 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
91 f.debug_struct("CompleteWith")
92 .field("writer", &self.writer)
93 .field("stream", &self.stream)
94 .field("buffered", &self.buffered)
95 .field("is_terminated", &self.is_terminated)
96 .finish()
97 }
98}