use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::Value;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::bus::event::{AckResult, Event};
use agent_block_types::error::BlockError;
/// Callback invoked by [`EventBus`] for one event.
///
/// Handlers are stored behind `Arc<dyn Handler>` and each invocation is run
/// inside `tokio::spawn`, which is why implementors must be
/// `Send + Sync + 'static`.
#[async_trait]
pub trait Handler: Send + Sync + 'static {
/// Handles one event.
///
/// The bus passes owned clones of the event's `kind`, `id`, `payload` and
/// `meta` fields. The returned [`AckResult`] is what the bus hands to the
/// event's `deliver_ack`.
async fn call(&self, kind: String, id: String, payload: Value, meta: Value) -> AckResult;
}
/// Shared, type-erased [`Handler`] as accepted by `EventBus::on` and
/// `EventBus::on_any`.
pub type HandlerKey = Arc<dyn Handler>;
/// Dispatcher that receives [`Event`]s from an mpsc channel and routes each
/// one to a handler chosen by the event's kind.
pub struct EventBus {
/// Channel from which `run` receives events.
rx: mpsc::Receiver<Event>,
/// Handlers keyed by event kind; consulted first during dispatch.
handlers: HashMap<String, HandlerKey>,
/// Fallback handler used when no entry in `handlers` matches the event kind.
any: Option<HandlerKey>,
/// Set to `true` when `run` starts; while set, `on` and `on_any` return an error.
running: bool,
}
impl EventBus {
pub fn new(rx: mpsc::Receiver<Event>) -> Self {
Self {
rx,
handlers: HashMap::new(),
any: None,
running: false,
}
}
pub fn on(&mut self, kind: impl Into<String>, handler: HandlerKey) -> Result<(), BlockError> {
if self.running {
return Err(BlockError::Bus(
"bus.on cannot be called after bus.serve() has started".into(),
));
}
let kind = kind.into();
if self.handlers.insert(kind.clone(), handler).is_some() {
tracing::warn!(kind = %kind, "bus.on: duplicate registration (last-write-wins)");
}
Ok(())
}
pub fn on_any(&mut self, handler: HandlerKey) -> Result<(), BlockError> {
if self.running {
return Err(BlockError::Bus(
"bus.on_any cannot be called after bus.serve() has started".into(),
));
}
if self.any.is_some() {
tracing::warn!("bus.on_any: duplicate registration (last-write-wins)");
}
self.any = Some(handler);
Ok(())
}
#[cfg(test)]
fn handler_count(&self) -> usize {
self.handlers.len()
}
pub async fn run(&mut self, shutdown: CancellationToken) -> Result<(), BlockError> {
self.running = true;
tracing::info!("bus: dispatcher loop starting");
loop {
tokio::select! {
biased;
_ = shutdown.cancelled() => {
tracing::info!("bus: shutdown signalled; closing receiver");
self.rx.close();
break;
}
maybe_evt = self.rx.recv() => {
let Some(evt) = maybe_evt else {
tracing::info!("bus: all senders dropped; exiting loop");
break;
};
self.dispatch(evt).await;
}
}
}
tracing::info!("bus: dispatcher loop exited");
Ok(())
}
async fn dispatch(&self, mut evt: Event) {
let handler = self
.handlers
.get(&evt.kind)
.cloned()
.or_else(|| self.any.clone());
let Some(handler) = handler else {
tracing::warn!(kind = %evt.kind, id = %evt.id, "bus: no handler for event; nacking");
let err = BlockError::Bus(format!("no handler for kind `{}`", evt.kind));
if let Err(e) = evt.deliver_ack(Err(err)) {
tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: failed to deliver nack");
}
return;
};
let kind = evt.kind.clone();
let id = evt.id.clone();
let payload = evt.payload.clone();
let meta = evt.meta.clone();
let join = tokio::spawn(async move { handler.call(kind, id, payload, meta).await });
let result: AckResult = match join.await {
Ok(ack) => ack,
Err(join_err) => {
let msg = if join_err.is_panic() {
panic_message(join_err.into_panic())
} else {
format!("handler task error: {join_err}")
};
tracing::error!(
kind = %evt.kind,
id = %evt.id,
"bus: handler panicked: {}",
msg
);
Err(BlockError::Bus(format!("handler panic: {msg}")))
}
};
if let Err(ref e) = result {
tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: handler returned error");
}
if let Err(e) = evt.deliver_ack(result) {
tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: ack delivery failed");
}
}
}
/// Extracts a human-readable message from a panic payload.
///
/// Payloads that are a `String` or a `&'static str` are returned as text.
/// Any other payload type yields a fixed placeholder.
fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    match payload.downcast::<String>() {
        Ok(owned) => *owned,
        Err(other) => match other.downcast_ref::<&'static str>() {
            Some(text) => (*text).to_string(),
            None => "<non-string panic payload>".to_string(),
        },
    }
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex as StdMutex;
use std::time::Duration;
use tokio::sync::{oneshot, Mutex as TokioMutex};
/// Test handler that counts its invocations and acks with its label.
struct RecordingHandler {
    /// Text returned (as a JSON string) in every ack.
    label: &'static str,
    /// Shared counter bumped once per `call`.
    calls: Arc<AtomicUsize>,
}

