1#![no_std]
96#![warn(missing_docs)]
97#![feature(coroutine_trait)]
98#![feature(trait_alias)]
99#![feature(try_trait_v2, try_trait_v2_residual)]
100
101use core::{
102 future::Future,
103 ops::{ControlFlow, Coroutine, CoroutineState, FromResidual, Residual, Try},
104 pin::Pin,
105 ptr::NonNull,
106 task::{Context, Poll},
107};
108use pin_project::pin_project;
109
110#[doc(no_inline)]
111pub use futures_core::Stream;
112#[doc(no_inline)]
113pub use stream_future_impl::{stream, try_stream};
114
/// Lifetime-erased pointer to the task [`Context`] that is handed to a
/// coroutine as its resume argument.
///
/// The pointee's lifetime is erased to `'static` so the value can be used as a
/// coroutine resume type; the real lifetime is re-attached (unchecked) by
/// [`ResumeTy::get_context`].
#[doc(hidden)]
#[derive(Debug, Copy, Clone)]
pub struct ResumeTy(NonNull<Context<'static>>);

// NOTE(review): `Context` itself is neither `Send` nor `Sync`. These impls
// presumably exist so that a coroutine holding a `ResumeTy` across a yield
// point does not lose its own auto traits; they are only sound as long as the
// pointer is never dereferenced outside the `resume` call it was created for.
// Confirm against the code generated by the macros.
unsafe impl Send for ResumeTy {}
unsafe impl Sync for ResumeTy {}

impl ResumeTy {
    /// Re-creates the `&mut Context` this value was built from.
    ///
    /// Both returned lifetimes are chosen freely by the caller; nothing ties
    /// them to the original borrow.
    pub fn get_context<'a, 'b>(self) -> &'a mut Context<'b> {
        let raw: *mut Context<'b> = self.0.as_ptr().cast();
        // SAFETY: the pointer is non-null by construction (`NonNull`). Its
        // validity and uniqueness are NOT checked here: the caller must only
        // use the result while the `&mut Context` the pointer was derived
        // from is still live.
        // NOTE(review): this is a safe fn with an unchecked contract; it is
        // presumably only called from macro-generated code — confirm.
        unsafe { &mut *raw }
    }

    /// Polls `f` once using the context stored in this value and returns the
    /// resulting [`Poll`].
    pub fn poll_future<F: Future>(self, f: Pin<&mut F>) -> Poll<F::Output> {
        let cx = self.get_context();
        F::poll(f, cx)
    }
}
132
133#[doc(hidden)]
134#[pin_project]
135pub struct GenStreamFuture<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> {
136 #[pin]
137 gen: T,
138 ret: Option<T::Return>,
139}
140
141impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> GenStreamFuture<P, T> {
142 pub const fn new(gen: T) -> Self {
143 Self { gen, ret: None }
144 }
145}
146
147impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> Future for GenStreamFuture<P, T> {
148 type Output = T::Return;
149
150 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151 let cx = NonNull::from(cx);
152 let this = self.project();
153 if let Some(x) = this.ret.take() {
154 Poll::Ready(x)
155 } else {
156 let gen = this.gen;
157 match gen.resume(ResumeTy(cx.cast())) {
158 CoroutineState::Yielded(p) => match p {
159 Poll::Pending => Poll::Pending,
160 Poll::Ready(_) => {
161 unsafe { cx.as_ref() }.waker().wake_by_ref();
162 Poll::Pending
163 }
164 },
165 CoroutineState::Complete(x) => Poll::Ready(x),
166 }
167 }
168 }
169}
170
171impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>>> Stream for GenStreamFuture<P, T> {
172 type Item = P;
173
174 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
175 let this = self.project();
176 let gen = this.gen;
177 match gen.resume(ResumeTy(NonNull::from(cx).cast())) {
178 CoroutineState::Yielded(p) => match p {
179 Poll::Pending => Poll::Pending,
180 Poll::Ready(p) => Poll::Ready(Some(p)),
181 },
182 CoroutineState::Complete(x) => {
183 *this.ret = Some(x);
184 Poll::Ready(None)
185 }
186 }
187 }
188}
189
/// The `Try` type that shares `R`'s residual but has `P` as its output,
/// e.g. `Result<P, E>` when `R` is `Result<_, E>`.
#[doc(hidden)]
pub type TryStreamItemType<R, P> = <<R as Try>::Residual as Residual<P>>::TryType;
192
193#[doc(hidden)]
194#[pin_project]
195pub struct GenTryStreamFuture<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> {
196 #[pin]
197 gen: T,
198 ret: Option<<T::Return as Try>::Output>,
199}
200
201impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> GenTryStreamFuture<P, T> {
202 pub const fn new(gen: T) -> Self {
203 Self { gen, ret: None }
204 }
205}
206
207impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try>> Future for GenTryStreamFuture<P, T> {
208 type Output = T::Return;
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211 let cx = NonNull::from(cx);
212 let this = self.project();
213 if let Some(x) = this.ret.take() {
214 Poll::Ready(T::Return::from_output(x))
215 } else {
216 let gen = this.gen;
217 match gen.resume(ResumeTy(cx.cast())) {
218 CoroutineState::Yielded(p) => match p {
219 Poll::Pending => Poll::Pending,
220 Poll::Ready(_) => {
221 unsafe { cx.as_ref() }.waker().wake_by_ref();
222 Poll::Pending
223 }
224 },
225 CoroutineState::Complete(x) => Poll::Ready(x),
226 }
227 }
228 }
229}
230
231impl<P, T: Coroutine<ResumeTy, Yield = Poll<P>, Return: Try<Residual: Residual<P>>>> Stream
232 for GenTryStreamFuture<P, T>
233{
234 type Item = <<T::Return as Try>::Residual as Residual<P>>::TryType;
235
236 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
237 let this = self.project();
238 let gen = this.gen;
239 match gen.resume(ResumeTy(NonNull::from(cx).cast())) {
240 CoroutineState::Yielded(p) => match p {
241 Poll::Pending => Poll::Pending,
242 Poll::Ready(p) => Poll::Ready(Some(Self::Item::from_output(p))),
243 },
244 CoroutineState::Complete(x) => match x.branch() {
245 ControlFlow::Continue(x) => {
246 *this.ret = Some(x);
247 Poll::Ready(None)
248 }
249 ControlFlow::Break(e) => Poll::Ready(Some(Self::Item::from_residual(e))),
250 },
251 }
252 }
253}