use std::{
cell::{Cell, UnsafeCell},
collections::VecDeque,
mem::ManuallyDrop,
panic::{self, AssertUnwindSafe},
ptr::NonNull,
sync::atomic::{AtomicU8, Ordering},
thread::{self, Thread},
};
use crate::Scope;
/// Lifecycle states of a [`Future`], stored as a raw byte in its `state` field.
///
/// `Pending` has to remain discriminant `0`: a defaulted `AtomicU8` starts at
/// zero, which is what makes `Future::default()` begin in the pending state.
enum Poll {
    /// No result has been published and nobody holds the lock.
    Pending = 0,
    /// A result has been published by `complete`.
    Ready = 1,
    /// One side is currently accessing the `UnsafeCell` fields.
    Locked = 2,
}

/// A one-shot slot through which one thread hands a `thread::Result<T>` to
/// another.
///
/// `complete` publishes the value; `wait` parks the calling thread until the
/// value is available. The `state` byte doubles as a small spin lock
/// (`Poll::Locked`) that guards the two `UnsafeCell` fields.
#[derive(Debug, Default)]
pub struct Future<T = ()> {
    /// Current `Poll` state encoded as `u8`.
    state: AtomicU8,
    /// Handle of the thread parked in `wait`, if one has registered itself.
    waiting_thread: UnsafeCell<Option<Thread>>,
    /// The published result; `None` until `complete` runs and again after
    /// `wait` has taken it.
    val: UnsafeCell<Option<Box<thread::Result<T>>>>,
}

impl<T> Future<T> {
    const PENDING: u8 = Poll::Pending as u8;
    const READY: u8 = Poll::Ready as u8;
    const LOCKED: u8 = Poll::Locked as u8;

    /// Attempts the `Pending -> Locked` transition.
    ///
    /// Returns `Ok` when this caller now holds the lock, otherwise `Err` with
    /// the state that was observed instead of `Pending`.
    fn try_lock(&self) -> Result<u8, u8> {
        self.state.compare_exchange(
            Self::PENDING,
            Self::LOCKED,
            Ordering::AcqRel,
            Ordering::Acquire,
        )
    }

    /// Returns `true` once a result has been published by [`Future::complete`].
    ///
    /// Never blocks; it is a single acquire load of the state byte.
    pub fn poll(&self) -> bool {
        let observed = self.state.load(Ordering::Acquire);
        observed == Self::READY
    }

    /// Blocks the current thread until a result has been published, then
    /// takes it out of the slot.
    ///
    /// Returns `None` if the state is `Ready` but the value was already taken
    /// (for example by an earlier `wait`).
    pub fn wait(&self) -> Option<thread::Result<T>> {
        loop {
            match self.try_lock() {
                Ok(_) => {
                    // SAFETY: the state is `Locked` and we are the ones who
                    // locked it; `complete` only touches this cell after
                    // winning the same compare-exchange.
                    unsafe { *self.waiting_thread.get() = Some(thread::current()) };
                    // Unlock so that `complete` can get in, then sleep. A
                    // spurious wake-up simply re-registers on the next turn.
                    self.state.store(Self::PENDING, Ordering::Release);
                    thread::park();
                }
                Err(observed) if observed == Self::READY => {
                    // SAFETY: `complete` wrote `val` before its release store
                    // of `Ready`, which the acquire failure ordering above
                    // observed.
                    // NOTE(review): assumes a single thread calls `wait` at a
                    // time - confirm against callers.
                    let taken = unsafe { (*self.val.get()).take() };
                    return taken.map(|boxed| *boxed);
                }
                // Someone else holds the lock; give up the time slice and
                // retry.
                Err(_) => thread::yield_now(),
            }
        }
    }

