use std::{
collections::vec_deque::VecDeque,
sync::atomic::{AtomicBool, AtomicU32, Ordering},
};
use parking_lot::Mutex;
use omango_futex::{wait, wake_one};
use omango_util::{
backoff::Backoff,
hint::{likely, unlikely},
};
use crate::queue::state::State;
/// Value written to `Waiter::parked` right before the waiter blocks in `wait`.
const PARKED: u32 = 1;
/// Value of `Waiter::parked` at construction and whenever `Waiter::sleep` returns.
const UN_PARKED: u32 = 2;
/// Lets a waiter ask whether it should give up waiting because of a close.
///
/// What exactly is "closed" is decided by the implementor (presumably the
/// owning queue; confirm against the implementations).
pub(crate) trait Checker {
/// Returns `true` once waiting should be abandoned; `Waiter::retry` and
/// `Waiter::sleep` return `State::Closed` as soon as they observe `true`.
fn is_close(&self) -> bool;
}
/// A single waiting party: it watches one atomic word and waits for as long as
/// that word still holds the value recorded at construction.
pub(crate) struct Waiter {
// Raw pointer to the watched atomic. It is dereferenced in `retry`, `sleep`
// and `Waker::register`; nothing in this type keeps the pointee alive.
atom: *const AtomicU32,
// Value the watched atomic had when the waiter was created; waiting ends
// once the atomic no longer equals it.
expected: u32,
// Word handed to the futex `wait` / `wake_one` calls; holds `PARKED` while
// the waiter is (about to be) blocked and `UN_PARKED` otherwise.
parked: AtomicU32,
}
impl Waiter {
    /// Creates a waiter that watches `e_lap` and keeps waiting while it still
    /// holds `lap`.
    ///
    /// The pointer is stored as-is and dereferenced later by `retry`, `sleep`
    /// and `Waker::register`; the caller is responsible for keeping the
    /// pointee alive for as long as the waiter is used.
    #[inline]
    pub(crate) fn new(e_lap: *const AtomicU32, lap: u32) -> Self {
        let parked = AtomicU32::new(UN_PARKED);
        Self {
            atom: e_lap,
            expected: lap,
            parked,
        }
    }

    /// Performs one poll of the wake-up conditions.
    ///
    /// Returns `Some(State::Closed)` if the checker reports a close,
    /// `Some(State::Success)` if the watched atomic no longer holds the
    /// expected value, and `None` if the waiter has to keep waiting.
    /// The close check always runs before the atomic is read.
    #[inline]
    fn probe(&self, checker: &dyn Checker) -> Option<State> {
        if unlikely(checker.is_close()) {
            return Some(State::Closed);
        }
        // SAFETY: relies on the creator of this waiter keeping the pointee of
        // `atom` alive while the waiter is in use; not enforced by this type.
        let watched = unsafe { &*self.atom };
        if watched.load(Ordering::Acquire) != self.expected {
            Some(State::Success)
        } else {
            None
        }
    }

    /// Spins for up to `n_retry` complete back-off cycles waiting for the
    /// watched atomic to change.
    ///
    /// Returns `State::Closed` or `State::Success` as soon as the respective
    /// condition is observed, and `State::Failed` when every cycle ran to
    /// completion without either happening (including `n_retry == 0`).
    pub(crate) fn retry(&self, checker: &dyn Checker, n_retry: u8) -> State {
        let spinner = Backoff::default();
        for _ in 0..n_retry {
            // One cycle: poll, then snooze, until the back-off reports that
            // it has been exhausted.
            loop {
                if let Some(outcome) = self.probe(checker) {
                    return outcome;
                }
                if spinner.snooze_completed() {
                    break;
                }
            }
            spinner.reset();
        }
        State::Failed
    }

    /// Blocks on the futex word until a close is reported or the watched
    /// atomic changes.
    ///
    /// `parked` is reset to `UN_PARKED` before returning. Every return from
    /// `wait` leads to a fresh poll, so spurious wake-ups are tolerated.
    pub(crate) fn sleep(&self, checker: &dyn Checker) -> State {
        loop {
            if let Some(outcome) = self.probe(checker) {
                self.parked.store(UN_PARKED, Ordering::Release);
                return outcome;
            }
            // NOTE(review): a wake issued between this store and the `wait`
            // call below does not change `parked`, so it would not be seen
            // here; confirm the notifier side compensates for that window.
            self.parked.store(PARKED, Ordering::Relaxed);
            wait(&self.parked, PARKED);
        }
    }
}
/// Bookkeeping for the registered waiters; every access in this file goes
/// through the mutex held by `Waker`.
struct Metadata {
// Raw pointers to the registered waiters, in registration order.
waiters: VecDeque<*const Waiter>,
// Number of leading entries of `waiters` that `notify` visits. It grows by
// at most one per `notify` call and shrinks by one per successful
// `unregister`.
start: usize,
// Set by `close`; `Waker::register` refuses new waiters once it is `true`.
closed: bool,
}
impl Metadata {
    /// Creates empty bookkeeping: no waiters, nothing to notify, not closed.
    #[inline]
    fn new() -> Self {
        let waiters = VecDeque::new();
        Self {
            waiters,
            start: 0,
            closed: false,
        }
    }

