use super::core::{AsyncWaiter, WaiterData};
use super::{AsyncReceiver, AsyncSender};
use crate::error::{RecvError, SendError, TryRecvError, TrySendError};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
#[must_use = "futures do nothing unless you .await or poll them"]
#[derive(Debug)]
pub struct SendFuture<'a, T: Send> {
sender: &'a AsyncSender<T>,
item: Option<T>,
}
impl<'a, T: Send> SendFuture<'a, T> {
pub(super) fn new(sender: &'a AsyncSender<T>, item: T) -> Self {
Self {
sender,
item: Some(item),
}
}
}
impl<'a, T: Send + Unpin> Future for SendFuture<'a, T> {
type Output = Result<(), SendError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
'poll_loop: loop {
if self.item.is_none() {
return Poll::Ready(Ok(()));
}
let item_to_send = self.item.take().unwrap();
match self.sender.shared.try_send_core(item_to_send) {
Ok(()) => {
return Poll::Ready(Ok(())); }
Err(TrySendError::Full(returned_item)) => {
self.item = Some(returned_item);
}
Err(TrySendError::Closed(_)) => {
return Poll::Ready(Err(SendError::Closed));
}
Err(TrySendError::Sent(_)) => unreachable!(),
}
let is_rendezvous = self.sender.shared.capacity == 0;
{
let mut guard = self.sender.shared.internal.lock();
if !guard.waiting_async_receivers.is_empty()
|| !guard.waiting_sync_receivers.is_empty()
|| (self.sender.shared.capacity > 0 && guard.queue.len() < self.sender.shared.capacity)
{
drop(guard);
continue 'poll_loop; }
if guard.receiver_count == 0 {
self.item = None; return Poll::Ready(Err(SendError::Closed));
}
let waiter = AsyncWaiter {
waker: cx.waker().clone(),
data: if is_rendezvous {
Some(WaiterData::SenderItem(self.item.take()))
} else {
None
},
};
guard.waiting_async_senders.push_back(waiter);
return Poll::Pending;
}
}
}
}
#[must_use = "futures do nothing unless you .await or poll them"]
#[derive(Debug)]
pub struct ReceiveFuture<'a, T: Send> {
receiver: &'a AsyncReceiver<T>,
}
impl<'a, T: Send> ReceiveFuture<'a, T> {
pub(super) fn new(receiver: &'a AsyncReceiver<T>) -> Self {
Self { receiver }
}
}
impl<'a, T: Send + Unpin> Future for ReceiveFuture<'a, T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
'poll_loop: loop {
match self.receiver.shared.try_recv_core() {
Ok(item) => {
return Poll::Ready(Ok(item));
}
Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError::Disconnected)),
Err(TryRecvError::Empty) => { }
}
{
let mut guard = self.receiver.shared.internal.lock();
if !guard.queue.is_empty()
|| (self.receiver.shared.capacity == 0
&& (!guard.waiting_sync_senders.is_empty() || !guard.waiting_async_senders.is_empty()))
{
drop(guard);
continue 'poll_loop; }
if guard.sender_count == 0 {
return Poll::Ready(Err(RecvError::Disconnected));
}
let waiter = AsyncWaiter {
waker: cx.waker().clone(),
data: None, };
guard.waiting_async_receivers.push_back(waiter);
return Poll::Pending;
}
}
}
}