use std::error::Error;
use std::fmt::Display;
use std::future::Future;
#[cfg(feature="use_std_sync")]
use std::sync::{MutexGuard, TryLockError};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Poll, Waker};
use inc_dec::{IncDecSelf, IntIncDecSelf};
use paste::paste;
use accessorise::impl_get_val;
use crate::QueuedWaker;
use std::fmt::Debug;
use super::PreferredMutexType;
/// The lock-protected state of a `LimitedWakerPermitQueue`.
///
/// Every field is public, so code holding the lock manipulates the counters
/// and queues directly.
#[derive(Debug)]
pub struct LimitedWakerPermitQueueInternals
{
    /// Wakers of tasks waiting to take a permit away.
    pub no_permits_queue: VecDeque<QueuedWaker>,
    /// Counter from which fresh waiter ids are drawn.
    pub latest_id: usize,
    /// Registered waiter ids, each mapped to a flag that is set to `true`
    /// once the waiter's waker has been popped from its queue to be woken.
    pub active_ids: HashMap<usize, bool>,
    /// The number of permits currently held.
    pub permits: usize,
    /// Wakers of tasks waiting to add a permit.
    pub max_permits_queue: VecDeque<QueuedWaker>,
}

impl LimitedWakerPermitQueueInternals
{
    /// Builds empty state: zero permits and no pre-allocated storage.
    pub fn new() -> Self
    {
        Self::with_permits(0)
    }

    /// Builds state with zero permits where both queues and the id map are
    /// pre-allocated for `capacity` entries.
    pub fn with_capacity(capacity: usize) -> Self
    {
        Self::with_capacity_and_permits_separate(capacity, 0)
    }

    /// Builds state that starts with `permits` permits and no pre-allocated
    /// storage.
    pub fn with_permits(permits: usize) -> Self
    {
        Self
        {
            permits,
            latest_id: 0,
            active_ids: HashMap::new(),
            no_permits_queue: VecDeque::new(),
            max_permits_queue: VecDeque::new(),
        }
    }

    /// Builds state where the same number is used both as the initial permit
    /// count and as the pre-allocated capacity.
    pub fn with_capacity_and_permits(capacity_and_permits: usize) -> Self
    {
        Self::with_capacity_and_permits_separate(capacity_and_permits, capacity_and_permits)
    }

