use crate::SafeUnwrap;
use once_cell::sync::OnceCell;
use std::collections::VecDeque;
use std::mem;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::Weak;
use std::task::Context;
use std::task::RawWaker;
use std::task::RawWakerVTable;
use std::task::Waker;
use std::thread;
/// Strong, cloneable handle to an execlet.
///
/// This is a newtype over `Arc<RustExeclet>`; cloning the handle clones the
/// `Arc`, so all clones refer to the same underlying state.
#[derive(Clone)]
#[doc(hidden)]
pub struct Execlet(Arc<RustExeclet>);
/// Non-owning handle to an execlet (a newtype over `Weak<RustExeclet>`),
/// produced by `Execlet::downgrade`.
struct WeakExeclet(Weak<RustExeclet>);
/// The shared execlet state: an `ExecletImpl` guarded by a mutex.
///
/// Raw pointers to this type are what the `cxxasync_execlet_*` `extern "C"`
/// functions in this file hand out and accept.
#[doc(hidden)]
pub struct RustExeclet(Mutex<ExecletImpl>);
impl Execlet {
    /// Creates an execlet with an empty run queue, no stored waker, and the
    /// `running` flag cleared.
    fn new() -> Execlet {
        let state = ExecletImpl {
            runqueue: VecDeque::new(),
            waker: None,
            running: false,
        };
        Execlet(Arc::new(RustExeclet(Mutex::new(state))))
    }

    /// Returns a weak handle to the same underlying state.
    fn downgrade(&self) -> WeakExeclet {
        let weak = Arc::downgrade(&self.0);
        WeakExeclet(weak)
    }

    /// Rebuilds a handle from a raw pointer, taking over the strong reference
    /// that the pointer represents.
    ///
    /// # Safety
    ///
    /// `ptr` is passed straight to `Arc::from_raw`, so it must satisfy that
    /// function's requirements (it must originate from `Arc::into_raw` on an
    /// `Arc<RustExeclet>`), and the caller gives up the reference it stood for.
    unsafe fn from_raw(ptr: *const RustExeclet) -> Execlet {
        Execlet(Arc::from_raw(ptr))
    }

    /// Builds a handle from a raw pointer without consuming the caller's
    /// reference: the strong count is bumped by one so that the returned handle
    /// owns a reference of its own.
    ///
    /// # Safety
    ///
    /// Same pointer requirements as [`Execlet::from_raw`].
    pub(crate) unsafe fn from_raw_ref(ptr: *const RustExeclet) -> Execlet {
        let handle = Execlet::from_raw(ptr);
        // Leak one extra strong count; it stands in for the reference that the
        // owner of `ptr` still holds.
        let extra_ref = handle.clone();
        mem::forget(extra_ref);
        handle
    }

    /// Runs queued tasks on the calling thread until the queue is observed empty.
    ///
    /// While draining, `running` is set and a clone of `cx`'s waker is stored so
    /// that a later `submit` can wake whoever called `run`. The stored waker is
    /// left in place when this returns.
    pub(crate) fn run(&self, cx: &mut Context) {
        let state = &self.0 .0;
        let mut guard = state.lock().safe_unwrap();
        safe_debug_assert!(!guard.running);
        guard.running = true;
        guard.waker = Some(cx.waker().clone());

        loop {
            let task = match guard.runqueue.pop_front() {
                Some(task) => task,
                None => break,
            };
            // Do not hold the mutex while the task executes; the task may need to
            // lock this execlet itself (for instance through `submit`).
            drop(guard);
            unsafe {
                task.run();
            }
            guard = state.lock().safe_unwrap();
        }

        // Still under the lock here, so a concurrent `submit` either had its task
        // popped above or will observe `running == false`.
        guard.running = false;
    }

    /// Appends `task` to the run queue and, when no `run` is in progress and a
    /// waker has been stored, wakes that waker.
    fn submit(&self, task: ExecletTask) {
        let mut state = self.0 .0.lock().safe_unwrap();
        state.runqueue.push_back(task);
        if state.running {
            // The active `run` loop will pick the task up.
            return;
        }
        let waker = match state.waker.clone() {
            Some(waker) => waker,
            None => return,
        };
        // Release the mutex first so the waker is not invoked with it held.
        drop(state);
        waker.wake_by_ref();
    }
}
/// Mutable execlet state; lives inside the mutex in `RustExeclet`.
struct ExecletImpl {
/// Tasks waiting to run: `Execlet::submit` pushes to the back,
/// `Execlet::run` pops from the front.
runqueue: VecDeque<ExecletTask>,
/// Clone of the waker from the most recent `Execlet::run` call; `submit`
/// wakes it when a task arrives while nothing is running.
waker: Option<Waker>,
/// True while `Execlet::run` is draining the queue.
running: bool,
}
/// A unit of work for an execlet: a C-ABI callback paired with the opaque
/// pointer that is handed to it.
struct ExecletTask {
    /// Callback invoked when the task runs.
    run: unsafe extern "C" fn(*mut u8),
    /// Argument passed to `run`; never dereferenced by this type.
    data: *mut u8,
}

