use crate::error::{
CloseError, RecvError, RecvErrorTimeout, SendError, TryRecvError, TrySendError,
};
pub use async_impl::{RecvFuture, SendFuture};
mod async_impl;
mod backoff;
mod core;
mod sync_impl;
use self::core::MpmcShared;
use ::core::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Synchronous sending handle of a channel.
///
/// Every handle carries its own `closed` flag next to a reference to the
/// channel state it shares with the other handles of the same channel.
#[derive(Debug)]
pub struct Sender<T>
where
    T: Send,
{
    /// Channel state shared between all handles of this channel.
    shared: Arc<MpmcShared<T>>,
    /// Becomes `true` once `close` has succeeded on this particular handle.
    closed: AtomicBool,
}
/// Synchronous receiving handle of a channel.
///
/// Every handle carries its own `closed` flag next to a reference to the
/// channel state it shares with the other handles of the same channel.
#[derive(Debug)]
pub struct Receiver<T>
where
    T: Send,
{
    /// Channel state shared between all handles of this channel.
    shared: Arc<MpmcShared<T>>,
    /// Becomes `true` once `close` has succeeded on this particular handle.
    closed: AtomicBool,
}
/// Asynchronous sending handle of a channel; `send` hands back a future.
///
/// Every handle carries its own `closed` flag next to a reference to the
/// channel state it shares with the other handles of the same channel.
#[derive(Debug)]
pub struct AsyncSender<T>
where
    T: Send,
{
    /// Channel state shared between all handles of this channel.
    shared: Arc<MpmcShared<T>>,
    /// Becomes `true` once `close` has succeeded on this particular handle.
    closed: AtomicBool,
}
/// Asynchronous receiving handle of a channel; `recv` hands back a future.
///
/// Every handle carries its own `closed` flag next to a reference to the
/// channel state it shares with the other handles of the same channel.
#[derive(Debug)]
pub struct AsyncReceiver<T>
where
    T: Send,
{
    /// Channel state shared between all handles of this channel.
    shared: Arc<MpmcShared<T>>,
    /// Becomes `true` once `close` has succeeded on this particular handle.
    closed: AtomicBool,
}
/// Creates a synchronous channel whose shared state is built with `capacity`.
///
/// Returns the sending and receiving handles; both start out open and refer
/// to the same freshly allocated shared state.
pub fn bounded<T: Send>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let state = Arc::new(MpmcShared::new(capacity));

    let tx = Sender {
        shared: Arc::clone(&state),
        closed: AtomicBool::new(false),
    };
    // The receiver takes over the original `Arc`, so only one clone is made.
    let rx = Receiver {
        shared: state,
        closed: AtomicBool::new(false),
    };

    (tx, rx)
}
/// Creates a synchronous channel without a capacity limit.
///
/// Implemented as [`bounded`] with a capacity of `usize::MAX`, the value the
/// handles' `capacity()` methods report as `None`.
pub fn unbounded<T: Send>() -> (Sender<T>, Receiver<T>) {
    const NO_LIMIT: usize = usize::MAX;
    bounded::<T>(NO_LIMIT)
}
/// Creates an asynchronous channel whose shared state is built with `capacity`.
///
/// Returns the sending and receiving handles; both start out open and refer
/// to the same freshly allocated shared state.
pub fn bounded_async<T: Send>(capacity: usize) -> (AsyncSender<T>, AsyncReceiver<T>) {
    let state = Arc::new(MpmcShared::new(capacity));

    let tx = AsyncSender {
        shared: Arc::clone(&state),
        closed: AtomicBool::new(false),
    };
    // The receiver takes over the original `Arc`, so only one clone is made.
    let rx = AsyncReceiver {
        shared: state,
        closed: AtomicBool::new(false),
    };

    (tx, rx)
}
/// Creates an asynchronous channel without a capacity limit.
///
/// Implemented as [`bounded_async`] with a capacity of `usize::MAX`, the value
/// the handles' `capacity()` methods report as `None`.
pub fn unbounded_async<T: Send>() -> (AsyncSender<T>, AsyncReceiver<T>) {
    const NO_LIMIT: usize = usize::MAX;
    bounded_async::<T>(NO_LIMIT)
}
impl<T: Send> Clone for Sender<T> {
    /// Creates another open sending handle for the same channel.
    ///
    /// The new handle is registered in the locked sender count and in its
    /// atomic mirror. The clone always starts open, regardless of whether
    /// `self` has been closed.
    fn clone(&self) -> Self {
        {
            let mut guard = self.shared.internal.lock();
            guard.sender_count += 1;
            // Publish the mirror while still holding the lock, using the same
            // `store` pattern as `close_internal`. Bumping it with `fetch_add`
            // after unlocking could interleave with a concurrent close's
            // `store` and leave the mirror over-counting.
            self.shared.sender_count.store(guard.sender_count, Ordering::Relaxed);
        }
        Sender {
            shared: Arc::clone(&self.shared),
            closed: AtomicBool::new(false),
        }
    }
}
impl<T: Send> Clone for Receiver<T> {
fn clone(&self) -> Self {
self.shared.internal.lock().receiver_count += 1;
Receiver {
shared: Arc::clone(&self.shared),
closed: AtomicBool::new(false),
}
}
}
impl<T: Send> Clone for AsyncSender<T> {
    /// Creates another open asynchronous sending handle for the same channel.
    ///
    /// The new handle is registered in the locked sender count and in its
    /// atomic mirror. The clone always starts open, regardless of whether
    /// `self` has been closed.
    fn clone(&self) -> Self {
        {
            let mut guard = self.shared.internal.lock();
            guard.sender_count += 1;
            // Publish the mirror while still holding the lock, using the same
            // `store` pattern as `close_internal`. Bumping it with `fetch_add`
            // after unlocking could interleave with a concurrent close's
            // `store` and leave the mirror over-counting.
            self.shared.sender_count.store(guard.sender_count, Ordering::Relaxed);
        }
        AsyncSender {
            shared: Arc::clone(&self.shared),
            closed: AtomicBool::new(false),
        }
    }
}
impl<T: Send> Clone for AsyncReceiver<T> {
fn clone(&self) -> Self {
self.shared.internal.lock().receiver_count += 1;
AsyncReceiver {
shared: Arc::clone(&self.shared),
closed: AtomicBool::new(false),
}
}
}
impl<T: Send> Sender<T> {
    /// Sends `item` through the synchronous implementation in `sync_impl`.
    ///
    /// # Errors
    ///
    /// Returns [`SendError::Closed`] without touching the channel when this
    /// handle has already been closed; otherwise returns whatever
    /// `sync_impl::send_sync` reports.
    pub fn send(&self, item: T) -> Result<(), SendError> {
        if self.closed.load(Ordering::Relaxed) {
            return Err(SendError::Closed);
        }
        sync_impl::send_sync(self, item)
    }

