use futures_core::Stream;
use super::core::AsyncSenderWaiter;
use super::{AsyncReceiver, AsyncSender};
use crate::error::{SendError, TryRecvError, TrySendError};
use crate::RecvError;
use core::marker::PhantomPinned;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
#[must_use = "futures do nothing unless you .await or poll them"]
#[derive(Debug)]
pub struct SendFuture<'a, T: Send> {
sender: &'a AsyncSender<T>,
item: Option<T>,
_phantom: PhantomPinned,
}
impl<'a, T: Send> SendFuture<'a, T> {
pub(super) fn new(sender: &'a AsyncSender<T>, item: T) -> Self {
Self {
sender,
item: Some(item),
_phantom: PhantomPinned,
}
}
}
impl<'a, T: Send> Future for SendFuture<'a, T> {
type Output = Result<(), SendError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.as_mut().get_unchecked_mut() };
'poll_loop: loop {
if this.item.is_none() {
return Poll::Ready(Ok(()));
}
let item_to_send = this.item.take().unwrap();
match this.sender.shared.try_send_core(item_to_send) {
Ok(()) => {
return Poll::Ready(Ok(())); }
Err(TrySendError::Full(returned_item)) => {
this.item = Some(returned_item);
}
Err(TrySendError::Closed(_)) => {
return Poll::Ready(Err(SendError::Closed));
}
Err(TrySendError::Sent(_)) => unreachable!(),
}
let is_rendezvous = this.sender.shared.capacity == 0;
{
let mut guard = this.sender.shared.internal.lock();
if !guard.waiting_async_receivers.is_empty()
|| !guard.waiting_sync_receivers.is_empty()
|| (this.sender.shared.capacity > 0 && guard.queue.len() < this.sender.shared.capacity)
{
drop(guard);
continue 'poll_loop; }
if guard.receiver_count == 0 {
this.item = None; return Poll::Ready(Err(SendError::Closed));
}
let new_waker = cx.waker();
let mut found_existing = false;
for existing_waiter in guard.waiting_async_senders.iter_mut() {
if existing_waiter.waker.will_wake(new_waker) {
existing_waiter.waker = new_waker.clone();
if is_rendezvous {
existing_waiter.data = this.item.take();
}
found_existing = true;
break;
}
}
if !found_existing {
let waiter = AsyncSenderWaiter {
waker: new_waker.clone(),
data: if is_rendezvous {
this.item.take()
} else {
None
},
};
guard.waiting_async_senders.push_back(waiter);
}
return Poll::Pending;
}
}
}
}
#[must_use = "futures do nothing unless you .await or poll them"]
#[derive(Debug)]
pub struct RecvFuture<'a, T: Send> {
receiver: &'a AsyncReceiver<T>,
}
impl<'a, T: Send> RecvFuture<'a, T> {
pub(super) fn new(receiver: &'a AsyncReceiver<T>) -> Self {
Self { receiver }
}
}
impl<'a, T: Send> Future for RecvFuture<'a, T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.receiver.shared.poll_recv_internal(cx)
}
}
impl<T: Send> Stream for AsyncReceiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.shared.poll_recv_internal(cx) {
Poll::Ready(Ok(value)) => Poll::Ready(Some(value)),
Poll::Ready(Err(_)) => Poll::Ready(None), Poll::Pending => Poll::Pending,
}
}
}