1pub mod future {
2 use pin_project::pin_project;
3 use std::future::Future;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6
7 #[pin_project]
8 pub struct ThenFinally<FT, Fut, F, O> {
9 #[pin]
10 item: Option<FT>,
11 #[pin]
12 fut: Option<Fut>,
13 f: Option<F>,
14 output: Option<O>,
15 }
16
17 pub trait ThenFinallyFutureExt: Sized {
18 fn then_finally<Fut: Future, F: FnOnce() -> Fut, O>(
22 self,
23 f: F,
24 ) -> ThenFinally<Self, Fut, F, O> {
25 ThenFinally {
26 item: Some(self),
27 fut: None,
28 f: Some(f),
29 output: None,
30 }
31 }
32 }
33
34 impl<T: Sized> ThenFinallyFutureExt for T {}
35
36 impl<FT: Future<Output = O>, Fut: Future, F, O> Future for ThenFinally<FT, Fut, F, O>
37 where
38 F: FnOnce() -> Fut,
39 {
40 type Output = FT::Output;
41 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42 let mut this = self.project();
43
44 if let Some(item) = this.item.as_mut().as_pin_mut() {
45 let output = futures::ready!(item.poll(cx));
46 this.output.replace(output);
47 let func = this.f.take().expect("function is valid");
48 let fut = Some(func());
49 this.fut.set(fut);
50 this.item.set(None);
51 }
52
53 if let Some(fut) = this.fut.as_mut().as_pin_mut() {
54 futures::ready!(fut.poll(cx));
55 this.fut.set(None);
56 }
57
58 let output = this.output.take();
59
60 Poll::Ready(output.expect("output from future to be value"))
61 }
62 }
63}
64
65pub mod try_future {
66 use futures::future::{Future, TryFuture};
67 use pin_project::pin_project;
68 use std::pin::Pin;
69 use std::task::{Context, Poll};
70
71 #[pin_project]
72 pub struct ThenTryFinally<FT, Fut, F>
73 where
74 FT: TryFuture,
75 {
76 #[pin]
77 item: Option<FT>,
78 #[pin]
79 fut: Option<Fut>,
80 f: Option<F>,
81 output: Option<Result<FT::Ok, FT::Error>>,
82 }
83
84 pub trait ThenFinallyTryFutureExt: Sized {
85 fn then_try_finally<Fut: Future, F: FnOnce() -> Fut>(
86 self,
87 f: F,
88 ) -> ThenTryFinally<Self, Fut, F>
89 where
90 Self: TryFuture,
91 {
92 ThenTryFinally {
93 item: Some(self),
94 fut: None,
95 f: Some(f),
96 output: None,
97 }
98 }
99 }
100
101 impl<T: Sized> ThenFinallyTryFutureExt for T {}
102
103 impl<FT: TryFuture, Fut: Future, F> Future for ThenTryFinally<FT, Fut, F>
104 where
105 F: FnOnce() -> Fut,
106 {
107 type Output = Result<FT::Ok, FT::Error>;
108 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109 let mut this = self.project();
110
111 if let Some(item) = this.item.as_mut().as_pin_mut() {
112 let output = futures::ready!(item.try_poll(cx));
113 this.item.set(None);
114 let func = this.f.take().expect("function is valid");
115 if let Err(e) = output {
116 return Poll::Ready(Err(e));
117 }
118
119 this.output.replace(output);
120 let fut = Some(func());
121 this.fut.set(fut);
122 }
123
124 if let Some(fut) = this.fut.as_mut().as_pin_mut() {
125 futures::ready!(fut.poll(cx));
126 this.fut.set(None);
127 }
128
129 let output = this.output.take();
130
131 Poll::Ready(output.expect("output from future to be value"))
132 }
133 }
134}
135
136pub mod stream {
137 use futures::{Future, Stream};
138 use pin_project::pin_project;
139 use std::pin::Pin;
140 use std::task::{Context, Poll};
141
142 #[pin_project]
143 pub struct Finally<ST, Fut, F> {
144 #[pin]
145 item: Option<ST>,
146 #[pin]
147 fut: Option<Fut>,
148 f: Option<F>,
149 }
150
151 pub trait FinallyStreamExt: Sized {
152 fn finally<Fut: Future, F: FnOnce() -> Fut>(self, f: F) -> Finally<Self, Fut, F> {
156 Finally {
157 item: Some(self),
158 fut: None,
159 f: Some(f),
160 }
161 }
162 }
163
164 impl<T: Sized> FinallyStreamExt for T {}
165
166 impl<ST: Stream, Fut: Future, F> Stream for Finally<ST, Fut, F>
167 where
168 F: FnOnce() -> Fut,
169 {
170 type Item = ST::Item;
171 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172 let mut this = self.project();
173
174 if let Some(item) = this.item.as_mut().as_pin_mut() {
175 match futures::ready!(item.poll_next(cx)) {
176 Some(item) => return Poll::Ready(Some(item)),
177 None => {
178 let func = this.f.take().expect("function is valid");
179 let fut = Some(func());
180 this.fut.set(fut);
181 this.item.set(None);
182 }
183 };
184 }
185
186 if let Some(fut) = this.fut.as_mut().as_pin_mut() {
187 futures::ready!(fut.poll(cx));
188 this.fut.set(None);
189 }
190
191 Poll::Ready(None)
192 }
193 }
194}
195
196pub mod try_stream {
197 use futures::{Future, Stream, TryStream};
198 use pin_project::pin_project;
199 use std::pin::Pin;
200 use std::task::{Context, Poll};
201
202 #[pin_project]
203 pub struct TryFinally<ST, Fut, F>
204 where
205 ST: TryStream,
206 {
207 #[pin]
208 item: Option<ST>,
209 #[pin]
210 fut: Option<Fut>,
211 f: Option<F>,
212 }
213
214 pub trait FinallyTryStreamExt: Sized {
215 fn try_finally<Fut: Future, F: FnOnce() -> Fut>(self, f: F) -> TryFinally<Self, Fut, F>
216 where
217 Self: TryStream,
218 {
219 TryFinally {
220 item: Some(self),
221 fut: None,
222 f: Some(f),
223 }
224 }
225 }
226
227 impl<T: Sized> FinallyTryStreamExt for T {}
228
229 impl<ST: TryStream, Fut: Future, F> Stream for TryFinally<ST, Fut, F>
230 where
231 F: FnOnce() -> Fut,
232 {
233 type Item = Result<ST::Ok, ST::Error>;
234 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
235 let mut this = self.project();
236
237 if let Some(item) = this.item.as_mut().as_pin_mut() {
238 let result = futures::ready!(item.try_poll_next(cx));
239
240 match result {
241 Some(Ok(val)) => return Poll::Ready(Some(Ok(val))),
242 Some(Err(e)) => {
243 this.item.set(None);
244 this.f.take();
245 return Poll::Ready(Some(Err(e)));
246 }
247 None => {
248 let func = this.f.take().expect("function is valid");
249 let fut = Some(func());
250 this.fut.set(fut);
251 this.item.set(None);
252 }
253 }
254 }
255
256 if let Some(fut) = this.fut.as_mut().as_pin_mut() {
257 futures::ready!(fut.poll(cx));
258 this.fut.set(None);
259 }
260
261 Poll::Ready(None)
262 }
263 }
264}
265
#[cfg(test)]
mod test {
    use crate::future::ThenFinallyFutureExt;
    use crate::stream::FinallyStreamExt;
    use crate::try_future::ThenFinallyTryFutureExt;
    use crate::try_stream::FinallyTryStreamExt;
    use futures::executor::block_on;
    use futures::{StreamExt, TryStreamExt};
    use std::convert::Infallible;