#[async_trait]
impl Handler for RecordingHandler {
    async fn call(&self, _kind: String, _id: String, _payload: Value, _meta: Value) -> AckResult {
        let _previous = self.calls.fetch_add(1, Ordering::SeqCst);
        let reply = Value::from(self.label);
        Ok(reply)
    }
}
/// Test handler whose `call` always panics with the message `boom`.
struct PanickingHandler;

#[async_trait]
impl Handler for PanickingHandler {
    async fn call(&self, _kind: String, _id: String, _payload: Value, _meta: Value) -> AckResult {
        panic!("boom")
    }
}
/// Queues an ack-carrying event on `tx` without waiting and returns the
/// receiver on which the ack will arrive.
///
/// Panics if the channel cannot accept the event immediately.
fn send_event(tx: &mpsc::Sender<Event>, kind: &str, id: &str) -> oneshot::Receiver<AckResult> {
    let payload = json!({"hello": "world"});
    let (event, ack_rx) = Event::with_ack(kind, id, payload, Value::Null);
    tx.try_send(event).expect("mpsc send");
    ack_rx
}
#[tokio::test]
async fn kind_specific_dispatch_hits_specialized_handler() {
let (tx, rx) = mpsc::channel::<Event>(4);
let mut bus = EventBus::new(rx);
let mesh_calls = Arc::new(AtomicUsize::new(0));
let any_calls = Arc::new(AtomicUsize::new(0));
bus.on(
"mesh",
Arc::new(RecordingHandler {
label: "mesh",
calls: mesh_calls.clone(),
}),
)
.unwrap();
bus.on_any(Arc::new(RecordingHandler {
label: "any",
calls: any_calls.clone(),
}))
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "mesh", "e1");
let got = ack.await.unwrap().unwrap();
assert_eq!(got, Value::String("mesh".into()));
assert_eq!(mesh_calls.load(Ordering::SeqCst), 1);
assert_eq!(any_calls.load(Ordering::SeqCst), 0);
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn on_any_fallback_fires_only_when_no_match() {
let (tx, rx) = mpsc::channel::<Event>(4);
let mut bus = EventBus::new(rx);
let any_calls = Arc::new(AtomicUsize::new(0));
bus.on_any(Arc::new(RecordingHandler {
label: "any",
calls: any_calls.clone(),
}))
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "unknown_kind", "e1");
let got = ack.await.unwrap().unwrap();
assert_eq!(got, Value::String("any".into()));
assert_eq!(any_calls.load(Ordering::SeqCst), 1);
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
/// A bus with no handlers at all must answer with a `Bus` error mentioning
/// "no handler".
#[tokio::test]
async fn no_handler_produces_nack() {
    let (sender, receiver) = mpsc::channel::<Event>(4);
    let mut bus = EventBus::new(receiver);

    let shutdown = CancellationToken::new();
    let bus_task = tokio::spawn({
        let shutdown = shutdown.clone();
        async move { bus.run(shutdown).await }
    });

    let outcome = send_event(&sender, "mesh", "e1").await.unwrap();
    match outcome {
        Err(BlockError::Bus(msg)) => {
            let mentions_missing_handler = msg.contains("no handler");
            assert!(mentions_missing_handler, "unexpected msg: {msg}");
        }
        other => panic!("expected Bus err, got {other:?}"),
    }

    shutdown.cancel();
    drop(sender);
    bus_task.await.unwrap().unwrap();
}
#[tokio::test]
async fn shutdown_token_breaks_loop() {
let (tx, rx) = mpsc::channel::<Event>(4);
let mut bus = EventBus::new(rx);
bus.on_any(Arc::new(RecordingHandler {
label: "any",
calls: Arc::new(AtomicUsize::new(0)),
}))
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
token.cancel();
let res = tokio::time::timeout(Duration::from_millis(500), handle)
.await
.expect("timeout");
res.unwrap().unwrap();
drop(tx);
}
#[tokio::test]
async fn handler_panic_is_isolated_and_loop_continues() {
let (tx, rx) = mpsc::channel::<Event>(4);
let mut bus = EventBus::new(rx);
let ok_calls = Arc::new(AtomicUsize::new(0));
bus.on("boom", Arc::new(PanickingHandler)).unwrap();
bus.on(
"ok",
Arc::new(RecordingHandler {
label: "ok",
calls: ok_calls.clone(),
}),
)
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "boom", "e1");
let got = ack.await.unwrap();
match got {
Err(BlockError::Bus(msg)) => {
assert!(
msg.contains("panic") || msg.contains("boom"),
"unexpected msg: {msg}"
);
}
other => panic!("expected Bus err, got {other:?}"),
}
let ack = send_event(&tx, "ok", "e2");
let got = ack.await.unwrap().unwrap();
assert_eq!(got, Value::String("ok".into()));
assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn bounded_mpsc_applies_backpressure_not_drop() {
let (tx, rx) = mpsc::channel::<Event>(1);
let mut bus = EventBus::new(rx);
bus.on(
"slow",
Arc::new(RecordingHandler {
label: "slow",
calls: Arc::new(AtomicUsize::new(0)),
}),
)
.unwrap();
let (evt1, _ack1_rx) = Event::with_ack("slow", "e1", json!({}), Value::Null);
tx.try_send(evt1).expect("first send fits capacity 1");
let (evt2, _ack2_rx) = Event::with_ack("slow", "e2", json!({}), Value::Null);
let err = tx.try_send(evt2).unwrap_err();
assert!(
matches!(err, mpsc::error::TrySendError::Full(_)),
"expected Full, got {err:?}"
);
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
tokio::time::sleep(Duration::from_millis(50)).await;
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn on_after_running_returns_err() {
let (_tx, rx) = mpsc::channel::<Event>(1);
let mut bus = EventBus::new(rx);
bus.running = true;
let err = bus
.on(
"mesh",
Arc::new(RecordingHandler {
label: "x",
calls: Arc::new(AtomicUsize::new(0)),
}),
)
.unwrap_err();
match err {
BlockError::Bus(msg) => assert!(msg.contains("bus.on")),
other => panic!("expected Bus err, got {other:?}"),
}
let err = bus
.on_any(Arc::new(RecordingHandler {
label: "x",
calls: Arc::new(AtomicUsize::new(0)),
}))
.unwrap_err();
match err {
BlockError::Bus(msg) => assert!(msg.contains("bus.on_any")),
other => panic!("expected Bus err, got {other:?}"),
}
assert_eq!(bus.handler_count(), 0);
}
#[tokio::test]
async fn duplicate_on_is_last_write_wins() {
let (tx, rx) = mpsc::channel::<Event>(2);
let mut bus = EventBus::new(rx);
let first_calls = Arc::new(AtomicUsize::new(0));
let second_calls = Arc::new(AtomicUsize::new(0));
bus.on(
"mesh",
Arc::new(RecordingHandler {
label: "first",
calls: first_calls.clone(),
}),
)
.unwrap();
bus.on(
"mesh",
Arc::new(RecordingHandler {
label: "second",
calls: second_calls.clone(),
}),
)
.unwrap();
assert_eq!(bus.handler_count(), 1);
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "mesh", "e1");
let got = ack.await.unwrap().unwrap();
assert_eq!(got, Value::String("second".into()));
assert_eq!(first_calls.load(Ordering::SeqCst), 0);
assert_eq!(second_calls.load(Ordering::SeqCst), 1);
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
/// Test handler that sleeps for `delay`, records the event id in `order`, and
/// acks with that id.
struct OrderingHandler {
    /// Ids in the order the handler finished processing them.
    order: Arc<StdMutex<Vec<String>>>,
    /// Artificial processing time applied to every call.
    delay: Duration,
}

#[async_trait]
impl Handler for OrderingHandler {
    async fn call(&self, _kind: String, id: String, _payload: Value, _meta: Value) -> AckResult {
        tokio::time::sleep(self.delay).await;
        {
            // The guard is scoped so the std mutex is never held across an await.
            let mut seen = self.order.lock().expect("order mutex");
            seen.push(id.clone());
        }
        Ok(Value::String(id))
    }
}
/// Test handler that always fails with `BlockError::Bus("x")`.
struct ErrHandler;

#[async_trait]
impl Handler for ErrHandler {
    async fn call(&self, _kind: String, _id: String, _payload: Value, _meta: Value) -> AckResult {
        let failure = BlockError::Bus("x".into());
        Err(failure)
    }
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_bus_event_serialization_arrival_order() {
const N: usize = 20;
let (tx, rx) = mpsc::channel::<Event>(N);
let mut bus = EventBus::new(rx);
let order = Arc::new(StdMutex::new(Vec::new()));
bus.on(
"k",
Arc::new(OrderingHandler {
order: Arc::clone(&order),
delay: Duration::from_millis(5),
}),
)
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let mut expected = Vec::with_capacity(N);
let mut acks = Vec::with_capacity(N);
for i in 0..N {
let id = format!("e{i}");
expected.push(id.clone());
let (evt, rx) = Event::with_ack("k", id, json!({}), Value::Null);
tx.send(evt).await.expect("send");
acks.push(rx);
}
for (i, ack) in acks.into_iter().enumerate() {
let got = ack.await.expect("ack recv").expect("ack ok");
assert_eq!(got, Value::String(format!("e{i}")));
}
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
let recorded = order.lock().unwrap().clone();
assert_eq!(recorded, expected, "dispatcher must preserve arrival order");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_bus_graceful_shutdown_within_grace_ms() {
let (tx, rx) = mpsc::channel::<Event>(4);
let mut bus = EventBus::new(rx);
bus.on_any(Arc::new(RecordingHandler {
label: "any",
calls: Arc::new(AtomicUsize::new(0)),
}))
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
token.cancel();
let res = tokio::time::timeout(Duration::from_millis(1500), handle)
.await
.expect("bus.run must exit within grace window");
res.unwrap().unwrap();
drop(tx);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_bus_handler_panic_isolation_catch_unwind() {
let (tx, rx) = mpsc::channel::<Event>(4);
let mut bus = EventBus::new(rx);
let ok_calls = Arc::new(AtomicUsize::new(0));
bus.on("crash", Arc::new(PanickingHandler)).unwrap();
bus.on(
"normal",
Arc::new(RecordingHandler {
label: "normal",
calls: ok_calls.clone(),
}),
)
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "crash", "e1");
let got = ack.await.unwrap();
assert!(matches!(got, Err(BlockError::Bus(_))), "panic must NACK");
let ack = send_event(&tx, "normal", "e2");
let got = ack.await.unwrap().unwrap();
assert_eq!(got, Value::String("normal".into()));
assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_bus_backpressure_bounded_mpsc_capacity() {
let (tx, rx) = mpsc::channel::<Event>(1);
let mut bus = EventBus::new(rx);
let calls = Arc::new(AtomicUsize::new(0));
bus.on(
"k",
Arc::new(RecordingHandler {
label: "k",
calls: calls.clone(),
}),
)
.unwrap();
let (evt1, _r1) = Event::with_ack("k", "e1", json!({}), Value::Null);
tx.try_send(evt1).expect("first send fits");
let (evt2, _r2) = Event::with_ack("k", "e2", json!({}), Value::Null);
let err = tx.try_send(evt2).expect_err("capacity full");
assert!(
matches!(err, mpsc::error::TrySendError::Full(_)),
"expected Full (not drop), got {err:?}"
);
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let (evt3, r3) = Event::with_ack("k", "e3", json!({}), Value::Null);
tx.send(evt3).await.expect("send e3");
let (evt4, r4) = Event::with_ack("k", "e4", json!({}), Value::Null);
tx.send(evt4).await.expect("send e4");
r3.await.unwrap().unwrap();
r4.await.unwrap().unwrap();
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
assert!(calls.load(Ordering::SeqCst) >= 2);
}
/// Dropping the ack sender must resolve a 30 s timeout-wrapped receiver
/// immediately with a receive error, not after the timeout.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_bus_oneshot_ack_timeout_30s() {
    let ack_rx = {
        let (ack_tx, ack_rx) = oneshot::channel::<AckResult>();
        drop(ack_tx);
        ack_rx
    };

    let start = tokio::time::Instant::now();
    let limit = Duration::from_secs(30);
    let got = tokio::time::timeout(limit, ack_rx)
        .await
        .expect("should not hit 30s timeout");

    assert!(got.is_err(), "expected RecvError, got {got:?}");
    let waited = start.elapsed();
    assert!(
        waited < Duration::from_secs(1),
        "sender-drop must short-circuit the 30s timeout"
    );
}
/// A SIGTERM delivered to the process must cancel the shutdown token through
/// the SIGTERM / Ctrl-C `select!`.
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_bus_sigterm_sigint_race_select() {
    use tokio::signal::unix::{signal, SignalKind};
    let token = CancellationToken::new();
    let token_for_task = token.clone();
    // Install the SIGTERM listener here, before the task is spawned and before
    // the signal is raised. If installation happened inside the spawned task,
    // a slow-starting task would let `kill` hit the default SIGTERM action and
    // terminate the whole test process; a fixed sleep cannot rule that out.
    let mut term = signal(SignalKind::terminate()).expect("install SIGTERM");
    let task = tokio::spawn(async move {
        tokio::select! {
            _ = term.recv() => token_for_task.cancel(),
            _ = tokio::signal::ctrl_c() => token_for_task.cancel(),
        }
    });
    nix::sys::signal::kill(nix::unistd::Pid::this(), nix::sys::signal::Signal::SIGTERM)
        .expect("kill(SIGTERM)");
    tokio::time::timeout(Duration::from_secs(2), token.cancelled())
        .await
        .expect("cancel must fire within 2s after SIGTERM");
    task.await.expect("signal task");
}
/// Two tasks sharing the bus through `Arc<tokio::sync::Mutex<_>>` must both
/// finish within 2 s when each releases the guard before awaiting anything.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_bus_arc_tokio_mutex_no_await_while_held() {
    let (_tx, rx) = mpsc::channel::<Event>(1);
    let shared: Arc<TokioMutex<EventBus>> = Arc::new(TokioMutex::new(EventBus::new(rx)));

    let first = tokio::spawn({
        let bus = Arc::clone(&shared);
        async move {
            {
                let guard = bus.lock().await;
                let _ = guard.handler_count();
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    });
    let second = tokio::spawn({
        let bus = Arc::clone(&shared);
        async move {
            let guard = bus.lock().await;
            let _ = guard.handler_count();
        }
    });

    let both_done = async {
        first.await.unwrap();
        second.await.unwrap();
    };
    tokio::time::timeout(Duration::from_secs(2), both_done)
        .await
        .expect("no deadlock");
}
/// `catch_unwind` passes a normal return value through, catches a panic, and
/// leaves side effects made before the panic visible.
#[test]
fn test_bus_catch_unwind_paniceq_abort_not_caught() {
    use std::panic::{catch_unwind, AssertUnwindSafe};

    let clean = catch_unwind(|| 42);
    assert_eq!(clean.ok(), Some(42));

    let mut counter = 0i32;
    let outcome = catch_unwind(AssertUnwindSafe(|| {
        counter += 1;
        panic!("boom");
    }));
    assert!(outcome.is_err(), "expected caught panic");
    assert_eq!(counter, 1, "side effect before panic still observable");
}
/// Aborting a task that waits on the token yields a cancelled `JoinError` and
/// does not cancel the token itself.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_bus_spawn_signal_task_cancellation() {
    let shutdown = CancellationToken::new();
    let waiter = tokio::spawn({
        let shutdown = shutdown.clone();
        async move { shutdown.cancelled().await }
    });

    waiter.abort();
    match waiter.await {
        Ok(()) => panic!("task should have been cancelled before completing"),
        Err(e) => assert!(e.is_cancelled(), "expected cancelled JoinError, got {e:?}"),
    }

    assert!(!shutdown.is_cancelled());
}
/// With paused time, a 30 s timeout around an unanswered ack is still pending
/// at 29 s and has elapsed at 31 s.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_bus_timeout_ack_expiry_30s_match() {
    // `_tx` stays alive so the receiver can only finish through the timeout.
    let (_tx, rx) = oneshot::channel::<AckResult>();
    let deadline = tokio::time::timeout(Duration::from_secs(30), rx);
    tokio::pin!(deadline);

    tokio::time::advance(Duration::from_secs(29)).await;
    let early = futures_poll_once(&mut deadline);
    assert!(early.is_none(), "timeout must not fire before 30s");

    tokio::time::advance(Duration::from_secs(2)).await;
    let got = deadline.as_mut().await;
    assert!(got.is_err(), "expected Elapsed, got {got:?}");
}
/// Polls `fut` exactly once with a waker that does nothing.
///
/// Returns `Some(output)` if the future completed on that poll and `None` if
/// it is still pending.
fn futures_poll_once<F: std::future::Future>(
    fut: &mut std::pin::Pin<&mut F>,
) -> Option<F::Output> {
    use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

    fn noop(_data: *const ()) {}
    fn clone_noop(_data: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
    }
    static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(clone_noop, noop, noop, noop);

    // SAFETY: every vtable entry ignores the data pointer (which is null) and
    // touches no state, so clone/wake/wake_by_ref/drop are trivially sound and
    // safe to call from any thread.
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
        Some(output)
    } else {
        None
    }
}
/// A thread that panics while holding a `std::sync::Mutex` poisons it; the
/// inner guard is still recoverable from the poison error.
#[test]
fn test_std_mutex_poison_on_handler_registration() {
    let shared: Arc<StdMutex<i32>> = Arc::new(StdMutex::new(0));

    let poisoner = {
        let shared = Arc::clone(&shared);
        std::thread::spawn(move || {
            let _guard = shared.lock().expect("first lock");
            panic!("poison me");
        })
    };
    assert!(poisoner.join().is_err());

    let poisoned = shared.lock().expect_err("mutex must be poisoned");
    let _inner = poisoned.into_inner();
}
#[tokio::test]
async fn general_on_any_fallback_vs_no_handler_warn() {
let (tx, rx) = mpsc::channel::<Event>(2);
let mut bus = EventBus::new(rx);
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "kind-x", "id1");
let got = ack.await.unwrap();
assert!(matches!(got, Err(BlockError::Bus(_))));
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
let (tx, rx) = mpsc::channel::<Event>(2);
let mut bus = EventBus::new(rx);
let any_calls = Arc::new(AtomicUsize::new(0));
bus.on_any(Arc::new(RecordingHandler {
label: "any",
calls: any_calls.clone(),
}))
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "anything", "id1");
assert_eq!(ack.await.unwrap().unwrap(), Value::String("any".into()));
assert_eq!(any_calls.load(Ordering::SeqCst), 1);
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn general_specialized_wins_over_on_any() {
let (tx, rx) = mpsc::channel::<Event>(2);
let mut bus = EventBus::new(rx);
let spec_calls = Arc::new(AtomicUsize::new(0));
let any_calls = Arc::new(AtomicUsize::new(0));
bus.on(
"k",
Arc::new(RecordingHandler {
label: "spec",
calls: spec_calls.clone(),
}),
)
.unwrap();
bus.on_any(Arc::new(RecordingHandler {
label: "any",
calls: any_calls.clone(),
}))
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "k", "e1");
let got = ack.await.unwrap().unwrap();
assert_eq!(got, Value::String("spec".into()));
assert_eq!(spec_calls.load(Ordering::SeqCst), 1);
assert_eq!(any_calls.load(Ordering::SeqCst), 0);
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn general_handler_error_ack_and_loop_continues() {
let (tx, rx) = mpsc::channel::<Event>(4);
let mut bus = EventBus::new(rx);
bus.on("err", Arc::new(ErrHandler)).unwrap();
let ok_calls = Arc::new(AtomicUsize::new(0));
bus.on(
"ok",
Arc::new(RecordingHandler {
label: "ok",
calls: ok_calls.clone(),
}),
)
.unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move { bus.run(token_clone).await });
let ack = send_event(&tx, "err", "e1");
let got = ack.await.unwrap();
match got {
Err(BlockError::Bus(msg)) => assert_eq!(msg, "x"),
other => panic!("expected Bus err 'x', got {other:?}"),
}
let ack = send_event(&tx, "ok", "e2");
assert_eq!(ack.await.unwrap().unwrap(), Value::String("ok".into()));
assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
token.cancel();
drop(tx);
handle.await.unwrap().unwrap();
}
}