use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::Thread;
use likely_stable::likely;
use crate::spsc::spinlock::Spinlock;
/// Total number of times `Waiter::sleep` polls its condition before it falls back to parking.
const N_TOTAL: u8 = 132;
/// Number of those initial polls that are followed by a busy-spin hint; the remaining
/// `N_TOTAL - N_SPIN` polls are followed by a scheduler yield instead.
const N_SPIN: u8 = 128;
/// A handle through which one blocked thread can be woken by another.
pub(crate) struct Waiter {
    /// Handle of the thread to unpark when this waiter is woken.
    thread: Thread,
}
impl Waiter {
    /// Builds a waiter bound to the thread that calls this constructor.
    #[inline]
    pub(crate) fn new() -> Self {
        let thread = thread::current();
        Self { thread }
    }

    /// Blocks the calling thread until `out_condition` returns `true`.
    ///
    /// The condition is first polled up to `N_TOTAL` times: each of the first
    /// `N_SPIN` unsuccessful polls is followed by a spin-loop hint, each later
    /// one by a scheduler yield. If the condition is still `false` after that,
    /// the calling thread parks and re-checks the condition every time it
    /// resumes from `park`.
    ///
    /// NOTE(review): `park` suspends the *calling* thread while `wake` unparks
    /// the thread captured in `new`; this presumably expects `sleep` to run on
    /// the thread that created the waiter - confirm against callers.
    pub(crate) fn sleep<F>(&self, out_condition: F)
    where
        F: Fn() -> bool,
    {
        // Phase 1: bounded polling, cheap spinning first and yielding afterwards.
        let mut attempt: u8 = 0;
        while attempt < N_TOTAL {
            if out_condition() {
                return;
            }
            if likely(attempt < N_SPIN) {
                core::hint::spin_loop();
            } else {
                thread::yield_now();
            }
            attempt += 1;
        }

        // Phase 2: park, re-checking the condition after every resume.
        loop {
            if out_condition() {
                return;
            }
            thread::park();
        }
    }

    /// Unparks the thread captured when this waiter was created.
    #[inline]
    fn wake(&self) {
        self.thread.unpark();
    }
}
/// Queue of registered waiters, kept in registration order.
struct Metadata {
    /// Raw pointers to the registered waiters; the front entry is the one
    /// woken by `notify`.
    waiters: VecDeque<*const Waiter>,
}
impl Metadata {
    /// Creates an empty waiter queue.
    #[inline]
    fn new() -> Self {
        Self { waiters: VecDeque::new() }
    }

    /// Appends `waiter` to the back of the queue, storing only its address.
    #[inline]
    fn register(&mut self, waiter: &Waiter) {
        let entry: *const Waiter = waiter;
        self.waiters.push_back(entry);
    }

    /// Removes the first queued entry whose address equals `waiter`.
    ///
    /// Returns `true` if an entry was removed and `false` if the waiter was
    /// not queued.
    #[inline]
    fn unregister(&mut self, waiter: &Waiter) -> bool {
        let target: *const Waiter = waiter;
        match self.waiters.iter().position(|&queued| queued == target) {
            Some(index) => {
                self.waiters.remove(index);
                true
            }
            None => false,
        }
    }

    /// Wakes the waiter at the front of the queue, if any, without removing it.
    #[inline]
    fn notify(&mut self) {
        if let Some(&head) = self.waiters.front() {
            // SAFETY: NOTE(review) - nothing in this type keeps the pointee alive;
            // this relies on every registered waiter being unregistered before it
            // is dropped. Confirm that callers uphold this.
            unsafe { (*head).wake() };
        }
    }

    /// Wakes every queued waiter in registration order, without removing any.
    #[inline]
    fn notify_all(&mut self) {
        for &queued in &self.waiters {
            // SAFETY: NOTE(review) - same assumption as in `notify`: each queued
            // pointer must still refer to a live `Waiter`.
            unsafe { (*queued).wake() };
        }
    }
}
/// Registry of waiters that can be woken one at a time or all at once.
pub(crate) struct Waker {
    /// Spinlock protecting the queue of registered waiters.
    guard: Spinlock<Metadata>,
    /// Flag consulted before taking the lock so that `unregister`, `notify`
    /// and `notify_all` can return early when it reads `true`.
    is_empty: AtomicBool,
}
impl Waker {
    /// Creates a waker with no registered waiters.
    #[inline(always)]
    pub(crate) fn new() -> Self {
        Self {
            guard: Spinlock::new(Metadata::new()),
            is_empty: AtomicBool::new(true),
        }
    }

    /// Registers `waiter` unless `out_condition` already holds.
    ///
    /// The condition is evaluated while the lock is held. Returns `false`
    /// (and registers nothing) if it is `true`; otherwise the waiter is queued
    /// and `true` is returned.
    ///
    /// NOTE(review): only the address of `waiter` is stored, so the caller
    /// presumably must call `unregister` before the waiter is dropped - confirm
    /// against callers.
    #[inline]
    pub(crate) fn register<F>(&self, waiter: &Waiter, out_condition: F) -> bool
    where
        F: Fn() -> bool,
    {
        let mut inner = self.guard.lock();
        if out_condition() {
            return false;
        }
        inner.register(waiter);
        // Published while the lock is still held so the flag tracks the queue.
        self.is_empty.store(false, Ordering::SeqCst);
        true
    }

    /// Removes `waiter` from the queue if it is registered.
    #[inline]
    pub(crate) fn unregister(&self, waiter: &Waiter) {
        // Fast path: skip the lock when the flag says nothing is queued.
        if !self.is_empty.load(Ordering::SeqCst) {
            let mut inner = self.guard.lock();
            if inner.unregister(waiter) {
                self.is_empty.store(inner.waiters.is_empty(), Ordering::SeqCst);
            }
        }
    }

    /// Wakes the waiter at the front of the queue, if any. The waiter stays
    /// registered until it is unregistered.
    #[inline]
    pub(crate) fn notify(&self) {
        if !self.is_empty.load(Ordering::SeqCst) {
            self.guard.lock().notify();
        }
    }

    /// Wakes every registered waiter. Waiters stay registered until they are
    /// unregistered.
    #[inline]
    pub(crate) fn notify_all(&self) {
        if !self.is_empty.load(Ordering::SeqCst) {
            // Keep the lock for both the wake-ups and the flag update. Releasing
            // it in between would let a concurrent `register` store `false` and
            // then have that overwritten, hiding the new waiter from `notify`.
            let mut inner = self.guard.lock();
            inner.notify_all();
            // Waking does not dequeue anyone, so the flag must reflect the real
            // queue state. Forcing it to `true` here would make later
            // `unregister` calls take the fast path and leave stale pointers
            // queued.
            self.is_empty.store(inner.waiters.is_empty(), Ordering::SeqCst);
        }
    }
}