futures_finally/
lib.rs

1pub mod future {
2    use pin_project::pin_project;
3    use std::future::Future;
4    use std::pin::Pin;
5    use std::task::{Context, Poll};
6
    /// Future returned by [`ThenFinallyFutureExt::then_finally`].
    ///
    /// When polled it first drives `item` to completion, then drives the future produced by
    /// calling `f`, and only after that yields the output of `item`.
    #[pin_project]
    pub struct ThenFinally<FT, Fut, F, O> {
        /// The wrapped future; reset to `None` once it has produced its output.
        #[pin]
        item: Option<FT>,
        /// The follow-up future obtained from `f`; `None` until `item` completes and again
        /// once the follow-up itself has completed.
        #[pin]
        fut: Option<Fut>,
        /// Factory for the follow-up future; taken and called when `item` completes.
        f: Option<F>,
        /// Output of `item`, parked here while the follow-up future is running.
        output: Option<O>,
    }
16
17    pub trait ThenFinallyFutureExt: Sized {
18        /// Consumes the current future into a new one which will execute an asynchronous upon completion of the future
19        ///
20        /// Note that this will execute the code regardless of a value that the future returns.
21        fn then_finally<Fut: Future, F: FnOnce() -> Fut, O>(
22            self,
23            f: F,
24        ) -> ThenFinally<Self, Fut, F, O> {
25            ThenFinally {
26                item: Some(self),
27                fut: None,
28                f: Some(f),
29                output: None,
30            }
31        }
32    }
33
34    impl<T: Sized> ThenFinallyFutureExt for T {}
35
36    impl<FT: Future<Output = O>, Fut: Future, F, O> Future for ThenFinally<FT, Fut, F, O>
37    where
38        F: FnOnce() -> Fut,
39    {
40        type Output = FT::Output;
41        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42            let mut this = self.project();
43
44            if let Some(item) = this.item.as_mut().as_pin_mut() {
45                let output = futures::ready!(item.poll(cx));
46                this.output.replace(output);
47                let func = this.f.take().expect("function is valid");
48                let fut = Some(func());
49                this.fut.set(fut);
50                this.item.set(None);
51            }
52
53            if let Some(fut) = this.fut.as_mut().as_pin_mut() {
54                futures::ready!(fut.poll(cx));
55                this.fut.set(None);
56            }
57
58            let output = this.output.take();
59
60            Poll::Ready(output.expect("output from future to be value"))
61        }
62    }
63}
64
65pub mod try_future {
66    use futures::future::{Future, TryFuture};
67    use pin_project::pin_project;
68    use std::pin::Pin;
69    use std::task::{Context, Poll};
70
71    #[pin_project]
72    pub struct ThenTryFinally<FT, Fut, F>
73    where
74        FT: TryFuture,
75    {
76        #[pin]
77        item: Option<FT>,
78        #[pin]
79        fut: Option<Fut>,
80        f: Option<F>,
81        output: Option<Result<FT::Ok, FT::Error>>,
82    }
83
84    pub trait ThenFinallyTryFutureExt: Sized {
85        fn then_try_finally<Fut: Future, F: FnOnce() -> Fut>(
86            self,
87            f: F,
88        ) -> ThenTryFinally<Self, Fut, F>
89        where
90            Self: TryFuture,
91        {
92            ThenTryFinally {
93                item: Some(self),
94                fut: None,
95                f: Some(f),
96                output: None,
97            }
98        }
99    }
100
101    impl<T: Sized> ThenFinallyTryFutureExt for T {}
102
103    impl<FT: TryFuture, Fut: Future, F> Future for ThenTryFinally<FT, Fut, F>
104    where
105        F: FnOnce() -> Fut,
106    {
107        type Output = Result<FT::Ok, FT::Error>;
108        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109            let mut this = self.project();
110
111            if let Some(item) = this.item.as_mut().as_pin_mut() {
112                let output = futures::ready!(item.try_poll(cx));
113                this.item.set(None);
114                let func = this.f.take().expect("function is valid");
115                if let Err(e) = output {
116                    return Poll::Ready(Err(e));
117                }
118
119                this.output.replace(output);
120                let fut = Some(func());
121                this.fut.set(fut);
122            }
123
124            if let Some(fut) = this.fut.as_mut().as_pin_mut() {
125                futures::ready!(fut.poll(cx));
126                this.fut.set(None);
127            }
128
129            let output = this.output.take();
130
131            Poll::Ready(output.expect("output from future to be value"))
132        }
133    }
134}
135
136pub mod stream {
137    use futures::{Future, Stream};
138    use pin_project::pin_project;
139    use std::pin::Pin;
140    use std::task::{Context, Poll};
141
    /// Stream returned by [`FinallyStreamExt::finally`].
    ///
    /// Forwards the items of `item`; when `item` is exhausted it drives the future produced
    /// by calling `f` before reporting the end of the stream.
    #[pin_project]
    pub struct Finally<ST, Fut, F> {
        /// The wrapped stream; reset to `None` once it has yielded `None`.
        #[pin]
        item: Option<ST>,
        /// The follow-up future obtained from `f`; `None` until `item` is exhausted and again
        /// once the follow-up itself has completed.
        #[pin]
        fut: Option<Fut>,
        /// Factory for the follow-up future; taken and called when `item` is exhausted.
        f: Option<F>,
    }
150
151    pub trait FinallyStreamExt: Sized {
152        /// Consumes the current stream into a new one which will execute an asynchronous upon completion of the stream
153        ///
154        /// Note that this will execute the code regardless of a value that the stream returns.
155        fn finally<Fut: Future, F: FnOnce() -> Fut>(self, f: F) -> Finally<Self, Fut, F> {
156            Finally {
157                item: Some(self),
158                fut: None,
159                f: Some(f),
160            }
161        }
162    }
163
164    impl<T: Sized> FinallyStreamExt for T {}
165
166    impl<ST: Stream, Fut: Future, F> Stream for Finally<ST, Fut, F>
167    where
168        F: FnOnce() -> Fut,
169    {
170        type Item = ST::Item;
171        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172            let mut this = self.project();
173
174            if let Some(item) = this.item.as_mut().as_pin_mut() {
175                match futures::ready!(item.poll_next(cx)) {
176                    Some(item) => return Poll::Ready(Some(item)),
177                    None => {
178                        let func = this.f.take().expect("function is valid");
179                        let fut = Some(func());
180                        this.fut.set(fut);
181                        this.item.set(None);
182                    }
183                };
184            }
185
186            if let Some(fut) = this.fut.as_mut().as_pin_mut() {
187                futures::ready!(fut.poll(cx));
188                this.fut.set(None);
189            }
190
191            Poll::Ready(None)
192        }
193    }
194}
195
196pub mod try_stream {
197    use futures::{Future, Stream, TryStream};
198    use pin_project::pin_project;
199    use std::pin::Pin;
200    use std::task::{Context, Poll};
201
202    #[pin_project]
203    pub struct TryFinally<ST, Fut, F>
204    where
205        ST: TryStream,
206    {
207        #[pin]
208        item: Option<ST>,
209        #[pin]
210        fut: Option<Fut>,
211        f: Option<F>,
212    }
213
214    pub trait FinallyTryStreamExt: Sized {
215        fn try_finally<Fut: Future, F: FnOnce() -> Fut>(self, f: F) -> TryFinally<Self, Fut, F>
216        where
217            Self: TryStream,
218        {
219            TryFinally {
220                item: Some(self),
221                fut: None,
222                f: Some(f),
223            }
224        }
225    }
226
227    impl<T: Sized> FinallyTryStreamExt for T {}
228
229    impl<ST: TryStream, Fut: Future, F> Stream for TryFinally<ST, Fut, F>
230    where
231        F: FnOnce() -> Fut,
232    {
233        type Item = Result<ST::Ok, ST::Error>;
234        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
235            let mut this = self.project();
236
237            if let Some(item) = this.item.as_mut().as_pin_mut() {
238                let result = futures::ready!(item.try_poll_next(cx));
239
240                match result {
241                    Some(Ok(val)) => return Poll::Ready(Some(Ok(val))),
242                    Some(Err(e)) => {
243                        this.item.set(None);
244                        this.f.take();
245                        return Poll::Ready(Some(Err(e)));
246                    }
247                    None => {
248                        let func = this.f.take().expect("function is valid");
249                        let fut = Some(func());
250                        this.fut.set(fut);
251                        this.item.set(None);
252                    }
253                }
254            }
255
256            if let Some(fut) = this.fut.as_mut().as_pin_mut() {
257                futures::ready!(fut.poll(cx));
258                this.fut.set(None);
259            }
260
261            Poll::Ready(None)
262        }
263    }
264}
265
266#[cfg(test)]
267mod test {
268    use crate::future::ThenFinallyFutureExt;
269    use crate::stream::FinallyStreamExt;
270    use crate::try_future::ThenFinallyTryFutureExt;
271    use crate::try_stream::FinallyTryStreamExt;
272    use futures::{StreamExt, TryStreamExt};
273    use std::convert::Infallible;
274
275    #[test]
276    fn future_final() {
277        futures::executor::block_on(async move {
278            let mut val = 0;
279
280            futures::future::ready(())
281                .then_finally(|| async {
282                    val = 1;
283                })
284                .await;
285
286            assert_eq!(val, 1);
287        });
288    }
289
290    #[test]
291    fn try_future_final() {
292        futures::executor::block_on(async move {
293            let mut val = 0;
294
295            futures::future::ok::<_, Infallible>(0)
296                .then_try_finally(|| async {
297                    val = 1;
298                })
299                .await
300                .expect("infallible");
301
302            assert_eq!(val, 1);
303
304            futures::future::err::<i8, std::io::Error>(std::io::ErrorKind::Other.into())
305                .then_try_finally(|| async { unreachable!() })
306                .await
307                .expect_err("should return an error");
308        });
309    }
310
311    #[test]
312    fn stream_final() {
313        futures::executor::block_on(async move {
314            let mut val = 0;
315
316            let st = futures::stream::once(async { 0 }).finally(|| async {
317                val = 1;
318            });
319
320            futures::pin_mut!(st);
321
322            while let Some(v) = st.next().await {
323                assert_eq!(v, 0);
324            }
325
326            assert_eq!(val, 1);
327        });
328    }
329
330    #[test]
331    fn try_stream_final() {
332        futures::executor::block_on(async move {
333            let mut val = 0;
334
335            let st =
336                futures::stream::once(async { Ok::<_, Infallible>(0) }).try_finally(|| async {
337                    val = 1;
338                });
339
340            futures::pin_mut!(st);
341
342            while let Ok(Some(v)) = st.try_next().await {
343                assert_eq!(v, 0);
344            }
345
346            let st = futures::stream::once(async {
347                Err::<i8, std::io::Error>(std::io::ErrorKind::Other.into())
348            })
349            .try_finally(|| async { unreachable!() });
350
351            futures::pin_mut!(st);
352
353            while let Ok(_) = st.try_next().await {
354                unreachable!()
355            }
356
357            assert_eq!(val, 1);
358        });
359    }
360}