use std::{io, i32};
use std::sync::atomic::{AtomicI32, Ordering};
use sys::{futex_wait, futex_wake};
pub struct RwFutex {
readers: AtomicI32,
writers_queued: AtomicI32,
writers_wakeup: AtomicI32,
}
const WRITER_LOCKED_READERS_QUEUED: i32 = i32::MIN + 1;
impl RwFutex {
pub fn new() -> RwFutex {
RwFutex {
readers: AtomicI32::new(0),
writers_queued: AtomicI32::new(0),
writers_wakeup: AtomicI32::new(0),
}
}
pub fn acquire_read(&self) {
loop {
let mut val;
if self.writers_queued.load(Ordering::Relaxed) == 0 {
val = self.readers.fetch_add(1, Ordering::Acquire);
if val >= 0 {
break;
}
val = val + 1;
while val > WRITER_LOCKED_READERS_QUEUED && val < 0 {
val = self.readers.compare_and_swap(val, WRITER_LOCKED_READERS_QUEUED, Ordering::Relaxed);
}
if val >= 0 {
continue;
}
} else {
val = self.readers.load(Ordering::Relaxed);
}
if let Err(e) = futex_wait(&self.readers, val) {
match e.kind() {
io::ErrorKind::WouldBlock
| io::ErrorKind::Interrupted => (), _ => panic!("{}", e),
}
}
}
}
pub fn acquire_write(&self) {
self.writers_queued.fetch_add(1, Ordering::Acquire);
loop {
let val = self.readers.compare_and_swap(0, i32::MIN, Ordering::Acquire);
if val == 0 {
break;
} else {
if let Err(e) = futex_wait(&self.writers_wakeup, 0) {
match e.kind() {
io::ErrorKind::WouldBlock
| io::ErrorKind::Interrupted => (), _ => panic!("{}", e),
}
}
}
}
self.writers_queued.fetch_sub(1, Ordering::Release);
}
pub fn release_read(&self) {
let val = self.readers.fetch_sub(1, Ordering::Release);
if val == 1 {
if self.writers_queued.load(Ordering::Relaxed) > 0 {
futex_wake(&self.writers_wakeup, 1).unwrap();
}
}
}
pub fn release_write(&self) {
self.readers.swap(0, Ordering::Release);
if self.writers_queued.load(Ordering::Relaxed) > 0 {
if futex_wake(&self.writers_wakeup, 1).unwrap() == 1 {
return;
}
}
futex_wake(&self.readers, i32::MAX).unwrap();
}
}