    /// Publishes `val` and wakes the thread registered by [`Future::wait`],
    /// if there is one.
    ///
    /// Spins (yielding) until the state can be moved from `Pending` to
    /// `Locked`; because of that it never returns if the state is already
    /// `Ready`.
    pub fn complete(&self, val: thread::Result<T>) {
        let boxed = Box::new(val);

        while self.try_lock().is_err() {
            thread::yield_now();
        }

        // SAFETY: we hold the `Locked` state, so no other lock holder is
        // touching `val` or `waiting_thread`.
        unsafe {
            *self.val.get() = Some(boxed);
        }
        // SAFETY: as above, still under the lock.
        let waiter = unsafe { (*self.waiting_thread.get()).take() };
        if let Some(waiter) = waiter {
            waiter.unpark();
        }

        // The store of `Ready` is this method's last access to `self`: once
        // it is visible `wait` may return.
        self.state.store(Self::READY, Ordering::Release);
    }
}
/// Storage for a value of type `F` (in this file, a job's closure) that can
/// be moved out through a shared reference.
///
/// The value is wrapped in `ManuallyDrop`, so it is not dropped when the
/// `JobStack` itself is dropped; it only leaves the slot via
/// [`JobStack::take_once`].
pub struct JobStack<F = ()> {
    /// The stored value; `UnsafeCell` allows moving it out through `&self`.
    f: UnsafeCell<ManuallyDrop<F>>,
}

impl<F> JobStack<F> {
    /// Wraps `f` so that it can later be extracted with
    /// [`JobStack::take_once`].
    pub fn new(f: F) -> Self {
        let slot = UnsafeCell::new(ManuallyDrop::new(f));
        Self { f: slot }
    }

