async_stream_lite/
lib.rs

1#![allow(clippy::tabs_in_doc_comments)]
2#![cfg_attr(not(test), no_std)]
3
4extern crate alloc;
5extern crate core;
6
7use alloc::sync::{Arc, Weak};
8use core::{
9	cell::Cell,
10	future::Future,
11	pin::Pin,
12	sync::atomic::{AtomicBool, Ordering},
13	task::{Context, Poll}
14};
15
16use futures_core::stream::{FusedStream, Stream};
17
18#[cfg(test)]
19mod tests;
20mod r#try;
21
22pub(crate) struct SharedStore<T> {
23	entered: AtomicBool,
24	cell: Cell<Option<T>>
25}
26
27impl<T> Default for SharedStore<T> {
28	fn default() -> Self {
29		Self {
30			entered: AtomicBool::new(false),
31			cell: Cell::new(None)
32		}
33	}
34}
35
36impl<T> SharedStore<T> {
37	pub fn has_value(&self) -> bool {
38		unsafe { &*self.cell.as_ptr() }.is_some()
39	}
40}
41
42unsafe impl<T> Sync for SharedStore<T> {}
43
44pub struct Yielder<T> {
45	pub(crate) store: Weak<SharedStore<T>>
46}
47
48impl<T> Yielder<T> {
49	pub fn r#yield(&self, value: T) -> YieldFut<T> {
50		#[cold]
51		fn invalid_usage() -> ! {
52			panic!("attempted to use async_stream_lite yielder outside of stream context or across threads")
53		}
54
55		let Some(store) = self.store.upgrade() else {
56			invalid_usage();
57		};
58		if !store.entered.load(Ordering::Relaxed) {
59			invalid_usage();
60		}
61
62		store.cell.replace(Some(value));
63
64		YieldFut { store }
65	}
66}
67
68/// Future returned by an [`AsyncStream`]'s yield function.
69///
70/// This future must be `.await`ed inside the generator in order for the item to be yielded by the stream.
71#[must_use = "stream will not yield this item unless the future returned by yield is awaited"]
72pub struct YieldFut<T> {
73	store: Arc<SharedStore<T>>
74}
75
76impl<T> Unpin for YieldFut<T> {}
77
78impl<T> Future for YieldFut<T> {
79	type Output = ();
80
81	fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
82		if !self.store.has_value() {
83			return Poll::Ready(());
84		}
85
86		Poll::Pending
87	}
88}
89
90struct Enter<'s, T> {
91	store: &'s SharedStore<T>
92}
93
94fn enter<T>(store: &SharedStore<T>) -> Enter<'_, T> {
95	store.entered.store(true, Ordering::Relaxed);
96	Enter { store }
97}
98
99impl<T> Drop for Enter<'_, T> {
100	fn drop(&mut self) {
101		self.store.entered.store(false, Ordering::Relaxed);
102	}
103}
104
105pin_project_lite::pin_project! {
106	/// A [`Stream`] created from an asynchronous generator-like function.
107	///
108	/// To create an [`AsyncStream`], use the [`async_stream`] function.
109	pub struct AsyncStream<T, U> {
110		store: Arc<SharedStore<T>>,
111		done: bool,
112		#[pin]
113		generator: U
114	}
115}
116
117impl<T, U> FusedStream for AsyncStream<T, U>
118where
119	U: Future<Output = ()>
120{
121	fn is_terminated(&self) -> bool {
122		self.done
123	}
124}
125
126impl<T, U> Stream for AsyncStream<T, U>
127where
128	U: Future<Output = ()>
129{
130	type Item = T;
131
132	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133		let me = self.project();
134		if *me.done {
135			return Poll::Ready(None);
136		}
137
138		let res = {
139			let _enter = enter(&me.store);
140			me.generator.poll(cx)
141		};
142
143		*me.done = res.is_ready();
144
145		if me.store.has_value() {
146			return Poll::Ready(me.store.cell.take());
147		}
148
149		if *me.done { Poll::Ready(None) } else { Poll::Pending }
150	}
151
152	fn size_hint(&self) -> (usize, Option<usize>) {
153		if self.done { (0, Some(0)) } else { (0, None) }
154	}
155}
156
157/// Create an asynchronous [`Stream`] from an asynchronous generator function.
158///
159/// The provided function will be given a [`Yielder`], which, when called, causes the stream to yield an item:
160/// ```
161/// use async_stream_lite::async_stream;
162/// use futures::{pin_mut, stream::StreamExt};
163///
164/// #[tokio::main]
165/// async fn main() {
166/// 	let stream = async_stream(|yielder| async move {
167/// 		for i in 0..3 {
168/// 			yielder.r#yield(i).await;
169/// 		}
170/// 	});
171/// 	pin_mut!(stream);
172/// 	while let Some(value) = stream.next().await {
173/// 		println!("{value}");
174/// 	}
175/// }
176/// ```
177///
178/// Streams may be returned by using `impl Stream<Item = T>`:
179/// ```
180/// use async_stream_lite::async_stream;
181/// use futures::{
182/// 	pin_mut,
183/// 	stream::{Stream, StreamExt}
184/// };
185///
186/// fn zero_to_three() -> impl Stream<Item = u32> {
187/// 	async_stream(|yielder| async move {
188/// 		for i in 0..3 {
189/// 			yielder.r#yield(i).await;
190/// 		}
191/// 	})
192/// }
193///
194/// #[tokio::main]
195/// async fn main() {
196/// 	let stream = zero_to_three();
197/// 	pin_mut!(stream);
198/// 	while let Some(value) = stream.next().await {
199/// 		println!("{value}");
200/// 	}
201/// }
202/// ```
203///
204/// or with [`futures::stream::BoxStream`]:
205/// ```
206/// use async_stream_lite::async_stream;
207/// use futures::{
208/// 	pin_mut,
209/// 	stream::{BoxStream, StreamExt}
210/// };
211///
212/// fn zero_to_three() -> BoxStream<'static, u32> {
213/// 	Box::pin(async_stream(|yielder| async move {
214/// 		for i in 0..3 {
215/// 			yielder.r#yield(i).await;
216/// 		}
217/// 	}))
218/// }
219///
220/// #[tokio::main]
221/// async fn main() {
222/// 	let mut stream = zero_to_three();
223/// 	while let Some(value) = stream.next().await {
224/// 		println!("{value}");
225/// 	}
226/// }
227/// ```
228///
229/// Streams may also be implemented in terms of other streams:
230/// ```
231/// use async_stream_lite::async_stream;
232/// use futures::{
233/// 	pin_mut,
234/// 	stream::{Stream, StreamExt}
235/// };
236///
237/// fn zero_to_three() -> impl Stream<Item = u32> {
238/// 	async_stream(|yielder| async move {
239/// 		for i in 0..3 {
240/// 			yielder.r#yield(i).await;
241/// 		}
242/// 	})
243/// }
244///
245/// fn double<S: Stream<Item = u32>>(input: S) -> impl Stream<Item = u32> {
246/// 	async_stream(|yielder| async move {
247/// 		pin_mut!(input);
248/// 		while let Some(value) = input.next().await {
249/// 			yielder.r#yield(value * 2).await;
250/// 		}
251/// 	})
252/// }
253///
254/// #[tokio::main]
255/// async fn main() {
256/// 	let stream = double(zero_to_three());
257/// 	pin_mut!(stream);
258/// 	while let Some(value) = stream.next().await {
259/// 		println!("{value}");
260/// 	}
261/// }
262/// ```
263///
264/// See also [`try_async_stream`], a variant of [`async_stream`] which supports try notation (`?`).
265pub fn async_stream<T, F, U>(generator: F) -> AsyncStream<T, U>
266where
267	F: FnOnce(Yielder<T>) -> U,
268	U: Future<Output = ()>
269{
270	let store = Arc::new(SharedStore::default());
271	let generator = generator(Yielder { store: Arc::downgrade(&store) });
272	AsyncStream { store, done: false, generator }
273}
274
275pub use self::r#try::{TryAsyncStream, try_async_stream};