async_gen/
async_coroutine.rs1use std::{
2 cell::UnsafeCell,
3 future::{poll_fn, Future},
4 pin::Pin,
5 ptr::NonNull,
6 task::{Context, Poll},
7};
8
9use crate::{AsyncGenerator, AsyncIter, GeneratorState};
10
11pub fn gen<Fut, Y, R>(fut: impl FnOnce(Yielder<Y>) -> Fut) -> AsyncGen<Fut, Y>
34where
35 Fut: Future<Output = Return<R>>,
36{
37 let cell = Box::new(UnsafeCell::new(None));
38 let data = Box::leak(cell).into();
39 let fut = fut(Yielder {
40 cell: Cell { data },
41 });
42 AsyncGen {
43 cell: Cell { data },
44 fut,
45 }
46}
47
48struct Cell<Y> {
49 data: NonNull<UnsafeCell<Option<Y>>>,
50}
51
52impl<Y> Cell<Y> {
53 #[inline]
54 fn data(&self) -> &UnsafeCell<Option<Y>> {
55 unsafe { self.data.as_ref() }
56 }
57}
58
59unsafe impl<Y: Send> Send for Cell<Y> {}
60unsafe impl<Y: Send + Sync> Sync for Cell<Y> {}
61
62#[repr(transparent)]
67pub struct Return<T = ()>(T);
68
69#[doc(hidden)]
70pub struct Yielder<Y = ()> {
71 cell: Cell<Y>,
72}
73
74impl<Y> Yielder<Y> {
75 pub fn yield_(&mut self, val: Y) -> impl Future<Output = ()> + use<'_, Y> {
79 unsafe {
88 *self.cell.data().get() = Some(val);
89 }
90
91 poll_fn(|_| {
92 if unsafe { (*self.cell.data().get()).is_some() } {
93 return Poll::Pending;
94 }
95 Poll::Ready(())
96 })
97 }
98
99 #[inline]
100 pub fn return_<R>(self, v: R) -> Return<R> {
101 Return(v)
102 }
103}
104
105pub struct AsyncGen<Fut, Y> {
109 cell: Cell<Y>,
110 fut: Fut,
111}
112
113impl<Fut, Y> Drop for AsyncGen<Fut, Y> {
114 fn drop(&mut self) {
115 unsafe {
116 drop(Box::from_raw(self.cell.data.as_ptr()));
117 };
118 }
119}
120
121impl<Fut, Y, R> AsyncGen<Fut, Y>
122where
123 Fut: Future<Output = Return<R>>,
124{
125 pub fn poll_resume(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<GeneratorState<Y, R>> {
127 let me = unsafe { self.get_unchecked_mut() };
128 match unsafe { Pin::new_unchecked(&mut me.fut).poll(cx) } {
129 Poll::Ready(Return(val)) => Poll::Ready(GeneratorState::Complete(val)),
130 Poll::Pending => {
131 unsafe {
134 if let Some(val) = (*me.cell.data().get()).take() {
135 return Poll::Ready(GeneratorState::Yielded(val));
136 }
137 }
138 Poll::Pending
139 }
140 }
141 }
142
143 pub async fn resume(self: &mut Pin<&mut Self>) -> GeneratorState<Y, R> {
145 poll_fn(|cx| self.as_mut().poll_resume(cx)).await
146 }
147}
148
149impl<Fut, Y> AsyncGen<Fut, Y>
150where
151 Fut: Future<Output = Return<()>>,
152{
153 pub fn into_async_iter(self) -> AsyncIter<Self> {
157 AsyncIter::from(self)
158 }
159
160 pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Y>> {
162 let me = unsafe { self.get_unchecked_mut() };
163 match unsafe { Pin::new_unchecked(&mut me.fut).poll(cx) } {
164 Poll::Ready(Return(())) => Poll::Ready(None),
165 Poll::Pending => {
166 unsafe {
169 if let Some(val) = (*me.cell.data().get()).take() {
170 return Poll::Ready(Some(val));
171 }
172 }
173 Poll::Pending
174 }
175 }
176 }
177}
178
179impl<Fut, Y, R> AsyncGenerator for AsyncGen<Fut, Y>
180where
181 Fut: Future<Output = Return<R>>,
182{
183 type Yield = Y;
184 type Return = R;
185
186 fn poll_resume(
187 self: Pin<&mut Self>,
188 cx: &mut Context<'_>,
189 ) -> Poll<GeneratorState<Self::Yield, Self::Return>> {
190 AsyncGen::poll_resume(self, cx)
191 }
192}