1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use crate::{generator, AsyncGenerator, GeneratorState};
used_in_docs!(generator);
/// Wrapper around an async generator that implements [`Stream`].
///
/// The generators created by the [`generator`] macro implement [`Stream`] by
/// default. However, other implementations of [`AsyncGenerator`] will need this
/// wrapper type in order to be used as a stream.
pub struct GeneratorStream<G>(G);
impl<G> GeneratorStream<G> {
pub fn new(gen: G) -> Self {
Self(gen)
}
pub fn into_inner(self) -> G {
self.0
}
}
impl<G> Stream for GeneratorStream<G>
where
G: AsyncGenerator<(), Return = ()>,
{
type Item = G::Yield;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let gen = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
gen.poll_resume(cx, Some(())).map(|state| match state {
GeneratorState::Yielded(value) => Some(value),
GeneratorState::Complete(()) => None,
})
}
}
/// Wrapper around a generator that yields values and returns a result.
///
/// Often when working with streams you end up with a stream of results where
/// you want to abort things after the first error. By using a generator with
/// a result return type you can use `?` within the generator itself and still
/// get a stream which is usable.
///
/// This wrapper type wraps a generator that yields a series of values and
/// returns a result. All the yielded values become `Ok(v)` values in the stream
/// and returning an error emits a final `Err(e)` value before the stream
/// completes.
///
/// # Example
/// This stream will yield `Ok(44)`, `Ok(88)`, `Err("ran out of numbers")` and
/// then finish:
/// ```
/// use fauxgen::GeneratorTryStream;
///
/// #[fauxgen::generator(yield = i32)]
/// fn my_stream() -> Result<(), &'static str> {
/// r#yield!(44);
/// r#yield!(88);
/// Err("ran out of numbers")
/// }
///
/// let stream = GeneratorTryStream::new(my_stream());
/// ```
pub struct GeneratorTryStream<G> {
gen: G,
done: bool,
}
impl<G> GeneratorTryStream<G> {
/// Create a stream from an existing generator.
pub fn new(gen: G) -> Self {
Self { gen, done: false }
}
/// Convert this stream back into the generator.
pub fn into_inner(self) -> G {
self.gen
}
}
impl<G, E> Stream for GeneratorTryStream<G>
where
G: AsyncGenerator<(), Return = Result<(), E>>,
{
type Item = Result<G::Yield, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// SAFETY: This is just pin projection so it is safe.
let (gen, done) = unsafe {
let this = self.get_unchecked_mut();
(Pin::new_unchecked(&mut this.gen), &mut this.done)
};
if *done {
return Poll::Ready(None);
}
match gen.poll_resume(cx, Some(())) {
Poll::Pending => Poll::Pending,
Poll::Ready(GeneratorState::Yielded(item)) => Poll::Ready(Some(Ok(item))),
Poll::Ready(GeneratorState::Complete(result)) => {
*done = true;
match result {
Ok(()) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(e))),
}
}
}
}
}