    /// The finalizer runs after a plain future has completed.
    #[test]
    fn future_final() {
        block_on(async move {
            let mut ran = 0;

            futures::future::ready(())
                .then_finally(|| async {
                    ran = 1;
                })
                .await;

            assert_eq!(ran, 1);
        });
    }

    /// The finalizer runs after `Ok` and is skipped after `Err`.
    #[test]
    fn try_future_final() {
        block_on(async move {
            let mut ran = 0;

            // Success path: the finalizer must run.
            futures::future::ok::<_, Infallible>(0)
                .then_try_finally(|| async {
                    ran = 1;
                })
                .await
                .expect("infallible");

            assert_eq!(ran, 1);

            // Failure path: reaching the finalizer body would panic.
            futures::future::err::<i8, std::io::Error>(std::io::ErrorKind::Other.into())
                .then_try_finally(|| async { unreachable!() })
                .await
                .expect_err("should return an error");
        });
    }

    /// The finalizer runs once the stream has been drained.
    #[test]
    fn stream_final() {
        block_on(async move {
            let mut ran = 0;

            let stream = futures::stream::once(async { 0 }).finally(|| async {
                ran = 1;
            });
            futures::pin_mut!(stream);

            while let Some(item) = stream.next().await {
                assert_eq!(item, 0);
            }

            assert_eq!(ran, 1);
        });
    }

    /// The finalizer runs after a clean end and is skipped after an error item.
    #[test]
    fn try_stream_final() {
        block_on(async move {
            let mut ran = 0;

            // Success path: drain the stream so the finalizer gets to run.
            let ok_stream =
                futures::stream::once(async { Ok::<_, Infallible>(0) }).try_finally(|| async {
                    ran = 1;
                });
            futures::pin_mut!(ok_stream);

            while let Ok(Some(item)) = ok_stream.try_next().await {
                assert_eq!(item, 0);
            }

            // Failure path: the very first item is an error, so neither the
            // loop body nor the finalizer body may be reached.
            let err_stream = futures::stream::once(async {
                Err::<i8, std::io::Error>(std::io::ErrorKind::Other.into())
            })
            .try_finally(|| async { unreachable!() });
            futures::pin_mut!(err_stream);

            while err_stream.try_next().await.is_ok() {
                unreachable!()
            }

            assert_eq!(ran, 1);
        });
    }
}
360}