multipart_write/write/
with.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use std::fmt::{self, Debug, Formatter};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pin_project_lite::pin_project! {
9    /// `MultipartWrite` for [`with`](super::MultipartWriteExt::with).
10    #[must_use = "futures do nothing unless polled"]
11    pub struct With<Wr, Part, Fut, F> {
12        #[pin]
13        writer: Wr,
14        f: F,
15        #[pin]
16        future: Option<Fut>,
17        buffered: Option<Part>,
18    }
19}
20
21impl<Wr, Part, Fut, F> With<Wr, Part, Fut, F> {
22    pub(super) fn new(writer: Wr, f: F) -> Self {
23        Self {
24            writer,
25            f,
26            future: None,
27            buffered: None,
28        }
29    }
30
31    /// Acquires a reference to the underlying writer.
32    pub fn get_ref(&self) -> &Wr {
33        &self.writer
34    }
35
36    /// Acquires a mutable reference to the underlying writer.
37    ///
38    /// It is inadvisable to directly write to the underlying writer.
39    pub fn get_mut(&mut self) -> &mut Wr {
40        &mut self.writer
41    }
42
43    /// Acquires a pinned mutable reference to the underlying writer.
44    ///
45    /// It is inadvisable to directly write to the underlying writer.
46    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
47        self.project().writer
48    }
49
50    fn poll<U, E>(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), E>>
51    where
52        Wr: MultipartWrite<Part>,
53        F: FnMut(U) -> Fut,
54        Fut: Future<Output = Result<Part, E>>,
55        E: From<Wr::Error>,
56    {
57        let mut this = self.project();
58
59        loop {
60            if this.buffered.is_some() {
61                // Check if the underlying sink is prepared for another item.
62                // If it is, we have to send it without yielding in between.
63                match this.writer.as_mut().poll_ready(cx)? {
64                    Poll::Ready(()) => {
65                        let _ = this.writer.start_send(this.buffered.take().unwrap())?;
66                    }
67                    Poll::Pending => match this.writer.as_mut().poll_flush(cx)? {
68                        Poll::Ready(()) => continue, // check `poll_ready` again
69                        Poll::Pending => return Poll::Pending,
70                    },
71                }
72            }
73            if let Some(fut) = this.future.as_mut().as_pin_mut() {
74                let part = ready!(fut.poll(cx))?;
75                *this.buffered = Some(part);
76                this.future.set(None);
77            }
78            return Poll::Ready(Ok(()));
79        }
80    }
81}
82
83impl<Wr, U, E, Part, Fut, F> FusedMultipartWrite<U> for With<Wr, Part, Fut, F>
84where
85    Wr: FusedMultipartWrite<Part>,
86    F: FnMut(U) -> Fut,
87    Fut: Future<Output = Result<Part, E>>,
88    E: From<Wr::Error>,
89{
90    fn is_terminated(&self) -> bool {
91        self.writer.is_terminated()
92    }
93}
94
95impl<Wr, U, E, Part, Fut, F> MultipartWrite<U> for With<Wr, Part, Fut, F>
96where
97    Wr: MultipartWrite<Part>,
98    F: FnMut(U) -> Fut,
99    Fut: Future<Output = Result<Part, E>>,
100    E: From<Wr::Error>,
101{
102    type Ret = ();
103    type Output = Wr::Output;
104    type Error = E;
105
106    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
107        ready!(self.as_mut().poll(cx))?;
108        ready!(self.project().writer.poll_ready(cx)?);
109        Poll::Ready(Ok(()))
110    }
111
112    fn start_send(self: Pin<&mut Self>, part: U) -> Result<Self::Ret, Self::Error> {
113        let mut this = self.project();
114        this.future.set(Some((this.f)(part)));
115        Ok(())
116    }
117
118    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119        ready!(self.as_mut().poll(cx))?;
120        ready!(self.project().writer.poll_flush(cx)?);
121        Poll::Ready(Ok(()))
122    }
123
124    fn poll_complete(
125        mut self: Pin<&mut Self>,
126        cx: &mut Context<'_>,
127    ) -> Poll<Result<Self::Output, Self::Error>> {
128        ready!(self.as_mut().poll(cx))?;
129        let out = ready!(self.project().writer.poll_complete(cx))?;
130        Poll::Ready(Ok(out))
131    }
132}
133
134impl<Wr, Part, Fut, F> Debug for With<Wr, Part, Fut, F>
135where
136    Wr: Debug,
137    Fut: Debug,
138    Part: Debug,
139{
140    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
141        f.debug_struct("With")
142            .field("writer", &self.writer)
143            .field("f", &"impl FnMut(U) -> Fut")
144            .field("future", &self.future)
145            .field("buffered", &self.buffered)
146            .finish()
147    }
148}