    /// Builds state with independently chosen pre-allocated `capacity` and
    /// initial `permits`.
    pub fn with_capacity_and_permits_separate(capacity: usize, permits: usize) -> Self
    {
        Self
        {
            permits,
            latest_id: 0,
            active_ids: HashMap::with_capacity(capacity),
            no_permits_queue: VecDeque::with_capacity(capacity),
            max_permits_queue: VecDeque::with_capacity(capacity),
        }
    }
}
/// A permit counter with an upper bound and two waker queues: one for tasks
/// waiting to take a permit and one for tasks waiting to add a permit.
///
/// The mutable state sits behind the crate's preferred mutex type; it becomes
/// `None` once the queue has been closed.
#[derive(Debug)]
pub struct LimitedWakerPermitQueue
{
    /// Lock-protected state; `None` means the queue is closed.
    internal_mut_state: PreferredMutexType<Option<LimitedWakerPermitQueueInternals>>,
    /// Ceiling that the add/increment operations compare the permit count against.
    max_permits: usize,
}
impl LimitedWakerPermitQueue
{
/// Creates an open queue holding zero permits, with `max_permits` as the
/// ceiling used by the add/increment operations.
pub fn new(max_permits: usize) -> Self
{
    let internals = LimitedWakerPermitQueueInternals::new();
    Self
    {
        max_permits,
        internal_mut_state: PreferredMutexType::new(Some(internals)),
    }
}
/// Creates an open queue holding zero permits whose internal collections are
/// pre-allocated for `capacity` entries.
pub fn with_capacity(max_permits: usize, capacity: usize) -> Self
{
    let internals = LimitedWakerPermitQueueInternals::with_capacity(capacity);
    Self
    {
        max_permits,
        internal_mut_state: PreferredMutexType::new(Some(internals)),
    }
}
/// Creates an open queue that starts out holding `permits` permits.
///
/// The initial count is handed to the internal state as its permit count
/// (previously it was mistakenly used as a capacity hint, leaving the queue
/// with zero permits). `permits` is not clamped to `max_permits`.
pub fn with_permits(max_permits: usize, permits: usize) -> Self
{
    let internals = LimitedWakerPermitQueueInternals::with_permits(permits);
    Self
    {
        internal_mut_state: PreferredMutexType::new(Some(internals)),
        max_permits
    }
}
/// Creates an open queue where `capacity_and_permits` is used both as the
/// initial permit count and as the pre-allocated capacity.
pub fn with_capacity_and_permits(max_permits: usize, capacity_and_permits: usize) -> Self
{
    let internals = LimitedWakerPermitQueueInternals::with_capacity_and_permits(capacity_and_permits);
    Self
    {
        max_permits,
        internal_mut_state: PreferredMutexType::new(Some(internals)),
    }
}
/// Creates an open queue with independently chosen pre-allocated `capacity`
/// and initial `permits`.
pub fn with_capacity_and_permits_separate(max_permits: usize, capacity: usize, permits: usize) -> Self
{
    let internals = LimitedWakerPermitQueueInternals::with_capacity_and_permits_separate(capacity, permits);
    Self
    {
        max_permits,
        internal_mut_state: PreferredMutexType::new(Some(internals)),
    }
}
/// Locks the internal state and returns the guard.
///
/// A poisoned lock is not treated as an error: the poison flag is cleared and
/// the guard carried inside the poison error is used instead.
#[cfg(feature="use_std_sync")]
fn get_mg(&self) -> MutexGuard<'_, Option<LimitedWakerPermitQueueInternals>>
{
    self.internal_mut_state.lock().unwrap_or_else(|poisoned|
    {
        self.internal_mut_state.clear_poison();
        poisoned.into_inner()
    })
}
pub fn avalible_permits(&self) -> Option<usize>
{
#[cfg(feature="use_std_sync")]
let mut mg = self.get_mg();
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
let mut mg = self.internal_mut_state.lock();
if let Some(val) = &mut *mg
{
return Some(val.permits);
}
None
}
pub fn active_ids_len(&self) -> Option<usize>
{
#[cfg(feature="use_std_sync")]
let mut mg = self.get_mg();
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
let mut mg = self.internal_mut_state.lock();
if let Some(val) = &mut *mg
{
return Some(val.active_ids.len());
}
None
}
pub fn is_closed(&self) -> bool
{
#[cfg(feature="use_std_sync")]
let mg = self.get_mg();
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
let mg = self.internal_mut_state.lock();
mg.is_none()
}
// NOTE(review): presumably expands to a by-value getter for the `max_permits`
// field (macro imported from the `accessorise` crate) — confirm the generated
// method's name and visibility against that crate.
impl_get_val!(max_permits, usize);
pub fn has_max_permits(&self) -> Option<bool>
{
#[cfg(feature="use_std_sync")]
let mg = self.get_mg();
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
let mg = self.internal_mut_state.lock();
if let Some(val) = &*mg
{
let is_at_max = val.permits == self.max_permits;
return Some(is_at_max);
}
None
}
/// Returns how many more permits can be added before `max_permits` is
/// reached, or `None` when the queue is closed.
///
/// The constructors do not clamp the initial permit count, so it may exceed
/// `max_permits`; in that case the head room is reported as `0` rather than
/// underflowing the subtraction.
pub fn head_room(&self) -> Option<usize>
{
    #[cfg(feature="use_std_sync")]
    let mg = self.get_mg();
    #[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
    let mg = self.internal_mut_state.lock();
    mg.as_ref().map(|val| self.max_permits.saturating_sub(val.permits))
}
/// Adds up to `count` permits, never exceeding `max_permits`, and wakes one
/// waiting permit-taker per permit actually added.
///
/// `buffer` is scratch storage for the wakers so they can be woken after the
/// lock is released; anything already in it is woken too and it is left
/// empty when permits were processed.
///
/// Returns `None` when the queue is closed, otherwise the number of permits
/// actually added (`Some(0)` for `count == 0` or when no room is left).
///
/// A `count` so large that `permits + count` would overflow is clamped to
/// the ceiling instead of being rejected. If the permit count already
/// exceeds `max_permits` (possible through the unclamped constructors), it
/// is left untouched and `Some(0)` is returned.
pub fn add_permits(&self, count: usize, buffer: &mut VecDeque<QueuedWaker>) -> Option<usize>
{
    let permits_added;
    {
        #[cfg(feature="use_std_sync")]
        let mut mg = self.get_mg();
        #[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
        let mut mg = self.internal_mut_state.lock();
        let Some(val) = &mut *mg else { return None; };
        if count == 0
        {
            return Some(0);
        }
        let original_permits = val.permits;
        // Saturate instead of failing on overflow, cap at the ceiling, and
        // never go below the starting value so the subtraction cannot underflow.
        let new_permits = original_permits
            .saturating_add(count)
            .min(self.max_permits)
            .max(original_permits);
        val.permits = new_permits;
        permits_added = new_permits - original_permits;
        // One potential wake-up per added permit; stop early when nobody is left waiting.
        for _ in 0..permits_added
        {
            let Some(front_waker) = val.no_permits_queue.pop_front() else { break; };
            if let Some(shouldve_awoken) = val.active_ids.get_mut(&front_waker.id())
            {
                *shouldve_awoken = true;
            }
            buffer.push_back(front_waker);
        }
    }
    // The lock is released here; waking under the lock is avoided.
    for item in buffer.drain(..)
    {
        item.wake();
    }
    Some(permits_added)
}
pub fn add_permit(&self) -> Option<bool>
{
let opt_waker;
{
#[cfg(feature="use_std_sync")]
let mut mg = self.get_mg();
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
let mut mg = self.internal_mut_state.lock();
if let Some(val) = &mut *mg
{
let permits = val.permits;
let resultant_permits;
match permits.checked_add(1)
{
Some(val) =>
{
resultant_permits = val;
}
None =>
{
return Some(false);
}
}
if resultant_permits > self.max_permits
{
return Some(false);
}
val.permits = resultant_permits;
opt_waker = val.no_permits_queue.pop_front();
if let Some(front_waker) = &opt_waker
{
if let Some(shouldve_awoken) = val.active_ids.get_mut(&front_waker.id())
{
*shouldve_awoken = true;
}
}
}
else
{
return None;
}
}
if let Some(waker) = opt_waker
{
waker.wake();
}
Some(true)
}
pub fn remove_permits(&self, count: usize, buffer: &mut VecDeque<QueuedWaker>) -> Option<usize>
{
let permits_removed;
{
#[cfg(feature="use_std_sync")]
let mut mg = self.get_mg();
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
let mut mg = self.internal_mut_state.lock();
if let Some(val) = &mut *mg
{
if count == 0
{
return Some(0);
}
let permits = val.permits;
if let Some(resultant_permits) = permits.checked_sub(count)
{
val.permits = resultant_permits;
permits_removed = count;
}
else
{
permits_removed = count - (count - val.permits);
val.permits = 0;
}
let mut potential_wakers_to_wake = permits_removed;
while potential_wakers_to_wake > 0
{
let opt_front_waker = val.max_permits_queue.pop_front();
if let Some(front_waker) = opt_front_waker
{
if let Some(shouldve_awoken) = val.active_ids.get_mut(&front_waker.id())
{
*shouldve_awoken = true;
}
buffer.push_back(front_waker);
}
else
{
break;
}
potential_wakers_to_wake.mm();
}
}
else
{
return None;
}
}
for item in buffer.drain(..)
{
item.wake();
}
Some(permits_removed)
}
pub fn remove_permit(&self) -> Option<bool>
{
let opt_waker;
{
#[cfg(feature="use_std_sync")]
let mut mg = self.get_mg();
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
let mut mg = self.internal_mut_state.lock();
if let Some(val) = &mut *mg
{
let permits = val.permits;
if let Some(resultant_permits) = permits.checked_sub(1)
{
val.permits = resultant_permits;
opt_waker = val.max_permits_queue.pop_front();
if let Some(front_waker) = &opt_waker
{
if let Some(shouldve_awoken) = val.active_ids.get_mut(&front_waker.id())
{
*shouldve_awoken = true;
}
}
}
else
{
return Some(false);
}
}
else
{
return None;
}
}
if let Some(waker) = opt_waker
{
waker.wake();
}
Some(true)
}
/// Returns a future, borrowing this queue, that resolves to `Ok(())` once it
/// has taken one permit, or to a closed error if the queue is closed.
pub fn decrement_permits_or_wait<'a>(&'a self) -> LimitedWakerPermitQueueDecrementPermitsOrWait<'a>
{
LimitedWakerPermitQueueDecrementPermitsOrWait::new(self)
}
/// Returns a future, borrowing this queue, that resolves to `Ok(())` once it
/// has added one permit without exceeding `max_permits`, or to a closed error
/// if the queue is closed.
pub fn increment_permits_or_wait<'a>(&'a self) -> LimitedWakerPermitQueueIncrementPermitsOrWait<'a>
{
LimitedWakerPermitQueueIncrementPermitsOrWait::new(self)
}
pub fn close(&self)
{
let opt_internals;
{
#[cfg(feature="use_std_sync")]
let mut mg = self.get_mg();
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
let mut mg = self.internal_mut_state.lock();
opt_internals = mg.take();
}
if let Some(mut internal_mut_state) = opt_internals
{
for item in internal_mut_state.no_permits_queue.drain(..)
{
item.wake();
}
for item in internal_mut_state.max_permits_queue.drain(..)
{
item.wake();
}
}
}
}
// Dropping the queue closes it, which wakes every waker that is still queued.
impl Drop for LimitedWakerPermitQueue
{
fn drop(&mut self)
{
// `close` is a no-op if the queue was already closed explicitly.
self.close();
}
}
/// Error produced by the wait futures when the `LimitedWakerPermitQueue` they
/// refer to has been closed.
#[derive(Debug, PartialEq)]
pub struct LimitedWakerPermitQueueClosedError
{
}

