use crate::config::{PriorityConfig, QueueStrategy};
use crate::wait_queue::queue::WaitQueue;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
/// Counting semaphore whose blocked acquirers are kept in a [`WaitQueue`].
///
/// The whole permit state lives in one atomic word: the permit count is stored
/// shifted left by `PERMIT_SHIFT`, and the low `CLOSED` bit marks the semaphore
/// as closed.
#[derive(Debug)]
pub struct RankedSemaphore {
/// Packed state word: `(available_permits << PERMIT_SHIFT) | closed_flag`.
pub(crate) permits: AtomicUsize,
/// Queue of pending waiters, guarded by a standard-library mutex.
pub(crate) waiters: Mutex<WaitQueue>,
}
impl RankedSemaphore {
/// Largest permit count the constructors accept; larger values make them panic.
pub const MAX_PERMITS: usize = usize::MAX >> 3;
/// Bit mask of the "closed" flag inside the packed `permits` word.
pub(crate) const CLOSED: usize = 1;
/// Number of bits the permit count is shifted left inside the packed `permits`
/// word, leaving room for the `CLOSED` flag in the low bit.
pub(crate) const PERMIT_SHIFT: usize = 1;
/// Builds a semaphore with `permits` initial permits, using
/// [`QueueStrategy::Fifo`] as the default queue strategy.
///
/// # Panics
///
/// Panics if `permits` is greater than [`Self::MAX_PERMITS`].
pub fn new_fifo(permits: usize) -> Self {
    assert!(permits <= Self::MAX_PERMITS, "permits exceed MAX_PERMITS");
    Self::new(permits, QueueStrategy::Fifo)
}
/// Builds a semaphore with `permits` initial permits, using
/// [`QueueStrategy::Lifo`] as the default queue strategy.
///
/// # Panics
///
/// Panics if `permits` is greater than [`Self::MAX_PERMITS`].
pub fn new_lifo(permits: usize) -> Self {
    assert!(permits <= Self::MAX_PERMITS, "permits exceed MAX_PERMITS");
    Self::new(permits, QueueStrategy::Lifo)
}
/// Builds a semaphore with `permits` initial permits and a fresh
/// [`PriorityConfig`] whose default strategy is `default_strategy`.
///
/// # Panics
///
/// Panics if `permits` is greater than [`Self::MAX_PERMITS`].
pub fn new(permits: usize, default_strategy: QueueStrategy) -> Self {
    assert!(permits <= Self::MAX_PERMITS, "permits exceed MAX_PERMITS");
    Self::new_with_config(
        permits,
        PriorityConfig::new().default_strategy(default_strategy),
    )
}
/// Builds a semaphore with `permits` initial permits and a wait queue created
/// from `config`.
///
/// The semaphore starts open: the permit count is stored shifted by
/// `PERMIT_SHIFT`, which leaves the `CLOSED` bit cleared.
///
/// # Panics
///
/// Panics if `permits` is greater than [`Self::MAX_PERMITS`].
pub fn new_with_config(permits: usize, config: PriorityConfig) -> Self {
    assert!(permits <= Self::MAX_PERMITS, "permits exceed MAX_PERMITS");
    let initial_state = permits << Self::PERMIT_SHIFT;
    let queue = WaitQueue::new(config);
    Self {
        permits: AtomicUsize::new(initial_state),
        waiters: Mutex::new(queue),
    }
}
/// Returns the number of permits currently stored in the state word.
///
/// The value is a snapshot taken with an `Acquire` load.
pub fn available_permits(&self) -> usize {
    let state = self.permits.load(Ordering::Acquire);
    // Shift the flag bit(s) away to recover the plain count.
    state >> Self::PERMIT_SHIFT
}
/// Returns `true` when the `CLOSED` flag is set in the state word.
pub fn is_closed(&self) -> bool {
    let state = self.permits.load(Ordering::Acquire);
    (state & Self::CLOSED) == Self::CLOSED
}
/// Releases `added` permits to the semaphore.
///
/// Adding zero permits is a no-op and does not take the waiter lock.
/// Otherwise the waiter lock is acquired and the work is delegated to
/// `add_permits_locked`.
///
/// # Panics
///
/// Panics if the waiter mutex is poisoned.
pub fn add_permits(&self, added: usize) {
    if added != 0 {
        let guard = self.waiters.lock().unwrap();
        self.add_permits_locked(added, guard);
    }
}
/// Hands `rem` released permits to queued waiters and stores whatever is left
/// in the atomic permit counter.
///
/// `waiters` is the already-held queue lock; it is used for the first round and
/// re-acquired for any further round. Each round asks the queue to select
/// waiters for up to `rem` permits, releases the lock, and only then wakes the
/// selected waiters, so wake-ups never run under the mutex.
///
/// # Panics
///
/// Panics if the leftover permits exceed `MAX_PERMITS`, if adding them would
/// push the counter past `MAX_PERMITS`, or if the waiter mutex is poisoned.
pub(crate) fn add_permits_locked(
    &self,
    mut rem: usize,
    waiters: std::sync::MutexGuard<'_, crate::wait_queue::queue::WaitQueue>,
) {
    let mut lock = Some(waiters);
    while rem > 0 {
        // First round reuses the caller's guard; later rounds lock again.
        let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap());
        if waiters.is_empty() {
            drop(waiters);
            break;
        }
        let (wake_list, permits_assigned) = waiters.select_waiters_to_notify(rem);
        // Assumes the queue never assigns more than the `rem` it was offered.
        rem -= permits_assigned;
        if permits_assigned == 0 || wake_list.is_empty() {
            drop(waiters);
            break;
        }
        // Release the lock before waking anyone.
        drop(waiters);
        if self.is_closed() {
            // Closed in the meantime: put the permits that were just assigned
            // back into the counter and stop without waking the batch.
            self.permits
                .fetch_add(permits_assigned << Self::PERMIT_SHIFT, Ordering::Release);
            return;
        }
        let mut wake_list = wake_list;
        wake_list.wake_all();
        // A batch that was not full means no further round is attempted.
        if !wake_list.was_full() {
            break;
        }
    }
    if rem > 0 {
        // Validate BEFORE touching the atomic: `rem << PERMIT_SHIFT` silently
        // drops high bits and `fetch_add` wraps, so an oversized `rem` would
        // corrupt the state word, and the sum below could wrap past the check.
        assert!(
            rem <= Self::MAX_PERMITS,
            "number of added permits ({}) would overflow MAX_PERMITS ({})",
            rem,
            Self::MAX_PERMITS
        );
        let prev = self
            .permits
            .fetch_add(rem << Self::PERMIT_SHIFT, Ordering::Release);
        let prev_permits = prev >> Self::PERMIT_SHIFT;
        // Cannot overflow: `prev_permits <= usize::MAX >> 1` and
        // `rem <= usize::MAX >> 3`.
        assert!(
            prev_permits + rem <= Self::MAX_PERMITS,
            "number of added permits ({}) would overflow MAX_PERMITS ({})",
            rem,
            Self::MAX_PERMITS
        );
    }
}
/// Marks the semaphore as closed and closes the wait queue.
///
/// The `CLOSED` bit is set in the state word first; afterwards the waiter lock
/// is taken and the queue's own `close` is invoked.
///
/// # Panics
///
/// Panics if the waiter mutex is poisoned.
pub fn close(&self) {
    // NOTE(review): the flag becomes visible before the queue lock is held, so
    // the counter can briefly read "closed" while the queue is not yet closed
    // — confirm the acquire path tolerates that window.
    self.permits.fetch_or(Self::CLOSED, Ordering::Release);
    self.waiters.lock().unwrap().close();
}
/// Permanently removes up to `n` permits from the counter.
///
/// Returns how many permits were actually removed, which is less than `n`
/// when fewer were available. The `CLOSED` flag is left as it was.
pub fn forget_permits(&self, n: usize) -> usize {
    if n == 0 {
        return 0;
    }
    // `fetch_update` retries the closure until the compare-exchange succeeds.
    let outcome = self
        .permits
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |bits| {
            let available = bits >> Self::PERMIT_SHIFT;
            let kept = available - available.min(n);
            Some((kept << Self::PERMIT_SHIFT) | (bits & Self::CLOSED))
        });
    // The closure always yields `Some`, so both arms carry the state word the
    // successful exchange replaced.
    let before = match outcome {
        Ok(bits) | Err(bits) => bits,
    };
    (before >> Self::PERMIT_SHIFT).min(n)
}
}