// async-stream 0.3.6
//
// Asynchronous streams using `async`/`await` notation.
// Documentation: see the `async-stream` crate docs.
use async_stream::stream;

use futures_core::stream::{FusedStream, Stream};
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
use tokio::sync::mpsc;
use tokio_test::assert_ok;

/// A `stream!` with an empty body must finish without producing any item.
#[tokio::test]
async fn noop_stream() {
    let s = stream! {};
    pin_mut!(s);

    // The very first poll has to report the end of the stream.
    let first = s.next().await;
    if first.is_some() {
        unreachable!();
    }
}

/// A `stream!` body that never yields still runs when polled, and the
/// stream ends without producing an item.
#[tokio::test]
async fn empty_stream() {
    let mut body_executed = false;

    {
        // The mutable borrow is confined to this scope so the flag can be
        // inspected again once the stream has been dropped.
        let flag = &mut body_executed;
        let s = stream! {
            *flag = true;
            println!("hello world!");
        };
        pin_mut!(s);

        let first = s.next().await;
        if first.is_some() {
            unreachable!();
        }
    }

    assert!(body_executed);
}

/// A single `yield` produces exactly one item.
#[tokio::test]
async fn yield_single_value() {
    let values = stream! {
        yield "hello";
    }
    .collect::<Vec<_>>()
    .await;

    assert_eq!(1, values.len());
    assert_eq!("hello", values[0]);
}

/// Checks `is_terminated` before and after the stream is exhausted, and that
/// polling an exhausted stream keeps returning `None`.
#[tokio::test]
async fn fused() {
    let s = stream! {
        yield "hello";
    };
    pin_mut!(s);

    // Nothing has been polled yet, so the stream is still live.
    assert!(!s.is_terminated());

    let first = s.next().await;
    assert_eq!(first, Some("hello"));

    let second = s.next().await;
    assert_eq!(second, None);

    assert!(s.is_terminated());

    // This should return None from now on
    let third = s.next().await;
    assert_eq!(third, None);
}

/// Several `yield`s produce their items in source order.
#[tokio::test]
async fn yield_multi_value() {
    let values = stream! {
        yield "hello";
        yield "world";
        yield "dizzy";
    }
    .collect::<Vec<_>>()
    .await;

    assert_eq!(3, values.len());

    // Compare position by position against the expected sequence.
    for (idx, expected) in ["hello", "world", "dizzy"].iter().copied().enumerate() {
        assert_eq!(expected, values[idx]);
    }
}

/// A bare `yield` (no value) used as a `select!` arm inside `stream!`
/// produces exactly one item.
#[tokio::test]
async fn unit_yield_in_select() {
    use tokio::select;

    // Trivial future used as the `select!` branch.
    async fn ready_now() {}

    let unit_stream = stream! {
        select! {
            _ = ready_now() => yield,
            else => yield,
        }
    };

    let values = unit_stream.collect::<Vec<_>>().await;
    assert_eq!(values.len(), 1);
}

/// `yield <value>` used as a `select!` arm inside `stream!` produces one item,
/// whichever arm is taken.
#[tokio::test]
async fn yield_with_select() {
    use tokio::select;

    // Two trivial futures competing in the `select!`.
    async fn first_branch() {}
    async fn second_branch() {}

    let selected = stream! {
        select! {
            _ = first_branch() => yield "hey",
            _ = second_branch() => yield "hey",
            else => yield "hey",
        }
    };

    let values = selected.collect::<Vec<_>>().await;
    assert_eq!(values, vec!["hey"]);
}

/// A `stream!` can be returned from a function as `impl Stream`.
#[tokio::test]
async fn return_stream() {
    // Builds a stream of the numbers 1, 2 and 3 behind an opaque return type.
    fn one_two_three() -> impl Stream<Item = u32> {
        stream! {
            yield 1;
            yield 2;
            yield 3;
        }
    }

    let values = one_two_three().collect::<Vec<_>>().await;
    assert_eq!(3, values.len());

    // Items must come out in the order they were yielded.
    for (idx, expected) in (1u32..=3).enumerate() {
        assert_eq!(expected, values[idx]);
    }
}

/// A stream that forwards values received from an mpsc channel yields each
/// sent value and ends once the sender is dropped.
#[tokio::test]
async fn consume_channel() {
    let (sender, mut receiver) = mpsc::channel::<i32>(10);

    let forwarded = stream! {
        while let Some(msg) = receiver.recv().await {
            yield msg;
        }
    };

    pin_mut!(forwarded);

    // Each value sent must come straight back out of the stream.
    for n in 0..3 {
        assert_ok!(sender.send(n).await);
        assert_eq!(Some(n), forwarded.next().await);
    }

    // Dropping the sender makes `recv` return `None`, which ends the loop.
    drop(sender);
    assert_eq!(None, forwarded.next().await);
}

#[tokio::test]
async fn borrow_self() {
    struct Data(String);

    impl Data {
        fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a {
            stream! {
                yield &self.0[..];
            }
        }
    }

    let data = Data("hello".to_string());
    let s = data.stream();
    pin_mut!(s);

    assert_eq!(Some("hello"), s.next().await);
}

/// A `stream!` nested inside another `stream!` can be drained and its items
/// re-yielded by the outer stream.
#[tokio::test]
async fn stream_in_stream() {
    let outer = stream! {
        let inner = stream! {
            for i in 0..3 {
                yield i;
            }
        };

        // Forward every item of the inner stream through the outer one.
        pin_mut!(inner);
        while let Some(item) = inner.next().await {
            yield item;
        }
    };

    let forwarded = outer.collect::<Vec<_>>().await;
    assert_eq!(3, forwarded.len());
}

/// Yielded items may themselves be futures (`async` blocks); they are then
/// driven to completion with `buffered`.
#[tokio::test]
async fn yield_non_unpin_value() {
    let pending = stream! {
        for i in 0..3 {
            yield async move { i };
        }
    };

    // Resolve the yielded futures one at a time, keeping their order.
    let resolved: Vec<_> = pending.buffered(1).collect().await;

    assert_eq!(resolved, vec![0, 1, 2]);
}

// Compile-only check: a `try_stream!` nested in a `select!` arm inside an
// outer `stream!` must be accepted by the macros. The outer stream is built
// and immediately discarded; this is a plain `#[test]`, so it is never polled.
#[test]
fn inner_try_stream() {
    use async_stream::try_stream;
    use tokio::select;

    async fn do_stuff_async() {}

    let _ = stream! {
        select! {
            _ = do_stuff_async() => {
                // The `yield` below belongs to the nested `try_stream!`;
                // its first item is annotated as `Result<(), ()>`.
                let another_s = try_stream! {
                    yield;
                };
                let _: Result<(), ()> = Box::pin(another_s).next().await.unwrap();
            },
            else => {},
        }
        // Trailing `yield` of the outer stream, written without a semicolon.
        yield
    };
}

/// Runs the compile-fail UI cases under `tests/ui/` through `trybuild`.
/// Marked `ignore` on toolchains that are not stable.
#[rustversion::attr(not(stable), ignore)]
#[test]
fn test() {
    trybuild::TestCases::new().compile_fail("tests/ui/*.rs");
}