scoped_stream_sink/
scoped_stream.rs

1use alloc::boxed::Box;
2use core::cell::{Cell, UnsafeCell};
3use core::convert::Infallible;
4use core::future::Future;
5use core::marker::{PhantomData, PhantomPinned};
6use core::mem::transmute;
7use core::ops::DerefMut;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use futures_core::Stream;
12use futures_sink::Sink;
13use pin_project_lite::pin_project;
14
15#[cfg(feature = "std")]
16use crate::LocalThread;
17
18#[cfg(feature = "std")]
19type DynStreamFut<'scope> = Pin<Box<dyn Future<Output = ()> + Send + 'scope>>;
20#[cfg(feature = "std")]
21type DynTryStreamFut<'scope, E> = Pin<Box<dyn Future<Output = Result<(), E>> + Send + 'scope>>;
22
23#[cfg(feature = "std")]
24pin_project! {
25    /// Stream with a scoped future.
26    ///
27    /// It is useful to easily create [`Stream`] type, without
28    /// hassle of manually constructing one or using macros
29    /// (like [`async_stream`](https://docs.rs/async-stream/latest/async_stream/)).
30    /// Safety is guaranteed by carefully scoping [`StreamInner`],
31    /// similiar to [`scope`](std::thread::scope).
32    #[must_use = "Stream will not do anything if not used"]
33    pub struct ScopedStream<'env, T> {
34        fut: Option<DynStreamFut<'env>>,
35
36        data: Pin<Box<StreamInner<'env, 'env, T>>>,
37    }
38}
39
40#[cfg(feature = "std")]
41pin_project! {
42    /// Fallible stream with a scoped future.
43    ///
44    /// Similiar to [`ScopedStream`], but allows for an error type. Future inside may fail,
45    /// unlike [`ScopedStream`]. Also, the inner [`TryStreamInner`] allows for either sending
46    /// an item or [`Result`] type.
47    #[must_use = "Stream will not do anything if not used"]
48    pub struct ScopedTryStream<'env, T, E> {
49        fut: Option<DynTryStreamFut<'env, E>>,
50
51        data: Pin<Box<TryStreamInner<'env, 'env, T, E>>>,
52    }
53}
54
55struct StreamInnerData<T> {
56    data: UnsafeCell<Option<T>>,
57    closed: Cell<bool>,
58
59    // Borrow technique from Tokio to pass pesky Miri :table-flip:
60    // <https://github.com/rust-lang/rust/pull/82834>
61    _pinned: PhantomPinned,
62}
63
64// SAFETY: We don't ever use immutable borrow for any of the operations, so it's automatically Sync too.
65// Similar to unstable Exclusive struct.
66unsafe impl<T: Send> Send for StreamInnerData<T> {}
67unsafe impl<T: Send> Sync for StreamInnerData<T> {}
68
69impl<T> StreamInnerData<T> {
70    const fn new() -> Self {
71        Self {
72            data: UnsafeCell::new(None),
73            closed: Cell::new(false),
74            _pinned: PhantomPinned,
75        }
76    }
77}
78
79#[cfg(feature = "std")]
80pin_project! {
81    /// Inner type of [`ScopedStream`].
82    ///
83    /// Implements [`Sink`] to send data for the stream.
84    ///
85    /// # Note About Thread-safety
86    ///
87    /// Even though [`StreamInner`] is both [`Send`] and [`Sink`], it's reference
88    /// **should** not be sent across thread. This is currently impossible, due to
89    /// lack of async version of [`scope`](std::thread::scope).
90    /// To future-proof that possibility, any usage of it will panic if called from different
91    /// thread than the outer thread. It also may panics outer thread too.
92    ///
93    /// Also do note that some of the check depends on `debug_assertions` build config
94    /// (AKA only on debug builds).
95    #[must_use = "StreamInner will not do anything if not used"]
96    pub struct StreamInner<'scope, 'env: 'scope, T> {
97        #[pin]
98        inner: LocalThread<StreamInnerData<T>>,
99
100        phantom: PhantomData<&'scope mut &'env T>,
101    }
102}
103
104#[cfg(feature = "std")]
105pin_project! {
106    /// Inner type of [`ScopedTryStream`].
107    ///
108    /// Implements [`Sink`] for both item type and a [`Result`],
109    /// allowing to send error (if you so choose).
110    ///
111    /// # Note About Thread-safety
112    ///
113    /// Even though [`TryStreamInner`] is both [`Send`] and [`Sink`], it's reference
114    /// **should** not be sent across thread. This is currently impossible, due to
115    /// lack of async version of [`scope`](std::thread::scope).
116    /// To future-proof that possibility, any usage of it will panic if called from different
117    /// thread than the outer thread. It also may panics outer thread too.
118    ///
119    /// Also do note that some of the check depends on `debug_assertions` build config
120    /// (AKA only on debug builds).
121    #[must_use = "StreamInner will not do anything if not used"]
122    pub struct TryStreamInner<'scope, 'env: 'scope, T, E> {
123        #[pin]
124        inner: LocalThread<StreamInnerData<Result<T, E>>>,
125
126        phantom: PhantomData<&'scope mut &'env (T, E)>,
127    }
128}
129
130#[cfg(feature = "std")]
131impl<'env, T> ScopedStream<'env, T> {
132    /// Create new [`ScopedStream`].
133    ///
134    /// Future must return unit type. If you want fallible future, use [`ScopedTryStream`].
135    ///
136    /// # Examples
137    ///
138    /// ```
139    /// # use anyhow::Error;
140    /// # use futures_util::{SinkExt, StreamExt};
141    /// # use scoped_stream_sink::ScopedStream;
142    /// # fn main() -> Result<(), Error> {
143    /// # tokio::runtime::Builder::new_current_thread().enable_all().build()?.block_on(async {
144    /// let mut stream = <ScopedStream<usize>>::new(|mut sink| Box::pin(async move {
145    ///     // Send a value.
146    ///     // It is okay to unwrap() because it is infallible.
147    ///     sink.send(1).await.unwrap();
148    ///
149    ///     // (Optional) close the sink. NOTE: sink cannot be used afterwards.
150    ///     // sink.close().await.unwrap();
151    /// }));
152    ///
153    /// // Receive all values
154    /// while let Some(i) = stream.next().await {
155    ///     println!("{i}");
156    /// }
157    /// # Ok(()) })}
158    /// ```
159    pub fn new<F>(f: F) -> Self
160    where
161        for<'scope> F: FnOnce(
162            Pin<&'scope mut StreamInner<'scope, 'env, T>>,
163        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'scope>>,
164    {
165        let mut data = Box::pin(StreamInner {
166            inner: LocalThread::new(StreamInnerData::new()),
167
168            phantom: PhantomData,
169        });
170
171        let ptr = unsafe { transmute::<Pin<&mut StreamInner<T>>, _>(data.as_mut()) };
172        let fut = f(ptr);
173
174        Self {
175            fut: Some(fut),
176            data,
177        }
178    }
179}
180
181#[cfg(feature = "std")]
182impl<'env, T, E> ScopedTryStream<'env, T, E> {
183    /// Create new [`ScopedTryStream`].
184    ///
185    /// Future can fails, and it's sink can receive [`Result`] type too (see [`TryStreamInner`]).
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// # use anyhow::Error;
191    /// # use futures_util::{SinkExt, StreamExt};
192    /// # use scoped_stream_sink::ScopedTryStream;
193    /// # fn main() -> Result<(), Error> {
194    /// # tokio::runtime::Builder::new_current_thread().enable_all().build()?.block_on(async {
195    /// let mut stream = <ScopedTryStream<_, Error>>::new(|mut sink| Box::pin(async move {
196    ///     // Send a value.
197    ///     sink.send(1).await?;
198    ///
199    ///     // (Optional) close the sink. NOTE: sink cannot be used afterwards.
200    ///     // sink.close().await.unwrap();
201    ///
202    ///     Ok(())
203    /// }));
204    ///
205    /// // Receive all values
206    /// while let Some(i) = stream.next().await.transpose()? {
207    ///     println!("{i}");
208    /// }
209    ///
210    /// # Ok(()) })}
211    /// ```
212    pub fn new<F>(f: F) -> Self
213    where
214        for<'scope> F: FnOnce(
215            Pin<&'scope mut TryStreamInner<'scope, 'env, T, E>>,
216        )
217            -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + 'scope>>,
218    {
219        let mut data = Box::pin(TryStreamInner {
220            inner: LocalThread::new(StreamInnerData::new()),
221
222            phantom: PhantomData,
223        });
224
225        let ptr = unsafe { transmute::<Pin<&mut TryStreamInner<T, E>>, _>(data.as_mut()) };
226        let fut = f(ptr);
227
228        Self {
229            fut: Some(fut),
230            data,
231        }
232    }
233}
234
235impl<T, E> StreamInnerData<Result<T, E>> {
236    fn next_fallible<U>(
237        &self,
238        cx: &mut Context<'_>,
239        fut: &mut Option<Pin<U>>,
240    ) -> Poll<Option<Result<T, E>>>
241    where
242        U: DerefMut,
243        U::Target: Future<Output = Result<(), E>>,
244    {
245        let res = match fut {
246            Some(v) => v.as_mut().poll(cx),
247            None => return Poll::Ready(None),
248        };
249        if res.is_ready() {
250            *fut = None;
251
252            if let Poll::Ready(Err(e)) = res {
253                return Poll::Ready(Some(Err(e)));
254            }
255        }
256
257        let ret = unsafe { (*self.data.get()).take() };
258        if ret.is_none() && res.is_pending() {
259            Poll::Pending
260        } else {
261            Poll::Ready(ret)
262        }
263    }
264}
265
266impl<T> StreamInnerData<T> {
267    fn next<F>(&self, cx: &mut Context<'_>, fut: &mut Option<Pin<F>>) -> Poll<Option<T>>
268    where
269        F: DerefMut,
270        F::Target: Future<Output = ()>,
271    {
272        let res = match fut {
273            Some(v) => v.as_mut().poll(cx),
274            None => return Poll::Ready(None),
275        };
276        if res.is_ready() {
277            *fut = None;
278        }
279
280        let ret = unsafe { (*self.data.get()).take() };
281        if ret.is_none() && res.is_pending() {
282            Poll::Pending
283        } else {
284            Poll::Ready(ret)
285        }
286    }
287
288    fn flush<E>(&self) -> Poll<Result<(), E>> {
289        if self.closed.get() || unsafe { (*self.data.get()).is_some() } {
290            Poll::Pending
291        } else {
292            Poll::Ready(Ok(()))
293        }
294    }
295
296    fn send(&self, item: T) {
297        if self.closed.get() {
298            panic!("Stream is closed");
299        }
300        let data = unsafe { &mut *self.data.get() };
301        if data.is_some() {
302            panic!("poll_ready() is not called yet!");
303        }
304
305        *data = Some(item);
306    }
307
308    fn close<E>(&self) -> Poll<Result<(), E>> {
309        self.closed.set(true);
310
311        if unsafe { (*self.data.get()).is_some() } {
312            Poll::Pending
313        } else {
314            Poll::Ready(Ok(()))
315        }
316    }
317}
318
319#[cfg(feature = "std")]
320impl<'env, T> Stream for ScopedStream<'env, T> {
321    type Item = T;
322
323    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
324        let this = self.project();
325        this.data
326            .as_mut()
327            .project()
328            .inner
329            .set_inner_ctx()
330            .next(cx, this.fut)
331    }
332}
333
334#[cfg(feature = "std")]
335impl<'env, T, E> Stream for ScopedTryStream<'env, T, E> {
336    type Item = Result<T, E>;
337
338    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
339        let this = self.project();
340        this.data.inner.set_inner_ctx().next_fallible(cx, this.fut)
341    }
342}
343
344#[cfg(feature = "std")]
345impl<'scope, 'env, T> Sink<T> for StreamInner<'scope, 'env, T> {
346    type Error = Infallible;
347
348    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
349        self.poll_flush(cx)
350    }
351
352    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
353        self.inner.get_inner().flush()
354    }
355
356    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
357        self.inner.get_inner().send(item);
358        Ok(())
359    }
360
361    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
362        self.inner.get_inner().close()
363    }
364}
365
366#[cfg(feature = "std")]
367impl<'scope, 'env, T, E> Sink<Result<T, E>> for TryStreamInner<'scope, 'env, T, E> {
368    type Error = Infallible;
369
370    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
371        <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
372    }
373
374    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
375        self.inner.get_inner().flush()
376    }
377
378    fn start_send(self: Pin<&mut Self>, item: Result<T, E>) -> Result<(), Infallible> {
379        self.inner.get_inner().send(item);
380        Ok(())
381    }
382
383    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
384        self.inner.get_inner().close()
385    }
386}
387
388#[cfg(feature = "std")]
389impl<'scope, 'env, T, E> Sink<T> for TryStreamInner<'scope, 'env, T, E> {
390    type Error = Infallible;
391
392    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
393        <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
394    }
395
396    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
397        <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
398    }
399
400    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
401        <Self as Sink<Result<T, E>>>::start_send(self, Ok(item))
402    }
403
404    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
405        <Self as Sink<Result<T, E>>>::poll_close(self, cx)
406    }
407}
408
409type DynLocalStreamFut<'scope> = Pin<Box<dyn Future<Output = ()> + 'scope>>;
410type DynLocalTryStreamFut<'scope, E> = Pin<Box<dyn Future<Output = Result<(), E>> + 'scope>>;
411
412pin_project! {
413    /// Local stream with a scoped future.
414    ///
415    /// Unlike [`ScopedStream`] it is not [`Send`], so it can work in no-std environment.
416    #[must_use = "Stream will not do anything if not used"]
417    pub struct LocalScopedStream<'env, T> {
418        fut: Option<DynLocalStreamFut<'env>>,
419
420        data: Pin<Box<LocalStreamInner<'env, 'env, T>>>,
421    }
422}
423
424pin_project! {
425    /// Local stream with a scoped future.
426    ///
427    /// Unlike [`ScopedTryStream`] it is not [`Send`], so it can work in no-std environment.
428    #[must_use = "Stream will not do anything if not used"]
429    pub struct LocalScopedTryStream<'env, T, E> {
430        fut: Option<DynLocalTryStreamFut<'env, E>>,
431
432        data: Pin<Box<LocalTryStreamInner<'env, 'env, T, E>>>,
433    }
434}
435
436pin_project! {
437    /// Inner type of [`LocalScopedStream`].
438    ///
439    /// Similiar to [`StreamInner`], but not [`Send`].
440    #[must_use = "StreamInner will not do anything if not used"]
441    pub struct LocalStreamInner<'scope, 'env: 'scope, T> {
442        #[pin]
443        inner: StreamInnerData<T>,
444
445        phantom: PhantomData<(&'scope mut &'env T, *mut u8)>,
446    }
447}
448
449pin_project! {
450    /// Inner type of [`LocalScopedTryStream`].
451    ///
452    /// Similiar to [`TryStreamInner`], but not [`Send`].
453    #[must_use = "StreamInner will not do anything if not used"]
454    pub struct LocalTryStreamInner<'scope, 'env: 'scope, T, E> {
455        #[pin]
456        inner: StreamInnerData<Result<T, E>>,
457
458        phantom: PhantomData<(&'scope mut &'env (T, E), *mut u8)>,
459    }
460}
461
462impl<'env, T> LocalScopedStream<'env, T> {
463    /// Create new [`LocalScopedStream`].
464    ///
465    /// Future must return unit type. If you want fallible future, use [`LocalScopedTryStream`].
466    ///
467    /// # Examples
468    ///
469    /// ```
470    /// # use anyhow::Error;
471    /// # use futures_util::{SinkExt, StreamExt};
472    /// # use scoped_stream_sink::LocalScopedStream;
473    /// # fn main() -> Result<(), Error> {
474    /// # tokio::runtime::Builder::new_current_thread().enable_all().build()?.block_on(async {
475    /// let mut stream = <LocalScopedStream<usize>>::new(|mut sink| Box::pin(async move {
476    ///     // Send a value.
477    ///     // It is okay to unwrap() because it is infallible.
478    ///     sink.send(1).await.unwrap();
479    ///
480    ///     // (Optional) close the sink. NOTE: sink cannot be used afterwards.
481    ///     // sink.close().await.unwrap();
482    /// }));
483    ///
484    /// // Receive all values
485    /// while let Some(i) = stream.next().await {
486    ///     println!("{i}");
487    /// }
488    /// # Ok(()) })}
489    /// ```
490    pub fn new<F>(f: F) -> Self
491    where
492        for<'scope> F: FnOnce(
493            Pin<&'scope mut LocalStreamInner<'scope, 'env, T>>,
494        ) -> Pin<Box<dyn Future<Output = ()> + 'scope>>,
495    {
496        let mut data = Box::pin(LocalStreamInner {
497            inner: StreamInnerData::new(),
498
499            phantom: PhantomData,
500        });
501
502        let ptr = unsafe { transmute::<Pin<&mut LocalStreamInner<T>>, _>(data.as_mut()) };
503        let fut = f(ptr);
504
505        Self {
506            fut: Some(fut),
507            data,
508        }
509    }
510}
511
512impl<'env, T, E> LocalScopedTryStream<'env, T, E> {
513    /// Create new [`LocalScopedTryStream`].
514    ///
515    /// Future can fails, and it's sink can receive [`Result`] type too (see [`LocalTryStreamInner`]).
516    ///
517    /// # Examples
518    ///
519    /// ```
520    /// # use anyhow::Error;
521    /// # use futures_util::{SinkExt, StreamExt};
522    /// # use scoped_stream_sink::LocalScopedTryStream;
523    /// # fn main() -> Result<(), Error> {
524    /// # tokio::runtime::Builder::new_current_thread().enable_all().build()?.block_on(async {
525    /// let mut stream = <LocalScopedTryStream<_, Error>>::new(|mut sink| Box::pin(async move {
526    ///     // Send a value.
527    ///     sink.send(1).await?;
528    ///
529    ///     // (Optional) close the sink. NOTE: sink cannot be used afterwards.
530    ///     // sink.close().await.unwrap();
531    ///
532    ///     Ok(())
533    /// }));
534    ///
535    /// // Receive all values
536    /// while let Some(i) = stream.next().await.transpose()? {
537    ///     println!("{i}");
538    /// }
539    ///
540    /// # Ok(()) })}
541    /// ```
542    pub fn new<F>(f: F) -> Self
543    where
544        for<'scope> F: FnOnce(
545            Pin<&'scope mut LocalTryStreamInner<'scope, 'env, T, E>>,
546        ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'scope>>,
547    {
548        let mut data = Box::pin(LocalTryStreamInner {
549            inner: StreamInnerData::new(),
550
551            phantom: PhantomData,
552        });
553
554        let ptr = unsafe { transmute::<Pin<&mut LocalTryStreamInner<T, E>>, _>(data.as_mut()) };
555        let fut = f(ptr);
556
557        Self {
558            fut: Some(fut),
559            data,
560        }
561    }
562}
563
564impl<'env, T> Stream for LocalScopedStream<'env, T> {
565    type Item = T;
566
567    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
568        let this = self.project();
569        this.data.inner.next(cx, this.fut)
570    }
571}
572
573impl<'env, T, E> Stream for LocalScopedTryStream<'env, T, E> {
574    type Item = Result<T, E>;
575
576    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
577        let this = self.project();
578        this.data.inner.next_fallible(cx, this.fut)
579    }
580}
581
582impl<'scope, 'env, T> Sink<T> for LocalStreamInner<'scope, 'env, T> {
583    type Error = Infallible;
584
585    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
586        self.poll_flush(cx)
587    }
588
589    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
590        self.inner.flush()
591    }
592
593    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
594        self.inner.send(item);
595        Ok(())
596    }
597
598    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
599        self.inner.close()
600    }
601}
602
603impl<'scope, 'env, T, E> Sink<Result<T, E>> for LocalTryStreamInner<'scope, 'env, T, E> {
604    type Error = Infallible;
605
606    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
607        <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
608    }
609
610    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
611        self.inner.flush()
612    }
613
614    fn start_send(self: Pin<&mut Self>, item: Result<T, E>) -> Result<(), Infallible> {
615        self.inner.send(item);
616        Ok(())
617    }
618
619    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
620        self.inner.close()
621    }
622}
623
624impl<'scope, 'env, T, E> Sink<T> for LocalTryStreamInner<'scope, 'env, T, E> {
625    type Error = Infallible;
626
627    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
628        <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
629    }
630
631    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
632        <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
633    }
634
635    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
636        <Self as Sink<Result<T, E>>>::start_send(self, Ok(item))
637    }
638
639    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
640        <Self as Sink<Result<T, E>>>::poll_close(self, cx)
641    }
642}
643
644#[cfg(test)]
645mod tests {
646    use super::*;
647
648    use std::pin::pin;
649    use std::prelude::rust_2021::*;
650    use std::ptr::NonNull;
651    use std::task::{Context, RawWaker, RawWakerVTable, Waker};
652    use std::time::Duration;
653
654    use anyhow::{bail, Error as AnyError, Result as AnyResult};
655    use futures_util::{join, pending, SinkExt, StreamExt};
656    use tokio::sync::mpsc::channel;
657    use tokio::task::yield_now;
658    use tokio::time::timeout;
659    use tokio::{select, spawn};
660
661    async fn test_helper<F>(f: F) -> AnyResult<()>
662    where
663        F: Future<Output = AnyResult<()>> + Send,
664    {
665        match timeout(Duration::from_secs(5), f).await {
666            Ok(v) => v,
667            Err(_) => bail!("Time ran out"),
668        }
669    }
670    /*
671    #[tokio::test]
672    async fn test_simple_fail() -> AnyResult<()> {
673        use std::pin::pin;
674
675        let mut stream: Pin<&mut ScopedStream<'static, usize>> =
676            pin!(ScopedStream::new(|mut src| Box::pin(async move {
677                tokio::spawn(async move {
678                    src.send(1).await.unwrap();
679                    src.send(2).await.unwrap();
680                    src.close().await.unwrap();
681                })
682                .await
683                .unwrap();
684            })));
685
686        test_helper(async move {
687            while let Some(i) = stream.next().await {
688                println!("{i}");
689            }
690            drop(stream);
691
692            Ok(())
693        })
694        .await
695    }
696    */
697    #[tokio::test]
698    async fn test_simple() -> AnyResult<()> {
699        let mut stream: ScopedStream<usize> = ScopedStream::new(|_| Box::pin(async {}));
700
701        test_helper(async move {
702            assert_eq!(stream.next().await, None);
703
704            Ok(())
705        })
706        .await
707    }
708
709    #[tokio::test]
710    async fn test_recv_one() -> AnyResult<()> {
711        let mut stream: ScopedStream<usize> = ScopedStream::new(|mut src| {
712            Box::pin(async move {
713                src.send(1).await.unwrap();
714            })
715        });
716
717        test_helper(async move {
718            assert_eq!(stream.next().await, Some(1));
719            assert_eq!(stream.next().await, None);
720
721            Ok(())
722        })
723        .await
724    }
725
726    #[tokio::test]
727    async fn test_recv_yield() -> AnyResult<()> {
728        let mut stream = <ScopedStream<usize>>::new(|_| {
729            Box::pin(async move {
730                for _ in 0..5 {
731                    yield_now().await;
732                }
733            })
734        });
735
736        test_helper(async move {
737            assert_eq!(stream.next().await, None);
738
739            Ok(())
740        })
741        .await
742    }
743
744    #[tokio::test]
745    async fn test_recv_many() -> AnyResult<()> {
746        let mut stream = <ScopedStream<usize>>::new(|mut sink| {
747            Box::pin(async move {
748                for i in 0..10 {
749                    sink.send(i).await.unwrap();
750                }
751            })
752        });
753
754        test_helper(async move {
755            for i in 0..10 {
756                assert_eq!(stream.next().await, Some(i));
757            }
758            assert_eq!(stream.next().await, None);
759
760            Ok(())
761        })
762        .await
763    }
764
765    #[tokio::test]
766    async fn test_recv_many_yield() -> AnyResult<()> {
767        let mut stream = <ScopedStream<usize>>::new(|mut sink| {
768            Box::pin(async move {
769                for i in 0..10 {
770                    sink.send(i).await.unwrap();
771                    for _ in 0..i {
772                        yield_now().await;
773                    }
774                }
775            })
776        });
777
778        test_helper(async move {
779            for i in 0..10 {
780                assert_eq!(stream.next().await, Some(i));
781            }
782            assert_eq!(stream.next().await, None);
783
784            Ok(())
785        })
786        .await
787    }
788
789    #[tokio::test]
790    async fn test_double_scoped() -> AnyResult<()> {
791        let mut stream = <ScopedStream<usize>>::new(|mut sink| {
792            Box::pin(async move {
793                let mut stream2 = <ScopedStream<usize>>::new(|mut sink2| {
794                    let sink = &mut sink;
795                    Box::pin(async move {
796                        for i in 0..10 {
797                            sink.send(i + 100).await.unwrap();
798                            sink2.send(i).await.unwrap();
799                        }
800                    })
801                });
802
803                for i in 0..10 {
804                    assert_eq!(stream2.next().await, Some(i));
805                }
806                assert_eq!(stream2.next().await, None);
807            })
808        });
809
810        test_helper(async move {
811            for i in 0..10 {
812                assert_eq!(stream.next().await, Some(i + 100));
813            }
814            assert_eq!(stream.next().await, None);
815
816            Ok(())
817        })
818        .await
819    }
820
821    #[tokio::test]
822    async fn test_double_scoped2() -> AnyResult<()> {
823        let mut stream = <ScopedStream<usize>>::new(|mut sink| {
824            Box::pin(async move {
825                let mut stream2 = <ScopedStream<usize>>::new(|mut sink2| {
826                    let sink = &mut sink;
827                    Box::pin(async move {
828                        for i in 0..10 {
829                            assert_eq!(join!(sink.send(i + 100), sink2.send(i)), (Ok(()), Ok(())));
830                        }
831                    })
832                });
833
834                for i in 0..10 {
835                    assert_eq!(stream2.next().await, Some(i));
836                }
837                assert_eq!(stream2.next().await, None);
838            })
839        });
840
841        test_helper(async move {
842            for i in 0..10 {
843                assert_eq!(stream.next().await, Some(i + 100));
844            }
845            assert_eq!(stream.next().await, None);
846
847            Ok(())
848        })
849        .await
850    }
851
852    #[tokio::test]
853    async fn test_try_simple() -> AnyResult<()> {
854        let mut stream = <ScopedTryStream<usize, AnyError>>::new(|_| Box::pin(async { Ok(()) }));
855
856        test_helper(async move {
857            assert_eq!(stream.next().await.transpose()?, None);
858
859            Ok(())
860        })
861        .await
862    }
863
864    #[tokio::test]
865    async fn test_try_recv_one() -> AnyResult<()> {
866        let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut src| {
867            Box::pin(async move {
868                src.send(1).await.unwrap();
869
870                Ok(())
871            })
872        });
873
874        test_helper(async move {
875            assert_eq!(stream.next().await.transpose()?, Some(1));
876            assert_eq!(stream.next().await.transpose()?, None);
877
878            Ok(())
879        })
880        .await
881    }
882
883    #[tokio::test]
884    async fn test_try_recv_yield() -> AnyResult<()> {
885        let mut stream = <ScopedTryStream<usize, AnyError>>::new(|_| {
886            Box::pin(async move {
887                for _ in 0..5 {
888                    yield_now().await;
889                }
890
891                Ok(())
892            })
893        });
894
895        test_helper(async move {
896            assert_eq!(stream.next().await.transpose()?, None);
897
898            Ok(())
899        })
900        .await
901    }
902
903    #[tokio::test]
904    async fn test_try_recv_many() -> AnyResult<()> {
905        let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut sink| {
906            Box::pin(async move {
907                for i in 0..10 {
908                    sink.send(i).await.unwrap();
909                }
910
911                Ok(())
912            })
913        });
914
915        test_helper(async move {
916            for i in 0..10 {
917                assert_eq!(stream.next().await.transpose()?, Some(i));
918            }
919            assert_eq!(stream.next().await.transpose()?, None);
920
921            Ok(())
922        })
923        .await
924    }
925
926    #[tokio::test]
927    async fn test_try_recv_many_yield() -> AnyResult<()> {
928        let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut sink| {
929            Box::pin(async move {
930                for i in 0..10 {
931                    sink.send(i).await?;
932                    for _ in 0..i {
933                        yield_now().await;
934                    }
935                }
936
937                Ok(())
938            })
939        });
940
941        test_helper(async move {
942            for i in 0..10 {
943                assert_eq!(stream.next().await.transpose()?, Some(i));
944            }
945            assert_eq!(stream.next().await.transpose()?, None);
946
947            Ok(())
948        })
949        .await
950    }
951
952    #[tokio::test]
953    async fn test_try_double_scoped() -> AnyResult<()> {
954        let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut sink| {
955            Box::pin(async move {
956                let mut stream2 = <ScopedTryStream<usize, AnyError>>::new(|mut sink2| {
957                    let sink = &mut sink;
958                    Box::pin(async move {
959                        for i in 0..10 {
960                            sink.send(i + 100).await?;
961                            sink2.send(i).await?;
962                        }
963
964                        Ok(())
965                    })
966                });
967
968                for i in 0..10 {
969                    assert_eq!(stream2.next().await.transpose()?, Some(i));
970                }
971                assert_eq!(stream2.next().await.transpose()?, None);
972
973                Ok(())
974            })
975        });
976
977        test_helper(async move {
978            for i in 0..10 {
979                assert_eq!(stream.next().await.transpose()?, Some(i + 100));
980            }
981            assert_eq!(stream.next().await.transpose()?, None);
982
983            Ok(())
984        })
985        .await
986    }
987
988    #[tokio::test]
989    async fn test_try_double_scoped2() -> AnyResult<()> {
990        let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut sink| {
991            Box::pin(async move {
992                let mut stream2 = <ScopedTryStream<usize, AnyError>>::new(|mut sink2| {
993                    let sink = &mut sink;
994                    Box::pin(async move {
995                        for i in 0..10 {
996                            let (r1, r2) = join!(sink.send(i + 100), sink2.send(i));
997                            r1?;
998                            r2?;
999                        }
1000
1001                        Ok(())
1002                    })
1003                });
1004
1005                for i in 0..10 {
1006                    assert_eq!(stream2.next().await.transpose()?, Some(i));
1007                }
1008                assert_eq!(stream2.next().await.transpose()?, None);
1009
1010                Ok(())
1011            })
1012        });
1013
1014        test_helper(async move {
1015            for i in 0..10 {
1016                assert_eq!(stream.next().await.transpose()?, Some(i + 100));
1017            }
1018            assert_eq!(stream.next().await.transpose()?, None);
1019
1020            Ok(())
1021        })
1022        .await
1023    }
1024
1025    #[tokio::test]
1026    async fn test_try_fail() -> AnyResult<()> {
1027        let mut stream = <ScopedTryStream<usize, usize>>::new(|mut sink| {
1028            Box::pin(async move {
1029                for i in 0..10 {
1030                    sink.send(Ok(i)).await.unwrap();
1031                }
1032
1033                Err(500)
1034            })
1035        });
1036
1037        test_helper(async move {
1038            for i in 0..10 {
1039                assert_eq!(stream.next().await, Some(Ok(i)));
1040            }
1041            assert_eq!(stream.next().await, Some(Err(500)));
1042            assert_eq!(stream.next().await, None);
1043
1044            Ok(())
1045        })
1046        .await
1047    }
1048
1049    #[tokio::test]
1050    async fn test_try_fail2() -> AnyResult<()> {
1051        let mut stream = <ScopedTryStream<usize, usize>>::new(|mut sink| {
1052            Box::pin(async move {
1053                for i in 0..10 {
1054                    sink.send(Ok(i)).await.unwrap();
1055                }
1056
1057                for i in 0..10 {
1058                    sink.send(Err(i)).await.unwrap();
1059                }
1060
1061                Err(500)
1062            })
1063        });
1064
1065        test_helper(async move {
1066            for i in 0..10 {
1067                assert_eq!(stream.next().await, Some(Ok(i)));
1068            }
1069
1070            for i in 0..10 {
1071                assert_eq!(stream.next().await, Some(Err(i)));
1072            }
1073
1074            assert_eq!(stream.next().await, Some(Err(500)));
1075            assert_eq!(stream.next().await, None);
1076
1077            Ok(())
1078        })
1079        .await
1080    }
1081
1082    #[tokio::test]
1083    async fn test_spawn_mpsc() -> AnyResult<()> {
1084        let (s1, mut r1) = channel::<(usize, usize)>(4);
1085        let (s2, mut r2) = channel::<(usize, usize)>(4);
1086
1087        let mut stream = ScopedStream::new(|mut sink| {
1088            Box::pin(async move {
1089                loop {
1090                    let r;
1091                    select! {
1092                        Some(v) = r1.recv() => r = v,
1093                        Some(v) = r2.recv() => r = v,
1094                        else => return,
1095                    }
1096
1097                    println!("Received: {r:?}");
1098                    sink.feed(r).await.unwrap();
1099                }
1100            })
1101        });
1102
1103        let it = [0..10, 5..20, 10..100, 25..100, 50..75];
1104        let mut handles = Vec::new();
1105
1106        let mut it_ = it.clone();
1107        handles.push(spawn(test_helper(async move {
1108            while let Some((i, v)) = stream.next().await {
1109                assert_eq!(it_[i].next(), Some(v));
1110            }
1111
1112            for mut v in it_ {
1113                assert_eq!(v.next(), None);
1114            }
1115
1116            Ok(())
1117        })));
1118
1119        for (i, v) in it.into_iter().enumerate() {
1120            let s = if i % 2 == 0 { s1.clone() } else { s2.clone() };
1121            handles.push(spawn(test_helper(async move {
1122                for j in v {
1123                    s.send((i, j)).await.unwrap();
1124
1125                    for _ in 0..i {
1126                        yield_now().await
1127                    }
1128                }
1129
1130                Ok(())
1131            })));
1132        }
1133        drop((s1, s2));
1134
1135        let mut has_error = false;
1136        for f in handles {
1137            if let Err(e) = f.await? {
1138                eprintln!("{e:?}");
1139                has_error = true;
1140            }
1141        }
1142
1143        if has_error {
1144            bail!("Some error has happened");
1145        }
1146
1147        Ok(())
1148    }
1149
1150    fn nil_waker() -> Waker {
1151        fn raw() -> RawWaker {
1152            RawWaker::new(NonNull::dangling().as_ptr(), &VTABLE)
1153        }
1154
1155        unsafe fn clone(_: *const ()) -> RawWaker {
1156            raw()
1157        }
1158        unsafe fn wake(_: *const ()) {}
1159        unsafe fn wake_by_ref(_: *const ()) {}
1160        unsafe fn drop(_: *const ()) {}
1161
1162        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
1163
1164        unsafe { Waker::from_raw(raw()) }
1165    }
1166
1167    #[test]
1168    fn test_generator() {
1169        let mut stream = pin!(ScopedStream::new(|mut sink| {
1170            Box::pin(async move {
1171                for i in 0usize..10 {
1172                    sink.send(i).await.unwrap();
1173                }
1174            })
1175        }));
1176
1177        let waker = nil_waker();
1178        let mut cx = Context::from_waker(&waker);
1179        for j in 0usize..10 {
1180            assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(j)));
1181        }
1182
1183        assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(None));
1184    }
1185
1186    #[test]
1187    fn test_generator_yield() {
1188        let mut stream = pin!(ScopedStream::new(|mut sink| {
1189            Box::pin(async move {
1190                for i in 0usize..10 {
1191                    sink.send(i).await.unwrap();
1192                    pending!();
1193                }
1194            })
1195        }));
1196
1197        let waker = nil_waker();
1198        let mut cx = Context::from_waker(&waker);
1199        for j in 0usize..10 {
1200            assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(j)));
1201            assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending);
1202        }
1203
1204        assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(None));
1205    }
1206}