#![no_std]
#![feature(generic_associated_types)]
mod yield_now;
use core::{
cell::{Cell, UnsafeCell},
future::Future,
hint::unreachable_unchecked,
marker::PhantomData,
mem::MaybeUninit,
pin::Pin,
ptr::NonNull,
task::{Context, Poll},
};
use futures_util::stream::{FusedStream, Stream};
use pin_project::pin_project;
use pinned_aliasable::Aliasable;
use yield_now::YieldNow;
pub trait HandlerFn<'scope, T: 'scope> {
type Fut<'yielder>: Future<Output = ()> + 'yielder
where
'scope: 'yielder;
fn call<'yielder>(self, yielder: Yielder<'yielder, T>) -> Self::Fut<'yielder>
where
'scope: 'yielder;
}
pub struct Yielder<'a, T>(NonNull<Option<T>>, PhantomData<Cell<&'a ()>>);
impl<'a, T> Yielder<'a, T> {
pub async fn yield_item(&mut self, val: T) {
unsafe { self.0.as_ptr().write(Some(val)) }
YieldNow::Created.await;
}
}
unsafe impl<'a, T: Send> Send for Yielder<'a, T> {}
unsafe impl<'a, T: Send> Sync for Yielder<'a, T> {}
#[pin_project(project = EnstreamStateProj)]
enum EnstreamState<G, F> {
Gen(MaybeUninit<G>),
Fut(#[pin] F),
Completed,
}
#[pin_project]
struct Enstream<T, G, F> {
#[pin]
cell: Aliasable<UnsafeCell<Option<T>>>,
#[pin]
state: EnstreamState<G, F>,
}
impl<'yielder, 'scope: 'yielder, T: 'scope, G: 'scope> Stream
for Enstream<T, G, <G as HandlerFn<'scope, T>>::Fut<'yielder>>
where
G: HandlerFn<'scope, T>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let mut cell_pointer = unsafe { NonNull::new_unchecked(this.cell.as_ref().get().get()) };
let poll_result = match this.state.as_mut().project() {
EnstreamStateProj::Fut(fut) => fut.poll(cx),
EnstreamStateProj::Gen(gen) => {
let gen = unsafe { gen.assume_init_read() };
this.state.set(EnstreamState::Completed);
let fut = gen.call(Yielder(cell_pointer, PhantomData));
this.state.set(EnstreamState::Fut(fut));
match this.state.as_mut().project() {
EnstreamStateProj::Fut(fut) => fut.poll(cx),
_ => unsafe { unreachable_unchecked() },
}
}
EnstreamStateProj::Completed => return Poll::Ready(None),
};
match poll_result {
Poll::Ready(()) => {
this.state.set(EnstreamState::Completed);
Poll::Ready(None)
}
Poll::Pending => match unsafe { cell_pointer.as_mut() }.take() {
Some(val) => Poll::Ready(Some(val)),
None => Poll::Pending,
},
}
}
}
impl<'yielder, 'scope: 'yielder, T: 'scope, G: 'scope> FusedStream
for Enstream<T, G, <G as HandlerFn<'scope, T>>::Fut<'yielder>>
where
G: HandlerFn<'scope, T>,
{
fn is_terminated(&self) -> bool {
matches!(self.state, EnstreamState::Completed)
}
}
pub fn enstream<'scope, T: 'scope, G: 'scope>(generator: G) -> impl FusedStream<Item = T> + 'scope
where
G: HandlerFn<'scope, T>,
{
Enstream {
cell: Aliasable::new(UnsafeCell::new(None)),
state: EnstreamState::Gen(MaybeUninit::new(generator)),
}
}