use crate::error::{TryRecvError, TrySendError};
use crate::RecvError;
use core::future::PollFn;
use core::task::{Context, Poll};
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Waker;
use std::thread::Thread;
/// Payload that a parked waiter can carry while it sits in one of the channel's wait queues.
#[derive(Debug)]
pub(crate) enum WaiterData<T> {
/// Item slot of a parked sender. The inner `Option` is emptied with `take()` once the
/// item has been claimed, so `SenderItem(None)` means "already handed over".
SenderItem(Option<T>),
}
/// Wait-queue entry for a participant that blocks its OS thread (woken via `Thread::unpark`).
#[derive(Debug)]
pub(crate) struct SyncWaiter<T> {
/// Handle of the waiting thread; the waking side calls `unpark()` on it.
pub(crate) thread: Thread,
/// Optional payload. A sender's pending item is stored as `WaiterData::SenderItem`.
pub(crate) data: Option<WaiterData<T>>,
/// Flag the waking side sets to `true` (with `Release` ordering) right before unparking.
// NOTE(review): presumably the parked thread re-checks this flag to filter spurious
// wake-ups — confirm against the blocking send/recv code.
pub(crate) done: Arc<AtomicBool>,
}
impl<T> SyncWaiter<T> {
    /// Removes and returns the item parked in this waiter's sender slot.
    ///
    /// Yields `None` when the waiter carries no payload at all or when the slot has
    /// already been emptied. The `SenderItem` wrapper itself stays in place, so a second
    /// call simply returns `None`.
    pub(crate) fn take_item_from_sender_slot(&mut self) -> Option<T> {
        match self.data.as_mut() {
            Some(WaiterData::SenderItem(slot)) => slot.take(),
            None => None,
        }
    }
}
/// Wait-queue entry for a participant that waits as a future (woken via its `Waker`).
#[derive(Debug)]
pub(crate) struct AsyncWaiter<T> {
/// Waker of the task that registered this entry; the waking side calls `wake()` on it.
pub(crate) waker: Waker,
/// Optional payload. A sender's pending item is stored as `WaiterData::SenderItem`;
/// receivers registered by `poll_recv_internal` use `None`.
pub(crate) data: Option<WaiterData<T>>,
}
impl<T> AsyncWaiter<T> {
    /// Removes and returns the item parked in this waiter's sender slot.
    ///
    /// Yields `None` when the waiter carries no payload at all or when the slot has
    /// already been emptied. The `SenderItem` wrapper itself stays in place, so a second
    /// call simply returns `None`.
    pub(crate) fn take_item_from_sender_slot(&mut self) -> Option<T> {
        match self.data.as_mut() {
            Some(WaiterData::SenderItem(slot)) => slot.take(),
            None => None,
        }
    }
}
/// Mutable state of the channel. Within this file it is only touched through the mutex
/// stored in `MpmcShared::internal`.
#[derive(Debug)]
pub(crate) struct MpmcChannelInternal<T> {
/// Items that have been sent but not yet received, in FIFO order
/// (`push_back` on send, `pop_front` on receive).
pub(crate) queue: VecDeque<T>,
/// Parked blocking senders. A front entry that still holds an item is served directly
/// to the next receiver.
pub(crate) waiting_sync_senders: VecDeque<SyncWaiter<T>>,
/// Parked async senders. These are checked before `waiting_sync_senders` on receive.
pub(crate) waiting_async_senders: VecDeque<AsyncWaiter<T>>,
/// Parked blocking receivers, woken by `try_send_core` after it queues an item.
pub(crate) waiting_sync_receivers: VecDeque<SyncWaiter<T>>,
/// Parked async receivers. These are checked before `waiting_sync_receivers` on send.
pub(crate) waiting_async_receivers: VecDeque<AsyncWaiter<T>>,
/// Starts at 1. Once it is 0 and nothing is left to receive, receivers see a disconnect.
// NOTE(review): presumably maintained by sender clone/drop code outside this file — confirm.
pub(crate) sender_count: usize,
/// Starts at 1. Once it is 0, `try_send_core` rejects items as `Closed`.
// NOTE(review): presumably maintained by receiver clone/drop code outside this file — confirm.
pub(crate) receiver_count: usize,
}
/// State shared by the endpoints of one MPMC channel.
#[derive(Debug)]
pub(crate) struct MpmcShared<T> {
/// All mutable channel state, guarded by a `parking_lot::Mutex`.
pub(crate) internal: Mutex<MpmcChannelInternal<T>>,
/// Maximum number of items `try_send_core` will buffer when no receiver is parked.
/// `0` means nothing is ever buffered on that path; `usize::MAX` means no limit.
pub(crate) capacity: usize,
}
// SAFETY: the only fields are `internal`, whose contents are reached through its `Mutex`,
// and `capacity`, a plain `usize`. The queued items and parked payloads are all `T`,
// which is required to be `Send` here.
// NOTE(review): with those field types both impls look derivable by the compiler —
// confirm, and consider removing the `unsafe` so a future non-`Send` field is caught.
unsafe impl<T: Send> Send for MpmcShared<T> {}
// SAFETY: shared (`&self`) access to the channel state goes through `internal`'s lock;
// see the note above regarding whether this manual impl is needed at all.
unsafe impl<T: Send> Sync for MpmcShared<T> {}
impl<T: Send> MpmcShared<T> {
    /// Builds the shared state for a channel that buffers at most `capacity` items.
    ///
    /// `usize::MAX` is treated as "no limit" by [`Self::try_send_core`], so only a small
    /// buffer (32 slots) is reserved up front in that case; any other value is
    /// pre-allocated in full. Both endpoint counters start at 1.
    pub(crate) fn new(capacity: usize) -> Self {
        let initial_slots = if capacity == usize::MAX { 32 } else { capacity };
        MpmcShared {
            internal: Mutex::new(MpmcChannelInternal {
                queue: VecDeque::with_capacity(initial_slots),
                waiting_sync_senders: VecDeque::new(),
                waiting_async_senders: VecDeque::new(),
                waiting_sync_receivers: VecDeque::new(),
                waiting_async_receivers: VecDeque::new(),
                sender_count: 1,
                receiver_count: 1,
            }),
            capacity,
        }
    }

