Skip to main content

async_stream_lite/
lib.rs

1#![allow(clippy::tabs_in_doc_comments)]
2#![cfg_attr(not(test), no_std)]
3
4extern crate alloc;
5extern crate core;
6
7use alloc::sync::{Arc, Weak};
8use core::{
9	cell::Cell,
10	future::Future,
11	pin::Pin,
12	sync::atomic::{AtomicBool, Ordering},
13	task::{Context, Poll}
14};
15
16use futures_core::stream::{FusedStream, Stream};
17
18#[cfg(test)]
19mod tests;
20mod r#try;
21
/// Single-slot hand-off cell shared between a stream and the yielder handed to its generator.
///
/// The yielder writes an item into `cell`; the stream takes it back out after polling the generator.
pub(crate) struct SharedStore<T> {
	/// `true` while the owning stream is polling its generator (set by `enter`, cleared when the guard drops).
	entered: AtomicBool,
	/// The item that has been yielded but not yet taken by the stream, if any.
	cell: Cell<Option<T>>
}

impl<T> Default for SharedStore<T> {
	/// Creates an empty store that is not marked as entered.
	#[inline]
	fn default() -> Self {
		Self {
			entered: AtomicBool::new(false),
			cell: Cell::new(None)
		}
	}
}

impl<T> SharedStore<T> {
	/// Returns `true` if an item is currently stored and has not been taken yet.
	///
	/// The item itself is left in place.
	#[inline(always)]
	pub fn has_value(&self) -> bool {
		// SAFETY: `Cell::as_ptr` always yields a valid, aligned pointer to the cell's contents. The shared
		// reference created here does not escape this expression; soundness relies on nothing writing to
		// the cell concurrently while it is being inspected.
		unsafe { &*self.cell.as_ptr() }.is_some()
	}
}

