#![no_std]
mod yield_now;
use core::{
cell::UnsafeCell,
future::Future,
hint::unreachable_unchecked,
marker::PhantomData,
pin::Pin,
ptr::NonNull,
task::{Context, Poll},
};
use futures_core::stream::{FusedStream, Stream};
use pin_project_lite::pin_project;
use pinned_aliasable::Aliasable;
use yield_now::YieldNow;
pub trait HandlerFnLifetime<'yielder, T, ImplicitBounds: Sealed = Bounds<Yielder<'yielder, T>>> {
type Fut: Future<Output = ()>;
}
pub trait HandlerFn<T>: for<'yielder> HandlerFnLifetime<'yielder, T> {
fn call(self, yielder: Yielder<'_, T>) -> <Self as HandlerFnLifetime<'_, T>>::Fut;
}
pub struct Yielder<'a, T>(NonNull<Option<T>>, PhantomData<&'a mut T>);
impl<'a, T> Yielder<'a, T> {
pub async fn yield_item(&mut self, val: T) {
let slot = unsafe { self.0.as_mut() };
assert!(slot.is_none(), "called `yield_item` twice in one poll");
*slot = Some(val);
YieldNow::Created.await;
}
}
unsafe impl<'a, T: Send> Send for Yielder<'a, T> {}
unsafe impl<'a, T> Sync for Yielder<'a, T> {}
pin_project! {
#[project = EnstreamStateProj]
#[project_replace = EnstreamStateProjReplace]
enum EnstreamState<G, F> {
Gen { gen: G },
Fut { #[pin] fut: F },
Completed,
}
}
pin_project! {
struct Enstream<T, G, F> {
#[pin]
state: EnstreamState<G, F>,
#[pin]
cell: Aliasable<UnsafeCell<Option<T>>>,
}
}
impl<'yielder, T, G> Stream for Enstream<T, G, <G as HandlerFnLifetime<'yielder, T>>::Fut>
where
G: HandlerFn<T>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let mut cell_pointer = unsafe { NonNull::new_unchecked(this.cell.as_ref().get().get()) };
let poll_result = match this.state.as_mut().project() {
EnstreamStateProj::Fut { fut } => fut.poll(cx),
EnstreamStateProj::Gen { .. } => {
let gen = match this
.state
.as_mut()
.project_replace(EnstreamState::Completed)
{
EnstreamStateProjReplace::Gen { gen } => gen,
_ => unsafe { unreachable_unchecked() },
};
let fut = gen.call(Yielder(cell_pointer, PhantomData));
this.state.set(EnstreamState::Fut { fut });
match this.state.as_mut().project() {
EnstreamStateProj::Fut { fut } => fut.poll(cx),
_ => unsafe { unreachable_unchecked() },
}
}
EnstreamStateProj::Completed => return Poll::Ready(None),
};
match poll_result {
Poll::Ready(()) => {
this.state.set(EnstreamState::Completed);
Poll::Ready(None)
}
Poll::Pending => match unsafe { cell_pointer.as_mut() }.take() {
Some(val) => Poll::Ready(Some(val)),
None => Poll::Pending,
},
}
}
}
impl<'yielder, T, G> FusedStream for Enstream<T, G, <G as HandlerFnLifetime<'yielder, T>>::Fut>
where
G: HandlerFn<T>,
{
fn is_terminated(&self) -> bool {
matches!(self.state, EnstreamState::Completed)
}
}
unsafe impl<T: Send, G: Send, F: Send> Send for Enstream<T, G, F> {}
unsafe impl<T, G, F> Sync for Enstream<T, G, F> {}
pub fn enstream<T, G>(gen: G) -> impl FusedStream<Item = T>
where
G: HandlerFn<T>,
{
Enstream {
cell: Aliasable::new(UnsafeCell::new(None)),
state: EnstreamState::Gen { gen },
}
}
mod private {
pub trait Sealed {}
pub struct Bounds<T>(T);
impl<T> Sealed for Bounds<T> {}
}
use private::{Bounds, Sealed};