use futures::stream::{Stream, StreamExt};
use std::time::Duration;
pub fn interval_stream(duration: Duration) -> impl Stream<Item = ()> {
use tokio_stream::wrappers::IntervalStream;
let interval = tokio::time::interval(duration);
IntervalStream::new(interval).map(|_| ())
}
pub fn channel_stream<T>(rx: tokio::sync::mpsc::Receiver<T>) -> impl Stream<Item = T> + Send + Unpin
where
T: Send + 'static,
{
Box::pin(futures::stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|item| (item, rx))
}))
}
pub fn unbounded_channel_stream<T>(
rx: tokio::sync::mpsc::UnboundedReceiver<T>,
) -> impl Stream<Item = T> + Send + Unpin
where
T: Send + 'static,
{
Box::pin(futures::stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|item| (item, rx))
}))
}
pub fn delayed_stream(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
Box::pin(futures::stream::once(async move {
tokio::time::sleep(duration).await;
}))
}
pub fn timeout_stream(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
delayed_stream(duration)
}
pub fn merge_streams<T>(
streams: Vec<Box<dyn Stream<Item = T> + Send + Unpin>>,
) -> impl Stream<Item = T>
where
T: Send + 'static,
{
use futures::stream::SelectAll;
let mut select_all = SelectAll::new();
for stream in streams {
select_all.push(stream);
}
select_all
}
pub fn watch_stream<T>(rx: tokio::sync::watch::Receiver<T>) -> impl Stream<Item = T> + Send + Unpin
where
T: Clone + Send + Sync + 'static,
{
Box::pin(futures::stream::unfold(rx, |mut rx| async move {
rx.changed().await.ok()?;
let value = rx.borrow().clone();
Some((value, rx))
}))
}
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use std::time::Duration;
#[tokio::test]
async fn test_interval_stream() {
let mut stream = interval_stream(Duration::from_millis(10)).take(3);
let mut count = 0;
while stream.next().await.is_some() {
count += 1;
}
assert_eq!(count, 3);
}
#[tokio::test]
async fn test_timeout_stream() {
let mut stream = timeout_stream(Duration::from_millis(10));
let result = stream.next().await;
assert!(result.is_some());
let result = stream.next().await;
assert!(result.is_none()); }
#[tokio::test]
async fn test_channel_stream() {
let (tx, rx) = tokio::sync::mpsc::channel(10);
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
tx.send(3).await.unwrap();
drop(tx);
let mut stream = channel_stream(rx);
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.next().await, Some(2));
assert_eq!(stream.next().await, Some(3));
assert_eq!(stream.next().await, None);
}
}