#![allow(clippy::while_let_loop, clippy::redundant_pattern_matching)]
use super::*;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[tokio::test]
async fn test_drop_closes_channels() {
let stream = Relay::new();
let mut sub = stream.subscribe::<String>();
drop(stream);
let result = tokio::time::timeout(Duration::from_millis(100), sub.recv())
.await
.expect("should not timeout");
assert!(
result.is_none(),
"recv should return None when stream dropped"
);
}
#[tokio::test]
async fn test_basic_send_subscribe() {
let stream = Relay::new();
let mut sub = stream.subscribe::<String>();
stream.send("hello".to_string()).await.unwrap();
let msg = tokio::time::timeout(Duration::from_millis(100), sub.recv())
.await
.unwrap();
assert_eq!(msg.as_deref(), Some(&"hello".to_string()));
}
#[tokio::test]
async fn test_type_filtering() {
let stream = Relay::new();
let mut string_sub = stream.subscribe::<String>();
let mut int_sub = stream.subscribe::<i32>();
stream.send("hello".to_string()).await.unwrap();
stream.send(42i32).await.unwrap();
let string_msg = tokio::time::timeout(Duration::from_millis(100), string_sub.recv())
.await
.unwrap();
let int_msg = tokio::time::timeout(Duration::from_millis(100), int_sub.recv())
.await
.unwrap();
assert_eq!(string_msg.as_deref(), Some(&"hello".to_string()));
assert_eq!(int_msg.as_deref(), Some(&42i32));
}
#[tokio::test]
async fn test_broadcast_multiple_subscribers() {
let stream = Relay::new();
let mut sub1 = stream.subscribe::<String>();
let mut sub2 = stream.subscribe::<String>();
stream.send("hello".to_string()).await.unwrap();
let msg1 = tokio::time::timeout(Duration::from_millis(100), sub1.recv())
.await
.unwrap();
let msg2 = tokio::time::timeout(Duration::from_millis(100), sub2.recv())
.await
.unwrap();
assert_eq!(msg1.as_deref(), Some(&"hello".to_string()));
assert_eq!(msg2.as_deref(), Some(&"hello".to_string()));
}
#[tokio::test]
async fn test_tap() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let stream = Relay::new();
stream.tap::<i32, _, _>(move |n| {
counter_clone.fetch_add(*n as usize, Ordering::SeqCst);
});
stream.send(10i32).await.unwrap();
stream.send(20i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 30);
}
#[tokio::test]
async fn test_sink() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let stream = Relay::new();
stream.sink::<i32, _, _>(move |n| {
counter_clone.fetch_add(*n as usize, Ordering::SeqCst);
});
stream.send(5i32).await.unwrap();
stream.send(15i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 20);
}
#[tokio::test]
async fn stress_test_concurrent_senders() {
let stream = Relay::with_channel_size(1024);
let mut sub = stream.subscribe::<i32>();
const SENDERS: usize = 10;
const MESSAGES_PER_SENDER: usize = 100;
let handles: Vec<_> = (0..SENDERS)
.map(|sender_id| {
let stream = stream.clone();
tokio::spawn(async move {
for i in 0..MESSAGES_PER_SENDER {
stream
.send((sender_id * MESSAGES_PER_SENDER + i) as i32)
.await
.unwrap();
}
})
})
.collect();
for handle in handles {
handle.await.unwrap();
}
let mut count = 0;
loop {
match tokio::time::timeout(Duration::from_millis(100), sub.recv()).await {
Ok(Some(_)) => count += 1,
_ => break,
}
}
assert_eq!(count, SENDERS * MESSAGES_PER_SENDER);
}
#[derive(Debug, Clone)]
struct TestError(String);
impl std::fmt::Display for TestError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TestError: {}", self.0)
}
}
impl std::error::Error for TestError {}
#[tokio::test]
async fn test_sink_error_propagation() {
let stream = Relay::new();
stream.sink::<String, _, _>(|msg| {
if msg.as_str() == "bad" {
return Err(TestError("message was bad".to_string()));
}
Ok(())
});
let result = stream.send("good".to_string()).await;
assert!(result.is_ok());
let result = stream.send("bad".to_string()).await;
assert!(result.is_err());
if let Err(SendError::Downstream(err)) = result {
assert_eq!(err.source, "sink");
} else {
panic!("Expected Downstream error");
}
}
#[tokio::test]
async fn test_tap_error_propagation() {
let stream = Relay::new();
let _tapped = stream.tap::<i32, _, _>(|n| {
if *n < 0 {
return Err(TestError("negative number".to_string()));
}
Ok(())
});
let result = stream.send(42i32).await;
assert!(result.is_ok());
let result = stream.send(-1i32).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_error_subscription() {
let stream = Relay::new();
let mut error_sub = stream.subscribe::<RelayError>();
stream.sink::<String, _, _>(|msg| {
if msg.as_str() == "error" {
return Err(TestError("triggered error".to_string()));
}
Ok(())
});
let _ = stream.send("error".to_string()).await;
let error = tokio::time::timeout(Duration::from_millis(100), error_sub.recv())
.await
.unwrap();
assert!(error.is_some());
let error = error.unwrap();
assert_eq!(error.source, "sink");
}
#[tokio::test]
async fn test_sink_without_error() {
let stream = Relay::new();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
stream.sink::<i32, _, _>(move |n| {
counter_clone.fetch_add(*n as usize, Ordering::SeqCst);
});
stream.send(10i32).await.unwrap();
stream.send(20i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 30);
}
#[tokio::test]
async fn test_fire_and_forget() {
let stream = Relay::new();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
stream.sink::<i32, _, _>(move |n| {
counter_clone.fetch_add(*n as usize, Ordering::SeqCst);
});
let _ = stream.send(10i32).await;
let _ = stream.send(20i32).await;
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 30);
}
#[tokio::test]
async fn test_send_any() {
use std::any::TypeId;
let stream = Relay::new();
let mut sub = stream.subscribe::<String>();
let value: Arc<dyn std::any::Any + Send + Sync> = Arc::new("hello".to_string());
stream
.send_any(value, TypeId::of::<String>())
.await
.unwrap();
let msg = tokio::time::timeout(Duration::from_millis(100), sub.recv())
.await
.unwrap();
assert_eq!(msg.as_deref(), Some(&"hello".to_string()));
}
#[tokio::test]
async fn test_send_envelope() {
use crate::Envelope;
let stream = Relay::new();
let mut sub = stream.subscribe::<i32>();
let envelope = Envelope::new(42i32, 0, None);
stream.send_envelope(envelope).await.unwrap();
let msg = tokio::time::timeout(Duration::from_millis(100), sub.recv())
.await
.unwrap();
assert_eq!(msg.as_deref(), Some(&42i32));
}
#[tokio::test]
async fn test_within_basic() {
let stream = Relay::new();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let mut sub = stream.subscribe::<i32>();
stream.within(move || async move {
while let Some(n) = sub.recv().await {
counter_clone.fetch_add(*n as usize, Ordering::SeqCst);
}
});
stream.send(10i32).await.unwrap();
stream.send(20i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 30);
}
#[tokio::test]
async fn test_multiple_handlers_all_must_complete() {
let stream = Relay::new();
let counter1 = Arc::new(AtomicUsize::new(0));
let counter2 = Arc::new(AtomicUsize::new(0));
let c1 = counter1.clone();
let c2 = counter2.clone();
stream.sink::<i32, _, _>(move |n| {
c1.fetch_add(*n as usize, Ordering::SeqCst);
});
stream.sink::<i32, _, _>(move |n| {
c2.fetch_add(*n as usize * 2, Ordering::SeqCst);
});
stream.send(10i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter1.load(Ordering::SeqCst), 10);
assert_eq!(counter2.load(Ordering::SeqCst), 20);
}
#[tokio::test]
async fn test_error_from_one_handler() {
let stream = Relay::new();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
stream.sink::<i32, _, _>(move |n| {
counter_clone.fetch_add(*n as usize, Ordering::SeqCst);
});
stream.sink::<i32, _, _>(|n| {
if *n < 0 {
return Err(TestError("negative".to_string()));
}
Ok(())
});
stream.send(10i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 10);
let result = stream.send(-5i32).await;
assert!(result.is_err());
tokio::time::sleep(Duration::from_millis(50)).await;
}
#[tokio::test]
async fn test_no_handlers_immediate_return() {
let stream = Relay::new();
let result = stream.send("test".to_string()).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn stress_test_concurrent_errors() {
let stream = Relay::new();
let success_count = Arc::new(AtomicUsize::new(0));
let error_count = Arc::new(AtomicUsize::new(0));
for i in 0..5 {
let sc = success_count.clone();
let ec = error_count.clone();
stream.sink::<i32, _, _>(move |_n| {
if i % 2 == 1 {
ec.fetch_add(1, Ordering::SeqCst);
Err(TestError(format!("handler {} failed", i)))
} else {
sc.fetch_add(1, Ordering::SeqCst);
Ok(())
}
});
}
for _ in 0..100 {
let result = stream.send(1i32).await;
assert!(result.is_err());
}
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(success_count.load(Ordering::SeqCst), 300); assert_eq!(error_count.load(Ordering::SeqCst), 200); }
#[tokio::test]
async fn stress_test_rapid_error_sending() {
let stream = Relay::with_channel_size(1024);
stream.sink::<i32, _, _>(|_| Err(TestError("always fails".to_string())));
let mut error_count = 0;
for i in 0..500 {
let result = stream.send(i).await;
if result.is_err() {
error_count += 1;
}
}
assert_eq!(error_count, 500);
}
#[tokio::test]
async fn stress_test_error_subscription_under_load() {
let stream = Relay::with_channel_size(1024);
let errors_received = Arc::new(AtomicUsize::new(0));
let errors_received_clone = errors_received.clone();
let mut error_sub = stream.subscribe::<RelayError>();
tokio::spawn(async move {
while let Some(_err) = error_sub.recv().await {
errors_received_clone.fetch_add(1, Ordering::SeqCst);
}
});
stream.sink::<i32, _, _>(|n| {
if *n % 2 == 0 {
Err(TestError("even number".to_string()))
} else {
Ok(())
}
});
for i in 0..200 {
let _ = stream.send(i).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(errors_received.load(Ordering::SeqCst), 100);
}
#[tokio::test]
async fn stress_test_concurrent_senders_with_errors() {
let stream = Relay::with_channel_size(2048);
let error_count = Arc::new(AtomicUsize::new(0));
let success_count = Arc::new(AtomicUsize::new(0));
let ec = error_count.clone();
let sc = success_count.clone();
stream.sink::<i32, _, _>(move |n| {
if *n % 10 == 0 {
ec.fetch_add(1, Ordering::SeqCst);
Err(TestError("multiple of 10".to_string()))
} else {
sc.fetch_add(1, Ordering::SeqCst);
Ok(())
}
});
let handles: Vec<_> = (0..10)
.map(|sender_id| {
let stream = stream.clone();
tokio::spawn(async move {
for i in 0..100 {
let value = sender_id * 100 + i;
let _ = stream.send(value).await;
}
})
})
.collect();
for handle in handles {
handle.await.unwrap();
}
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(error_count.load(Ordering::SeqCst), 100);
assert_eq!(success_count.load(Ordering::SeqCst), 900);
}
#[tokio::test]
async fn stress_test_handler_registration_during_sends() {
let stream = Relay::new();
let total_received = Arc::new(AtomicUsize::new(0));
let stream_clone = stream.clone();
let sender_handle = tokio::spawn(async move {
for i in 0..100 {
let _ = stream_clone.send(i).await;
if i % 10 == 0 {
tokio::task::yield_now().await;
}
}
});
for _ in 0..5 {
tokio::task::yield_now().await;
let tr = total_received.clone();
stream.sink::<i32, _, _>(move |_| {
tr.fetch_add(1, Ordering::SeqCst);
});
}
sender_handle.await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(total_received.load(Ordering::SeqCst) > 0);
}
#[tokio::test]
async fn stress_test_within_with_panics() {
let stream = Relay::new();
let processed = Arc::new(AtomicUsize::new(0));
let processed_clone = processed.clone();
let mut error_sub = stream.subscribe::<RelayError>();
let panic_errors = Arc::new(AtomicUsize::new(0));
let pe = panic_errors.clone();
tokio::spawn(async move {
while let Some(err) = error_sub.recv().await {
if err.source == "within" || err.source == "subscription" {
pe.fetch_add(1, Ordering::SeqCst);
}
}
});
let mut sub = stream.subscribe::<i32>();
stream.within(move || async move {
while let Some(n) = sub.recv().await {
processed_clone.fetch_add(1, Ordering::SeqCst);
if *n == 50 {
panic!("deliberate panic at 50");
}
}
});
for i in 0..100 {
let result = stream.send(i).await;
if i == 50 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
if i > 50 && result.is_err() {
break;
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(processed.load(Ordering::SeqCst) >= 50);
}
#[tokio::test]
async fn stress_test_mixed_success_and_error_types() {
let stream = Relay::new();
let string_count = Arc::new(AtomicUsize::new(0));
let int_count = Arc::new(AtomicUsize::new(0));
let error_count = Arc::new(AtomicUsize::new(0));
let sc = string_count.clone();
stream.sink::<String, _, _>(move |_| {
sc.fetch_add(1, Ordering::SeqCst);
});
let ic = int_count.clone();
let ec = error_count.clone();
stream.sink::<i32, _, _>(move |n| {
ic.fetch_add(1, Ordering::SeqCst);
if *n < 0 {
ec.fetch_add(1, Ordering::SeqCst);
Err(TestError("negative".to_string()))
} else {
Ok(())
}
});
for i in 0..50 {
let _ = stream.send(format!("msg_{}", i)).await;
let _ = stream.send(i).await;
let _ = stream.send(-i).await; }
tokio::time::sleep(Duration::from_millis(200)).await;
let sc = string_count.load(Ordering::SeqCst);
let ic = int_count.load(Ordering::SeqCst);
let ec = error_count.load(Ordering::SeqCst);
assert!(
(48..=50).contains(&sc),
"string_count {} not in expected range",
sc
);
assert!(
(98..=100).contains(&ic),
"int_count {} not in expected range",
ic
);
assert!(
(48..=50).contains(&ec),
"error_count {} not in expected range",
ec
);
}
#[tokio::test]
async fn stress_test_rapid_subscribe_unsubscribe() {
let stream = Relay::new();
let stream_clone = stream.clone();
let sender = tokio::spawn(async move {
for i in 0..200 {
let _ = stream_clone.send(i).await;
tokio::task::yield_now().await;
}
});
for _ in 0..100 {
let mut sub = stream.subscribe::<i32>();
let _ = tokio::time::timeout(Duration::from_micros(100), sub.recv()).await;
drop(sub);
}
sender.await.unwrap();
}
#[tokio::test]
async fn test_multiple_errors_first_wins() {
let stream = Relay::new();
stream.sink::<i32, _, _>(|_| Err(TestError("error 1".to_string())));
stream.sink::<i32, _, _>(|_| Err(TestError("error 2".to_string())));
stream.sink::<i32, _, _>(|_| Err(TestError("error 3".to_string())));
let result = stream.send(42i32).await;
assert!(result.is_err());
if let Err(SendError::Downstream(e)) = result {
assert!(e.source == "sink");
} else {
panic!("Expected Downstream error");
}
}
#[tokio::test]
async fn test_tracker_zero_expected() {
let stream = Relay::new();
let start = std::time::Instant::now();
let result = stream.send("test".to_string()).await;
let elapsed = start.elapsed();
assert!(result.is_ok());
assert!(
elapsed < Duration::from_millis(50),
"Should complete immediately"
);
}
#[tokio::test]
async fn test_sink_panic_propagation() {
let stream = Relay::new();
let mut error_sub = stream.subscribe::<RelayError>();
stream.sink::<i32, _, _>(|n| {
if *n == 42 {
panic!("deliberate panic");
}
});
let result = stream.send(42i32).await;
assert!(result.is_err());
let error = tokio::time::timeout(Duration::from_millis(100), error_sub.recv())
.await
.unwrap();
assert!(error.is_some());
let error = error.unwrap();
assert_eq!(error.source, "sink");
assert!(error.error.to_string().contains("panic"));
}
#[tokio::test]
async fn test_tap_panic_propagation() {
let stream = Relay::new();
let _tapped = stream.tap::<String, _, _>(|s| {
if s.as_str() == "boom" {
panic!("tap panic");
}
});
let result = stream.send("hello".to_string()).await;
assert!(result.is_ok());
let result = stream.send("boom".to_string()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_handler_slow_completion() {
let stream = Relay::new();
let completed = Arc::new(AtomicUsize::new(0));
let completed_clone = completed.clone();
stream.sink::<i32, _, _>(move |_| {
std::thread::sleep(Duration::from_millis(50));
completed_clone.fetch_add(1, Ordering::SeqCst);
});
let start = std::time::Instant::now();
stream.send(1i32).await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_millis(40),
"Should wait for handler"
);
assert_eq!(completed.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_subscription_drop_mid_message() {
let stream = Relay::new();
let processed = Arc::new(AtomicUsize::new(0));
let processed_clone = processed.clone();
let sub = stream.subscribe::<i32>();
stream.within({
let mut sub = sub;
move || async move {
if let Some(_) = sub.recv().await {
processed_clone.fetch_add(1, Ordering::SeqCst);
}
}
});
tokio::time::sleep(Duration::from_millis(10)).await;
stream.send(1i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(processed.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_error_does_not_block_subsequent_sends() {
let stream = Relay::new();
let success_count = Arc::new(AtomicUsize::new(0));
let sc = success_count.clone();
stream.sink::<i32, _, _>(move |n| {
if *n < 0 {
return Err(TestError("negative".to_string()));
}
sc.fetch_add(1, Ordering::SeqCst);
Ok(())
});
let r1 = stream.send(-1i32).await;
assert!(r1.is_err());
let r2 = stream.send(1i32).await;
assert!(r2.is_ok());
let r3 = stream.send(2i32).await;
assert!(r3.is_ok());
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(success_count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_mixed_handler_types_completion() {
let stream = Relay::new();
let string_done = Arc::new(AtomicUsize::new(0));
let int_done = Arc::new(AtomicUsize::new(0));
let sd = string_done.clone();
let id = int_done.clone();
stream.sink::<String, _, _>(move |_| {
sd.fetch_add(1, Ordering::SeqCst);
});
stream.sink::<i32, _, _>(move |_| {
id.fetch_add(1, Ordering::SeqCst);
});
stream.send("test".to_string()).await.unwrap();
stream.send(42i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(string_done.load(Ordering::SeqCst), 1);
assert_eq!(int_done.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_rapid_handler_registration_deregistration() {
let stream = Relay::new();
let total = Arc::new(AtomicUsize::new(0));
for _ in 0..10 {
let t = total.clone();
stream.sink::<i32, _, _>(move |_| {
t.fetch_add(1, Ordering::SeqCst);
});
}
stream.send(1i32).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(total.load(Ordering::SeqCst), 10);
}
#[tokio::test]
async fn test_handler_count() {
let stream = Relay::new();
assert_eq!(stream.handler_count(), 0);
stream.sink::<i32, _, _>(|_| {});
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(stream.handler_count(), 1);
stream.tap::<i32, _, _>(|_| {});
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(stream.handler_count(), 2);
let _sub = stream.subscribe::<i32>();
assert_eq!(stream.handler_count(), 2);
}
#[tokio::test]
async fn test_stream_basic_send_subscribe() {
let stream = Relay::new();
let mut sub = stream.subscribe::<String>();
stream.send("hello".to_string()).await.unwrap();
let msg = tokio::time::timeout(Duration::from_millis(100), sub.recv())
.await
.unwrap();
assert_eq!(msg.as_deref(), Some(&"hello".to_string()));
}
#[tokio::test]
async fn test_channel_no_echo() {
let (tx, rx) = Relay::channel();
let mut sub = rx.subscribe::<String>();
tx.send("test".to_string()).await.unwrap();
let msg = tokio::time::timeout(Duration::from_millis(100), sub.recv())
.await
.unwrap();
assert_eq!(msg.as_deref(), Some(&"test".to_string()));
}
#[tokio::test]
async fn test_stream_is_loopback() {
let stream = Relay::new();
let mut sub = stream.subscribe::<i32>();
stream.send(123i32).await.unwrap();
let msg = tokio::time::timeout(Duration::from_millis(100), sub.recv())
.await
.unwrap();
assert_eq!(msg.as_deref(), Some(&123i32));
}
#[tokio::test]
async fn test_stream_close() {
let stream = Relay::new();
assert!(!stream.is_closed());
stream.close();
assert!(stream.is_closed());
let result = stream.send("test".to_string()).await;
assert!(matches!(result, Err(SendError::Closed)));
}
#[tokio::test]
async fn invariant_send_waits_for_all_tracked_handlers() {
let relay = Relay::new();
let flag = Arc::new(AtomicBool::new(false));
let f = flag.clone();
relay.tap::<i32, _, _>(move |_| {
std::thread::sleep(Duration::from_millis(50));
f.store(true, Ordering::SeqCst);
});
relay.send(1i32).await.unwrap();
assert!(
flag.load(Ordering::SeqCst),
"send() must wait for handler to complete"
);
}
#[tokio::test]
async fn invariant_multiple_handlers_all_complete() {
let relay = Relay::new();
let count = Arc::new(AtomicUsize::new(0));
for _ in 0..5 {
let c = count.clone();
relay.sink::<i32, _, _>(move |_| {
std::thread::sleep(Duration::from_millis(10));
c.fetch_add(1, Ordering::SeqCst);
});
}
relay.send(1i32).await.unwrap();
assert_eq!(
count.load(Ordering::SeqCst),
5,
"all 5 handlers must complete"
);
}
#[tokio::test]
async fn invariant_downstream_error_propagates() {
let relay = Relay::new();
relay.tap::<i32, _, Result<(), std::io::Error>>(|_| {
Err(std::io::Error::new(std::io::ErrorKind::Other, "fail"))
});
let result = relay.send(1i32).await;
assert!(
matches!(result, Err(SendError::Downstream(_))),
"error must propagate"
);
}
#[tokio::test]
async fn invariant_panic_propagates_as_error() {
let relay = Relay::new();
relay.tap::<i32, _, ()>(|_| {
panic!("deliberate panic");
});
let result = relay.send(1i32).await;
assert!(
matches!(result, Err(SendError::Downstream(_))),
"panic must propagate as error"
);
}
#[tokio::test]
async fn invariant_error_emitted_exactly_once() {
let relay = Relay::new();
let error_count = Arc::new(AtomicUsize::new(0));
let ec = error_count.clone();
let mut error_sub = relay.subscribe::<RelayError>();
tokio::spawn(async move {
while let Some(_) = error_sub.recv().await {
ec.fetch_add(1, Ordering::SeqCst);
}
});
relay.sink::<i32, _, _>(|_| Err(TestError("fail".to_string())));
let _ = relay.send(1i32).await;
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(
error_count.load(Ordering::SeqCst),
1,
"error must be emitted exactly once"
);
}
#[tokio::test]
async fn invariant_no_deadlock_on_handler_panic() {
let relay = Relay::new();
relay.tap::<i32, _, _>(|v| {
if *v == 1 {
panic!("boom");
}
});
let result = tokio::time::timeout(Duration::from_millis(500), relay.send(1i32)).await;
assert!(result.is_ok(), "send() must not deadlock on handler panic");
}
#[tokio::test]
async fn invariant_handler_count_restored_after_panic() {
let relay = Relay::new();
relay.tap::<i32, _, ()>(|_| {
panic!("boom");
});
let _ = relay.send(1i32).await;
tokio::time::sleep(Duration::from_millis(50)).await;
relay.close();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(
relay.handler_count(),
0,
"handler_count must be 0 after relay closed"
);
}
#[tokio::test]
async fn invariant_closed_relay_rejects_sends() {
let relay = Relay::new();
relay.close();
let result = relay.send(1i32).await;
assert!(matches!(result, Err(SendError::Closed)));
}
#[tokio::test]
async fn invariant_subscriptions_close_on_relay_close() {
let relay = Relay::new();
let mut sub = relay.subscribe::<i32>();
relay.send(1i32).await.unwrap();
relay.close();
let msg = sub.recv().await;
assert!(msg.is_some());
let closed = sub.recv().await;
assert!(closed.is_none(), "subscription must close");
}
#[tokio::test]
async fn regression_handler_guard_decrements_on_panic() {
let relay = Relay::new();
relay.sink::<i32, _, ()>(|_| {
panic!("intentional panic");
});
assert_eq!(relay.handler_count(), 1);
let _ = relay.send(1i32).await;
relay.close();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(
relay.handler_count(),
0,
"handler_count must be 0 after handler task exits"
);
}
#[tokio::test]
async fn soak_concurrent_subscribe_unsubscribe() {
let relay = Relay::with_channel_size(256);
let relay_clone = relay.clone();
let sender = tokio::spawn(async move {
for i in 0..200 {
let _ = relay_clone.send(i).await;
tokio::task::yield_now().await;
}
});
let relay_clone2 = relay.clone();
let churner = tokio::spawn(async move {
for _ in 0..50 {
let sub = relay_clone2.subscribe::<i32>();
tokio::task::yield_now().await;
drop(sub);
}
});
let results = tokio::time::timeout(Duration::from_secs(5), async {
let _ = sender.await;
let _ = churner.await;
})
.await;
assert!(
results.is_ok(),
"must not deadlock under subscribe/unsubscribe churn"
);
}
#[tokio::test]
async fn test_send_with_immediate_ready() {
let relay = Relay::new();
let received = Arc::new(AtomicBool::new(false));
let received_clone = received.clone();
relay.sink(move |_msg: &String| {
received_clone.store(true, Ordering::SeqCst);
});
let result = tokio::time::timeout(
Duration::from_millis(100),
relay.send("test".to_string())
)
.await;
assert!(
result.is_ok(),
"send() should not block - subscriptions signal ready immediately"
);
assert!(
received.load(Ordering::SeqCst),
"handler should have received the message"
);
}
#[tokio::test]
async fn test_subscription_immediately_ready() {
let relay = Relay::new();
relay.sink(|_msg: &String| {});
let result = tokio::time::timeout(
Duration::from_millis(100),
relay.send("test".to_string())
)
.await;
assert!(
result.is_ok(),
"send() should not block - subscriptions are ready immediately"
);
}