fauxgen/stream.rs
1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::Stream;
5
6use crate::{generator, AsyncGenerator, GeneratorState};
7
8used_in_docs!(generator);
9
/// Adapter that exposes an async generator through the [`Stream`] trait.
///
/// Generators produced by the [`generator`] macro already implement
/// [`Stream`] on their own. Any other [`AsyncGenerator`] implementation has to
/// be wrapped in this type before it can be consumed as a stream.
pub struct GeneratorStream<G>(G);

impl<G> GeneratorStream<G> {
    /// Wrap an existing generator so it can be used as a stream.
    pub fn new(gen: G) -> Self {
        GeneratorStream(gen)
    }

    /// Consume the wrapper and hand back the generator it was built from.
    pub fn into_inner(self) -> G {
        let Self(inner) = self;
        inner
    }
}
26
27impl<G> Stream for GeneratorStream<G>
28where
29 G: AsyncGenerator<(), Return = ()>,
30{
31 type Item = G::Yield;
32
33 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34 let gen = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
35 gen.poll_resume(cx, Some(())).map(|state| match state {
36 GeneratorState::Yielded(value) => Some(value),
37 GeneratorState::Complete(()) => None,
38 })
39 }
40}
41
/// Stream adapter for a generator that yields values and returns a result.
///
/// Streams of results are common, and usually the consumer wants to stop at
/// the first error. Giving the generator a `Result` return type lets its body
/// use `?` directly while still producing a stream that is convenient to
/// consume.
///
/// Every value yielded by the wrapped generator is emitted as `Ok(v)`. If the
/// generator returns an error, a final `Err(e)` item is emitted and the stream
/// then completes.
///
/// # Example
/// The stream below produces `Ok(44)`, `Ok(88)`, `Err("ran out of numbers")`
/// and then finishes:
/// ```
/// use fauxgen::GeneratorTryStream;
///
/// #[fauxgen::generator(yield = i32)]
/// fn my_stream() -> Result<(), &'static str> {
///     r#yield!(44);
///     r#yield!(88);
///     Err("ran out of numbers")
/// }
///
/// let stream = GeneratorTryStream::new(my_stream());
/// ```
pub struct GeneratorTryStream<G> {
    gen: G,
    done: bool,
}

impl<G> GeneratorTryStream<G> {
    /// Build a stream around an existing generator.
    ///
    /// The stream starts out in the "not finished" state.
    pub fn new(gen: G) -> Self {
        GeneratorTryStream { done: false, gen }
    }

    /// Consume the stream and return the generator it wraps.
    pub fn into_inner(self) -> G {
        let Self { gen, .. } = self;
        gen
    }
}
85
86impl<G, E> Stream for GeneratorTryStream<G>
87where
88 G: AsyncGenerator<(), Return = Result<(), E>>,
89{
90 type Item = Result<G::Yield, E>;
91
92 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93 // SAFETY: This is just pin projection so it is safe.
94 let (gen, done) = unsafe {
95 let this = self.get_unchecked_mut();
96 (Pin::new_unchecked(&mut this.gen), &mut this.done)
97 };
98
99 if *done {
100 return Poll::Ready(None);
101 }
102
103 match gen.poll_resume(cx, Some(())) {
104 Poll::Pending => Poll::Pending,
105 Poll::Ready(GeneratorState::Yielded(item)) => Poll::Ready(Some(Ok(item))),
106 Poll::Ready(GeneratorState::Complete(result)) => {
107 *done = true;
108
109 match result {
110 Ok(()) => Poll::Ready(None),
111 Err(e) => Poll::Ready(Some(Err(e))),
112 }
113 }
114 }
115 }
116}