    /// Attempts to send `item` via the shared state's `try_send_core`.
    ///
    /// # Errors
    ///
    /// Returns [`TrySendError::Closed`] carrying `item` back when this handle
    /// has already been closed; otherwise returns whatever `try_send_core`
    /// reports.
    pub fn try_send(&self, item: T) -> Result<(), TrySendError<T>> {
        if self.closed.load(Ordering::Relaxed) {
            return Err(TrySendError::Closed(item));
        }
        self.shared.try_send_core(item)
    }

    /// Closes this handle and gives up its slot in the sender count.
    ///
    /// # Errors
    ///
    /// Returns [`CloseError`] when the handle was already closed; the channel
    /// bookkeeping is left untouched in that case.
    pub fn close(&self) -> Result<(), CloseError> {
        // The compare-exchange lets exactly one caller win the transition to
        // `closed`, so the sender count is decremented once per handle.
        if self
            .closed
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.close_internal();
            Ok(())
        } else {
            Err(CloseError)
        }
    }

    /// Decrements the sender count (and its atomic mirror). If this was the
    /// last sender, removes every waiting receiver from the shared state and
    /// wakes them once the lock has been released.
    fn close_internal(&self) {
        let (sync_waiters, async_waiters) = {
            let mut guard = self.shared.internal.lock();
            guard.sender_count -= 1;
            self.shared.sender_count.store(guard.sender_count, Ordering::Relaxed);
            if guard.sender_count != 0 {
                // Other senders remain; nobody needs to be woken.
                return;
            }
            let sync_list: Vec<_> = guard.waiting_sync_receivers.drain().collect();
            let async_list: Vec<_> = guard.waiting_async_receivers.drain().collect();
            (sync_list, async_list)
        };
        for waiter in sync_waiters {
            // SAFETY: NOTE(review): assumes `waiter.done` points to an
            // `AtomicBool` that stays alive until its owner observes the flag;
            // confirm against the code that registers waiters.
            unsafe { (*waiter.done).store(true, Ordering::Release) };
            waiter.thread.unpark();
        }
        for waiter in async_waiters {
            waiter.waker.wake();
        }
    }

    /// Returns `true` when the channel has no receivers left.
    ///
    /// This looks only at the shared receiver count, not at this handle's own
    /// `closed` flag.
    pub fn is_closed(&self) -> bool {
        self.shared.internal.lock().receiver_count == 0
    }

    /// Returns the channel capacity, or `None` when it is `usize::MAX` (the
    /// value `unbounded` passes).
    pub fn capacity(&self) -> Option<usize> {
        if self.shared.capacity == usize::MAX {
            None
        } else {
            Some(self.shared.capacity)
        }
    }

    /// Converts this handle into an [`AsyncSender`] for the same channel.
    ///
    /// The handle's slot in the sender count is handed over unchanged and its
    /// closed state is preserved.
    pub fn to_async(self) -> AsyncSender<T> {
        // Carry the closed flag over. Starting the new handle as open would
        // make its `Drop` run `close_internal` a second time for a handle
        // that was already closed, decrementing the sender count twice.
        let closed = AtomicBool::new(self.closed.load(Ordering::Relaxed));
        // SAFETY: `self.shared` is a valid, initialized `Arc`, and `self` is
        // forgotten right after the read, so the `Arc` is moved out exactly
        // once and never dropped twice.
        let shared = unsafe { std::ptr::read(&self.shared) };
        // Skip `Drop` so the sender count is not decremented by the conversion.
        mem::forget(self);
        AsyncSender { shared, closed }
    }

    /// Returns the number of items currently in the queue.
    #[inline]
    pub fn len(&self) -> usize {
        self.shared.internal.lock().queue.len()
    }

    /// Returns `true` when the queue holds no items.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` when the queue length equals the capacity; always
    /// `false` when the capacity is `usize::MAX`.
    #[inline]
    pub fn is_full(&self) -> bool {
        if self.shared.capacity == usize::MAX {
            false
        } else {
            self.len() == self.shared.capacity
        }
    }
}
impl<T: Send> Drop for Sender<T> {
    /// Closes the handle on drop so its slot in the sender count is released.
    fn drop(&mut self) {
        // An error only means the handle had already been closed explicitly,
        // so both outcomes are deliberately ignored.
        match self.close() {
            Ok(()) | Err(_) => {}
        }
    }
}
impl<T: Send> Receiver<T> {
    /// Receives an item through the synchronous implementation in `sync_impl`.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError::Disconnected`] without touching the channel when
    /// this handle has already been closed; otherwise returns whatever
    /// `sync_impl::recv_sync` reports.
    pub fn recv(&self) -> Result<T, RecvError> {
        if self.closed.load(Ordering::Relaxed) {
            return Err(RecvError::Disconnected);
        }
        sync_impl::recv_sync(self)
    }

