1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::mem;
use {Future, Task, Poll};
use util::Collapsed;
pub struct Join<A, B> where A: Future, B: Future<Error=A::Error> {
a: MaybeDone<A>,
b: MaybeDone<B>,
}
/// Progress of one half of a `Join`.
enum MaybeDone<A>
    where A: Future
{
    /// The wrapped future has not produced its item yet.
    NotYet(Collapsed<A>),
    /// The future finished successfully; the item is parked here until taken.
    Done(A::Item),
    /// Nothing is left: the item was taken out, or the join discarded this
    /// half after an error.
    Gone,
}
/// Builds a `Join` over the futures `a` and `b`.
///
/// Each future is wrapped in `Collapsed::Start` and stored in the
/// `MaybeDone::NotYet` state; neither future is polled here.
pub fn new<A, B>(a: A, b: B) -> Join<A, B>
    where A: Future,
          B: Future<Error=A::Error>,
{
    Join {
        a: MaybeDone::NotYet(Collapsed::Start(a)),
        b: MaybeDone::NotYet(Collapsed::Start(b)),
    }
}
/// `Join` resolves to `(A::Item, B::Item)` once both halves have finished, or
/// to the shared error type as soon as one half fails.
impl<A, B> Future for Join<A, B>
    where A: Future,
          B: Future<Error=A::Error>,
{
    type Item = (A::Item, B::Item);
    type Error = A::Error;

    /// Polls `a` and then `b`.
    ///
    /// Returns `Poll::Ok` with both items once both halves are done,
    /// `Poll::NotReady` while at least one half is still pending, and
    /// `Poll::Err` as soon as either half fails. If both would fail, the
    /// error of `a` is the one reported.
    ///
    /// # Panics
    ///
    /// Panics with "cannot poll Join twice" when polled again after it has
    /// already returned `Poll::Ok` or `Poll::Err`, because both halves are
    /// `Gone` by then.
    fn poll(&mut self, task: &mut Task) -> Poll<Self::Item, Self::Error> {
        // Short-circuit on the first failure: once `a` has errored the join
        // cannot succeed, so `b` is not polled just to have its result (or
        // its own error) thrown away. A merely pending `a` still lets `b`
        // make progress.
        let polled = self.a.poll(task).and_then(|a_done| {
            self.b.poll(task).map(|b_done| a_done && b_done)
        });
        match polled {
            Ok(true) => Poll::Ok((self.a.take(), self.b.take())),
            Ok(false) => Poll::NotReady,
            Err(e) => {
                // Release both halves right away; any later poll will panic.
                self.a = MaybeDone::Gone;
                self.b = MaybeDone::Gone;
                Poll::Err(e)
            }
        }
    }

    /// Forwards `schedule` to every half that is still `NotYet`.
    ///
    /// When no half is pending and at least one is `Gone`, the task is
    /// notified immediately instead.
    ///
    /// # Panics
    ///
    /// Panics if both halves are `Done` at the same time. `poll` takes both
    /// items in the same call that completes the second half, so it never
    /// leaves the join in that state.
    fn schedule(&mut self, task: &mut Task) {
        match (&mut self.a, &mut self.b) {
            (&mut MaybeDone::NotYet(ref mut a),
             &mut MaybeDone::NotYet(ref mut b)) => {
                a.schedule(task);
                b.schedule(task);
            }
            (&mut MaybeDone::NotYet(ref mut a), _) => {
                a.schedule(task);
            }
            (_, &mut MaybeDone::NotYet(ref mut b)) => {
                b.schedule(task);
            }
            // Neither half is pending from here on.
            (&mut MaybeDone::Gone, _) |
            (_, &mut MaybeDone::Gone) => task.notify(),
            (&mut MaybeDone::Done(_), &mut MaybeDone::Done(_)) => panic!(),
        }
    }

    /// Collapses both halves in place and always returns `None`, so the join
    /// itself is never replaced by another future.
    fn tailcall(&mut self)
                -> Option<Box<Future<Item=Self::Item, Error=Self::Error>>> {
        self.a.collapse();
        self.b.collapse();
        None
    }
}
impl<A> MaybeDone<A>
    where A: Future
{
    /// Advances this half of the join.
    ///
    /// Returns `Ok(true)` once an item is stored in `Done`, `Ok(false)` while
    /// the wrapped future is not ready, and `Err` with the future's error if
    /// it failed. On failure the state is left as `NotYet`.
    ///
    /// # Panics
    ///
    /// Panics if the state is already `Gone`.
    fn poll(&mut self, task: &mut Task) -> Result<bool, A::Error> {
        let outcome = match *self {
            MaybeDone::Gone => panic!("cannot poll Join twice"),
            // Already finished on an earlier poll; nothing more to drive.
            MaybeDone::Done(_) => return Ok(true),
            MaybeDone::NotYet(ref mut inner) => inner.poll(task),
        };
        let item = match outcome {
            Poll::NotReady => return Ok(false),
            Poll::Err(err) => return Err(err),
            Poll::Ok(item) => item,
        };
        // Park the item until `take` is called.
        *self = MaybeDone::Done(item);
        Ok(true)
    }

    /// Moves the finished item out, leaving `Gone` behind.
    ///
    /// # Panics
    ///
    /// Panics if the state is not `Done`. The state has already been replaced
    /// with `Gone` by then.
    fn take(&mut self) -> A::Item {
        let previous = mem::replace(self, MaybeDone::Gone);
        if let MaybeDone::Done(item) = previous {
            item
        } else {
            panic!()
        }
    }

    /// Collapses the wrapped future if it is still `NotYet`; does nothing in
    /// the other states.
    fn collapse(&mut self) {
        if let MaybeDone::NotYet(ref mut inner) = *self {
            inner.collapse();
        }
    }
}