Skip to main content

multipart_write/write/
ready_part.rs

1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::ready;
6
7use crate::{FusedMultipartWrite, MultipartWrite};
8
9pin_project_lite::pin_project! {
10    /// `MultipartWrite` for [`ready_part`].
11    ///
12    /// [`ready_part`]: super::MultipartWriteExt::ready_part
13    #[must_use = "futures do nothing unless polled"]
14    pub struct ReadyPart<Wr, Part, Fut, F> {
15        #[pin]
16        writer: Wr,
17        f: F,
18        #[pin]
19        future: Option<Fut>,
20        buffered: Option<Part>,
21    }
22}
23
24impl<Wr, Part, Fut, F> ReadyPart<Wr, Part, Fut, F> {
25    pub(super) fn new(writer: Wr, f: F) -> Self {
26        Self { writer, f, future: None, buffered: None }
27    }
28
29    /// Consumes `ReadyPart`, returning the underlying writer.
30    pub fn into_inner(self) -> Wr {
31        self.writer
32    }
33
34    /// Acquires a reference to the underlying writer.
35    pub fn get_ref(&self) -> &Wr {
36        &self.writer
37    }
38
39    /// Acquires a mutable reference to the underlying writer.
40    ///
41    /// It is inadvisable to directly write to the underlying writer.
42    pub fn get_mut(&mut self) -> &mut Wr {
43        &mut self.writer
44    }
45
46    /// Acquires a pinned mutable reference to the underlying writer.
47    ///
48    /// It is inadvisable to directly write to the underlying writer.
49    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
50        self.project().writer
51    }
52
53    fn poll<U, E>(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), E>>
54    where
55        Wr: MultipartWrite<Part>,
56        F: FnMut(U) -> Fut,
57        Fut: Future<Output = Result<Part, E>>,
58        E: From<Wr::Error>,
59    {
60        let mut this = self.project();
61
62        if this.buffered.is_some() {
63            match this.writer.as_mut().poll_ready(cx)? {
64                Poll::Ready(()) => {
65                    this.writer.start_send(this.buffered.take().unwrap())?;
66                },
67                Poll::Pending => return Poll::Pending,
68            }
69        }
70        if let Some(fut) = this.future.as_mut().as_pin_mut() {
71            let part = ready!(fut.poll(cx))?;
72            *this.buffered = Some(part);
73            this.future.set(None);
74        }
75
76        Poll::Ready(Ok(()))
77    }
78}
79
80impl<U, E, Wr, Part, Fut, F> FusedMultipartWrite<U>
81    for ReadyPart<Wr, Part, Fut, F>
82where
83    Wr: FusedMultipartWrite<Part>,
84    F: FnMut(U) -> Fut,
85    Fut: Future<Output = Result<Part, E>>,
86    E: From<Wr::Error>,
87{
88    fn is_terminated(&self) -> bool {
89        self.writer.is_terminated()
90    }
91}
92
93impl<U, E, Wr, Part, Fut, F> MultipartWrite<U> for ReadyPart<Wr, Part, Fut, F>
94where
95    Wr: MultipartWrite<Part>,
96    F: FnMut(U) -> Fut,
97    Fut: Future<Output = Result<Part, E>>,
98    E: From<Wr::Error>,
99{
100    type Error = E;
101    type Output = Wr::Output;
102    type Recv = ();
103
104    fn poll_ready(
105        mut self: Pin<&mut Self>,
106        cx: &mut Context<'_>,
107    ) -> Poll<Result<(), Self::Error>> {
108        ready!(self.as_mut().poll(cx))?;
109        self.project().writer.poll_ready(cx).map_err(E::from)
110    }
111
112    fn start_send(
113        self: Pin<&mut Self>,
114        part: U,
115    ) -> Result<Self::Recv, Self::Error> {
116        let mut this = self.project();
117        this.future.set(Some((this.f)(part)));
118        Ok(())
119    }
120
121    fn poll_flush(
122        mut self: Pin<&mut Self>,
123        cx: &mut Context<'_>,
124    ) -> Poll<Result<(), Self::Error>> {
125        ready!(self.as_mut().poll(cx))?;
126        ready!(self.project().writer.poll_flush(cx)?);
127        Poll::Ready(Ok(()))
128    }
129
130    fn poll_complete(
131        mut self: Pin<&mut Self>,
132        cx: &mut Context<'_>,
133    ) -> Poll<Result<Self::Output, Self::Error>> {
134        ready!(self.as_mut().poll(cx))?;
135        let out = ready!(self.project().writer.poll_complete(cx))?;
136        Poll::Ready(Ok(out))
137    }
138}
139
140impl<Wr, Part, Fut, F> Debug for ReadyPart<Wr, Part, Fut, F>
141where
142    Wr: Debug,
143    Fut: Debug,
144    Part: Debug,
145{
146    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147        f.debug_struct("ReadyPart")
148            .field("writer", &self.writer)
149            .field("future", &self.future)
150            .field("buffered", &self.buffered)
151            .finish()
152    }
153}