use std::cell::Cell;
use std::collections::vec_deque::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::Thread;
use crate::backoff::Backoff;
use crate::mpmc::spinlock::Spinlock;
/// A per-thread handle that a blocked thread registers so another thread can wake it.
///
/// `Metadata` stores raw `*const Waiter` pointers to these values, so a registered waiter has
/// to stay alive (and at the same address) until it has been unregistered.
pub(crate) struct Waiter {
/// Set to `true` by `Metadata::notify` when this waiter is taken off the `waiters` queue and
/// pushed onto `selectors`; `Metadata::unregister` reads it to decide which list to search.
is_moved: Cell<bool>,
/// Handle of the thread that called `Waiter::new`; `wake` unparks this thread.
thread: Thread,
}
impl Waiter {
    /// Builds a waiter for the calling thread.
    ///
    /// The `is_moved` flag starts out cleared and the current thread's handle is captured so
    /// that `wake` can unpark it later.
    #[inline]
    pub(crate) fn new() -> Self {
        let is_moved = Cell::new(false);
        let thread = thread::current();
        Self { is_moved, thread }
    }

    /// Blocks the calling thread until `out_condition` returns `true`.
    ///
    /// The condition is first polled in a loop driven by `Backoff::snooze_completed`
    /// (presumably a bounded spin/yield backoff — confirm in `crate::backoff`). Once that
    /// reports completion the thread parks, re-checking the condition after every wake-up.
    ///
    /// `thread::park` parks the *calling* thread while `wake` unparks the thread captured in
    /// `new`, so this is expected to be called on the thread that created the waiter.
    pub(crate) fn sleep<F>(&self, out_condition: F)
    where
        F: Fn() -> bool,
    {
        let backoff = Backoff::default();

        // Phase 1: poll the condition, backing off between attempts.
        let mut keep_polling = true;
        while keep_polling {
            if out_condition() {
                return;
            }
            keep_polling = !backoff.snooze_completed();
        }

        // Phase 2: park. `park` may return without a matching `unpark`, hence the re-check
        // of the condition on every iteration.
        loop {
            if out_condition() {
                break;
            }
            thread::park();
        }
    }

    /// Unparks the thread that created this waiter.
    #[inline]
    fn wake(&self) {
        let owner = &self.thread;
        owner.unpark();
    }
}
/// The two lists of registered waiters that `Waker` keeps behind its lock.
///
/// Entries are raw pointers: nothing here keeps the pointed-to `Waiter` alive, so each entry
/// must be removed via `unregister` before its waiter goes away.
struct Metadata {
/// Registered waiters that have not been picked by `notify` yet, in registration order
/// (`register` pushes to the back, `notify` pops from the front).
waiters: VecDeque<*const Waiter>,
/// Waiters already picked by `notify` (their `is_moved` flag is set) that have not been
/// unregistered; every call to `notify` wakes all of them again.
selectors: Vec<*const Waiter>,
}
impl Metadata {
    /// Creates an empty registry: no queued waiters and no selectors.
    #[inline]
    pub(crate) fn new() -> Self {
        let waiters = VecDeque::new();
        let selectors = Vec::new();
        Self { waiters, selectors }
    }

    /// Appends `waiter` to the back of the `waiters` queue.
    ///
    /// Only the address is stored; the waiter must outlive its registration.
    #[inline]
    pub(crate) fn register(&mut self, waiter: &Waiter) {
        let entry: *const Waiter = waiter;
        self.waiters.push_back(entry);
    }

    /// Removes `waiter` from the list it is expected to be in and reports whether an entry
    /// was actually removed.
    ///
    /// The list is chosen by the waiter's `is_moved` flag: `selectors` when it is set,
    /// `waiters` otherwise. Only that one list is searched (linearly, by address).
    ///
    /// NOTE(review): nothing in this file clears `is_moved`, so a waiter that is registered
    /// again after having been notified would only be looked up in `selectors` — confirm that
    /// waiters are single-use.
    #[inline]
    pub(crate) fn unregister(&mut self, waiter: &Waiter) -> bool {
        let target: *const Waiter = waiter;

        if waiter.is_moved.get() {
            match self.selectors.iter().position(|&entry| entry == target) {
                Some(index) => {
                    self.selectors.remove(index);
                    true
                }
                None => false,
            }
        } else {
            match self.waiters.iter().position(|&entry| entry == target) {
                Some(index) => {
                    self.waiters.remove(index);
                    true
                }
                None => false,
            }
        }
    }

