multipart_write/write/
with.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use std::fmt::{self, Debug, Formatter};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pin_project_lite::pin_project! {
9    /// `MultipartWrite` for [`with`](super::MultipartWriteExt::with).
10    #[must_use = "futures do nothing unless polled"]
11    pub struct With<Wr, Part, Fut, F> {
12        #[pin]
13        writer: Wr,
14        f: F,
15        #[pin]
16        future: Option<Fut>,
17        buffered: Option<Part>,
18    }
19}
20
21impl<Wr, Part, Fut, F> With<Wr, Part, Fut, F> {
22    pub(super) fn new(writer: Wr, f: F) -> Self {
23        Self {
24            writer,
25            f,
26            future: None,
27            buffered: None,
28        }
29    }
30
31    /// Consumes `With`, returning the underlying writer.
32    pub fn into_inner(self) -> Wr {
33        self.writer
34    }
35
36    /// Acquires a reference to the underlying writer.
37    pub fn get_ref(&self) -> &Wr {
38        &self.writer
39    }
40
41    /// Acquires a mutable reference to the underlying writer.
42    ///
43    /// It is inadvisable to directly write to the underlying writer.
44    pub fn get_mut(&mut self) -> &mut Wr {
45        &mut self.writer
46    }
47
48    /// Acquires a pinned mutable reference to the underlying writer.
49    ///
50    /// It is inadvisable to directly write to the underlying writer.
51    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
52        self.project().writer
53    }
54
55    fn poll<U, E>(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), E>>
56    where
57        Wr: MultipartWrite<Part>,
58        F: FnMut(U) -> Fut,
59        Fut: Future<Output = Result<Part, E>>,
60        E: From<Wr::Error>,
61    {
62        let mut this = self.project();
63
64        loop {
65            if this.buffered.is_some() {
66                // Check if the underlying sink is prepared for another item.
67                // If it is, we have to send it without yielding in between.
68                match this.writer.as_mut().poll_ready(cx)? {
69                    Poll::Ready(()) => {
70                        let _ = this.writer.start_send(this.buffered.take().unwrap())?;
71                    }
72                    Poll::Pending => match this.writer.as_mut().poll_flush(cx)? {
73                        Poll::Ready(()) => continue, // check `poll_ready` again
74                        Poll::Pending => return Poll::Pending,
75                    },
76                }
77            }
78            if let Some(fut) = this.future.as_mut().as_pin_mut() {
79                let part = ready!(fut.poll(cx))?;
80                *this.buffered = Some(part);
81                this.future.set(None);
82            }
83            return Poll::Ready(Ok(()));
84        }
85    }
86}
87
88impl<Wr, U, E, Part, Fut, F> FusedMultipartWrite<U> for With<Wr, Part, Fut, F>
89where
90    Wr: FusedMultipartWrite<Part>,
91    F: FnMut(U) -> Fut,
92    Fut: Future<Output = Result<Part, E>>,
93    E: From<Wr::Error>,
94{
95    fn is_terminated(&self) -> bool {
96        self.writer.is_terminated()
97    }
98}
99
100impl<Wr, U, E, Part, Fut, F> MultipartWrite<U> for With<Wr, Part, Fut, F>
101where
102    Wr: MultipartWrite<Part>,
103    F: FnMut(U) -> Fut,
104    Fut: Future<Output = Result<Part, E>>,
105    E: From<Wr::Error>,
106{
107    type Ret = ();
108    type Output = Wr::Output;
109    type Error = E;
110
111    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
112        ready!(self.as_mut().poll(cx))?;
113        ready!(self.project().writer.poll_ready(cx)?);
114        Poll::Ready(Ok(()))
115    }
116
117    fn start_send(self: Pin<&mut Self>, part: U) -> Result<Self::Ret, Self::Error> {
118        let mut this = self.project();
119        this.future.set(Some((this.f)(part)));
120        Ok(())
121    }
122
123    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
124        ready!(self.as_mut().poll(cx))?;
125        ready!(self.project().writer.poll_flush(cx)?);
126        Poll::Ready(Ok(()))
127    }
128
129    fn poll_complete(
130        mut self: Pin<&mut Self>,
131        cx: &mut Context<'_>,
132    ) -> Poll<Result<Self::Output, Self::Error>> {
133        ready!(self.as_mut().poll(cx))?;
134        let out = ready!(self.project().writer.poll_complete(cx))?;
135        Poll::Ready(Ok(out))
136    }
137}
138
139impl<Wr, Part, Fut, F> Debug for With<Wr, Part, Fut, F>
140where
141    Wr: Debug,
142    Fut: Debug,
143    Part: Debug,
144{
145    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
146        f.debug_struct("With")
147            .field("writer", &self.writer)
148            .field("f", &"impl FnMut(U) -> Fut")
149            .field("future", &self.future)
150            .field("buffered", &self.buffered)
151            .finish()
152    }
153}