1#![allow(clippy::tabs_in_doc_comments)]
2#![cfg_attr(not(test), no_std)]
3
4extern crate alloc;
5extern crate core;
6
7use alloc::sync::{Arc, Weak};
8use core::{
9 cell::Cell,
10 future::Future,
11 pin::Pin,
12 sync::atomic::{AtomicBool, Ordering},
13 task::{Context, Poll}
14};
15
16use futures_core::stream::{FusedStream, Stream};
17
18#[cfg(test)]
19mod tests;
20mod r#try;
21
/// Single-slot hand-off buffer shared between a stream and its yielder.
///
/// The yielder writes an item into `cell` and the stream takes it back out after polling its
/// generator. `entered` is raised only while the stream is polling the generator, which lets the
/// yielder detect being called from anywhere else.
pub(crate) struct SharedStore<T> {
    // `true` only while the owning stream is inside a generator poll.
    entered: AtomicBool,
    // The item waiting to be picked up by the stream, if any.
    cell: Cell<Option<T>>
}

impl<T> Default for SharedStore<T> {
    /// Creates an empty store that is not marked as entered.
    #[inline]
    fn default() -> Self {
        Self {
            entered: AtomicBool::new(false),
            cell: Cell::new(None)
        }
    }
}

impl<T> SharedStore<T> {
    /// Returns `true` while an item is sitting in the slot, without moving it out.
    #[inline(always)]
    pub fn has_value(&self) -> bool {
        // SAFETY: `Cell::as_ptr` points at the cell's own `Option<T>`, which is always
        // initialized, and the shared reference created here does not outlive the `is_some`
        // call. This assumes nothing else writes the cell while the reference is alive; see the
        // note on the `Sync` impl below.
        unsafe { &*self.cell.as_ptr() }.is_some()
    }
}

// SAFETY: the only non-`Sync` part of the store is the `Cell`. Reaching it from another thread
// can move a `T` across threads, so sharing is only allowed when `T: Send`.
// NOTE(review): the cell itself is not synchronized. This impl relies on the rest of the crate
// never touching one store from two threads at the same time; the `entered` flag is a
// best-effort misuse check, not a lock. Confirm this holds for every user of the store.
unsafe impl<T: Send> Sync for SharedStore<T> {}
45
46pub struct Yielder<T> {
47 pub(crate) store: Weak<SharedStore<T>>
48}
49
50impl<T> Yielder<T> {
51 #[inline]
52 pub fn y(&self, value: T) -> YieldFut<T> {
53 #[cold]
54 fn invalid_usage() -> ! {
55 panic!("attempted to use async-stream-lite yielder outside of stream context or across threads")
56 }
57
58 let Some(store) = self.store.upgrade() else {
59 invalid_usage();
60 };
61 if !store.entered.load(Ordering::Relaxed) {
62 invalid_usage();
63 }
64
65 store.cell.replace(Some(value));
66
67 YieldFut { store }
68 }
69
70 #[inline(always)]
71 #[deprecated = "use yielder.y() instead"]
72 pub fn r#yield(&self, value: T) -> YieldFut<T> {
73 self.y(value)
74 }
75}
76
77#[must_use = "stream will not yield this item unless the future returned by yield is awaited"]
81pub struct YieldFut<T> {
82 store: Arc<SharedStore<T>>
83}
84
85impl<T> Unpin for YieldFut<T> {}
86
87impl<T> Future for YieldFut<T> {
88 type Output = ();
89
90 #[inline(always)]
91 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
92 if !self.store.has_value() {
93 return Poll::Ready(());
94 }
95
96 Poll::Pending
97 }
98}
99
100struct Enter<'s, T> {
101 store: &'s SharedStore<T>
102}
103
104#[inline]
105fn enter<T>(store: &SharedStore<T>) -> Enter<'_, T> {
106 store.entered.store(true, Ordering::Relaxed);
107 Enter { store }
108}
109
110impl<T> Drop for Enter<'_, T> {
111 #[inline]
112 fn drop(&mut self) {
113 self.store.entered.store(false, Ordering::Relaxed);
114 }
115}
116
117pin_project_lite::pin_project! {
118 pub struct AsyncStream<T, U> {
122 store: Arc<SharedStore<T>>,
123 done: bool,
124 #[pin]
125 generator: U
126 }
127}
128
129impl<T, U> FusedStream for AsyncStream<T, U>
130where
131 U: Future<Output = ()>
132{
133 #[inline(always)]
134 fn is_terminated(&self) -> bool {
135 self.done
136 }
137}
138
139impl<T, U> Stream for AsyncStream<T, U>
140where
141 U: Future<Output = ()>
142{
143 type Item = T;
144
145 #[inline]
146 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147 let me = self.project();
148 if *me.done {
149 return Poll::Ready(None);
150 }
151
152 let res = {
153 let _enter = enter(&me.store);
154 me.generator.poll(cx)
155 };
156
157 *me.done = res.is_ready();
158
159 if let Some(value) = me.store.cell.take() {
160 return Poll::Ready(Some(value));
161 }
162
163 if *me.done { Poll::Ready(None) } else { Poll::Pending }
164 }
165
166 #[inline]
167 fn size_hint(&self) -> (usize, Option<usize>) {
168 if self.done { (0, Some(0)) } else { (0, None) }
169 }
170}
171
172#[inline]
281pub fn async_stream<T, F, U>(generator: F) -> AsyncStream<T, U>
282where
283 F: FnOnce(Yielder<T>) -> U,
284 U: Future<Output = ()>
285{
286 let store = Arc::new(SharedStore::default());
287 let generator = generator(Yielder { store: Arc::downgrade(&store) });
288 AsyncStream { store, done: false, generator }
289}
290
291pub use self::r#try::{TryAsyncStream, try_async_stream};