multipart_write/write/
on_complete.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures::{Future, ready};
4use std::fmt::{self, Debug, Formatter};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8/// `MultipartWrite` for [`on_complete`].
9///
10/// [`on_complete`]: super::MultipartWriteExt::on_complete
11#[must_use = "futures do nothing unless polled"]
12#[pin_project::pin_project]
13pub struct OnComplete<Wr, S, F, Fut> {
14    #[pin]
15    writer: Option<Wr>,
16    f: F,
17    s: S,
18    #[pin]
19    future: Option<Fut>,
20    is_terminated: bool,
21    _f: std::marker::PhantomData<fn(S)>,
22}
23
24impl<Wr, S, F, Fut> OnComplete<Wr, S, F, Fut> {
25    pub(super) fn new(writer: Wr, s: S, f: F) -> Self {
26        Self {
27            writer: Some(writer),
28            f,
29            s,
30            future: None,
31            is_terminated: false,
32            _f: std::marker::PhantomData,
33        }
34    }
35
36    fn try_poll_ready<Part>(
37        self: Pin<&mut Self>,
38        cx: &mut Context<'_>,
39    ) -> Poll<Result<(), Wr::Error>>
40    where
41        Wr: MultipartWrite<Part>,
42        F: FnMut(&mut S) -> Fut,
43        Fut: Future<Output = Result<Wr, Wr::Error>>,
44    {
45        let mut this = self.project();
46
47        if this.writer.is_some() {
48            let wr = this.writer.as_mut().as_pin_mut().unwrap();
49            return wr.poll_ready(cx);
50        }
51
52        if this.future.is_some() {
53            let fut = this.future.as_mut().as_pin_mut().unwrap();
54            match ready!(fut.poll(cx)) {
55                Ok(wr) => {
56                    this.writer.set(Some(wr));
57                    this.future.set(None);
58                    // Return `Poll::Pending`, not `Poll::Ready(Ok(()))` because
59                    // the writer is set now, but that doesn't mean it's ready.
60                    // On the next poll it will hit the top and call `poll_ready`
61                    // on the writer.
62                    Poll::Pending
63                }
64                Err(e) => {
65                    this.writer.set(None);
66                    this.future.set(None);
67                    *this.is_terminated = true;
68                    Poll::Ready(Err(e))
69                }
70            }
71        } else {
72            let fut = (this.f)(this.s);
73            this.future.set(Some(fut));
74            Poll::Pending
75        }
76    }
77}
78
79impl<Wr, S, F, Fut, Part> FusedMultipartWrite<Part> for OnComplete<Wr, S, F, Fut>
80where
81    Wr: FusedMultipartWrite<Part>,
82    F: FnMut(&mut S) -> Fut,
83    Fut: Future<Output = Result<Wr, Wr::Error>>,
84{
85    fn is_terminated(&self) -> bool {
86        self.is_terminated
87    }
88}
89
90impl<Wr, S, F, Fut, Part> MultipartWrite<Part> for OnComplete<Wr, S, F, Fut>
91where
92    Wr: MultipartWrite<Part>,
93    F: FnMut(&mut S) -> Fut,
94    Fut: Future<Output = Result<Wr, Wr::Error>>,
95{
96    type Ret = Wr::Ret;
97    type Output = Wr::Output;
98    type Error = Wr::Error;
99
100    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101        self.try_poll_ready(cx)
102    }
103
104    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
105        let mut this = self.project();
106        let wr = this
107            .writer
108            .as_mut()
109            .as_pin_mut()
110            .expect("called start_send without poll_ready");
111        wr.start_send(part)
112    }
113
114    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115        let mut this = self.project();
116        let wr = this
117            .writer
118            .as_mut()
119            .as_pin_mut()
120            .expect("polled OnComplete after completion");
121        wr.poll_flush(cx)
122    }
123
124    fn poll_complete(
125        self: Pin<&mut Self>,
126        cx: &mut Context<'_>,
127    ) -> Poll<Result<Self::Output, Self::Error>> {
128        let mut this = self.project();
129        let wr = this
130            .writer
131            .as_mut()
132            .as_pin_mut()
133            .expect("polled OnComplete after completion");
134        let output = ready!(wr.poll_complete(cx));
135        this.writer.set(None);
136
137        if output.is_err() {
138            this.future.set(None);
139            *this.is_terminated = true;
140        } else {
141            let fut = (this.f)(this.s);
142            this.future.set(Some(fut));
143        }
144
145        Poll::Ready(output)
146    }
147}
148
149impl<Wr, S, F, Fut> Debug for OnComplete<Wr, S, F, Fut>
150where
151    Wr: Debug,
152    S: Debug,
153    Fut: Debug,
154{
155    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
156        f.debug_struct("OnComplete")
157            .field("writer", &self.writer)
158            .field("s", &self.s)
159            .field("future", &self.future)
160            .field("is_terminated", &self.is_terminated)
161            .finish()
162    }
163}