fauxgen/
stream.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::Stream;
5
6use crate::{generator, AsyncGenerator, GeneratorState};
7
8used_in_docs!(generator);
9
/// Wrapper around an async generator that implements [`Stream`].
///
/// The generators created by the [`generator`] macro implement [`Stream`] by
/// default. However, other implementations of [`AsyncGenerator`] will need this
/// wrapper type in order to be used as a stream.
///
/// The [`Stream`] implementation is available for generators that take `()`
/// as their resume argument and return `()`; every yielded value becomes a
/// stream item and the stream ends once the generator completes.
#[must_use = "streams do nothing unless polled"]
pub struct GeneratorStream<G>(G);

impl<G> GeneratorStream<G> {
    /// Create a stream from an existing generator.
    pub fn new(gen: G) -> Self {
        Self(gen)
    }

    /// Convert this stream back into the generator.
    pub fn into_inner(self) -> G {
        self.0
    }
}
26
27impl<G> Stream for GeneratorStream<G>
28where
29    G: AsyncGenerator<(), Return = ()>,
30{
31    type Item = G::Yield;
32
33    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34        let gen = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
35        gen.poll_resume(cx, Some(())).map(|state| match state {
36            GeneratorState::Yielded(value) => Some(value),
37            GeneratorState::Complete(()) => None,
38        })
39    }
40}
41
/// Wrapper around a generator that yields values and returns a result.
///
/// Often when working with streams you end up with a stream of results where
/// you want to abort things after the first error. By using a generator with
/// a result return type you can use `?` within the generator itself and still
/// get a stream which is usable.
///
/// This wrapper type wraps a generator that yields a series of values and
/// returns a result. All the yielded values become `Ok(v)` values in the stream
/// and returning an error emits a final `Err(e)` value before the stream
/// completes.
///
/// # Examples
/// This stream will yield `Ok(44)`, `Ok(88)`, `Err("ran out of numbers")` and
/// then finish:
/// ```
/// use fauxgen::GeneratorTryStream;
///
/// #[fauxgen::generator(yield = i32)]
/// fn my_stream() -> Result<(), &'static str> {
///     r#yield!(44);
///     r#yield!(88);
///     Err("ran out of numbers")
/// }
///
/// let stream = GeneratorTryStream::new(my_stream());
/// ```
#[must_use = "streams do nothing unless polled"]
pub struct GeneratorTryStream<G> {
    /// The wrapped generator.
    gen: G,
    /// Set once the generator has completed, so that the stream keeps
    /// returning `None` instead of resuming a finished generator.
    done: bool,
}

impl<G> GeneratorTryStream<G> {
    /// Create a stream from an existing generator.
    pub fn new(gen: G) -> Self {
        Self { gen, done: false }
    }

    /// Convert this stream back into the generator.
    pub fn into_inner(self) -> G {
        self.gen
    }
}
85
86impl<G, E> Stream for GeneratorTryStream<G>
87where
88    G: AsyncGenerator<(), Return = Result<(), E>>,
89{
90    type Item = Result<G::Yield, E>;
91
92    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93        // SAFETY: This is just pin projection so it is safe.
94        let (gen, done) = unsafe {
95            let this = self.get_unchecked_mut();
96            (Pin::new_unchecked(&mut this.gen), &mut this.done)
97        };
98
99        if *done {
100            return Poll::Ready(None);
101        }
102
103        match gen.poll_resume(cx, Some(())) {
104            Poll::Pending => Poll::Pending,
105            Poll::Ready(GeneratorState::Yielded(item)) => Poll::Ready(Some(Ok(item))),
106            Poll::Ready(GeneratorState::Complete(result)) => {
107                *done = true;
108
109                match result {
110                    Ok(()) => Poll::Ready(None),
111                    Err(e) => Poll::Ready(Some(Err(e))),
112                }
113            }
114        }
115    }
116}