use crate::{
loom::{atomic::AtomicUsize, hint},
recycling::{take, Recycle},
wait::{Notify, WaitCell, WaitQueue, WaitResult},
Core, Ref, Slot,
};
use core::{fmt, task::Poll};
pub mod errors;
use self::errors::{TryRecvError, TrySendError};
/// State shared by both halves of a channel, generic over the notification
/// type `N` stored in the wait primitives.
#[derive(Debug)]
struct ChannelCore<N> {
// Queue state: slots are reserved with `push_ref`, taken with `pop_ref`,
// and the queue is closed with `close()`.
core: Core,
// Wait cell the receive path registers on (`wait_with` in `poll_recv_ref`);
// notified whenever a `NotifyRx` guard is dropped.
rx_wait: WaitCell<N>,
// Counter initialised to 1 by `new`.
// NOTE(review): presumably the number of live senders — it is not read in
// this part of the file; confirm against its users.
tx_count: AtomicUsize,
// Wait queue notified whenever a `NotifyTx` guard is dropped, and closed by
// `close_rx`.
tx_wait: WaitQueue<N>,
}
/// Send-side guard: a reference to a reserved slot paired with a notifier
/// for the receiver's `WaitCell`.
///
/// Rust drops struct fields in declaration order, so `slot` is dropped
/// first and `_notify` second; dropping `_notify` calls `notify()` on the
/// wait cell.
// NOTE(review): the field order looks load-bearing (release the slot, then
// notify the receiver) — confirm before reordering these fields.
struct SendRefInner<'a, T, N: Notify> {
// Reference to the slot being written.
slot: Ref<'a, T>,
// Never read; kept only so that its `Drop` impl runs after `slot` is dropped.
_notify: NotifyRx<'a, N>,
}
/// Receive-side guard: a reference to a popped slot paired with a notifier
/// for the senders' `WaitQueue`.
///
/// Rust drops struct fields in declaration order, so `slot` is dropped
/// first and `_notify` second; dropping `_notify` calls `notify()` on the
/// wait queue.
// NOTE(review): the field order looks load-bearing (release the slot, then
// notify a sender) — confirm before reordering these fields.
struct RecvRefInner<'a, T, N: Notify + Unpin> {
// Reference to the slot being read.
slot: Ref<'a, T>,
// Never read; kept only so that its `Drop` impl runs after `slot` is dropped.
_notify: crate::mpsc::NotifyTx<'a, N>,
}
/// Drop guard holding a reference to a `WaitCell`; its `Drop` impl calls
/// `notify()` on that cell.
struct NotifyRx<'a, N: Notify>(&'a WaitCell<N>);
/// Drop guard holding a reference to a `WaitQueue`; its `Drop` impl calls
/// `notify()` on that queue.
struct NotifyTx<'a, N: Notify + Unpin>(&'a WaitQueue<N>);
impl<N> ChannelCore<N> {
/// Builds the channel state for a queue created with `Core::new(capacity)`.
///
/// The wait cell and wait queue start out freshly constructed and
/// `tx_count` starts at 1. This variant is a `const fn` and is compiled
/// whenever the `loom` cfg is *not* set.
#[cfg(not(loom))]
const fn new(capacity: usize) -> Self {
Self {
core: Core::new(capacity),
rx_wait: WaitCell::new(),
tx_count: AtomicUsize::new(1),
tx_wait: WaitQueue::new(),
}
}
/// Same construction as the non-loom `new`, compiled only under
/// `cfg(loom)` and not `const`.
// NOTE(review): presumably non-const because the loom stand-ins for these
// types do not offer `const` constructors — confirm.
#[cfg(loom)]
fn new(capacity: usize) -> Self {
Self {
core: Core::new(capacity),
rx_wait: WaitCell::new(),
tx_count: AtomicUsize::new(1),
tx_wait: WaitQueue::new(),
}
}
}
impl<N> ChannelCore<N>
where
    N: Notify + Unpin,
{
    /// Closes the queue core and, only when `Core::close` returns `true`,
    /// closes the senders' wait queue as well.
    fn close_rx(&self) {
        let did_close = self.core.close();
        if !did_close {
            // `close()` reported `false`: leave the wait queue untouched.
            return;
        }

        crate::loom::hint::spin_loop();
        test_println!("draining_queue");
        self.tx_wait.close();
    }
}
impl<N> ChannelCore<N>
where
N: Notify + Unpin,
{
fn try_send_ref<'a, T, R>(
&'a self,
slots: &'a [Slot<T>],
recycle: &R,
) -> Result<SendRefInner<'a, T, N>, TrySendError>
where
R: Recycle<T>,
{
self.core.push_ref(slots, recycle).map(|slot| SendRefInner {
_notify: NotifyRx(&self.rx_wait),
slot,
})
}
fn try_send<T, R>(&self, slots: &[Slot<T>], val: T, recycle: &R) -> Result<(), TrySendError<T>>
where
R: Recycle<T>,
{
match self.try_send_ref(slots, recycle) {
Ok(mut slot) => {
slot.with_mut(|slot| *slot = val);
Ok(())
}
Err(e) => Err(e.with_value(val)),
}
}
fn try_recv_ref<'a, T>(
&'a self,
slots: &'a [Slot<T>],
) -> Result<RecvRefInner<'a, T, N>, TryRecvError> {
self.core.pop_ref(slots).map(|slot| RecvRefInner {
_notify: NotifyTx(&self.tx_wait),
slot,
})
}
fn try_recv<T, R>(&self, slots: &[Slot<T>], recycle: &R) -> Result<T, TryRecvError>
where
R: Recycle<T>,
{
match self.try_recv_ref(slots) {
Ok(mut slot) => Ok(take(&mut *slot, recycle)),
Err(e) => Err(e),
}
}
fn poll_recv_ref<'a, T>(
&'a self,
slots: &'a [Slot<T>],
mk_waiter: impl Fn() -> N,
) -> Poll<Option<Ref<'a, T>>> {
macro_rules! try_poll_recv {
() => {
match self.core.pop_ref(slots) {
Ok(slot) => return Poll::Ready(Some(slot)),
Err(TryRecvError::Closed) => return Poll::Ready(None),
_ => {}
}
};
}
test_println!("poll_recv_ref");
loop {
test_println!("poll_recv_ref => loop");
try_poll_recv!();
match test_dbg!(self.rx_wait.wait_with(&mk_waiter)) {
WaitResult::Wait => {
try_poll_recv!();
test_println!("-> yield");
return Poll::Pending;
}
WaitResult::Closed => {
return Poll::Ready(self.core.pop_ref(slots).ok());
}
WaitResult::Notified => {
hint::spin_loop();
}
}
}
}
}
impl<T, N: Notify> core::ops::Deref for SendRefInner<'_, T, N> {
    type Target = T;

    /// Borrows the value in the held slot by dereferencing the slot reference.
    #[inline]
    fn deref(&self) -> &T {
        core::ops::Deref::deref(&self.slot)
    }
}
impl<T, N: Notify> core::ops::DerefMut for SendRefInner<'_, T, N> {
    /// Mutably borrows the value in the held slot by dereferencing the slot
    /// reference.
    #[inline]
    fn deref_mut(&mut self) -> &mut T {
        core::ops::DerefMut::deref_mut(&mut self.slot)
    }
}
impl<T, N: Notify> SendRefInner<'_, T, N> {
    /// Calls `f` with shared access to the slot's value, via the slot's own
    /// `with`, and returns what `f` returns.
    #[inline]
    pub(crate) fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
        let slot = &self.slot;
        slot.with(f)
    }

    /// Calls `f` with exclusive access to the slot's value, via the slot's
    /// own `with_mut`, and returns what `f` returns.
    #[inline]
    pub(crate) fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
        let slot = &mut self.slot;
        slot.with_mut(f)
    }
}
impl<T: fmt::Debug, N: Notify> fmt::Debug for SendRefInner<'_, T, N> {
    /// Formats the slot's value with `T`'s `Debug` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let render = |value: &T| fmt::Debug::fmt(value, f);
        self.with(render)
    }
}
impl<T: fmt::Display, N: Notify> fmt::Display for SendRefInner<'_, T, N> {
    /// Formats the slot's value with `T`'s `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let render = |value: &T| fmt::Display::fmt(value, f);
        self.with(render)
    }
}
impl<T: fmt::Write, N: Notify> fmt::Write for SendRefInner<'_, T, N> {
    /// Forwards the string to the `fmt::Write` impl of the slot's value.
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        self.with_mut(|sink| fmt::Write::write_str(sink, s))
    }

    /// Forwards the character to the `fmt::Write` impl of the slot's value.
    #[inline]
    fn write_char(&mut self, c: char) -> fmt::Result {
        self.with_mut(|sink| fmt::Write::write_char(sink, c))
    }

    /// Forwards the format arguments to the `fmt::Write` impl of the slot's
    /// value.
    #[inline]
    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
        self.with_mut(|sink| fmt::Write::write_fmt(sink, f))
    }
}
impl<T, N: Notify + Unpin> core::ops::Deref for RecvRefInner<'_, T, N> {
    type Target = T;

    /// Borrows the value in the held slot by dereferencing the slot reference.
    #[inline]
    fn deref(&self) -> &T {
        core::ops::Deref::deref(&self.slot)
    }
}
impl<T, N: Notify + Unpin> core::ops::DerefMut for RecvRefInner<'_, T, N> {
    /// Mutably borrows the value in the held slot by dereferencing the slot
    /// reference.
    #[inline]
    fn deref_mut(&mut self) -> &mut T {
        core::ops::DerefMut::deref_mut(&mut self.slot)
    }
}
impl<T: fmt::Debug, N: Notify + Unpin> fmt::Debug for RecvRefInner<'_, T, N> {
/// Formats the guard by delegating to the `fmt` method resolved on the
/// held slot.
// NOTE(review): presumably this resolves to `Ref`'s own `Debug` impl, which
// is not visible in this file — confirm.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.slot.fmt(f)
}
}
impl<T: fmt::Display, N: Notify + Unpin> fmt::Display for RecvRefInner<'_, T, N> {
/// Formats the guard by delegating to the `fmt` method resolved on the
/// held slot.
// NOTE(review): presumably this resolves to `Ref`'s own `Display` impl,
// which is not visible in this file — confirm.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.slot.fmt(f)
}
}
// Lets a receive guard be used as a `fmt::Write` sink when `T` is one; every
// method forwards to the same-named method resolved on the held slot.
// NOTE(review): presumably `Ref` provides its own `fmt::Write` impl — it is
// not visible in this file; confirm.
impl<T: fmt::Write, N: Notify + Unpin> fmt::Write for RecvRefInner<'_, T, N> {
/// Forwards `s` to the slot's `write_str`.
#[inline]
fn write_str(&mut self, s: &str) -> fmt::Result {
self.slot.write_str(s)
}
/// Forwards `c` to the slot's `write_char`.
#[inline]
fn write_char(&mut self, c: char) -> fmt::Result {
self.slot.write_char(c)
}
/// Forwards the format arguments to the slot's `write_fmt`.
#[inline]
fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
self.slot.write_fmt(f)
}
}
impl<N: Notify> Drop for NotifyRx<'_, N> {
    /// Notifies the referenced `WaitCell` when the guard goes away.
    #[inline]
    fn drop(&mut self) {
        test_println!("notifying rx ({})", core::any::type_name::<N>());
        // Copy the shared reference out of the guard, then notify through it.
        let cell = self.0;
        cell.notify();
    }
}
impl<N: Notify + Unpin> Drop for NotifyTx<'_, N> {
    /// Notifies the referenced `WaitQueue` when the guard goes away.
    #[inline]
    fn drop(&mut self) {
        test_println!("notifying tx ({})", core::any::type_name::<N>());
        // Copy the shared reference out of the guard, then notify through it.
        let queue = self.0;
        queue.notify();
    }
}
// Generates a public newtype `$name<'a, T>` wrapping `$inner<'a, T, $notify>`,
// plus `Deref`, `DerefMut`, `fmt::Debug`, `fmt::Display` and `fmt::Write`
// impls that all forward to the wrapped value (`self.0`).
//
// Arguments, in order:
//   * zero or more attributes, followed by a comma — applied to the
//     generated struct;
//   * `$inner`  — the inner guard type to wrap;
//   * `$name`   — the name of the generated struct;
//   * `$notify` — the notify type plugged into the inner guard.
macro_rules! impl_ref_inner {
($(#[$m:meta])*, $inner:ident, $name:ident, $notify:ty) => {
// The caller's attributes are re-emitted on the generated struct.
$(#[$m])*
pub struct $name<'a, T>($inner<'a, T, $notify>);
impl<T> core::ops::Deref for $name<'_, T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl<T> core::ops::DerefMut for $name<'_, T> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
}
}
// Formatting impls are only available when `T` itself supports them.
impl<T: fmt::Debug> fmt::Debug for $name<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl<T: fmt::Display> fmt::Display for $name<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl<T: fmt::Write> fmt::Write for $name<'_, T> {
#[inline]
fn write_str(&mut self, s: &str) -> fmt::Result {
self.0.write_str(s)
}
#[inline]
fn write_char(&mut self, c: char) -> fmt::Result {
self.0.write_char(c)
}
#[inline]
fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
self.0.write_fmt(f)
}
}
};
}
// Declares a send-side reference type.
//
// Accepts `#[attrs]* pub struct Name<NotifyType>;` and expands to
// `impl_ref_inner!` with `SendRefInner` as the wrapped guard, forwarding the
// attributes, the struct name and the notify type.
macro_rules! impl_send_ref {
($(#[$m:meta])* pub struct $name:ident<$notify:ty>;) => {
impl_ref_inner!($(#[$m])*, SendRefInner, $name, $notify);
};
}
// Declares a receive-side reference type.
//
// Accepts `#[attrs]* pub struct Name<NotifyType>;` and expands to
// `impl_ref_inner!` with `RecvRefInner` as the wrapped guard, forwarding the
// attributes, the struct name and the notify type.
macro_rules! impl_recv_ref {
($(#[$m:meta])* pub struct $name:ident<$notify:ty>;) => {
impl_ref_inner!($(#[$m])*, RecvRefInner, $name, $notify);
};
}
mod async_impl;
pub use self::async_impl::*;
feature! {
#![feature = "std"]
pub mod blocking;
}
#[cfg(all(loom, test))]
mod tests;