use std::cell::Cell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use futures_util::Stream;
/// Future handed back by [`Sender::send`].
///
/// It reports `Pending` exactly once (on its first poll) and `Ready(())` on
/// every poll after that.
#[must_use]
pub struct SenderFuture {
    /// `false` until the first poll has happened, `true` afterwards.
    is_ready: bool,
}

impl SenderFuture {
    /// Creates a future that has not been polled yet.
    fn new() -> Self {
        Self { is_ready: false }
    }
}

impl Future for SenderFuture {
    type Output = ();

    /// Returns `Pending` on the first call and `Ready(())` on all later calls.
    ///
    /// The context is deliberately unused: no waker is stored or woken here,
    /// so whoever polls this future is responsible for polling it again.
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Mark the future as polled and look at what the flag was before.
        let polled_before = std::mem::replace(&mut self.is_ready, true);
        if polled_before {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
/// Handle to a shared, single-item slot (`Rc<Cell<Option<I>>>`).
///
/// `E` is not stored; it is carried only as `PhantomData` so the handle is
/// tied to an error type.
pub struct Sender<I, E>(Rc<Cell<Option<I>>>, PhantomData<E>);

// NOTE(review): `Rc` is neither `Send` nor `Sync` and `Cell` is not `Sync`, so
// the compiler would not derive these impls. They are asserted manually,
// presumably so a future that captures a `Sender` can satisfy a `Send` bound.
// This is only sound if all handles to one slot are used from a single thread
// at a time -- confirm that every caller upholds this.
unsafe impl<I, E> Send for Sender<I, E>
where
    I: Send,
    E: Send,
{
}

unsafe impl<I, E> Sync for Sender<I, E>
where
    I: Sync,
    E: Sync,
{
}
impl<I, E> Sender<I, E> {
fn new(item_opt: Option<I>) -> Sender<I, E> {
Sender(Rc::new(Cell::new(item_opt)), PhantomData::<E>)
}
fn clone(&self) -> Sender<I, E> {
Sender(self.0.clone(), PhantomData::<E>)
}
pub fn send<T>(&mut self, item: T) -> SenderFuture
where
T: Into<I>,
{
self.0.set(Some(item.into()));
SenderFuture::new()
}
}
/// Stream whose items are produced by a boxed future.
///
/// The future receives a [`Sender`]; this struct keeps a second handle to the
/// same slot so it can pick up what the future stores there.
#[must_use]
pub struct AsyncStream<Item, Error> {
    /// Stream-side handle to the slot shared with the producer future.
    item: Sender<Item, Error>,
    /// The boxed producer future.
    // The nested boxed-future type trips clippy's complexity lint.
    #[allow(clippy::type_complexity)]
    fut: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>>,
}

impl<Item, Error: 'static + Send> AsyncStream<Item, Error> {
    /// Builds a stream from a closure that turns a [`Sender`] into a future.
    ///
    /// `f` is called immediately with a sender whose slot starts out empty;
    /// the future it returns is boxed and pinned but not polled here.
    pub fn new<F, R>(f: F) -> Self
    where
        F: FnOnce(Sender<Item, Error>) -> R,
        R: Future<Output = Result<(), Error>> + Send + 'static,
        Item: 'static,
    {
        let producer_handle = Sender::new(None);
        // Keep our own handle to the slot before giving the other one away.
        let stream_handle = producer_handle.clone();
        Self {
            item: stream_handle,
            fut: Some(Box::pin(f(producer_handle))),
        }
    }
}
impl<I, E: Unpin> Stream for AsyncStream<I, E> {
    type Item = Result<I, E>;

    /// Drives the producer future and yields whatever it placed in the slot.
    ///
    /// * Producer returns `Pending` and the slot holds an item: the item is
    ///   taken out and yielded as `Some(Ok(item))`.
    /// * Producer returns `Pending` and the slot is empty: `Pending`.
    /// * Producer finishes with `Err(e)`: yields `Some(Err(e))`.
    /// * Producer finishes with `Ok(())`: the stream ends with `None`.
    ///
    /// Once the producer has finished (with either result) it is dropped and
    /// every later call returns `Poll::Ready(None)`, so the completed future
    /// is never polled again.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<I, E>>> {
        let pollres = match self.fut.as_mut() {
            Some(fut) => fut.as_mut().poll(cx),
            // The producer already completed on an earlier call.
            None => return Poll::Ready(None),
        };
        match pollres {
            Poll::Ready(result) => {
                // A finished future must not be polled again; drop it now so
                // subsequent calls take the `None` branch above.
                self.fut = None;
                match result {
                    Ok(()) => Poll::Ready(None),
                    Err(e) => Poll::Ready(Some(Err(e))),
                }
            }
            Poll::Pending => match self.item.0.take() {
                Some(item) => Poll::Ready(Some(Ok(item))),
                None => Poll::Pending,
            },
        }
    }
}