use std::error::Error;
use std::future::{Future, pending};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use async_stream::stream;
use futures::{Stream, StreamExt, FutureExt};
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::stream;
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
pub trait Runtime: Clone + Send + Sync + 'static {
fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>>;
fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>>;
fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static;
}
#[cfg(feature = "tokio-runtime")]
#[derive(Clone)]
pub struct TokioRuntime;
#[cfg(feature = "tokio-runtime")]
impl Runtime for TokioRuntime {
fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(tokio::time::sleep(duration))
}
fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
use async_stream::stream;
Box::pin(stream! {
let mut interval = tokio::time::interval(period);
loop {
interval.tick().await;
yield ();
}
})
}
fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static,
{
tokio::spawn(future);
}
}
#[cfg(feature = "smol-runtime")]
#[derive(Clone)]
pub struct SmolRuntime;
#[cfg(feature = "smol-runtime")]
impl Runtime for SmolRuntime {
fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(async_io::Timer::after(duration))
}
fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
use async_stream::stream;
Box::pin(stream! {
loop {
async_io::Timer::after(period).await;
yield ();
}
})
}
fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static,
{
smol::spawn(future).detach();
}
}
#[cfg(feature = "async-std-runtime")]
#[derive(Clone)]
pub struct AsyncStdRuntime;
#[cfg(feature = "async-std-runtime")]
impl Runtime for AsyncStdRuntime {
fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(async_std::task::sleep(duration))
}
fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
use async_stream::stream;
Box::pin(stream! {
loop {
async_std::task::sleep(period).await;
yield ();
}
})
}
fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static,
{
async_std::task::spawn(future);
}
}
pub fn just<T>(value: T) -> impl Stream<Item = T> {
stream! { yield value; }
}
pub fn of<T: Clone>(value: T) -> impl Stream<Item = T> {
just(value)
}
#[cfg(test)]
mod just_tests {
use super::*;
#[tokio::test]
async fn test_just_emits_single_value() {
let stream = just(42);
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec![42]);
}
#[tokio::test]
async fn test_just_with_string() {
let stream = just("hello".to_string());
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec!["hello"]);
}
#[tokio::test]
async fn test_of_alias() {
let stream = of(99);
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec![99]);
}
}
pub fn from_future<T, F: Future<Output = T>>(future: F) -> impl Stream<Item = T> {
stream! {
let value = future.await;
yield value;
}
}
#[cfg(test)]
mod from_future_tests {
use super::*;
#[tokio::test]
async fn test_from_future_emits_resolved_value() {
let future = async { 42 };
let stream = from_future(future);
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec![42]);
}
#[tokio::test]
async fn test_from_future_with_async_computation() {
let future = async {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
"computed".to_string()
};
let stream = from_future(future);
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec!["computed"]);
}
}
pub use futures::stream::iter as from_iter;
#[cfg(test)]
mod from_iter_tests {
use super::*;
#[tokio::test]
async fn test_from_iter_emits_all_values() {
let stream = from_iter(vec![1, 2, 3]);
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_from_iter_handles_empty() {
let stream = from_iter(Vec::<i32>::new());
let values: Vec<_> = stream.collect().await;
assert!(values.is_empty());
}
#[tokio::test]
async fn test_from_iter_with_range() {
let stream = from_iter(0..5);
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec![0, 1, 2, 3, 4]);
}
}
pub fn periodic<R: Runtime>(interval_ms: u64) -> impl Stream<Item = ()> {
let interval_stream = R::interval(Duration::from_millis(interval_ms));
stream! {
futures::pin_mut!(interval_stream);
loop {
interval_stream.next().await;
yield ();
}
}
}
pub fn periodic_with_timer<T, F>(
interval_ms: u64,
make_timer: impl Fn(Duration) -> F + Send + 'static,
) -> impl Stream<Item = ()>
where
F: std::future::Future<Output = ()> + Send,
{
stream! {
let duration = Duration::from_millis(interval_ms);
loop {
make_timer(duration).await;
yield ();
}
}
}
#[cfg(test)]
mod periodic_tests {
}
pub use futures::stream::empty;
#[cfg(test)]
mod empty_tests {
use super::*;
#[tokio::test]
async fn test_empty_yields_nothing() {
let values: Vec<i32> = empty::<i32>().collect().await;
assert!(values.is_empty());
}
#[tokio::test]
async fn test_empty_completes_immediately() {
let stream = empty::<String>();
futures::pin_mut!(stream);
assert!(stream.next().await.is_none());
}
}
pub use futures::stream::pending as never;
#[cfg(test)]
mod never_tests {
use super::*;
use std::time::Duration;
#[tokio::test]
async fn test_never_does_not_complete() {
let never_stream = never::<i32>();
futures::pin_mut!(never_stream);
let timeout = tokio::time::sleep(Duration::from_millis(10));
futures::pin_mut!(timeout);
let result = futures::future::select(never_stream.next(), timeout).await;
match result {
futures::future::Either::Right(_) => {} futures::future::Either::Left(_) => panic!("never() should not emit"),
}
}
}
pub fn iterate<T: Clone, F: Fn(T) -> T>(seed: T, f: F) -> impl Stream<Item = T> {
stream! {
let mut current = seed;
loop {
yield current.clone();
current = f(current);
}
}
}
#[cfg(test)]
mod iterate_tests {
use super::*;
#[tokio::test]
async fn test_iterate_generates_sequence() {
let stream = iterate(1, |x| x * 2);
let values: Vec<_> = stream.take(5).collect().await;
assert_eq!(values, vec![1, 2, 4, 8, 16]);
}
#[tokio::test]
async fn test_iterate_with_addition() {
let stream = iterate(0, |x| x + 1);
let values: Vec<_> = stream.take(4).collect().await;
assert_eq!(values, vec![0, 1, 2, 3]);
}
#[tokio::test]
async fn test_iterate_with_strings() {
let stream = iterate("a".to_string(), |s| s.clone() + "a");
let values: Vec<_> = stream.take(3).collect().await;
assert_eq!(values, vec!["a", "aa", "aaa"]);
}
}
pub struct UnfoldResult<T, S> {
pub value: T,
pub next_seed: S,
pub done: bool,
}
pub fn unfold<T, S: Clone, F>(seed: S, f: F) -> impl Stream<Item = T>
where
F: Fn(S) -> UnfoldResult<T, S> + Clone + Send + 'static,
{
let f = f.clone();
futures::stream::unfold(seed, move |state| {
let f = f.clone();
async move {
let result = f(state);
if result.done {
None
} else {
Some((result.value, result.next_seed))
}
}
})
}
#[cfg(test)]
mod unfold_tests {
use super::*;
#[tokio::test]
async fn test_unfold_generates_values() {
let stream = unfold(1, |n| UnfoldResult {
value: n,
next_seed: n + 1,
done: n > 3,
});
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_unfold_stops_immediately_when_done() {
let stream = unfold(0, |_| UnfoldResult {
value: 999,
next_seed: 0,
done: true,
});
let values: Vec<i32> = stream.collect().await;
assert!(values.is_empty());
}
#[tokio::test]
async fn test_unfold_with_different_types() {
let stream = unfold(0, |n| UnfoldResult {
value: format!("item-{}", n),
next_seed: n + 1,
done: n >= 2,
});
let values: Vec<_> = stream.collect().await;
assert_eq!(values, vec!["item-0", "item-1"]);
}
}
pub fn start_with<T: Clone, S: Stream<Item = T>>(value: T, s: S) -> impl Stream<Item = T> {
stream! {
yield value;
futures::pin_mut!(s);
while let Some(item) = s.next().await { yield item; }
}
}
#[cfg(test)]
mod start_with_tests {
use super::*;
#[tokio::test]
async fn test_start_with_prepends_value() {
let source = futures::stream::iter(vec![1, 2, 3]);
let result = start_with(0, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![0, 1, 2, 3]);
}
#[tokio::test]
async fn test_start_with_on_empty_stream() {
let source = stream::empty::<i32>();
let result = start_with(42, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![42]);
}
}
pub fn concat<T, S: Stream<Item = T>>(streams: Vec<S>) -> impl Stream<Item = T> {
stream! {
for s in streams {
futures::pin_mut!(s);
while let Some(item) = s.next().await { yield item; }
}
}
}
pub fn concat2<T, S1: Stream<Item = T>, S2: Stream<Item = T>>(s1: S1, s2: S2) -> impl Stream<Item = T> {
stream! {
futures::pin_mut!(s1);
futures::pin_mut!(s2);
while let Some(item) = s1.next().await { yield item; }
while let Some(item) = s2.next().await { yield item; }
}
}
#[cfg(test)]
mod concat_tests {
use super::*;
#[tokio::test]
async fn test_concat_joins_streams() {
let s1 = futures::stream::iter(vec![1, 2]);
let s2 = futures::stream::iter(vec![3, 4]);
let result = concat2(s1, s2);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![1, 2, 3, 4]);
}
#[tokio::test]
async fn test_concat_with_empty_first() {
let s1 = stream::empty::<i32>();
let s2 = futures::stream::iter(vec![5, 6]);
let result = concat2(s1, s2);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![5, 6]);
}
#[tokio::test]
async fn test_concat_vec_of_streams() {
let streams = vec![
futures::stream::iter(vec![1]),
futures::stream::iter(vec![2, 3]),
futures::stream::iter(vec![4]),
];
}
}
pub fn from_channel<T>(mut rx: mpsc::UnboundedReceiver<T>) -> impl Stream<Item = T> {
stream! { while let Some(item) = rx.next().await { yield item; } }
}
pub fn from_bounded_channel<T>(mut rx: mpsc::Receiver<T>) -> impl Stream<Item = T> {
stream! { while let Some(item) = rx.next().await { yield item; } }
}
#[cfg(test)]
mod channel_tests {
}
macro_rules! pipe {
($initial:expr $(, $fn:expr)*) => {{
let mut result = $initial;
$(result = $fn(result);)*
result
}};
}
pub fn map<T, U, S, F>(s: S, f: F) -> impl Stream<Item = U>
where
S: Stream<Item = T>,
F: Fn(T) -> U,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await { yield f(item); }
}
}
#[cfg(test)]
mod map_tests {
use super::*;
#[tokio::test]
async fn test_map_transforms_values() {
let source = futures::stream::iter(vec![1, 2, 3]);
let result = map(source, |x| x * 2);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![2, 4, 6]);
}
#[tokio::test]
async fn test_map_with_type_change() {
let source = futures::stream::iter(vec![1, 2, 3]);
let result = map(source, |x| format!("num-{}", x));
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec!["num-1", "num-2", "num-3"]);
}
#[tokio::test]
async fn test_map_empty_stream() {
let source = stream::empty::<i32>();
let result = map(source, |x| x * 2);
let values: Vec<_> = result.collect().await;
assert!(values.is_empty());
}
}
pub fn constant<T, U: Clone, S: Stream<Item = T>>(value: U, s: S) -> impl Stream<Item = U> {
stream! {
futures::pin_mut!(s);
while let Some(_) = s.next().await { yield value.clone(); }
}
}
#[cfg(test)]
mod constant_tests {
use super::*;
#[tokio::test]
async fn test_constant_replaces_all_values() {
let source = futures::stream::iter(vec![1, 2, 3]);
let result = constant("x", source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec!["x", "x", "x"]);
}
#[tokio::test]
async fn test_constant_empty_stream() {
let source = stream::empty::<i32>();
let result = constant(42, source);
let values: Vec<_> = result.collect().await;
assert!(values.is_empty());
}
}
pub fn scan<T, U: Clone, S, F>(accumulator: F, seed: U, s: S) -> impl Stream<Item = U>
where
S: Stream<Item = T>,
F: Fn(U, T) -> U,
{
stream! {
let mut acc = seed.clone();
yield acc.clone();
futures::pin_mut!(s);
while let Some(item) = s.next().await {
acc = accumulator(acc, item);
yield acc.clone();
}
}
}
#[cfg(test)]
mod scan_tests {
use super::*;
#[tokio::test]
async fn test_scan_accumulates_with_seed() {
let source = futures::stream::iter(vec![1, 2, 3]);
let result = scan(|acc, x| acc + x, 0, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![0, 1, 3, 6]);
}
#[tokio::test]
async fn test_scan_product() {
let source = futures::stream::iter(vec![2, 3, 4]);
let result = scan(|acc, x| acc * x, 1, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![1, 2, 6, 24]);
}
#[tokio::test]
async fn test_scan_empty_stream() {
let source = stream::empty::<i32>();
let result = scan(|acc, x| acc + x, 100, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![100]); }
}
pub fn tap<T: Clone, S, F>(side_effect: F, s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
F: Fn(&T),
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await {
side_effect(&item);
yield item;
}
}
}
pub fn tap_spawn<R, T, S, F, Fut>(
side_effect: F,
s: S,
) -> impl Stream<Item = T>
where
R: Runtime,
T: Clone + Send + 'static,
S: Stream<Item = T>,
F: Fn(T) -> Fut + Clone + Send + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await {
let f = side_effect.clone();
let item_clone = item.clone();
R::spawn(async move { f(item_clone).await });
yield item;
}
}
}
#[cfg(test)]
mod tap_runtime_tests {
}
pub fn await_tap<T: Clone, S, F, Fut>(side_effect: F, s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
F: Fn(T) -> Fut,
Fut: Future<Output = ()>,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await {
side_effect(item.clone()).await;
yield item;
}
}
}
#[cfg(test)]
mod await_tap_tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[tokio::test]
async fn test_await_tap_executes_side_effect() {
let count = Arc::new(AtomicUsize::new(0));
let count_clone = count.clone();
let source = futures::stream::iter(vec![1, 2, 3]);
let tapped = await_tap(
move |_: i32| {
let c = count_clone.clone();
async move { c.fetch_add(1, Ordering::SeqCst); }
},
source,
);
let values: Vec<_> = tapped.collect().await;
assert_eq!(values, vec![1, 2, 3]);
assert_eq!(count.load(Ordering::SeqCst), 3);
}
}
pub fn continue_with<T, S1, S2, F>(f: F, s: S1) -> impl Stream<Item = T>
where
S1: Stream<Item = T>,
S2: Stream<Item = T>,
F: FnOnce() -> S2,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await { yield item; }
let s2 = f();
futures::pin_mut!(s2);
while let Some(item) = s2.next().await { yield item; }
}
}
#[cfg(test)]
mod continue_with_tests {
use super::*;
#[tokio::test]
async fn test_continue_with_appends() {
let first = futures::stream::iter(vec![1, 2]);
let second = || futures::stream::iter(vec![3, 4]);
let values: Vec<_> = continue_with(second, first).collect().await;
assert_eq!(values, vec![1, 2, 3, 4]);
}
#[tokio::test]
async fn test_continue_with_lazy() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
let first = futures::stream::iter(vec![1]);
let second = move || {
called_clone.store(true, Ordering::SeqCst);
futures::stream::iter(vec![2])
};
let mut stream = continue_with(second, first);
futures::pin_mut!(stream);
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.next().await, Some(2));
assert!(called.load(Ordering::SeqCst));
}
}
pub fn concat_all<T, Inner, Outer>(outer: Outer) -> impl Stream<Item = T>
where
Inner: Stream<Item = T>,
Outer: Stream<Item = Inner>,
{
stream! {
futures::pin_mut!(outer);
while let Some(inner) = outer.next().await {
futures::pin_mut!(inner);
while let Some(item) = inner.next().await { yield item; }
}
}
}
#[cfg(test)]
mod concat_all_tests {
use super::*;
#[tokio::test]
async fn test_concat_all_flattens() {
let s1 = futures::stream::iter(vec![1, 2]);
let s2 = futures::stream::iter(vec![3, 4]);
let outer = futures::stream::iter(vec![s1, s2]);
let values: Vec<_> = concat_all(outer).collect().await;
assert_eq!(values, vec![1, 2, 3, 4]);
}
}
pub fn concat_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
where
S: Stream<Item = T>,
Inner: Stream<Item = U>,
F: Fn(T) -> Inner,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await {
let inner = f(item);
futures::pin_mut!(inner);
while let Some(inner_item) = inner.next().await { yield inner_item; }
}
}
}
#[cfg(test)]
mod concat_map_tests {
use super::*;
#[tokio::test]
async fn test_concat_map_sequential() {
let source = futures::stream::iter(vec![1, 2]);
let result = concat_map(|x| futures::stream::iter(vec![x * 10, x * 10 + 1]), source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![10, 11, 20, 21]);
}
}
pub fn filter<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
P: Fn(&T) -> bool,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await { if predicate(&item) { yield item; } }
}
}
pub fn filter_async<T, S, P, Fut>(predicate: P, s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
P: Fn(&T) -> Fut,
Fut: std::future::Future<Output = bool>,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await { if predicate(&item).await { yield item; } }
}
}
#[cfg(test)]
mod filter_tests {
use super::*;
#[tokio::test]
async fn test_filter_keeps_matching() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
let result = filter(|x| *x > 2, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![3, 4, 5]);
}
#[tokio::test]
async fn test_filter_even_numbers() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5, 6]);
let result = filter(|x| x % 2 == 0, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![2, 4, 6]);
}
#[tokio::test]
async fn test_filter_empty_result() {
let source = futures::stream::iter(vec![1, 2, 3]);
let result = filter(|x| *x > 100, source);
let values: Vec<_> = result.collect().await;
assert!(values.is_empty());
}
}
pub fn skip_repeats_with<T: Clone, S, F>(equals: F, s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
F: Fn(&T, &T) -> bool,
{
stream! {
futures::pin_mut!(s);
let mut last: Option<T> = None;
while let Some(item) = s.next().await {
let should_yield = match &last {
None => true,
Some(prev) => !equals(&item, prev),
};
if should_yield {
last = Some(item.clone());
yield item;
}
}
}
}
pub fn skip_repeats<T: Clone + PartialEq, S>(s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
{
skip_repeats_with(|a, b| a == b, s)
}
#[cfg(test)]
mod skip_repeats_tests {
use super::*;
#[tokio::test]
async fn test_skip_repeats() {
let source = futures::stream::iter(vec![1, 1, 2, 2, 3, 1, 1]);
let result = skip_repeats(source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![1, 2, 3, 1]);
}
#[tokio::test]
async fn test_skip_repeats_with_custom_eq() {
let source = futures::stream::iter(vec!["apple", "ant", "banana", "berry"]);
let result = skip_repeats_with(
|a: &&str, b: &&str| a.chars().next() == b.chars().next(),
source
);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec!["apple", "banana"]);
}
}
pub fn take<T, S: Stream<Item = T>>(n: usize, s: S) -> impl Stream<Item = T> {
stream! {
futures::pin_mut!(s);
let mut count = 0;
while let Some(item) = s.next().await {
if count < n {
yield item;
count += 1;
} else {
break;
}
}
}
}
#[cfg(test)]
mod take_tests {
use super::*;
#[tokio::test]
async fn test_take_first_n() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
let result = take(2, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![1, 2]);
}
#[tokio::test]
async fn test_take_more_than_available() {
let source = futures::stream::iter(vec![1, 2]);
let result = take(10, source);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![1, 2]);
}
#[tokio::test]
async fn test_take_zero() {
let source = futures::stream::iter(vec![1, 2, 3]);
let result = take(0, source);
let values: Vec<_> = result.collect().await;
assert!(values.is_empty());
}
}
pub fn skip<T, S: Stream<Item = T>>(n: usize, s: S) -> impl Stream<Item = T> {
stream! {
futures::pin_mut!(s);
let mut count = 0;
while let Some(item) = s.next().await {
if count >= n { yield item; }
count += 1;
}
}
}
#[cfg(test)]
mod skip_tests {
use super::*;
#[tokio::test]
async fn test_skip_first_n() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
let values: Vec<_> = skip(2, source).collect().await;
assert_eq!(values, vec![3, 4, 5]);
}
#[tokio::test]
async fn test_skip_zero() {
let source = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = skip(0, source).collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_skip_more_than_available() {
let source = futures::stream::iter(vec![1, 2]);
let values: Vec<_> = skip(5, source).collect().await;
assert!(values.is_empty());
}
}
pub fn slice<T, S: Stream<Item = T>>(start: usize, end: usize, s: S) -> impl Stream<Item = T> {
stream! {
futures::pin_mut!(s);
let mut index = 0;
while let Some(item) = s.next().await {
if index >= start && index < end { yield item; }
index += 1;
if index >= end { break; }
}
}
}
#[cfg(test)]
mod slice_tests {
use super::*;
#[tokio::test]
async fn test_slice() {
let source = futures::stream::iter(vec![0, 1, 2, 3, 4, 5]);
let values: Vec<_> = slice(2, 5, source).collect().await;
assert_eq!(values, vec![2, 3, 4]);
}
#[tokio::test]
async fn test_slice_empty_range() {
let source = futures::stream::iter(vec![0, 1, 2, 3, 4]);
let values: Vec<_> = slice(2, 2, source).collect().await;
assert_eq!(values, Vec::<i32>::new());
}
#[tokio::test]
async fn test_slice_beyond_length() {
let source = futures::stream::iter(vec![0, 1, 2]);
let values: Vec<_> = slice(1, 10, source).collect().await;
assert_eq!(values, vec![1, 2]);
}
}
pub fn take_while<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
P: Fn(&T) -> bool,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await {
if predicate(&item) { yield item; }
else { break; }
}
}
}
#[cfg(test)]
mod take_while_tests {
use super::*;
#[tokio::test]
async fn test_take_while() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 2, 1]);
let values: Vec<_> = take_while(|x| *x < 4, source).collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_take_while_all_pass() {
let source = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = take_while(|_x| true, source).collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_take_while_none_pass() {
let source = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = take_while(|_x| false, source).collect().await;
assert_eq!(values, Vec::<i32>::new());
}
}
pub fn skip_while<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
P: Fn(&T) -> bool,
{
stream! {
futures::pin_mut!(s);
let mut skipping = true;
while let Some(item) = s.next().await {
if skipping && !predicate(&item) { skipping = false; }
if !skipping { yield item; }
}
}
}
#[cfg(test)]
mod skip_while_tests {
use super::*;
#[tokio::test]
async fn test_skip_while() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 2, 1]);
let values: Vec<_> = skip_while(|x| *x < 3, source).collect().await;
assert_eq!(values, vec![3, 4, 2, 1]);
}
#[tokio::test]
async fn test_skip_while_all_fail() {
let source = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = skip_while(|_x| false, source).collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_skip_while_all_pass() {
let source = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = skip_while(|_x| true, source).collect().await;
assert_eq!(values, Vec::<i32>::new());
}
}
pub fn take_until<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
P: Fn(&T) -> bool,
{
stream! {
futures::pin_mut!(s);
while let Some(item) = s.next().await {
if predicate(&item) { break; }
yield item;
}
}
}
#[cfg(test)]
mod take_until_tests {
use super::*;
#[tokio::test]
async fn test_take_until() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
let values: Vec<_> = take_until(|x| *x == 3, source).collect().await;
assert_eq!(values, vec![1, 2]);
}
#[tokio::test]
async fn test_take_until_never_matches() {
let source = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = take_until(|_x| false, source).collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_take_until_first_matches() {
let source = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = take_until(|x| *x == 1, source).collect().await;
assert_eq!(values, Vec::<i32>::new());
}
}
pub fn delay<R: Runtime, T, S: Stream<Item = T>>(ms: u64, s: S) -> impl Stream<Item = T> {
stream! {
futures::pin_mut!(s);
let duration = Duration::from_millis(ms);
while let Some(item) = s.next().await {
R::sleep(duration).await;
yield item;
}
}
}
pub fn delay_with<T, S, F, Fut>(
ms: u64,
s: S,
sleep_fn: F,
) -> impl Stream<Item = T>
where
S: Stream<Item = T>,
F: Fn(Duration) -> Fut + Clone,
Fut: std::future::Future<Output = ()>,
{
stream! {
futures::pin_mut!(s);
let duration = Duration::from_millis(ms);
while let Some(item) = s.next().await {
sleep_fn(duration).await;
yield item;
}
}
}
#[cfg(test)]
mod delay_tests {
use super::*;
#[tokio::test]
async fn test_delay_with() {
let source = futures::stream::iter(vec![1, 2, 3]);
let start = std::time::Instant::now();
let values: Vec<_> = delay_with(
10,
source,
|d| tokio::time::sleep(d),
).collect().await;
let elapsed = start.elapsed();
assert_eq!(values, vec![1, 2, 3]);
assert!(elapsed >= Duration::from_millis(25)); }
#[tokio::test]
async fn test_delay_empty_stream() {
let source = futures::stream::iter(Vec::<i32>::new());
let values: Vec<_> = delay_with(
100,
source,
|d| tokio::time::sleep(d),
).collect().await;
assert_eq!(values, Vec::<i32>::new());
}
}
pub fn debounce<R, T, S>(ms: u64, s: S) -> impl Stream<Item = T>
where
R: Runtime,
T: Clone + Send + 'static,
S: Stream<Item = T> + Send + 'static,
{
debounce_with(ms, s, R::sleep)
}
pub fn debounce_with<T, S, F, Fut>(ms: u64, s: S, sleep_fn: F) -> impl Stream<Item = T>
where
T: Clone + Send + 'static,
S: Stream<Item = T> + Send + 'static,
F: Fn(Duration) -> Fut + Clone + Send + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
stream! {
let duration = Duration::from_millis(ms);
let mut pending: Option<T> = None;
futures::pin_mut!(s);
while let Some(value) = s.next().await {
pending = Some(value);
loop {
let timeout = sleep_fn(duration);
futures::pin_mut!(timeout);
let next = s.next();
futures::pin_mut!(next);
match futures::future::select(next, timeout).await {
futures::future::Either::Left((Some(v), _)) => {
pending = Some(v);
}
futures::future::Either::Left((None, _)) => {
if let Some(v) = pending.take() {
yield v;
}
return;
}
futures::future::Either::Right((_, _)) => {
if let Some(v) = pending.take() {
yield v;
}
break;
}
}
}
}
}
}
#[cfg(test)]
mod debounce_tests {
}
pub struct ThrottleOptions {
pub leading: bool,
pub trailing: bool,
}
impl Default for ThrottleOptions {
fn default() -> Self {
Self { leading: true, trailing: true }
}
}
impl ThrottleOptions {
pub fn leading_only() -> Self {
Self { leading: true, trailing: false }
}
pub fn trailing_only() -> Self {
Self { leading: false, trailing: true }
}
}
pub fn throttle<T: Clone, S: Stream<Item = T> + Unpin>(
ms: u64,
options: ThrottleOptions,
mut s: S,
) -> impl Stream<Item = T> {
stream! {
let duration = Duration::from_millis(ms);
let mut last_emit = Instant::now() - duration; let mut trailing_value: Option<T> = None;
while let Some(item) = s.next().await {
let now = Instant::now();
let elapsed = now.duration_since(last_emit);
if elapsed >= duration {
if options.leading {
yield item;
last_emit = now;
trailing_value = None;
} else {
trailing_value = Some(item);
}
} else {
trailing_value = Some(item);
}
}
if options.trailing {
if let Some(value) = trailing_value { yield value; }
}
}
}
#[cfg(test)]
mod throttle_tests {
use super::*;
#[tokio::test]
async fn test_throttle_leading() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
let values: Vec<_> = throttle(
100,
ThrottleOptions::leading_only(),
source,
).collect().await;
assert!(!values.is_empty());
assert_eq!(values[0], 1);
}
#[tokio::test]
async fn test_throttle_trailing() {
let source = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = throttle(
100,
ThrottleOptions::trailing_only(),
source,
).collect().await;
assert!(!values.is_empty());
}
#[tokio::test]
async fn test_throttle_empty() {
let source = futures::stream::iter(Vec::<i32>::new());
let values: Vec<_> = throttle(
100,
ThrottleOptions::default(),
source,
).collect().await;
assert_eq!(values, Vec::<i32>::new());
}
}
pub fn recover_with<T, E, S, S2, F>(
recover_fn: F,
s: S,
) -> impl Stream<Item = T>
where
S: Stream<Item = Result<T, E>>,
S2: Stream<Item = T>,
F: FnOnce(E) -> S2,
E: Error,
{
stream! {
futures::pin_mut!(s);
loop {
match s.next().await {
Some(Ok(item)) => yield item,
Some(Err(e)) => {
let recovery = recover_fn(e);
futures::pin_mut!(recovery);
while let Some(item) = recovery.next().await { yield item; }
break;
}
None => break,
}
}
}
}
#[cfg(test)]
mod recover_with_tests {
use super::*;
#[derive(Debug)]
struct SimpleError;
impl std::fmt::Display for SimpleError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SimpleError")
}
}
impl std::error::Error for SimpleError {}
#[tokio::test]
async fn test_recover_with_no_error() {
let source = futures::stream::iter(vec![Ok::<_, SimpleError>(1), Ok(2), Ok(3)]);
let values: Vec<_> = recover_with(
|_e: SimpleError| futures::stream::iter(vec![99]),
source,
).collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_recover_with_error() {
let source = futures::stream::iter(vec![
Ok(1),
Err(SimpleError),
Ok(3),
]);
let values: Vec<_> = recover_with(
|_e: SimpleError| futures::stream::iter(vec![99, 100]),
source,
).collect().await;
assert_eq!(values, vec![1, 99, 100]);
}
}
pub fn recover_with_stream<T, E, S, Alt, AltIter>(
mut alternatives: AltIter,
source: S,
) -> impl Stream<Item = T>
where
S: Stream<Item = Result<T, E>> + Send + 'static,
Alt: Stream<Item = Result<T, E>> + Send + 'static,
AltIter: Iterator<Item = Alt> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
stream! {
futures::pin_mut!(source);
let mut current: Pin<Box<dyn Stream<Item = Result<T, E>> + Send>> = Box::pin(source);
loop {
let mut errored = false;
while let Some(result) = current.next().await {
match result {
Ok(value) => yield value,
Err(_) => {
errored = true;
break;
}
}
}
if !errored {
break; }
match alternatives.next() {
Some(alt) => current = Box::pin(alt),
None => break, }
}
}
}
#[cfg(test)]
mod recover_with_stream_tests {
use super::*;
#[derive(Debug, Clone)]
struct TestErr;
#[tokio::test]
async fn test_recover_with_stream_success() {
let source = futures::stream::iter(vec![Ok::<i32, TestErr>(1), Ok(2), Ok(3)]);
let alts: Vec<Pin<Box<dyn Stream<Item = Result<i32, TestErr>> + Send>>> = vec![];
let values: Vec<_> = recover_with_stream(alts.into_iter(), source).collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_recover_with_stream_uses_alternative() {
let source = futures::stream::iter(vec![Ok(1), Err(TestErr), Ok(3)]);
let alt = futures::stream::iter(vec![Ok(10), Ok(20)]);
let alts: Vec<Pin<Box<dyn Stream<Item = Result<i32, TestErr>> + Send>>> = vec![Box::pin(alt)];
let values: Vec<_> = recover_with_stream(alts.into_iter(), source).collect().await;
assert_eq!(values, vec![1, 10, 20]);
}
}
pub fn throw_error<T, E: Clone>(error: E) -> impl Stream<Item = Result<T, E>> {
stream::once(async move { Err(error) })
}
pub fn throw_panic<T>(message: &'static str) -> impl Stream<Item = T> {
stream! {
panic!("{}", message);
#[allow(unreachable_code)]
loop { yield unreachable!(); }
}
}
#[cfg(test)]
mod throw_error_tests {
use super::*;
#[tokio::test]
async fn test_throw_error_emits_error() {
let err_stream = throw_error::<i32, _>("test error".to_string());
let results: Vec<_> = err_stream.collect().await;
assert_eq!(results.len(), 1);
assert!(results[0].is_err());
}
}
pub struct RetryOptions<F> {
pub max_attempts: usize,
pub delay_ms: u64,
pub should_retry: F,
}
pub fn retry<R, T, E, S, F>(
max_attempts: usize,
delay_ms: u64,
mut stream_factory: F,
) -> impl Stream<Item = Result<T, E>>
where
R: Runtime,
S: Stream<Item = Result<T, E>>,
F: FnMut() -> S,
E: Clone,
{
stream! {
let mut attempt = 0;
loop {
let s = stream_factory();
futures::pin_mut!(s);
let mut failed = false;
while let Some(item) = s.next().await {
match item {
Ok(value) => yield Ok(value),
Err(e) => {
attempt += 1;
if attempt >= max_attempts {
yield Err(e);
return;
}
if delay_ms > 0 { R::sleep(Duration::from_millis(delay_ms)).await; }
failed = true;
break;
}
}
}
if !failed { return; } }
}
}
pub fn retry_with<T, E, S, F, SF, SFut>(
max_attempts: usize,
delay_ms: u64,
mut stream_factory: F,
sleep_fn: SF,
) -> impl Stream<Item = Result<T, E>>
where
S: Stream<Item = Result<T, E>>,
F: FnMut() -> S,
E: Clone,
SF: Fn(Duration) -> SFut,
SFut: std::future::Future<Output = ()>,
{
stream! {
let mut attempt = 0;
loop {
let s = stream_factory();
futures::pin_mut!(s);
let mut failed = false;
while let Some(item) = s.next().await {
match item {
Ok(value) => yield Ok(value),
Err(e) => {
attempt += 1;
if attempt >= max_attempts {
yield Err(e);
return;
}
if delay_ms > 0 { sleep_fn(Duration::from_millis(delay_ms)).await; }
failed = true;
break;
}
}
}
if !failed { return; }
}
}
}
#[cfg(test)]
mod retry_tests {
use super::*;
#[derive(Debug, Clone, PartialEq)]
struct TestError(String);
#[tokio::test]
async fn test_retry_with_success() {
let values: Vec<Result<i32, TestError>> = retry_with(
3,
10,
|| futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]),
|_d| std::future::ready(()),
).collect().await;
let ok_values: Vec<_> = values.into_iter().filter_map(|r| r.ok()).collect();
assert_eq!(ok_values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_retry_with_eventual_success() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let attempt = Arc::new(AtomicUsize::new(0));
let attempt_clone = attempt.clone();
let values: Vec<Result<i32, TestError>> = retry_with(
3,
0,
move || {
let n = attempt_clone.fetch_add(1, Ordering::SeqCst);
if n < 2 { futures::stream::iter(vec![Err(TestError("fail".into()))]) }
else { futures::stream::iter(vec![Ok(42)]) }
},
|_d| std::future::ready(()),
).collect().await;
let ok_values: Vec<_> = values.into_iter().filter_map(|r| r.ok()).collect();
assert_eq!(ok_values, vec![42]);
}
}
pub use futures::stream::select as merge;
pub use futures::stream::select_all as merge_all;
#[cfg(test)]
mod merge_tests {
use super::*;
#[tokio::test]
async fn test_merge() {
let s1 = futures::stream::iter(vec![1, 3, 5]);
let s2 = futures::stream::iter(vec![2, 4, 6]);
let values: Vec<_> = merge(s1, s2).collect().await;
assert_eq!(values.len(), 6);
assert!(values.contains(&1));
assert!(values.contains(&6));
}
#[tokio::test]
async fn test_merge_all() {
let streams = vec![
Box::pin(futures::stream::iter(vec![1, 2])),
Box::pin(futures::stream::iter(vec![3, 4])),
];
let values: Vec<_> = merge_all(streams).collect().await;
assert_eq!(values.len(), 4);
}
}
pub fn flat_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
where
S: Stream<Item = T>,
Inner: Stream<Item = U>,
F: Fn(T) -> Inner,
{
s.map(f).flatten()
}
pub fn chain<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
where
S: Stream<Item = T>,
Inner: Stream<Item = U>,
F: Fn(T) -> Inner,
{
flat_map(f, s)
}
#[cfg(test)]
mod chain_tests {
use super::*;
#[tokio::test]
async fn test_chain_flattens() {
let source = futures::stream::iter(vec![1, 2]);
let result = chain(
|x: i32| futures::stream::iter(vec![x * 10, x * 10 + 1]),
source,
);
let values: Vec<_> = result.collect().await;
assert_eq!(values, vec![10, 11, 20, 21]);
}
}
pub fn switch_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
where
S: Stream<Item = T> + Unpin,
Inner: Stream<Item = U> + Unpin,
F: Fn(T) -> Inner,
{
stream! {
futures::pin_mut!(s);
let mut current_inner: Option<std::pin::Pin<Box<dyn Stream<Item = U> + Unpin>>> = None;
loop {
futures::select! {
outer_item = s.next().fuse() => {
match outer_item {
Some(item) => {
current_inner = Some(Box::pin(f(item)));
}
None => {
if let Some(ref mut inner) = current_inner { while let Some(v) = inner.next().await { yield v; } }
break;
}
}
}
inner_item = async {
if let Some(ref mut inner) = current_inner { inner.next().await }
else { std::future::pending().await }
}.fuse() => {
match inner_item {
Some(v) => yield v,
None => current_inner = None,
}
}
}
}
}
}
#[cfg(test)]
mod switch_map_tests {
use super::*;
#[tokio::test]
async fn test_switch_map_switches() {
let source = futures::stream::iter(vec![1, 2]);
let result = switch_map(
|x: i32| futures::stream::iter(vec![x * 10]),
source,
);
let values: Vec<_> = result.collect().await;
assert!(!values.is_empty());
}
}
pub fn latest2<T: Clone + Send + 'static, U: Clone + Send + 'static>(
s1: impl Stream<Item = T> + Send + 'static,
s2: impl Stream<Item = U> + Send + 'static,
) -> impl Stream<Item = (T, U)> {
enum Either<A, B> { Left(A), Right(B) }
let tagged1: Pin<Box<dyn Stream<Item = Either<T, U>> + Send>> =
Box::pin(s1.map(Either::Left));
let tagged2: Pin<Box<dyn Stream<Item = Either<T, U>> + Send>> =
Box::pin(s2.map(Either::Right));
stream! {
let mut latest1: Option<T> = None;
let mut latest2: Option<U> = None;
let mut merged = futures::stream::select(tagged1, tagged2);
while let Some(item) = merged.next().await {
match item {
Either::Left(v) => {
latest1 = Some(v);
if let (Some(ref a), Some(ref b)) = (&latest1, &latest2) {
yield (a.clone(), b.clone());
}
}
Either::Right(v) => {
latest2 = Some(v);
if let (Some(ref a), Some(ref b)) = (&latest1, &latest2) {
yield (a.clone(), b.clone());
}
}
}
}
}
}
#[cfg(test)]
mod latest2_tests {
use super::*;
#[tokio::test]
async fn test_latest2_combines() {
let s1 = futures::stream::iter(vec![1, 2]);
let s2 = futures::stream::iter(vec!["a", "b"]);
let values: Vec<_> = latest2(s1, s2).collect().await;
assert!(!values.is_empty());
}
}
pub fn apply_latest<T, U, F, S1, S2>(fn_stream: S1, value_stream: S2) -> impl Stream<Item = U>
where
S1: Stream<Item = F> + Send + 'static,
S2: Stream<Item = T> + Send + 'static,
F: Fn(T) -> U + Clone + Send + 'static,
T: Clone + Send + 'static,
U: Send + 'static,
{
latest2(fn_stream, value_stream).map(|(f, v)| f(v))
}
#[cfg(test)]
mod apply_latest_tests {
use super::*;
#[tokio::test]
async fn test_apply_latest() {
let fns = futures::stream::iter(vec![|x: i32| x * 2, |x| x + 10]);
let vals = futures::stream::iter(vec![1, 2, 3]);
let values: Vec<_> = apply_latest(fns, vals).collect().await;
assert!(!values.is_empty());
}
}
pub fn until_stream<T, U, S: Stream<Item = T> + Unpin, Stop: Stream<Item = U> + Unpin>(
mut stop: Stop,
mut source: S,
) -> impl Stream<Item = T> {
stream! {
loop {
futures::select! {
_ = stop.next().fuse() => break,
item = source.next().fuse() => {
match item {
Some(v) => yield v,
None => break,
}
}
}
}
}
}
#[cfg(test)]
mod until_stream_tests {
use super::*;
#[tokio::test]
async fn test_until_stream_stops() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
let stop = futures::stream::iter(vec![()]);
let values: Vec<_> = until_stream(stop, source).collect().await;
assert!(values.len() <= 5);
}
}
pub fn since_stream<T, U, S: Stream<Item = T> + Unpin, Start: Stream<Item = U> + Unpin>(
mut start: Start,
mut source: S,
) -> impl Stream<Item = T> {
stream! {
let mut started = false;
loop {
futures::select! {
_ = async {
if !started { start.next().await }
else { std::future::pending().await }
}.fuse() => {
started = true;
}
item = source.next().fuse() => {
match item {
Some(v) if started => yield v,
Some(_) => {} None => break,
}
}
}
}
}
}
#[cfg(test)]
mod since_stream_tests {
use super::*;
#[tokio::test]
async fn test_since_stream_waits() {
let source = futures::stream::iter(vec![1, 2, 3, 4]);
let start = futures::stream::iter(vec![()]);
let values: Vec<_> = since_stream(start, source).collect().await;
assert!(!values.is_empty());
}
}
pub fn buffer<T, S: Stream<Item = T>>(size: usize, s: S) -> impl Stream<Item = Vec<T>> {
stream! {
futures::pin_mut!(s);
let mut buf: Vec<T> = Vec::with_capacity(size);
while let Some(item) = s.next().await {
buf.push(item);
if buf.len() >= size { yield std::mem::replace(&mut buf, Vec::with_capacity(size)); }
}
if !buf.is_empty() { yield buf; }
}
}
#[cfg(test)]
mod buffer_tests {
use super::*;
#[tokio::test]
async fn test_buffer() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
let values: Vec<_> = buffer(2, source).collect().await;
assert_eq!(values, vec![vec![1, 2], vec![3, 4], vec![5]]);
}
#[tokio::test]
async fn test_buffer_exact_multiple() {
let source = futures::stream::iter(vec![1, 2, 3, 4]);
let values: Vec<_> = buffer(2, source).collect().await;
assert_eq!(values, vec![vec![1, 2], vec![3, 4]]);
}
#[tokio::test]
async fn test_buffer_empty() {
let source = futures::stream::iter(Vec::<i32>::new());
let values: Vec<_> = buffer(3, source).collect().await;
assert_eq!(values, Vec::<Vec<i32>>::new());
}
}
pub fn buffer_time<R, T, S>(ms: u64, mut s: S) -> impl Stream<Item = Vec<T>>
where
R: Runtime,
T: Clone,
S: Stream<Item = T> + Unpin,
{
stream! {
let duration = Duration::from_millis(ms);
let mut buf: Vec<T> = Vec::new();
let mut timer = R::sleep(duration);
loop {
futures::select! {
_ = (&mut timer).fuse() => {
if !buf.is_empty() { yield std::mem::take(&mut buf); }
timer = R::sleep(duration);
}
item = s.next().fuse() => {
match item {
Some(v) => buf.push(v),
None => {
if !buf.is_empty() { yield buf; }
break;
}
}
}
}
}
}
}
pub fn buffer_time_with<T, S, SF, SFut>(
ms: u64,
mut s: S,
sleep_fn: SF,
) -> impl Stream<Item = Vec<T>>
where
S: Stream<Item = T> + Unpin,
SF: Fn(Duration) -> SFut,
SFut: std::future::Future<Output = ()> + Unpin,
{
stream! {
let duration = Duration::from_millis(ms);
let mut buf: Vec<T> = Vec::new();
let mut timer = sleep_fn(duration);
loop {
futures::select! {
_ = (&mut timer).fuse() => {
if !buf.is_empty() { yield std::mem::take(&mut buf); }
timer = sleep_fn(duration);
}
item = s.next().fuse() => {
match item {
Some(v) => buf.push(v),
None => {
if !buf.is_empty() { yield buf; }
break;
}
}
}
}
}
}
}
#[cfg(test)]
mod buffer_time_tests {
}
pub fn window<T: Clone + Send + 'static>(
size: usize,
s: impl Stream<Item = T> + Send + 'static,
) -> impl Stream<Item = Vec<T>> {
stream! {
futures::pin_mut!(s);
loop {
let mut window = Vec::with_capacity(size);
while window.len() < size {
match s.next().await {
Some(item) => window.push(item),
None => {
if !window.is_empty() {
yield window;
}
return;
}
}
}
yield window;
}
}
}
pub fn eager<R, T, S>(buffer_size: usize, s: S) -> impl Stream<Item = T>
where
R: Runtime,
T: Send + 'static,
S: Stream<Item = T> + Send + Unpin + 'static,
{
let (mut tx, mut rx) = mpsc::channel::<T>(buffer_size.max(1));
let mut spawned = false;
let mut s = Some(s);
stream! {
if !spawned {
spawned = true;
let mut source = s.take().unwrap();
R::spawn(async move {
use futures::StreamExt;
use futures::SinkExt;
while let Some(item) = source.next().await { if tx.send(item).await.is_err() { break; } } });
}
while let Some(item) = rx.next().await { yield item; }
}
}
pub fn eager_now<R, T, S>(buffer_size: usize, s: S) -> impl Stream<Item = T>
where
R: Runtime,
T: Send + 'static,
S: Stream<Item = T> + Send + Unpin + 'static,
{
let (mut tx, mut rx) = mpsc::channel::<T>(buffer_size.max(1));
let mut source = s;
R::spawn(async move {
use futures::StreamExt;
use futures::SinkExt;
while let Some(item) = source.next().await { if tx.send(item).await.is_err() { break; } }
});
stream! { while let Some(item) = rx.next().await { yield item; } }
}
#[cfg(test)]
mod eager_now_tests {
}
pub struct ReplaySubject<T: Clone + Send + 'static> {
inner: Arc<Mutex<ReplaySubjectInner<T>>>,
}
struct ReplaySubjectInner<T> {
buffer: Vec<T>,
buffer_size: usize,
completed: bool,
error: Option<Arc<dyn std::error::Error + Send + Sync>>,
subscribers: Vec<mpsc::UnboundedSender<T>>,
}
impl<T: Clone + Send + 'static> ReplaySubject<T> {
pub fn new(buffer_size: usize) -> Self {
Self {
inner: Arc::new(Mutex::new(ReplaySubjectInner {
buffer: Vec::new(),
buffer_size,
completed: false,
error: None,
subscribers: Vec::new(),
})),
}
}
pub async fn next(&self, value: T) {
let mut inner = self.inner.lock().await;
inner.buffer.push(value.clone());
if inner.buffer.len() > inner.buffer_size { inner.buffer.remove(0); }
inner.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
}
pub async fn complete(&self) {
let mut inner = self.inner.lock().await;
inner.completed = true;
inner.subscribers.clear();
}
pub fn subscribe(&self) -> impl Stream<Item = T> {
let inner = self.inner.clone();
stream! {
let (tx, mut rx) = mpsc::unbounded();
let buffered: Vec<T>;
let was_completed: bool;
{
let mut guard = inner.lock().await;
buffered = guard.buffer.clone();
was_completed = guard.completed;
if !guard.completed { guard.subscribers.push(tx); }
}
for item in buffered { yield item; }
if was_completed { return; }
while let Some(item) = rx.next().await { yield item; }
}
}
}
#[cfg(test)]
mod replay_subject_tests {
use super::*;
#[tokio::test]
async fn test_replay_subject_buffer() {
let subject = ReplaySubject::new(2);
subject.next(1).await;
subject.next(2).await;
subject.next(3).await; subject.complete().await;
let values: Vec<_> = subject.subscribe().collect().await;
assert_eq!(values, vec![2, 3]);
}
#[tokio::test]
async fn test_replay_subject_empty() {
let subject: ReplaySubject<i32> = ReplaySubject::new(5);
subject.complete().await;
let values: Vec<_> = subject.subscribe().collect().await;
assert_eq!(values, Vec::<i32>::new());
}
#[tokio::test]
async fn test_replay_subject_unlimited() {
let subject = ReplaySubject::new(usize::MAX);
subject.next(1).await;
subject.next(2).await;
subject.next(3).await;
subject.complete().await;
let values: Vec<_> = subject.subscribe().collect().await;
assert_eq!(values, vec![1, 2, 3]);
}
}
struct Replay<T> {
inner: Arc<Mutex<ReplayInner<T>>>,
}
struct ReplayInner<T> {
buffer: Vec<T>,
buffer_size: usize,
completed: bool,
error: Option<Arc<dyn std::error::Error + Send + Sync>>,
source_started: bool,
subscribers: Vec<futures::channel::mpsc::UnboundedSender<Result<T, Arc<dyn std::error::Error + Send + Sync>>>>,
}
impl<T: Clone + Send + 'static> Replay<T> {
fn new<S>(buffer_size: usize, source: S) -> Self
where
S: futures::Stream<Item = T> + Send + Unpin + 'static,
{
let inner = Arc::new(Mutex::new(ReplayInner {
buffer: Vec::new(),
buffer_size,
completed: false,
error: None,
source_started: false,
subscribers: Vec::new(),
}));
Replay { inner }
}
fn subscribe(&self) -> impl futures::Stream<Item = T> {
let inner = self.inner.clone();
async_stream::stream! {
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let buffered: Vec<T>;
{
let mut guard = inner.lock().await;
buffered = guard.buffer.clone();
if !guard.completed && guard.error.is_none() { guard.subscribers.push(tx); }
}
for value in buffered { yield value; }
while let Some(result) = rx.next().await {
match result {
Ok(value) => yield value,
Err(_) => break,
}
}
}
}
async fn start_source<S>(&self, mut source: S)
where
S: futures::Stream<Item = T> + Send + Unpin + 'static,
{
while let Some(value) = source.next().await {
let mut guard = self.inner.lock().await;
guard.buffer.push(value.clone());
if guard.buffer.len() > guard.buffer_size {
guard.buffer.remove(0);
}
guard.subscribers.retain(|tx| tx.unbounded_send(Ok(value.clone())).is_ok());
}
let mut guard = self.inner.lock().await;
guard.completed = true;
guard.subscribers.clear();
}
}
fn replay<T, S>(buffer_size: usize, source: S) -> impl futures::Stream<Item = T>
where
T: Clone + Send + 'static,
S: futures::Stream<Item = T> + Send + 'static,
{
let _ = buffer_size; source
}
#[cfg(test)]
mod replay_tests {
use super::*;
#[tokio::test]
async fn test_replay_buffered() {
let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
let replay = Replay::new(2, source);
}
}
fn share<T, S>(source: S) -> impl futures::Stream<Item = T>
where
T: Clone + Send + 'static,
S: futures::Stream<Item = T> + Send + Unpin + 'static,
{
replay(0, source)
}
#[cfg(test)]
mod share_tests {
use super::*;
#[tokio::test]
async fn test_share_basic() {
let source = futures::stream::iter(vec![1, 2, 3]);
let shared = share(source);
futures::pin_mut!(shared);
let first = shared.next().await;
assert_eq!(first, Some(1));
let second = shared.next().await;
assert_eq!(second, Some(2));
let third = shared.next().await;
assert_eq!(third, Some(3));
let done = shared.next().await;
assert_eq!(done, None);
}
}
fn replay_factory<T, S>(
buffer_size: usize,
source: S,
) -> impl Fn() -> BoxedStream<T>
where
T: Clone + Send + Sync + 'static,
S: futures::Stream<Item = T> + Send + Unpin + 'static,
{
struct SharedState<T> {
buffer: Vec<T>,
buffer_size: usize,
completed: bool,
subscribers: Vec<futures::channel::mpsc::UnboundedSender<T>>,
}
let state = Arc::new(Mutex::new(SharedState {
buffer: Vec::new(),
buffer_size,
completed: false,
subscribers: Vec::new(),
}));
let started = Arc::new(AtomicBool::new(false));
let source = Arc::new(Mutex::new(Some(source)));
move || {
let state = state.clone();
let started = started.clone();
let source = source.clone();
Box::pin(async_stream::stream! {
if !started.swap(true, Ordering::SeqCst) {
let state_clone = state.clone();
if let Some(mut src) = source.lock().await.take() {
while let Some(value) = src.next().await {
let mut guard = state_clone.lock().await;
guard.buffer.push(value.clone());
if guard.buffer.len() > guard.buffer_size { guard.buffer.remove(0); }
guard.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
}
state_clone.lock().await.completed = true;
}
}
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let buffered: Vec<T>;
{
let mut guard = state.lock().await;
buffered = guard.buffer.clone();
if !guard.completed { guard.subscribers.push(tx); }
}
for value in buffered { yield value; }
while let Some(value) = rx.next().await { yield value; }
})
}
}
pub fn replay_factory_spawned<R, T, S>(
buffer_size: usize,
source: S,
) -> impl Fn() -> BoxedStream<T>
where
R: Runtime,
T: Clone + Send + Sync + 'static,
S: futures::Stream<Item = T> + Send + Unpin + 'static,
{
struct SharedState<T> {
buffer: Vec<T>,
buffer_size: usize,
completed: bool,
subscribers: Vec<futures::channel::mpsc::UnboundedSender<T>>,
}
let state = Arc::new(Mutex::new(SharedState {
buffer: Vec::new(),
buffer_size,
completed: false,
subscribers: Vec::new(),
}));
let started = Arc::new(AtomicBool::new(false));
let source = Arc::new(Mutex::new(Some(source)));
move || {
let state = state.clone();
let started = started.clone();
let source = source.clone();
Box::pin(async_stream::stream! {
if !started.swap(true, Ordering::SeqCst) {
let state_clone = state.clone();
if let Some(src) = source.lock().await.take() {
R::spawn(async move {
futures::pin_mut!(src);
while let Some(value) = src.next().await {
let mut guard = state_clone.lock().await;
guard.buffer.push(value.clone());
if guard.buffer.len() > guard.buffer_size { guard.buffer.remove(0); }
guard.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
}
state_clone.lock().await.completed = true;
});
}
}
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let buffered: Vec<T>;
{
let mut guard = state.lock().await;
buffered = guard.buffer.clone();
if !guard.completed { guard.subscribers.push(tx); }
}
for value in buffered { yield value; }
while let Some(value) = rx.next().await { yield value; }
})
}
}
fn replay_stream<T, S>(
buffer_size: usize,
source: S,
) -> impl futures::Stream<Item = impl futures::Stream<Item = T>>
where
T: Clone + Send + Sync + 'static,
S: futures::Stream<Item = T> + Send + Unpin + 'static,
{
let factory = replay_factory(buffer_size, source);
async_stream::stream! {
loop { yield factory(); }
}
}
async fn replay_stream_example() {
let source = futures::stream::iter(vec![1, 2, 3]);
let copies = replay_stream(usize::MAX, source);
futures::pin_mut!(copies);
if let Some(copy) = copies.next().await {
futures::pin_mut!(copy);
let values: Vec<_> = copy.collect().await;
println!("Copy values: {:?}", values);
}
}
#[cfg(test)]
mod replay_stream_tests {
}
use std::sync::atomic::AtomicU64;
use std::task::Waker;
#[derive(Clone)]
pub struct TestRuntime {
inner: Arc<TestRuntimeInner>,
}
struct TestRuntimeInner {
current_time_ns: AtomicU64,
timers: std::sync::Mutex<Vec<PendingTimer>>,
}
struct PendingTimer {
fire_at_ns: u64,
waker: Option<Waker>,
fired: Arc<std::sync::atomic::AtomicBool>,
}
impl TestRuntime {
pub fn new() -> Self {
Self {
inner: Arc::new(TestRuntimeInner {
current_time_ns: AtomicU64::new(0),
timers: std::sync::Mutex::new(Vec::new()),
}),
}
}
pub fn now(&self) -> Duration {
Duration::from_nanos(self.inner.current_time_ns.load(Ordering::SeqCst))
}
pub async fn advance_by(&self, duration: Duration) {
let target = self.now() + duration;
self.advance_to(target).await;
}
pub async fn advance_to(&self, target: Duration) {
let target_ns = target.as_nanos() as u64;
loop {
let wakers_to_wake: Vec<Waker> = {
let mut timers = self.inner.timers.lock().unwrap();
let current = self.inner.current_time_ns.load(Ordering::SeqCst);
let mut earliest: Option<u64> = None;
for timer in timers.iter() {
if !timer.fired.load(Ordering::SeqCst) && timer.fire_at_ns <= target_ns {
earliest = Some(match earliest {
Some(e) => e.min(timer.fire_at_ns),
None => timer.fire_at_ns,
});
}
}
match earliest {
Some(fire_time) if fire_time > current => {
self.inner.current_time_ns.store(fire_time, Ordering::SeqCst);
timers.iter_mut()
.filter(|t| t.fire_at_ns == fire_time && !t.fired.load(Ordering::SeqCst))
.filter_map(|t| {
t.fired.store(true, Ordering::SeqCst);
t.waker.take()
})
.collect()
}
_ => {
self.inner.current_time_ns.store(target_ns, Ordering::SeqCst);
break;
}
}
};
for waker in wakers_to_wake {
waker.wake();
}
futures::future::poll_fn(|_| std::task::Poll::Ready(())).await;
}
{
let mut timers = self.inner.timers.lock().unwrap();
timers.retain(|t| !t.fired.load(Ordering::SeqCst));
}
}
fn register_timer(&self, fire_at: Duration) -> Arc<std::sync::atomic::AtomicBool> {
let fired = Arc::new(std::sync::atomic::AtomicBool::new(false));
let timer = PendingTimer {
fire_at_ns: fire_at.as_nanos() as u64,
waker: None,
fired: fired.clone(),
};
self.inner.timers.lock().unwrap().push(timer);
fired
}
fn set_timer_waker(&self, fire_at_ns: u64, waker: Waker) {
let mut timers = self.inner.timers.lock().unwrap();
for timer in timers.iter_mut() {
if timer.fire_at_ns == fire_at_ns && !timer.fired.load(Ordering::SeqCst) {
timer.waker = Some(waker);
break;
}
}
}
}
impl Default for TestRuntime {
fn default() -> Self {
Self::new()
}
}
pub struct TestSleep {
runtime: TestRuntime,
target_ns: u64,
fired: Arc<std::sync::atomic::AtomicBool>,
registered: bool,
}
impl TestSleep {
fn new(runtime: TestRuntime, duration: Duration) -> Self {
let current = runtime.now();
let target = current + duration;
let target_ns = target.as_nanos() as u64;
Self {
runtime,
target_ns,
fired: Arc::new(std::sync::atomic::AtomicBool::new(false)),
registered: false,
}
}
}
impl Future for TestSleep {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
if self.fired.load(Ordering::SeqCst) {
return std::task::Poll::Ready(());
}
let current_ns = self.runtime.inner.current_time_ns.load(Ordering::SeqCst);
if current_ns >= self.target_ns {
self.fired.store(true, Ordering::SeqCst);
return std::task::Poll::Ready(());
}
if !self.registered {
self.fired = self.runtime.register_timer(Duration::from_nanos(self.target_ns));
self.registered = true;
}
self.runtime.set_timer_waker(self.target_ns, cx.waker().clone());
std::task::Poll::Pending
}
}
pub struct TestInterval {
runtime: TestRuntime,
period_ns: u64,
next_fire_ns: u64,
current_timer: Option<Arc<std::sync::atomic::AtomicBool>>,
}
impl TestInterval {
fn new(runtime: TestRuntime, period: Duration) -> Self {
let period_ns = period.as_nanos() as u64;
let start = runtime.inner.current_time_ns.load(Ordering::SeqCst);
Self {
runtime,
period_ns,
next_fire_ns: start + period_ns,
current_timer: None,
}
}
}
impl futures::Stream for TestInterval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
let current_ns = self.runtime.inner.current_time_ns.load(Ordering::SeqCst);
if current_ns >= self.next_fire_ns {
self.next_fire_ns += self.period_ns;
self.current_timer = None;
return std::task::Poll::Ready(Some(()));
}
if self.current_timer.is_none() {
self.current_timer = Some(self.runtime.register_timer(Duration::from_nanos(self.next_fire_ns)));
}
self.runtime.set_timer_waker(self.next_fire_ns, cx.waker().clone());
std::task::Poll::Pending
}
}
impl Runtime for TestRuntime {
fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
panic!("TestRuntime::sleep() cannot be called statically. Use runtime.test_sleep(duration) instead.")
}
fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
panic!("TestRuntime::interval() cannot be called statically. Use runtime.test_interval(period) instead.")
}
fn spawn<F>(_future: F)
where
F: Future<Output = ()> + Send + 'static,
{
panic!("TestRuntime::spawn() is not supported. Use advance_by() to drive futures.")
}
}
impl TestRuntime {
pub fn test_sleep(&self, duration: Duration) -> TestSleep {
TestSleep::new(self.clone(), duration)
}
pub fn test_interval(&self, period: Duration) -> TestInterval {
TestInterval::new(self.clone(), period)
}
}
impl TestRuntime {
pub async fn run_timed_test<T, F, Fut>(&self, steps: Vec<Duration>, mut f: F) -> T
where
F: FnMut() -> Fut,
Fut: Future<Output = T>,
{
for step in steps {
self.advance_by(step).await;
}
f().await
}
pub async fn assert_completes_within<T, Fut>(&self, timeout: Duration, fut: Fut) -> T
where
Fut: Future<Output = T>,
{
use futures::future::{select, Either};
let timeout_fut = self.test_sleep(timeout);
futures::pin_mut!(fut);
futures::pin_mut!(timeout_fut);
match futures::future::select(fut, timeout_fut).await {
Either::Left((result, _)) => result,
Either::Right(_) => panic!("Future did not complete within {:?}", timeout),
}
}
}
#[cfg(test)]
mod test_runtime_tests {
use super::*;
#[tokio::test]
async fn test_virtual_sleep() {
let runtime = TestRuntime::new();
assert_eq!(runtime.now(), Duration::ZERO);
let sleep = runtime.test_sleep(Duration::from_millis(100));
futures::pin_mut!(sleep);
let waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert!(Pin::new(&mut sleep).poll(&mut cx).is_pending());
runtime.advance_by(Duration::from_millis(150)).await;
assert_eq!(runtime.now(), Duration::from_millis(150));
}
#[tokio::test]
async fn test_virtual_interval() {
let runtime = TestRuntime::new();
let mut interval = runtime.test_interval(Duration::from_millis(100));
runtime.advance_by(Duration::from_millis(100)).await;
assert_eq!(interval.next().await, Some(()));
runtime.advance_by(Duration::from_millis(100)).await;
assert_eq!(interval.next().await, Some(()));
assert_eq!(runtime.now(), Duration::from_millis(200));
}
#[tokio::test]
async fn test_multiple_timers() {
let runtime = TestRuntime::new();
let sleep1 = runtime.test_sleep(Duration::from_millis(50));
let sleep2 = runtime.test_sleep(Duration::from_millis(100));
let sleep3 = runtime.test_sleep(Duration::from_millis(150));
futures::pin_mut!(sleep1);
futures::pin_mut!(sleep2);
futures::pin_mut!(sleep3);
let waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert!(Pin::new(&mut sleep1).poll(&mut cx).is_pending());
assert!(Pin::new(&mut sleep2).poll(&mut cx).is_pending());
assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());
runtime.advance_to(Duration::from_millis(75)).await;
assert!(Pin::new(&mut sleep1).poll(&mut cx).is_ready());
assert!(Pin::new(&mut sleep2).poll(&mut cx).is_pending());
assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());
runtime.advance_to(Duration::from_millis(125)).await;
assert!(Pin::new(&mut sleep2).poll(&mut cx).is_ready());
assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());
runtime.advance_to(Duration::from_millis(200)).await;
assert!(Pin::new(&mut sleep3).poll(&mut cx).is_ready());
}
}
pub fn delay_test<T, S>(
runtime: TestRuntime,
duration: Duration,
source: S,
) -> impl futures::Stream<Item = T>
where
T: Send + 'static,
S: futures::Stream<Item = T> + Send + 'static,
{
stream! {
futures::pin_mut!(source);
while let Some(value) = source.next().await {
runtime.test_sleep(duration).await;
yield value;
}
}
}
pub fn periodic_test(runtime: TestRuntime, period: Duration) -> impl futures::Stream<Item = u64> {
stream! {
let mut count = 0u64;
let mut interval = runtime.test_interval(period);
loop {
interval.next().await;
yield count;
count += 1;
}
}
}