use std::cell::Cell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, Thread};
use conquer_util::BackOff;
use crate::cell::Block;
use crate::state::{
AtomicOnceState,
OnceState::{Ready, Uninit, WouldBlock},
WaiterQueue,
};
use crate::{Internal, POISON_PANIC_MSG};
/// Zero-sized marker type for the thread-parking blocking strategy.
///
/// It carries no data; its `Block` implementation parks waiting threads via
/// `std::thread::park` and wakes them with `Thread::unpark`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ParkThread;
// Opts `ParkThread` into the crate-level `Internal` trait; the impl has no items.
// NOTE(review): `Internal` is presumably a marker/sealing trait required for
// `Block` implementors — confirm at its definition in the crate root.
impl Internal for ParkThread {}
impl Block for ParkThread {
    /// Blocks the calling thread until `state` is observed as `Ready`.
    ///
    /// The thread first spins with a back-off. If the state still reports
    /// `WouldBlock` once the back-off advises yielding, a waiter node living
    /// on this thread's stack is pushed onto the front of the waiter queue and
    /// the thread parks until `unblock` sets the node's `ready` flag.
    ///
    /// # Panics
    ///
    /// Panics with `POISON_PANIC_MSG` when `state.load` does not yield a
    /// state, and panics when the state is not `Ready` after wake-up.
    #[inline]
    fn block(state: &AtomicOnceState) {
        let backoff = BackOff::new();
        // Spin first: return as soon as the state is `Ready`, and only fall
        // through to parking when the state is still `WouldBlock` *and* the
        // back-off advises yielding.
        let queue = loop {
            match state.load(Ordering::Acquire).expect(POISON_PANIC_MSG) {
                Ready => return,
                WouldBlock(queue) if backoff.advise_yield() => break queue,
                _ => {}
            }
            backoff.spin();
        };
        backoff.reset();

        // The list node lives on this thread's stack. Once it has been pushed
        // successfully, this function does not return before `ready` has been
        // observed as `true`, which keeps the node alive for `unblock`.
        let waiter = StackWaiter {
            thread: Cell::new(Some(thread::current())),
            ready: AtomicBool::default(),
            next: Cell::new(queue.head()),
        };
        let mut curr = queue;
        let new_head = WaiterQueue::from(&waiter as *const StackWaiter);

        while let Err(error) = state.try_swap_waiters(curr, new_head, Ordering::AcqRel) {
            match error {
                WouldBlock(queue) => {
                    // Assuming an `Err` means the swap did not happen, the
                    // node is still private to this thread. `next` is a
                    // `Cell`, so re-linking goes through a shared reference
                    // and does not invalidate the raw pointer in `new_head`
                    // (a plain field write through the owner would).
                    curr = queue;
                    waiter.next.set(queue.head());
                    backoff.spin();
                }
                // The node was never enqueued, so returning leaves no pointer
                // to this stack frame behind.
                Ready => return,
                Uninit => unreachable!(),
            }
        }

        // `park` may return without `ready` having been set, hence the loop.
        while !waiter.ready.load(Ordering::Acquire) {
            thread::park();
        }

        // Any outcome other than `Ready` after wake-up becomes a panic here.
        assert_eq!(state.load(Ordering::Acquire).expect(POISON_PANIC_MSG), Ready);
    }

    /// Wakes every thread whose waiter node is linked into `waiter_queue`.
    ///
    /// Walks the list from its head until a null pointer is reached; for each
    /// node it takes the stored thread handle, sets the `ready` flag and then
    /// unparks that thread.
    ///
    /// # Panics
    ///
    /// Panics if a node's thread handle has already been taken.
    #[inline]
    fn unblock(waiter_queue: WaiterQueue) {
        let mut curr = waiter_queue.head();
        while !curr.is_null() {
            // SAFETY: non-null pointers in the queue are created in `block`
            // from a node on the waiting thread's stack, and that thread does
            // not leave `block` before it sees `ready == true`. Everything
            // needed from the node (`next` and the thread handle) is therefore
            // read *before* the `ready` store; the node must not be touched
            // afterwards because its stack frame may be gone.
            // NOTE(review): assumes the caller passes a queue that has been
            // detached from the shared state — confirm at the call site.
            let thread = unsafe {
                let waiter = &*curr;
                curr = waiter.next.get();
                let thread = waiter.thread.take().unwrap();
                waiter.ready.store(true, Ordering::Release);
                thread
            };
            thread.unpark();
        }
    }
}

/// A node in the intrusive, singly linked list of parked threads.
///
/// Nodes are created as locals inside `ParkThread::block` and are linked
/// through raw pointers. Every field uses interior mutability because a node
/// is only ever accessed through shared references or raw pointers derived
/// from them.
// NOTE(review): the 4-byte alignment presumably keeps the low pointer bits
// free for state tagging inside `WaiterQueue` — confirm in `crate::state`.
#[repr(align(4))]
struct StackWaiter {
    /// Handle of the waiting thread; taken (left as `None`) by `unblock`.
    thread: Cell<Option<Thread>>,
    /// Set to `true` by `unblock`; the waiting thread parks until it sees it.
    ready: AtomicBool,
    /// Next node in the list, or null at the end of the list.
    next: Cell<*const StackWaiter>,
}
impl From<*const StackWaiter> for WaiterQueue {
    /// Builds a queue whose head is `waiter`, stored as a type-erased pointer.
    #[inline]
    fn from(waiter: *const StackWaiter) -> Self {
        let head = waiter.cast::<()>();
        Self { head }
    }
}
impl WaiterQueue {
    /// Returns the stored head pointer, typed as a pointer to a `StackWaiter`.
    ///
    /// Only the pointer's type changes; no dereference takes place here.
    #[inline]
    fn head(self) -> *const StackWaiter {
        self.head.cast::<StackWaiter>()
    }
}