    /// Appends `waiter` to the back of the waiter list.
    ///
    /// Only the address is stored, so the waiter must be unregistered before
    /// it is dropped or moved.
    #[inline]
    fn register(&mut self, waiter: &Waiter) {
        let entry: *const Waiter = waiter;
        self.waiters.push_back(entry);
    }

    /// Removes the first entry that points at `waiter`, if there is one, and
    /// shrinks the notify window by one (never below zero).
    ///
    /// Does nothing when the waiter is not registered.
    #[inline]
    fn unregister(&mut self, waiter: &Waiter) {
        let target: *const Waiter = waiter;
        let Some(pos) = self.waiters.iter().position(|&entry| entry == target) else {
            return;
        };
        self.waiters.remove(pos);
        self.start = self.start.saturating_sub(1);
    }

    /// Widens the notify window by one waiter (while there are waiters left
    /// outside of it) and wakes every waiter inside the window whose futex
    /// word currently reads `PARKED`.
    ///
    /// The futex word itself is left untouched.
    #[inline]
    fn notify(&mut self) {
        if likely(self.start < self.waiters.len()) {
            self.start += 1;
        }
        for &entry in self.waiters.iter().take(self.start) {
            // SAFETY: relies on every registered waiter being unregistered
            // before it is dropped; not enforced by this type.
            unsafe {
                let word = &(*entry).parked;
                if word.load(Ordering::Acquire) == PARKED {
                    wake_one(word);
                }
            }
        }
    }

    /// Marks the bookkeeping as closed and issues a wake for every registered
    /// waiter, regardless of the current value of its futex word.
    #[inline]
    fn close(&mut self) {
        self.closed = true;
        for &entry in &self.waiters {
            // SAFETY: same registration invariant as in `notify`.
            unsafe {
                wake_one(&(*entry).parked);
            }
        }
    }

    /// Returns `true` when no waiter is registered.
    #[inline]
    fn is_empty(&self) -> bool {
        self.waiters.is_empty()
    }
}
/// Registry of parked waiters plus the entry points used to wake them.
pub(crate) struct Waker {
// Waiter bookkeeping; every mutation happens with this mutex held.
guard: Mutex<Metadata>,
// Mirror of "no waiter is registered", readable without taking the mutex so
// that `wake` can return early.
empty: AtomicBool,
}
impl Default for Waker {
    /// Builds a waker with no registered waiters and the `empty` flag set.
    #[inline]
    fn default() -> Self {
        let guard = Mutex::new(Metadata::new());
        let empty = AtomicBool::new(true);
        Self { guard, empty }
    }
}
impl Waker {
    /// Tries to enqueue `waiter` so that later `wake` calls can reach it.
    ///
    /// Returns `false` without enqueuing when the waker has been closed, or
    /// when the atomic watched by the waiter already differs from the value
    /// it expects (there is nothing left to wait for). Returns `true` after
    /// the waiter has been enqueued and the `empty` flag cleared.
    ///
    /// Only the waiter's address is stored, so the caller must call
    /// `unregister` before the waiter is dropped or moved.
    #[inline]
    pub(crate) fn register(&self, waiter: &Waiter) -> bool {
        let mut inner = self.guard.lock();
        if inner.closed {
            return false;
        }
        // Re-check under the lock so a change that happened just before
        // registration is not slept through.
        // SAFETY: relies on the creator of `waiter` keeping the pointee of
        // `atom` alive while the waiter is in use.
        let already_changed =
            unsafe { (*waiter.atom).load(Ordering::Acquire) != waiter.expected };
        if already_changed {
            return false;
        }
        inner.register(waiter);
        self.empty.store(false, Ordering::SeqCst);
        true
    }

    /// Removes `waiter` from the registry (a no-op if it is not registered)
    /// and refreshes the `empty` flag from the remaining waiter count.
    #[inline]
    pub(crate) fn unregister(&self, waiter: &Waiter) {
        let mut inner = self.guard.lock();
        inner.unregister(waiter);
        self.empty.store(inner.is_empty(), Ordering::SeqCst);
    }

    /// Notifies parked waiters, skipping the lock entirely when the `empty`
    /// flag says nobody is registered.
    #[inline]
    pub(crate) fn wake(&self) {
        if unlikely(!self.empty.load(Ordering::SeqCst)) {
            self.guard.lock().notify();
        }
    }

    /// Closes the waker and keeps waking registered waiters until all of them
    /// have unregistered.
    ///
    /// The `closed` flag is recorded on every call, even when no waiter is
    /// registered at that moment, so `register` rejects waiters that arrive
    /// after the close. Previously the flag was only set when at least one
    /// waiter was registered at the time of the call.
    pub(crate) fn close(&self) {
        loop {
            // The guard is dropped at the end of each iteration, which gives
            // woken waiters a chance to take the lock and unregister.
            let mut inner = self.guard.lock();
            inner.closed = true;
            if inner.is_empty() {
                self.empty.store(true, Ordering::SeqCst);
                return;
            }
            inner.close();
        }
    }
}