1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use futures::prelude::*;
use futures::task::Task;
use futures::task;
use std::sync::{Arc, Mutex};
use through::*;
/// Receiving half of a push-future pair.
///
/// Polling it yields the outcome that the paired [`PushFutureSend`] stored in
/// the shared state, or `NotReady` while no outcome has been stored yet.
pub struct PushFutureRecv<I, E> {
    /// State shared with the sending half.
    interior: Arc<Mutex<PushFutureInterior<I, E>>>,
}
pub struct PushFutureSend<I, E, F: Future<Item=I, Error=E>> {
source: F,
interior: Arc<Mutex<PushFutureInterior<I, E>>>,
}
/// State shared between the two halves of a push-future pair.
enum PushFutureInterior<I, E> {
    /// The source has finished and its outcome is waiting to be picked up.
    Available {
        /// Outcome of the source future (item or error).
        elem: Result<I, E>,
    },
    /// The source has not finished yet.
    Unavailable {
        /// Tasks recorded by the receiver's `poll`; each is notified once the
        /// outcome is stored.
        listeners: Vec<Task>,
    },
    /// The outcome has already been handed out to the receiver.
    Taken,
}
/// Resolves with the outcome stored by the paired `PushFutureSend`.
impl<I, E> Future for PushFutureRecv<I, E> {
    type Item = I;
    type Error = E;

    /// Hands out the stored outcome if there is one; otherwise records the
    /// current task as a listener and returns `NotReady`.
    ///
    /// Every `NotReady` poll appends one more task handle to the listener
    /// list; all of them are notified when the outcome is stored.
    ///
    /// # Panics
    ///
    /// * If polled again after the outcome has already been returned.
    /// * If the shared mutex is poisoned.
    /// * `task::current()` is expected to panic when called outside of a task
    ///   context (see the `futures` documentation).
    fn poll(&mut self) -> Poll<I, E> {
        let mut interior = self.interior.lock().unwrap();

        // Inspect and update the state in place, so the shared value stays
        // valid even if `task::current()` or the `Taken` arm panics.
        match *interior {
            PushFutureInterior::Unavailable { ref mut listeners } => {
                listeners.push(task::current());
                return Ok(Async::NotReady);
            }
            PushFutureInterior::Taken => panic!("take from a PushFutureRecv twice"),
            PushFutureInterior::Available { .. } => {}
        }

        // The outcome is present: move it out and leave `Taken` behind.
        match std::mem::replace(&mut *interior, PushFutureInterior::Taken) {
            PushFutureInterior::Available { elem } => elem.map(Async::Ready),
            PushFutureInterior::Unavailable { .. } | PushFutureInterior::Taken => {
                unreachable!("state was checked to be Available while holding the lock")
            }
        }
    }
}
impl<I, E, F: Future<Item=I, Error=E>> Future for PushFutureSend<I, E, F> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
let found: Option<Result<I, E>> = match self.source.poll() {
Ok(Async::Ready(elem)) => Some(Ok(elem)),
Err(error) => Some(Err(error)),
Ok(Async::NotReady) => None
};
match found {
Some(result) => {
let mut interior = self.interior.lock().unwrap();
through(&mut *interior, |interior| match interior {
PushFutureInterior::Unavailable {
listeners
} => {
for listener in listeners {
listener.notify();
}
PushFutureInterior::Available {
elem: result
}
},
_ => panic!("PushFutureSend: PushFutureInterior in invalid state")
});
Ok(Async::Ready(()))
},
None => Ok(Async::NotReady)
}
}
}
pub fn push_future<I, E, F: Future<Item=I, Error=E>>(source: F) -> (PushFutureSend<I, E, F>, PushFutureRecv<I, E>) {
let interior = PushFutureInterior::Unavailable {
listeners: Vec::new()
};
let interior = Arc::new(Mutex::new(interior));
(PushFutureSend {
source,
interior: interior.clone(),
}, PushFutureRecv {
interior
})
}