impl LimitedWakerPermitQueueClosedError
{
    /// Creates the error value.
    pub fn new() -> Self
    {
        LimitedWakerPermitQueueClosedError {}
    }

    /// Convenience constructor returning the error already wrapped in `Err`.
    pub fn err() -> Result<(), Self>
    {
        let closed = Self::new();
        Err(closed)
    }
}

impl Display for LimitedWakerPermitQueueClosedError
{
    /// Writes a fixed, human-readable message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result
    {
        f.write_str("LimitedWakerPermitQueue is closed")
    }
}

impl Error for LimitedWakerPermitQueueClosedError
{
}
/// Future that takes one permit from a `LimitedWakerPermitQueue`, waiting in
/// the queue's no-permits queue while none is available.
#[derive(Debug)]
pub struct LimitedWakerPermitQueueDecrementPermitsOrWait<'a>
{
    /// The queue this future operates on.
    waker_permit_queue_ref: &'a LimitedWakerPermitQueue,
    /// The id under which this future is registered as a waiter, if it is.
    opt_waker_id: Option<usize>,
}

impl<'a> LimitedWakerPermitQueueDecrementPermitsOrWait<'a>
{
    /// Creates a future for `waker_permit_queue_ref` that is not yet
    /// registered as a waiter.
    pub fn new(waker_permit_queue_ref: &'a LimitedWakerPermitQueue) -> Self
    {
        Self
        {
            opt_waker_id: None,
            waker_permit_queue_ref,
        }
    }
}
impl Future for LimitedWakerPermitQueueDecrementPermitsOrWait<'_>
{
    type Output = Result<(), LimitedWakerPermitQueueClosedError>;

