mod semaphore;
pub mod guard;
use std::cell::UnsafeCell;
use std::hint;
use std::sync::atomic::Ordering::*;
use std::thread;
use guard::{WomGuard, WorlGuardRead, WorlGuardWrite};
use semaphore::{AcquireError, AcquireResult, RwSemaphore};
type AtomicCounter = std::sync::atomic::AtomicU8;
pub struct Worl<T> {
sem: RwSemaphore,
data: UnsafeCell<Option<T>>,
}
pub struct Wom<T> {
locked: AtomicCounter,
data: UnsafeCell<Option<T>>,
}
impl<T> Worl<T> {
pub fn new(data: T) -> Self {
Self {
sem: RwSemaphore::new(),
data: UnsafeCell::new(Some(data)),
}
}
pub fn empty() -> Self {
Self {
sem: RwSemaphore::new(),
data: UnsafeCell::new(None),
}
}
#[inline(always)]
fn has_contents(&self) -> bool {
unsafe { &*self.data.get() }.is_some()
}
pub fn set_timeout(self, timeout_millis: u16) -> Self {
self.sem.set_timeout(timeout_millis);
self
}
pub fn read(&self) -> AcquireResult<WorlGuardRead<'_, T>> {
if let Err(err) = self.sem.acquire_read() {
Err(err)
} else if !self.has_contents() {
self.sem.release_read();
Err(AcquireError::ValueNone)
} else {
Ok(WorlGuardRead::new(self))
}
}
pub fn write(&self) -> AcquireResult<WorlGuardWrite<'_, T>> {
if let Err(err) = self.sem.acquire_read() {
Err(err)
} else {
self.sem.release_read();
if !self.has_contents() {
Err(AcquireError::ValueNone)
} else if let Err(err) = self.sem.acquire_write() {
Err(err)
} else {
Ok(WorlGuardWrite::new(self))
}
}
}
pub fn read_unscheduled(&self) -> AcquireResult<WorlGuardRead<'_, T>> {
if let Err(err) = self.sem.acquire_read_unscheduled() {
Err(err)
} else if !self.has_contents() {
self.sem.release_read();
Err(AcquireError::ValueNone)
} else {
Ok(WorlGuardRead::new(self))
}
}
pub fn write_unscheduled(&self) -> AcquireResult<WorlGuardWrite<'_, T>> {
if let Err(err) = self.sem.acquire_read_unscheduled() {
Err(err)
} else {
self.sem.release_read();
if !self.has_contents() {
Err(AcquireError::ValueNone)
} else if let Err(err) = self.sem.acquire_write_unscheduled() {
Err(err)
} else {
Ok(WorlGuardWrite::new(self))
}
}
}
pub fn clear(&self) -> AcquireResult {
self.sem.acquire_write_unscheduled()?;
unsafe {
*self.data.get() = None;
}
self.sem.release_write();
Ok(())
}
pub fn set(&self, data: T) -> AcquireResult<WorlGuardWrite<'_, T>> {
self.sem.acquire_write_unscheduled()?;
unsafe {
*self.data.get() = Some(data);
}
Ok(WorlGuardWrite::new(self))
}
pub fn swap(&self, data: &mut T) -> AcquireResult<WorlGuardWrite<'_, T>> {
self.sem.acquire_read_unscheduled()?;
let current = unsafe { &mut *self.data.get() }.as_mut();
if let Some(cur) = current {
self.sem.release_read();
self.sem.acquire_write_unscheduled()?;
std::mem::swap(data, cur);
Ok(WorlGuardWrite::new(self))
} else {
self.sem.release_read();
Err(AcquireError::ValueNone)
}
}
}
impl<'a, T> Wom<T> {
const UNLOCKED: u8 = 0;
const LOCKED: u8 = 1;
const CONTENDED: u8 = 2;
pub fn new(data: T) -> Self {
Self {
locked: AtomicCounter::new(Self::UNLOCKED),
data: UnsafeCell::new(Some(data)),
}
}
pub fn empty() -> Self {
Self {
locked: AtomicCounter::new(Self::UNLOCKED),
data: UnsafeCell::new(None),
}
}
fn data_as_mut(&'a self) -> Option<&'a mut T> {
unsafe { &mut *self.data.get() }.as_mut()
}
#[inline]
pub fn get_mut(&'a mut self) -> AcquireResult<&'a mut T> {
self.data.get_mut().as_mut().ok_or(AcquireError::ValueNone)
}
fn spin(&self) -> u8 {
let mut spin = 100;
loop {
let state = self.locked.load(Relaxed);
if state != Self::LOCKED || spin == 0 {
return state;
}
hint::spin_loop();
spin -= 1;
}
}
fn resolve_contention(&self) {
let mut state = self.spin();
if state == Self::UNLOCKED {
match self
.locked
.compare_exchange(Self::UNLOCKED, Self::LOCKED, Acquire, Relaxed)
{
Ok(_) => return, Err(s) => state = s,
}
}
loop {
if state != Self::CONTENDED
&& self.locked.swap(Self::CONTENDED, AcqRel) == Self::UNLOCKED
{
return;
}
unsafe {
let lock_addr = self.locked.as_ptr();
#[allow(clippy::while_immutable_condition)]
while *lock_addr == state {
thread::yield_now();
}
}
state = self.spin();
}
}
fn release(&self) {
if self
.locked
.compare_exchange(Self::LOCKED, Self::UNLOCKED, AcqRel, Relaxed)
.is_err()
{
let mut state = self.spin();
if state == Self::CONTENDED {
match self
.locked
.compare_exchange(Self::CONTENDED, Self::UNLOCKED, AcqRel, Relaxed)
{
Ok(_) => return, Err(s) => state = s,
}
}
loop {
if state != Self::CONTENDED
&& self.locked.swap(Self::CONTENDED, Release) == Self::UNLOCKED
{
return;
}
unsafe {
let lock_addr = self.locked.as_ptr();
#[allow(clippy::while_immutable_condition)]
while *lock_addr == state {
thread::yield_now();
}
}
state = self.spin();
}
}
}
pub fn lock(&'a self) -> AcquireResult<WomGuard<'a, T>> {
if self
.locked
.compare_exchange(Self::UNLOCKED, Self::LOCKED, Acquire, Relaxed)
.is_err()
{
self.resolve_contention();
}
if self.data_as_mut().is_none() {
self.release();
Err(AcquireError::ValueNone)
} else {
Ok(WomGuard::new(self))
}
}
pub fn try_lock(&'a self) -> AcquireResult<WomGuard<'a, T>> {
if self
.locked
.compare_exchange(Self::UNLOCKED, Self::LOCKED, Acquire, Relaxed)
.is_err()
{
Err(AcquireError::WriteUnavailable)
} else if self.data_as_mut().is_none() {
Err(AcquireError::ValueNone)
} else {
Ok(WomGuard::new(self))
}
}
pub fn clear(&self) -> AcquireResult {
let _ = self.lock()?;
unsafe {
*self.data.get() = None;
}
Ok(())
}
pub fn set(&'a self, data: T) -> AcquireResult<WomGuard<'a, T>> {
if self
.locked
.compare_exchange(Self::UNLOCKED, Self::LOCKED, Acquire, Relaxed)
.is_err()
{
self.resolve_contention();
}
unsafe {
*self.data.get() = Some(data);
}
Ok(WomGuard::new(self))
}
pub fn swap(&'a self, data: &mut T) -> AcquireResult<WomGuard<'a, T>> {
let lock = self.lock()?;
std::mem::swap(data, self.data_as_mut().unwrap());
Ok(lock)
}
}
impl<T> Drop for Worl<T> {
fn drop(&mut self) {
self.sem.close();
}
}
unsafe impl<T: Send> Sync for Worl<T> {}
unsafe impl<T: Send> Send for Worl<T> {}
unsafe impl<T: Send> Sync for Wom<T> {}
unsafe impl<T: Send> Send for Wom<T> {}