multipart_write/write/
ready_part.rs1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::ready;
6
7use crate::{FusedMultipartWrite, MultipartWrite};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
14 pub struct ReadyPart<Wr, Part, Fut, F> {
15 #[pin]
16 writer: Wr,
17 f: F,
18 #[pin]
19 future: Option<Fut>,
20 buffered: Option<Part>,
21 }
22}
23
24impl<Wr, Part, Fut, F> ReadyPart<Wr, Part, Fut, F> {
25 pub(super) fn new(writer: Wr, f: F) -> Self {
26 Self { writer, f, future: None, buffered: None }
27 }
28
29 pub fn into_inner(self) -> Wr {
31 self.writer
32 }
33
34 pub fn get_ref(&self) -> &Wr {
36 &self.writer
37 }
38
39 pub fn get_mut(&mut self) -> &mut Wr {
43 &mut self.writer
44 }
45
46 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
50 self.project().writer
51 }
52
53 fn poll<U, E>(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), E>>
54 where
55 Wr: MultipartWrite<Part>,
56 F: FnMut(U) -> Fut,
57 Fut: Future<Output = Result<Part, E>>,
58 E: From<Wr::Error>,
59 {
60 let mut this = self.project();
61
62 if this.buffered.is_some() {
63 match this.writer.as_mut().poll_ready(cx)? {
64 Poll::Ready(()) => {
65 this.writer.start_send(this.buffered.take().unwrap())?;
66 },
67 Poll::Pending => return Poll::Pending,
68 }
69 }
70 if let Some(fut) = this.future.as_mut().as_pin_mut() {
71 let part = ready!(fut.poll(cx))?;
72 *this.buffered = Some(part);
73 this.future.set(None);
74 }
75
76 Poll::Ready(Ok(()))
77 }
78}
79
80impl<U, E, Wr, Part, Fut, F> FusedMultipartWrite<U>
81 for ReadyPart<Wr, Part, Fut, F>
82where
83 Wr: FusedMultipartWrite<Part>,
84 F: FnMut(U) -> Fut,
85 Fut: Future<Output = Result<Part, E>>,
86 E: From<Wr::Error>,
87{
88 fn is_terminated(&self) -> bool {
89 self.writer.is_terminated()
90 }
91}
92
93impl<U, E, Wr, Part, Fut, F> MultipartWrite<U> for ReadyPart<Wr, Part, Fut, F>
94where
95 Wr: MultipartWrite<Part>,
96 F: FnMut(U) -> Fut,
97 Fut: Future<Output = Result<Part, E>>,
98 E: From<Wr::Error>,
99{
100 type Error = E;
101 type Output = Wr::Output;
102 type Recv = ();
103
104 fn poll_ready(
105 mut self: Pin<&mut Self>,
106 cx: &mut Context<'_>,
107 ) -> Poll<Result<(), Self::Error>> {
108 ready!(self.as_mut().poll(cx))?;
109 self.project().writer.poll_ready(cx).map_err(E::from)
110 }
111
112 fn start_send(
113 self: Pin<&mut Self>,
114 part: U,
115 ) -> Result<Self::Recv, Self::Error> {
116 let mut this = self.project();
117 this.future.set(Some((this.f)(part)));
118 Ok(())
119 }
120
121 fn poll_flush(
122 mut self: Pin<&mut Self>,
123 cx: &mut Context<'_>,
124 ) -> Poll<Result<(), Self::Error>> {
125 ready!(self.as_mut().poll(cx))?;
126 ready!(self.project().writer.poll_flush(cx)?);
127 Poll::Ready(Ok(()))
128 }
129
130 fn poll_complete(
131 mut self: Pin<&mut Self>,
132 cx: &mut Context<'_>,
133 ) -> Poll<Result<Self::Output, Self::Error>> {
134 ready!(self.as_mut().poll(cx))?;
135 let out = ready!(self.project().writer.poll_complete(cx))?;
136 Poll::Ready(Ok(out))
137 }
138}
139
140impl<Wr, Part, Fut, F> Debug for ReadyPart<Wr, Part, Fut, F>
141where
142 Wr: Debug,
143 Fut: Debug,
144 Part: Debug,
145{
146 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147 f.debug_struct("ReadyPart")
148 .field("writer", &self.writer)
149 .field("future", &self.future)
150 .field("buffered", &self.buffered)
151 .finish()
152 }
153}