#[cfg(not(feature = "std-mutex"))]
use crate::mutex::{Mutex, MutexGuard};
use crate::signal::{Signal, SignalTerminator};
extern crate alloc;
use alloc::{collections::VecDeque, sync::Arc};
#[cfg(feature = "std-mutex")]
use std::sync::{Mutex, MutexGuard};
/// Shared handle to a channel's state: a [`ChannelInternal`] guarded by a `Mutex` and
/// reference-counted with `Arc`. The `Mutex` is `std::sync::Mutex` when the `std-mutex`
/// feature is enabled and `crate::mutex::Mutex` otherwise.
pub(crate) type Internal<T> = Arc<Mutex<ChannelInternal<T>>>;
/// Locks the shared channel state and hands back the guard.
///
/// Without the `std-mutex` feature the crate mutex's `lock()` result is returned as-is.
/// With `std-mutex` the `lock()` result is unwrapped, so an `Err` from the standard
/// mutex (i.e. a poisoned lock) panics here.
#[inline(always)]
pub(crate) fn acquire_internal<T>(internal: &'_ Internal<T>) -> MutexGuard<'_, ChannelInternal<T>> {
    #[cfg(not(feature = "std-mutex"))]
    {
        internal.lock()
    }
    #[cfg(feature = "std-mutex")]
    {
        internal.lock().unwrap()
    }
}
/// Attempts to lock the shared channel state.
///
/// Returns `Some(guard)` when the lock was obtained and `None` otherwise. With the
/// `std-mutex` feature every `Err` from `try_lock()` is mapped to `None` via `.ok()`;
/// without it the crate mutex's `try_lock()` result is returned directly.
#[inline(always)]
pub(crate) fn try_acquire_internal<T>(
    internal: &'_ Internal<T>,
) -> Option<MutexGuard<'_, ChannelInternal<T>>> {
    #[cfg(not(feature = "std-mutex"))]
    {
        internal.try_lock()
    }
    #[cfg(feature = "std-mutex")]
    {
        internal.try_lock().ok()
    }
}
/// State of one channel, shared between its endpoints behind the `Internal<T>` handle.
pub(crate) struct ChannelInternal<T> {
/// Queue of `T` values; `new` pre-allocates it with the requested capacity.
pub(crate) queue: VecDeque<T>,
/// Selects which side `wait_list` currently serves: when `true`, `next_recv` and
/// `cancel_recv_signal` operate on it; when `false`, `next_send` and
/// `cancel_send_signal` do. `next_send` flips it to `true` and `next_recv` flips it
/// to `false` when they find the list empty.
pub(crate) recv_blocking: bool,
/// FIFO of pending signal terminators, filled by both `push_send` and `push_recv`.
pub(crate) wait_list: VecDeque<SignalTerminator<T>>,
/// Capacity limit: the value passed to `new` for bounded channels, `usize::MAX`
/// for unbounded ones.
pub(crate) capacity: usize,
/// Starts at 1 in `new`. NOTE(review): presumably the number of live receiver
/// handles — confirm against the callers that update it.
pub(crate) recv_count: u32,
/// Starts at 1 in `new`. NOTE(review): presumably the number of live sender
/// handles — confirm against the callers that update it.
pub(crate) send_count: u32,
}
// Manually asserts that `ChannelInternal<T>` may be moved across threads whenever `T: Send`.
// SAFETY: NOTE(review): the justification is not visible in this file. Presumably the
// auto trait is blocked by `SignalTerminator<T>` (stored in `wait_list`) and the state is
// only ever accessed through the surrounding `Mutex` — confirm against `crate::signal`.
unsafe impl<T: Send> Send for ChannelInternal<T> {}
impl<T> ChannelInternal<T> {
    /// Builds a fresh channel state and wraps it in the shared `Arc<Mutex<_>>` handle.
    ///
    /// * `bounded` - when `false`, the stored capacity limit becomes `usize::MAX`
    ///   regardless of `capacity`.
    /// * `capacity` - used both as the stored limit (bounded case) and as the initial
    ///   allocation size of the value queue.
    ///
    /// Both counters start at 1 and `recv_blocking` starts as `false`.
    #[inline(always)]
    pub(crate) fn new(bounded: bool, capacity: usize) -> Internal<T> {
        let effective_capacity = if bounded { capacity } else { usize::MAX };
        // A zero `capacity` argument gets a larger initial wait list (8 slots instead of 4).
        let wait_list_size = match capacity {
            0 => 8,
            _ => 4,
        };
        Arc::new(Mutex::from(Self {
            queue: VecDeque::with_capacity(capacity),
            recv_blocking: false,
            wait_list: VecDeque::with_capacity(wait_list_size),
            recv_count: 1,
            send_count: 1,
            capacity: effective_capacity,
        }))
    }