    /// Moves the stored value out of the slot.
    ///
    /// # Safety
    ///
    /// Must be called at most once per `JobStack`: the slot is not marked as
    /// empty, so a second call would duplicate the value. No other access to
    /// the slot may happen concurrently.
    pub unsafe fn take_once(&self) -> F {
        let slot: *mut ManuallyDrop<F> = self.f.get();
        // SAFETY: the caller guarantees this is the only extraction and that
        // nothing else is accessing the slot right now.
        unsafe { ManuallyDrop::take(&mut *slot) }
    }
}
/// Type-erased handle to a closure stored in a [`JobStack`], together with
/// the optional [`Future`] its result is delivered through.
///
/// `fut` is `None` when the job is created and is filled in by
/// `JobQueue::pop_front`.
#[derive(Clone, Debug)]
pub struct Job<T = ()> {
    /// Pointer to the `JobStack<F>` holding the closure, with `F` erased.
    stack: NonNull<JobStack>,
    /// Monomorphised trampoline that restores the erased types and runs the
    /// closure.
    harness: unsafe fn(&mut Scope<'_>, NonNull<JobStack>, NonNull<Future>),
    /// Heap-allocated `Future` for the result, once one has been attached.
    fut: Cell<Option<NonNull<Future<T>>>>,
}

impl<T> Job<T> {
    /// Creates a job referring to the closure stored in `stack`.
    ///
    /// Only a raw pointer to `stack` is kept, so the borrow does not extend
    /// to the returned `Job`; the `unsafe` methods below rely on the caller
    /// keeping `stack` alive.
    pub fn new<F>(stack: &JobStack<F>) -> Self
    where
        F: FnOnce(&mut Scope<'_>) -> T + Send,
        T: Send,
    {
        /// Restores the concrete `F`/`T` types, runs the closure while
        /// catching any panic, and publishes the outcome on the future.
        unsafe fn harness<F, T>(
            scope: &mut Scope<'_>,
            stack: NonNull<JobStack>,
            fut: NonNull<Future>,
        ) where
            F: FnOnce(&mut Scope<'_>) -> T + Send,
            T: Send,
        {
            // SAFETY: this instantiation is only ever stored next to a
            // pointer that was created from a `&JobStack<F>` (see `new`); the
            // caller of `execute` guarantees both pointers are still valid.
            let fut = unsafe { fut.cast::<Future<T>>().as_ref() };
            let stack = unsafe { stack.cast::<JobStack<F>>().as_ref() };
            // SAFETY: relies on the job being executed at most once.
            let f = unsafe { stack.take_once() };

            let outcome = panic::catch_unwind(AssertUnwindSafe(|| f(scope)));
            fut.complete(outcome);
        }

        let erased: NonNull<JobStack> = NonNull::from(stack).cast();
        Self {
            stack: erased,
            harness: harness::<F, T>,
            fut: Cell::new(None),
        }
    }

    /// Returns `true` while no [`Future`] has been attached to this job.
    pub fn is_waiting(&self) -> bool {
        matches!(self.fut.get(), None)
    }

    /// Returns `true` when both jobs point at the same [`JobStack`].
    pub fn eq(&self, other: &Job) -> bool {
        self.stack.as_ptr() == other.stack.as_ptr()
    }

    /// Reports whether the attached [`Future`] already holds a result;
    /// `false` when no future is attached.
    ///
    /// # Safety
    ///
    /// If a future is attached, it must not have been freed yet (by
    /// [`Job::wait`] or [`Job::drop`] on this job or on a clone sharing the
    /// same pointer).
    pub unsafe fn poll(&self) -> bool {
        match self.fut.get() {
            // SAFETY: the caller guarantees the pointer is still live.
            Some(fut) => unsafe { fut.as_ref() }.poll(),
            None => false,
        }
    }

    /// Blocks until the attached [`Future`] is completed, frees it, and
    /// returns its result. Returns `None` when no future is attached.
    ///
    /// # Safety
    ///
    /// The attached future must be a live `Box` allocation (as produced by
    /// `JobQueue::pop_front`). The stored pointer is not cleared, so after
    /// this call neither `poll`, `wait` nor `drop` may be used on this job or
    /// on any clone sharing the pointer.
    pub unsafe fn wait(&self) -> Option<thread::Result<T>> {
        let fut = self.fut.get()?;
        // SAFETY: the caller guarantees the pointer is still live.
        let result = unsafe { fut.as_ref().wait() };
        // SAFETY: the allocation came from `Box`; it is released only after
        // `wait` has returned, i.e. after the completing side's final store.
        unsafe {
            drop(Box::from_raw(fut.as_ptr()));
        }
        result
    }

    /// Frees the attached [`Future`], if any, without waiting on it.
    ///
    /// # Safety
    ///
    /// Same allocation requirements as [`Job::wait`]. In addition nothing may
    /// still be about to complete or wait on the future. The stored pointer
    /// is not cleared, so this must not be followed by `poll`, `wait` or
    /// another `drop`.
    pub unsafe fn drop(&self) {
        if let Some(raw) = self.fut.get().map(NonNull::as_ptr) {
            // SAFETY: upheld by the caller, see above.
            unsafe {
                drop(Box::from_raw(raw));
            }
        }
    }
}
impl Job {
    /// Runs the job's closure with `scope` through the stored harness, which
    /// publishes the outcome on the attached [`Future`].
    ///
    /// # Panics
    ///
    /// Panics if no future is attached to this job.
    ///
    /// # Safety
    ///
    /// The `JobStack` and `Future` this job points to must still be alive,
    /// and the job must not have been executed before (the harness moves the
    /// closure out of the stack).
    pub unsafe fn execute(&self, scope: &mut Scope<'_>) {
        let fut = self.fut.get().unwrap();
        let run = self.harness;
        // SAFETY: `run` is the harness recorded for exactly this `stack`
        // pointer; liveness and single execution are upheld by the caller.
        unsafe {
            run(scope, self.stack, fut);
        }
    }
}
// `Job` stores raw `NonNull` pointers and is therefore not `Send`
// automatically; this impl opts in by hand, for the default `Job<()>` only.
// NOTE(review): soundness presumably relies on callers keeping the
// pointed-to `JobStack` and `Future` alive while another thread holds the
// job - confirm against the call sites.
unsafe impl Send for Job {}
#[derive(Debug, Default)]
pub struct JobQueue(VecDeque<NonNull<Job>>);
impl JobQueue {
pub fn len(&self) -> usize {
self.0.len()
}
pub unsafe fn push_back<T>(&mut self, job: &Job<T>) {
self.0.push_back(NonNull::from(job).cast());
}
pub fn pop_back(&mut self) {
self.0.pop_back();
}
pub fn pop_front(&mut self) -> Option<Job> {
let job = unsafe { self.0.pop_front()?.as_ref() };
job.fut
.set(Some(Box::leak(Box::new(Future::default())).into()));
Some(job.clone())
}
}