1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use futures::{future::poll_fn, Async, Future, Stream};
use std::{collections::HashMap, hash::Hash};
#[cfg(all(feature = "tokio", feature = "tokio-threadpool"))]
pub fn blocking<E, F, T>(func: F) -> impl Future<Item = T, Error = E>
where
F: FnOnce() -> Result<T, E>,
{
let mut func = Some(func);
poll_fn(move || {
tokio_threadpool::blocking(func.take().unwrap())
.map_err(|_| panic!("Blocking operations must be run inside a Tokio thread pool!"))
})
.and_then(|r| r)
}
#[derive(Debug)]
pub struct SelectSet<K: Clone + Eq + Hash, S: Stream> {
current: usize,
keys: Vec<K>,
streams: HashMap<K, S>,
}
impl<K: Clone + Eq + Hash, S: Stream> SelectSet<K, S> {
pub fn new() -> SelectSet<K, S> {
SelectSet::default()
}
pub fn add(&mut self, key: K, stream: S) -> Option<S> {
if let Some(prev) = self.streams.insert(key.clone(), stream) {
Some(prev)
} else {
self.keys.push(key);
None
}
}
pub fn remove(&mut self, key: &K) -> Option<S> {
self.streams.remove(key).map(|stream| {
let n = self.keys.iter().position(|k| k == key).unwrap();
let _ = self.keys.remove(n);
stream
})
}
}
impl<K: Clone + Eq + Hash, S: Stream> Default for SelectSet<K, S> {
fn default() -> SelectSet<K, S> {
SelectSet {
current: 0,
keys: Vec::new(),
streams: HashMap::new(),
}
}
}
impl<K: Clone + Eq + Hash, S: Stream> Stream for SelectSet<K, S> {
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
if self.keys.is_empty() {
return Ok(Async::NotReady);
}
self.current = (self.current + 1) % self.keys.len();
let r = self
.streams
.get_mut(&self.keys[self.current])
.unwrap()
.poll();
if let Ok(Async::Ready(None)) = r {
let key = self.keys[self.current].clone();
let _ = self.remove(&key);
}
r
}
}