use crate::flavor::{FlavorMC, FlavorSelect};
use crate::select::SelectResult;
use crate::stream::AsyncStream;
#[cfg(feature = "trace_log")]
use crate::tokio_task_id;
use crate::{shared::*, trace_log, MRx, NotCloneable, ReceiverType, Rx};
use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// Async receiving end of a channel.
///
/// Holds a reference-counted handle to the channel state; dropping the
/// receiver calls `close_rx()` on that state (see the `Drop` impl).
/// This type has no `Clone` impl in this file; `MAsyncRx` is the cloneable
/// wrapper around it.
pub struct AsyncRx<F: Flavor> {
// Reference-counted channel state; every handle of one channel points at the same value.
pub(crate) shared: Arc<ChannelShared<F>>,
// Zero-sized marker: `Cell<()>` is `!Sync`, so this field opts the receiver
// out of the automatic `Sync` implementation.
_phan: PhantomData<Cell<()>>,
}
// NOTE(review): `Send` is asserted by hand for every `F: Flavor`; the compiler is
// not asked to prove that `ChannelShared<F>` can cross threads. Soundness rests on
// the `Flavor` bounds, which are declared elsewhere — confirm.
unsafe impl<F: Flavor> Send for AsyncRx<F> {}
impl<F: Flavor> fmt::Debug for AsyncRx<F> {
    /// Writes the literal `AsyncRx` immediately followed by the address of this handle.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("AsyncRx{:p}", self))
    }
}
impl<F: Flavor> fmt::Display for AsyncRx<F> {
    /// Human-readable form: the literal `AsyncRx` followed by this handle's address.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // A raw pointer to `self` formats to the same address as the reference does.
        let addr: *const Self = self;
        write!(f, "AsyncRx{:p}", addr)
    }
}
impl<F: Flavor> Drop for AsyncRx<F> {
    /// Notifies the shared channel state that this receiver is going away.
    #[inline(always)]
    fn drop(&mut self) {
        let shared = &self.shared;
        shared.close_rx();
    }
}
impl<F: Flavor> From<Rx<F>> for AsyncRx<F> {
fn from(value: Rx<F>) -> Self {
value.add_rx();
Self::new(value.shared.clone())
}
}
impl<F: Flavor> AsyncRx<F> {
#[inline]
pub(crate) fn new(shared: Arc<ChannelShared<F>>) -> Self {
Self { shared, _phan: Default::default() }
}
#[inline(always)]
pub fn is_disconnected(&self) -> bool {
self.shared.is_tx_closed()
}
#[inline]
pub fn into_stream(self) -> AsyncStream<F> {
AsyncStream::new(self)
}
#[inline]
pub fn into_blocking(self) -> Rx<F> {
self.into()
}
}
/// Receive operations of the async receiver.
impl<F: Flavor> AsyncRx<F> {
/// Returns a future that resolves to the next item, or to `RecvError` once
/// polling reports a non-empty error (see `RecvFuture::poll`).
///
/// The future is created without a registered waker.
#[inline(always)]
pub fn recv<'a>(&'a self) -> RecvFuture<'a, F> {
RecvFuture { rx: self, waker: None }
}
/// Receives with a deadline, using `tokio::time::sleep(duration)` as the timer.
///
/// NOTE(review): an inherent method with the same name is also defined under
/// the `async_std` feature below; enabling both features at once would define
/// it twice.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
#[inline]
pub fn recv_timeout(
&self, duration: std::time::Duration,
) -> RecvTimeoutFuture<'_, F, tokio::time::Sleep, ()> {
let sleep = tokio::time::sleep(duration);
self.recv_with_timer(sleep)
}
/// Receives with a deadline, using `async_std::task::sleep(duration)` as the timer.
#[cfg(feature = "async_std")]
#[cfg_attr(docsrs, doc(cfg(feature = "async_std")))]
#[inline]
pub fn recv_timeout(
&self, duration: std::time::Duration,
) -> RecvTimeoutFuture<'_, F, impl Future<Output = ()>, ()> {
let sleep = async_std::task::sleep(duration);
self.recv_with_timer(sleep)
}
/// Receives with a caller-supplied timer future.
///
/// The returned future polls the channel first; when the channel is empty it
/// polls `sleep`, and resolves to `RecvTimeoutError::Timeout` as soon as `sleep`
/// is ready (see `RecvTimeoutFuture::poll`). The timer's output value is discarded.
#[inline]
pub fn recv_with_timer<'a, FR, R>(&'a self, sleep: FR) -> RecvTimeoutFuture<'a, F, FR, R>
where
FR: Future<Output = R>,
{
RecvTimeoutFuture { rx: self, waker: None, sleep }
}
/// Non-waiting receive; forwards to `try_recv()` on the shared channel state.
#[inline(always)]
pub fn try_recv(&self) -> Result<F::Item, TryRecvError> {
self.shared.try_recv()
}
/// Reads the item referred to by a `SelectResult`.
///
/// # Panics
///
/// Panics when `result.channel` is not the address of this receiver.
#[inline(always)]
pub fn read_select(&self, result: SelectResult) -> Result<F::Item, RecvError>
where
F: FlavorSelect,
{
assert_eq!(
self as *const Self as *const u8, result.channel,
"invalid use select with another channel"
);
self.as_ref().read_with_token(result.token)
}
/// Core polling routine behind the receive futures in this file.
///
/// `o_waker` is the caller-owned waker slot, kept across polls. Returns
/// `Ok(item)` when an item was taken, `Err(TryRecvError::Empty)` when the
/// caller should stay pending, and `Err(TryRecvError::Disconnected)` when
/// `is_tx_closed()` is true and one last receive attempt found nothing.
///
/// With `STREAM == true` the `commit_waiting` step is skipped.
#[inline(always)]
pub(crate) fn poll_item<const STREAM: bool>(
&self, ctx: &mut Context, o_waker: &mut Option<<F::Recv as Registry>::Waker>,
) -> Result<F::Item, TryRecvError> {
let shared = &self.shared;
// Success hook when no waker was registered: trace only.
macro_rules! on_recv_no_waker {
() => {{
trace_log!("rx{:?}: recv", tokio_task_id!());
}};
}
// Success hook when a waker may be registered: trace, then cancel it.
macro_rules! on_recv_waker {
($state: expr) => {{
trace_log!("rx{:?}: recv {:?} {:?}", tokio_task_id!(), o_waker, $state);
shared.recvs.cancel_waker(o_waker);
}};
}
// Calls the given receive function on the inner queue; on success notifies
// `on_recv()`, runs the waker hook and returns the item from `poll_item`.
macro_rules! try_recv {
($recv_func: ident => $waker_handle: block) => {
if let Some(item) = shared.inner.$recv_func() {
shared.on_recv();
$waker_handle
return Ok(item);
}
};
}
loop {
if o_waker.is_none() {
// First attempt without a waker: try once, then (if a backoff is
// configured) retry after every spin until the backoff reports completion.
try_recv!(try_recv=>{ on_recv_no_waker!()});
if let Some(mut backoff) = shared.get_async_backoff() {
loop {
let complete = backoff.spin();
try_recv!(try_recv=>{ on_recv_no_waker!()});
if complete {
break;
}
}
}
} else {
// A waker is left over from a previous poll: try again and cancel it on success.
try_recv!(try_recv => {on_recv_waker!(WakerState::Woken)});
}
// NOTE(review): a `Some` result ends the loop without the final re-check;
// presumably the existing registration is still usable — confirm against
// `Registry::reg_waker_async`.
if shared.recvs.reg_waker_async(ctx, o_waker).is_some() {
break;
}
// Re-check after registering (presumably to catch an item that arrived
// between the attempt above and the registration).
try_recv!(try_recv_final =>{ on_recv_waker!(WakerState::Init)});
if !STREAM {
let state = shared.recvs.commit_waiting(o_waker);
trace_log!("rx{:?}: commit_waiting {:?} {}", tokio_task_id!(), o_waker, state);
// Already woken while committing: go round again instead of returning pending.
if state == WakerState::Woken as u8 {
continue;
}
}
break;
}
if shared.is_tx_closed() {
// Senders are closed: take one last look before reporting disconnection.
try_recv!(try_recv =>{ on_recv_waker!(WakerState::Closed)});
trace_log!("rx{:?}: disconnected {:?}", tokio_task_id!(), o_waker);
Err(TryRecvError::Disconnected)
} else {
Err(TryRecvError::Empty)
}
}
}
/// Future returned by `AsyncRx::recv`.
///
/// Borrows the receiver and owns the waker slot that `poll_item` fills in; a
/// waker still present on drop is handed to `abandon_recv_waker` (see the
/// `Drop` impl).
#[must_use]
pub struct RecvFuture<'a, F: Flavor> {
// Receiver this future polls.
rx: &'a AsyncRx<F>,
// Waker slot kept across polls; `None` until `poll_item` registers one.
waker: Option<<F::Recv as Registry>::Waker>,
}
// NOTE(review): `Send` is asserted by hand for every `F: Flavor`; whether the
// waker type and `&AsyncRx<F>` may really cross threads depends on definitions
// outside this file — confirm.
unsafe impl<F: Flavor> Send for RecvFuture<'_, F> {}
impl<F: Flavor> Drop for RecvFuture<'_, F> {
    /// Hands a still-registered waker back to the channel via `abandon_recv_waker`.
    #[inline]
    fn drop(&mut self) {
        if let Some(waker) = &self.waker {
            let shared = &self.rx.shared;
            shared.abandon_recv_waker(waker);
        }
    }
}
impl<F: Flavor> Future for RecvFuture<'_, F> {
    type Output = Result<F::Item, RecvError>;

    /// Polls the receiver once.
    ///
    /// * item available -> `Ready(Ok(item))`
    /// * channel empty -> `Pending`
    /// * any other error -> the waker slot is cleared and `Ready(Err(RecvError))` is returned
    #[inline]
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();
        match this.rx.poll_item::<false>(ctx, &mut this.waker) {
            Ok(item) => {
                debug_assert!(this.waker.is_none());
                Poll::Ready(Ok(item))
            }
            Err(e) if e.is_empty() => Poll::Pending,
            Err(_) => {
                // Clear the slot so `Drop` has no waker left to abandon.
                drop(this.waker.take());
                Poll::Ready(Err(RecvError {}))
            }
        }
    }
}
/// Future returned by `AsyncRx::recv_with_timer` (and the `recv_timeout` wrappers).
///
/// Combines a receive attempt with a timer future `sleep`; see the `Future`
/// impl for the order in which the two are polled.
#[must_use]
pub struct RecvTimeoutFuture<'a, F, FR, R>
where
F: Flavor,
FR: Future<Output = R>,
{
// Receiver this future polls.
rx: &'a AsyncRx<F>,
// Waker slot kept across polls; `None` until `poll_item` registers one.
waker: Option<<F::Recv as Registry>::Waker>,
// Timer future; stored inline and polled in place through a pinned projection.
sleep: FR,
}
// NOTE(review): `Send` is asserted for every timer type `FR`, including ones that
// are not `Send` themselves, since no `FR: Send` bound is required here. The
// trait-level `recv_with_timer` signature relies on this impl to promise `+ Send`
// without bounding `FR`; tightening it would need a coordinated API change.
unsafe impl<F, FR, R> Send for RecvTimeoutFuture<'_, F, FR, R>
where
F: Flavor,
FR: Future<Output = R>,
{
}
impl<F, FR, R> Drop for RecvTimeoutFuture<'_, F, FR, R>
where
    F: Flavor,
    FR: Future<Output = R>,
{
    /// Hands a still-registered waker back to the channel via `abandon_recv_waker`.
    #[inline]
    fn drop(&mut self) {
        match &self.waker {
            Some(waker) => {
                self.rx.shared.abandon_recv_waker(waker);
            }
            None => {}
        }
    }
}
impl<F, FR, R> Future for RecvTimeoutFuture<'_, F, FR, R>
where
    F: Flavor,
    FR: Future<Output = R>,
{
    type Output = Result<F::Item, RecvTimeoutError>;

    /// Polls the channel first and the timer second.
    ///
    /// The timer is only polled when the channel reported `Empty`; its output
    /// value is discarded and turned into `RecvTimeoutError::Timeout`.
    #[inline]
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        // SAFETY: nothing below moves `sleep` (or any other field) out of `this`.
        let this = unsafe { self.get_unchecked_mut() };
        match this.rx.poll_item::<false>(ctx, &mut this.waker) {
            Ok(item) => Poll::Ready(Ok(item)),
            Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvTimeoutError::Disconnected)),
            Err(TryRecvError::Empty) => {
                // SAFETY: `sleep` lives inside the pinned future and is only ever
                // accessed through this projection in the code visible here.
                let timer = unsafe { Pin::new_unchecked(&mut this.sleep) };
                let timed_out = timer.poll(ctx).is_ready();
                if timed_out {
                    Poll::Ready(Err(RecvTimeoutError::Timeout))
                } else {
                    Poll::Pending
                }
            }
        }
    }
}
/// Common interface of the async receivers defined in this file
/// (`AsyncRx` and `MAsyncRx`), generic over the item type `T`.
pub trait AsyncRxTrait<T>: Send + 'static + fmt::Debug + fmt::Display {
/// Returns a future resolving to the next item or to `RecvError`.
fn recv(&self) -> impl Future<Output = Result<T, RecvError>> + Send;
/// Like `recv`, but gives up after `timeout`; only available with the
/// `tokio` or `async_std` feature.
#[cfg(any(feature = "tokio", feature = "async_std"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async_std"))))]
fn recv_timeout(
&self, timeout: std::time::Duration,
) -> impl Future<Output = Result<T, RecvTimeoutError>> + Send;
/// Like `recv`, but resolves to `RecvTimeoutError::Timeout` once the supplied
/// timer future `fut` completes first. The timer's output is not returned.
fn recv_with_timer<FR, R>(
&self, fut: FR,
) -> impl Future<Output = Result<T, RecvTimeoutError>> + Send
where
FR: Future<Output = R>;
/// Attempts to take an item without waiting.
fn try_recv(&self) -> Result<T, TryRecvError>;
// The accessors below are forwarded by both implementations in this file to the
// method of the same name on the shared channel state (`is_disconnected` is
// implemented there as `get_tx_count() == 0`).
fn len(&self) -> usize;
fn capacity(&self) -> Option<usize>;
fn is_empty(&self) -> bool;
fn is_full(&self) -> bool;
fn is_disconnected(&self) -> bool;
fn get_tx_count(&self) -> usize;
fn get_rx_count(&self) -> usize;
/// Consumes the receiver and returns a vector of receivers.
///
/// In this file, `AsyncRx` asserts `count == 1` and returns just itself, while
/// `MAsyncRx` returns `count` handles (`count - 1` clones followed by `self`).
fn clone_to_vec(self, count: usize) -> Vec<Self>
where
Self: Sized;
/// Consumes the receiver and returns it as a boxed, pinned `Stream` of items.
fn to_stream(self) -> Pin<Box<dyn futures_core::stream::Stream<Item = T>>>;
/// Forwarded from the shared channel state.
/// NOTE(review): the meaning of the two counts is not visible here — confirm
/// against `ChannelShared::get_wakers_count`.
fn get_wakers_count(&self) -> (usize, usize);
}
impl<F: Flavor> AsyncRxTrait<F::Item> for AsyncRx<F> {
#[inline(always)]
fn clone_to_vec(self, _count: usize) -> Vec<Self> {
assert_eq!(_count, 1);
vec![self]
}
#[inline(always)]
fn recv(&self) -> impl Future<Output = Result<F::Item, RecvError>> + Send {
AsyncRx::recv(self)
}
#[cfg(any(feature = "tokio", feature = "async_std"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async_std"))))]
#[inline(always)]
fn recv_timeout(
&self, duration: std::time::Duration,
) -> impl Future<Output = Result<F::Item, RecvTimeoutError>> + Send {
AsyncRx::recv_timeout(self, duration)
}
#[inline(always)]
fn recv_with_timer<FR, R>(
&self, sleep: FR,
) -> impl Future<Output = Result<F::Item, RecvTimeoutError>> + Send
where
FR: Future<Output = R>,
{
AsyncRx::recv_with_timer(self, sleep)
}
#[inline(always)]
fn try_recv(&self) -> Result<F::Item, TryRecvError> {
AsyncRx::<F>::try_recv(self)
}
#[inline(always)]
fn len(&self) -> usize {
self.as_ref().len()
}
#[inline(always)]
fn capacity(&self) -> Option<usize> {
self.as_ref().capacity()
}
#[inline(always)]
fn is_empty(&self) -> bool {
self.as_ref().is_empty()
}
#[inline(always)]
fn is_full(&self) -> bool {
self.as_ref().is_full()
}
#[inline(always)]
fn is_disconnected(&self) -> bool {
self.as_ref().get_tx_count() == 0
}
#[inline(always)]
fn get_tx_count(&self) -> usize {
self.as_ref().get_tx_count()
}
#[inline(always)]
fn get_rx_count(&self) -> usize {
self.as_ref().get_rx_count()
}
#[inline(always)]
fn to_stream(self) -> Pin<Box<dyn futures_core::stream::Stream<Item = F::Item>>> {
Box::pin(self.into_stream())
}
fn get_wakers_count(&self) -> (usize, usize) {
self.as_ref().get_wakers_count()
}
}
/// Cloneable async receiver: a newtype around `AsyncRx` that adds `Clone`
/// (each clone calls `add_rx()` on the shared state) and dereferences to the
/// wrapped `AsyncRx`.
pub struct MAsyncRx<F: Flavor>(pub(crate) AsyncRx<F>);
impl<F: Flavor> fmt::Debug for MAsyncRx<F> {
    /// Writes the literal `MAsyncRx` immediately followed by the address of this handle.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("MAsyncRx{:p}", self))
    }
}
impl<F: Flavor> fmt::Display for MAsyncRx<F> {
    /// Human-readable form: the literal `MAsyncRx` followed by this handle's address.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // A raw pointer to `self` formats to the same address as the reference does.
        let addr: *const Self = self;
        write!(f, "MAsyncRx{:p}", addr)
    }
}
// NOTE(review): the wrapped `AsyncRx` carries a `PhantomData<Cell<()>>` marker and is
// therefore not automatically `Sync`; this impl asserts `Sync` for the
// multi-consumer wrapper by hand. Soundness depends on `ChannelShared<F>` being
// safe to use from several threads at once — confirm against the `Flavor` bounds.
unsafe impl<F: Flavor> Sync for MAsyncRx<F> {}
impl<F: Flavor> Clone for MAsyncRx<F> {
#[inline]
fn clone(&self) -> Self {
let inner = &self.0;
inner.shared.add_rx();
Self(AsyncRx::new(inner.shared.clone()))
}
}
impl<F: Flavor> From<MAsyncRx<F>> for AsyncRx<F> {
fn from(rx: MAsyncRx<F>) -> Self {
rx.0
}
}
impl<F: Flavor + FlavorMC> MAsyncRx<F> {
#[inline]
pub(crate) fn new(shared: Arc<ChannelShared<F>>) -> Self {
Self(AsyncRx::new(shared))
}
}
impl<F: Flavor> MAsyncRx<F> {
#[inline]
pub fn into_stream(self) -> AsyncStream<F> {
AsyncStream::new(self.0)
}
#[inline]
pub fn into_blocking(self) -> MRx<F> {
self.into()
}
}
impl<F: Flavor> Deref for MAsyncRx<F> {
type Target = AsyncRx<F>;
#[inline(always)]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<F: Flavor> From<MRx<F>> for MAsyncRx<F> {
fn from(value: MRx<F>) -> Self {
value.add_rx();
Self(AsyncRx::new(value.shared.clone()))
}
}
/// [`AsyncRxTrait`] for the cloneable multi-consumer receiver.
///
/// Receive operations are forwarded to the wrapped [`AsyncRx`]; the channel
/// introspection helpers are forwarded to the shared channel state.
impl<F: Flavor + FlavorMC> AsyncRxTrait<F::Item> for MAsyncRx<F> {
    /// Consumes `self` and returns `count` receivers on the same channel:
    /// `count - 1` clones followed by `self`.
    ///
    /// # Panics
    ///
    /// Panics when `count` is zero. Without the explicit check, `count - 1`
    /// would overflow: a panic in debug builds, and a wrap to `usize::MAX`
    /// (an effectively unbounded clone loop) in release builds.
    #[inline(always)]
    fn clone_to_vec(self, count: usize) -> Vec<Self> {
        assert!(count > 0, "clone_to_vec requires count >= 1");
        let mut v = Vec::with_capacity(count);
        v.extend((1..count).map(|_| self.clone()));
        v.push(self);
        v
    }

