#![no_std]
#![doc(test(
no_crate_inject,
attr(
deny(warnings, rust_2018_idioms, single_use_lifetimes),
allow(dead_code, unused_variables)
)
))]
#![warn(
// Lints that may help when writing public library.
missing_debug_implementations,
missing_docs,
clippy::alloc_instead_of_core,
clippy::exhaustive_enums,
clippy::exhaustive_structs,
clippy::impl_trait_in_params,
clippy::missing_inline_in_public_items,
clippy::std_instead_of_alloc,
clippy::std_instead_of_core,
)]
#![feature(coroutine_trait)]
#[cfg(test)]
extern crate std;
#[cfg(test)]
mod tests;
#[cfg(test)]
#[path = "gen/tests/assert_impl.rs"]
mod assert_impl;
#[cfg(test)]
#[path = "gen/tests/track_size.rs"]
mod track_size;
#[doc(inline)]
pub use futures_async_stream_macro::for_await;
#[doc(inline)]
pub use futures_async_stream_macro::stream;
#[doc(inline)]
pub use futures_async_stream_macro::stream_block;
#[doc(inline)]
pub use futures_async_stream_macro::try_stream;
#[doc(inline)]
pub use futures_async_stream_macro::try_stream_block;
mod future {
use core::{
future::Future,
ops::{Coroutine, CoroutineState},
pin::Pin,
ptr::NonNull,
task::{Context, Poll},
};
use pin_project::pin_project;
#[doc(hidden)]
#[derive(Debug, Clone, Copy)]
pub struct ResumeTy(pub(crate) NonNull<Context<'static>>);
unsafe impl Send for ResumeTy {}
unsafe impl Sync for ResumeTy {}
impl core::panic::UnwindSafe for ResumeTy {}
impl core::panic::RefUnwindSafe for ResumeTy {}
#[doc(hidden)]
#[inline]
pub fn from_coroutine<G>(g: G) -> impl Future<Output = G::Return>
where
G: Coroutine<ResumeTy, Yield = ()>,
{
GenFuture(g)
}
#[pin_project]
pub(crate) struct GenFuture<G>(#[pin] G);
impl<G> Future for GenFuture<G>
where
G: Coroutine<ResumeTy, Yield = ()>,
{
type Output = G::Return;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.0.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
CoroutineState::Yielded(()) => Poll::Pending,
CoroutineState::Complete(x) => Poll::Ready(x),
}
}
}
#[doc(hidden)]
#[inline]
#[must_use]
pub unsafe fn get_context<'a, 'b>(cx: ResumeTy) -> &'a mut Context<'b> {
unsafe { &mut *cx.0.as_ptr().cast::<Context<'b>>() }
}
}
mod stream {
use core::{
future::Future,
ops::{Coroutine, CoroutineState},
pin::Pin,
ptr::NonNull,
task::{Context, Poll},
};
use futures_core::stream::Stream;
use pin_project::pin_project;
use crate::future::ResumeTy;
#[doc(hidden)]
#[inline]
pub fn from_coroutine<G, T>(g: G) -> impl Stream<Item = T>
where
G: Coroutine<ResumeTy, Yield = Poll<T>, Return = ()>,
{
GenStream(g)
}
#[pin_project]
pub(crate) struct GenStream<G>(#[pin] G);
impl<G, T> Stream for GenStream<G>
where
G: Coroutine<ResumeTy, Yield = Poll<T>, Return = ()>,
{
type Item = T;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.0.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
CoroutineState::Yielded(x) => x.map(Some),
CoroutineState::Complete(()) => Poll::Ready(None),
}
}
}
#[doc(hidden)]
#[inline]
pub fn next<S>(stream: &mut S) -> impl Future<Output = Option<S::Item>> + '_
where
S: Stream + Unpin,
{
Next(stream)
}
pub(crate) struct Next<'a, S>(&'a mut S);
impl<S> Future for Next<'_, S>
where
S: Stream + Unpin,
{
type Output = Option<S::Item>;
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll_next(cx)
}
}
}
mod try_stream {
use core::{
ops::{Coroutine, CoroutineState},
pin::Pin,
ptr::NonNull,
task::{Context, Poll},
};
use futures_core::stream::{FusedStream, Stream};
use pin_project::pin_project;
use crate::future::ResumeTy;
#[doc(hidden)]
#[inline]
pub fn from_coroutine<G, T, E>(g: G) -> impl FusedStream<Item = Result<T, E>>
where
G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
{
GenTryStream(Some(g))
}
#[pin_project]
pub(crate) struct GenTryStream<G>(#[pin] Option<G>);
impl<G, T, E> Stream for GenTryStream<G>
where
G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
{
type Item = Result<T, E>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if let Some(g) = this.0.as_mut().as_pin_mut() {
let res = match g.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
CoroutineState::Yielded(x) => x.map(|x| Some(Ok(x))),
CoroutineState::Complete(Err(e)) => Poll::Ready(Some(Err(e))),
CoroutineState::Complete(Ok(())) => Poll::Ready(None),
};
if let Poll::Ready(Some(Err(_)) | None) = &res {
this.0.set(None);
}
res
} else {
Poll::Ready(None)
}
}
}
impl<G, T, E> FusedStream for GenTryStream<G>
where
G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
{
#[inline]
fn is_terminated(&self) -> bool {
self.0.is_none()
}
}
}
#[doc(hidden)]
pub mod __private {
#[doc(hidden)]
pub use core::{
marker::Send,
option::Option::{None, Some},
pin::Pin,
result::Result::{self, Ok},
task::Poll,
};
#[doc(hidden)]
pub mod future {
#[doc(hidden)]
pub use core::future::Future;
#[doc(hidden)]
pub use crate::future::{ResumeTy, from_coroutine, get_context};
}
#[doc(hidden)]
pub mod stream {
#[doc(hidden)]
pub use futures_core::stream::Stream;
#[doc(hidden)]
pub use crate::stream::{from_coroutine, next};
}
#[doc(hidden)]
pub mod try_stream {
#[doc(hidden)]
pub use crate::try_stream::from_coroutine;
}
}