    /// Tries to take one permit.
    ///
    /// * Resolves to `Err` when the queue is closed.
    /// * On the first poll a permit is taken immediately only if nobody is
    ///   already waiting in the no-permits queue; otherwise the future
    ///   registers a fresh id and queues its waker.
    /// * On later polls a permit is taken only once this waiter has been
    ///   flagged as woken. If no permit is left by then, it re-queues at the
    ///   back.
    /// * A poll that arrives without the flag being set (a spurious poll)
    ///   refreshes the waker of the existing queue entry in place. It used to
    ///   push a second entry for the same id, and the stale duplicate would
    ///   later absorb a wake-up meant for another waiter.
    ///
    /// After a permit is taken, the first task waiting to add a permit (if
    /// any) is flagged and woken once the lock has been released.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>
    {
        // Waker from the max-permits queue to wake after the lock is released.
        let opt_waker;
        match self.opt_waker_id
        {
            Some(id) =>
            {
                #[cfg(feature="use_std_sync")]
                let mut mg = self.waker_permit_queue_ref.get_mg();
                #[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
                let mut mg = self.waker_permit_queue_ref.internal_mut_state.lock();
                match &mut *mg
                {
                    Some(val) =>
                    {
                        if let Some(shouldve_awoken) = val.active_ids.get(&id)
                        {
                            if *shouldve_awoken
                            {
                                let self_mut = self.get_mut();
                                let permits = val.permits;
                                if let Some(new_permits) = permits.checked_sub(1)
                                {
                                    // Permit taken: deregister this waiter.
                                    val.permits = new_permits;
                                    val.active_ids.remove(&id);
                                    self_mut.opt_waker_id = None;
                                    // Room was freed, so signal the first waiting permit-adder.
                                    opt_waker = val.max_permits_queue.pop_front();
                                    if let Some(waker) = &opt_waker
                                    {
                                        if let Some(mut_ref) = val.active_ids.get_mut(&waker.id())
                                        {
                                            *mut_ref = true;
                                        }
                                    }
                                    else
                                    {
                                        return Poll::Ready(Ok(()));
                                    }
                                }
                                else
                                {
                                    // Woken, but the permit is gone again: wait at the back.
                                    let waker = cx.waker().clone();
                                    let queued_waker = QueuedWaker::new(waker, id);
                                    val.no_permits_queue.push_back(queued_waker);
                                    if let Some(mut_ref) = val.active_ids.get_mut(&id)
                                    {
                                        *mut_ref = false;
                                    }
                                    self_mut.opt_waker_id = Some(id);
                                    return Poll::Pending;
                                }
                            }
                            else
                            {
                                // Spurious poll: keep the queue position, but make sure the
                                // most recent waker is the one that gets woken.
                                let queued_waker = QueuedWaker::new(cx.waker().clone(), id);
                                let existing_index = val.no_permits_queue.iter().position(|queued| id == queued.id());
                                match existing_index
                                {
                                    Some(index) => val.no_permits_queue[index] = queued_waker,
                                    None => val.no_permits_queue.push_back(queued_waker),
                                }
                                let self_mut = self.get_mut();
                                self_mut.opt_waker_id = Some(id);
                                return Poll::Pending;
                            }
                        }
                        else
                        {
                            // NOTE(review): an id that is no longer registered resolves to
                            // Ok(()) without a permit being taken here — confirm this is intended.
                            return Poll::Ready(Ok(()));
                        }
                    }
                    None =>
                    {
                        return Poll::Ready(LimitedWakerPermitQueueClosedError::err());
                    }
                }
            }
            None =>
            {
                let mut id = 0;
                #[cfg(feature="use_std_sync")]
                let mut mg = self.waker_permit_queue_ref.get_mg();
                #[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
                let mut mg = self.waker_permit_queue_ref.internal_mut_state.lock();
                match &mut *mg
                {
                    Some(val) =>
                    {
                        let permits = val.permits;
                        // Only take a permit straight away when no earlier waiter is queued.
                        if val.no_permits_queue.is_empty() && let Some(new_permits) = permits.checked_sub(1)
                        {
                            val.permits = new_permits;
                            opt_waker = val.max_permits_queue.pop_front();
                            if let Some(waker) = &opt_waker
                            {
                                if let Some(mut_ref) = val.active_ids.get_mut(&waker.id())
                                {
                                    *mut_ref = true;
                                }
                            }
                            else
                            {
                                return Poll::Ready(Ok(()));
                            }
                        }
                        else
                        {
                            let mut inserted = false;
                            let waker = cx.waker().clone();
                            // Draw ids until one is found that is not already registered.
                            while !inserted
                            {
                                id = val.latest_id.wpp();
                                inserted = val.active_ids.insert(id, false).is_none();
                            }
                            let queued_waker = QueuedWaker::new(waker, id);
                            val.no_permits_queue.push_back(queued_waker);
                            let self_mut = self.get_mut();
                            self_mut.opt_waker_id = Some(id);
                            return Poll::Pending;
                        }
                    }
                    None =>
                    {
                        return Poll::Ready(LimitedWakerPermitQueueClosedError::err());
                    }
                }
            }
        }
        // The lock guard went out of scope with the match arm above.
        if let Some(waker) = opt_waker
        {
            waker.wake();
        }
        Poll::Ready(Ok(()))
    }
}
impl Drop for LimitedWakerPermitQueueDecrementPermitsOrWait<'_>
{
    /// Deregisters a still-waiting future from the queue.
    ///
    /// Every queue entry carrying this waiter's id is removed. If the waiter
    /// had already been flagged as woken but is dropped before taking its
    /// permit, the wake-up is passed on to the next waiting permit-taker
    /// (provided a permit is available). Otherwise that permit would sit
    /// unused while the remaining waiters stay asleep.
    fn drop(&mut self)
    {
        let Some(id) = self.opt_waker_id else { return; };
        let opt_next_waker;
        {
            #[cfg(feature="use_std_sync")]
            let mut mg = self.waker_permit_queue_ref.get_mg();
            #[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
            let mut mg = self.waker_permit_queue_ref.internal_mut_state.lock();
            let Some(wqi) = &mut *mg else { return; };
            let was_signalled = wqi.active_ids.remove(&id).unwrap_or(false);
            wqi.no_permits_queue.retain(|item| id != item.id());
            opt_next_waker = if was_signalled && wqi.permits > 0
            {
                wqi.no_permits_queue.pop_front()
            }
            else
            {
                None
            };
            if let Some(next_waker) = &opt_next_waker
            {
                if let Some(shouldve_awoken) = wqi.active_ids.get_mut(&next_waker.id())
                {
                    *shouldve_awoken = true;
                }
            }
        }
        // Wake only after the lock has been released.
        if let Some(next_waker) = opt_next_waker
        {
            next_waker.wake();
        }
    }
}
/// Future that adds one permit to a `LimitedWakerPermitQueue`, waiting in the
/// queue's max-permits queue while the permit count is at its ceiling.
#[derive(Debug)]
pub struct LimitedWakerPermitQueueIncrementPermitsOrWait<'a>
{
    /// The queue this future operates on.
    waker_permit_queue_ref: &'a LimitedWakerPermitQueue,
    /// The id under which this future is registered as a waiter, if it is.
    opt_waker_id: Option<usize>,
}

impl<'a> LimitedWakerPermitQueueIncrementPermitsOrWait<'a>
{
    /// Creates a future for `waker_permit_queue_ref` that is not yet
    /// registered as a waiter.
    pub fn new(waker_permit_queue_ref: &'a LimitedWakerPermitQueue) -> Self
    {
        Self
        {
            opt_waker_id: None,
            waker_permit_queue_ref,
        }
    }
}
impl Future for LimitedWakerPermitQueueIncrementPermitsOrWait<'_>
{
    type Output = Result<(), LimitedWakerPermitQueueClosedError>;

