jaeb 0.3.9

simple snapshot-driven event bus
Documentation
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

use jaeb::{EventBus, EventHandler, HandlerResult, SubscriptionPolicy, SyncEventHandler, SyncSubscriptionPolicy};

/// Payload-free marker event published by the priority-ordering tests in this file.
#[derive(Clone)]
struct PriorityEvent;

/// Synchronous test handler that records its own `id` in a shared log each time it is invoked.
struct OrderedSync {
    /// Identifier pushed onto `order` on every invocation.
    id: usize,
    /// Log shared between the handlers and the test body; holds ids in invocation order.
    order: Arc<Mutex<Vec<usize>>>,
}

impl SyncEventHandler<PriorityEvent> for OrderedSync {
    /// Appends this handler's `id` to the shared invocation log and reports success.
    ///
    /// # Panics
    ///
    /// Panics with `"lock order"` if the log mutex has been poisoned.
    fn handle(&self, _event: &PriorityEvent) -> HandlerResult {
        // The temporary guard is released at the end of this statement.
        self.order.lock().expect("lock order").push(self.id);
        Ok(())
    }
}

/// Asynchronous test handler that counts its invocations and records its own `id` in a shared log.
struct OrderedAsync {
    /// Identifier pushed onto `order` on every invocation.
    id: usize,
    /// Log shared between the handlers and the test body; holds ids in invocation order.
    order: Arc<Mutex<Vec<usize>>>,
    /// Shared counter incremented once per invocation.
    hits: Arc<AtomicUsize>,
}

impl EventHandler<PriorityEvent> for OrderedAsync {
    /// Bumps the shared hit counter, then appends this handler's `id` to the shared
    /// invocation log and reports success.
    ///
    /// # Panics
    ///
    /// Panics with `"lock order"` if the log mutex has been poisoned.
    async fn handle(&self, _event: &PriorityEvent) -> HandlerResult {
        self.hits.fetch_add(1, Ordering::SeqCst);
        // The temporary guard is released at the end of this statement; nothing is awaited
        // while the lock is held.
        self.order.lock().expect("lock order").push(self.id);
        Ok(())
    }
}

/// Registers three sync handlers with priorities -10, 10 and 0 (in that registration order),
/// publishes one event, and asserts the handlers were invoked in descending priority order.
#[tokio::test]
async fn sync_priority_orders_high_to_low() {
    let bus = EventBus::new(64).expect("valid config");
    let order = Arc::new(Mutex::new(Vec::new()));

    // (handler id, priority, message used if the subscription fails)
    let registrations = [
        (1, -10, "subscribe low"),
        (2, 10, "subscribe high"),
        (3, 0, "subscribe mid"),
    ];

    for (id, priority, failure_msg) in registrations {
        let policy = SyncSubscriptionPolicy::default().with_priority(priority);
        let handler = OrderedSync {
            id,
            order: Arc::clone(&order),
        };
        let _ = bus
            .subscribe_with_policy::<PriorityEvent, _, _>(handler, policy)
            .await
            .expect(failure_msg);
    }

    bus.publish(PriorityEvent).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    // Snapshot the log so the lock is not held during the comparison.
    let observed: Vec<usize> = order.lock().expect("lock order").clone();
    assert_eq!(observed, [2, 3, 1]);
}

/// Registers three sync handlers that all share priority 7, publishes one event, and asserts
/// the handlers were invoked in the same order in which they were subscribed.
#[tokio::test]
async fn sync_equal_priority_keeps_fifo_order() {
    let bus = EventBus::new(64).expect("valid config");
    let order = Arc::new(Mutex::new(Vec::new()));

    let shared_policy = SyncSubscriptionPolicy::default().with_priority(7);

    // (handler id, message used if the subscription fails)
    let registrations = [(1, "subscribe 1"), (2, "subscribe 2"), (3, "subscribe 3")];

    for (id, failure_msg) in registrations {
        let handler = OrderedSync {
            id,
            order: Arc::clone(&order),
        };
        let _ = bus
            .subscribe_with_policy::<PriorityEvent, _, _>(handler, shared_policy)
            .await
            .expect(failure_msg);
    }

    bus.publish(PriorityEvent).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    // Snapshot the log so the lock is not held during the comparison.
    let observed: Vec<usize> = order.lock().expect("lock order").clone();
    assert_eq!(observed, [1, 2, 3]);
}

/// Registers three async handlers with priorities -1, 20 and 0 (in that registration order),
/// publishes one event, and asserts that every handler ran exactly once and that the recorded
/// order is descending by priority.
#[tokio::test]
async fn async_priority_orders_spawn_sequence() {
    // NOTE(review): `max_concurrent_async(1)` presumably serialises async handlers so the
    // recorded order mirrors the spawn order — confirm against the bus builder's contract.
    let bus = EventBus::builder()
        .buffer_size(64)
        .max_concurrent_async(1)
        .build()
        .expect("valid config");

    let order = Arc::new(Mutex::new(Vec::new()));
    let hits = Arc::new(AtomicUsize::new(0));

    // (handler id, priority, message used if the subscription fails)
    let registrations = [
        (1, -1, "subscribe low"),
        (2, 20, "subscribe high"),
        (3, 0, "subscribe mid"),
    ];

    for (id, priority, failure_msg) in registrations {
        let handler = OrderedAsync {
            id,
            order: Arc::clone(&order),
            hits: Arc::clone(&hits),
        };
        let policy = SubscriptionPolicy::default().with_priority(priority);
        let _ = bus
            .subscribe_with_policy::<PriorityEvent, _, _>(handler, policy)
            .await
            .expect(failure_msg);
    }

    bus.publish(PriorityEvent).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    let total_hits = hits.load(Ordering::SeqCst);
    assert_eq!(total_hits, 3);

    // Snapshot the log so the lock is not held during the comparison.
    let observed: Vec<usize> = order.lock().expect("lock order").clone();
    assert_eq!(observed, [2, 3, 1]);
}