#![feature(generator_trait)]
#![feature(gen_future)]
#![feature(never_type)]
use {
core::{
ops::{Generator, GeneratorState},
pin::Pin,
task::{Context, Poll},
},
futures_core::*,
pin_utils::unsafe_pinned,
std::future::set_task_context,
};
#[macro_export]
macro_rules! gen_await {
($e:expr) => {{
let mut pinned = $e;
loop {
if let ::core::task::Poll::Ready(x) = std::future::poll_with_tls_context(unsafe {
::core::pin::Pin::new_unchecked(&mut pinned)
}) {
break x;
}
yield ::core::task::Poll::Pending;
}
}};
}
pub struct GenStream<G> {
inner: G,
}
impl<G> GenStream<G> {
unsafe_pinned!(inner: G);
}
impl<G> From<G> for GenStream<G> {
fn from(inner: G) -> Self {
Self { inner }
}
}
impl<G, Y> Stream for GenStream<G>
where
G: Generator<Yield = Poll<Y>, Return = ()>,
{
type Item = Y;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
set_task_context(cx, || match self.inner().resume() {
GeneratorState::Yielded(v) => v.map(Some),
GeneratorState::Complete(_) => Poll::Ready(None),
})
}
}
impl<G: Unpin> Unpin for GenStream<G> {}
pub struct GenPerpetualStream<G> {
inner: G,
}
impl<G> GenPerpetualStream<G> {
unsafe_pinned!(inner: G);
}
impl<G> From<G> for GenPerpetualStream<G> {
fn from(inner: G) -> Self {
Self { inner }
}
}
impl<G, Y> Stream for GenPerpetualStream<G>
where
G: Generator<Yield = Poll<Y>, Return = !>,
{
type Item = Y;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
set_task_context(cx, || match self.inner().resume() {
GeneratorState::Yielded(v) => v.map(Some),
GeneratorState::Complete(_) => unreachable!(),
})
}
}
impl<G: Unpin> Unpin for GenPerpetualStream<G> {}
pub struct GenTryStream<G> {
inner: G,
finished: bool,
}
impl<G> GenTryStream<G> {
unsafe_pinned!(inner: G);
unsafe_pinned!(finished: bool);
}
impl<G> From<G> for GenTryStream<G> {
fn from(inner: G) -> Self {
Self {
inner,
finished: false,
}
}
}
impl<G, T, E> Stream for GenTryStream<G>
where
G: Generator<Yield = Poll<T>, Return = Result<(), E>>,
{
type Item = Result<T, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.finished {
return Poll::Ready(None);
}
set_task_context(cx, || match self.as_mut().inner().resume() {
GeneratorState::Yielded(v) => v.map(Ok).map(Some),
GeneratorState::Complete(res) => {
self.as_mut().finished().set(true);
if let Err(e) = res {
Poll::Ready(Some(Err(e)))
} else {
Poll::Ready(None)
}
}
})
}
}
impl<G: Unpin> Unpin for GenTryStream<G> {}