impl ExecletTask {
    /// Pairs a callback with its argument without invoking it.
    fn new(run: unsafe extern "C" fn(*mut u8), data: *mut u8) -> Self {
        ExecletTask { run, data }
    }

    /// Consumes the task and invokes the callback with the stored pointer.
    ///
    /// # Safety
    ///
    /// This calls an `unsafe` function pointer with a raw pointer; the caller
    /// must uphold whatever contract that callback places on its argument.
    unsafe fn run(self) {
        let ExecletTask {
            run: callback,
            data: argument,
        } = self;
        callback(argument)
    }
}

// The raw pointer field stops the compiler from deriving these automatically.
// NOTE(review): soundness relies on the callback/data pair being usable from
// any thread, which is not checked here; confirm against the submitting side.
unsafe impl Send for ExecletTask {}
unsafe impl Sync for ExecletTask {}
/// State shared between the reaper thread (`ExecletReaper::run`) and the code
/// that hands execlets to it via `ExecletReaper::add`.
pub(crate) struct ExecletReaper {
/// The execlets the reaper thread should run.
execlets: Mutex<ExecletReaperQueue>,
/// Condition variable paired with `execlets`; notified by `add` and `wake`,
/// waited on by `run`.
cond: Condvar,
}
/// The two lists of execlets tracked by the reaper.
struct ExecletReaperQueue {
/// Weak handles pushed by `ExecletReaper::add`; upgraded (or discarded if
/// already dead) by `take`.
new: Vec<WeakExeclet>,
/// Strong handles pushed back by `ExecletReaper::run` after it has run them.
old: Vec<Execlet>,
}
impl ExecletReaper {
    /// Returns a handle to the single process-wide reaper.
    ///
    /// The first call constructs the reaper with empty queues and spawns a
    /// thread executing [`ExecletReaper::run`]; the thread's join handle is
    /// discarded. Every call returns a fresh `Arc` clone of that one instance.
    pub(crate) fn get() -> Arc<ExecletReaper> {
        static INSTANCE: OnceCell<Arc<ExecletReaper>> = OnceCell::new();
        let instance = INSTANCE.get_or_init(|| {
            let queue = ExecletReaperQueue {
                new: Vec::new(),
                old: Vec::new(),
            };
            let reaper = Arc::new(ExecletReaper {
                execlets: Mutex::new(queue),
                cond: Condvar::new(),
            });
            let for_thread = Arc::clone(&reaper);
            thread::spawn(move || ExecletReaper::run(for_thread));
            reaper
        });
        Arc::clone(instance)
    }

    /// Registers `execlet` with the reaper and notifies the reaper thread.
    ///
    /// Only a weak handle is stored; the strong handle passed in is dropped
    /// when this function returns.
    pub(crate) fn add(&self, execlet: Execlet) {
        let weak = execlet.downgrade();
        let mut queue = self.execlets.lock().safe_unwrap();
        queue.new.push(weak);
        self.cond.notify_all();
    }

    /// Body of the reaper thread; never returns.
    ///
    /// Each pass waits until the queue is non-empty, takes everything out of
    /// it, runs each execlet with a waker built from `ExecletReaperWaker`, and
    /// then files the execlet under `old`.
    ///
    /// NOTE(review): every execlet that has been run is pushed onto `old`, and
    /// `ExecletReaperQueue::is_empty` is false while `old` is non-empty, so once
    /// the first execlet has been processed this loop no longer appears to block
    /// on the condvar. Confirm that this is intended.
    fn run(self: Arc<Self>) {
        loop {
            let unlocked = self.execlets.lock().safe_unwrap();
            let mut queue = self
                .cond
                .wait_while(unlocked, |pending| pending.is_empty())
                .safe_unwrap();

            let batch = queue.take();
            for execlet in batch {
                // The reaper lock is released while the execlet runs.
                drop(queue);
                {
                    let raw_waker = ExecletReaperWaker {
                        execlet: execlet.clone(),
                    }
                    .into_raw();
                    // The data pointer and vtable both come from
                    // `ExecletReaperWaker::into_raw`.
                    let waker = unsafe { Waker::from_raw(raw_waker) };
                    execlet.run(&mut Context::from_waker(&waker));
                    // `waker` is dropped here, before the lock is re-acquired.
                }
                queue = self.execlets.lock().safe_unwrap();
                queue.old.push(execlet);
            }
        }
    }

