1#![allow(clippy::tabs_in_doc_comments)]
2#![cfg_attr(not(test), no_std)]
3
4extern crate alloc;
5extern crate core;
6
7use alloc::sync::{Arc, Weak};
8use core::{
9 cell::Cell,
10 future::Future,
11 pin::Pin,
12 sync::atomic::{AtomicBool, Ordering},
13 task::{Context, Poll}
14};
15
16use futures_core::stream::{FusedStream, Stream};
17
18#[cfg(test)]
19mod tests;
20mod r#try;
21
/// Single-item hand-off slot shared between a stream and the yielder given to its generator.
///
/// The yielder writes an item into `cell`; the stream takes it back out after polling the
/// generator. `entered` is raised only while the stream is in the middle of such a poll.
pub(crate) struct SharedStore<T> {
	// `true` while an `Enter` guard for this store is alive.
	entered: AtomicBool,
	// The pending item, if any.
	cell: Cell<Option<T>>
}

impl<T> Default for SharedStore<T> {
	/// Creates an empty store that is not marked as entered.
	fn default() -> Self {
		Self {
			entered: AtomicBool::new(false),
			cell: Cell::new(None)
		}
	}
}

impl<T> SharedStore<T> {
	/// Returns `true` if an item is currently waiting in the slot.
	///
	/// The item is only inspected, never moved or cloned.
	pub fn has_value(&self) -> bool {
		// SAFETY: `Cell::as_ptr` yields a valid, aligned pointer to the slot. The shared
		// reference derived from it lives only for the `is_some` check below and nothing in
		// this method writes to the cell in the meantime.
		// NOTE(review): this additionally relies on no other thread writing to the cell at
		// the same moment; the cell itself performs no synchronisation.
		let slot: &Option<T> = unsafe { &*self.cell.as_ptr() };
		slot.is_some()
	}
}

// SAFETY: a `&SharedStore<T>` allows values of `T` to be moved into and out of the cell, so
// sharing the store between threads can transfer a `T` from one thread to another. That is
// only sound when `T: Send` (the same bound `Mutex<T>` uses for its `Sync` impl); without the
// bound, a store holding e.g. an `Rc` would be `Sync`.
// NOTE(review): the `Cell` provides no locking. This impl also relies on the code using the
// store never touching the cell from two threads at once; the `entered` flag is only a
// best-effort check for that.
unsafe impl<T: Send> Sync for SharedStore<T> {}
43
44pub struct Yielder<T> {
45 pub(crate) store: Weak<SharedStore<T>>
46}
47
48impl<T> Yielder<T> {
49 pub fn r#yield(&self, value: T) -> YieldFut<T> {
50 #[cold]
51 fn invalid_usage() -> ! {
52 panic!("attempted to use async_stream_lite yielder outside of stream context or across threads")
53 }
54
55 let Some(store) = self.store.upgrade() else {
56 invalid_usage();
57 };
58 if !store.entered.load(Ordering::Relaxed) {
59 invalid_usage();
60 }
61
62 store.cell.replace(Some(value));
63
64 YieldFut { store }
65 }
66}
67
68#[must_use = "stream will not yield this item unless the future returned by yield is awaited"]
72pub struct YieldFut<T> {
73 store: Arc<SharedStore<T>>
74}
75
76impl<T> Unpin for YieldFut<T> {}
77
78impl<T> Future for YieldFut<T> {
79 type Output = ();
80
81 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
82 if !self.store.has_value() {
83 return Poll::Ready(());
84 }
85
86 Poll::Pending
87 }
88}
89
90struct Enter<'s, T> {
91 store: &'s SharedStore<T>
92}
93
94fn enter<T>(store: &SharedStore<T>) -> Enter<'_, T> {
95 store.entered.store(true, Ordering::Relaxed);
96 Enter { store }
97}
98
99impl<T> Drop for Enter<'_, T> {
100 fn drop(&mut self) {
101 self.store.entered.store(false, Ordering::Relaxed);
102 }
103}
104
105pin_project_lite::pin_project! {
106 pub struct AsyncStream<T, U> {
110 store: Arc<SharedStore<T>>,
111 done: bool,
112 #[pin]
113 generator: U
114 }
115}
116
117impl<T, U> FusedStream for AsyncStream<T, U>
118where
119 U: Future<Output = ()>
120{
121 fn is_terminated(&self) -> bool {
122 self.done
123 }
124}
125
126impl<T, U> Stream for AsyncStream<T, U>
127where
128 U: Future<Output = ()>
129{
130 type Item = T;
131
132 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133 let me = self.project();
134 if *me.done {
135 return Poll::Ready(None);
136 }
137
138 let res = {
139 let _enter = enter(&me.store);
140 me.generator.poll(cx)
141 };
142
143 *me.done = res.is_ready();
144
145 if me.store.has_value() {
146 return Poll::Ready(me.store.cell.take());
147 }
148
149 if *me.done { Poll::Ready(None) } else { Poll::Pending }
150 }
151
152 fn size_hint(&self) -> (usize, Option<usize>) {
153 if self.done { (0, Some(0)) } else { (0, None) }
154 }
155}
156
157pub fn async_stream<T, F, U>(generator: F) -> AsyncStream<T, U>
266where
267 F: FnOnce(Yielder<T>) -> U,
268 U: Future<Output = ()>
269{
270 let store = Arc::new(SharedStore::default());
271 let generator = generator(Yielder { store: Arc::downgrade(&store) });
272 AsyncStream { store, done: false, generator }
273}
274
275pub use self::r#try::{TryAsyncStream, try_async_stream};