use crate::{
    task::{Job, ThreadLocalJob, ThreadLocalTask},
    threads::ThreadAllocationOutput,
    util::ThreadLocalPointer,
    Queue, Shared, ThreadLocalQueue,
};
use parking_lot::Mutex;
use priority_queue::PriorityQueue;
use slotmap::DenseSlotMap;
use std::sync::{atomic::Ordering, Arc};
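
/// Builds the closure run by each worker thread: it creates the thread-local
/// data, sends its address back to the spawning thread, pins the thread to a
/// core if one was requested, then loops pulling jobs from the local and
/// global queues until shutdown is signaled.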
pub(crate) fn body<TD, TDFunc>(
    shared: Arc<Shared<TD>>,
    thread_info: ThreadAllocationOutput,
    queue_local_index: usize,
    thread_local_sender: std::sync::mpsc::Sender<ThreadLocalPointer<TD>>,
    thread_local_creator: Arc<TDFunc>,
) -> impl FnOnce()
where
    TD: 'static,
    TDFunc: Fn() -> TD + Send + Sync + 'static,
{
    move || {
        let thread_locals: Arc<TD> = Arc::new(thread_local_creator());
        // Raw pointer to this thread's `Arc<TD>`, sent back so the pool can
        // address this thread's data.
        let thread_local_ptr = &thread_locals as *const _ as *mut Arc<TD>;
        // This thread's private queue of local jobs.
        let thread_queue = Arc::new(ThreadLocalQueue {
            waiting: Mutex::new(DenseSlotMap::new()),
            inner: Mutex::new(PriorityQueue::new()),
        });

        // Send the thread-local address back to the spawning thread
        thread_local_sender
            .send(ThreadLocalPointer(thread_local_ptr))
            .unwrap_or_else(|_| panic!("Could not send thread-local pointer"));
        // Drop the sender so the receiver stops waiting
        drop(thread_local_sender);
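
        // Pin this thread to the requested core, if any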
        if let Some(affin) = thread_info.affinity {
            crate::affinity::set_thread_affinity([affin]).unwrap();
        }

        let queue: &Queue<TD> = &shared.queue;
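
        // Scheduling loop: take local work first, then global work, and sleep
        // on this thread's condvar when both queues are empty.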
        loop {
            // Lock order is always global -> local
            let mut global_guard = queue.inner.lock();
            let local_guard = thread_queue.inner.lock();
            let mut local_guard = if global_guard.is_empty() && local_guard.is_empty() {
                // Release the local guard while we wait
                drop(local_guard);
                // Signal threads waiting for idle
                let active_threads = shared.active_threads.fetch_sub(1, Ordering::AcqRel) - 1;
                if active_threads == 0 {
                    // All threads are starved; wake up any idle waiters
                    shared.idle_wait.set();
                }
                // Check if death was requested before sleeping
                if shared.death_signal.load(Ordering::Acquire) {
                    break;
                }
                // Wait for a condvar signal
                let condvar = &queue.condvars[queue_local_index];
                // We hold the global guard when we do this, and all access to
                // `running` happens behind it.
                condvar.running.store(false, Ordering::Relaxed);
                condvar.inner.wait(&mut global_guard);
                condvar.running.store(true, Ordering::Relaxed);
                // Check if death was requested while we slept
                if shared.death_signal.load(Ordering::Acquire) {
                    break;
                }
                // This thread is now active again
                shared.active_threads.fetch_add(1, Ordering::AcqRel);
                // Threads waiting for idle should wait again
                shared.idle_wait.reset();
                // Re-lock the local queue; this is okay because we already
                // hold the global lock.
                thread_queue.inner.lock()
            } else {
                local_guard
            };
            // Release the global guard for now
            drop(global_guard);

            // First try the local queue
            if let Some((job, _)) = local_guard.pop() {
                // Release the guard before running the job
                drop(local_guard);
                let job: ThreadLocalJob<TD> = job;
                match job {
                    ThreadLocalJob::Future(key) => unsafe { key.poll() },
                };
                continue;
            } else {
                drop(local_guard);
            }

            // Then try the global queue
            let mut global_guard = queue.inner.lock();
            if let Some((job, queue_priority)) = global_guard.pop() {
                drop(global_guard);
                let job: Job<TD> = job;
                match job {
                    Job::Future(task) => {
                        debug_assert_eq!(task.priority, queue_priority);
                        task.poll();
                    }
                    Job::Local(func) => {
                        // SAFETY: This reference will only be read in this thread,
                        // and this thread's stack stores all data for the thread.
                        let fut = func(Arc::clone(&thread_locals));
                        // Wrap the future in a task bound to this thread's
                        // local queue.
                        let task = ThreadLocalTask::new(
                            Arc::clone(&shared),
                            Arc::clone(&thread_queue),
                            fut,
                            queue_priority,
                            queue_local_index,
                        );
                        unsafe { task.poll() };
                    }
                }
                continue;
            } else {
                drop(global_guard);
            }
        }
    }
}