future_utils/
finally.rs

1use futures::{Async, Future, Stream};
2
3/// Wraps a stream or future and runs a callback when the stream/future ends or when `Finally` is
4/// dropped.
5pub struct Finally<F, D>
6where
7    D: FnOnce(),
8{
9    inner: Option<OnDrop<F, D>>,
10}
11
12struct OnDrop<F, D>
13where
14    D: FnOnce(),
15{
16    future: F,
17    on_drop: Option<D>,
18}
19
20impl<F, D> Drop for OnDrop<F, D>
21where
22    D: FnOnce(),
23{
24    fn drop(&mut self) {
25        unwrap!(self.on_drop.take())()
26    }
27}
28
29impl<F, D> Finally<F, D>
30where
31    D: FnOnce(),
32{
33    pub fn new(future: F, on_drop: D) -> Finally<F, D> {
34        Finally {
35            inner: Some(OnDrop {
36                future: future,
37                on_drop: Some(on_drop),
38            }),
39        }
40    }
41}
42
43impl<F, D> Future for Finally<F, D>
44where
45    F: Future,
46    D: FnOnce(),
47{
48    type Item = F::Item;
49    type Error = F::Error;
50
51    fn poll(&mut self) -> Result<Async<F::Item>, F::Error> {
52        let mut on_drop = unwrap!(self.inner.take());
53        match on_drop.future.poll()? {
54            Async::Ready(x) => Ok(Async::Ready(x)),
55            Async::NotReady => {
56                self.inner = Some(on_drop);
57                Ok(Async::NotReady)
58            },
59        }
60    }
61}
62
63impl<S, D> Stream for Finally<S, D>
64where
65    S: Stream,
66    D: FnOnce(),
67{
68    type Item = S::Item;
69    type Error = S::Error;
70
71    fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
72        let mut on_drop = unwrap!(self.inner.take());
73        match on_drop.future.poll()? {
74            Async::Ready(x) => Ok(Async::Ready(x)),
75            Async::NotReady => {
76                self.inner = Some(on_drop);
77                Ok(Async::NotReady)
78            },
79        }
80    }
81}
82