multipart_write/write/
bootstrapped.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::{Future, ready};
4use std::fmt::{self, Debug, Formatter};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
pin_project_lite::pin_project! {
    /// `MultipartWrite` for [`bootstrapped`].
    ///
    /// Holds an optional inner writer `Wr` and forwards every write operation
    /// to it. When the inner writer completes, it is dropped and the closure
    /// `f` is invoked with `&mut s` to create a future that resolves to the
    /// next writer (or `None` if there is no next writer).
    ///
    /// [`bootstrapped`]: super::MultipartWriteExt::bootstrapped
    #[must_use = "futures do nothing unless polled"]
    pub struct Bootstrapped<Wr, S, F, Fut> {
        // The writer currently receiving parts. `None` from the moment
        // `poll_complete` finishes until the bootstrapping future yields a
        // replacement.
        #[pin]
        writer: Option<Wr>,
        // Factory for the bootstrapping future; called as `f(&mut s)`.
        f: F,
        // State owned by this adapter and lent mutably to `f` on every call.
        s: S,
        // The in-flight bootstrapping future, if one has been started and has
        // not resolved yet.
        #[pin]
        future: Option<Fut>,
        // Zero-sized marker; carries no data.
        // NOTE(review): `S` is already stored in `s`, so this marker looks
        // redundant -- confirm before removing.
        _f: std::marker::PhantomData<fn(S)>,
    }
}
23
24impl<Wr, S, F, Fut> Bootstrapped<Wr, S, F, Fut> {
25    pub(super) fn new(writer: Wr, s: S, f: F) -> Self {
26        Self {
27            writer: Some(writer),
28            f,
29            s,
30            future: None,
31            _f: std::marker::PhantomData,
32        }
33    }
34
35    fn poll_new_writer<Part>(
36        self: Pin<&mut Self>,
37        cx: &mut Context<'_>,
38    ) -> Poll<Result<(), Wr::Error>>
39    where
40        Wr: MultipartWrite<Part>,
41        F: FnMut(&mut S) -> Fut,
42        Fut: Future<Output = Result<Option<Wr>, Wr::Error>>,
43    {
44        let mut this = self.project();
45        // This should not be possible because this is only called from a
46        // `self.writer.is_none()` context and they can't both be `None` unless
47        // something bad is happening.
48        if this.future.is_none() {
49            let fut = (this.f)(this.s);
50            this.future.set(Some(fut));
51        }
52        let fut = this.future.as_mut().as_pin_mut().unwrap();
53        match ready!(fut.poll(cx)) {
54            Ok(wr) => {
55                this.future.set(None);
56                // If `wr` is `None` this should be the last time this future is
57                // polled because now it `is_terminated` according to its fused
58                // implementation, which all `Future` and `Stream` impls defer to
59                // if appropriate for `FusedFuture/Stream`.
60                this.writer.set(wr);
61                Poll::Ready(Ok(()))
62            }
63            Err(e) => {
64                this.future.set(None);
65                Poll::Ready(Err(e))
66            }
67        }
68    }
69}
70
71impl<Wr, S, F, Fut, Part> FusedMultipartWrite<Part> for Bootstrapped<Wr, S, F, Fut>
72where
73    Wr: FusedMultipartWrite<Part>,
74    F: FnMut(&mut S) -> Fut,
75    Fut: Future<Output = Result<Option<Wr>, Wr::Error>>,
76{
77    fn is_terminated(&self) -> bool {
78        self.writer.is_none() && self.future.is_none()
79    }
80}
81
82impl<Wr, S, F, Fut, Part> MultipartWrite<Part> for Bootstrapped<Wr, S, F, Fut>
83where
84    Wr: MultipartWrite<Part>,
85    F: FnMut(&mut S) -> Fut,
86    Fut: Future<Output = Result<Option<Wr>, Wr::Error>>,
87{
88    type Ret = Wr::Ret;
89    type Output = Wr::Output;
90    type Error = Wr::Error;
91
92    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
93        if self.writer.is_none() {
94            ready!(self.as_mut().poll_new_writer(cx))?;
95        }
96        let mut this = self.project();
97        let wr = this.writer.as_mut().as_pin_mut().unwrap();
98        wr.poll_ready(cx)
99    }
100
101    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
102        let mut this = self.project();
103        // This has to be non-`None` if following the rules.
104        assert!(this.writer.is_some());
105        let wr = this.writer.as_mut().as_pin_mut().unwrap();
106        wr.start_send(part)
107    }
108
109    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
110        let mut this = self.project();
111        let wr = this
112            .writer
113            .as_mut()
114            .as_pin_mut()
115            .expect("polled Bootstrapped after completion");
116        wr.poll_flush(cx)
117    }
118
119    fn poll_complete(
120        self: Pin<&mut Self>,
121        cx: &mut Context<'_>,
122    ) -> Poll<Result<Self::Output, Self::Error>> {
123        let mut this = self.project();
124        let wr = this
125            .writer
126            .as_mut()
127            .as_pin_mut()
128            .expect("polled Bootstrapped after completion");
129        let output = ready!(wr.poll_complete(cx));
130        let fut = (this.f)(this.s);
131        // Have to have one of these two be non-`None` or else this future
132        // becomes fused, i.e. `FusedFuture`s deferring to this type will return
133        // that it's terminated.
134        //
135        // This future may not be able to return a non-`None` value itself, which
136        // is fine--it'll become terminated at that point.
137        this.future.set(Some(fut));
138        this.writer.set(None);
139        Poll::Ready(output)
140    }
141}
142
143impl<Wr, S, F, Fut> Debug for Bootstrapped<Wr, S, F, Fut>
144where
145    Wr: Debug,
146    S: Debug,
147    Fut: Debug,
148{
149    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
150        f.debug_struct("Bootstrapped")
151            .field("writer", &self.writer)
152            .field("f", &"F")
153            .field("s", &self.s)
154            .field("future", &self.future)
155            .finish()
156    }
157}