multipart_write/write/
on_complete.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures::{Future, ready};
4use std::fmt::{self, Debug, Formatter};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[must_use = "futures do nothing unless polled"]
12#[pin_project::pin_project]
13pub struct OnComplete<Wr, S, F, Fut> {
14 #[pin]
15 writer: Option<Wr>,
16 f: F,
17 s: S,
18 #[pin]
19 future: Option<Fut>,
20 is_terminated: bool,
21 _f: std::marker::PhantomData<fn(S)>,
22}
23
24impl<Wr, S, F, Fut> OnComplete<Wr, S, F, Fut> {
25 pub(super) fn new(writer: Wr, s: S, f: F) -> Self {
26 Self {
27 writer: Some(writer),
28 f,
29 s,
30 future: None,
31 is_terminated: false,
32 _f: std::marker::PhantomData,
33 }
34 }
35
36 fn try_poll_ready<Part>(
37 self: Pin<&mut Self>,
38 cx: &mut Context<'_>,
39 ) -> Poll<Result<(), Wr::Error>>
40 where
41 Wr: MultipartWrite<Part>,
42 F: FnMut(&mut S) -> Fut,
43 Fut: Future<Output = Result<Wr, Wr::Error>>,
44 {
45 let mut this = self.project();
46
47 if this.writer.is_some() {
48 let wr = this.writer.as_mut().as_pin_mut().unwrap();
49 return wr.poll_ready(cx);
50 }
51
52 if this.future.is_some() {
53 let fut = this.future.as_mut().as_pin_mut().unwrap();
54 match ready!(fut.poll(cx)) {
55 Ok(wr) => {
56 this.writer.set(Some(wr));
57 this.future.set(None);
58 Poll::Pending
63 }
64 Err(e) => {
65 this.writer.set(None);
66 this.future.set(None);
67 *this.is_terminated = true;
68 Poll::Ready(Err(e))
69 }
70 }
71 } else {
72 let fut = (this.f)(this.s);
73 this.future.set(Some(fut));
74 Poll::Pending
75 }
76 }
77}
78
79impl<Wr, S, F, Fut, Part> FusedMultipartWrite<Part> for OnComplete<Wr, S, F, Fut>
80where
81 Wr: FusedMultipartWrite<Part>,
82 F: FnMut(&mut S) -> Fut,
83 Fut: Future<Output = Result<Wr, Wr::Error>>,
84{
85 fn is_terminated(&self) -> bool {
86 self.is_terminated
87 }
88}
89
90impl<Wr, S, F, Fut, Part> MultipartWrite<Part> for OnComplete<Wr, S, F, Fut>
91where
92 Wr: MultipartWrite<Part>,
93 F: FnMut(&mut S) -> Fut,
94 Fut: Future<Output = Result<Wr, Wr::Error>>,
95{
96 type Ret = Wr::Ret;
97 type Output = Wr::Output;
98 type Error = Wr::Error;
99
100 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101 self.try_poll_ready(cx)
102 }
103
104 fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
105 let mut this = self.project();
106 let wr = this
107 .writer
108 .as_mut()
109 .as_pin_mut()
110 .expect("called start_send without poll_ready");
111 wr.start_send(part)
112 }
113
114 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115 let mut this = self.project();
116 let wr = this
117 .writer
118 .as_mut()
119 .as_pin_mut()
120 .expect("polled OnComplete after completion");
121 wr.poll_flush(cx)
122 }
123
124 fn poll_complete(
125 self: Pin<&mut Self>,
126 cx: &mut Context<'_>,
127 ) -> Poll<Result<Self::Output, Self::Error>> {
128 let mut this = self.project();
129 let wr = this
130 .writer
131 .as_mut()
132 .as_pin_mut()
133 .expect("polled OnComplete after completion");
134 let output = ready!(wr.poll_complete(cx));
135 this.writer.set(None);
136
137 if output.is_err() {
138 this.future.set(None);
139 *this.is_terminated = true;
140 } else {
141 let fut = (this.f)(this.s);
142 this.future.set(Some(fut));
143 }
144
145 Poll::Ready(output)
146 }
147}
148
149impl<Wr, S, F, Fut> Debug for OnComplete<Wr, S, F, Fut>
150where
151 Wr: Debug,
152 S: Debug,
153 Fut: Debug,
154{
155 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
156 f.debug_struct("OnComplete")
157 .field("writer", &self.writer)
158 .field("s", &self.s)
159 .field("future", &self.future)
160 .field("is_terminated", &self.is_terminated)
161 .finish()
162 }
163}