1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::sync::{Arc, Mutex};
use pulse::Signal;
use rayon::ThreadPool;
use dispatch::ThreadLocal;
use dispatch::stage::Stage;
use res::Resources;
const ERR_NO_DISPATCH: &str = "wait() called before dispatch or called twice";
pub struct AsyncDispatcher<'a> {
res: Arc<Resources>,
signal: Option<Signal>,
stages: Arc<Mutex<Vec<Stage<'static>>>>,
thread_local: ThreadLocal<'a>,
thread_pool: Arc<ThreadPool>,
}
pub fn new_async<'a>(res: Resources,
stages: Vec<Stage<'static>>,
thread_local: ThreadLocal<'a>,
thread_pool: Arc<ThreadPool>)
-> AsyncDispatcher<'a> {
AsyncDispatcher {
res: Arc::new(res),
signal: None,
stages: Arc::new(Mutex::new(stages)),
thread_local: thread_local,
thread_pool: thread_pool,
}
}
impl<'a> AsyncDispatcher<'a> {
pub fn dispatch(&mut self) {
let (signal, pulse) = Signal::new();
self.signal = Some(signal);
let stages = self.stages.clone();
let res = self.res.clone();
self.thread_pool
.spawn_async(move || {
{
let stages = stages;
let mut stages = stages.lock().expect("Mutex poisoned");
let res = &*res;
for stage in &mut *stages {
stage.execute(res);
}
}
pulse.pulse();
})
}
pub fn wait(&mut self) {
self.wait_without_tl();
let res = &*self.res;
for sys in &mut self.thread_local {
sys.run_now(res);
}
}
pub fn wait_without_tl(&mut self) {
self.signal
.take()
.expect(ERR_NO_DISPATCH)
.wait()
.expect("The worker thread may have panicked");
}
pub fn dispatch_thread_local(&mut self) {
if self.signal.is_some() {
self.wait_without_tl();
}
let res = &*self.res;
for sys in &mut self.thread_local {
sys.run_now(res);
}
}
pub fn mut_res(&mut self) -> &mut Resources {
if self.signal.is_some() {
self.wait();
}
Arc::get_mut(&mut self.res).expect(ERR_NO_DISPATCH)
}
}