use crate::{
task::{Job, ThreadLocalJob, ThreadLocalTask},
threads::ThreadAllocationOutput,
util::ThreadLocalPointer,
Queue, Shared, ThreadLocalQueue,
};
use parking_lot::Mutex;
use priority_queue::PriorityQueue;
use slotmap::DenseSlotMap;
use std::sync::{atomic::Ordering, Arc};
pub(crate) fn body<TD, TDFunc>(
shared: Arc<Shared<TD>>,
thread_info: ThreadAllocationOutput,
queue_local_index: usize,
thread_local_sender: std::sync::mpsc::Sender<ThreadLocalPointer<TD>>,
thread_local_creator: Arc<TDFunc>,
) -> impl FnOnce()
where
TD: 'static,
TDFunc: Fn() -> TD + Send + Sync + 'static,
{
move || {
let thread_locals: Arc<TD> = Arc::new(thread_local_creator());
let thread_local_ptr = &thread_locals as *const _ as *mut Arc<TD>;
let thread_queue = Arc::new(ThreadLocalQueue {
waiting: Mutex::new(DenseSlotMap::new()),
inner: Mutex::new(PriorityQueue::new()),
});
thread_local_sender
.send(ThreadLocalPointer(thread_local_ptr))
.unwrap_or_else(|_| panic!("Could not send data"));
drop(thread_local_sender);
if let Some(affin) = thread_info.affinity {
crate::affinity::set_thread_affinity([affin]).unwrap();
}
let queue: &Queue<TD> = &shared.queue;
loop {
let mut global_guard = queue.inner.lock();
let local_guard = thread_queue.inner.lock();
let mut local_guard = if global_guard.is_empty() && local_guard.is_empty() {
drop(local_guard);
let active_threads = shared.active_threads.fetch_sub(1, Ordering::AcqRel) - 1;
if active_threads == 0 {
shared.idle_wait.set();
}
if shared.death_signal.load(Ordering::Acquire) {
break;
}
let condvar = &queue.condvars[queue_local_index];
condvar.running.store(false, Ordering::Relaxed);
condvar.inner.wait(&mut global_guard);
condvar.running.store(true, Ordering::Relaxed);
if shared.death_signal.load(Ordering::Acquire) {
break;
}
shared.active_threads.fetch_add(1, Ordering::AcqRel);
shared.idle_wait.reset();
thread_queue.inner.lock()
} else {
local_guard
};
drop(global_guard);
if let Some((job, _)) = local_guard.pop() {
drop(local_guard);
let job: ThreadLocalJob<TD> = job;
match job {
ThreadLocalJob::Future(key) => unsafe { key.poll() },
};
continue;
} else {
drop(local_guard);
}
let mut global_guard = queue.inner.lock();
if let Some((job, queue_priority)) = global_guard.pop() {
drop(global_guard);
let job: Job<TD> = job;
match job {
Job::Future(task) => {
debug_assert_eq!(task.priority, queue_priority);
task.poll();
}
Job::Local(func) => {
let fut = func(Arc::clone(&thread_locals));
let task = ThreadLocalTask::new(
Arc::clone(&shared),
Arc::clone(&thread_queue),
fut,
queue_priority,
queue_local_index,
);
unsafe { task.poll() };
}
}
continue;
} else {
drop(global_guard);
}
}
}
}