    /// Calls `terminate` on every queued signal terminator, then empties the wait list.
    pub(crate) fn terminate_signals(&mut self) {
        // SAFETY: NOTE(review): `terminate`'s contract lives in `crate::signal` and is not
        // visible here. Each queued terminator is visited exactly once and the list is
        // cleared right after — confirm that this satisfies the contract.
        self.wait_list
            .iter()
            .for_each(|waiter| unsafe { waiter.terminate() });
        self.wait_list.clear();
    }

    /// Pops the oldest entry of the wait list for the send side.
    ///
    /// Returns `None` without touching the list while `recv_blocking` is set. If the
    /// flag is clear and the list is empty, sets `recv_blocking` to `true` and
    /// returns `None`.
    #[inline(always)]
    pub(crate) fn next_send(&mut self) -> Option<SignalTerminator<T>> {
        if self.recv_blocking {
            return None;
        }
        let waiter = self.wait_list.pop_front();
        if waiter.is_none() {
            self.recv_blocking = true;
        }
        waiter
    }

    /// Appends a send-side terminator to the back of the wait list.
    #[inline(always)]
    pub(crate) fn push_send(&mut self, s: SignalTerminator<T>) {
        self.wait_list.push_back(s);
    }

    /// Pops the oldest entry of the wait list for the receive side.
    ///
    /// Returns `None` without touching the list while `recv_blocking` is clear. If the
    /// flag is set and the list is empty, resets `recv_blocking` to `false` and
    /// returns `None`.
    #[inline(always)]
    pub(crate) fn next_recv(&mut self) -> Option<SignalTerminator<T>> {
        if !self.recv_blocking {
            return None;
        }
        let waiter = self.wait_list.pop_front();
        if waiter.is_none() {
            self.recv_blocking = false;
        }
        waiter
    }

    /// Appends a receive-side terminator to the back of the wait list.
    #[inline(always)]
    pub(crate) fn push_recv(&mut self, s: SignalTerminator<T>) {
        self.wait_list.push_back(s);
    }

    /// Removes the first wait-list entry equal to `sig`, but only while
    /// `recv_blocking` is clear.
    ///
    /// Returns `true` if an entry was removed, `false` otherwise.
    pub(crate) fn cancel_send_signal(&mut self, sig: &Signal<T>) -> bool {
        if self.recv_blocking {
            return false;
        }
        match self.wait_list.iter().position(|waiter| waiter.eq(sig)) {
            Some(index) => {
                self.wait_list.remove(index);
                true
            }
            None => false,
        }
    }

    /// Removes the first wait-list entry equal to `sig`, but only while
    /// `recv_blocking` is set.
    ///
    /// Returns `true` if an entry was removed, `false` otherwise.
    pub(crate) fn cancel_recv_signal(&mut self, sig: &Signal<T>) -> bool {
        if !self.recv_blocking {
            return false;
        }
        match self.wait_list.iter().position(|waiter| waiter.eq(sig)) {
            Some(index) => {
                self.wait_list.remove(index);
                true
            }
            None => false,
        }
    }

    /// Reports whether `sig` is present in the wait list while `recv_blocking` is clear.
    #[cfg(feature = "async")]
    pub(crate) fn send_signal_exists(&self, sig: &Signal<T>) -> bool {
        !self.recv_blocking && self.wait_list.iter().any(|waiter| waiter.eq(sig))
    }

    /// Reports whether `sig` is present in the wait list while `recv_blocking` is set.
    #[cfg(feature = "async")]
    pub(crate) fn recv_signal_exists(&self, sig: &Signal<T>) -> bool {
        self.recv_blocking && self.wait_list.iter().any(|waiter| waiter.eq(sig))
    }
}