    /// Attempts to receive an item via the shared state's `try_recv_core`.
    ///
    /// # Errors
    ///
    /// Returns [`TryRecvError::Disconnected`] when this handle has already
    /// been closed; otherwise returns whatever `try_recv_core` reports.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        if self.closed.load(Ordering::Relaxed) {
            return Err(TryRecvError::Disconnected);
        }
        self.shared.try_recv_core()
    }

    /// Receives an item through `sync_impl::recv_timeout_sync`, passing
    /// `timeout` along.
    ///
    /// # Errors
    ///
    /// Returns whatever `sync_impl::recv_timeout_sync` reports.
    pub fn recv_timeout(&self, timeout: std::time::Duration) -> Result<T, RecvErrorTimeout> {
        if self.closed.load(Ordering::Relaxed) {
            // NOTE(review): this branch is empty, so a closed handle still
            // falls through to the timed receive below. `recv` and `try_recv`
            // return a `Disconnected` error at this point; the matching
            // `RecvErrorTimeout` variant is not visible from this file.
            // TODO: confirm the variant and return it here.
        }
        sync_impl::recv_timeout_sync(self, timeout)
    }

    /// Closes this handle and gives up its slot in the receiver count.
    ///
    /// # Errors
    ///
    /// Returns [`CloseError`] when the handle was already closed; the channel
    /// bookkeeping is left untouched in that case.
    pub fn close(&self) -> Result<(), CloseError> {
        // The compare-exchange lets exactly one caller win the transition to
        // `closed`, so the receiver count is decremented once per handle.
        if self
            .closed
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.close_internal();
            Ok(())
        } else {
            Err(CloseError)
        }
    }

    /// Decrements the receiver count and wakes waiting senders once the lock
    /// has been released: all of them when this was the last receiver,
    /// otherwise at most one synchronous and one asynchronous sender.
    fn close_internal(&self) {
        let (sync_waiters, async_waiters) = {
            let mut guard = self.shared.internal.lock();
            guard.receiver_count -= 1;
            if guard.receiver_count == 0 {
                let sync_list: Vec<_> = guard.waiting_sync_senders.drain().collect();
                let async_list: Vec<_> = guard.waiting_async_senders.drain().collect();
                (sync_list, async_list)
            } else {
                // NOTE(review): receivers remain, yet the front waiting sender
                // of each kind is removed and (for the sync one) marked
                // `done`. Presumably this hands a wake-up on to the senders;
                // verify against the sender wait loops that no pending item
                // is dropped.
                let sync_list: Vec<_> =
                    guard.waiting_sync_senders.pop_front().into_iter().collect();
                let async_list: Vec<_> =
                    guard.waiting_async_senders.pop_front().into_iter().collect();
                (sync_list, async_list)
            }
        };
        for waiter in sync_waiters {
            // SAFETY: NOTE(review): assumes `waiter.done` points to an
            // `AtomicBool` that stays alive until its owner observes the flag;
            // confirm against the code that registers waiters.
            unsafe { (*waiter.done).store(true, Ordering::Release) };
            waiter.thread.unpark();
        }
        for waiter in async_waiters {
            waiter.waker.wake();
        }
    }

    /// Returns `true` when no senders remain, the queue is empty, and no
    /// sender of either kind is waiting.
    ///
    /// This looks only at the shared state, not at this handle's own `closed`
    /// flag.
    pub fn is_closed(&self) -> bool {
        let guard = self.shared.internal.lock();
        guard.sender_count == 0
            && guard.queue.is_empty()
            && guard.waiting_sync_senders.is_empty()
            && guard.waiting_async_senders.is_empty()
    }

    /// Returns the channel capacity, or `None` when it is `usize::MAX` (the
    /// value `unbounded` passes).
    pub fn capacity(&self) -> Option<usize> {
        if self.shared.capacity == usize::MAX {
            None
        } else {
            Some(self.shared.capacity)
        }
    }

    /// Converts this handle into an [`AsyncReceiver`] for the same channel.
    ///
    /// The handle's slot in the receiver count is handed over unchanged and
    /// its closed state is preserved.
    pub fn to_async(self) -> AsyncReceiver<T> {
        // Carry the closed flag over. Starting the new handle as open would
        // make its `Drop` run `close_internal` a second time for a handle
        // that was already closed, decrementing the receiver count twice.
        let closed = AtomicBool::new(self.closed.load(Ordering::Relaxed));
        // SAFETY: `self.shared` is a valid, initialized `Arc`, and `self` is
        // forgotten right after the read, so the `Arc` is moved out exactly
        // once and never dropped twice.
        let shared = unsafe { std::ptr::read(&self.shared) };
        // Skip `Drop` so the receiver count is not decremented by the conversion.
        mem::forget(self);
        AsyncReceiver { shared, closed }
    }

    /// Returns the number of items currently in the queue.
    #[inline]
    pub fn len(&self) -> usize {
        self.shared.internal.lock().queue.len()
    }

    /// Returns `true` when the queue holds no items.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` when the queue length equals the capacity; always
    /// `false` when the capacity is `usize::MAX`.
    #[inline]
    pub fn is_full(&self) -> bool {
        if self.shared.capacity == usize::MAX {
            false
        } else {
            self.len() == self.shared.capacity
        }
    }
}
impl<T: Send> Drop for Receiver<T> {
    /// Closes the handle on drop so its slot in the receiver count is released.
    fn drop(&mut self) {
        // An error only means the handle had already been closed explicitly,
        // so both outcomes are deliberately ignored.
        match self.close() {
            Ok(()) | Err(_) => {}
        }
    }
}
impl<T: Send> AsyncSender<T> {
    /// Returns a [`SendFuture`] built from this handle and `item`.
    pub fn send(&self, item: T) -> SendFuture<'_, T> {
        if self.closed.load(Ordering::Relaxed) {
            // NOTE(review): this branch is empty, so a closed handle still
            // produces a regular send future. Whether `SendFuture` itself
            // checks the closed flag is not visible from this file.
            // TODO: confirm and handle the closed case here if it does not.
        }
        async_impl::SendFuture::new(self, item)
    }

    /// Attempts to send `item` via the shared state's `try_send_core`.
    ///
    /// # Errors
    ///
    /// Returns [`TrySendError::Closed`] carrying `item` back when this handle
    /// has already been closed; otherwise returns whatever `try_send_core`
    /// reports.
    pub fn try_send(&self, item: T) -> Result<(), TrySendError<T>> {
        if self.closed.load(Ordering::Relaxed) {
            return Err(TrySendError::Closed(item));
        }
        self.shared.try_send_core(item)
    }

    /// Closes this handle and gives up its slot in the sender count.
    ///
    /// # Errors
    ///
    /// Returns [`CloseError`] when the handle was already closed; the channel
    /// bookkeeping is left untouched in that case.
    pub fn close(&self) -> Result<(), CloseError> {
        // The compare-exchange lets exactly one caller win the transition to
        // `closed`, so the sender count is decremented once per handle.
        if self
            .closed
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.close_internal();
            Ok(())
        } else {
            Err(CloseError)
        }
    }

    /// Decrements the sender count (and its atomic mirror). If this was the
    /// last sender, removes every waiting receiver from the shared state and
    /// wakes them once the lock has been released.
    fn close_internal(&self) {
        let (sync_waiters, async_waiters) = {
            let mut guard = self.shared.internal.lock();
            guard.sender_count -= 1;
            self.shared.sender_count.store(guard.sender_count, Ordering::Relaxed);
            if guard.sender_count != 0 {
                // Other senders remain; nobody needs to be woken.
                return;
            }
            let sync_list: Vec<_> = guard.waiting_sync_receivers.drain().collect();
            let async_list: Vec<_> = guard.waiting_async_receivers.drain().collect();
            (sync_list, async_list)
        };
        for waiter in sync_waiters {
            // SAFETY: NOTE(review): assumes `waiter.done` points to an
            // `AtomicBool` that stays alive until its owner observes the flag;
            // confirm against the code that registers waiters.
            unsafe { (*waiter.done).store(true, Ordering::Release) };
            waiter.thread.unpark();
        }
        for waiter in async_waiters {
            waiter.waker.wake();
        }
    }

    /// Returns `true` when the channel has no receivers left.
    ///
    /// This looks only at the shared receiver count, not at this handle's own
    /// `closed` flag.
    pub fn is_closed(&self) -> bool {
        self.shared.internal.lock().receiver_count == 0
    }

    /// Returns the channel capacity, or `None` when it is `usize::MAX` (the
    /// value `unbounded_async` passes).
    pub fn capacity(&self) -> Option<usize> {
        if self.shared.capacity == usize::MAX {
            None
        } else {
            Some(self.shared.capacity)
        }
    }

    /// Converts this handle into a synchronous [`Sender`] for the same channel.
    ///
    /// The handle's slot in the sender count is handed over unchanged and its
    /// closed state is preserved.
    pub fn to_sync(self) -> Sender<T> {
        // Carry the closed flag over. Starting the new handle as open would
        // make its `Drop` run `close_internal` a second time for a handle
        // that was already closed, decrementing the sender count twice.
        let closed = AtomicBool::new(self.closed.load(Ordering::Relaxed));
        // SAFETY: `self.shared` is a valid, initialized `Arc`, and `self` is
        // forgotten right after the read, so the `Arc` is moved out exactly
        // once and never dropped twice.
        let shared = unsafe { std::ptr::read(&self.shared) };
        // Skip `Drop` so the sender count is not decremented by the conversion.
        mem::forget(self);
        Sender { shared, closed }
    }

    /// Returns the number of items currently in the queue.
    #[inline]
    pub fn len(&self) -> usize {
        self.shared.internal.lock().queue.len()
    }

    /// Returns `true` when the queue holds no items.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` when the queue length equals the capacity; always
    /// `false` when the capacity is `usize::MAX`.
    #[inline]
    pub fn is_full(&self) -> bool {
        if self.shared.capacity == usize::MAX {
            false
        } else {
            self.len() == self.shared.capacity
        }
    }
}
impl<T: Send> Drop for AsyncSender<T> {
    /// Closes the handle on drop so its slot in the sender count is released.
    fn drop(&mut self) {
        // An error only means the handle had already been closed explicitly,
        // so both outcomes are deliberately ignored.
        match self.close() {
            Ok(()) | Err(_) => {}
        }
    }
}
impl<T: Send> AsyncReceiver<T> {
    /// Returns a [`RecvFuture`] built from this handle.
    ///
    /// NOTE(review): unlike `try_recv`, this does not consult the handle's
    /// `closed` flag. Whether `RecvFuture` checks it is not visible from this
    /// file; confirm.
    pub fn recv(&self) -> RecvFuture<'_, T> {
        async_impl::RecvFuture::new(self)
    }

    /// Attempts to receive an item via the shared state's `try_recv_core`.
    ///
    /// # Errors
    ///
    /// Returns [`TryRecvError::Disconnected`] when this handle has already
    /// been closed; otherwise returns whatever `try_recv_core` reports.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        if self.closed.load(Ordering::Relaxed) {
            return Err(TryRecvError::Disconnected);
        }
        self.shared.try_recv_core()
    }

    /// Closes this handle and gives up its slot in the receiver count.
    ///
    /// # Errors
    ///
    /// Returns [`CloseError`] when the handle was already closed; the channel
    /// bookkeeping is left untouched in that case.
    pub fn close(&self) -> Result<(), CloseError> {
        // The compare-exchange lets exactly one caller win the transition to
        // `closed`, so the receiver count is decremented once per handle.
        if self
            .closed
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.close_internal();
            Ok(())
        } else {
            Err(CloseError)
        }
    }

    /// Decrements the receiver count and wakes waiting senders once the lock
    /// has been released: all of them when this was the last receiver,
    /// otherwise at most one synchronous and one asynchronous sender.
    fn close_internal(&self) {
        let (sync_waiters, async_waiters) = {
            let mut guard = self.shared.internal.lock();
            guard.receiver_count -= 1;
            if guard.receiver_count == 0 {
                let sync_list: Vec<_> = guard.waiting_sync_senders.drain().collect();
                let async_list: Vec<_> = guard.waiting_async_senders.drain().collect();
                (sync_list, async_list)
            } else {
                // NOTE(review): receivers remain, yet the front waiting sender
                // of each kind is removed and (for the sync one) marked
                // `done`. Presumably this hands a wake-up on to the senders;
                // verify against the sender wait loops that no pending item
                // is dropped.
                let sync_list: Vec<_> =
                    guard.waiting_sync_senders.pop_front().into_iter().collect();
                let async_list: Vec<_> =
                    guard.waiting_async_senders.pop_front().into_iter().collect();
                (sync_list, async_list)
            }
        };
        for waiter in sync_waiters {
            // SAFETY: NOTE(review): assumes `waiter.done` points to an
            // `AtomicBool` that stays alive until its owner observes the flag;
            // confirm against the code that registers waiters.
            unsafe { (*waiter.done).store(true, Ordering::Release) };
            waiter.thread.unpark();
        }
        for waiter in async_waiters {
            waiter.waker.wake();
        }
    }

    /// Returns `true` when no senders remain, the queue is empty, and no
    /// sender of either kind is waiting.
    ///
    /// This looks only at the shared state, not at this handle's own `closed`
    /// flag.
    pub fn is_closed(&self) -> bool {
        let guard = self.shared.internal.lock();
        guard.sender_count == 0
            && guard.queue.is_empty()
            && guard.waiting_sync_senders.is_empty()
            && guard.waiting_async_senders.is_empty()
    }

    /// Returns the channel capacity, or `None` when it is `usize::MAX` (the
    /// value `unbounded_async` passes).
    pub fn capacity(&self) -> Option<usize> {
        if self.shared.capacity == usize::MAX {
            None
        } else {
            Some(self.shared.capacity)
        }
    }

    /// Converts this handle into a synchronous [`Receiver`] for the same
    /// channel.
    ///
    /// The handle's slot in the receiver count is handed over unchanged and
    /// its closed state is preserved.
    pub fn to_sync(self) -> Receiver<T> {
        // Carry the closed flag over. Starting the new handle as open would
        // make its `Drop` run `close_internal` a second time for a handle
        // that was already closed, decrementing the receiver count twice.
        let closed = AtomicBool::new(self.closed.load(Ordering::Relaxed));
        // SAFETY: `self.shared` is a valid, initialized `Arc`, and `self` is
        // forgotten right after the read, so the `Arc` is moved out exactly
        // once and never dropped twice.
        let shared = unsafe { std::ptr::read(&self.shared) };
        // Skip `Drop` so the receiver count is not decremented by the conversion.
        mem::forget(self);
        Receiver { shared, closed }
    }

    /// Returns the number of items currently in the queue.
    #[inline]
    pub fn len(&self) -> usize {
        self.shared.internal.lock().queue.len()
    }

    /// Returns `true` when the queue holds no items.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` when the queue length equals the capacity; always
    /// `false` when the capacity is `usize::MAX`.
    #[inline]
    pub fn is_full(&self) -> bool {
        if self.shared.capacity == usize::MAX {
            false
        } else {
            self.len() == self.shared.capacity
        }
    }
}
impl<T: Send> Drop for AsyncReceiver<T> {
    /// Closes the handle on drop so its slot in the receiver count is released.
    fn drop(&mut self) {
        // An error only means the handle had already been closed explicitly,
        // so both outcomes are deliberately ignored.
        match self.close() {
            Ok(()) | Err(_) => {}
        }
    }
}