use std::error::Error;
use async_rx::StreamExt as _;
use futures_util::stream;
use stream_assert::{assert_closed, assert_next_eq, assert_pending};
use tokio::sync::mpsc::{channel, unbounded_channel};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
/// Walks a `batch_with` stream through several flush cycles.
///
/// The assertions below check that items sent on the primary channel are
/// not yielded until the drainer channel emits, that each drainer signal
/// yields the items sent since the previous batch as one `Vec`, and that
/// dropping the primary sender yields the leftover items before the
/// stream reports closed.
#[tokio::test]
async fn batch_with() -> Result<(), Box<dyn Error>> {
    let (flush_tx, flush_rx) = channel::<()>(1);
    let (item_tx, item_rx) = unbounded_channel();
    let primary = UnboundedReceiverStream::new(item_rx);
    let mut batches = primary.batch_with(ReceiverStream::new(flush_rx));

    // Nothing sent yet, so nothing to yield.
    assert_pending!(batches);

    // First cycle: three items stay buffered until the drainer fires.
    for item in [1, 2, 3] {
        item_tx.send(item)?;
    }
    assert_pending!(batches);
    flush_tx.send(()).await?;
    assert_next_eq!(batches, vec![1, 2, 3]);

    // Second cycle: a new batch holds only the items sent since the last one.
    for item in [4, 5] {
        item_tx.send(item)?;
    }
    assert_pending!(batches);
    flush_tx.send(()).await?;
    assert_next_eq!(batches, vec![4, 5]);
    assert_pending!(batches);

    // Final cycle: no drainer signal this time; the primary sender is
    // dropped instead, and the pending items are expected as a last batch.
    for item in [6, 7, 8] {
        item_tx.send(item)?;
    }
    assert_pending!(batches);
    drop(item_tx);
    assert_next_eq!(batches, vec![6, 7, 8]);
    assert_closed!(batches);

    Ok(())
}
/// Checks that a `batch_with` stream whose primary sender is dropped
/// before sending anything is reported as closed on the first poll.
#[tokio::test]
async fn empty_primary_stream() {
    // The drainer sender is bound to a named variable so it stays alive
    // for the whole test; only the primary side is dropped.
    let (_flush_tx, flush_rx) = channel::<()>(1);
    let (item_tx, item_rx) = unbounded_channel::<usize>();

    let primary = UnboundedReceiverStream::new(item_rx);
    let mut batches = primary.batch_with(ReceiverStream::new(flush_rx));

    drop(item_tx);
    assert_closed!(batches);
}
/// Fires the drainer repeatedly against a primary stream that never
/// produces an item, and asserts that the batch stream stays pending
/// after every drainer signal.
#[tokio::test]
async fn trigger_happy_batch_stream() -> Result<(), Box<dyn Error>> {
    let (flush_tx, flush_rx) = unbounded_channel::<()>();
    let never_ready = stream::pending::<u8>();
    let mut batches = never_ready.batch_with(UnboundedReceiverStream::new(flush_rx));

    assert_pending!(batches);

    // A single drainer signal with nothing buffered.
    flush_tx.send(())?;
    assert_pending!(batches);

    // Several signals queued back to back, followed by repeated polls.
    for _ in 0..3 {
        flush_tx.send(())?;
    }
    for _ in 0..4 {
        assert_pending!(batches);
    }

    Ok(())
}