use core::task::{Context, Poll};
use core::future::Future;
use core::pin::Pin;
use alloc::vec::Vec;
use crate::fifo::{TmpArray, Storage};
use super::async_api::Receiver;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Closed;
pub trait FillStorage<T>: Storage<T> + Unpin {}
pub trait RecvStorage<T>: FillStorage<T> + Default {}
pub struct Recv<'a, S: RecvStorage<T>, T> {
receiver: &'a mut Receiver<T>,
storage: Option<S>,
}
pub struct Fill<'a, S: FillStorage<T>, T> {
receiver: &'a mut Receiver<T>,
storage: S,
}
pub type RecvOne<'a, T> = Recv<'a, Option<T>, T>;
pub type RecvArray<'a, const N: usize, T> = Recv<'a, TmpArray<N, T>, T>;
pub type FillExact<'a, T> = Fill<'a, &'a mut [T], T>;
pub type FillMany<'a, T> = Fill<'a, &'a mut Vec<T>, T>;
impl<T: Unpin> Receiver<T> {
pub fn into_recv<S: RecvStorage<T>>(&mut self) -> Recv<'_, S, T> {
Recv {
receiver: self,
storage: None,
}
}
pub fn into_fill<S: FillStorage<T>>(&mut self, storage: S) -> Fill<'_, S, T> {
Fill {
receiver: self,
storage,
}
}
}
pub(super) fn set_waker_check_no_prod<T: Unpin>(
cx: &mut Context<'_>,
receiver: &mut Receiver<T>,
) -> bool {
if receiver.no_senders() {
return true;
}
let waker = cx.waker().clone();
let last_wc = receiver.last_wake_count.take();
let subs = receiver.subscribers();
let current_wc = subs.subscribe(waker, last_wc);
receiver.last_wake_count = Some(current_wc);
receiver.no_senders()
}
impl<'a, S: RecvStorage<T>, T: Unpin> Future for Recv<'a, S, T> {
type Output = Result<S::Output, Closed>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
receiver,
storage,
} = &mut *self;
let taken_storage = storage.take().unwrap_or_default();
let taken_storage = match receiver.try_recv_into(taken_storage) {
Ok(result) => return Poll::Ready(Ok(result)),
Err(taken_storage) => taken_storage,
};
let mut fail_ret = Poll::Pending;
if set_waker_check_no_prod(cx, receiver) {
fail_ret = Poll::Ready(Err(Closed));
}
match receiver.try_recv_into(taken_storage) {
Ok(result) => Poll::Ready(Ok(result)),
Err(taken_storage) => {
*storage = Some(taken_storage);
fail_ret
},
}
}
}
impl<'a, T: Unpin, S: FillStorage<T>> Future for Fill<'a, S, T> {
type Output = Result<usize, Closed>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
receiver,
storage,
} = &mut *self;
if let len @ 1.. = receiver.fifo().pull(storage) {
return Poll::Ready(Ok(len));
}
let mut fail_ret = Poll::Pending;
if set_waker_check_no_prod(cx, receiver) {
fail_ret = Poll::Ready(Err(Closed));
}
match receiver.fifo().pull(storage) {
0 => fail_ret,
len => Poll::Ready(Ok(len)),
}
}
}
impl<T: Unpin, S: Storage<T> + Unpin> FillStorage<T> for S {}
impl<T: Unpin, S: FillStorage<T> + Default> RecvStorage<T> for S {}