    /// Promotes the oldest queued waiter (if any) to `selectors`, then wakes every selector.
    ///
    /// Selectors stay in the list until they unregister, so earlier picks are woken again on
    /// each call.
    #[inline]
    pub(crate) fn notify(&mut self) {
        if let Some(front) = self.waiters.pop_front() {
            // SAFETY: relies on the registration contract — every stored pointer refers to a
            // `Waiter` that is still alive because it has not been unregistered yet. This is
            // not enforced here.
            unsafe { (*front).is_moved.set(true) };
            self.selectors.push(front);
        }

        for &selector in &self.selectors {
            // SAFETY: same registration contract as above.
            unsafe { (*selector).wake() };
        }
    }

    /// Wakes every registered waiter — first the queued ones, then the selectors — without
    /// removing or re-classifying any of them.
    #[inline]
    pub(crate) fn notify_all(&mut self) {
        let everyone = self.waiters.iter().chain(self.selectors.iter());
        for &entry in everyone {
            // SAFETY: relies on the registration contract — stored pointers refer to waiters
            // that are still alive because they have not been unregistered yet.
            unsafe { (*entry).wake() };
        }
    }
}
/// Shared registry of blocked threads, guarded by a spinlock.
pub(crate) struct Waker {
/// Lock around the waiter lists.
guard: Spinlock<Metadata>,
/// Fast-path flag: `register` stores `false`; while it reads `true`, `unregister`, `notify`
/// and `notify_all` return without taking the lock.
is_empty: AtomicBool,
}
impl Waker {
    /// Creates a waker with no registered waiters; the `is_empty` fast-path flag starts `true`.
    #[inline]
    pub(crate) fn new() -> Self {
        Self {
            guard: Spinlock::new(Metadata::new()),
            is_empty: AtomicBool::new(true),
        }
    }

    /// Registers `waiter` unless `out_condition` already holds.
    ///
    /// The condition is evaluated after the lock has been acquired. Returns `false` (and
    /// registers nothing) when it is already `true`; otherwise queues the waiter, clears the
    /// `is_empty` flag and returns `true`.
    ///
    /// Only the waiter's address is stored, so the caller must call [`Waker::unregister`]
    /// before the waiter is dropped or moved.
    #[inline]
    pub(crate) fn register<F>(&self, waiter: &Waiter, out_condition: F) -> bool
    where
        F: Fn() -> bool,
    {
        let mut inner = self.guard.lock();
        if out_condition() {
            return false;
        }
        inner.register(waiter);
        // Stored while the lock is still held so the flag never lags behind the lists.
        self.is_empty.store(false, Ordering::SeqCst);
        true
    }

    /// Removes `waiter` from the registry if it is present.
    ///
    /// Skips the lock entirely when the `is_empty` flag is set. After a successful removal
    /// the flag is recomputed from the two lists while the lock is held.
    #[inline]
    pub(crate) fn unregister(&self, waiter: &Waiter) {
        if self.is_empty.load(Ordering::SeqCst) {
            return;
        }
        let mut inner = self.guard.lock();
        if inner.unregister(waiter) {
            let now_empty = inner.waiters.is_empty() && inner.selectors.is_empty();
            self.is_empty.store(now_empty, Ordering::SeqCst);
        }
    }

    /// Wakes one more queued waiter plus all previously notified ones (see
    /// `Metadata::notify`). Does nothing when the `is_empty` flag is set.
    #[inline]
    pub(crate) fn notify(&self) {
        if !self.is_empty.load(Ordering::SeqCst) {
            self.guard.lock().notify();
        }
    }

    /// Wakes every registered waiter. Does nothing when the `is_empty` flag is set.
    ///
    /// Waking does not remove anything from the lists, so the flag is recomputed from the
    /// actual list contents while the lock is still held instead of being forced to `true`.
    /// Forcing it would make the woken waiters' `unregister` calls take the lock-free early
    /// return and leave their raw pointers behind (dangling once the waiters are dropped),
    /// and a store made after releasing the lock could overwrite the `false` written by a
    /// concurrent `register`, hiding that waiter from later notifications.
    #[inline]
    pub(crate) fn notify_all(&self) {
        if self.is_empty.load(Ordering::SeqCst) {
            return;
        }
        let mut inner = self.guard.lock();
        inner.notify_all();
        let now_empty = inner.waiters.is_empty() && inner.selectors.is_empty();
        self.is_empty.store(now_empty, Ordering::SeqCst);
    }
}