1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use latch::{LockLatch, SpinLatch};
#[allow(unused_imports)]
use log::Event::*;
use job::{Code, CodeImpl, Job};
use std::sync::Arc;
use thread_pool::{self, Registry, WorkerThread};
pub fn initialize() {
let registry = thread_pool::get_registry();
registry.wait_until_primed();
}
pub fn dump_stats() {
dump_stats!();
}
pub fn join<A,B,RA,RB>(oper_a: A,
oper_b: B)
-> (RA, RB)
where A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send,
{
unsafe {
let worker_thread = WorkerThread::current();
if worker_thread.is_null() {
return join_inject(oper_a, oper_b);
}
log!(Join { worker: (*worker_thread).index() });
let mut result_b = None;
let mut code_b = CodeImpl::new(oper_b, &mut result_b);
let mut latch_b = SpinLatch::new();
let mut job_b = Job::new(&mut code_b, &mut latch_b);
(*worker_thread).push(&mut job_b);
let result_a = oper_a();
if (*worker_thread).pop() {
log!(PoppedJob { worker: (*worker_thread).index() });
code_b.execute();
} else {
log!(LostJob { worker: (*worker_thread).index() });
(*worker_thread).steal_until(&latch_b);
}
(result_a, result_b.unwrap())
}
}
#[inline(never)]
unsafe fn join_inject<A,B,RA,RB>(oper_a: A,
oper_b: B)
-> (RA, RB)
where A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send,
{
let mut result_a = None;
let mut code_a = CodeImpl::new(oper_a, &mut result_a);
let mut latch_a = LockLatch::new();
let mut job_a = Job::new(&mut code_a, &mut latch_a);
let mut result_b = None;
let mut code_b = CodeImpl::new(oper_b, &mut result_b);
let mut latch_b = LockLatch::new();
let mut job_b = Job::new(&mut code_b, &mut latch_b);
thread_pool::get_registry().inject(&[&mut job_a, &mut job_b]);
latch_a.wait();
latch_b.wait();
(result_a.unwrap(), result_b.unwrap())
}
pub struct ThreadPool {
registry: Arc<Registry>
}
impl ThreadPool {
pub fn new() -> ThreadPool {
ThreadPool {
registry: Registry::new()
}
}
pub fn install<OP,R>(&self, op: OP) -> R
where OP: FnOnce() -> R + Send
{
unsafe {
let mut result_a = None;
let mut code_a = CodeImpl::new(op, &mut result_a);
let mut latch_a = LockLatch::new();
let mut job_a = Job::new(&mut code_a, &mut latch_a);
self.registry.inject(&[&mut job_a]);
latch_a.wait();
result_a.unwrap()
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
self.registry.terminate();
}
}