1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use latch::Latch;
#[allow(unused_imports)]
use log::Event::*;
use job::{Code, CodeImpl, Job};
use std::sync::Arc;
use thread_pool::{self, Registry, WorkerThread};
/// Blocks the caller until the global registry reports that it is primed.
///
/// Fetches the registry via `thread_pool::get_registry()` and calls
/// `wait_until_primed` on it.
///
/// NOTE(review): presumably "primed" means the worker threads have been
/// started and are ready to accept jobs -- confirm against `Registry`.
pub fn initialize() {
    thread_pool::get_registry().wait_until_primed();
}
/// Public entry point that expands the crate's `dump_stats!` macro.
///
/// NOTE(review): the macro is defined outside this file; presumably it emits
/// the statistics gathered by the `log` module -- confirm what it prints and
/// whether it is a no-op when logging is disabled.
pub fn dump_stats() {
dump_stats!();
}
/// Runs `oper_a` and `oper_b` and returns their results as a pair.
///
/// If the calling thread is not a pool worker (`WorkerThread::current()` is
/// null), both closures are handed to `join_inject`. Otherwise:
///
/// 1. `oper_b` is wrapped in a job and pushed to the current worker;
/// 2. `oper_a` is executed inline on this thread;
/// 3. the worker tries to pop the job back. If the pop succeeds, `oper_b` is
///    executed right here; if it fails, this thread calls `steal_until` on
///    the job's latch before reading the result.
///
/// NOTE(review): presumably a failed pop means another worker stole the job
/// and `steal_until` keeps this thread busy until that worker sets the latch
/// -- confirm against `WorkerThread`.
///
/// # Panics
///
/// Panics if the result slot for `oper_b` is still `None` when it is
/// unwrapped at the end.
pub fn join<A, R_A, B, R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
    where A: FnOnce() -> R_A + Send,
          B: FnOnce() -> R_B + Send
{
    // SAFETY (as far as this file shows): `current` is dereferenced only
    // after the null check below. The job built here is given addresses of
    // locals in this frame, and this function does not return normally until
    // the job has either been executed locally or `steal_until` has returned.
    // NOTE(review): if `oper_a` panics, this frame unwinds while the job may
    // still be queued -- confirm how that case is handled elsewhere.
    unsafe {
        let current = WorkerThread::current();
        if current.is_null() {
            return join_inject(oper_a, oper_b);
        }

        log!(Join { worker: (*current).index() });

        // Package `oper_b` together with the slot that will receive its
        // result and the latch that tracks its completion.
        let mut slot_b = None;
        let mut task_b = CodeImpl::new(oper_b, &mut slot_b);
        let mut done_b = Latch::new();
        let mut queued_b = Job::new(&mut task_b, &mut done_b);

        // The job must be pushed *before* `oper_a` runs; do not reorder.
        (*current).push(&mut queued_b);

        let value_a = oper_a();

        let reclaimed = (*current).pop();
        if reclaimed {
            log!(PoppedJob { worker: (*current).index() });
            task_b.execute();
        } else {
            log!(LostJob { worker: (*current).index() });
            (*current).steal_until(&done_b);
        }

        (value_a, slot_b.unwrap())
    }
}
/// Slow path of `join`: runs both closures by injecting them into the global
/// registry and blocking until each one's latch has been waited on.
///
/// Each closure is wrapped in its own job (result slot + code + latch). Both
/// jobs are passed to `inject` on the registry returned by
/// `thread_pool::get_registry()` in a single call, after which this function
/// waits on latch A, then latch B, and finally unwraps both result slots.
///
/// # Safety
///
/// NOTE(review): the precise caller obligations are not visible in this file.
/// The only caller here, `join`, invokes this when the current thread is not
/// a worker thread; presumably calling it from a worker would block that
/// worker in `Latch::wait` -- confirm before adding new call sites.
///
/// # Panics
///
/// Panics if either result slot is still `None` after its latch wait returns.
#[inline(never)]
unsafe fn join_inject<A, R_A, B, R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
    where A: FnOnce() -> R_A + Send,
          B: FnOnce() -> R_B + Send
{
    // Declaration order is kept as-is so that drop order is unchanged.
    let mut slot_a = None;
    let mut task_a = CodeImpl::new(oper_a, &mut slot_a);
    let mut done_a = Latch::new();
    let mut queued_a = Job::new(&mut task_a, &mut done_a);

    let mut slot_b = None;
    let mut task_b = CodeImpl::new(oper_b, &mut slot_b);
    let mut done_b = Latch::new();
    let mut queued_b = Job::new(&mut task_b, &mut done_b);

    let registry = thread_pool::get_registry();
    registry.inject(&[&mut queued_a, &mut queued_b]);

    // Both waits must complete before the locals above go out of scope.
    done_a.wait();
    done_b.wait();

    (slot_a.unwrap(), slot_b.unwrap())
}
/// A handle to a dedicated `Registry`, created by `ThreadPool::new`.
///
/// Work is submitted with `install`; dropping the handle calls `terminate`
/// on the registry.
pub struct ThreadPool {
    /// Shared, reference-counted registry that this pool submits jobs to.
    registry: Arc<Registry>,
}
impl ThreadPool {
    /// Creates a pool backed by a fresh registry from `Registry::new()`.
    pub fn new() -> ThreadPool {
        let registry = Registry::new();
        ThreadPool { registry: registry }
    }

    /// Runs `op` by injecting it into this pool's registry and returns its
    /// result.
    ///
    /// The closure is wrapped in a single job, handed to `inject`, and the
    /// calling thread then blocks in `Latch::wait` before unwrapping the
    /// result slot.
    ///
    /// # Panics
    ///
    /// Panics if the result slot is still `None` after the latch wait
    /// returns.
    pub fn install<OP, R>(&self, op: OP) -> R
        where OP: FnOnce() -> R + Send
    {
        // SAFETY (as far as this file shows): the job is given addresses of
        // locals in this frame, and the frame is kept alive by the blocking
        // `wait()` below until the latch is released.
        unsafe {
            let mut slot = None;
            let mut task = CodeImpl::new(op, &mut slot);
            let mut done = Latch::new();
            let mut queued = Job::new(&mut task, &mut done);

            self.registry.inject(&[&mut queued]);
            done.wait();

            slot.unwrap()
        }
    }
}
/// Dropping the pool handle asks its registry to terminate.
///
/// NOTE(review): whether `terminate` waits for in-flight jobs or for worker
/// threads to exit is not visible from this file -- confirm against
/// `Registry::terminate`.
impl Drop for ThreadPool {
    fn drop(&mut self) {
        let registry = &self.registry;
        registry.terminate();
    }
}