    /// Tries to hand `item` to the channel without blocking.
    ///
    /// Checks, in order:
    /// 1. no receivers left (`receiver_count == 0`) -> `Err(TrySendError::Closed(item))`;
    /// 2. a parked async receiver exists -> the item is queued and that receiver is woken;
    /// 3. a parked blocking receiver exists -> the item is queued, its `done` flag is set
    ///    and its thread is unparked;
    /// 4. the buffer has room (or the channel is unbounded) -> the item is queued;
    /// 5. otherwise -> `Err(TrySendError::Full(item))`.
    ///
    /// In cases 2 and 3 the item is queued regardless of `capacity`; this is the only way
    /// a zero-capacity channel accepts an item through this method.
    pub(crate) fn try_send_core(&self, item: T) -> Result<(), TrySendError<T>> {
        let mut guard = self.internal.lock();
        if guard.receiver_count == 0 {
            return Err(TrySendError::Closed(item));
        }
        if let Some(waiter) = guard.waiting_async_receivers.pop_front() {
            guard.queue.push_back(item);
            // `Waker::wake` runs executor-provided code. Release the (non-reentrant)
            // channel lock first so that code may touch this channel again without
            // deadlocking, and so the woken task does not immediately contend on it.
            drop(guard);
            waiter.waker.wake();
            return Ok(());
        }
        if let Some(waiter) = guard.waiting_sync_receivers.pop_front() {
            guard.queue.push_back(item);
            // The flag is published while the lock is still held, then the thread is
            // unparked; `unpark` does not run user code.
            waiter.done.store(true, Ordering::Release);
            waiter.thread.unpark();
            return Ok(());
        }
        if self.capacity == 0 {
            // Nothing is buffered on a zero-capacity channel without a parked receiver.
            return Err(TrySendError::Full(item));
        }
        if self.capacity == usize::MAX || guard.queue.len() < self.capacity {
            guard.queue.push_back(item);
            return Ok(());
        }
        Err(TrySendError::Full(item))
    }

