use concurrent_queue::ConcurrentQueue;
use core::sync::atomic::{AtomicBool, Ordering};
use std::thread;
#[cfg(feature = "async")]
use core::task::{Context, Poll, Waker};
#[derive(Debug)]
pub struct Mutex {
locked: AtomicBool,
wakers: ConcurrentQueue<ThreadOrWaker>,
}
impl Mutex {
#[inline]
pub fn new() -> Self {
Self {
locked: AtomicBool::new(false),
wakers: ConcurrentQueue::unbounded(),
}
}
#[inline]
pub fn lock(&self) {
while self
.locked
.compare_exchange(false, true, Ordering::AcqRel, Ordering::AcqRel)
.is_err()
{
let t = thread::current();
self.wakers
.push(ThreadOrWaker::Thread(t))
.unwrap_or_else(|_| panic!("Concurrent queue could not be pushed onto"));
thread::park();
}
}
#[cfg(feature = "async")]
#[inline]
pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<()> {
match self
.locked
.compare_exchange(false, true, Ordering::AcqRel, Ordering::AcqRel)
{
Ok(_) => Poll::Ready(()),
Err(_) => {
self.wakers
.push(ThreadOrWaker::Waker(cx.waker().clone()))
.unwrap_or_else(|_| panic!("Concurrent queue could not be pushed onto"));
Poll::Pending
}
}
}
#[inline]
pub fn unlock(&self) {
if self
.locked
.compare_exchange(true, false, Ordering::AcqRel, Ordering::AcqRel)
.is_ok()
{
while !self.locked.load(Ordering::Acquire) {
match self.wakers.pop() {
Ok(waker) => waker.wake(),
Err(_) => break,
}
}
}
}
}
enum ThreadOrWaker {
Thread(thread::Thread),
#[cfg(feature = "async")]
Waker(Waker),
}
impl ThreadOrWaker {
#[inline]
pub(crate) fn wake(self) {
match self {
ThreadOrWaker::Thread(t) => t.unpark(),
#[cfg(feature = "async")]
ThreadOrWaker::Waker(w) => w.wake(),
}
}
}