use std::{
cell::RefCell,
future::Future,
iter::FusedIterator,
ops::{Deref, DerefMut},
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use futures_core::{FusedStream, Stream};
use crate::utils::noop_waker;
/// Producer half of a single-slot hand-off between the generator body and its driver.
///
/// The slot is shared through an `Rc<RefCell<_>>`. Used as a future, a `Sender`
/// stays pending for as long as the slot holds a value and resolves once the
/// slot is empty.
struct Sender<T>(Rc<RefCell<Option<T>>>);

impl<T> Sender<T> {
    /// Places `value` into the shared slot.
    ///
    /// # Panics
    ///
    /// Panics if the slot is still occupied; the slot is left untouched in that case.
    #[track_caller]
    fn set(&self, value: T) {
        let mut slot = self.0.borrow_mut();
        assert!(slot.is_none(), "The result of `ret` is not await.");
        *slot = Some(value);
    }
}

impl<T> Future for Sender<T> {
    type Output = ();

    /// Reports `Pending` while the slot is occupied and `Ready(())` once it is empty.
    ///
    /// The task context is not used, so no waker is registered here.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        let occupied = self.0.borrow().is_some();
        if occupied {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
/// Handle passed to the generator body for emitting items.
pub struct LocalIterContext<T>(Sender<T>);

impl<T> LocalIterContext<T> {
    /// Emits `value` and returns a future to await before emitting again.
    ///
    /// The value is stored immediately, when `ret` is called. The returned
    /// future stays pending until the stored value has been removed from the
    /// shared slot.
    ///
    /// # Panics
    ///
    /// Panics if the previously emitted value is still in the slot, i.e. the
    /// future returned by an earlier `ret` call was not awaited.
    #[track_caller]
    pub fn ret(&mut self, value: T) -> impl Future<Output = ()> + '_ {
        let sender = &mut self.0;
        sender.set(value);
        sender
    }

    /// Emits every item of `iter` in order, awaiting each `ret` before the next.
    pub async fn ret_iter(&mut self, iter: impl IntoIterator<Item = T>) {
        for item in iter {
            self.ret(item).await;
        }
    }
}
/// Driver state shared by the iterator and stream front-ends.
struct Data<'a, T> {
    /// Slot through which the generator body hands over one item at a time.
    value: Rc<RefCell<Option<T>>>,
    /// The generator body; set to `None` once it has run to completion.
    fut: Option<Pin<Box<dyn Future<Output = ()> + 'a>>>,
}

impl<T> Data<'_, T> {
    /// Polls the generator body once and reports what it produced.
    ///
    /// * `Ready(Some(item))` - the body is suspended and left an item in the slot.
    /// * `Ready(None)` - the body has finished (now or on an earlier call).
    /// * `Pending` - the body is suspended without having produced an item.
    ///
    /// # Panics
    ///
    /// Panics if the body finishes while an item is still sitting in the slot.
    fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<T>> {
        let finished = match self.fut.as_mut() {
            // Already completed on an earlier call: stay exhausted.
            None => return Poll::Ready(None),
            Some(body) => body.as_mut().poll(cx).is_ready(),
        };

        if finished {
            assert!(
                self.value.borrow().is_none(),
                "The result of `ret` is not await."
            );
            // Drop the completed future so it is never polled again.
            self.fut = None;
            return Poll::Ready(None);
        }

        // The body is suspended; hand out whatever it left in the slot, if anything.
        let produced = self.value.borrow_mut().take();
        match produced {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        }
    }
}
/// Synchronous iterator whose items are produced by an async generator body.
pub struct LocalIter<'a, T>(Data<'a, T>);

impl<'a, T: 'a> LocalIter<'a, T> {
    /// Builds an iterator from a generator body.
    ///
    /// `f` is invoked right away with a fresh [`LocalIterContext`]; the future
    /// it returns is boxed and stored, and is only polled when items are requested.
    pub fn new<Fut: Future<Output = ()> + 'a>(f: impl FnOnce(LocalIterContext<T>) -> Fut) -> Self {
        // One slot, shared between the context handed to `f` and the driver state.
        let slot = Rc::new(RefCell::new(None));
        let context = LocalIterContext(Sender(Rc::clone(&slot)));
        let body: Pin<Box<dyn Future<Output = ()> + 'a>> = Box::pin(f(context));
        Self(Data {
            value: slot,
            fut: Some(body),
        })
    }
}
impl<T> Iterator for LocalIter<'_, T> {
    type Item = T;

    /// Advances the generator body until it emits the next item or finishes.
    ///
    /// The body is polled once with a no-op waker. It must therefore either
    /// emit an item through `LocalIterContext::ret` or run to completion on
    /// that single poll.
    ///
    /// # Panics
    ///
    /// * Panics if the body suspends without having emitted an item, for
    ///   example by awaiting a future other than the one returned by `ret`.
    /// * Panics if the body finishes while an emitted item is still unconsumed
    ///   (the future returned by `ret` was not awaited).
    #[track_caller]
    fn next(&mut self) -> Option<Self::Item> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        match self.0.poll_next(&mut cx) {
            Poll::Ready(item) => item,
            // The message names the context type that is actually handed to the
            // body of a `LocalIter`.
            Poll::Pending => panic!("`LocalIterContext::ret` is not called."),
        }
    }
}
// Fusing is upheld by `Data::poll_next`: once the generator body completes, the
// stored future is cleared and every later call reports `None`.
impl<T> FusedIterator for LocalIter<'_, T> {}
/// Context handed to the generator body of a [`LocalAsyncIter`].
///
/// A newtype around [`LocalIterContext`]; it dereferences to the inner context,
/// so `ret` and `ret_iter` can be called on it directly.
pub struct LocalAsyncIterContext<T>(LocalIterContext<T>);
impl<T> Deref for LocalAsyncIterContext<T> {
type Target = LocalIterContext<T>;
/// Borrows the wrapped [`LocalIterContext`].
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for LocalAsyncIterContext<T> {
/// Mutably borrows the wrapped [`LocalIterContext`].
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Stream whose items are produced by an async generator body.
///
/// Wraps a [`LocalIter`] and reuses its driver state.
pub struct LocalAsyncIter<'a, T>(LocalIter<'a, T>);

impl<'a, T: 'a> LocalAsyncIter<'a, T> {
    /// Builds a stream from a generator body.
    ///
    /// `f` is invoked right away, receiving a [`LocalAsyncIterContext`] that
    /// wraps the context created by [`LocalIter::new`].
    pub fn new<Fut: Future<Output = ()> + 'a>(
        f: impl FnOnce(LocalAsyncIterContext<T>) -> Fut,
    ) -> Self {
        let inner = LocalIter::new(|cx| {
            let async_cx = LocalAsyncIterContext(cx);
            f(async_cx)
        });
        Self(inner)
    }
}
impl<T> Stream for LocalAsyncIter<'_, T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0 .0.poll_next(cx)
}
}
impl<T> FusedStream for LocalAsyncIter<'_, T> {
fn is_terminated(&self) -> bool {
self.0 .0.fut.is_none()
}
}