    /// Tries to take one item without blocking.
    ///
    /// Checks, in order:
    /// 1. the front parked async sender still holds an item -> take it and wake that sender;
    /// 2. the front parked blocking sender still holds an item -> take it, set its `done`
    ///    flag and unpark it;
    /// 3. the buffer is non-empty -> pop the oldest item and notify one parked sender
    ///    (async first, otherwise blocking) that space is available;
    /// 4. no senders left (`sender_count == 0`) -> `Err(TryRecvError::Disconnected)`;
    /// 5. otherwise -> `Err(TryRecvError::Empty)`.
    ///
    /// A front sender that holds no item is put back at the front of its queue untouched.
    pub(crate) fn try_recv_core(&self) -> Result<T, TryRecvError> {
        let mut guard = self.internal.lock();
        if let Some(mut waiter) = guard.waiting_async_senders.pop_front() {
            if let Some(item) = waiter.take_item_from_sender_slot() {
                // Wake outside the lock (see `try_send_core` for the rationale).
                drop(guard);
                waiter.waker.wake();
                return Ok(item);
            }
            guard.waiting_async_senders.push_front(waiter);
        }
        if let Some(mut waiter) = guard.waiting_sync_senders.pop_front() {
            if let Some(item) = waiter.take_item_from_sender_slot() {
                waiter.done.store(true, Ordering::Release);
                waiter.thread.unpark();
                return Ok(item);
            }
            guard.waiting_sync_senders.push_front(waiter);
        }
        if let Some(item) = guard.queue.pop_front() {
            // A slot was freed: let one parked sender know. The front entries were just
            // checked above and hold no item, so removing one here does not drop a message.
            if let Some(waiter) = guard.waiting_async_senders.pop_front() {
                drop(guard);
                waiter.waker.wake();
            } else if let Some(waiter) = guard.waiting_sync_senders.pop_front() {
                waiter.done.store(true, Ordering::Release);
                waiter.thread.unpark();
            }
            return Ok(item);
        }
        if guard.sender_count == 0 {
            return Err(TryRecvError::Disconnected);
        }
        Err(TryRecvError::Empty)
    }

    /// Poll-based receive: returns an item if one can be taken right now, otherwise
    /// registers `cx`'s waker as a parked async receiver and returns `Poll::Pending`.
    ///
    /// Returns `Poll::Ready(Err(RecvError::Disconnected))` once there are no senders left
    /// and nothing remains to be received.
    pub(crate) fn poll_recv_internal(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        loop {
            match self.try_recv_core() {
                Ok(item) => return Poll::Ready(Ok(item)),
                Err(TryRecvError::Disconnected) => {
                    return Poll::Ready(Err(RecvError::Disconnected));
                }
                Err(TryRecvError::Empty) => {}
            }

            // `try_recv_core` released the lock before returning, so the state may have
            // changed in between. Re-check under the lock before parking.
            let mut guard = self.internal.lock();
            let sender_is_parked =
                !guard.waiting_sync_senders.is_empty() || !guard.waiting_async_senders.is_empty();
            // NOTE(review): if the parked sender at the front of a zero-capacity channel
            // carries no item, `try_recv_core` keeps returning `Empty` while this condition
            // stays true and the loop spins — confirm such senders always park with an item.
            if !guard.queue.is_empty() || (self.capacity == 0 && sender_is_parked) {
                drop(guard);
                continue;
            }
            if guard.sender_count == 0 {
                return Poll::Ready(Err(RecvError::Disconnected));
            }

            let new_waker = cx.waker();
            if let Some(existing_waiter) = guard
                .waiting_async_receivers
                .iter_mut()
                .find(|w| w.waker.will_wake(new_waker))
            {
                // Already registered for this task: refresh in place instead of queueing
                // a duplicate entry.
                existing_waiter.waker.clone_from(new_waker);
            } else {
                guard.waiting_async_receivers.push_back(AsyncWaiter {
                    waker: new_waker.clone(),
                    data: None,
                });
            }
            return Poll::Pending;
        }
    }
}
impl<T> Drop for MpmcShared<T> {
    /// Releases every item still owned by the channel: first the buffered items, then
    /// any item still held by a parked blocking sender, then any held by a parked async
    /// sender.
    fn drop(&mut self) {
        // `&mut self` proves no other reference to the channel exists, so the state can
        // be reached without locking. Unlike `try_lock`, this has no failure path that
        // would silently skip the cleanup.
        let internal = self.internal.get_mut();
        internal.queue.clear();
        for mut waiter in internal.waiting_sync_senders.drain(..) {
            drop(waiter.take_item_from_sender_slot());
        }
        for mut waiter in internal.waiting_async_senders.drain(..) {
            drop(waiter.take_item_from_sender_slot());
        }
    }
}