//! Re-exports the `for_await`, `stream`, `stream_block`, `try_stream` and
//! `try_stream_block` macros from `futures_async_stream_macro`, and provides
//! the hidden `__private` module of adapters that turn generators into
//! `Future` and `Stream` values.
//!
//! The crate is `no_std` and enables the unstable `generator_trait` feature,
//! so it needs a nightly compiler.
#![no_std]
// Doctest configuration: do not inject an implicit `extern crate` for this
// crate into doctests, and compile them with warnings denied (except for the
// dead-code / unused-variable noise that short examples tend to produce).
#![doc(test(
no_crate_inject,
attr(
deny(warnings, rust_2018_idioms, single_use_lifetimes),
allow(dead_code, unused_variables)
)
))]
// rustc lints enabled for the whole crate.
#![warn(
missing_docs,
rust_2018_idioms,
single_use_lifetimes,
unreachable_pub,
unsafe_op_in_unsafe_fn
)]
// Clippy lints enabled for the whole crate.
#![warn(
clippy::pedantic,
// lints for public library
clippy::alloc_instead_of_core,
clippy::exhaustive_enums,
clippy::exhaustive_structs,
clippy::std_instead_of_alloc,
clippy::std_instead_of_core,
// lints that help writing unsafe code
clippy::default_union_representation,
clippy::trailing_empty_array,
clippy::transmute_undefined_repr,
clippy::undocumented_unsafe_blocks,
)]
// `must_use_candidate` is part of `clippy::pedantic` (enabled above); it is
// opted out of crate-wide.
#![allow(clippy::must_use_candidate)]
// Provides `core::ops::{Generator, GeneratorState}`, which the `future`,
// `stream` and `try_stream` modules are built on.
#![feature(generator_trait)]
// Only compiled when running doctests: attaches the contents of `README.md`
// as documentation to a dummy constant, so that any code blocks in the README
// are picked up and checked as doctests.
#[cfg(doctest)]
#[doc = include_str!("../README.md")]
const _README: () = ();
#[doc(inline)]
pub use futures_async_stream_macro::for_await;
#[doc(inline)]
pub use futures_async_stream_macro::stream;
#[doc(inline)]
pub use futures_async_stream_macro::stream_block;
#[doc(inline)]
pub use futures_async_stream_macro::try_stream;
#[doc(inline)]
pub use futures_async_stream_macro::try_stream_block;
mod future {
    //! Adapter that drives a generator resumed with `ResumeTy` as a `Future`.

    use core::{
        future::Future,
        ops::{Generator, GeneratorState},
        pin::Pin,
        ptr::NonNull,
        task::{Context, Poll},
    };

    use pin_project::pin_project;

