use crate::error::{TryRecvError, TrySendError};
use crate::internal::blocked_deque::BlockedVecDeque;
use crate::internal::cache_padded::CachePadded;
use crate::RecvError;
use core::task::{Context, Poll};
use std::collections::VecDeque;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Waker;
use std::thread::Thread;
#[derive(Debug)]
pub(crate) struct SyncSenderWaiter<T> {
pub(crate) thread: Thread,
pub(crate) data: Option<T>,
pub(crate) done: *const CachePadded<AtomicBool>,
}
unsafe impl<T: Send> Send for SyncSenderWaiter<T> {}
unsafe impl<T: Send> Sync for SyncSenderWaiter<T> {}
impl<T> SyncSenderWaiter<T> {
pub(crate) fn take_item_from_sender_slot(&mut self) -> Option<T> {
self.data.take()
}
}
#[derive(Debug)]
pub(crate) struct SyncReceiverWaiter {
pub(crate) thread: Thread,
pub(crate) done: *const CachePadded<AtomicBool>,
}
unsafe impl Send for SyncReceiverWaiter {}
unsafe impl Sync for SyncReceiverWaiter {}
#[derive(Debug)]
pub(crate) struct AsyncSenderWaiter<T> {
pub(crate) waker: Waker,
pub(crate) data: Option<T>,
}
impl<T> AsyncSenderWaiter<T> {
pub(crate) fn take_item_from_sender_slot(&mut self) -> Option<T> {
self.data.take()
}
}
#[derive(Debug)]
pub(crate) struct AsyncReceiverWaiter {
pub(crate) waker: Waker,
}
#[derive(Debug)]
pub(crate) struct MpmcChannelInternal<T> {
pub(crate) queue: VecDeque<T>,
pub(crate) waiting_sync_senders: BlockedVecDeque<SyncSenderWaiter<T>, 16>,
pub(crate) waiting_async_senders: BlockedVecDeque<AsyncSenderWaiter<T>, 16>,
pub(crate) waiting_sync_receivers: BlockedVecDeque<SyncReceiverWaiter, 16>,
pub(crate) waiting_async_receivers: BlockedVecDeque<AsyncReceiverWaiter, 16>,
pub(crate) sender_count: usize,
pub(crate) receiver_count: usize,
}
type TrySendFn<T> = fn(&MpmcShared<T>, T) -> Result<(), TrySendError<T>>;
type TryRecvFn<T> = fn(&MpmcShared<T>) -> Result<T, TryRecvError>;
#[derive(Debug)]
pub(crate) struct MpmcShared<T> {
pub(crate) internal: CachePadded<Mutex<MpmcChannelInternal<T>>>,
pub(crate) capacity: usize,
pub(crate) try_send: TrySendFn<T>,
pub(crate) try_recv: TryRecvFn<T>,
pub(crate) sender_count: AtomicUsize,
}
unsafe impl<T: Send> Send for MpmcShared<T> {}
unsafe impl<T: Send> Sync for MpmcShared<T> {}
impl<T: Send> MpmcShared<T> {
pub(crate) fn new(capacity: usize) -> Self {
let (try_send, try_recv) = if capacity == 0 {
(try_send_rendezvous::<T> as TrySendFn<T>, try_recv_rendezvous::<T> as TryRecvFn<T>)
} else if capacity == usize::MAX {
(try_send_unbounded::<T> as TrySendFn<T>, try_recv_unbounded::<T> as TryRecvFn<T>)
} else {
(try_send_buffered::<T> as TrySendFn<T>, try_recv_buffered::<T> as TryRecvFn<T>)
};
const PREALLOC_LIMIT: usize = 10240;
let queue = {
if capacity > 0 && capacity < PREALLOC_LIMIT {
VecDeque::with_capacity(capacity)
} else {
VecDeque::new()
}
};
MpmcShared {
internal: CachePadded::new(Mutex::new(MpmcChannelInternal {
queue,
waiting_sync_senders: BlockedVecDeque::new(),
waiting_async_senders: BlockedVecDeque::new(),
waiting_sync_receivers: BlockedVecDeque::new(),
waiting_async_receivers: BlockedVecDeque::new(),
sender_count: 1, receiver_count: 1,
})),
capacity,
try_send,
try_recv,
sender_count: AtomicUsize::new(1),
}
}
pub(crate) fn try_send_core(&self, item: T) -> Result<(), TrySendError<T>> {
(self.try_send)(self, item)
}
pub(crate) fn try_recv_core(&self) -> Result<T, TryRecvError> {
(self.try_recv)(self)
}
pub(crate) fn poll_recv_internal(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
'poll_loop: loop {
match self.try_recv_core() {
Ok(item) => {
return Poll::Ready(Ok(item));
}
Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError::Disconnected)),
Err(TryRecvError::Empty) => { }
}
{
let mut guard = self.internal.lock();
if !guard.queue.is_empty()
|| (self.capacity == 0
&& (!guard.waiting_sync_senders.is_empty() || !guard.waiting_async_senders.is_empty()))
{
drop(guard);
continue 'poll_loop; }
if guard.sender_count == 0 {
return Poll::Ready(Err(RecvError::Disconnected));
}
let new_waker = cx.waker();
let mut found_existing = false;
for existing_waiter in guard.waiting_async_receivers.iter_mut() {
if existing_waiter.waker.will_wake(new_waker) {
existing_waiter.waker = new_waker.clone(); found_existing = true;
break;
}
}
if !found_existing {
let waiter = AsyncReceiverWaiter {
waker: new_waker.clone(),
};
guard.waiting_async_receivers.push_back(waiter);
}
return Poll::Pending;
}
}
}
}
impl<T> Drop for MpmcShared<T> {
fn drop(&mut self) {
if let Some(mut guard) = self.internal.try_lock() {
guard.queue.clear();
let _ = guard.waiting_sync_senders.drain().collect::<Vec<_>>();
let _ = guard.waiting_async_senders.drain().collect::<Vec<_>>();
}
}
}
fn try_send_rendezvous<T: Send>(shared: &MpmcShared<T>, item: T) -> Result<(), TrySendError<T>> {
let mut guard = shared.internal.lock();
if guard.receiver_count == 0 {
return Err(TrySendError::Closed(item));
}
if let Some(waiter) = guard.waiting_async_receivers.pop_front() {
guard.queue.push_back(item);
waiter.waker.wake();
return Ok(());
}
if let Some(waiter) = guard.waiting_sync_receivers.pop_front() {
guard.queue.push_back(item);
unsafe { (*waiter.done).store(true, Ordering::Release) };
waiter.thread.unpark();
return Ok(());
}
Err(TrySendError::Full(item))
}
fn try_send_buffered<T: Send>(shared: &MpmcShared<T>, item: T) -> Result<(), TrySendError<T>> {
let mut guard = shared.internal.lock();
if guard.receiver_count == 0 {
return Err(TrySendError::Closed(item));
}
let can_send = !guard.waiting_async_receivers.is_empty()
|| !guard.waiting_sync_receivers.is_empty()
|| guard.queue.len() < shared.capacity;
if can_send {
if let Some(waiter) = guard.waiting_async_receivers.pop_front() {
waiter.waker.wake();
} else if let Some(waiter) = guard.waiting_sync_receivers.pop_front() {
unsafe { (*waiter.done).store(true, Ordering::Release) };
waiter.thread.unpark();
}
guard.queue.push_back(item);
Ok(())
} else {
Err(TrySendError::Full(item))
}
}
fn try_send_unbounded<T: Send>(shared: &MpmcShared<T>, item: T) -> Result<(), TrySendError<T>> {
let mut guard = shared.internal.lock();
if guard.receiver_count == 0 {
return Err(TrySendError::Closed(item));
}
if let Some(waiter) = guard.waiting_async_receivers.pop_front() {
waiter.waker.wake();
} else if let Some(waiter) = guard.waiting_sync_receivers.pop_front() {
unsafe { (*waiter.done).store(true, Ordering::Release) };
waiter.thread.unpark();
}
guard.queue.push_back(item);
Ok(())
}
fn try_recv_rendezvous<T: Send>(shared: &MpmcShared<T>) -> Result<T, TryRecvError> {
let mut guard = shared.internal.lock();
if let Some(mut waiter) = guard.waiting_async_senders.pop_front() {
if let Some(item) = waiter.take_item_from_sender_slot() {
waiter.waker.wake();
return Ok(item);
}
guard.waiting_async_senders.push_front(waiter);
} else if let Some(mut waiter) = guard.waiting_sync_senders.pop_front() {
if let Some(item) = waiter.take_item_from_sender_slot() {
unsafe { (*waiter.done).store(true, Ordering::Release) };
waiter.thread.unpark();
return Ok(item);
}
guard.waiting_sync_senders.push_front(waiter);
}
if let Some(item) = guard.queue.pop_front() {
return Ok(item);
}
if guard.sender_count == 0 {
return Err(TryRecvError::Disconnected);
}
Err(TryRecvError::Empty)
}
fn try_recv_unbounded<T: Send>(shared: &MpmcShared<T>) -> Result<T, TryRecvError> {
let mut guard = shared.internal.lock();
if let Some(item) = guard.queue.pop_front() {
return Ok(item);
}
if guard.sender_count == 0 {
return Err(TryRecvError::Disconnected);
}
Err(TryRecvError::Empty)
}
fn try_recv_buffered<T: Send>(shared: &MpmcShared<T>) -> Result<T, TryRecvError> {
let mut guard = shared.internal.lock();
if let Some(item) = guard.queue.pop_front() {
if let Some(waiter) = guard.waiting_async_senders.pop_front() {
waiter.waker.wake();
} else if let Some(waiter) = guard.waiting_sync_senders.pop_front() {
unsafe { (*waiter.done).store(true, Ordering::Release) };
waiter.thread.unpark();
}
return Ok(item);
} else if guard.sender_count == 0 {
return Err(TryRecvError::Disconnected);
}
Err(TryRecvError::Empty)
}