// This is both the data shared between all handles to the same stream, and the
// logic shared between the sync and unsync versions.
use std::collections::HashSet;
use std::hash::Hash;
use indexmap::IndexMap;
use futures_core::{Stream, Poll, Async};
use futures_core::task::{Waker, Context};
/// State shared by every handle onto one underlying stream.
///
/// Each item or error produced by the inner stream is mapped to a `Key` and
/// buffered in `current` until the handle responsible for that key (or the
/// default handle, when no handle is registered for it) picks it up.
pub struct Shared<S, Key, ItemFn, ErrFn>
where
    S: Stream,
{
    /// The wrapped stream. The polling methods panic if it is `None` while no
    /// result is buffered.
    // NOTE(review): presumably public so the default handle's `into_inner` can
    // take the stream back out — confirm against the callers.
    pub inner: Option<S>,
    /// Computes the key of an item yielded by the inner stream.
    item_fn: ItemFn,
    /// Computes the key of an error yielded by the inner stream.
    err_fn: ErrFn,
    /// The keys of all currently registered key handles.
    active_keys: HashSet<Key>,
    /// The buffered result together with its key, waiting for the right handle.
    current: Option<(Result<S::Item, S::Error>, Key)>,
    /// Wakers of parked key handles, indexed by the key of the handle.
    wakers: IndexMap<Key, Waker>,
    /// Waker of the default handle, if it is parked.
    default_waker: Option<Waker>,
    /// Set once the inner stream has signalled its end.
    done: bool,
}
impl<S, Key, ItemFn, ErrFn> Shared<S, Key, ItemFn, ErrFn>
where
    S: Stream,
    Key: Eq + Hash,
{
    /// Wraps `inner`, starting with no registered keys, no buffered result
    /// and no parked handles.
    ///
    /// `item_fn` and `err_fn` are stored and later used to compute the key of
    /// each item and error the stream yields.
    pub fn new(inner: S, item_fn: ItemFn, err_fn: ErrFn) -> Shared<S, Key, ItemFn, ErrFn> {
        Self {
            inner: Some(inner),
            item_fn,
            err_fn,
            active_keys: HashSet::new(),
            current: None,
            wakers: IndexMap::new(),
            default_waker: None,
            done: false,
        }
    }

    /// Marks `key` as having an active handle.
    ///
    /// Returns `true` if the key was newly registered and `false` if it was
    /// already present.
    pub fn register_key(&mut self, key: Key) -> bool {
        self.active_keys.insert(key)
    }

    /// Removes the handle for `key`: forgets its parked waker and stops
    /// treating the key as active.
    ///
    /// If the buffered result belongs to `key`, the default handle is woken
    /// (when it is parked) so that it can pick the result up instead.
    pub fn deregister_key(&mut self, key: &Key) {
        self.wakers.remove(key);

        // Only inspect the buffered entry; it stays where it is.
        let buffered_is_ours = match self.current {
            Some((_, ref buffered_key)) => buffered_key == key,
            None => false,
        };
        if buffered_is_ours {
            if let Some(default) = self.default_waker.take() {
                default.wake();
            }
        }

        self.active_keys.remove(key);
    }
}
impl<S, Key, ItemFn, ErrFn> Shared<S, Key, ItemFn, ErrFn>
where S: Stream,
Key: Eq + Hash,
ItemFn: Fn(&S::Item) -> Key,
ErrFn: Fn(&S::Error) -> Key
{
pub fn poll_default(&mut self, cx: &mut Context) -> Poll<Option<S::Item>, S::Error> {
if self.done {
self.wake_next_handle();
Ok(Async::Ready(None))
} else {
match self.current.take() {
None => {
let mut inner = self.inner.take().unwrap();
// No item buffered, poll inner stream.
match inner.poll_next(cx) {
Ok(Async::Ready(Some(item))) => {
// Got new item, buffer it and call poll_default again.
let key = (self.item_fn)(&item);
self.current = Some((Ok(item), key));
self.inner = Some(inner);
return self.poll_default(cx);
}
Err(err) => {
// Got new error, buffer it and call poll_default again.
let key = (self.err_fn)(&err);
self.current = Some((Err(err), key));
self.inner = Some(inner);
return self.poll_default(cx);
}
Ok(Async::Ready(None)) => {
self.done = true;
self.wake_next_handle();
self.inner = Some(inner);
Ok(Async::Ready(None))
}
Ok(Async::Pending) => {
// No item available, park.
self.default_waker = Some(cx.waker());
self.inner = Some(inner);
Ok(Async::Pending)
}
}
}
Some((result, key)) => {
// There's a buffered result + key.
if self.active_keys.contains(&key) {
// There's a handle for this key, notify it if its blocking.
self.default_waker = Some(cx.waker());
self.wakers.remove(&key).map(|waker| waker.wake());
self.current = Some((result, key));
Ok(Async::Pending)
} else {
// No handle for this key, emit/yield it.
// // Also notify the next parked task.
self.wake_next_handle(); // TODO rename notify stuff to wake
match result {
Ok(item) => Ok(Async::Ready(Some(item))),
Err(err) => Err(err),
}
}
}
}
}
}
pub fn poll_handle(&mut self, key: Key, cx: &mut Context) -> Poll<Option<S::Item>, S::Error> {
if self.done {
self.wake_next_handle();
Ok(Async::Ready(None))
} else {
match self.current.take() {
None => {
let mut inner = self.inner.take().expect("Polled key handle after calling into_inner on the default handle");
// No item buffered, poll inner stream.
match inner.poll_next(cx) {
Ok(Async::Ready(Some(item))) => {
// Got new item, buffer it and call poll_handle again.
let item_key = (self.item_fn)(&item);
self.current = Some((Ok(item), item_key));
self.inner = Some(inner);
self.poll_handle(key, cx)
}
Err(err) => {
// Got new error, buffer it and call poll_handle again.
let err_key = (self.err_fn)(&err);
self.current = Some((Err(err), err_key));
self.inner = Some(inner);
return self.poll_handle(key, cx);
}
Ok(Async::Ready(None)) => {
// End of underlying stream.
self.done = true;
self.wake_default_or_next();
self.inner = Some(inner);
Ok(Async::Ready(None))
}
Ok(Async::Pending) => {
// No item available, park.
self.wakers.insert(key, cx.waker());
self.inner = Some(inner);
Ok(Async::Pending)
}
}
}
Some((result, buffered_key)) => {
// There's a buffered result + key.
if buffered_key == key {
// We should emit the item, also notify the next parked task.
self.wake_default_or_next();
match result {
Ok(item) => Ok(Async::Ready(Some(item))),
Err(err) => Err(err),
}
} else {
// Not our key, store item, park and let another task handle it.
self.wakers
.remove(&buffered_key)
.map_or_else(|| self.wake_default(), |waker| waker.wake());
self.wakers.insert(key, cx.waker());
self.current = Some((result, buffered_key));
Ok(Async::Pending)
}
}
}
}
}
// wake the next key handle
fn wake_next_handle(&mut self) {
self.wakers.pop().map(|(_, waker)| waker.wake());
}
// wake the default handle
fn wake_default(&mut self) {
self.default_waker.take().map(|default| default.wake());
}
// wake the default handle or the next key handle if default can't be woken
fn wake_default_or_next(&mut self) {
self.default_waker
.take()
.map_or_else(|| self.wake_next_handle(), |default| default.wake());
}
}