// SAFETY: items are moved in and out of the store through shared references, so sharing the store between
// threads can move a `T` from one thread to another; that is only acceptable when `T: Send`.
// NOTE(review): `Cell` performs no synchronization of its own, so this impl additionally relies on `cell`
// never being accessed from two threads at once - confirm the `entered` check in `Yielder::y` guarantees that.
unsafe impl<T: Send> Sync for SharedStore<T> {}
45
46pub struct Yielder<T> {
47	pub(crate) store: Weak<SharedStore<T>>
48}
49
50impl<T> Yielder<T> {
51	#[inline]
52	pub fn y(&self, value: T) -> YieldFut<T> {
53		#[cold]
54		fn invalid_usage() -> ! {
55			panic!("attempted to use async-stream-lite yielder outside of stream context or across threads")
56		}
57
58		let Some(store) = self.store.upgrade() else {
59			invalid_usage();
60		};
61		if !store.entered.load(Ordering::Relaxed) {
62			invalid_usage();
63		}
64
65		store.cell.replace(Some(value));
66
67		YieldFut { store }
68	}
69
70	#[inline(always)]
71	#[deprecated = "use yielder.y() instead"]
72	pub fn r#yield(&self, value: T) -> YieldFut<T> {
73		self.y(value)
74	}
75}
76
77/// Future returned by an [`AsyncStream`]'s yield function.
78///
79/// This future must be `.await`ed inside the generator in order for the item to be yielded by the stream.
80#[must_use = "stream will not yield this item unless the future returned by yield is awaited"]
81pub struct YieldFut<T> {
82	store: Arc<SharedStore<T>>
83}
84
85impl<T> Unpin for YieldFut<T> {}
86
87impl<T> Future for YieldFut<T> {
88	type Output = ();
89
90	#[inline(always)]
91	fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
92		if !self.store.has_value() {
93			return Poll::Ready(());
94		}
95
96		Poll::Pending
97	}
98}
99
100struct Enter<'s, T> {
101	store: &'s SharedStore<T>
102}
103
104#[inline]
105fn enter<T>(store: &SharedStore<T>) -> Enter<'_, T> {
106	store.entered.store(true, Ordering::Relaxed);
107	Enter { store }
108}
109
110impl<T> Drop for Enter<'_, T> {
111	#[inline]
112	fn drop(&mut self) {
113		self.store.entered.store(false, Ordering::Relaxed);
114	}
115}
116
117pin_project_lite::pin_project! {
118	/// A [`Stream`] created from an asynchronous generator-like function.
119	///
120	/// To create an [`AsyncStream`], use the [`async_stream`] function.
121	pub struct AsyncStream<T, U> {
122		store: Arc<SharedStore<T>>,
123		done: bool,
124		#[pin]
125		generator: U
126	}
127}
128
129impl<T, U> FusedStream for AsyncStream<T, U>
130where
131	U: Future<Output = ()>
132{
133	#[inline(always)]
134	fn is_terminated(&self) -> bool {
135		self.done
136	}
137}
138
139impl<T, U> Stream for AsyncStream<T, U>
140where
141	U: Future<Output = ()>
142{
143	type Item = T;
144
145	#[inline]
146	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147		let me = self.project();
148		if *me.done {
149			return Poll::Ready(None);
150		}
151
152		let res = {
153			let _enter = enter(&me.store);
154			me.generator.poll(cx)
155		};
156
157		*me.done = res.is_ready();
158
159		if let Some(value) = me.store.cell.take() {
160			return Poll::Ready(Some(value));
161		}
162
163		if *me.done { Poll::Ready(None) } else { Poll::Pending }
164	}
165
166	#[inline]
167	fn size_hint(&self) -> (usize, Option<usize>) {
168		if self.done { (0, Some(0)) } else { (0, None) }
169	}
170}
171
172/// Create an asynchronous [`Stream`] from an asynchronous generator function.
173///
174/// The provided function will be given a [`Yielder`], which, when called, causes the stream to yield an item:
175/// ```
176/// use async_stream_lite::async_stream;
177/// use futures::{pin_mut, stream::StreamExt};
178///
179/// #[tokio::main]
180/// async fn main() {
181/// 	let stream = async_stream(|yielder| async move {
182/// 		for i in 0..3 {
183/// 			yielder.y(i).await;
184/// 		}
185/// 	});
186/// 	pin_mut!(stream);
187/// 	while let Some(value) = stream.next().await {
188/// 		println!("{value}");
189/// 	}
190/// }
191/// ```
192///
193/// Streams may be returned by using `impl Stream<Item = T>`:
194/// ```
195/// use async_stream_lite::async_stream;
196/// use futures::{
197/// 	pin_mut,
198/// 	stream::{Stream, StreamExt}
199/// };
200///
201/// fn zero_to_three() -> impl Stream<Item = u32> {
202/// 	async_stream(|yielder| async move {
203/// 		for i in 0..3 {
204/// 			yielder.y(i).await;
205/// 		}
206/// 	})
207/// }
208///
209/// #[tokio::main]
210/// async fn main() {
211/// 	let stream = zero_to_three();
212/// 	pin_mut!(stream);
213/// 	while let Some(value) = stream.next().await {
214/// 		println!("{value}");
215/// 	}
216/// }
217/// ```
218///
219/// or with [`futures::stream::BoxStream`]:
220/// ```
221/// use async_stream_lite::async_stream;
222/// use futures::{
223/// 	pin_mut,
224/// 	stream::{BoxStream, StreamExt}
225/// };
226///
227/// fn zero_to_three() -> BoxStream<'static, u32> {
228/// 	Box::pin(async_stream(|yielder| async move {
229/// 		for i in 0..3 {
230/// 			yielder.y(i).await;
231/// 		}
232/// 	}))
233/// }
234///
235/// #[tokio::main]
236/// async fn main() {
237/// 	let mut stream = zero_to_three();
238/// 	while let Some(value) = stream.next().await {
239/// 		println!("{value}");
240/// 	}
241/// }
242/// ```
243///
244/// Streams may also be implemented in terms of other streams:
245/// ```
246/// use async_stream_lite::async_stream;
247/// use futures::{
248/// 	pin_mut,
249/// 	stream::{Stream, StreamExt}
250/// };
251///
252/// fn zero_to_three() -> impl Stream<Item = u32> {
253/// 	async_stream(|yielder| async move {
254/// 		for i in 0..3 {
255/// 			yielder.y(i).await;
256/// 		}
257/// 	})
258/// }
259///
260/// fn double<S: Stream<Item = u32>>(input: S) -> impl Stream<Item = u32> {
261/// 	async_stream(|yielder| async move {
262/// 		pin_mut!(input);
263/// 		while let Some(value) = input.next().await {
264/// 			yielder.y(value * 2).await;
265/// 		}
266/// 	})
267/// }
268///
269/// #[tokio::main]
270/// async fn main() {
271/// 	let stream = double(zero_to_three());
272/// 	pin_mut!(stream);
273/// 	while let Some(value) = stream.next().await {
274/// 		println!("{value}");
275/// 	}
276/// }
277/// ```
278///
279/// See also [`try_async_stream`], a variant of [`async_stream`] which supports try notation (`?`).
280#[inline]
281pub fn async_stream<T, F, U>(generator: F) -> AsyncStream<T, U>
282where
283	F: FnOnce(Yielder<T>) -> U,
284	U: Future<Output = ()>
285{
286	let store = Arc::new(SharedStore::default());
287	let generator = generator(Yielder { store: Arc::downgrade(&store) });
288	AsyncStream { store, done: false, generator }
289}
290
291pub use self::r#try::{TryAsyncStream, try_async_stream};