use alloc::boxed::Box;
use alloc::collections::VecDeque;
use core::cell::UnsafeCell;
use core::mem::ManuallyDrop;
use core::ptr::NonNull;
use std::thread::Result as ThreadResult;
use crate::signal::Signal;
use crate::thread_pool::Worker;
use crate::unwind;
/// Type-erased entry point shared by every job kind in this module.
///
/// Implementors receive an untyped pointer and are responsible for casting
/// it back to their own concrete type before running.
trait Job {
/// Runs the job behind `this` on behalf of `worker`.
///
/// # Safety
///
/// `this` must point to a live value of the implementing type; the
/// implementations in this file cast it back to `Self` without any check.
/// Implementations may consume the job, so callers should treat each
/// pointer as executable at most once.
unsafe fn execute(this: NonNull<()>, worker: &Worker);
}
/// A type-erased handle to a job: an untyped pointer paired with the
/// function that knows how to run whatever that pointer refers to.
///
/// The handle does not carry a lifetime, so nothing in the type system ties
/// it to the job it points at.
pub struct JobRef {
// Untyped pointer to the job's data; only `execute_fn` knows its real type.
job_pointer: NonNull<()>,
// Function invoked with `job_pointer` when the job is executed.
execute_fn: unsafe fn(NonNull<()>, &Worker),
}
impl JobRef {
#[inline]
pub unsafe fn new_raw(
job_pointer: NonNull<()>,
execute_fn: unsafe fn(NonNull<()>, &Worker),
) -> JobRef {
JobRef {
job_pointer,
execute_fn,
}
}
#[inline]
pub fn id(&self) -> impl Eq + use<> {
(self.job_pointer, self.execute_fn)
}
#[inline]
pub fn execute(self, worker: &Worker) {
unsafe { (self.execute_fn)(self.job_pointer, worker) }
}
}
// SAFETY / NOTE(review): `JobRef` holds a raw `NonNull<()>`, so `Send` is not
// derived automatically and is asserted by hand here. The two job types in
// this file only build handles for closures bounded by `Send`, which is
// presumably what justifies this impl; confirm that any other caller of
// `JobRef::new_raw` upholds the same requirement.
unsafe impl Send for JobRef {}
/// A double-ended queue of [`JobRef`]s that can be mutated through a shared
/// reference.
///
/// The deque sits inside an `UnsafeCell`, so every accessor takes `&self`
/// and no locking or borrow tracking is performed.
pub struct JobQueue {
// Interior-mutable storage; accessed via short-lived `&mut` reborrows.
job_refs: UnsafeCell<VecDeque<JobRef>>,
}
impl JobQueue {
pub fn new() -> JobQueue {
JobQueue {
job_refs: UnsafeCell::new(VecDeque::new()),
}
}
#[inline(always)]
pub fn push_back(&self, job_ref: JobRef) {
let job_refs = unsafe { &mut *self.job_refs.get() };
job_refs.push_back(job_ref);
}
#[inline(always)]
pub fn pop_back(&self) -> Option<JobRef> {
let job_refs = unsafe { &mut *self.job_refs.get() };
job_refs.pop_back()
}
#[inline(always)]
pub fn pop_front(&self) -> Option<JobRef> {
let job_refs = unsafe { &mut *self.job_refs.get() };
job_refs.pop_front()
}
}
/// A job that stores its closure inline and reports the closure's outcome
/// through a [`Signal`].
///
/// Handles produced from it point at the job itself rather than owning it.
/// NOTE(review): the name suggests the job is meant to live on the stack of
/// whoever waits for the result; confirm against callers.
pub struct StackJob<F, T> {
// `UnsafeCell` lets the closure be moved out through `&self` during
// execution; `ManuallyDrop` keeps it from being dropped a second time.
f: UnsafeCell<ManuallyDrop<F>>,
// Receives the closure's result, wrapped in a `std::thread::Result`.
signal: Signal<ThreadResult<T>>,
}
impl<F, T> StackJob<F, T>
where
F: FnOnce(&Worker) -> T + Send,
T: Send,
{
pub fn new(f: F) -> StackJob<F, T> {
StackJob {
f: UnsafeCell::new(ManuallyDrop::new(f)),
signal: Signal::new(),
}
}
pub unsafe fn as_job_ref(&self) -> JobRef {
let job_pointer = NonNull::from(self).cast();
unsafe { JobRef::new_raw(job_pointer, Self::execute) }
}
#[inline(always)]
pub unsafe fn unwrap(mut self) -> F {
unsafe { ManuallyDrop::take(self.f.get_mut()) }
}
#[inline(always)]
pub fn signal(&self) -> &Signal<ThreadResult<T>> {
&self.signal
}
}
/// Execution logic for [`StackJob`]: run the stored closure and publish its
/// outcome on the job's signal.
impl<F, T> Job for StackJob<F, T>
where
F: FnOnce(&Worker) -> T + Send,
T: Send,
{
/// Moves the closure out of the job, runs it with `worker`, and sends the
/// result through the job's signal.
///
/// # Safety
///
/// `this` must point to a live `StackJob<F, T>` whose closure has not
/// already been taken, so this may run at most once per job.
unsafe fn execute(this: NonNull<()>, worker: &Worker) {
// SAFETY: the caller guarantees `this` points to a live `Self`.
let this = unsafe { this.cast::<Self>().as_ref() };
// Guard that stays armed until the `forget` at the end of this function.
// NOTE(review): presumably aborts the process if dropped during an
// unwind; confirm in `unwind`.
let abort_guard = unwind::AbortOnDrop;
// SAFETY: relies on nothing else touching `f` while the job executes.
let f_ref = unsafe { &mut *this.f.get() };
// SAFETY: the closure is taken exactly once, per this function's contract.
let f = unsafe { ManuallyDrop::take(f_ref) };
// NOTE(review): `halt_unwinding` presumably captures a panic from `f` as
// the `Err` side of the `ThreadResult`; confirm in `unwind`.
let result = unwind::halt_unwinding(|| f(worker));
// `this` is not used again after the send.
// NOTE(review): the job's owner may presumably reclaim it as soon as the
// signal arrives, which is why the send is the last use; confirm.
unsafe { Signal::send(&this.signal, result) }
// Normal completion: disarm the guard without running its destructor.
core::mem::forget(abort_guard);
}
}
/// A job that owns its closure inside a `Box`.
///
/// Instances are created boxed, turned into a raw-pointer handle, and the
/// box is rebuilt and freed when that handle is executed.
pub struct HeapJob<F> {
// The closure to run; it produces no result.
f: F,
}
impl<F> HeapJob<F>
where
F: FnOnce(&Worker) + Send,
{
pub fn new(f: F) -> Box<Self> {
Box::new(HeapJob { f })
}
pub unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
let job_pointer = unsafe { NonNull::new_unchecked(Box::into_raw(self)).cast() };
unsafe { JobRef::new_raw(job_pointer, Self::execute) }
}
}
/// Execution logic for [`HeapJob`]: reclaim the box, run the closure, free
/// the allocation.
impl<F> Job for HeapJob<F>
where
    F: FnOnce(&Worker) + Send,
{
    /// Takes back ownership of the boxed job and runs its closure with
    /// `worker`.
    ///
    /// # Safety
    ///
    /// `this` must be a pointer obtained from `Box::into_raw` on a
    /// `Box<HeapJob<F>>` and must not have been executed before, because
    /// ownership of the allocation is reclaimed here.
    unsafe fn execute(this: NonNull<()>, worker: &Worker) {
        let raw: *mut Self = this.as_ptr().cast();
        // SAFETY: per this function's contract, `raw` is a unique pointer
        // produced by `Box::into_raw`.
        let job = unsafe { Box::from_raw(raw) };
        // The closure is moved out of the box for the call; the allocation
        // itself is freed when `job` goes out of scope.
        (job.f)(worker);
    }
}