1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
extern crate tokio_core;
use std::io;
use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread::{self, JoinHandle};
use tokio_core::reactor::{Core, Remote};
pub struct TokioPool {
remotes: Vec<Remote>,
running: Arc<AtomicBool>,
next_worker: AtomicUsize,
}
pub struct PoolJoin {
joiners: Vec<JoinHandle<()>>,
}
impl TokioPool {
pub fn new(worker_count: usize) -> io::Result<(TokioPool, PoolJoin)> {
assert!(worker_count != 0);
let (tx, rx) = mpsc::channel();
let running = Arc::new(AtomicBool::new(true));
let mut joiners = Vec::with_capacity(worker_count);
for _ in 0..worker_count {
let tx = tx.clone();
let running = running.clone();
let join = thread::spawn(move || {
let mut core = match Core::new() {
Ok(core) => core,
Err(err) => {
tx.send(Err(err)).expect("Channel was closed early");
return;
}
};
tx.send(Ok(core.remote())).expect("Channel was closed early");
while running.load(Ordering::Relaxed) {
core.turn(None);
}
});
joiners.push(join);
}
let remotes: io::Result<_> = rx.into_iter().take(worker_count).collect();
let remotes = remotes?;
let pool = TokioPool {
remotes: remotes,
running: running,
next_worker: AtomicUsize::new(0),
};
let join = PoolJoin { joiners: joiners };
Ok((pool, join))
}
pub fn next_worker(&self) -> &Remote {
let next = self.next_worker.fetch_add(1, Ordering::SeqCst);
let idx = next % self.remotes.len();
&self.remotes[idx]
}
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed);
}
}
impl PoolJoin {
pub fn join(self) {
for joiner in self.joiners {
joiner.join().expect("Worker thread panic");
}
}
}