1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use futures::{Future, Never, Async, Poll, FutureExt};
use std::fmt::Debug;
use std::panic::AssertUnwindSafe;
use futures::task::Context;
use futures::channel::oneshot::{channel, Sender, Receiver};
pub struct DispatchHandle<T, E> {
pub inner: Receiver<Result<T, E>>,
}
impl<T, E> Future for DispatchHandle<T, E>
where T: Send + 'static,
E: Send + Debug + 'static
{
type Item = T;
type Error = E;
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
match self.inner.poll(cx).expect("") {
Async::Ready(Ok(e)) => Ok(e.into()),
Async::Ready(Err(e)) => Err(e),
Async::Pending => Ok(Async::Pending),
}
}
}
pub struct MySender<F, T> {
pub fut: F,
pub tx: Option<Sender<T>>,
}
impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> {
type Item = ();
type Error = Never;
fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
let res = match self.fut.poll(cx) {
Ok(Async::Ready(e)) => Ok(e),
Ok(Async::Pending) => return Ok(Async::Pending),
Err(e) => Err(e),
};
drop(self.tx.take().unwrap().send(res));
Ok(Async::Ready(()))
}
}
#[derive(Debug)]
pub struct Spawn<F>(Option<F>);
pub fn spawn<F>(f: F) -> Spawn<F>
where F: Future<Item = (), Error = Never> + 'static + Send
{
Spawn(Some(f))
}
impl<F: Future<Item = (), Error = Never> + Send + 'static> Future for Spawn<F> {
type Item = ();
type Error = Never;
fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
let (tx, _rx) = channel();
let sender = MySender {
fut: AssertUnwindSafe(self.0.take().unwrap()).catch_unwind(),
tx: Some(tx),
};
cx.spawn(sender);
Ok(Async::Ready(()))
}
}