multipart_write/write/
with.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use std::fmt::{self, Debug, Formatter};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pin_project_lite::pin_project! {
9 #[must_use = "futures do nothing unless polled"]
11 pub struct With<Wr, Part, Fut, F> {
12 #[pin]
13 writer: Wr,
14 f: F,
15 #[pin]
16 future: Option<Fut>,
17 buffered: Option<Part>,
18 }
19}
20
21impl<Wr, Part, Fut, F> With<Wr, Part, Fut, F> {
22 pub(super) fn new(writer: Wr, f: F) -> Self {
23 Self {
24 writer,
25 f,
26 future: None,
27 buffered: None,
28 }
29 }
30
31 pub fn into_inner(self) -> Wr {
33 self.writer
34 }
35
36 pub fn get_ref(&self) -> &Wr {
38 &self.writer
39 }
40
41 pub fn get_mut(&mut self) -> &mut Wr {
45 &mut self.writer
46 }
47
48 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
52 self.project().writer
53 }
54
55 fn poll<U, E>(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), E>>
56 where
57 Wr: MultipartWrite<Part>,
58 F: FnMut(U) -> Fut,
59 Fut: Future<Output = Result<Part, E>>,
60 E: From<Wr::Error>,
61 {
62 let mut this = self.project();
63
64 loop {
65 if this.buffered.is_some() {
66 match this.writer.as_mut().poll_ready(cx)? {
69 Poll::Ready(()) => {
70 let _ = this.writer.start_send(this.buffered.take().unwrap())?;
71 }
72 Poll::Pending => match this.writer.as_mut().poll_flush(cx)? {
73 Poll::Ready(()) => continue, Poll::Pending => return Poll::Pending,
75 },
76 }
77 }
78 if let Some(fut) = this.future.as_mut().as_pin_mut() {
79 let part = ready!(fut.poll(cx))?;
80 *this.buffered = Some(part);
81 this.future.set(None);
82 }
83 return Poll::Ready(Ok(()));
84 }
85 }
86}
87
88impl<Wr, U, E, Part, Fut, F> FusedMultipartWrite<U> for With<Wr, Part, Fut, F>
89where
90 Wr: FusedMultipartWrite<Part>,
91 F: FnMut(U) -> Fut,
92 Fut: Future<Output = Result<Part, E>>,
93 E: From<Wr::Error>,
94{
95 fn is_terminated(&self) -> bool {
96 self.writer.is_terminated()
97 }
98}
99
100impl<Wr, U, E, Part, Fut, F> MultipartWrite<U> for With<Wr, Part, Fut, F>
101where
102 Wr: MultipartWrite<Part>,
103 F: FnMut(U) -> Fut,
104 Fut: Future<Output = Result<Part, E>>,
105 E: From<Wr::Error>,
106{
107 type Ret = ();
108 type Output = Wr::Output;
109 type Error = E;
110
111 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
112 ready!(self.as_mut().poll(cx))?;
113 ready!(self.project().writer.poll_ready(cx)?);
114 Poll::Ready(Ok(()))
115 }
116
117 fn start_send(self: Pin<&mut Self>, part: U) -> Result<Self::Ret, Self::Error> {
118 let mut this = self.project();
119 this.future.set(Some((this.f)(part)));
120 Ok(())
121 }
122
123 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
124 ready!(self.as_mut().poll(cx))?;
125 ready!(self.project().writer.poll_flush(cx)?);
126 Poll::Ready(Ok(()))
127 }
128
129 fn poll_complete(
130 mut self: Pin<&mut Self>,
131 cx: &mut Context<'_>,
132 ) -> Poll<Result<Self::Output, Self::Error>> {
133 ready!(self.as_mut().poll(cx))?;
134 let out = ready!(self.project().writer.poll_complete(cx))?;
135 Poll::Ready(Ok(out))
136 }
137}
138
139impl<Wr, Part, Fut, F> Debug for With<Wr, Part, Fut, F>
140where
141 Wr: Debug,
142 Fut: Debug,
143 Part: Debug,
144{
145 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
146 f.debug_struct("With")
147 .field("writer", &self.writer)
148 .field("f", &"impl FnMut(U) -> Fut")
149 .field("future", &self.future)
150 .field("buffered", &self.buffered)
151 .finish()
152 }
153}