#![deny(unsafe_code)]
use std::fmt;
#[cfg(not(feature = "loom"))]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{self, Acquire, Relaxed};
#[cfg(feature = "loom")]
use loom::sync::atomic::AtomicUsize;
use crate::opcode::Opcode;
use crate::sync_primitive::SyncPrimitive;
use crate::wait_queue::WaitQueue;
#[derive(Default)]
pub struct Semaphore {
state: AtomicUsize,
}
impl Semaphore {
pub const MAX_PERMITS: usize = WaitQueue::DATA_MASK;
#[inline]
pub fn is_open(&self, mo: Ordering) -> bool {
let state = self.state.load(mo);
(state & WaitQueue::DATA_MASK) != Self::MAX_PERMITS
}
#[inline]
pub fn is_closed(&self, mo: Ordering) -> bool {
(self.state.load(mo) & WaitQueue::DATA_MASK) == WaitQueue::DATA_MASK
}
#[inline]
pub fn available_permits(&self, mo: Ordering) -> usize {
Self::MAX_PERMITS - (self.state.load(mo) & WaitQueue::DATA_MASK)
}
#[inline]
pub async fn acquire_async(&self) {
loop {
let (result, state) = self.try_acquire_internal(1);
if result {
return;
}
if self.wait_resources_async(state, Opcode::Semaphore(1)).await {
return;
}
}
}
#[inline]
pub fn acquire_sync(&self) {
loop {
let (result, state) = self.try_acquire_internal(1);
if result {
return;
}
if self.wait_resources_sync(state, Opcode::Semaphore(1)) {
return;
}
}
}
#[inline]
pub fn try_acquire(&self) -> bool {
self.try_acquire_internal(1).0
}
#[inline]
pub async fn acquire_many_async(&self, count: usize) {
loop {
let (result, state) = self.try_acquire_internal(count);
if result {
return;
}
#[allow(clippy::cast_possible_truncation)]
if self
.wait_resources_async(state, Opcode::Semaphore(count as u8))
.await
{
return;
}
}
}
#[inline]
pub fn acquire_many_sync(&self, count: usize) {
loop {
let (result, state) = self.try_acquire_internal(count);
if result {
return;
}
#[allow(clippy::cast_possible_truncation)]
if self.wait_resources_sync(state, Opcode::Semaphore(count as u8)) {
return;
}
}
}
#[inline]
pub fn try_acquire_many(&self, count: usize) -> bool {
self.try_acquire_internal(count).0
}
#[inline]
pub fn release(&self) -> bool {
match self.state.compare_exchange(1, 0, Acquire, Relaxed) {
Ok(_) => true,
Err(state) => self.release_loop(state, Opcode::Semaphore(1)),
}
}
#[inline]
pub fn release_many(&self, count: usize) -> bool {
let Ok(count) = u8::try_from(count) else {
return false;
};
match self
.state
.compare_exchange(count as usize, 0, Acquire, Relaxed)
{
Ok(_) => true,
Err(state) => self.release_loop(state, Opcode::Semaphore(count)),
}
}
fn try_acquire_internal(&self, count: usize) -> (bool, usize) {
let mut state = self.state.load(Acquire);
loop {
if state & WaitQueue::ADDR_MASK != 0
|| (state & WaitQueue::DATA_MASK) + count > Self::MAX_PERMITS
{
return (false, state);
}
match self
.state
.compare_exchange(state, state + count, Acquire, Relaxed)
{
Ok(_) => return (true, 0),
Err(new_state) => state = new_state,
}
}
}
}
impl fmt::Debug for Semaphore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Relaxed);
let available_permits = Self::MAX_PERMITS - (state & WaitQueue::DATA_MASK);
let wait_queue_being_processed = state & WaitQueue::LOCKED_FLAG == WaitQueue::LOCKED_FLAG;
let wait_queue_tail_addr = state & WaitQueue::ADDR_MASK;
f.debug_struct("WaitQueue")
.field("state", &state)
.field("available_permits", &available_permits)
.field("wait_queue_being_processed", &wait_queue_being_processed)
.field("wait_queue_tail_addr", &wait_queue_tail_addr)
.finish()
}
}
impl SyncPrimitive for Semaphore {
#[inline]
fn state(&self) -> &AtomicUsize {
&self.state
}
#[inline]
fn max_shared_owners() -> usize {
Self::MAX_PERMITS
}
}