use crate::coord::CapacityGate;
use crate::error::{RecvError, SendError, TryRecvError, TrySendError};
use crate::mpsc::unbounded;
use crate::sync_util;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use super::bounded_async::{AsyncReceiver, AsyncSender};
#[derive(Debug)]
pub(crate) struct Permit {
pub(crate) gate: Arc<CapacityGate>,
pub(crate) is_rendezvous: bool,
}
impl Drop for Permit {
fn drop(&mut self) {
if !self.is_rendezvous {
self.gate.release();
}
}
}
pub(crate) struct BoundedMessage<T> {
pub(crate) value: T,
pub(crate) _permit: Permit,
}
#[derive(Debug)]
pub(crate) struct BoundedMpscShared<T: Send> {
pub(crate) gate: Arc<CapacityGate>,
pub(crate) channel: Arc<unbounded::MpscShared<BoundedMessage<T>>>,
}
#[derive(Debug)]
pub struct Sender<T: Send> {
pub(crate) shared: Arc<BoundedMpscShared<T>>,
pub(crate) closed: AtomicBool,
}
#[derive(Debug)]
pub struct Receiver<T: Send> {
pub(crate) shared: Arc<BoundedMpscShared<T>>,
pub(crate) closed: AtomicBool,
}
impl<T: Send> Sender<T> {
pub fn send(&self, value: T) -> Result<(), SendError> {
if self.closed.load(Ordering::Relaxed)
|| self.shared.channel.receiver_dropped.load(Ordering::Acquire)
{
return Err(SendError::Closed);
}
self.shared.gate.acquire_sync();
let permit = Permit {
gate: self.shared.gate.clone(),
is_rendezvous: self.capacity() == 0,
};
let message = BoundedMessage {
value,
_permit: permit,
};
if unbounded::send_internal(&self.shared.channel, message).is_err() {
return Err(SendError::Closed);
}
Ok(())
}
pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
if self.closed.load(Ordering::Relaxed)
|| self.shared.channel.receiver_dropped.load(Ordering::Acquire)
{
return Err(TrySendError::Closed(value));
}
if !self.shared.gate.try_acquire() {
return Err(TrySendError::Full(value));
}
let permit = Permit {
gate: self.shared.gate.clone(),
is_rendezvous: self.capacity() == 0,
};
let message = BoundedMessage {
value,
_permit: permit,
};
if let Err(msg) = unbounded::send_internal(&self.shared.channel, message) {
return Err(TrySendError::Closed(msg.value));
}
Ok(())
}
pub fn clone(&self) -> Self {
self
.shared
.channel
.sender_count
.fetch_add(1, Ordering::Relaxed);
Self {
shared: self.shared.clone(),
closed: AtomicBool::new(false),
}
}
pub fn is_closed(&self) -> bool {
self.shared.channel.receiver_dropped.load(Ordering::Acquire)
}
pub fn len(&self) -> usize {
self.shared.channel.current_len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> usize {
self.shared.gate.capacity()
}
pub fn is_full(&self) -> bool {
self.len() == self.capacity()
}
pub fn to_async(self) -> AsyncSender<T> {
let shared = unsafe { std::ptr::read(&self.shared) };
mem::forget(self);
AsyncSender {
shared,
closed: AtomicBool::new(false),
}
}
}
impl<T: Send> Drop for Sender<T> {
fn drop(&mut self) {
if !self.closed.swap(true, Ordering::AcqRel) {
if self
.shared
.channel
.sender_count
.fetch_sub(1, Ordering::AcqRel)
== 1
{
self.shared.channel.wake_consumer();
self.shared.gate.release();
}
}
}
}
impl<T: Send> Receiver<T> {
pub fn recv(&self) -> Result<T, RecvError> {
if self.closed.load(Ordering::Relaxed) {
return Err(RecvError::Disconnected);
}
if self.capacity() == 0 {
self.shared.gate.release();
}
loop {
match self.try_recv_internal_no_release() {
Ok(value) => return Ok(value),
Err(TryRecvError::Disconnected) => return Err(RecvError::Disconnected),
Err(TryRecvError::Empty) => {}
}
let lf_shared = &self.shared.channel;
*lf_shared.consumer_thread.lock().unwrap() = Some(thread::current());
lf_shared.consumer_parked.store(true, Ordering::Release);
match self.try_recv_internal_no_release() {
Ok(value) => {
if lf_shared
.consumer_parked
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
*lf_shared.consumer_thread.lock().unwrap() = None;
}
return Ok(value);
}
Err(TryRecvError::Disconnected) => {
if lf_shared
.consumer_parked
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
*lf_shared.consumer_thread.lock().unwrap() = None;
}
return Err(RecvError::Disconnected);
}
Err(TryRecvError::Empty) => {
sync_util::park_thread();
if lf_shared
.consumer_parked
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
*lf_shared.consumer_thread.lock().unwrap() = None;
}
}
}
}
}
fn try_recv_internal_no_release(&self) -> Result<T, TryRecvError> {
if self.closed.load(Ordering::Relaxed) {
return Err(TryRecvError::Disconnected);
}
self.shared.channel.try_recv_internal().map(|msg| msg.value)
}
pub fn try_recv(&self) -> Result<T, TryRecvError> {
if self.closed.load(Ordering::Relaxed) {
return Err(TryRecvError::Disconnected);
}
if self.capacity() == 0 {
self.shared.gate.release();
}
self.shared.channel.try_recv_internal().map(|msg| msg.value)
}
pub fn is_closed(&self) -> bool {
let chan = &self.shared.channel;
chan.sender_count.load(Ordering::Acquire) == 0 && self.is_empty()
}
pub fn len(&self) -> usize {
self.shared.channel.current_len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> usize {
self.shared.gate.capacity()
}
pub fn is_full(&self) -> bool {
self.len() == self.capacity()
}
pub fn to_async(self) -> AsyncReceiver<T> {
let shared = unsafe { std::ptr::read(&self.shared) };
mem::forget(self);
AsyncReceiver {
shared,
closed: AtomicBool::new(false),
}
}
}
impl<T: Send> Drop for Receiver<T> {
fn drop(&mut self) {
if !self.closed.swap(true, Ordering::AcqRel) {
self
.shared
.channel
.receiver_dropped
.store(true, Ordering::Release);
while self.shared.channel.try_recv_internal().is_ok() {}
self.shared.gate.release();
}
}
}