use futures_core::Stream;
use super::core::{AsyncWaiter, WaiterData};
use super::{AsyncReceiver, AsyncSender};
use crate::error::{SendError, TryRecvError, TrySendError};
use core::marker::PhantomPinned;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
#[must_use = "futures do nothing unless you .await or poll them"]
#[derive(Debug)]
pub struct SendFuture<'a, T: Send> {
sender: &'a AsyncSender<T>,
item: Option<T>,
_phantom: PhantomPinned,
}
impl<'a, T: Send> SendFuture<'a, T> {
pub(super) fn new(sender: &'a AsyncSender<T>, item: T) -> Self {
Self {
sender,
item: Some(item),
_phantom: PhantomPinned,
}
}
}
impl<'a, T: Send> Future for SendFuture<'a, T> {
type Output = Result<(), SendError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.as_mut().get_unchecked_mut() };
'poll_loop: loop {
if this.item.is_none() {
return Poll::Ready(Ok(()));
}
let item_to_send = this.item.take().unwrap();
match this.sender.shared.try_send_core(item_to_send) {
Ok(()) => {
return Poll::Ready(Ok(())); }
Err(TrySendError::Full(returned_item)) => {
this.item = Some(returned_item);
}
Err(TrySendError::Closed(_)) => {
return Poll::Ready(Err(SendError::Closed));
}
Err(TrySendError::Sent(_)) => unreachable!(),
}
let is_rendezvous = this.sender.shared.capacity == 0;
{
let mut guard = this.sender.shared.internal.lock();
if !guard.waiting_async_receivers.is_empty()
|| !guard.waiting_sync_receivers.is_empty()
|| (this.sender.shared.capacity > 0 && guard.queue.len() < this.sender.shared.capacity)
{
drop(guard);
continue 'poll_loop; }
if guard.receiver_count == 0 {
this.item = None; return Poll::Ready(Err(SendError::Closed));
}
let waiter = AsyncWaiter {
waker: cx.waker().clone(),
data: if is_rendezvous {
Some(WaiterData::SenderItem(this.item.take()))
} else {
None
},
};
guard.waiting_async_senders.push_back(waiter);
return Poll::Pending;
}
}
}
}
impl<T: Send> Stream for AsyncReceiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
'poll_loop: loop {
match self.shared.try_recv_core() {
Ok(item) => {
return Poll::Ready(Some(item));
}
Err(TryRecvError::Disconnected) => return Poll::Ready(None),
Err(TryRecvError::Empty) => { }
}
{
let mut guard = self.shared.internal.lock();
if !guard.queue.is_empty()
|| (self.shared.capacity == 0
&& (!guard.waiting_sync_senders.is_empty() || !guard.waiting_async_senders.is_empty()))
{
drop(guard);
continue 'poll_loop; }
if guard.sender_count == 0 {
return Poll::Ready(None);
}
let waiter = AsyncWaiter {
waker: cx.waker().clone(),
data: None, };
guard.waiting_async_receivers.push_back(waiter);
return Poll::Pending;
}
}
}
}