#[macro_export]
macro_rules! stream {
(|$yielder:ident| { $($tt:tt)*}) => {{
$crate::futures::stream::generate_stream(|$yielder| Box::pin(async move {
$($tt)*
}))
}};
(|$yielder:ident| move { $($tt:tt)*}) => {{
$crate::futures::stream::generate_stream(move |$yielder| Box::pin(async move {
$($tt)*
}))
}};
($yielder:ident => $($tt:tt)* ) => {{
$crate::stream!(|$yielder| { $($tt)* })
}};
($yielder:ident move => $($tt:tt)* ) => {{
$crate::stream!(|$yielder| move { $($tt)* })
}};
}
#[macro_export]
macro_rules! box_stream {
(|$yielder:ident| { $($tt:tt)*}) => {{
Box::pin($crate::futures::stream::generate_stream(|$yielder| Box::pin(async move {
$($tt)*
})))
} as std::pin::Pin<Box<dyn $crate::futures::Stream<Item = _> + Send + '_>>};
(|$yielder:ident| move { $($tt:tt)*}) => {{
Box::pin($crate::futures::stream::generate_stream(move |$yielder| Box::pin(async move {
$($tt)*
})))
} as std::pin::Pin<Box<dyn $crate::futures::Stream<Item = _> + Send + '_>>};
($yielder:ident => $($tt:tt)* ) => {{
$crate::box_stream!(|$yielder| { $($tt)* })
}};
($yielder:ident move => $($tt:tt)* ) => {{
$crate::box_stream!(|$yielder| move { $($tt)* })
}};
}
#[macro_export]
macro_rules! try_stream {
(|$yielder:ident| { $($tt:tt)*}) => {{
#[allow(unreachable_code)]
$crate::futures::stream::generate_try_stream(|$yielder| Box::pin(async move {
let _ = { $($tt)* };
Ok(())
}))
}};
(|$yielder:ident| move { $($tt:tt)*}) => {{
#[allow(unreachable_code)]
$crate::futures::stream::generate_try_stream(move |$yielder| Box::pin(async move {
let _ = { $($tt)* };
Ok(())
}))
}};
($yielder:ident => $($tt:tt)* ) => {{
$crate::try_stream!(|$yielder| { $($tt)* })
}};
($yielder:ident move => $($tt:tt)* ) => {{
$crate::try_stream!(|$yielder| move { $($tt)* })
}};
}
#[macro_export]
macro_rules! box_try_stream {
(|$yielder:ident| { $($tt:tt)*}) => {{
#[allow(unreachable_code)]
Box::pin($crate::futures::stream::generate_try_stream(|$yielder| Box::pin(async move {
let _ = { $($tt)* };
Ok(())
})))
} as std::pin::Pin<Box<dyn $crate::futures::Stream<Item = Result<_, _>> + Send + '_>>};
(|$yielder:ident| move { $($tt:tt)*}) => {{
#[allow(unreachable_code)]
Box::pin($crate::futures::stream::generate_try_stream(move |$yielder| Box::pin(async move {
let _ = { $($tt)* };
Ok(())
})))
} as std::pin::Pin<Box<dyn $crate::futures::Stream<Item = Result<_, _>> + Send + '_>>};
($yielder:ident => $($tt:tt)* ) => {{
$crate::box_try_stream!(|$yielder| { $($tt)* })
}};
($yielder:ident move => $($tt:tt)* ) => {{
$crate::box_try_stream!(|$yielder| move { $($tt)* })
}};
}
#[cfg(test)]
mod tests {
use tokio_stream::StreamExt;
#[tokio::test(flavor = "multi_thread")]
async fn stream_test() {
let mut stream = stream!(|yx| {
yx.yield_one(1);
yx.yield_one(2);
yx.yield_one(3);
yx.yield_all(vec![4, 5, 6]);
yx.yield_stream(stream!(|yx2| {
yx2.yield_one(7);
yx2.yield_one(8);
yx2.yield_one(9);
yx2.yield_one(10);
}))
.await;
});
for i in 1..=10 {
assert_eq!(stream.next().await, Some(i));
}
}
#[tokio::test(flavor = "multi_thread")]
async fn box_stream_test() {
let mut stream = box_stream!(|yx| {
yx.yield_one(1);
yx.yield_one(2);
yx.yield_one(3);
yx.yield_all(vec![4, 5, 6]);
yx.yield_stream(box_stream!(|yx2| {
yx2.yield_one(7);
yx2.yield_one(8);
yx2.yield_one(9);
yx2.yield_one(10);
}))
.await;
});
for i in 1..=10 {
assert_eq!(stream.next().await, Some(i));
}
}
#[tokio::test(flavor = "multi_thread")]
async fn box_try_stream_test() {
let mut stream = box_try_stream!(|yx| {
yx.yield_one(1);
yx.yield_one(2);
yx.yield_one(3);
return Err("Something went wrong");
});
assert_eq!(Ok(1), stream.next().await.unwrap());
assert_eq!(Ok(2), stream.next().await.unwrap());
assert_eq!(Ok(3), stream.next().await.unwrap());
assert_eq!(Err("Something went wrong"), stream.next().await.unwrap());
assert!(stream.next().await.is_none());
}
}