use for_streams::for_streams;
use std::task::Poll;
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
/// Drives four `for_streams!` arms that talk to each other over two bounded channels.
///
/// Two producer arms push into `tokio` mpsc channels of capacity 1 while two consumer
/// arms drain them. The arm bodies use `continue` and `break`, and the producers are
/// `move` arms that take ownership of their senders.
#[tokio::test]
async fn test_channels() {
    let (odd_tx, odd_rx) = channel::<i32>(1);
    let (teens_tx, teens_rx) = channel::<i32>(1);
    let mut odds_seen = Vec::new();
    let mut teens_seen = Vec::new();

    for_streams! {
        // Producer: skips even numbers with `continue` and forwards the rest.
        n in tokio_stream::iter(0..10) => move {
            let is_even = n % 2 == 0;
            if is_even {
                continue;
            }
            odd_tx.send(n).await.expect("this channel stays open");
        }
        // Producer: the send result is deliberately discarded, because the matching
        // consumer below stops reading after 15.
        n in tokio_stream::iter(10..20) => move {
            let _ = teens_tx.send(n).await;
        }
        // Consumer: records everything the first producer sent.
        item in ReceiverStream::new(odd_rx) => {
            odds_seen.push(item);
        }
        // Consumer: records values up to and including 15, then leaves via `break`.
        item in ReceiverStream::new(teens_rx) => {
            teens_seen.push(item);
            if item == 15 {
                break;
            }
        }
    }

    assert_eq!(odds_seen, [1, 3, 5, 7, 9]);
    assert_eq!(teens_seen, [10, 11, 12, 13, 14, 15]);
}
/// Checks `return` inside a `for_streams!` arm, for both a borrowing and a `move` arm.
///
/// Each scenario pairs a finite number stream with a stream whose poll function always
/// yields `Poll::Pending`. The assertion after each macro invocation checks that only
/// the values up to and including the one that triggered `return` were recorded.
#[tokio::test]
async fn test_return() {
    // Scenario 1: the arm pushes into `seen` directly (no `move`).
    let mut seen = Vec::new();
    let digits = futures::stream::iter(0..10);
    let pending_forever = futures::stream::poll_fn(|_| Poll::<Option<()>>::Pending);
    for_streams! {
        n in digits => {
            seen.push(n);
            if n == 5 {
                return;
            }
        }
        _ in pending_forever => {}
    }
    assert_eq!(seen, [0, 1, 2, 3, 4, 5]);

    // Scenario 2: same shape, but the arm is `move` and writes through a `&mut` that
    // was created beforehand and is moved into the arm.
    let mut seen = Vec::new();
    let sink = &mut seen;
    let hundreds = futures::stream::iter(100..110);
    let pending_forever = futures::stream::poll_fn(|_| Poll::<Option<()>>::Pending);
    for_streams! {
        n in hundreds => move {
            sink.push(n);
            if n == 105 {
                return;
            }
        }
        _ in pending_forever => {}
    }
    assert_eq!(seen, [100, 101, 102, 103, 104, 105]);
}
/// Checks the `in background` arm form, and that a stream variable that is itself
/// named `background` can still be used as an ordinary arm's stream.
///
/// NOTE(review): the `background` marker on the last arm presumably means that arm does
/// not keep the macro running once the other arms finish; confirm against the
/// `for_streams` documentation. The stream in that arm always polls as `Pending`.
#[tokio::test]
async fn test_background() {
    let mut first_seen = Vec::new();
    let mut second_seen = Vec::new();
    let foreground = futures::stream::iter(0..5);
    // Intentionally named `background`: the same word appears as the marker on the
    // last arm below.
    let background = futures::stream::iter(5..10);
    let pending_forever = futures::stream::poll_fn(|_| Poll::<Option<()>>::Pending);

    for_streams! {
        n in foreground => {
            first_seen.push(n);
        }
        // Ordinary arm whose stream expression is the variable called `background`.
        n in background => {
            second_seen.push(n);
        }
        // Arm carrying the `background` marker, over a stream that never yields.
        _ in background pending_forever => {}
    }

    assert_eq!(first_seen, [0, 1, 2, 3, 4]);
    assert_eq!(second_seen, [5, 6, 7, 8, 9]);
}