    /// Delegates to `AsyncRx::try_recv` on the wrapped receiver.
    #[inline(always)]
    fn try_recv(&self) -> Result<F::Item, TryRecvError> {
        self.0.try_recv()
    }

    /// Delegates to `AsyncRx::recv` on the wrapped receiver.
    #[inline(always)]
    fn recv(&self) -> impl Future<Output = Result<F::Item, RecvError>> + Send {
        self.0.recv()
    }

    /// Delegates to the feature-gated `AsyncRx::recv_timeout` on the wrapped receiver.
    #[cfg(any(feature = "tokio", feature = "async_std"))]
    #[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async_std"))))]
    #[inline(always)]
    fn recv_timeout(
        &self, duration: std::time::Duration,
    ) -> impl Future<Output = Result<F::Item, RecvTimeoutError>> + Send {
        self.0.recv_timeout(duration)
    }

    /// Delegates to `AsyncRx::recv_with_timer` on the wrapped receiver.
    ///
    /// The return type spells out `+ Send`, matching the trait declaration and
    /// the `AsyncRx` implementation.
    #[inline(always)]
    fn recv_with_timer<FR, R>(
        &self, fut: FR,
    ) -> impl Future<Output = Result<F::Item, RecvTimeoutError>> + Send
    where
        FR: Future<Output = R>,
    {
        self.0.recv_with_timer(fut)
    }