    /// Notifies every thread waiting on the reaper's condition variable.
    fn wake(&self) {
        self.cond.notify_all();
    }
}
impl ExecletReaperQueue {
fn is_empty(&self) -> bool {
self.new.is_empty() && self.old.is_empty()
}
fn take(&mut self) -> Vec<Execlet> {
let mut execlets = mem::take(&mut self.old);
for execlet in mem::take(&mut self.new).into_iter() {
if let Some(execlet) = execlet.0.upgrade() {
execlets.push(Execlet(execlet));
}
}
execlets
}
}
/// Payload of the waker that `ExecletReaper::run` passes to `Execlet::run`.
///
/// Waking it notifies the reaper thread (see `ExecletReaperWaker::wake`).
#[derive(Clone)]
struct ExecletReaperWaker {
/// Execlet this waker was created for; its `Arc` pointer doubles as the
/// `RawWaker` data pointer (see `into_raw` / `from_raw`).
execlet: Execlet,
}
impl ExecletReaperWaker {
unsafe fn from_raw(data: *const ()) -> ExecletReaperWaker {
ExecletReaperWaker {
execlet: Execlet(Arc::from_raw(data as *const RustExeclet)),
}
}
fn into_raw(self) -> RawWaker {
RawWaker::new(
Arc::into_raw(self.execlet.0) as *const (),
&EXECLET_REAPER_WAKER_VTABLE,
)
}
fn wake(self) {
ExecletReaper::get().wake();
}
}
unsafe fn execlet_reaper_waker_clone(data: *const ()) -> RawWaker {
let execlet_reaper_waker = ExecletReaperWaker::from_raw(data);
let waker = execlet_reaper_waker.clone().into_raw();
mem::forget(execlet_reaper_waker);
waker
}
/// `RawWakerVTable` wake hook: consumes the reference behind `data` and
/// notifies the reaper thread.
unsafe fn execlet_reaper_waker_wake(data: *const ()) {
    let owned = ExecletReaperWaker::from_raw(data);
    owned.wake();
}
unsafe fn execlet_reaper_waker_wake_by_ref(data: *const ()) {
let waker = ExecletReaperWaker::from_raw(data);
waker.clone().wake();
mem::forget(waker);
}
/// `RawWakerVTable` drop hook: releases the strong reference behind `data`.
unsafe fn execlet_reaper_waker_drop(data: *const ()) {
    let owned = ExecletReaperWaker::from_raw(data);
    drop(owned);
}
/// Vtable used by `ExecletReaperWaker::into_raw`; wires the four
/// `execlet_reaper_waker_*` functions in the order `RawWakerVTable::new`
/// expects: clone, wake, wake_by_ref, drop.
static EXECLET_REAPER_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
execlet_reaper_waker_clone,
execlet_reaper_waker_wake,
execlet_reaper_waker_wake_by_ref,
execlet_reaper_waker_drop,
);
#[no_mangle]
#[doc(hidden)]
pub unsafe extern "C" fn cxxasync_execlet_create() -> *const RustExeclet {
Arc::into_raw(Execlet::new().0)
}
/// Releases the strong reference represented by `this`.
///
/// Returns `true` when, at the moment of the check (just before the reference
/// is released), at least one other strong reference to the execlet existed.
/// `Arc::strong_count` is only a snapshot, so other threads may change the
/// count concurrently.
///
/// # Safety
///
/// `this` must be a pointer carrying a strong reference to a `RustExeclet`
/// (as returned by `cxxasync_execlet_create`); that reference is consumed and
/// the pointer must not be used through it afterwards.
#[no_mangle]
#[doc(hidden)]
pub unsafe extern "C" fn cxxasync_execlet_release(this: *mut RustExeclet) -> bool {
    let execlet = Execlet::from_raw(this);
    let others_remain = Arc::strong_count(&execlet.0) > 1;
    drop(execlet);
    others_remain
}
/// Queues a task (`run` together with `task_data`) on the execlet behind
/// `this`.
///
/// The caller's reference is not consumed: a temporary handle with its own
/// strong reference is created for the duration of the call.
///
/// # Safety
///
/// `this` must point to a live `RustExeclet` obtained from `Arc::into_raw`
/// (as returned by `cxxasync_execlet_create`). `run` is later invoked with
/// `task_data` by `Execlet::run`, so the pair must remain valid until then.
#[no_mangle]
#[doc(hidden)]
pub unsafe extern "C" fn cxxasync_execlet_submit(
    this: *mut RustExeclet,
    run: extern "C" fn(*mut u8),
    task_data: *mut u8,
) {
    let execlet = Execlet::from_raw_ref(this);
    let task = ExecletTask::new(run, task_data);
    execlet.submit(task);
}