async-stream-lite 0.2.0

Proc macro-free async/await Rust streams
Documentation
use futures::{
	Stream, StreamExt,
	future::join_all,
	pin_mut,
	stream::{BoxStream, FusedStream}
};
use tokio::sync::mpsc;

use super::{Yielder, async_stream};

#[tokio::test]
async fn noop_stream() {
	let s = async_stream(|_yielder: Yielder<()>| async move {});
	pin_mut!(s);

	#[allow(clippy::never_loop)]
	while s.next().await.is_some() {
		unreachable!();
	}
}

#[tokio::test]
async fn empty_stream() {
	let mut ran = false;

	{
		let r = &mut ran;
		let s = async_stream(|_yield: Yielder<()>| async move {
			*r = true;
			println!("hello world!");
		});
		pin_mut!(s);

		#[allow(clippy::never_loop)]
		while s.next().await.is_some() {
			unreachable!();
		}
	}

	assert!(ran);
}

#[tokio::test]
async fn yield_single_value() {
	let s = async_stream(|yielder| async move {
		yielder.r#yield("hello").await;
	});

	let values: Vec<_> = s.collect().await;

	assert_eq!(1, values.len());
	assert_eq!("hello", values[0]);
}

#[tokio::test]
async fn fused() {
	let s = async_stream(|yielder| async move {
		yielder.r#yield("hello").await;
	});
	pin_mut!(s);

	assert!(!s.is_terminated());
	assert_eq!(s.next().await, Some("hello"));
	assert_eq!(s.next().await, None);

	assert!(s.is_terminated());
	assert_eq!(s.next().await, None);
}

#[tokio::test]
async fn yield_multi_value() {
	let stream = async_stream(|yielder| async move {
		yielder.r#yield("hello").await;
		yielder.r#yield("world").await;
		yielder.r#yield("foobar").await;
	});

	let values: Vec<_> = stream.collect().await;

	assert_eq!(3, values.len());
	assert_eq!("hello", values[0]);
	assert_eq!("world", values[1]);
	assert_eq!("foobar", values[2]);
}

#[tokio::test]
async fn unit_yield_in_select() {
	#[allow(clippy::unused_async)]
	async fn do_stuff_async() {}

	let stream = async_stream(|yielder| async move {
		tokio::select! {
			() = do_stuff_async() => yielder.r#yield(()).await,
			else => yielder.r#yield(()).await
		}
	});

	let values: Vec<_> = stream.collect().await;
	assert_eq!(values.len(), 1);
}

#[tokio::test]
#[allow(clippy::unused_async)]
async fn yield_with_select() {
	async fn do_stuff_async() {}
	async fn more_async_work() {}

	let stream = async_stream(|yielder| async move {
		tokio::select! {
			() = do_stuff_async() => yielder.r#yield("hey").await,
			() = more_async_work() => yielder.r#yield("hey").await,
			else => yielder.r#yield("hey").await
		}
	});

	let values: Vec<_> = stream.collect().await;
	assert_eq!(values, vec!["hey"]);
}

#[tokio::test]
async fn return_stream() {
	fn build_stream() -> impl Stream<Item = u32> {
		async_stream(|yielder| async move {
			yielder.r#yield(1).await;
			yielder.r#yield(2).await;
			yielder.r#yield(3).await;
		})
	}

	let stream = build_stream();

	let values: Vec<_> = stream.collect().await;
	assert_eq!(3, values.len());
	assert_eq!(1, values[0]);
	assert_eq!(2, values[1]);
	assert_eq!(3, values[2]);
}

#[tokio::test]
async fn boxed_stream() {
	fn build_stream() -> BoxStream<'static, u32> {
		Box::pin(async_stream(|yielder| async move {
			yielder.r#yield(1).await;
			yielder.r#yield(2).await;
			yielder.r#yield(3).await;
		}))
	}

	let stream = build_stream();

	let values: Vec<_> = stream.collect().await;
	assert_eq!(3, values.len());
	assert_eq!(1, values[0]);
	assert_eq!(2, values[1]);
	assert_eq!(3, values[2]);
}

