use super::*;
use std::panic::{catch_unwind, AssertUnwindSafe};
/// Asserts that `hub` currently reports exactly `sinks_count` sinks and
/// `subs_count` subscriptions.
///
/// # Panics
///
/// Panics (through `assert_eq!`) as soon as one of the reported counts differs
/// from the expected value; the sink count is checked first.
fn assert_hub_props(hub: &TestHub, sinks_count: usize, subs_count: usize) {
    let observed_sinks = hub.sink_count();
    assert_eq!(observed_sinks, sinks_count);

    let observed_subs = hub.subs_count();
    assert_eq!(observed_subs, subs_count);
}
/// A single subscription shows up as one sink and one subscription, and both
/// counts return to zero once the receiver is dropped.
#[test]
fn t01() {
    let hub = TestHub::new(TK);
    assert_hub_props(&hub, 0, 0);

    {
        // The receiver lives only for this scope; leaving it drops the receiver.
        let _receiver = hub.subscribe(SubsKey::new(), 100_000);
        assert_hub_props(&hub, 1, 1);
    }

    assert_hub_props(&hub, 0, 0);
}
/// Exercises a subscription whose key owns another receiver: dropping that
/// subscription's receiver is expected to panic, after which the hub must keep
/// delivering messages to ordinary subscribers.
#[test]
fn t02() {
    block_on(async {
        let hub = TestHub::new(TK);
        assert_hub_props(&hub, 0, 0);

        let inner_rx = hub.subscribe(SubsKey::new(), 100_000);
        assert_hub_props(&hub, 1, 1);

        // The key of the second subscription takes ownership of the first receiver.
        let outer_rx = hub.subscribe(SubsKey::new().with_receiver(inner_rx), 100_000);
        assert_hub_props(&hub, 2, 2);

        let mut plain_rx_a = hub.subscribe(SubsKey::new(), 100_000);
        assert_hub_props(&hub, 3, 3);

        // Dropping the receiver whose key holds `inner_rx` must panic.
        let drop_panicked = catch_unwind(AssertUnwindSafe(move || drop(outer_rx))).is_err();
        assert!(drop_panicked);
        assert_hub_props(&hub, 2, 2);

        // New subscriptions still work after the panic.
        let mut plain_rx_b = hub.subscribe(SubsKey::new(), 100_000);
        assert_hub_props(&hub, 3, 3);

        hub.send(2);
        assert_eq!(plain_rx_a.next().await, Some(2));
        assert_eq!(plain_rx_b.next().await, Some(2));

        drop(plain_rx_b);
        hub.send(3);
        assert_eq!(plain_rx_a.next().await, Some(3));

        drop(plain_rx_a);
        hub.send(4);

        // One sink and one subscription are still reported at this point.
        assert_hub_props(&hub, 1, 1);
    });
}
/// Subscribes two receivers to `hub`, sends the messages `1..=6` in two
/// batches, and checks what each receiver observes.
///
/// The first receiver is consumed after the first batch and must have seen
/// `[1, 2, 3]`; the second is consumed after the second batch and must have
/// seen all six messages in order. Both receivers are consumed by the time the
/// function returns.
async fn add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(hub: &TestHub) {
    let first_rx = hub.subscribe(SubsKey::new(), 100_000);
    let second_rx = hub.subscribe(SubsKey::new(), 100_000);

    for msg in 1..=3 {
        hub.send(msg);
    }
    let seen_by_first = first_rx.take(3).collect::<Vec<_>>().await;
    assert_eq!(seen_by_first, vec![1, 2, 3]);

    for msg in 4..=6 {
        hub.send(msg);
    }
    let seen_by_second = second_rx.take(6).collect::<Vec<_>>().await;
    assert_eq!(seen_by_second, vec![1, 2, 3, 4, 5, 6]);
}
/// Subscribing with a key configured as `OnSubscribePanicBefore` must panic
/// and leave the hub with no sinks and no subscriptions; regular delivery must
/// work both before and after the failed subscribe.
#[test]
fn t03() {
    block_on(async {
        let hub = TestHub::new(TK);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);

        let subscribe_panicked = catch_unwind(AssertUnwindSafe(|| {
            let key = SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicBefore);
            hub.subscribe(key, 100_000)
        }))
        .is_err();
        assert!(subscribe_panicked);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);
    });
}
/// Subscribing with a key configured as `OnSubscribePanicAfter` must panic.
/// Afterwards the hub reports zero sinks but one subscription, and that state
/// persists while regular subscribers come and go.
#[test]
fn t04() {
    block_on(async {
        let hub = TestHub::new(TK);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);

        let subscribe_panicked = catch_unwind(AssertUnwindSafe(|| {
            let key = SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicAfter);
            hub.subscribe(key, 100_000)
        }))
        .is_err();
        assert!(subscribe_panicked);

        // The failed subscribe leaves one subscription entry behind, with no sink.
        assert_hub_props(&hub, 0, 1);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 1);
    });
}
/// Dropping a receiver whose key is configured as `OnUnsubscribePanicBefore`
/// must panic. Afterwards the hub reports zero sinks but one subscription,
/// and regular delivery keeps working.
#[test]
fn t05() {
    block_on(async {
        let hub = TestHub::new(TK);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);

        let panicking_key = SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicBefore);
        let panicking_rx = hub.subscribe(panicking_key, 100_000);
        assert_hub_props(&hub, 1, 1);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 1, 1);

        let drop_panicked = catch_unwind(AssertUnwindSafe(move || drop(panicking_rx))).is_err();
        assert!(drop_panicked);

        // The sink is gone, but the subscription entry is still counted.
        assert_hub_props(&hub, 0, 1);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 1);
    });
}
/// Dropping a receiver whose key is configured as `OnUnsubscribePanicAfter`
/// must panic, yet the hub ends up with no sinks and no subscriptions, and
/// regular delivery keeps working.
#[test]
fn t06() {
    block_on(async {
        let hub = TestHub::new(TK);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);

        let panicking_key = SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicAfter);
        let panicking_rx = hub.subscribe(panicking_key, 100_000);
        assert_hub_props(&hub, 1, 1);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 1, 1);

        let drop_panicked = catch_unwind(AssertUnwindSafe(move || drop(panicking_rx))).is_err();
        assert!(drop_panicked);

        // Despite the panic, both the sink and the subscription were removed.
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);
    });
}
/// Sending while a subscription with an `OnDispatchPanicBefore` key is
/// registered must panic without changing the hub's counts; dropping that
/// receiver afterwards cleans up normally and regular delivery works again.
#[test]
fn t07() {
    block_on(async {
        let hub = TestHub::new(TK);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);

        let panicking_key = SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicBefore);
        let panicking_rx = hub.subscribe(panicking_key, 100_000);
        assert_hub_props(&hub, 1, 1);

        let send_panicked = catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err();
        assert!(send_panicked);

        // The panicking dispatch must not add or remove anything.
        assert_hub_props(&hub, 1, 1);

        drop(panicking_rx);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);
    });
}
/// Sending while a subscription with an `OnDispatchPanicAfter` key is
/// registered must panic without changing the hub's counts; dropping that
/// receiver afterwards cleans up normally and regular delivery works again.
#[test]
fn t08() {
    block_on(async {
        let hub = TestHub::new(TK);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);

        let panicking_key = SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicAfter);
        let panicking_rx = hub.subscribe(panicking_key, 100_000);
        assert_hub_props(&hub, 1, 1);

        let send_panicked = catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err();
        assert!(send_panicked);

        // The panicking dispatch must not add or remove anything.
        assert_hub_props(&hub, 1, 1);

        drop(panicking_rx);
        assert_hub_props(&hub, 0, 0);

        add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
        assert_hub_props(&hub, 0, 0);
    });
}