1use alloc::boxed::Box;
2use core::cell::{Cell, UnsafeCell};
3use core::convert::Infallible;
4use core::future::Future;
5use core::marker::{PhantomData, PhantomPinned};
6use core::mem::transmute;
7use core::ops::DerefMut;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use futures_core::Stream;
12use futures_sink::Sink;
13use pin_project_lite::pin_project;
14
15#[cfg(feature = "std")]
16use crate::LocalThread;
17
18#[cfg(feature = "std")]
19type DynStreamFut<'scope> = Pin<Box<dyn Future<Output = ()> + Send + 'scope>>;
20#[cfg(feature = "std")]
21type DynTryStreamFut<'scope, E> = Pin<Box<dyn Future<Output = Result<(), E>> + Send + 'scope>>;
22
23#[cfg(feature = "std")]
24pin_project! {
25 #[must_use = "Stream will not do anything if not used"]
33 pub struct ScopedStream<'env, T> {
34 fut: Option<DynStreamFut<'env>>,
35
36 data: Pin<Box<StreamInner<'env, 'env, T>>>,
37 }
38}
39
40#[cfg(feature = "std")]
41pin_project! {
42 #[must_use = "Stream will not do anything if not used"]
48 pub struct ScopedTryStream<'env, T, E> {
49 fut: Option<DynTryStreamFut<'env, E>>,
50
51 data: Pin<Box<TryStreamInner<'env, 'env, T, E>>>,
52 }
53}
54
55struct StreamInnerData<T> {
56 data: UnsafeCell<Option<T>>,
57 closed: Cell<bool>,
58
59 _pinned: PhantomPinned,
62}
63
64unsafe impl<T: Send> Send for StreamInnerData<T> {}
67unsafe impl<T: Send> Sync for StreamInnerData<T> {}
68
69impl<T> StreamInnerData<T> {
70 const fn new() -> Self {
71 Self {
72 data: UnsafeCell::new(None),
73 closed: Cell::new(false),
74 _pinned: PhantomPinned,
75 }
76 }
77}
78
79#[cfg(feature = "std")]
80pin_project! {
81 #[must_use = "StreamInner will not do anything if not used"]
96 pub struct StreamInner<'scope, 'env: 'scope, T> {
97 #[pin]
98 inner: LocalThread<StreamInnerData<T>>,
99
100 phantom: PhantomData<&'scope mut &'env T>,
101 }
102}
103
104#[cfg(feature = "std")]
105pin_project! {
106 #[must_use = "StreamInner will not do anything if not used"]
122 pub struct TryStreamInner<'scope, 'env: 'scope, T, E> {
123 #[pin]
124 inner: LocalThread<StreamInnerData<Result<T, E>>>,
125
126 phantom: PhantomData<&'scope mut &'env (T, E)>,
127 }
128}
129
130#[cfg(feature = "std")]
131impl<'env, T> ScopedStream<'env, T> {
132 pub fn new<F>(f: F) -> Self
160 where
161 for<'scope> F: FnOnce(
162 Pin<&'scope mut StreamInner<'scope, 'env, T>>,
163 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'scope>>,
164 {
165 let mut data = Box::pin(StreamInner {
166 inner: LocalThread::new(StreamInnerData::new()),
167
168 phantom: PhantomData,
169 });
170
171 let ptr = unsafe { transmute::<Pin<&mut StreamInner<T>>, _>(data.as_mut()) };
172 let fut = f(ptr);
173
174 Self {
175 fut: Some(fut),
176 data,
177 }
178 }
179}
180
181#[cfg(feature = "std")]
182impl<'env, T, E> ScopedTryStream<'env, T, E> {
183 pub fn new<F>(f: F) -> Self
213 where
214 for<'scope> F: FnOnce(
215 Pin<&'scope mut TryStreamInner<'scope, 'env, T, E>>,
216 )
217 -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + 'scope>>,
218 {
219 let mut data = Box::pin(TryStreamInner {
220 inner: LocalThread::new(StreamInnerData::new()),
221
222 phantom: PhantomData,
223 });
224
225 let ptr = unsafe { transmute::<Pin<&mut TryStreamInner<T, E>>, _>(data.as_mut()) };
226 let fut = f(ptr);
227
228 Self {
229 fut: Some(fut),
230 data,
231 }
232 }
233}
234
235impl<T, E> StreamInnerData<Result<T, E>> {
236 fn next_fallible<U>(
237 &self,
238 cx: &mut Context<'_>,
239 fut: &mut Option<Pin<U>>,
240 ) -> Poll<Option<Result<T, E>>>
241 where
242 U: DerefMut,
243 U::Target: Future<Output = Result<(), E>>,
244 {
245 let res = match fut {
246 Some(v) => v.as_mut().poll(cx),
247 None => return Poll::Ready(None),
248 };
249 if res.is_ready() {
250 *fut = None;
251
252 if let Poll::Ready(Err(e)) = res {
253 return Poll::Ready(Some(Err(e)));
254 }
255 }
256
257 let ret = unsafe { (*self.data.get()).take() };
258 if ret.is_none() && res.is_pending() {
259 Poll::Pending
260 } else {
261 Poll::Ready(ret)
262 }
263 }
264}
265
266impl<T> StreamInnerData<T> {
267 fn next<F>(&self, cx: &mut Context<'_>, fut: &mut Option<Pin<F>>) -> Poll<Option<T>>
268 where
269 F: DerefMut,
270 F::Target: Future<Output = ()>,
271 {
272 let res = match fut {
273 Some(v) => v.as_mut().poll(cx),
274 None => return Poll::Ready(None),
275 };
276 if res.is_ready() {
277 *fut = None;
278 }
279
280 let ret = unsafe { (*self.data.get()).take() };
281 if ret.is_none() && res.is_pending() {
282 Poll::Pending
283 } else {
284 Poll::Ready(ret)
285 }
286 }
287
288 fn flush<E>(&self) -> Poll<Result<(), E>> {
289 if self.closed.get() || unsafe { (*self.data.get()).is_some() } {
290 Poll::Pending
291 } else {
292 Poll::Ready(Ok(()))
293 }
294 }
295
296 fn send(&self, item: T) {
297 if self.closed.get() {
298 panic!("Stream is closed");
299 }
300 let data = unsafe { &mut *self.data.get() };
301 if data.is_some() {
302 panic!("poll_ready() is not called yet!");
303 }
304
305 *data = Some(item);
306 }
307
308 fn close<E>(&self) -> Poll<Result<(), E>> {
309 self.closed.set(true);
310
311 if unsafe { (*self.data.get()).is_some() } {
312 Poll::Pending
313 } else {
314 Poll::Ready(Ok(()))
315 }
316 }
317}
318
319#[cfg(feature = "std")]
320impl<'env, T> Stream for ScopedStream<'env, T> {
321 type Item = T;
322
323 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
324 let this = self.project();
325 this.data
326 .as_mut()
327 .project()
328 .inner
329 .set_inner_ctx()
330 .next(cx, this.fut)
331 }
332}
333
334#[cfg(feature = "std")]
335impl<'env, T, E> Stream for ScopedTryStream<'env, T, E> {
336 type Item = Result<T, E>;
337
338 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
339 let this = self.project();
340 this.data.inner.set_inner_ctx().next_fallible(cx, this.fut)
341 }
342}
343
344#[cfg(feature = "std")]
345impl<'scope, 'env, T> Sink<T> for StreamInner<'scope, 'env, T> {
346 type Error = Infallible;
347
348 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
349 self.poll_flush(cx)
350 }
351
352 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
353 self.inner.get_inner().flush()
354 }
355
356 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
357 self.inner.get_inner().send(item);
358 Ok(())
359 }
360
361 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
362 self.inner.get_inner().close()
363 }
364}
365
366#[cfg(feature = "std")]
367impl<'scope, 'env, T, E> Sink<Result<T, E>> for TryStreamInner<'scope, 'env, T, E> {
368 type Error = Infallible;
369
370 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
371 <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
372 }
373
374 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
375 self.inner.get_inner().flush()
376 }
377
378 fn start_send(self: Pin<&mut Self>, item: Result<T, E>) -> Result<(), Infallible> {
379 self.inner.get_inner().send(item);
380 Ok(())
381 }
382
383 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
384 self.inner.get_inner().close()
385 }
386}
387
388#[cfg(feature = "std")]
389impl<'scope, 'env, T, E> Sink<T> for TryStreamInner<'scope, 'env, T, E> {
390 type Error = Infallible;
391
392 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
393 <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
394 }
395
396 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
397 <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
398 }
399
400 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
401 <Self as Sink<Result<T, E>>>::start_send(self, Ok(item))
402 }
403
404 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
405 <Self as Sink<Result<T, E>>>::poll_close(self, cx)
406 }
407}
408
409type DynLocalStreamFut<'scope> = Pin<Box<dyn Future<Output = ()> + 'scope>>;
410type DynLocalTryStreamFut<'scope, E> = Pin<Box<dyn Future<Output = Result<(), E>> + 'scope>>;
411
412pin_project! {
413 #[must_use = "Stream will not do anything if not used"]
417 pub struct LocalScopedStream<'env, T> {
418 fut: Option<DynLocalStreamFut<'env>>,
419
420 data: Pin<Box<LocalStreamInner<'env, 'env, T>>>,
421 }
422}
423
424pin_project! {
425 #[must_use = "Stream will not do anything if not used"]
429 pub struct LocalScopedTryStream<'env, T, E> {
430 fut: Option<DynLocalTryStreamFut<'env, E>>,
431
432 data: Pin<Box<LocalTryStreamInner<'env, 'env, T, E>>>,
433 }
434}
435
436pin_project! {
437 #[must_use = "StreamInner will not do anything if not used"]
441 pub struct LocalStreamInner<'scope, 'env: 'scope, T> {
442 #[pin]
443 inner: StreamInnerData<T>,
444
445 phantom: PhantomData<(&'scope mut &'env T, *mut u8)>,
446 }
447}
448
449pin_project! {
450 #[must_use = "StreamInner will not do anything if not used"]
454 pub struct LocalTryStreamInner<'scope, 'env: 'scope, T, E> {
455 #[pin]
456 inner: StreamInnerData<Result<T, E>>,
457
458 phantom: PhantomData<(&'scope mut &'env (T, E), *mut u8)>,
459 }
460}
461
462impl<'env, T> LocalScopedStream<'env, T> {
463 pub fn new<F>(f: F) -> Self
491 where
492 for<'scope> F: FnOnce(
493 Pin<&'scope mut LocalStreamInner<'scope, 'env, T>>,
494 ) -> Pin<Box<dyn Future<Output = ()> + 'scope>>,
495 {
496 let mut data = Box::pin(LocalStreamInner {
497 inner: StreamInnerData::new(),
498
499 phantom: PhantomData,
500 });
501
502 let ptr = unsafe { transmute::<Pin<&mut LocalStreamInner<T>>, _>(data.as_mut()) };
503 let fut = f(ptr);
504
505 Self {
506 fut: Some(fut),
507 data,
508 }
509 }
510}
511
512impl<'env, T, E> LocalScopedTryStream<'env, T, E> {
513 pub fn new<F>(f: F) -> Self
543 where
544 for<'scope> F: FnOnce(
545 Pin<&'scope mut LocalTryStreamInner<'scope, 'env, T, E>>,
546 ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'scope>>,
547 {
548 let mut data = Box::pin(LocalTryStreamInner {
549 inner: StreamInnerData::new(),
550
551 phantom: PhantomData,
552 });
553
554 let ptr = unsafe { transmute::<Pin<&mut LocalTryStreamInner<T, E>>, _>(data.as_mut()) };
555 let fut = f(ptr);
556
557 Self {
558 fut: Some(fut),
559 data,
560 }
561 }
562}
563
564impl<'env, T> Stream for LocalScopedStream<'env, T> {
565 type Item = T;
566
567 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
568 let this = self.project();
569 this.data.inner.next(cx, this.fut)
570 }
571}
572
573impl<'env, T, E> Stream for LocalScopedTryStream<'env, T, E> {
574 type Item = Result<T, E>;
575
576 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
577 let this = self.project();
578 this.data.inner.next_fallible(cx, this.fut)
579 }
580}
581
582impl<'scope, 'env, T> Sink<T> for LocalStreamInner<'scope, 'env, T> {
583 type Error = Infallible;
584
585 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
586 self.poll_flush(cx)
587 }
588
589 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
590 self.inner.flush()
591 }
592
593 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
594 self.inner.send(item);
595 Ok(())
596 }
597
598 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
599 self.inner.close()
600 }
601}
602
603impl<'scope, 'env, T, E> Sink<Result<T, E>> for LocalTryStreamInner<'scope, 'env, T, E> {
604 type Error = Infallible;
605
606 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
607 <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
608 }
609
610 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
611 self.inner.flush()
612 }
613
614 fn start_send(self: Pin<&mut Self>, item: Result<T, E>) -> Result<(), Infallible> {
615 self.inner.send(item);
616 Ok(())
617 }
618
619 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
620 self.inner.close()
621 }
622}
623
624impl<'scope, 'env, T, E> Sink<T> for LocalTryStreamInner<'scope, 'env, T, E> {
625 type Error = Infallible;
626
627 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
628 <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
629 }
630
631 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
632 <Self as Sink<Result<T, E>>>::poll_flush(self, cx)
633 }
634
635 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
636 <Self as Sink<Result<T, E>>>::start_send(self, Ok(item))
637 }
638
639 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
640 <Self as Sink<Result<T, E>>>::poll_close(self, cx)
641 }
642}
643
644#[cfg(test)]
645mod tests {
646 use super::*;
647
648 use std::pin::pin;
649 use std::prelude::rust_2021::*;
650 use std::ptr::NonNull;
651 use std::task::{Context, RawWaker, RawWakerVTable, Waker};
652 use std::time::Duration;
653
654 use anyhow::{bail, Error as AnyError, Result as AnyResult};
655 use futures_util::{join, pending, SinkExt, StreamExt};
656 use tokio::sync::mpsc::channel;
657 use tokio::task::yield_now;
658 use tokio::time::timeout;
659 use tokio::{select, spawn};
660
661 async fn test_helper<F>(f: F) -> AnyResult<()>
662 where
663 F: Future<Output = AnyResult<()>> + Send,
664 {
665 match timeout(Duration::from_secs(5), f).await {
666 Ok(v) => v,
667 Err(_) => bail!("Time ran out"),
668 }
669 }
670 #[tokio::test]
698 async fn test_simple() -> AnyResult<()> {
699 let mut stream: ScopedStream<usize> = ScopedStream::new(|_| Box::pin(async {}));
700
701 test_helper(async move {
702 assert_eq!(stream.next().await, None);
703
704 Ok(())
705 })
706 .await
707 }
708
709 #[tokio::test]
710 async fn test_recv_one() -> AnyResult<()> {
711 let mut stream: ScopedStream<usize> = ScopedStream::new(|mut src| {
712 Box::pin(async move {
713 src.send(1).await.unwrap();
714 })
715 });
716
717 test_helper(async move {
718 assert_eq!(stream.next().await, Some(1));
719 assert_eq!(stream.next().await, None);
720
721 Ok(())
722 })
723 .await
724 }
725
726 #[tokio::test]
727 async fn test_recv_yield() -> AnyResult<()> {
728 let mut stream = <ScopedStream<usize>>::new(|_| {
729 Box::pin(async move {
730 for _ in 0..5 {
731 yield_now().await;
732 }
733 })
734 });
735
736 test_helper(async move {
737 assert_eq!(stream.next().await, None);
738
739 Ok(())
740 })
741 .await
742 }
743
744 #[tokio::test]
745 async fn test_recv_many() -> AnyResult<()> {
746 let mut stream = <ScopedStream<usize>>::new(|mut sink| {
747 Box::pin(async move {
748 for i in 0..10 {
749 sink.send(i).await.unwrap();
750 }
751 })
752 });
753
754 test_helper(async move {
755 for i in 0..10 {
756 assert_eq!(stream.next().await, Some(i));
757 }
758 assert_eq!(stream.next().await, None);
759
760 Ok(())
761 })
762 .await
763 }
764
765 #[tokio::test]
766 async fn test_recv_many_yield() -> AnyResult<()> {
767 let mut stream = <ScopedStream<usize>>::new(|mut sink| {
768 Box::pin(async move {
769 for i in 0..10 {
770 sink.send(i).await.unwrap();
771 for _ in 0..i {
772 yield_now().await;
773 }
774 }
775 })
776 });
777
778 test_helper(async move {
779 for i in 0..10 {
780 assert_eq!(stream.next().await, Some(i));
781 }
782 assert_eq!(stream.next().await, None);
783
784 Ok(())
785 })
786 .await
787 }
788
789 #[tokio::test]
790 async fn test_double_scoped() -> AnyResult<()> {
791 let mut stream = <ScopedStream<usize>>::new(|mut sink| {
792 Box::pin(async move {
793 let mut stream2 = <ScopedStream<usize>>::new(|mut sink2| {
794 let sink = &mut sink;
795 Box::pin(async move {
796 for i in 0..10 {
797 sink.send(i + 100).await.unwrap();
798 sink2.send(i).await.unwrap();
799 }
800 })
801 });
802
803 for i in 0..10 {
804 assert_eq!(stream2.next().await, Some(i));
805 }
806 assert_eq!(stream2.next().await, None);
807 })
808 });
809
810 test_helper(async move {
811 for i in 0..10 {
812 assert_eq!(stream.next().await, Some(i + 100));
813 }
814 assert_eq!(stream.next().await, None);
815
816 Ok(())
817 })
818 .await
819 }
820
821 #[tokio::test]
822 async fn test_double_scoped2() -> AnyResult<()> {
823 let mut stream = <ScopedStream<usize>>::new(|mut sink| {
824 Box::pin(async move {
825 let mut stream2 = <ScopedStream<usize>>::new(|mut sink2| {
826 let sink = &mut sink;
827 Box::pin(async move {
828 for i in 0..10 {
829 assert_eq!(join!(sink.send(i + 100), sink2.send(i)), (Ok(()), Ok(())));
830 }
831 })
832 });
833
834 for i in 0..10 {
835 assert_eq!(stream2.next().await, Some(i));
836 }
837 assert_eq!(stream2.next().await, None);
838 })
839 });
840
841 test_helper(async move {
842 for i in 0..10 {
843 assert_eq!(stream.next().await, Some(i + 100));
844 }
845 assert_eq!(stream.next().await, None);
846
847 Ok(())
848 })
849 .await
850 }
851
852 #[tokio::test]
853 async fn test_try_simple() -> AnyResult<()> {
854 let mut stream = <ScopedTryStream<usize, AnyError>>::new(|_| Box::pin(async { Ok(()) }));
855
856 test_helper(async move {
857 assert_eq!(stream.next().await.transpose()?, None);
858
859 Ok(())
860 })
861 .await
862 }
863
864 #[tokio::test]
865 async fn test_try_recv_one() -> AnyResult<()> {
866 let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut src| {
867 Box::pin(async move {
868 src.send(1).await.unwrap();
869
870 Ok(())
871 })
872 });
873
874 test_helper(async move {
875 assert_eq!(stream.next().await.transpose()?, Some(1));
876 assert_eq!(stream.next().await.transpose()?, None);
877
878 Ok(())
879 })
880 .await
881 }
882
883 #[tokio::test]
884 async fn test_try_recv_yield() -> AnyResult<()> {
885 let mut stream = <ScopedTryStream<usize, AnyError>>::new(|_| {
886 Box::pin(async move {
887 for _ in 0..5 {
888 yield_now().await;
889 }
890
891 Ok(())
892 })
893 });
894
895 test_helper(async move {
896 assert_eq!(stream.next().await.transpose()?, None);
897
898 Ok(())
899 })
900 .await
901 }
902
903 #[tokio::test]
904 async fn test_try_recv_many() -> AnyResult<()> {
905 let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut sink| {
906 Box::pin(async move {
907 for i in 0..10 {
908 sink.send(i).await.unwrap();
909 }
910
911 Ok(())
912 })
913 });
914
915 test_helper(async move {
916 for i in 0..10 {
917 assert_eq!(stream.next().await.transpose()?, Some(i));
918 }
919 assert_eq!(stream.next().await.transpose()?, None);
920
921 Ok(())
922 })
923 .await
924 }
925
926 #[tokio::test]
927 async fn test_try_recv_many_yield() -> AnyResult<()> {
928 let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut sink| {
929 Box::pin(async move {
930 for i in 0..10 {
931 sink.send(i).await?;
932 for _ in 0..i {
933 yield_now().await;
934 }
935 }
936
937 Ok(())
938 })
939 });
940
941 test_helper(async move {
942 for i in 0..10 {
943 assert_eq!(stream.next().await.transpose()?, Some(i));
944 }
945 assert_eq!(stream.next().await.transpose()?, None);
946
947 Ok(())
948 })
949 .await
950 }
951
952 #[tokio::test]
953 async fn test_try_double_scoped() -> AnyResult<()> {
954 let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut sink| {
955 Box::pin(async move {
956 let mut stream2 = <ScopedTryStream<usize, AnyError>>::new(|mut sink2| {
957 let sink = &mut sink;
958 Box::pin(async move {
959 for i in 0..10 {
960 sink.send(i + 100).await?;
961 sink2.send(i).await?;
962 }
963
964 Ok(())
965 })
966 });
967
968 for i in 0..10 {
969 assert_eq!(stream2.next().await.transpose()?, Some(i));
970 }
971 assert_eq!(stream2.next().await.transpose()?, None);
972
973 Ok(())
974 })
975 });
976
977 test_helper(async move {
978 for i in 0..10 {
979 assert_eq!(stream.next().await.transpose()?, Some(i + 100));
980 }
981 assert_eq!(stream.next().await.transpose()?, None);
982
983 Ok(())
984 })
985 .await
986 }
987
988 #[tokio::test]
989 async fn test_try_double_scoped2() -> AnyResult<()> {
990 let mut stream = <ScopedTryStream<usize, AnyError>>::new(|mut sink| {
991 Box::pin(async move {
992 let mut stream2 = <ScopedTryStream<usize, AnyError>>::new(|mut sink2| {
993 let sink = &mut sink;
994 Box::pin(async move {
995 for i in 0..10 {
996 let (r1, r2) = join!(sink.send(i + 100), sink2.send(i));
997 r1?;
998 r2?;
999 }
1000
1001 Ok(())
1002 })
1003 });
1004
1005 for i in 0..10 {
1006 assert_eq!(stream2.next().await.transpose()?, Some(i));
1007 }
1008 assert_eq!(stream2.next().await.transpose()?, None);
1009
1010 Ok(())
1011 })
1012 });
1013
1014 test_helper(async move {
1015 for i in 0..10 {
1016 assert_eq!(stream.next().await.transpose()?, Some(i + 100));
1017 }
1018 assert_eq!(stream.next().await.transpose()?, None);
1019
1020 Ok(())
1021 })
1022 .await
1023 }
1024
1025 #[tokio::test]
1026 async fn test_try_fail() -> AnyResult<()> {
1027 let mut stream = <ScopedTryStream<usize, usize>>::new(|mut sink| {
1028 Box::pin(async move {
1029 for i in 0..10 {
1030 sink.send(Ok(i)).await.unwrap();
1031 }
1032
1033 Err(500)
1034 })
1035 });
1036
1037 test_helper(async move {
1038 for i in 0..10 {
1039 assert_eq!(stream.next().await, Some(Ok(i)));
1040 }
1041 assert_eq!(stream.next().await, Some(Err(500)));
1042 assert_eq!(stream.next().await, None);
1043
1044 Ok(())
1045 })
1046 .await
1047 }
1048
1049 #[tokio::test]
1050 async fn test_try_fail2() -> AnyResult<()> {
1051 let mut stream = <ScopedTryStream<usize, usize>>::new(|mut sink| {
1052 Box::pin(async move {
1053 for i in 0..10 {
1054 sink.send(Ok(i)).await.unwrap();
1055 }
1056
1057 for i in 0..10 {
1058 sink.send(Err(i)).await.unwrap();
1059 }
1060
1061 Err(500)
1062 })
1063 });
1064
1065 test_helper(async move {
1066 for i in 0..10 {
1067 assert_eq!(stream.next().await, Some(Ok(i)));
1068 }
1069
1070 for i in 0..10 {
1071 assert_eq!(stream.next().await, Some(Err(i)));
1072 }
1073
1074 assert_eq!(stream.next().await, Some(Err(500)));
1075 assert_eq!(stream.next().await, None);
1076
1077 Ok(())
1078 })
1079 .await
1080 }
1081
1082 #[tokio::test]
1083 async fn test_spawn_mpsc() -> AnyResult<()> {
1084 let (s1, mut r1) = channel::<(usize, usize)>(4);
1085 let (s2, mut r2) = channel::<(usize, usize)>(4);
1086
1087 let mut stream = ScopedStream::new(|mut sink| {
1088 Box::pin(async move {
1089 loop {
1090 let r;
1091 select! {
1092 Some(v) = r1.recv() => r = v,
1093 Some(v) = r2.recv() => r = v,
1094 else => return,
1095 }
1096
1097 println!("Received: {r:?}");
1098 sink.feed(r).await.unwrap();
1099 }
1100 })
1101 });
1102
1103 let it = [0..10, 5..20, 10..100, 25..100, 50..75];
1104 let mut handles = Vec::new();
1105
1106 let mut it_ = it.clone();
1107 handles.push(spawn(test_helper(async move {
1108 while let Some((i, v)) = stream.next().await {
1109 assert_eq!(it_[i].next(), Some(v));
1110 }
1111
1112 for mut v in it_ {
1113 assert_eq!(v.next(), None);
1114 }
1115
1116 Ok(())
1117 })));
1118
1119 for (i, v) in it.into_iter().enumerate() {
1120 let s = if i % 2 == 0 { s1.clone() } else { s2.clone() };
1121 handles.push(spawn(test_helper(async move {
1122 for j in v {
1123 s.send((i, j)).await.unwrap();
1124
1125 for _ in 0..i {
1126 yield_now().await
1127 }
1128 }
1129
1130 Ok(())
1131 })));
1132 }
1133 drop((s1, s2));
1134
1135 let mut has_error = false;
1136 for f in handles {
1137 if let Err(e) = f.await? {
1138 eprintln!("{e:?}");
1139 has_error = true;
1140 }
1141 }
1142
1143 if has_error {
1144 bail!("Some error has happened");
1145 }
1146
1147 Ok(())
1148 }
1149
1150 fn nil_waker() -> Waker {
1151 fn raw() -> RawWaker {
1152 RawWaker::new(NonNull::dangling().as_ptr(), &VTABLE)
1153 }
1154
1155 unsafe fn clone(_: *const ()) -> RawWaker {
1156 raw()
1157 }
1158 unsafe fn wake(_: *const ()) {}
1159 unsafe fn wake_by_ref(_: *const ()) {}
1160 unsafe fn drop(_: *const ()) {}
1161
1162 static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
1163
1164 unsafe { Waker::from_raw(raw()) }
1165 }
1166
1167 #[test]
1168 fn test_generator() {
1169 let mut stream = pin!(ScopedStream::new(|mut sink| {
1170 Box::pin(async move {
1171 for i in 0usize..10 {
1172 sink.send(i).await.unwrap();
1173 }
1174 })
1175 }));
1176
1177 let waker = nil_waker();
1178 let mut cx = Context::from_waker(&waker);
1179 for j in 0usize..10 {
1180 assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(j)));
1181 }
1182
1183 assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(None));
1184 }
1185
1186 #[test]
1187 fn test_generator_yield() {
1188 let mut stream = pin!(ScopedStream::new(|mut sink| {
1189 Box::pin(async move {
1190 for i in 0usize..10 {
1191 sink.send(i).await.unwrap();
1192 pending!();
1193 }
1194 })
1195 }));
1196
1197 let waker = nil_waker();
1198 let mut cx = Context::from_waker(&waker);
1199 for j in 0usize..10 {
1200 assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(j)));
1201 assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending);
1202 }
1203
1204 assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(None));
1205 }
1206}