    /// Tries to add one permit without exceeding the queue's `max_permits`.
    ///
    /// * Resolves to `Err` when the queue is closed.
    /// * On the first poll a permit is added immediately only if nobody is
    ///   already waiting in the max-permits queue; otherwise the future
    ///   registers a fresh id and queues its waker.
    /// * On later polls a permit is added only once this waiter has been
    ///   flagged as woken. If there is no room left by then, it re-queues at
    ///   the back.
    /// * A poll that arrives without the flag being set (a spurious poll)
    ///   refreshes the waker of the existing queue entry in place. It used to
    ///   push a second entry for the same id, and the stale duplicate would
    ///   later absorb a wake-up meant for another waiter.
    ///
    /// After a permit is added, the first task waiting to take a permit (if
    /// any) is flagged and woken once the lock has been released.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>
    {
        // Waker from the no-permits queue to wake after the lock is released.
        let opt_waker;
        match self.opt_waker_id
        {
            Some(id) =>
            {
                #[cfg(feature="use_std_sync")]
                let mut mg = self.waker_permit_queue_ref.get_mg();
                #[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
                let mut mg = self.waker_permit_queue_ref.internal_mut_state.lock();
                match &mut *mg
                {
                    Some(val) =>
                    {
                        if let Some(shouldve_awoken) = val.active_ids.get(&id)
                        {
                            if *shouldve_awoken
                            {
                                let self_mut = self.get_mut();
                                let permits = val.permits;
                                if let Some(new_permits) = permits.checked_add(1) && new_permits <= self_mut.waker_permit_queue_ref.max_permits
                                {
                                    // Permit added: deregister this waiter.
                                    val.permits = new_permits;
                                    val.active_ids.remove(&id);
                                    self_mut.opt_waker_id = None;
                                    // A permit became available, so signal the first waiting permit-taker.
                                    opt_waker = val.no_permits_queue.pop_front();
                                    if let Some(waker) = &opt_waker
                                    {
                                        if let Some(mut_ref) = val.active_ids.get_mut(&waker.id())
                                        {
                                            *mut_ref = true;
                                        }
                                    }
                                    else
                                    {
                                        return Poll::Ready(Ok(()));
                                    }
                                }
                                else
                                {
                                    // Woken, but the room is gone again: wait at the back.
                                    let waker = cx.waker().clone();
                                    let queued_waker = QueuedWaker::new(waker, id);
                                    val.max_permits_queue.push_back(queued_waker);
                                    if let Some(mut_ref) = val.active_ids.get_mut(&id)
                                    {
                                        *mut_ref = false;
                                    }
                                    self_mut.opt_waker_id = Some(id);
                                    return Poll::Pending;
                                }
                            }
                            else
                            {
                                // Spurious poll: keep the queue position, but make sure the
                                // most recent waker is the one that gets woken.
                                let queued_waker = QueuedWaker::new(cx.waker().clone(), id);
                                let existing_index = val.max_permits_queue.iter().position(|queued| id == queued.id());
                                match existing_index
                                {
                                    Some(index) => val.max_permits_queue[index] = queued_waker,
                                    None => val.max_permits_queue.push_back(queued_waker),
                                }
                                let self_mut = self.get_mut();
                                self_mut.opt_waker_id = Some(id);
                                return Poll::Pending;
                            }
                        }
                        else
                        {
                            // NOTE(review): an id that is no longer registered resolves to
                            // Ok(()) without a permit being added here — confirm this is intended.
                            return Poll::Ready(Ok(()));
                        }
                    }
                    None =>
                    {
                        return Poll::Ready(LimitedWakerPermitQueueClosedError::err());
                    }
                }
            }
            None =>
            {
                #[cfg(feature="use_std_sync")]
                let mut mg = self.waker_permit_queue_ref.get_mg();
                #[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
                let mut mg = self.waker_permit_queue_ref.internal_mut_state.lock();
                match &mut *mg
                {
                    Some(val) =>
                    {
                        let permits = val.permits;
                        // Only add a permit straight away when no earlier waiter is queued.
                        if val.max_permits_queue.is_empty() && let Some(new_permits) = permits.checked_add(1) && new_permits <= self.waker_permit_queue_ref.max_permits
                        {
                            val.permits = new_permits;
                            opt_waker = val.no_permits_queue.pop_front();
                            if let Some(waker) = &opt_waker
                            {
                                if let Some(mut_ref) = val.active_ids.get_mut(&waker.id())
                                {
                                    *mut_ref = true;
                                }
                            }
                            else
                            {
                                return Poll::Ready(Ok(()));
                            }
                        }
                        else
                        {
                            let mut id = 0;
                            let mut inserted = false;
                            let waker = cx.waker().clone();
                            // Draw ids until one is found that is not already registered.
                            while !inserted
                            {
                                id = val.latest_id.wpp();
                                inserted = val.active_ids.insert(id, false).is_none();
                            }
                            let queued_waker = QueuedWaker::new(waker, id);
                            val.max_permits_queue.push_back(queued_waker);
                            let self_mut = self.get_mut();
                            self_mut.opt_waker_id = Some(id);
                            return Poll::Pending;
                        }
                    }
                    None =>
                    {
                        return Poll::Ready(LimitedWakerPermitQueueClosedError::err());
                    }
                }
            }
        }
        // The lock guard went out of scope with the match arm above.
        if let Some(waker) = opt_waker
        {
            waker.wake();
        }
        Poll::Ready(Ok(()))
    }
}
impl Drop for LimitedWakerPermitQueueIncrementPermitsOrWait<'_>
{
    /// Deregisters a still-waiting future from the queue.
    ///
    /// Every queue entry carrying this waiter's id is removed. If the waiter
    /// had already been flagged as woken but is dropped before adding its
    /// permit, the wake-up is passed on to the next waiting permit-adder
    /// (provided there is room below `max_permits`). Otherwise that room
    /// would sit unused while the remaining waiters stay asleep.
    fn drop(&mut self)
    {
        let Some(id) = self.opt_waker_id else { return; };
        let opt_next_waker;
        {
            #[cfg(feature="use_std_sync")]
            let mut mg = self.waker_permit_queue_ref.get_mg();
            #[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
            let mut mg = self.waker_permit_queue_ref.internal_mut_state.lock();
            let Some(wqi) = &mut *mg else { return; };
            let was_signalled = wqi.active_ids.remove(&id).unwrap_or(false);
            wqi.max_permits_queue.retain(|item| id != item.id());
            let has_room = wqi.permits < self.waker_permit_queue_ref.max_permits;
            opt_next_waker = if was_signalled && has_room
            {
                wqi.max_permits_queue.pop_front()
            }
            else
            {
                None
            };
            if let Some(next_waker) = &opt_next_waker
            {
                if let Some(shouldve_awoken) = wqi.active_ids.get_mut(&next_waker.id())
                {
                    *shouldve_awoken = true;
                }
            }
        }
        // Wake only after the lock has been released.
        if let Some(next_waker) = opt_next_waker
        {
            next_waker.wake();
        }
    }
}