use std::ptr::addr_of_mut;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::sync::{Condvar, Mutex};
/// A queue of threads that are blocked until another thread signals them.
///
/// The queue is a singly linked list of `Entry` nodes: `wait` pushes a node at
/// the head with a compare-and-swap, and `signal` detaches the whole list at
/// once and wakes every node in it.
#[derive(Default)]
pub(crate) struct WaitQueue {
/// Head of the linked list of waiting entries; null when nothing is queued.
wait_queue: AtomicPtr<Entry>,
}
impl WaitQueue {
#[inline]
pub fn wait<T, F: FnOnce() -> Result<T, ()>>(&self, f: F) -> Result<T, ()> {
let mut current = self.wait_queue.load(Relaxed);
let mut entry = Entry::new(current);
while let Err(actual) =
self.wait_queue
.compare_exchange(current, addr_of_mut!(entry), AcqRel, Relaxed)
{
current = actual;
entry.next_ptr = current;
}
let result = f();
if result.is_ok() {
self.signal();
}
entry.wait();
result
}
#[inline]
pub fn signal(&self) {
let mut current = self.wait_queue.swap(std::ptr::null_mut(), AcqRel);
while let Some(entry_ref) = unsafe { current.as_ref() } {
let next_ptr = entry_ref.next_ptr;
entry_ref.signal();
current = next_ptr;
}
}
}
/// One waiting thread's node in the `WaitQueue` linked list.
struct Entry {
/// The entry that was at the head of the list when this one was pushed;
/// null if the list was empty.
next_ptr: *mut Entry,
/// Notified by `signal` to wake the thread blocked in `wait`.
condvar: Condvar,
/// Protects the "signaled" flag: starts as `false`, set to `true` by `signal`.
mutex: Mutex<bool>,
}
impl Entry {
fn new(next_ptr: *mut Entry) -> Entry {
#[allow(clippy::mutex_atomic)]
Entry {
next_ptr,
condvar: Condvar::new(),
mutex: Mutex::new(false),
}
}
fn wait(&self) {
#[allow(clippy::mutex_atomic)]
let mut completed = self.mutex.lock().unwrap();
while !*completed {
completed = self.condvar.wait(completed).unwrap();
}
}
fn signal(&self) {
#[allow(clippy::mutex_atomic)]
let mut completed = self.mutex.lock().unwrap();
*completed = true;
self.condvar.notify_one();
}
}