#![no_std]
#![warn(missing_docs)]
#![feature(associated_type_bounds)]
#![feature(coroutine_trait)]
#![feature(trait_alias)]
#![feature(try_trait_v2, try_trait_v2_residual)]
use core::{
future::Future,
ops::{ControlFlow, Coroutine, CoroutineState, FromResidual, Residual, Try},
pin::Pin,
ptr::NonNull,
task::{Context, Poll},
};
use pin_project::pin_project;
#[doc(no_inline)]
pub use futures_core::Stream;
#[doc(no_inline)]
pub use stream_future_impl::{stream, try_stream};
/// Lifetime-erased pointer to the [`Context`] of the poll that is currently
/// in progress.
///
/// A value of this type is passed as the resume argument of the wrapped
/// coroutine. The `'static` in the pointee type is only a placeholder: the
/// pointer is built by casting a pointer to a short-lived `Context`.
#[doc(hidden)]
#[derive(Debug, Copy, Clone)]
pub struct ResumeTy(NonNull<Context<'static>>);

// NOTE(review): nothing in this file proves these impls sound on their own.
// Presumably they exist so that a coroutine holding a `ResumeTy` across a
// yield point does not lose `Send`/`Sync`, and rely on the pointer only being
// dereferenced during the poll that created it — confirm against the macro
// expansion.
unsafe impl Send for ResumeTy {}
unsafe impl Sync for ResumeTy {}

impl ResumeTy {
    /// Reinterprets the stored pointer as a mutable reference to a
    /// [`Context`].
    ///
    /// Both lifetimes are chosen freely by the caller; the signature does not
    /// tie them to the context this value was created from. The result must
    /// therefore only be used while that original context is still alive.
    pub fn get_context<'a, 'b>(self) -> &'a mut Context<'b> {
        let raw: *mut Context<'b> = self.0.as_ptr().cast();
        // SAFETY: not enforced here. The pointer is non-null by construction;
        // that it still points to a live, unaliased `Context` is an obligation
        // on whoever created this `ResumeTy` and on the caller.
        unsafe { &mut *raw }
    }

    /// Polls `f` once using the context this value points to and returns the
    /// result of that poll.
    pub fn poll_future<F: Future>(self, f: Pin<&mut F>) -> Poll<F::Output> {
        let cx = self.get_context();
        F::poll(f, cx)
    }
}
/// Adapter around a coroutine that is resumed with a [`ResumeTy`] and yields
/// `Poll<P>`.
///
/// The same value implements both [`Future`] (output: the coroutine's return
/// value) and [`Stream`] (items: the `P` values yielded as `Poll::Ready`).
#[doc(hidden)]
#[pin_project]
pub struct GenStreamFuture<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> {
/// The wrapped coroutine. Marked `#[pin]` so the projection hands out a
/// `Pin<&mut T>`, which is what `Coroutine::resume` takes.
#[pin]
gen: T,
/// Return value of the coroutine, stored by `poll_next` when the coroutine
/// completes while being driven as a stream; `poll` takes it out again.
ret: Option<T::Return>,
}
impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> GenStreamFuture<P, T> {
pub const fn new(gen: T) -> Self {
Self { gen, ret: None }
}
}
impl<P, T> Future for GenStreamFuture<P, T>
where
    T: Coroutine<ResumeTy, Yield = Poll<P>>,
{
    type Output = T::Return;

    /// Drives the coroutine one step and resolves to its return value.
    ///
    /// If `poll_next` already saw the coroutine complete, the stored return
    /// value is handed out without resuming the coroutine again. Items the
    /// coroutine yields while being driven as a future are discarded.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Keep a raw pointer so the context can be both passed to the
        // coroutine and used afterwards to reach the waker.
        let cx_ptr = NonNull::from(cx);
        let me = self.project();

        if let Some(stored) = me.ret.take() {
            return Poll::Ready(stored);
        }

        match me.gen.resume(ResumeTy(cx_ptr.cast())) {
            CoroutineState::Complete(output) => Poll::Ready(output),
            CoroutineState::Yielded(Poll::Pending) => Poll::Pending,
            CoroutineState::Yielded(Poll::Ready(_)) => {
                // The coroutine produced a stream item rather than waiting on
                // something, so nothing else will wake this task: request an
                // immediate re-poll ourselves.
                //
                // SAFETY: `cx_ptr` was created from the `&mut Context`
                // argument of this call, which is still live, and the
                // coroutine is not running at this point.
                unsafe { cx_ptr.as_ref() }.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
impl<P, T> Stream for GenStreamFuture<P, T>
where
    T: Coroutine<ResumeTy, Yield = Poll<P>>,
{
    type Item = P;

    /// Resumes the coroutine once and reports what it yielded.
    ///
    /// A yielded `Poll::Ready(p)` becomes the next stream item and a yielded
    /// `Poll::Pending` is passed through. When the coroutine completes, its
    /// return value is stored for the `Future` impl and the stream ends.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let me = self.project();
        let resume_arg = ResumeTy(NonNull::from(cx).cast());

        match me.gen.resume(resume_arg) {
            // `Ready(p)` -> `Ready(Some(p))`, `Pending` -> `Pending`.
            CoroutineState::Yielded(step) => step.map(Some),
            CoroutineState::Complete(output) => {
                *me.ret = Some(output);
                Poll::Ready(None)
            }
        }
    }
}
/// The [`Try`] type that has the same residual as `R` but produces `P` as its
/// output.
///
/// For example, with `R = Result<T, E>` this is `Result<P, E>`.
#[doc(hidden)]
pub type TryStreamItemType<R, P> = <<R as Try>::Residual as Residual<P>>::TryType;
/// Adapter around a coroutine that is resumed with a [`ResumeTy`], yields
/// `Poll<P>` and returns a [`Try`] type (such as `Result`).
///
/// The same value implements both [`Future`] (output: the coroutine's return
/// value) and [`Stream`]. As a stream, every yielded `P` is wrapped as a
/// successful item and a failing return value is delivered as one final
/// failing item.
#[doc(hidden)]
#[pin_project]
pub struct GenTryStreamFuture<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> {
    /// The wrapped coroutine. Marked `#[pin]` so the projection hands out a
    /// `Pin<&mut T>`, which is what `Coroutine::resume` takes.
    #[pin]
    gen: T,
    /// Successful output of the coroutine, stored by `poll_next` when the
    /// coroutine completes while being driven as a stream; `poll` takes it
    /// out again.
    ret: Option<<T::Return as Try>::Output>,
    /// Set once the coroutine has returned `Complete`, so that the stream
    /// never resumes it again.
    done: bool,
}

impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> GenTryStreamFuture<P, T> {
    /// Wraps `gen` without resuming it; no output is stored yet.
    pub const fn new(gen: T) -> Self {
        Self {
            gen,
            ret: None,
            done: false,
        }
    }
}

impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> Future for GenTryStreamFuture<P, T> {
    type Output = T::Return;

    /// Drives the coroutine one step and resolves to its return value.
    ///
    /// If `poll_next` already saw the coroutine complete successfully, the
    /// stored output is re-wrapped with `from_output` and handed out without
    /// resuming the coroutine again. Items the coroutine yields while being
    /// driven as a future are discarded.
    ///
    /// If the stream side already delivered the failure, there is nothing
    /// left to return; polling then resumes the finished coroutine, exactly
    /// as polling any completed future here does.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Keep a raw pointer so the context can be both passed to the
        // coroutine and used afterwards to reach the waker.
        let cx = NonNull::from(cx);
        let this = self.project();

        if let Some(output) = this.ret.take() {
            return Poll::Ready(T::Return::from_output(output));
        }

        match this.gen.resume(ResumeTy(cx.cast())) {
            CoroutineState::Yielded(Poll::Pending) => Poll::Pending,
            CoroutineState::Yielded(Poll::Ready(_)) => {
                // The coroutine produced a stream item rather than waiting on
                // something, so nothing else will wake this task: request an
                // immediate re-poll ourselves.
                //
                // SAFETY: `cx` was created from the `&mut Context` argument
                // of this call, which is still live, and the coroutine is not
                // running at this point.
                unsafe { cx.as_ref() }.waker().wake_by_ref();
                Poll::Pending
            }
            CoroutineState::Complete(ret) => {
                // Let the stream side know there is nothing left to resume.
                *this.done = true;
                Poll::Ready(ret)
            }
        }
    }
}

impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try<Residual: Residual<P>>>> Stream
    for GenTryStreamFuture<P, T>
{
    type Item = <<T::Return as Try>::Residual as Residual<P>>::TryType;

    /// Resumes the coroutine once and reports what it yielded.
    ///
    /// * A yielded `Poll::Ready(p)` becomes a successful item; a yielded
    ///   `Poll::Pending` is passed through.
    /// * A successful return value is stored for the `Future` impl and the
    ///   stream ends.
    /// * A failing return value is emitted as one last failing item.
    ///
    /// Once the coroutine has completed, every further call returns
    /// `Poll::Ready(None)`. This matters after a failing item: the stream has
    /// not signalled its end yet, so consumers may poll again, and resuming a
    /// completed coroutine would panic.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        if *this.done {
            return Poll::Ready(None);
        }

        match this.gen.resume(ResumeTy(NonNull::from(cx).cast())) {
            CoroutineState::Yielded(Poll::Pending) => Poll::Pending,
            CoroutineState::Yielded(Poll::Ready(item)) => {
                Poll::Ready(Some(Self::Item::from_output(item)))
            }
            CoroutineState::Complete(ret) => {
                *this.done = true;
                match ret.branch() {
                    ControlFlow::Continue(output) => {
                        *this.ret = Some(output);
                        Poll::Ready(None)
                    }
                    ControlFlow::Break(residual) => {
                        Poll::Ready(Some(Self::Item::from_residual(residual)))
                    }
                }
            }
        }
    }
}