    /// Lifetime-erased pointer to a task `Context`, handed to a generator as
    /// its resume argument.
    ///
    /// `GenFuture::poll` builds one from the `&mut Context<'_>` it receives;
    /// `get_context` turns it back into a reference.
    #[doc(hidden)]
    #[derive(Clone, Copy, Debug)]
    pub struct ResumeTy(pub(crate) NonNull<Context<'static>>);

    // `NonNull` is neither `Send` nor `Sync`, so `ResumeTy` would not be either
    // without these impls.
    // NOTE(review): presumably needed so that generators holding a `ResumeTy`
    // can still be `Send`/`Sync` -- confirm against the macro expansion.
    //
    // SAFETY: `ResumeTy` only stores the pointer; the sole dereference in this
    // module is in `get_context`, an `unsafe fn` whose caller must guarantee
    // that the resulting access does not race with any other access.
    unsafe impl Send for ResumeTy {}
    // SAFETY: see the `Send` impl above.
    unsafe impl Sync for ResumeTy {}

    /// Future that makes progress by resuming the wrapped generator.
    #[pin_project]
    pub(crate) struct GenFuture<G>(#[pin] G);

    impl<G: Generator<ResumeTy, Yield = ()>> Future for GenFuture<G> {
        type Output = G::Return;

        /// Resumes the generator once: a yield maps to `Poll::Pending`, and
        /// completion maps to `Poll::Ready` with the generator's return value.
        ///
        /// Completion is not tracked here, so polling again after
        /// `Poll::Ready` resumes the generator again.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Drop the borrow's lifetimes by going through a raw pointer;
            // `get_context` is the counterpart that restores a reference.
            let resume_arg = ResumeTy(NonNull::from(cx).cast::<Context<'static>>());
            let projected = self.project();
            let generator = projected.0;
            match generator.resume(resume_arg) {
                GeneratorState::Complete(output) => Poll::Ready(output),
                GeneratorState::Yielded(()) => Poll::Pending,
            }
        }
    }

    /// Wraps `gen` in a future whose output is the generator's return value.
    ///
    /// Every poll resumes the generator with a `ResumeTy` pointing at the
    /// current task `Context`.
    #[doc(hidden)]
    pub fn from_generator<G>(gen: G) -> impl Future<Output = G::Return>
    where
        G: Generator<ResumeTy, Yield = ()>,
    {
        GenFuture(gen)
    }

    /// Turns a `ResumeTy` back into a mutable reference to the task `Context`.
    ///
    /// # Safety
    ///
    /// The lifetimes `'a` and `'b` are chosen freely by the caller, so the
    /// caller must guarantee that the pointer inside `cx` is valid for them and
    /// meets every requirement of a mutable reference: it points to a live
    /// `Context`, and no other reference to that `Context` is used while the
    /// returned one is alive.
    #[doc(hidden)]
    pub unsafe fn get_context<'a, 'b>(cx: ResumeTy) -> &'a mut Context<'b> {
        let raw: *mut Context<'b> = cx.0.as_ptr().cast();
        // SAFETY: the caller guarantees that the pointer is valid and
        // unaliased for `'a` (see the `# Safety` section above).
        unsafe { &mut *raw }
    }
}
mod stream {
    //! Adapter that exposes a generator as a `Stream`, plus a minimal `next`
    //! future for pulling one item out of a stream.

    use core::{
        future::Future,
        ops::{Generator, GeneratorState},
        pin::Pin,
        ptr::NonNull,
        task::{Context, Poll},
    };

    use futures_core::stream::Stream;
    use pin_project::pin_project;

    use crate::future::ResumeTy;

    /// Stream that produces items by resuming the wrapped generator.
    #[pin_project]
    pub(crate) struct GenStream<G>(#[pin] G);

    impl<T, G> Stream for GenStream<G>
    where
        G: Generator<ResumeTy, Yield = Poll<T>, Return = ()>,
    {
        type Item = T;

        /// Resumes the generator once and translates the outcome:
        ///
        /// - a yielded `Poll::Ready(item)` becomes `Poll::Ready(Some(item))`,
        /// - a yielded `Poll::Pending` becomes `Poll::Pending`,
        /// - completion becomes `Poll::Ready(None)`.
        ///
        /// Completion is not tracked here, so polling again after
        /// `Poll::Ready(None)` resumes the generator again.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Drop the borrow's lifetimes by going through a raw pointer so the
            // context can be passed as the generator's resume argument.
            let resume_arg = ResumeTy(NonNull::from(cx).cast::<Context<'static>>());
            let projected = self.project();
            match projected.0.resume(resume_arg) {
                GeneratorState::Complete(()) => Poll::Ready(None),
                GeneratorState::Yielded(Poll::Ready(item)) => Poll::Ready(Some(item)),
                GeneratorState::Yielded(Poll::Pending) => Poll::Pending,
            }
        }
    }

    /// Wraps `gen` in a stream of the items it yields.
    ///
    /// Every poll resumes the generator with a `ResumeTy` pointing at the
    /// current task `Context`.
    #[doc(hidden)]
    pub fn from_generator<G, T>(gen: G) -> impl Stream<Item = T>
    where
        G: Generator<ResumeTy, Yield = Poll<T>, Return = ()>,
    {
        GenStream(gen)
    }

    /// Future that resolves to the next item of the borrowed stream.
    pub(crate) struct Next<'a, S>(&'a mut S);

    impl<S> Future for Next<'_, S>
    where
        S: Stream + Unpin,
    {
        type Output = Option<S::Item>;

        /// Forwards to `poll_next` on the borrowed stream.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // `Next` only holds a `&mut S`, so it is `Unpin` and can be
            // unwrapped from its `Pin`; `S: Unpin` then allows re-pinning the
            // stream itself.
            let this = self.get_mut();
            Pin::new(&mut *this.0).poll_next(cx)
        }
    }

    /// Returns a future that yields the next item of `stream`, or `None` once
    /// the stream reports that it is finished.
    #[doc(hidden)]
    pub fn next<S>(stream: &mut S) -> impl Future<Output = Option<S::Item>> + '_
    where
        S: Stream + Unpin,
    {
        Next(stream)
    }
}
mod try_stream {
    //! Adapter that exposes a fallible generator as a fused `Stream` of
    //! `Result` items.

