use std::collections::HashMap;
use atomr_streams::{
group_by, map_error, prefix_and_tail, recover, recover_with, recover_with_retries, select_error,
split_after, split_when, Sink, Source,
};
/// Elements that share a key must arrive in their substream in upstream order.
#[tokio::test]
async fn group_by_preserves_per_key_ordering() {
    // Even and odd values are interleaved upstream; parity is the grouping key.
    let upstream = Source::from_iter(vec![10, 21, 30, 41, 50, 61]);
    let substreams = Sink::collect(group_by(upstream, 2, |value: &i32| *value % 2)).await;

    // Drain each substream and index the result by its key.
    let mut grouped: HashMap<i32, Vec<i32>> = HashMap::new();
    for (key, substream) in substreams {
        let drained = Sink::collect(substream).await;
        grouped.insert(key, drained);
    }

    assert_eq!(grouped.get(&0), Some(&vec![10, 30, 50]));
    assert_eq!(grouped.get(&1), Some(&vec![21, 41, 61]));
}
/// Keys first seen after the substream cap has been reached are dropped.
#[tokio::test]
async fn group_by_drops_keys_past_max_substreams_cap() {
    // `x % 3` yields three distinct keys, but the cap passed to `group_by` is 2.
    let upstream = Source::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    let substreams = Sink::collect(group_by(upstream, 2, |value: &i32| *value % 3)).await;
    assert_eq!(substreams.len(), 2);

    let mut grouped: HashMap<i32, Vec<i32>> = HashMap::new();
    for (key, substream) in substreams {
        grouped.insert(key, Sink::collect(substream).await);
    }

    // Keys 1 and 2 show up first (elements 1 and 2); key 0 first appears at element 3.
    for admitted in [1, 2] {
        assert!(grouped.contains_key(&admitted));
    }
    assert!(!grouped.contains_key(&0));
    assert_eq!(grouped.get(&1), Some(&vec![1, 4, 7]));
    assert_eq!(grouped.get(&2), Some(&vec![2, 5, 8]));
}
/// Every substream can be fully collected once the upstream has been exhausted.
#[tokio::test]
async fn group_by_finishes_every_substream_when_upstream_finishes() {
    let upstream = Source::from_iter(vec![1, 2, 3, 4]);
    let substreams = Sink::collect(group_by(upstream, 4, |value: &i32| *value % 2)).await;
    assert_eq!(substreams.len(), 2);

    for (_key, substream) in substreams {
        // NOTE(review): presumably this collect would never return if the substream
        // were left open — confirm against the `Sink::collect` contract.
        let drained = Sink::collect(substream).await;
        assert!(!drained.is_empty());
    }
}
/// Grouping an empty source produces no substreams at all.
#[tokio::test]
async fn group_by_handles_empty_upstream() {
    let upstream: Source<i32> = Source::empty();
    let substreams = Sink::collect(group_by(upstream, 4, |value: &i32| *value)).await;
    assert!(substreams.is_empty());
}
/// With `split_when`, the element matching the predicate opens the next chunk.
#[tokio::test]
async fn split_when_places_pivot_in_new_chunk() {
    let upstream = Source::from_iter(vec![1, 2, 10, 3, 4, 20, 5]);
    let substreams = Sink::collect(split_when(upstream, |value: &i32| *value >= 10)).await;

    let mut segments = Vec::new();
    for substream in substreams {
        let segment = Sink::collect(substream).await;
        segments.push(segment);
    }

    // The pivots 10 and 20 each start a fresh segment.
    let expected = vec![vec![1, 2], vec![10, 3, 4], vec![20, 5]];
    assert_eq!(segments, expected);
}
/// When the predicate never fires, the whole stream stays in one chunk.
#[tokio::test]
async fn split_when_with_no_match_yields_single_chunk() {
    let upstream = Source::from_iter(vec![1, 2, 3]);
    let substreams = Sink::collect(split_when(upstream, |value: &i32| *value >= 99)).await;

    let mut segments = Vec::new();
    for substream in substreams {
        let segment = Sink::collect(substream).await;
        segments.push(segment);
    }

    let expected = vec![vec![1, 2, 3]];
    assert_eq!(segments, expected);
}
/// With `split_after`, the element matching the predicate closes the current chunk.
#[tokio::test]
async fn split_after_keeps_pivot_in_previous_chunk() {
    let upstream = Source::from_iter(vec![1, 2, 10, 3, 4, 20, 5]);
    let substreams = Sink::collect(split_after(upstream, |value: &i32| *value >= 10)).await;

    let mut segments = Vec::new();
    for substream in substreams {
        let segment = Sink::collect(substream).await;
        segments.push(segment);
    }

    // The pivots 10 and 20 are the last element of their segments.
    let expected = vec![vec![1, 2, 10], vec![3, 4, 20], vec![5]];
    assert_eq!(segments, expected);
}
#[tokio::test]
async fn split_after_pivot_at_end_does_not_emit_empty_chunk() {
    // The pivot (3) is the final upstream element, so nothing follows it.
    let upstream = Source::from_iter(vec![1, 2, 3]);
    let substreams = Sink::collect(split_after(upstream, |value: &i32| *value == 3)).await;

    let mut segments = Vec::new();
    for substream in substreams {
        let segment = Sink::collect(substream).await;
        segments.push(segment);
    }

    let expected = vec![vec![1, 2, 3]];
    assert_eq!(segments, expected);
}
/// A prefix length of zero leaves every element in the tail.
#[tokio::test]
async fn prefix_and_tail_with_zero_yields_empty_prefix_and_full_tail() {
    let upstream = Source::from_iter(vec![1, 2, 3, 4]);
    let mut emitted = Sink::collect(prefix_and_tail(upstream, 0)).await;
    assert_eq!(emitted.len(), 1);

    let (head, remainder) = emitted.pop().unwrap();
    assert!(head.is_empty());
    assert_eq!(Sink::collect(remainder).await, vec![1, 2, 3, 4]);
}
/// Asking for more elements than the stream holds puts everything in the prefix.
#[tokio::test]
async fn prefix_and_tail_with_n_greater_than_len_yields_full_prefix_and_empty_tail() {
    let upstream = Source::from_iter(vec![1, 2, 3]);
    let mut emitted = Sink::collect(prefix_and_tail(upstream, 10)).await;
    assert_eq!(emitted.len(), 1);

    let (head, remainder) = emitted.pop().unwrap();
    assert_eq!(head, vec![1, 2, 3]);
    assert!(Sink::collect(remainder).await.is_empty());
}
/// A prefix length equal to the stream length yields the full prefix and an empty tail.
#[tokio::test]
async fn prefix_and_tail_with_exact_n_yields_full_prefix_and_empty_tail() {
    let s = Source::from_iter(vec![7, 8, 9]);
    let outer = prefix_and_tail(s, 3);
    let mut pairs = Sink::collect(outer).await;
    // `pop` only inspects the last pair, so pin the count first (as the sibling
    // prefix_and_tail tests do); otherwise spurious extra emissions would go unnoticed.
    assert_eq!(pairs.len(), 1);
    let (prefix, tail) = pairs.pop().unwrap();
    assert_eq!(prefix, vec![7, 8, 9]);
    assert!(Sink::collect(tail).await.is_empty());
}
/// The first error is replaced by the fallback element and the stream then ends.
#[tokio::test]
async fn recover_emits_fallback_element_and_terminates_on_first_error() {
    // The trailing `Ok` values after the error must not appear in the output.
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Ok(2), Err("boom"), Ok(99), Ok(100)]);
    let output = Sink::collect(recover(upstream, |_err| Some(-1))).await;
    assert_eq!(output, vec![1, 2, -1]);
}
/// A handler returning `None` emits nothing for the error and still ends the stream.
#[tokio::test]
async fn recover_with_none_drops_the_error_and_terminates() {
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Ok(2), Err("e"), Ok(3)]);
    let output = Sink::collect(recover(upstream, |_err| None)).await;
    assert_eq!(output, vec![1, 2]);
}
/// On error, the rest of the output comes from the replacement source.
#[tokio::test]
async fn recover_with_switches_stream_tail_on_error() {
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Ok(2), Err("e"), Ok(99)]);
    let fallback: Source<i32> = Source::from_iter(vec![100, 200, 300]);

    let output = Sink::collect(recover_with(upstream, fallback)).await;

    // The `Ok(99)` that follows the error is never emitted.
    assert_eq!(output, vec![1, 2, 100, 200, 300]);
}
/// Without any error, the replacement source is never consulted.
#[tokio::test]
async fn recover_with_passes_through_when_upstream_succeeds() {
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Ok(2), Ok(3)]);
    let fallback: Source<i32> = Source::from_iter(vec![100]);

    let output = Sink::collect(recover_with(upstream, fallback)).await;

    assert_eq!(output, vec![1, 2, 3]);
}
/// Once the retry budget is spent, the next error ends the stream.
#[tokio::test]
async fn recover_with_retries_exhausts_attempts_then_stops() {
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Err("e1"), Err("e2"), Err("e3"), Ok(999)]);

    // Each factory call produces a one-element source: 10 first, then 20.
    let mut invocations = 0;
    let make_replacement = move || {
        invocations += 1;
        Source::from_iter(vec![invocations * 10])
    };

    let output = Sink::collect(recover_with_retries(upstream, 2, make_replacement)).await;

    // Two retries are allowed, so "e3" terminates the stream before `Ok(999)`.
    assert_eq!(output, vec![1, 10, 20]);
}
/// With budget left over, the upstream resumes after the replacement elements.
#[tokio::test]
async fn recover_with_retries_continues_while_attempts_remain() {
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Err("e1"), Ok(2)]);
    let make_replacement = || Source::from_iter(vec![100, 200]);

    let output = Sink::collect(recover_with_retries(upstream, 5, make_replacement)).await;

    assert_eq!(output, vec![1, 100, 200, 2]);
}
/// A retry budget of zero means the replacement factory's output never appears.
#[tokio::test]
async fn recover_with_retries_zero_attempts_stops_on_first_error() {
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Err("e"), Ok(2)]);
    let make_replacement = || Source::from_iter(vec![777]);

    let output = Sink::collect(recover_with_retries(upstream, 0, make_replacement)).await;

    assert_eq!(output, vec![1]);
}
/// `map_error` transforms only the `Err` payload; `Ok` items and the item count are untouched.
#[tokio::test]
async fn map_error_rewrites_error_payload_without_collapsing_successes() {
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Err("x"), Ok(2)]);
    let output = Sink::collect(map_error(upstream, |e| format!("wrapped:{e}"))).await;

    // A single whole-vector comparison covers both the length and every element.
    assert_eq!(output, vec![Ok(1), Err(String::from("wrapped:x")), Ok(2)]);
}
/// `select_error` rewrites the error payload the same way `map_error` does.
#[tokio::test]
async fn select_error_alias_matches_map_error_behavior() {
    let upstream: Source<Result<i32, &'static str>> =
        Source::from_iter(vec![Ok(1), Err("boom")]);
    let output = Sink::collect(select_error(upstream, |err| err.to_uppercase())).await;

    let expected = vec![Ok(1), Err(String::from("BOOM"))];
    assert_eq!(output, expected);
}