    #[inline(always)]
    fn len(&self) -> usize {
        self.as_ref().len()
    }

    #[inline(always)]
    fn capacity(&self) -> Option<usize> {
        self.as_ref().capacity()
    }

    #[inline(always)]
    fn is_empty(&self) -> bool {
        self.as_ref().is_empty()
    }

    #[inline(always)]
    fn is_full(&self) -> bool {
        self.as_ref().is_full()
    }

    /// True when the shared state reports a sender count of zero.
    #[inline(always)]
    fn is_disconnected(&self) -> bool {
        self.as_ref().get_tx_count() == 0
    }

    #[inline(always)]
    fn get_tx_count(&self) -> usize {
        self.as_ref().get_tx_count()
    }

    #[inline(always)]
    fn get_rx_count(&self) -> usize {
        self.as_ref().get_rx_count()
    }

    /// Boxes and pins the stream produced by `into_stream`.
    #[inline(always)]
    fn to_stream(self) -> Pin<Box<dyn futures_core::stream::Stream<Item = F::Item>>> {
        Box::pin(self.into_stream())
    }

    fn get_wakers_count(&self) -> (usize, usize) {
        self.as_ref().get_wakers_count()
    }
}
/// Exposes the shared channel state's methods directly on the receiver.
impl<F: Flavor> Deref for AsyncRx<F> {
    type Target = ChannelShared<F>;

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &*self.shared
    }
}
/// Borrows the shared channel state behind this receiver.
impl<F: Flavor> AsRef<ChannelShared<F>> for AsyncRx<F> {
    #[inline(always)]
    fn as_ref(&self) -> &ChannelShared<F> {
        let shared: &ChannelShared<F> = &self.shared;
        shared
    }
}
impl<F: Flavor> AsRef<ChannelShared<F>> for MAsyncRx<F> {
#[inline(always)]
fn as_ref(&self) -> &ChannelShared<F> {
&self.0.shared
}
}
impl<T, F: Flavor<Item = T>> ReceiverType for AsyncRx<F> {
type Flavor = F;
#[inline(always)]
fn new(shared: Arc<ChannelShared<F>>) -> Self {
AsyncRx::new(shared)
}
}
// Marker impl: tags `AsyncRx` with the crate's `NotCloneable` trait (the type has
// no `Clone` impl in this file, unlike `MAsyncRx`).
impl<F: Flavor> NotCloneable for AsyncRx<F> {}
impl<T, F: Flavor<Item = T> + FlavorMC> ReceiverType for MAsyncRx<F> {
type Flavor = F;
#[inline(always)]
fn new(shared: Arc<ChannelShared<F>>) -> Self {
MAsyncRx::new(shared)
}
}