    use core::{
        ops::{Generator, GeneratorState},
        pin::Pin,
        ptr::NonNull,
        task::{Context, Poll},
    };

    use futures_core::stream::{FusedStream, Stream};
    use pin_project::pin_project;

    use crate::future::ResumeTy;

    /// Stream over a generator that yields `Poll<T>` and returns
    /// `Result<(), E>`.
    ///
    /// The generator is kept in an `Option` so that it can be dropped as soon
    /// as it completes; `None` marks the stream as terminated.
    #[pin_project]
    pub(crate) struct GenTryStream<G>(#[pin] Option<G>);

    impl<T, E, G> Stream for GenTryStream<G>
    where
        G: Generator<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
    {
        type Item = Result<T, E>;

        /// Resumes the generator once, unless it has already completed.
        ///
        /// - a yielded `Poll::Ready(item)` becomes `Poll::Ready(Some(Ok(item)))`,
        /// - a yielded `Poll::Pending` becomes `Poll::Pending`,
        /// - returning `Err(e)` becomes `Poll::Ready(Some(Err(e)))`,
        /// - returning `Ok(())` becomes `Poll::Ready(None)`.
        ///
        /// After either kind of return the generator is dropped and every
        /// later poll yields `Poll::Ready(None)` without resuming anything.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let projected = self.project();
            let mut slot = projected.0;
            match slot.as_mut().as_pin_mut() {
                // Already terminated by an earlier poll.
                None => Poll::Ready(None),
                Some(generator) => {
                    // Drop the borrow's lifetimes by going through a raw
                    // pointer so the context can be passed as the generator's
                    // resume argument.
                    let resume_arg = ResumeTy(NonNull::from(cx).cast::<Context<'static>>());
                    match generator.resume(resume_arg) {
                        GeneratorState::Yielded(Poll::Pending) => Poll::Pending,
                        GeneratorState::Yielded(Poll::Ready(item)) => {
                            Poll::Ready(Some(Ok(item)))
                        }
                        GeneratorState::Complete(outcome) => {
                            // The generator is finished: drop it in place so
                            // that it is never resumed again.
                            slot.set(None);
                            match outcome {
                                Ok(()) => Poll::Ready(None),
                                Err(error) => Poll::Ready(Some(Err(error))),
                            }
                        }
                    }
                }
            }
        }
    }

    impl<T, E, G> FusedStream for GenTryStream<G>
    where
        G: Generator<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
    {
        /// Reports whether the generator has completed and been dropped.
        fn is_terminated(&self) -> bool {
            self.0.is_none()
        }
    }

    /// Wraps `gen` in a fused stream of `Result<T, E>` items.
    ///
    /// Yielded values are surfaced as `Ok` items; an `Err` returned by the
    /// generator is surfaced as one final `Err` item.
    #[doc(hidden)]
    pub fn from_generator<G, T, E>(
        gen: G,
    ) -> impl Stream<Item = Result<T, E>> + FusedStream<Item = Result<T, E>>
    where
        G: Generator<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
    {
        GenTryStream(Some(gen))
    }
}
// Hidden re-export hub: gathers the `core` / `futures_core` items and this
// crate's adapter functions under one stable path.
// NOTE(review): presumably the paths that the code generated by the
// `futures_async_stream_macro` macros refers to -- confirm against the macro
// crate. Not intended to be used directly.
#[doc(hidden)]
pub mod __private {
// Core items re-exported by path.
#[doc(hidden)]
pub use core::{
marker::Send,
option::Option::{None, Some},
pin::Pin,
result::Result::{self, Ok},
task::Poll,
};
// `Future` trait plus the generator-to-future adapter, the `ResumeTy` resume
// argument, and the function that recovers the task context from it.
#[doc(hidden)]
pub mod future {
#[doc(hidden)]
pub use core::future::Future;
#[doc(hidden)]
// NOTE(review): the `allow` presumably silences a false positive of the
// crate-wide `unreachable_pub` warning on this re-export -- confirm.
#[allow(unreachable_pub)] pub use crate::future::{from_generator, get_context, ResumeTy};
}
// `Stream` trait plus the generator-to-stream adapter and the `next` helper.
#[doc(hidden)]
pub mod stream {
#[doc(hidden)]
pub use futures_core::stream::Stream;
#[doc(hidden)]
pub use crate::stream::{from_generator, next};
}
// Generator-to-fallible-stream adapter.
#[doc(hidden)]
pub mod try_stream {
#[doc(hidden)]
pub use crate::try_stream::from_generator;
}
}
// Compile-time checks of the auto traits (`Send`, `Sync`, `Unpin`) of the
// crate-internal adapter types. Nothing here runs at test time; a violated
// assertion is a compile error.
//
// The wildcard-import lint is allowed for the `use crate::*;` below.
#[allow(clippy::wildcard_imports)]
#[cfg(test)]
mod tests {
use core::marker::PhantomPinned;
use static_assertions::{
assert_impl_all as assert_impl, assert_not_impl_all as assert_not_impl,
};
use crate::*;
// `GenFuture<G>` takes `Send`, `Sync` and `Unpin` from the wrapped `G`:
// `()` has all three, `*const ()` is neither `Send` nor `Sync`, and
// `PhantomPinned` is not `Unpin`.
assert_impl!(future::GenFuture<()>: Send);
assert_not_impl!(future::GenFuture<*const ()>: Send);
assert_impl!(future::GenFuture<()>: Sync);
assert_not_impl!(future::GenFuture<*const ()>: Sync);
assert_impl!(future::GenFuture<()>: Unpin);
assert_not_impl!(future::GenFuture<PhantomPinned>: Unpin);
// Same expectations for `GenStream<G>`.
assert_impl!(stream::GenStream<()>: Send);
assert_not_impl!(stream::GenStream<*const ()>: Send);
assert_impl!(stream::GenStream<()>: Sync);
assert_not_impl!(stream::GenStream<*const ()>: Sync);
assert_impl!(stream::GenStream<()>: Unpin);
assert_not_impl!(stream::GenStream<PhantomPinned>: Unpin);
// `Next<'_, S>` takes `Send`/`Sync` from `S`, but is `Unpin` even when `S` is
// not (it only holds a `&mut S`).
assert_impl!(stream::Next<'_, ()>: Send);
assert_not_impl!(stream::Next<'_, *const ()>: Send);
assert_impl!(stream::Next<'_, ()>: Sync);
assert_not_impl!(stream::Next<'_, *const ()>: Sync);
assert_impl!(stream::Next<'_, PhantomPinned>: Unpin);
// Same expectations as `GenFuture<G>` for `GenTryStream<G>`.
assert_impl!(try_stream::GenTryStream<()>: Send);
assert_not_impl!(try_stream::GenTryStream<*const ()>: Send);
assert_impl!(try_stream::GenTryStream<()>: Sync);
assert_not_impl!(try_stream::GenTryStream<*const ()>: Sync);
assert_impl!(try_stream::GenTryStream<()>: Unpin);
assert_not_impl!(try_stream::GenTryStream<PhantomPinned>: Unpin);
}