#[tokio::test]
async fn consume_channel() {
	let (tx, mut rx) = mpsc::channel(10);

	let stream = async_stream(|yielder| async move {
		while let Some(v) = rx.recv().await {
			yielder.r#yield(v).await;
		}
	});

	pin_mut!(stream);

	for i in 0..3 {
		assert!(tx.send(i).await.is_ok());
		assert_eq!(Some(i), stream.next().await);
	}

	drop(tx);
	assert_eq!(None, stream.next().await);
}

#[tokio::test]
async fn borrow_self() {
	struct Data(String);

	impl Data {
		fn stream(&self) -> impl Stream<Item = &str> + '_ {
			async_stream(|yielder| async move {
				yielder.r#yield(&self.0[..]).await;
			})
		}
	}

	let data = Data("hello".to_string());
	let s = data.stream();
	pin_mut!(s);

	assert_eq!(Some("hello"), s.next().await);
}

#[tokio::test]
async fn borrow_self_boxed() {
	struct Data(String);

	impl Data {
		fn stream(&self) -> BoxStream<'_, &str> {
			Box::pin(async_stream(|yielder| async move {
				yielder.r#yield(&self.0[..]).await;
			}))
		}
	}

	let data = Data("hello".to_string());
	let s = data.stream();
	pin_mut!(s);

	assert_eq!(Some("hello"), s.next().await);
}

#[tokio::test]
async fn stream_in_stream() {
	let s = async_stream(|yielder| async move {
		let s = async_stream(|yielder| async move {
			for i in 0..3 {
				yielder.r#yield(i).await;
			}
		});
		pin_mut!(s);

		while let Some(v) = s.next().await {
			yielder.r#yield(v).await;
		}
	});

	let values: Vec<_> = s.collect().await;
	assert_eq!(3, values.len());
}

#[tokio::test]
async fn streamception() {
	let s = async_stream(|yielder| async move {
		let s = async_stream(|yielder| async move {
			let s = async_stream(|yielder| async move {
				let s = async_stream(|yielder| async move {
					let s = async_stream(|yielder| async move {
						for i in 0..3 {
							yielder.r#yield(i).await;
						}
					});
					pin_mut!(s);

					while let Some(v) = s.next().await {
						yielder.r#yield(v).await;
					}
				});
				pin_mut!(s);

				while let Some(v) = s.next().await {
					yielder.r#yield(v).await;
				}
			});
			pin_mut!(s);

			while let Some(v) = s.next().await {
				yielder.r#yield(v).await;
			}
		});
		pin_mut!(s);

		while let Some(v) = s.next().await {
			yielder.r#yield(v).await;
		}
	});

	let values: Vec<_> = s.collect().await;
	assert_eq!(3, values.len());
}

#[tokio::test]
async fn yield_non_unpin_value() {
	let s: Vec<_> = async_stream(|yielder| async move {
		for i in 0..3 {
			yielder.r#yield(async move { i }).await;
		}
	})
	.buffered(1)
	.collect()
	.await;

	assert_eq!(s, vec![0, 1, 2]);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn multithreaded() {
	fn build_stream() -> impl Stream<Item = u32> {
		async_stream(|yielder| async move {
			yielder.r#yield(1).await;
			yielder.r#yield(2).await;
			yielder.r#yield(3).await;
		})
	}

	let mut futures = Vec::new();
	for _ in 0..4 {
		futures.push(tokio::spawn(async move {
			let stream = build_stream();
			let values: Vec<_> = stream.collect().await;
			assert_eq!(3, values.len());
			assert_eq!(1, values[0]);
			assert_eq!(2, values[1]);
			assert_eq!(3, values[2]);
		}));
	}
	join_all(futures).await;
}

#[tokio::test]
#[should_panic = "attempted to use async_stream_lite yielder outside of stream context or across threads"]
async fn test_move_yielder() {
	let mut slot = None;
	let s = async_stream(|yielder: Yielder<()>| async {
		slot.replace(yielder);
	});
	pin_mut!(s);

	let _ = s.next().await;
	drop(s);

	slot.take().unwrap().r#yield(()).await;
}