use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Weak;
use std::task;
use crate::shared::RuntimeInternals;
use crate::{ptr_as_usize, ProcessId};
pub(crate) const MAX_RUNTIMES: usize = 1 << MAX_RUNTIMES_BITS;
#[cfg(not(any(test, feature = "test")))]
pub(crate) const MAX_RUNTIMES_BITS: usize = 0; #[cfg(any(test, feature = "test"))]
pub(crate) const MAX_RUNTIMES_BITS: usize = 5;
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(transparent)]
pub(crate) struct WakerId(u8);
static mut RUNTIMES: [Option<Weak<RuntimeInternals>>; MAX_RUNTIMES] = [NO_RUNTIME; MAX_RUNTIMES];
const NO_RUNTIME: Option<Weak<RuntimeInternals>> = None;
pub(crate) fn init(internals: Weak<RuntimeInternals>) -> WakerId {
static IDS: AtomicU8 = AtomicU8::new(0);
let id = IDS.fetch_add(1, Ordering::SeqCst);
assert!(
(id as usize) < MAX_RUNTIMES,
"Created too many Heph `Runtime`s, maximum of {}",
MAX_RUNTIMES
);
unsafe { RUNTIMES[id as usize] = Some(internals) }
WakerId(id)
}
pub(crate) fn new(waker_id: WakerId, pid: ProcessId) -> task::Waker {
let data = WakerData::new(waker_id, pid).into_raw_data();
let raw_waker = task::RawWaker::new(data, &WAKER_VTABLE);
unsafe { task::Waker::from_raw(raw_waker) }
}
fn get(waker_id: WakerId) -> &'static Weak<RuntimeInternals> {
unsafe {
RUNTIMES[waker_id.0 as usize]
.as_ref()
.expect("tried to get a waker for a thread that isn't initialised")
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(transparent)]
struct WakerData(usize);
const WAKER_ID_MASK: usize = (1 << MAX_RUNTIMES_BITS) - 1;
impl WakerData {
fn new(waker_id: WakerId, pid: ProcessId) -> WakerData {
let data =
WakerData((pid.0 << MAX_RUNTIMES_BITS) | ((waker_id.0 as usize) & WAKER_ID_MASK));
assert!(data.pid() == pid, "`ProcessId` too large for `WakerData`");
data
}
const fn waker_id(self) -> WakerId {
#[allow(clippy::cast_possible_truncation)]
WakerId((self.0 & WAKER_ID_MASK) as u8)
}
const fn pid(self) -> ProcessId {
#[allow(clippy::cast_possible_truncation)]
ProcessId(self.0 >> MAX_RUNTIMES_BITS)
}
const unsafe fn from_raw_data(data: *const ()) -> WakerData {
WakerData(ptr_as_usize(data))
}
const fn into_raw_data(self) -> *const () {
self.0 as *const ()
}
}
static WAKER_VTABLE: task::RawWakerVTable =
task::RawWakerVTable::new(clone_wake_data, wake, wake_by_ref, drop_wake_data);
fn assert_copy<T: Copy>() {}
unsafe fn clone_wake_data(data: *const ()) -> task::RawWaker {
assert_copy::<WakerData>();
task::RawWaker::new(data, &WAKER_VTABLE)
}
unsafe fn wake(data: *const ()) {
let data = WakerData::from_raw_data(data);
if let Some(shared_internals) = get(data.waker_id()).upgrade() {
shared_internals.mark_ready(data.pid());
shared_internals.wake_workers(1);
}
}
unsafe fn wake_by_ref(data: *const ()) {
assert_copy::<WakerData>();
wake(data)
}
unsafe fn drop_wake_data(_: *const ()) {
assert_copy::<WakerData>();
}
#[cfg(test)]
mod tests {
use std::mem::size_of;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::thread::{self, sleep};
use std::time::Duration;
use crate::process::{Process, ProcessData, ProcessId, ProcessResult};
use crate::shared::waker::{self, WakerData};
use crate::shared::{RuntimeInternals, Scheduler};
use crate::spawn::options::Priority;
use crate::{test, RuntimeRef};
const PID1: ProcessId = ProcessId(1);
const PID2: ProcessId = ProcessId(2);
#[test]
fn assert_waker_data_size() {
assert_eq!(size_of::<*const ()>(), size_of::<WakerData>());
}
struct TestProcess;
impl Process for TestProcess {
fn name(&self) -> &'static str {
"TestProcess"
}
fn run(self: Pin<&mut Self>, _: &mut RuntimeRef, _: ProcessId) -> ProcessResult {
unimplemented!();
}
}
#[test]
fn waker() {
let shared_internals = new_internals();
let pid = add_process(&shared_internals.scheduler);
assert!(shared_internals.scheduler.has_process());
assert!(!shared_internals.scheduler.has_ready_process());
let waker = waker::new(shared_internals.shared_id, pid);
waker.wake_by_ref();
assert!(shared_internals.scheduler.has_process());
assert!(shared_internals.scheduler.has_ready_process());
let process = shared_internals.scheduler.remove().unwrap();
assert_eq!(process.as_ref().id(), pid);
waker.wake();
assert!(!shared_internals.scheduler.has_process());
assert!(!shared_internals.scheduler.has_ready_process());
shared_internals.complete(process);
assert!(!shared_internals.scheduler.has_process());
assert!(!shared_internals.scheduler.has_ready_process());
}
#[test]
fn cloned_waker() {
let shared_internals = new_internals();
let pid = add_process(&shared_internals.scheduler);
assert!(shared_internals.scheduler.has_process());
assert!(!shared_internals.scheduler.has_ready_process());
let waker1 = waker::new(shared_internals.shared_id, pid);
let waker2 = waker1.clone();
drop(waker1);
waker2.wake();
assert!(shared_internals.scheduler.has_process());
assert!(shared_internals.scheduler.has_ready_process());
let process = shared_internals.scheduler.remove().unwrap();
assert_eq!(process.as_ref().id(), pid);
}
#[test]
fn wake_from_different_thread() {
let shared_internals = new_internals();
let pid = add_process(&shared_internals.scheduler);
assert!(shared_internals.scheduler.has_process());
assert!(!shared_internals.scheduler.has_ready_process());
let shared_internals2 = shared_internals.clone();
let handle = thread::spawn(move || {
let waker = waker::new(shared_internals2.shared_id, pid);
waker.wake_by_ref();
waker.wake();
});
loop {
if let Some(process) = shared_internals.scheduler.remove() {
assert_eq!(process.as_ref().id(), pid);
shared_internals.complete(process);
break;
}
sleep(Duration::from_millis(1));
}
handle.join().unwrap();
}
#[test]
fn no_internals() {
let waker_id = waker::init(Weak::new());
let waker = waker::new(waker_id, PID1);
waker.wake_by_ref();
waker.wake();
}
#[test]
fn will_wake() {
let waker_id = waker::init(Weak::new());
let waker1a = waker::new(waker_id, PID1);
let waker1b = waker::new(waker_id, PID1);
let waker2a = waker::new(waker_id, PID2);
let waker2b = waker2a.clone();
assert!(waker1a.will_wake(&waker1a));
assert!(waker1a.will_wake(&waker1b));
assert!(!waker1a.will_wake(&waker2a));
assert!(!waker1a.will_wake(&waker2b));
assert!(waker1b.will_wake(&waker1a));
assert!(waker1b.will_wake(&waker1b));
assert!(!waker1b.will_wake(&waker2a));
assert!(!waker1b.will_wake(&waker2b));
assert!(!waker2a.will_wake(&waker1a));
assert!(!waker2a.will_wake(&waker1b));
assert!(waker2a.will_wake(&waker2a));
assert!(waker2a.will_wake(&waker2b));
}
fn new_internals() -> Arc<RuntimeInternals> {
let setup = RuntimeInternals::setup().unwrap();
Arc::new_cyclic(|shared_internals| {
let waker_id = waker::init(shared_internals.clone());
let worker_wakers = vec![&*test::NOOP_WAKER].into_boxed_slice();
setup.complete(waker_id, worker_wakers, None)
})
}
fn add_process(scheduler: &Scheduler) -> ProcessId {
let process: Pin<Box<dyn Process + Send + Sync>> = Box::pin(TestProcess);
let process_data = Box::pin(ProcessData::new(Priority::NORMAL, process));
let pid = process_data.as_ref().id();
scheduler.add_process(process_data);
pid
}
}