use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use jaeb::{DeadLetter, EventBus, EventBusError, SubscriptionPolicy, SyncEventHandler, SyncSubscriptionPolicy};
use proptest::prelude::*;
/// Minimal event payload published through the bus by the tests in this file.
///
/// The field carries a leading underscore because handlers never read it;
/// it only gives each published event a concrete value.
#[derive(Debug, Clone)]
struct PropEvent {
    /// Arbitrary payload value supplied by the test.
    _value: u16,
}
/// Builds a fresh current-thread Tokio runtime with all drivers enabled.
///
/// Each property-test case calls this to get its own runtime and drives the
/// async test body with `block_on`.
///
/// # Panics
///
/// Panics with the message `"runtime"` if the runtime cannot be built.
fn rt() -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    builder.build().expect("runtime")
}
/// Returns the proptest configuration shared by every property test in this file.
///
/// Runs 200 cases per property and caps shrinking at 200 iterations; every
/// other setting keeps its proptest default.
fn test_config() -> proptest::test_runner::Config {
    use proptest::test_runner::Config;

    let defaults = Config::default();
    Config {
        max_shrink_iters: 200,
        cases: 200,
        ..defaults
    }
}
/// Dead-letter listener state: a shared counter of observed dead letters.
struct DeadLetterCounter {
    /// Shared with the test body, which reads it after the bus has shut down.
    count: Arc<AtomicUsize>,
}
/// Synchronous handler that tallies every `DeadLetter` it is given.
impl SyncEventHandler<DeadLetter> for DeadLetterCounter {
    /// Increments the shared counter by one and always reports success;
    /// the dead letter's contents are not inspected.
    fn handle(&self, _dead_letter: &DeadLetter) -> Result<(), jaeb::HandlerError> {
        let _previous = self.count.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}
proptest! {
#![proptest_config(test_config())]
#[test]
fn all_sync_listeners_see_every_event(listeners in 1usize..8, events in 1usize..20) {
    // Property: with `listeners` sync subscribers and `events` published
    // events, the handlers run `listeners * events` times in total.
    let expected_total = listeners * events;
    rt().block_on(async move {
        let bus = EventBus::new(256).expect("valid config");
        let total_calls = Arc::new(AtomicUsize::new(0));

        // Register every listener; all of them bump the same shared counter.
        for _listener in 0..listeners {
            let counter = Arc::clone(&total_calls);
            let _sub = bus
                .subscribe::<PropEvent, _, _>(move |_event: &PropEvent| {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                })
                .await
                .expect("subscribe");
        }

        for _published in 0..events {
            let event = PropEvent { _value: 1 };
            bus.publish(event).await.expect("publish");
        }

        bus.shutdown().await.expect("shutdown");

        let observed_total = total_calls.load(Ordering::SeqCst);
        assert_eq!(observed_total, expected_total);
    });
}
#[test]
fn priority_ordering_respected(p1 in -20i32..20, p2 in -20i32..20, p3 in -20i32..20) {
    // Property: three sync listeners registered in id order 1, 2, 3 run in
    // descending priority order; the expectation below keeps registration
    // order for listeners whose priorities tie.
    rt().block_on(async move {
        let bus = EventBus::new(64).expect("valid config");
        let call_order = Arc::new(Mutex::new(Vec::<usize>::new()));
        let registrations = [(1usize, p1), (2usize, p2), (3usize, p3)];

        // Each listener records its own id when invoked.
        for (id, priority) in registrations {
            let order = Arc::clone(&call_order);
            let _sub = bus
                .subscribe_with_policy::<PropEvent, _, _>(
                    move |_event: &PropEvent| {
                        order.lock().expect("order lock").push(id);
                        Ok(())
                    },
                    SyncSubscriptionPolicy::default().with_priority(priority),
                )
                .await
                .expect("subscribe");
        }

        bus.publish(PropEvent { _value: 1 }).await.expect("publish");
        bus.shutdown().await.expect("shutdown");

        let got = call_order.lock().expect("order lock").clone();
        assert_eq!(got.len(), 3);

        // Stable sort by descending priority, so ties stay in registration order.
        let mut ranked = registrations.to_vec();
        ranked.sort_by_key(|&(_, priority)| std::cmp::Reverse(priority));
        let expected_ids: Vec<usize> = ranked.iter().map(|&(id, _)| id).collect();
        assert_eq!(got, expected_ids);
    });
}
#[test]
fn retry_count_bounded(max_retries in 0usize..5) {
    // Property: a handler that always fails is attempted once initially plus
    // once per allowed retry, i.e. `max_retries + 1` times in total.
    let expected_attempts = max_retries + 1;
    rt().block_on(async move {
        let bus = EventBus::new(64).expect("valid config");
        let attempt_count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&attempt_count);

        let _sub = bus
            .subscribe_with_policy::<PropEvent, _, _>(
                move |_event: PropEvent| {
                    // Each invocation gets its own handle so the future owns it.
                    let counter = Arc::clone(&counter);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<(), _>("always fail".into())
                    }
                },
                // Dead letters are switched off for this subscription.
                SubscriptionPolicy::default()
                    .with_max_retries(max_retries)
                    .with_dead_letter(false),
            )
            .await
            .expect("subscribe");

        bus.publish(PropEvent { _value: 1 }).await.expect("publish");
        bus.shutdown().await.expect("shutdown");

        let observed_attempts = attempt_count.load(Ordering::SeqCst);
        assert_eq!(observed_attempts, expected_attempts);
    });
}
#[test]
fn dead_letter_iff_enabled_and_failed(dead_letter_enabled in any::<bool>(), fail_count in 0usize..4, max_retries in 0usize..4) {
    // Property: exactly one dead letter is observed when dead-lettering is
    // enabled AND the handler is still failing after its last allowed retry;
    // otherwise none is observed.
    rt().block_on(async move {
        let bus = EventBus::new(64).expect("valid config");
        let attempt_count = Arc::new(AtomicUsize::new(0));
        let dead_letter_count = Arc::new(AtomicUsize::new(0));

        // Listener that tallies dead letters into the shared counter.
        let dead_letter_listener = DeadLetterCounter {
            count: Arc::clone(&dead_letter_count),
        };
        let _dl = bus
            .subscribe_dead_letters(dead_letter_listener)
            .await
            .expect("subscribe dead letters");

        let counter = Arc::clone(&attempt_count);
        let _sub = bus
            .subscribe_with_policy::<PropEvent, _, _>(
                move |_event: PropEvent| {
                    let counter = Arc::clone(&counter);
                    async move {
                        // The first `fail_count` attempts fail; later ones succeed.
                        let attempt_index = counter.fetch_add(1, Ordering::SeqCst);
                        if attempt_index < fail_count {
                            Err::<(), _>("fail".into())
                        } else {
                            Ok(())
                        }
                    }
                },
                SubscriptionPolicy::default()
                    .with_max_retries(max_retries)
                    .with_dead_letter(dead_letter_enabled),
            )
            .await
            .expect("subscribe");

        bus.publish(PropEvent { _value: 1 }).await.expect("publish");
        bus.shutdown().await.expect("shutdown");

        // The handler gets `max_retries + 1` attempts, so it fails terminally
        // only when it is configured to fail more times than it may retry.
        let exhausted_retries = fail_count > max_retries;
        let expected_dead_letters = usize::from(dead_letter_enabled && exhausted_retries);
        assert_eq!(dead_letter_count.load(Ordering::SeqCst), expected_dead_letters);
    });
}
#[test]
fn once_handler_fires_at_most_once(events in 1usize..50, values in proptest::collection::vec(0u16..1000, 1..50)) {
    // Property: a `subscribe_once` listener runs for one event only, however
    // many events are published afterwards.
    //
    // `events` is always >= 1, so the handler must have run exactly once by
    // the time the bus has shut down. Asserting only `<= 1` would also pass
    // if the one-shot handler were never dispatched at all.
    rt().block_on(async move {
        let bus = EventBus::new(64).expect("valid config");
        let hits = Arc::new(AtomicUsize::new(0));
        let hits_for_handler = Arc::clone(&hits);
        let _sub = bus
            .subscribe_once::<PropEvent, _, _>(move |_event: &PropEvent| {
                hits_for_handler.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
            .await
            .expect("subscribe once");

        // `values` is never empty (its strategy length is 1..50), so cycling
        // through it yields exactly `events` payloads, wrapping as needed.
        for value in values.iter().copied().cycle().take(events) {
            bus.publish(PropEvent { _value: value }).await.expect("publish");
        }

        bus.shutdown().await.expect("shutdown");
        assert_eq!(
            hits.load(Ordering::SeqCst),
            1,
            "one-shot handler must fire exactly once when at least one event is published"
        );
    });
}
#[test]
fn unsubscribe_prevents_future_dispatch(first_batch in 1usize..10, second_batch in 1usize..10) {
    // Property: a listener sees every event published before it unsubscribes
    // and none published afterwards.
    rt().block_on(async move {
        let bus = EventBus::new(64).expect("valid config");
        let delivered = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&delivered);

        let subscription = bus
            .subscribe::<PropEvent, _, _>(move |_event: &PropEvent| {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
            .await
            .expect("subscribe");

        // Phase 1: the listener is registered, so each publish should reach it.
        for _published in 0..first_batch {
            let event = PropEvent { _value: 1 };
            bus.publish(event).await.expect("publish before unsubscribe");
        }

        let was_removed = subscription.unsubscribe().await.expect("unsubscribe");
        assert!(was_removed, "first unsubscribe should remove the listener");

        // Phase 2: the listener is gone, so these events must not be counted.
        for _published in 0..second_batch {
            let event = PropEvent { _value: 2 };
            bus.publish(event).await.expect("publish after unsubscribe");
        }

        bus.shutdown().await.expect("shutdown");

        let delivered_total = delivered.load(Ordering::SeqCst);
        assert_eq!(delivered_total, first_batch);
    });
}
}
/// Sanity check that a `PropEvent` hands back the payload it was built with.
#[test]
fn prop_event_payload_roundtrip_sanity() {
    let PropEvent { _value: payload } = PropEvent { _value: 42 };
    assert_eq!(payload, 42);
}
/// The first unsubscribe reports that a listener was removed; a second
/// unsubscribe for the same id, issued through the bus, reports that it was not.
#[tokio::test]
async fn unsubscribe_returns_false_when_already_removed() {
    let bus = EventBus::new(32).expect("valid config");
    let subscription = bus
        .subscribe::<PropEvent, _, _>(|_event: &PropEvent| Ok(()))
        .await
        .expect("subscribe");

    // Capture the id first: `unsubscribe` is called on the handle itself,
    // and the id is needed afterwards for the second attempt.
    let listener_id = subscription.id();

    let removed_first = subscription.unsubscribe().await.expect("unsubscribe");
    assert!(removed_first);

    let removed_second = bus.unsubscribe(listener_id).await.expect("second unsubscribe");
    assert!(!removed_second);

    bus.shutdown().await.expect("shutdown");
}
/// Publishing on a bus that has already been shut down must fail with
/// `EventBusError::Stopped`.
#[tokio::test]
async fn publish_after_shutdown_is_stopped() {
    let bus = EventBus::new(32).expect("valid config");
    bus.shutdown().await.expect("shutdown");

    let late_publish = bus.publish(PropEvent { _value: 1 }).await;
    let failure = late_publish.expect_err("publish after shutdown");
    assert_eq!(failure, EventBusError::Stopped);
}