use crate::coord::CapacityGate;
use crate::error::{CloseError, RecvError, SendError, TryRecvError, TrySendError};
use crate::mpsc::unbounded_v2;
use crate::{sync_util, RecvErrorTimeout};
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use super::bounded_async::{AsyncReceiver, AsyncSender};
/// Capacity token that travels with every queued message.
///
/// Dropping it calls `release()` on the gate, except when `is_rendezvous` is set
/// (see the `Drop` impl for `Permit`).
#[derive(Debug)]
pub(crate) struct Permit {
/// Gate that receives the `release()` call when the permit is dropped.
pub(crate) gate: Arc<CapacityGate>,
/// When `true`, dropping the permit leaves the gate untouched. Senders set this
/// when the channel capacity is zero.
pub(crate) is_rendezvous: bool,
}
/// Hands the permit back to the gate when the owning message is dropped.
impl Drop for Permit {
    fn drop(&mut self) {
        // Rendezvous permits are never returned here; for zero-capacity channels the
        // receiver is the one calling `release()` on the gate.
        if self.is_rendezvous {
            return;
        }
        self.gate.release();
    }
}
/// Queue entry of the bounded channel: the user's value plus the permit that
/// accounts for the slot it occupies.
pub(crate) struct BoundedMessage<T> {
/// Payload handed to the receiver.
pub(crate) value: T,
/// Never read in this file; it is held only so that its `Drop` runs when the
/// message is dropped.
pub(crate) _permit: Permit,
}
/// State shared (behind an `Arc`) by the `Sender` and `Receiver` handles of this module.
#[derive(Debug)]
pub(crate) struct BoundedMpscShared<T: Send> {
/// Gate that senders acquire from before enqueueing and that permits release back to.
pub(crate) gate: Arc<CapacityGate>,
/// Underlying unbounded queue carrying the permit-tagged messages.
pub(crate) channel: Arc<unbounded_v2::MpscShared<BoundedMessage<T>>>,
}
/// Synchronous sending half of the bounded channel. It can be cloned, and
/// converted into an `AsyncSender` with `to_async`.
#[derive(Debug)]
pub struct Sender<T: Send> {
/// Gate and queue shared with the receiver and every other sender handle.
pub(crate) shared: Arc<BoundedMpscShared<T>>,
/// Set by `close()` or `Drop` so that this handle's sender-count decrement runs
/// at most once.
pub(crate) closed: AtomicBool,
}
/// Synchronous receiving half of the bounded channel. It can be converted into an
/// `AsyncReceiver` with `to_async`.
#[derive(Debug)]
pub struct Receiver<T: Send> {
/// Gate and queue shared with every sender handle.
pub(crate) shared: Arc<BoundedMpscShared<T>>,
/// Set by `close()` or `Drop` so that the close logic runs at most once; the
/// receive methods return `Disconnected` once it is set.
pub(crate) closed: AtomicBool,
}
impl<T: Send> Sender<T> {
pub fn send(&self, value: T) -> Result<(), SendError> {
if self.closed.load(Ordering::Relaxed)
|| self.shared.channel.receiver_dropped.load(Ordering::Acquire)
{
return Err(SendError::Closed);
}
self.shared.gate.acquire_sync();
let permit = Permit {
gate: self.shared.gate.clone(),
is_rendezvous: self.capacity() == 0,
};
let message = BoundedMessage {
value,
_permit: permit,
};
let mut cache = None;
if unbounded_v2::send_internal(&self.shared.channel, message, &mut cache).is_err() {
return Err(SendError::Closed);
}
Ok(())
}
pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
if self.closed.load(Ordering::Relaxed)
|| self.shared.channel.receiver_dropped.load(Ordering::Acquire)
{
return Err(TrySendError::Closed(value));
}
if !self.shared.gate.try_acquire() {
return Err(TrySendError::Full(value));
}
let permit = Permit {
gate: self.shared.gate.clone(),
is_rendezvous: self.capacity() == 0,
};
let message = BoundedMessage {
value,
_permit: permit,
};
let mut cache = None;
if let Err(msg) = unbounded_v2::send_internal(&self.shared.channel, message, &mut cache) {
return Err(TrySendError::Closed(msg.value));
}
Ok(())
}
/// Closes this sender handle.
///
/// # Errors
///
/// Returns [`CloseError`] when the handle was already closed.
pub fn close(&self) -> Result<(), CloseError> {
    // Only the caller that flips `closed` from false to true runs the close logic.
    match self
        .closed
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
    {
        Ok(_) => {
            self.close_internal();
            Ok(())
        }
        Err(_) => Err(CloseError),
    }
}
/// Decrements the shared sender count. When this handle was the last sender, it
/// wakes the consumer and releases one permit on the gate.
fn close_internal(&self) {
    let shared = &self.shared;
    let senders_before = shared.channel.sender_count.fetch_sub(1, Ordering::AcqRel);
    if senders_before != 1 {
        return;
    }
    shared.channel.wake_consumer();
    // NOTE(review): the reason for releasing a gate permit here is not evident
    // from this file; confirm against `CapacityGate`.
    shared.gate.release();
}
/// Returns `true` once the receiver side has set the channel's `receiver_dropped` flag.
///
/// This does not consult the handle's own `closed` flag, so it can still return
/// `false` after `close()` succeeded on this handle.
pub fn is_closed(&self) -> bool {
    let receiver_gone = &self.shared.channel.receiver_dropped;
    receiver_gone.load(Ordering::Acquire)
}
/// Returns a relaxed snapshot of the channel's shared sender counter.
pub fn sender_count(&self) -> usize {
    let counter = &self.shared.channel.sender_count;
    counter.load(Ordering::Relaxed)
}
/// Returns a relaxed snapshot of the channel's `current_len` counter
/// (presumably the number of queued messages).
pub fn len(&self) -> usize {
    let queued = &self.shared.channel.current_len;
    queued.load(Ordering::Relaxed)
}
/// Returns `true` when the `len()` snapshot is zero.
pub fn is_empty(&self) -> bool {
    let queued = self.len();
    queued == 0
}
/// Returns the capacity reported by the shared gate. Zero is treated by this
/// module as a rendezvous channel.
pub fn capacity(&self) -> usize {
    let gate = &self.shared.gate;
    gate.capacity()
}
/// Returns `true` when the `len()` snapshot has reached (or exceeded) the capacity.
///
/// The comparison is `>=` rather than `==`: `len()` is a relaxed snapshot, and on
/// a zero-capacity channel a message in flight presumably makes `len()` exceed
/// the capacity, which an equality test would report as "not full".
pub fn is_full(&self) -> bool {
    self.len() >= self.capacity()
}
pub fn to_async(self) -> AsyncSender<T> {
let shared = unsafe { std::ptr::read(&self.shared) };
mem::forget(self);
AsyncSender {
shared,
closed: AtomicBool::new(false),
}
}
}
impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Self {
self
.shared
.channel
.sender_count
.fetch_add(1, Ordering::Relaxed);
Self {
shared: self.shared.clone(),
closed: AtomicBool::new(false),
}
}
}
/// Dropping an open sender behaves like `close()`; a handle that was already
/// closed does nothing.
impl<T: Send> Drop for Sender<T> {
    fn drop(&mut self) {
        let was_closed = self.closed.swap(true, Ordering::AcqRel);
        if was_closed {
            return;
        }
        self.close_internal();
    }
}
impl<T: Send> Receiver<T> {
/// Receives the next value, parking the calling thread while the queue is empty.
///
/// On a zero-capacity channel one gate permit is released up front.
///
/// # Errors
///
/// Returns [`RecvError::Disconnected`] when this receiver has been closed or the
/// underlying queue reports disconnection.
///
/// # Panics
///
/// Panics if locking the channel's `consumer_thread` slot fails (the lock result
/// is unwrapped).
pub fn recv(&self) -> Result<T, RecvError> {
    if self.closed.load(Ordering::Relaxed) {
        return Err(RecvError::Disconnected);
    }
    if self.capacity() == 0 {
        self.shared.gate.release();
    }

    let channel = &self.shared.channel;
    // Clears the parked registration, but only if the flag is still set.
    let unregister = || {
        let still_registered = channel
            .consumer_parked
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();
        if still_registered {
            *channel.consumer_thread.lock().unwrap() = None;
        }
    };

    loop {
        // Fast path: nothing to register if a result is already available.
        match self.try_recv_internal_no_release() {
            Ok(value) => return Ok(value),
            Err(TryRecvError::Disconnected) => return Err(RecvError::Disconnected),
            Err(TryRecvError::Empty) => {}
        }

        // Register as the parked consumer, then look again before actually
        // parking so that a send racing with the registration is not missed.
        *channel.consumer_thread.lock().unwrap() = Some(thread::current());
        channel.consumer_parked.store(true, Ordering::Release);

        let second_look = self.try_recv_internal_no_release();
        if matches!(second_look, Err(TryRecvError::Empty)) {
            sync_util::park_thread();
        }
        unregister();

        match second_look {
            Ok(value) => return Ok(value),
            Err(TryRecvError::Disconnected) => return Err(RecvError::Disconnected),
            Err(TryRecvError::Empty) => {}
        }
    }
}
/// Receives the next value, parking the calling thread for at most `timeout`
/// while the queue is empty.
///
/// On a zero-capacity channel exactly one gate permit is released per call,
/// matching `recv()`. Releasing another permit on every wake-up would let a
/// single call admit several senders.
///
/// # Errors
///
/// * [`RecvErrorTimeout::Disconnected`] when this receiver has been closed or
///   the underlying queue reports disconnection.
/// * [`RecvErrorTimeout::Timeout`] when `timeout` elapses without a value.
///
/// # Panics
///
/// Panics if locking the channel's `consumer_thread` slot fails (the lock result
/// is unwrapped).
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvErrorTimeout> {
    if self.closed.load(Ordering::Relaxed) {
        return Err(RecvErrorTimeout::Disconnected);
    }
    let start_time = Instant::now();

    if self.capacity() == 0 {
        self.shared.gate.release();
    }

    let channel = &self.shared.channel;

    // One non-blocking poll: `Some(result)` when the call can finish, `None`
    // when the queue is currently empty.
    let poll = || match self.try_recv_internal_no_release() {
        Ok(value) => Some(Ok(value)),
        Err(TryRecvError::Disconnected) => Some(Err(RecvErrorTimeout::Disconnected)),
        Err(TryRecvError::Empty) => None,
    };

    // Clears the parked registration, but only if the flag is still set.
    let unregister = || {
        let still_registered = channel
            .consumer_parked
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();
        if still_registered {
            *channel.consumer_thread.lock().unwrap() = None;
        }
    };

    if let Some(outcome) = poll() {
        return outcome;
    }

    loop {
        let elapsed = start_time.elapsed();
        if elapsed >= timeout {
            return Err(RecvErrorTimeout::Timeout);
        }
        // Cannot underflow: `elapsed < timeout` was just checked.
        let remaining_timeout = timeout - elapsed;

        // Register as the parked consumer, then look again before actually
        // parking so that a send racing with the registration is not missed.
        *channel.consumer_thread.lock().unwrap() = Some(thread::current());
        channel.consumer_parked.store(true, Ordering::Release);

        let second_look = poll();
        if second_look.is_none() {
            sync_util::park_thread_timeout(remaining_timeout);
        }
        unregister();
        if let Some(outcome) = second_look {
            return outcome;
        }

        // Woken up or timed out: check once more before re-evaluating the deadline.
        if let Some(outcome) = poll() {
            return outcome;
        }
    }
}
/// Non-blocking dequeue that never touches the gate directly: returns
/// `Disconnected` when this receiver is closed, otherwise whatever the underlying
/// queue reports, with the payload unwrapped from its message envelope.
fn try_recv_internal_no_release(&self) -> Result<T, TryRecvError> {
    if self.closed.load(Ordering::Relaxed) {
        return Err(TryRecvError::Disconnected);
    }
    let message = self.shared.channel.try_recv_internal()?;
    // Moving the value out drops the rest of the message, including its permit.
    Ok(message.value)
}
/// Attempts to receive a value without blocking.
///
/// On a zero-capacity channel one gate permit is released before the queue is
/// polled, whether or not a value is available.
///
/// # Errors
///
/// Returns [`TryRecvError::Disconnected`] when this receiver is closed; otherwise
/// forwards the underlying queue's error.
pub fn try_recv(&self) -> Result<T, TryRecvError> {
    if self.closed.load(Ordering::Relaxed) {
        return Err(TryRecvError::Disconnected);
    }

    let shared = &self.shared;
    if self.capacity() == 0 {
        shared.gate.release();
    }

    match shared.channel.try_recv_internal() {
        // Moving the value out drops the rest of the message, including its permit.
        Ok(message) => Ok(message.value),
        Err(reason) => Err(reason),
    }
}
/// Closes the receiver.
///
/// # Errors
///
/// Returns [`CloseError`] when the receiver was already closed.
pub fn close(&self) -> Result<(), CloseError> {
    // Only the caller that flips `closed` from false to true runs the close logic.
    match self
        .closed
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
    {
        Ok(_) => {
            self.close_internal();
            Ok(())
        }
        Err(_) => Err(CloseError),
    }
}
/// Flags the receiver as dropped, discards every message still queued, then
/// closes the gate.
fn close_internal(&self) {
    let shared = &self.shared;
    shared
        .channel
        .receiver_dropped
        .store(true, Ordering::Release);

    // Drain until the queue stops yielding messages; each discarded message
    // drops its permit along with its payload.
    while let Ok(discarded) = shared.channel.try_recv_internal() {
        drop(discarded);
    }

    shared.gate.close();
}
/// Returns `true` when the sender count is zero and the `len()` snapshot is
/// empty.
pub fn is_closed(&self) -> bool {
    let senders_alive = self.shared.channel.sender_count.load(Ordering::Acquire);
    if senders_alive != 0 {
        return false;
    }
    self.is_empty()
}
/// Returns a relaxed snapshot of the channel's shared sender counter.
pub fn sender_count(&self) -> usize {
    let counter = &self.shared.channel.sender_count;
    counter.load(Ordering::Relaxed)
}
/// Returns a relaxed snapshot of the channel's `current_len` counter
/// (presumably the number of queued messages).
pub fn len(&self) -> usize {
    let queued = &self.shared.channel.current_len;
    queued.load(Ordering::Relaxed)
}
/// Returns `true` when the `len()` snapshot is zero.
pub fn is_empty(&self) -> bool {
    let queued = self.len();
    queued == 0
}
/// Returns the capacity reported by the shared gate. Zero is treated by this
/// module as a rendezvous channel.
pub fn capacity(&self) -> usize {
    let gate = &self.shared.gate;
    gate.capacity()
}
/// Returns `true` when the `len()` snapshot has reached (or exceeded) the capacity.
///
/// The comparison is `>=` rather than `==`: `len()` is a relaxed snapshot, and on
/// a zero-capacity channel a message in flight presumably makes `len()` exceed
/// the capacity, which an equality test would report as "not full".
pub fn is_full(&self) -> bool {
    self.len() >= self.capacity()
}
pub fn to_async(self) -> AsyncReceiver<T> {
let shared = unsafe { std::ptr::read(&self.shared) };
mem::forget(self);
AsyncReceiver {
shared,
closed: AtomicBool::new(false),
}
}
}
/// Dropping an open receiver behaves like `close()`; a receiver that was already
/// closed does nothing.
impl<T: Send> Drop for Receiver<T> {
    fn drop(&mut self) {
        let was_closed = self.closed.swap(true, Ordering::AcqRel);
        if was_closed {
            return;
        }
        self.close_internal();
    }
}