use atomr_streams::{BroadcastHub, MergeHub, Sink, Source};
use std::time::Duration;
use tokio::sync::mpsc;
/// Three consumers registered before the upstream is attached are each
/// expected to collect the full sequence `1, 2, 3, 4`.
#[tokio::test]
async fn broadcast_hub_every_consumer_sees_every_post_subscribe_element() {
    let hub = BroadcastHub::<i32>::new(16);
    // All consumers are created up front, before any upstream exists.
    let [first, second, third] = [hub.consumer(), hub.consumer(), hub.consumer()];

    hub.attach(Source::from_iter(vec![1, 2, 3, 4]));
    // The hub handle is released before collecting.
    // NOTE(review): presumably this lets the consumer streams terminate —
    // confirm against `BroadcastHub`'s completion semantics.
    drop(hub);

    let collected = tokio::join!(
        Sink::collect(first),
        Sink::collect(second),
        Sink::collect(third)
    );

    let expected = vec![1, 2, 3, 4];
    for seen in [collected.0, collected.1, collected.2] {
        assert_eq!(seen, expected);
    }
}
/// A consumer created after the first elements were sent is expected to
/// collect only the later elements, while the earlier consumer collects all.
#[tokio::test]
async fn broadcast_hub_late_subscriber_misses_pre_subscribe_elements() {
    let hub = BroadcastHub::<i32>::new(16);
    let early = hub.consumer();

    let (feed, upstream) = mpsc::unbounded_channel::<i32>();
    hub.attach(Source::from_receiver(upstream));

    // First batch: sent while `early` is the only consumer.
    for value in [1, 2] {
        feed.send(value).unwrap();
    }
    // NOTE(review): the pause presumably gives the hub time to forward the
    // first batch before `late` subscribes; this is timing-based — confirm.
    tokio::time::sleep(Duration::from_millis(20)).await;

    // Second batch: sent after the late consumer exists.
    let late = hub.consumer();
    for value in [3, 4] {
        feed.send(value).unwrap();
    }

    // Close the channel and release the hub before collecting.
    drop(feed);
    drop(hub);

    let (early_seen, late_seen) = tokio::join!(Sink::collect(early), Sink::collect(late));
    assert_eq!(early_seen, vec![1, 2, 3, 4]);
    assert_eq!(late_seen, vec![3, 4]);
}
/// Both consumers must finish collecting `7, 8, 9` within 200 ms once the
/// finite upstream has been attached and the hub handle dropped.
#[tokio::test]
async fn broadcast_hub_upstream_completion_completes_all_consumers() {
    let hub = BroadcastHub::<i32>::new(16);
    let left = hub.consumer();
    let right = hub.consumer();

    hub.attach(Source::from_iter(vec![7, 8, 9]));
    drop(hub);

    // Bound the wait so a consumer that never completes fails the test
    // with the `expect` message below instead of hanging.
    let deadline = Duration::from_millis(200);
    let both = async { tokio::join!(Sink::collect(left), Sink::collect(right)) };
    let (left_seen, right_seen) = tokio::time::timeout(deadline, both)
        .await
        .expect("consumers should complete after upstream finishes");

    assert_eq!(left_seen, vec![7, 8, 9]);
    assert_eq!(right_seen, vec![7, 8, 9]);
}
/// Creating and immediately discarding an extra consumer must leave the
/// reported consumer count at 1 and must not disturb the surviving consumer,
/// which is expected to collect `100, 200`.
#[tokio::test]
async fn broadcast_hub_supports_multiple_consumer_calls_independently() {
    let hub = BroadcastHub::<i32>::new(16);
    let keep = hub.consumer();

    // A second consumer that is thrown away right after creation.
    drop(hub.consumer());
    assert_eq!(hub.consumer_count(), 1);

    hub.attach(Source::from_iter(vec![100, 200]));
    drop(hub);

    let received = Sink::collect(keep).await;
    assert_eq!(received, vec![100, 200]);
}
/// Three finite producers attached to one merge hub: the merged output,
/// once sorted, must contain every element of every producer exactly once.
#[tokio::test]
async fn merge_hub_aggregates_n_producers_exactly_once() {
    let hub = MergeHub::<i32>::new();
    for batch in [vec![1, 2, 3], vec![10, 20, 30], vec![100, 200, 300]] {
        hub.attach(Source::from_iter(batch));
    }

    let merged = hub.source();
    drop(hub);

    let mut got = Sink::collect(merged).await;
    // Sort before comparing: the assertion only checks the multiset of
    // elements, not the interleaving order of the producers.
    got.sort_unstable();
    assert_eq!(got, vec![1, 2, 3, 10, 20, 30, 100, 200, 300]);
}
/// A producer attached after the merged source is already being collected
/// must still contribute its element (`777`) to the merged output.
#[tokio::test]
async fn merge_hub_late_attached_producer_is_picked_up() {
    let hub = MergeHub::<i32>::new();
    let (early_tx, early_rx) = mpsc::unbounded_channel::<i32>();
    hub.attach(Source::from_receiver(early_rx));

    // Start collecting in a background task before the late producer exists.
    let merged = hub.source();
    let collecting = async move { Sink::collect(merged).await };
    let collector = tokio::spawn(collecting);

    let settle = Duration::from_millis(20);
    for value in [1, 2] {
        early_tx.send(value).unwrap();
    }
    tokio::time::sleep(settle).await;

    // Late producer, attached while collection is in progress.
    hub.attach(Source::from_iter(vec![777]));
    tokio::time::sleep(settle).await;

    // Close the early channel and release the hub before awaiting the task.
    drop(early_tx);
    drop(hub);

    // Outer `expect` covers the 500 ms timeout; the `unwrap` covers a
    // failed (panicked or cancelled) collector task.
    let joined = tokio::time::timeout(Duration::from_millis(500), collector)
        .await
        .expect("merged source must complete");
    let mut got = joined.unwrap();
    got.sort_unstable();
    assert_eq!(got, vec![1, 2, 777]);
}
/// Five single-element producers (`0` through `4`) attached one after the
/// other must all appear in the merged output.
#[tokio::test]
async fn merge_hub_supports_multiple_attach_calls() {
    let hub = MergeHub::<i32>::new();
    (0..5).for_each(|i| {
        hub.attach(Source::single(i));
    });

    let merged = hub.source();
    drop(hub);

    let mut got = Sink::collect(merged).await;
    // Order across producers is not asserted, only the set of elements.
    got.sort_unstable();

    let expected: Vec<i32> = (0..5).collect();
    assert_eq!(got, expected);
}
/// Calling `source()` a second time on the same hub must produce a source
/// from which nothing is collected.
///
/// NOTE(review): a 50 ms timeout is folded into the default (empty) value by
/// `unwrap_or_default`, so this test also passes when the second source does
/// not complete in time — including, presumably, a source that yields
/// elements but never completes. Confirm this leniency is intended.
#[tokio::test]
async fn merge_hub_second_source_call_yields_empty() {
    let hub = MergeHub::<i32>::new();
    hub.attach(Source::from_iter(vec![1, 2, 3]));

    // The first source is kept alive for the whole test but never read.
    let _first = hub.source();
    let second = hub.source();

    let window = Duration::from_millis(50);
    let outcome = tokio::time::timeout(window, Sink::collect(second)).await;
    let v = outcome.unwrap_or_default();
    assert!(v.is_empty(), "second source() must be empty, got {:?}", v);
}