stream_future/
lib.rs

1//! A [`Future`] with item yielded.
2//!
3//! ```
4//! #![feature(coroutines)]
5//!
6//! # use anyhow::{Result, Ok};
7//! # use stream_future::stream;
8//! #[derive(Debug)]
9//! enum Prog {
10//!     Stage1,
11//!     Stage2,
12//! }
13//!
14//! #[stream(Prog)]
15//! async fn foo() -> Result<i32> {
16//!     yield Prog::Stage1;
17//!     // some works...
18//!     yield Prog::Stage2;
19//!     // some other works...
20//!     Ok(0)
21//! }
22//!
23//! # use tokio_stream::StreamExt;
24//! # #[tokio::main(flavor = "current_thread")]
25//! # async fn main() -> Result<()> {
26//! let bar = foo();
27//! tokio::pin!(bar);
28//! while let Some(prog) = bar.next().await {
29//!     println!("{:?}", prog);
30//! }
31//! let bar = bar.await?;
32//! assert_eq!(bar, 0);
33//! # Ok(())
34//! # }
35//! ```
36//!
37//! If a lifetime is needed, specify it in the attribute:
38//!
39//! ```
40//! #![feature(coroutines)]
41//!
42//! # use stream_future::stream;
43//! enum Prog {
44//!     Stage1,
45//!     Stage2,
46//! }
47//!
48//! #[stream(Prog, lifetime = 'a)]
49//! async fn foo<'a>(s: &'a str) {
50//!     yield Prog::Stage1;
51//!     println!("{}", s);
52//!     yield Prog::Stage2;
53//! }
54//!
55//! # #[tokio::main(flavor = "current_thread")]
56//! # async fn main() {
57//! foo("Hello world!").await;
58//! # }
59//! ```
60//!
61//! There's also a macro [`try_stream`] (usually used) to implement a stream iterates [`Result`].
62//!
63//! ```
64//! #![feature(coroutines)]
65//!
66//! # use stream_future::try_stream;
67//! # use anyhow::Result;
68//! #[derive(Debug)]
69//! enum Prog {
70//!     Stage1,
71//!     Stage2,
72//! }
73//!
74//! #[try_stream(Prog)]
75//! async fn foo() -> Result<()> {
76//!     yield Prog::Stage1;
77//!     // some works...
78//!     yield Prog::Stage2;
79//!     // some other works...
80//!     Ok(())
81//! }
82//!
83//! # use tokio_stream::StreamExt;
84//! # #[tokio::main(flavor = "current_thread")]
85//! # async fn main() -> Result<()> {
86//! let bar = foo();
87//! tokio::pin!(bar);
88//! while let Some(prog) = bar.try_next().await? {
89//!     println!("{:?}", prog);
90//! }
91//! # Ok(())
92//! # }
93//! ```
94
95#![no_std]
96#![warn(missing_docs)]
97#![feature(coroutine_trait)]
98#![feature(trait_alias)]
99#![feature(try_trait_v2, try_trait_v2_residual)]
100
101use core::{
102    future::Future,
103    ops::{ControlFlow, Coroutine, CoroutineState, FromResidual, Residual, Try},
104    pin::Pin,
105    ptr::NonNull,
106    task::{Context, Poll},
107};
108use pin_project::pin_project;
109
110#[doc(no_inline)]
111pub use futures_core::Stream;
112#[doc(no_inline)]
113pub use stream_future_impl::{stream, try_stream};
114
115/// See [`core::future::ResumeTy`].
116#[doc(hidden)]
117#[derive(Debug, Copy, Clone)]
118pub struct ResumeTy(NonNull<Context<'static>>);
119
120unsafe impl Send for ResumeTy {}
121unsafe impl Sync for ResumeTy {}
122
123impl ResumeTy {
124    pub fn get_context<'a, 'b>(self) -> &'a mut Context<'b> {
125        unsafe { &mut *self.0.as_ptr().cast() }
126    }
127
128    pub fn poll_future<F: Future>(self, f: Pin<&mut F>) -> Poll<F::Output> {
129        f.poll(self.get_context())
130    }
131}
132
133#[doc(hidden)]
134#[pin_project]
135pub struct GenStreamFuture<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> {
136    #[pin]
137    gen: T,
138    ret: Option<T::Return>,
139}
140
141impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> GenStreamFuture<P, T> {
142    pub const fn new(gen: T) -> Self {
143        Self { gen, ret: None }
144    }
145}
146
147impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> Future for GenStreamFuture<P, T> {
148    type Output = T::Return;
149
150    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151        let cx = NonNull::from(cx);
152        let this = self.project();
153        if let Some(x) = this.ret.take() {
154            Poll::Ready(x)
155        } else {
156            let gen = this.gen;
157            match gen.resume(ResumeTy(cx.cast())) {
158                CoroutineState::Yielded(p) => match p {
159                    Poll::Pending => Poll::Pending,
160                    Poll::Ready(_) => {
161                        unsafe { cx.as_ref() }.waker().wake_by_ref();
162                        Poll::Pending
163                    }
164                },
165                CoroutineState::Complete(x) => Poll::Ready(x),
166            }
167        }
168    }
169}
170
171impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> Stream for GenStreamFuture<P, T> {
172    type Item = P;
173
174    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
175        let this = self.project();
176        let gen = this.gen;
177        match gen.resume(ResumeTy(NonNull::from(cx).cast())) {
178            CoroutineState::Yielded(p) => match p {
179                Poll::Pending => Poll::Pending,
180                Poll::Ready(p) => Poll::Ready(Some(p)),
181            },
182            CoroutineState::Complete(x) => {
183                *this.ret = Some(x);
184                Poll::Ready(None)
185            }
186        }
187    }
188}
189
190#[doc(hidden)]
191pub type TryStreamItemType<R, P> = <<R as Try>::Residual as Residual<P>>::TryType;
192
193#[doc(hidden)]
194#[pin_project]
195pub struct GenTryStreamFuture<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> {
196    #[pin]
197    gen: T,
198    ret: Option<<T::Return as Try>::Output>,
199}
200
201impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> GenTryStreamFuture<P, T> {
202    pub const fn new(gen: T) -> Self {
203        Self { gen, ret: None }
204    }
205}
206
207impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> Future for GenTryStreamFuture<P, T> {
208    type Output = T::Return;
209
210    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211        let cx = NonNull::from(cx);
212        let this = self.project();
213        if let Some(x) = this.ret.take() {
214            Poll::Ready(T::Return::from_output(x))
215        } else {
216            let gen = this.gen;
217            match gen.resume(ResumeTy(cx.cast())) {
218                CoroutineState::Yielded(p) => match p {
219                    Poll::Pending => Poll::Pending,
220                    Poll::Ready(_) => {
221                        unsafe { cx.as_ref() }.waker().wake_by_ref();
222                        Poll::Pending
223                    }
224                },
225                CoroutineState::Complete(x) => Poll::Ready(x),
226            }
227        }
228    }
229}
230
231impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try<Residual: Residual<P>>>> Stream
232    for GenTryStreamFuture<P, T>
233{
234    type Item = <<T::Return as Try>::Residual as Residual<P>>::TryType;
235
236    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
237        let this = self.project();
238        let gen = this.gen;
239        match gen.resume(ResumeTy(NonNull::from(cx).cast())) {
240            CoroutineState::Yielded(p) => match p {
241                Poll::Pending => Poll::Pending,
242                Poll::Ready(p) => Poll::Ready(Some(Self::Item::from_output(p))),
243            },
244            CoroutineState::Complete(x) => match x.branch() {
245                ControlFlow::Continue(x) => {
246                    *this.ret = Some(x);
247                    Poll::Ready(None)
248                }
249                ControlFlow::Break(e) => Poll::Ready(Some(Self::Item::from_residual(e))),
250            },
251        }
252    }
253}