1use std::collections::HashMap;
40use std::sync::Arc;
41
42use async_trait::async_trait;
43use serde_json::Value;
44use tokio::sync::mpsc;
45use tokio_util::sync::CancellationToken;
46
47use crate::bus::event::{AckResult, Event};
48use agent_block_types::error::BlockError;
49
/// Asynchronous callback the bus invokes for a dispatched event.
///
/// Implementors must be `Send + Sync + 'static` because the bus stores them
/// behind an `Arc` and runs each call inside a spawned tokio task.
#[async_trait]
pub trait Handler: Send + Sync + 'static {
    /// Handles one event.
    ///
    /// Receives owned copies of the event's `kind`, `id`, `payload` and `meta`
    /// and returns the [`AckResult`] that the bus delivers as the event's ack.
    async fn call(&self, kind: String, id: String, payload: Value, meta: Value) -> AckResult;
}
69
/// Shared, type-erased handler as stored in the bus registry.
pub type HandlerKey = Arc<dyn Handler>;
76
/// Single-consumer event dispatcher: reads [`Event`]s from a bounded mpsc
/// channel and routes each one to a registered [`Handler`].
pub struct EventBus {
    /// Receiving half of the event queue; drained by `run`.
    rx: mpsc::Receiver<Event>,
    /// Handlers keyed by the exact event kind they serve.
    handlers: HashMap<String, HandlerKey>,
    /// Fallback handler used when no kind-specific handler matches.
    any: Option<HandlerKey>,
    /// Set to `true` when `run` starts; registration is refused afterwards.
    running: bool,
}
92
93impl EventBus {
94 pub fn new(rx: mpsc::Receiver<Event>) -> Self {
97 Self {
98 rx,
99 handlers: HashMap::new(),
100 any: None,
101 running: false,
102 }
103 }
104
105 pub fn on(&mut self, kind: impl Into<String>, handler: HandlerKey) -> Result<(), BlockError> {
112 if self.running {
113 return Err(BlockError::Bus(
114 "bus.on cannot be called after bus.serve() has started".into(),
115 ));
116 }
117 let kind = kind.into();
118 if self.handlers.insert(kind.clone(), handler).is_some() {
119 tracing::warn!(kind = %kind, "bus.on: duplicate registration (last-write-wins)");
120 }
121 Ok(())
122 }
123
124 pub fn on_any(&mut self, handler: HandlerKey) -> Result<(), BlockError> {
127 if self.running {
128 return Err(BlockError::Bus(
129 "bus.on_any cannot be called after bus.serve() has started".into(),
130 ));
131 }
132 if self.any.is_some() {
133 tracing::warn!("bus.on_any: duplicate registration (last-write-wins)");
134 }
135 self.any = Some(handler);
136 Ok(())
137 }
138
139 #[cfg(test)]
142 fn handler_count(&self) -> usize {
143 self.handlers.len()
144 }
145
146 pub async fn run(&mut self, shutdown: CancellationToken) -> Result<(), BlockError> {
153 self.running = true;
154 tracing::info!("bus: dispatcher loop starting");
155 loop {
156 tokio::select! {
157 biased;
158 _ = shutdown.cancelled() => {
159 tracing::info!("bus: shutdown signalled; closing receiver");
160 self.rx.close();
161 break;
162 }
163 maybe_evt = self.rx.recv() => {
164 let Some(evt) = maybe_evt else {
165 tracing::info!("bus: all senders dropped; exiting loop");
166 break;
167 };
168 self.dispatch(evt).await;
169 }
170 }
171 }
172 tracing::info!("bus: dispatcher loop exited");
173 Ok(())
174 }
175
176 async fn dispatch(&self, mut evt: Event) {
179 let handler = self
180 .handlers
181 .get(&evt.kind)
182 .cloned()
183 .or_else(|| self.any.clone());
184
185 let Some(handler) = handler else {
186 tracing::warn!(kind = %evt.kind, id = %evt.id, "bus: no handler for event; nacking");
187 let err = BlockError::Bus(format!("no handler for kind `{}`", evt.kind));
188 if let Err(e) = evt.deliver_ack(Err(err)) {
189 tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: failed to deliver nack");
190 }
191 return;
192 };
193
194 let kind = evt.kind.clone();
195 let id = evt.id.clone();
196 let payload = evt.payload.clone();
197 let meta = evt.meta.clone();
198
199 let join = tokio::spawn(async move { handler.call(kind, id, payload, meta).await });
204
205 let result: AckResult = match join.await {
206 Ok(ack) => ack,
207 Err(join_err) => {
208 let msg = if join_err.is_panic() {
209 panic_message(join_err.into_panic())
210 } else {
211 format!("handler task error: {join_err}")
212 };
213 tracing::error!(
214 kind = %evt.kind,
215 id = %evt.id,
216 "bus: handler panicked: {}",
217 msg
218 );
219 Err(BlockError::Bus(format!("handler panic: {msg}")))
220 }
221 };
222
223 if let Err(ref e) = result {
224 tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: handler returned error");
225 }
226
227 if let Err(e) = evt.deliver_ack(result) {
228 tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: ack delivery failed");
229 }
230 }
231}
232
/// Extracts a human-readable message from a panic payload.
///
/// Payloads that are a `&'static str` or a `String` are returned as text;
/// anything else yields the placeholder `<non-string panic payload>`.
fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
245
246#[cfg(test)]
251mod tests {
252 use super::*;
253 use serde_json::json;
254 use std::sync::atomic::{AtomicUsize, Ordering};
255 use std::sync::Mutex as StdMutex;
256 use std::time::Duration;
257 use tokio::sync::{oneshot, Mutex as TokioMutex};
258
259 struct RecordingHandler {
261 label: &'static str,
262 calls: Arc<AtomicUsize>,
263 }
264
265 #[async_trait]
266 impl Handler for RecordingHandler {
267 async fn call(
268 &self,
269 _kind: String,
270 _id: String,
271 _payload: Value,
272 _meta: Value,
273 ) -> AckResult {
274 self.calls.fetch_add(1, Ordering::SeqCst);
275 Ok(Value::String(self.label.to_string()))
276 }
277 }
278
    /// Test handler whose `call` always panics with the message "boom".
    struct PanickingHandler;

    #[async_trait]
    impl Handler for PanickingHandler {
        /// Ignores every argument and panics.
        async fn call(
            &self,
            _kind: String,
            _id: String,
            _payload: Value,
            _meta: Value,
        ) -> AckResult {
            panic!("boom");
        }
    }
293
294 fn send_event(tx: &mpsc::Sender<Event>, kind: &str, id: &str) -> oneshot::Receiver<AckResult> {
295 let (evt, rx) = Event::with_ack(kind, id, json!({"hello": "world"}), Value::Null);
296 tx.try_send(evt).expect("mpsc send");
297 rx
298 }
299
300 #[tokio::test]
301 async fn kind_specific_dispatch_hits_specialized_handler() {
302 let (tx, rx) = mpsc::channel::<Event>(4);
303 let mut bus = EventBus::new(rx);
304 let mesh_calls = Arc::new(AtomicUsize::new(0));
305 let any_calls = Arc::new(AtomicUsize::new(0));
306 bus.on(
307 "mesh",
308 Arc::new(RecordingHandler {
309 label: "mesh",
310 calls: mesh_calls.clone(),
311 }),
312 )
313 .unwrap();
314 bus.on_any(Arc::new(RecordingHandler {
315 label: "any",
316 calls: any_calls.clone(),
317 }))
318 .unwrap();
319
320 let token = CancellationToken::new();
321 let token_clone = token.clone();
322 let handle = tokio::spawn(async move { bus.run(token_clone).await });
323
324 let ack = send_event(&tx, "mesh", "e1");
325 let got = ack.await.unwrap().unwrap();
326 assert_eq!(got, Value::String("mesh".into()));
327 assert_eq!(mesh_calls.load(Ordering::SeqCst), 1);
328 assert_eq!(any_calls.load(Ordering::SeqCst), 0);
329
330 token.cancel();
331 drop(tx);
332 handle.await.unwrap().unwrap();
333 }
334
335 #[tokio::test]
336 async fn on_any_fallback_fires_only_when_no_match() {
337 let (tx, rx) = mpsc::channel::<Event>(4);
338 let mut bus = EventBus::new(rx);
339 let any_calls = Arc::new(AtomicUsize::new(0));
340 bus.on_any(Arc::new(RecordingHandler {
341 label: "any",
342 calls: any_calls.clone(),
343 }))
344 .unwrap();
345
346 let token = CancellationToken::new();
347 let token_clone = token.clone();
348 let handle = tokio::spawn(async move { bus.run(token_clone).await });
349
350 let ack = send_event(&tx, "unknown_kind", "e1");
351 let got = ack.await.unwrap().unwrap();
352 assert_eq!(got, Value::String("any".into()));
353 assert_eq!(any_calls.load(Ordering::SeqCst), 1);
354
355 token.cancel();
356 drop(tx);
357 handle.await.unwrap().unwrap();
358 }
359
360 #[tokio::test]
361 async fn no_handler_produces_nack() {
362 let (tx, rx) = mpsc::channel::<Event>(4);
363 let mut bus = EventBus::new(rx);
364 let token = CancellationToken::new();
367 let token_clone = token.clone();
368 let handle = tokio::spawn(async move { bus.run(token_clone).await });
369
370 let ack = send_event(&tx, "mesh", "e1");
371 let got = ack.await.unwrap();
372 match got {
373 Err(BlockError::Bus(msg)) => {
374 assert!(msg.contains("no handler"), "unexpected msg: {msg}");
375 }
376 other => panic!("expected Bus err, got {other:?}"),
377 }
378
379 token.cancel();
380 drop(tx);
381 handle.await.unwrap().unwrap();
382 }
383
384 #[tokio::test]
385 async fn shutdown_token_breaks_loop() {
386 let (tx, rx) = mpsc::channel::<Event>(4);
387 let mut bus = EventBus::new(rx);
388 bus.on_any(Arc::new(RecordingHandler {
389 label: "any",
390 calls: Arc::new(AtomicUsize::new(0)),
391 }))
392 .unwrap();
393
394 let token = CancellationToken::new();
395 let token_clone = token.clone();
396 let handle = tokio::spawn(async move { bus.run(token_clone).await });
397
398 token.cancel();
399 let res = tokio::time::timeout(Duration::from_millis(500), handle)
401 .await
402 .expect("timeout");
403 res.unwrap().unwrap();
404 drop(tx);
405 }
406
407 #[tokio::test]
408 async fn handler_panic_is_isolated_and_loop_continues() {
409 let (tx, rx) = mpsc::channel::<Event>(4);
410 let mut bus = EventBus::new(rx);
411 let ok_calls = Arc::new(AtomicUsize::new(0));
412 bus.on("boom", Arc::new(PanickingHandler)).unwrap();
413 bus.on(
414 "ok",
415 Arc::new(RecordingHandler {
416 label: "ok",
417 calls: ok_calls.clone(),
418 }),
419 )
420 .unwrap();
421
422 let token = CancellationToken::new();
423 let token_clone = token.clone();
424 let handle = tokio::spawn(async move { bus.run(token_clone).await });
425
426 let ack = send_event(&tx, "boom", "e1");
428 let got = ack.await.unwrap();
429 match got {
430 Err(BlockError::Bus(msg)) => {
431 assert!(
432 msg.contains("panic") || msg.contains("boom"),
433 "unexpected msg: {msg}"
434 );
435 }
436 other => panic!("expected Bus err, got {other:?}"),
437 }
438
439 let ack = send_event(&tx, "ok", "e2");
441 let got = ack.await.unwrap().unwrap();
442 assert_eq!(got, Value::String("ok".into()));
443 assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
444
445 token.cancel();
446 drop(tx);
447 handle.await.unwrap().unwrap();
448 }
449
450 #[tokio::test]
451 async fn bounded_mpsc_applies_backpressure_not_drop() {
452 let (tx, rx) = mpsc::channel::<Event>(1);
456 let mut bus = EventBus::new(rx);
457 bus.on(
458 "slow",
459 Arc::new(RecordingHandler {
460 label: "slow",
461 calls: Arc::new(AtomicUsize::new(0)),
462 }),
463 )
464 .unwrap();
465
466 let (evt1, _ack1_rx) = Event::with_ack("slow", "e1", json!({}), Value::Null);
468 tx.try_send(evt1).expect("first send fits capacity 1");
469
470 let (evt2, _ack2_rx) = Event::with_ack("slow", "e2", json!({}), Value::Null);
471 let err = tx.try_send(evt2).unwrap_err();
472 assert!(
473 matches!(err, mpsc::error::TrySendError::Full(_)),
474 "expected Full, got {err:?}"
475 );
476
477 let token = CancellationToken::new();
479 let token_clone = token.clone();
480 let handle = tokio::spawn(async move { bus.run(token_clone).await });
481 tokio::time::sleep(Duration::from_millis(50)).await;
483 token.cancel();
484 drop(tx);
485 handle.await.unwrap().unwrap();
486 }
487
488 #[tokio::test]
489 async fn on_after_running_returns_err() {
490 let (_tx, rx) = mpsc::channel::<Event>(1);
491 let mut bus = EventBus::new(rx);
492 bus.running = true;
494 let err = bus
495 .on(
496 "mesh",
497 Arc::new(RecordingHandler {
498 label: "x",
499 calls: Arc::new(AtomicUsize::new(0)),
500 }),
501 )
502 .unwrap_err();
503 match err {
504 BlockError::Bus(msg) => assert!(msg.contains("bus.on")),
505 other => panic!("expected Bus err, got {other:?}"),
506 }
507 let err = bus
508 .on_any(Arc::new(RecordingHandler {
509 label: "x",
510 calls: Arc::new(AtomicUsize::new(0)),
511 }))
512 .unwrap_err();
513 match err {
514 BlockError::Bus(msg) => assert!(msg.contains("bus.on_any")),
515 other => panic!("expected Bus err, got {other:?}"),
516 }
517 assert_eq!(bus.handler_count(), 0);
518 }
519
520 #[tokio::test]
521 async fn duplicate_on_is_last_write_wins() {
522 let (tx, rx) = mpsc::channel::<Event>(2);
523 let mut bus = EventBus::new(rx);
524 let first_calls = Arc::new(AtomicUsize::new(0));
525 let second_calls = Arc::new(AtomicUsize::new(0));
526 bus.on(
527 "mesh",
528 Arc::new(RecordingHandler {
529 label: "first",
530 calls: first_calls.clone(),
531 }),
532 )
533 .unwrap();
534 bus.on(
535 "mesh",
536 Arc::new(RecordingHandler {
537 label: "second",
538 calls: second_calls.clone(),
539 }),
540 )
541 .unwrap();
542 assert_eq!(bus.handler_count(), 1);
543
544 let token = CancellationToken::new();
545 let token_clone = token.clone();
546 let handle = tokio::spawn(async move { bus.run(token_clone).await });
547
548 let ack = send_event(&tx, "mesh", "e1");
549 let got = ack.await.unwrap().unwrap();
550 assert_eq!(got, Value::String("second".into()));
551 assert_eq!(first_calls.load(Ordering::SeqCst), 0);
552 assert_eq!(second_calls.load(Ordering::SeqCst), 1);
553
554 token.cancel();
555 drop(tx);
556 handle.await.unwrap().unwrap();
557 }
558
559 struct OrderingHandler {
566 order: Arc<StdMutex<Vec<String>>>,
567 delay: Duration,
568 }
569
570 #[async_trait]
571 impl Handler for OrderingHandler {
572 async fn call(
573 &self,
574 _kind: String,
575 id: String,
576 _payload: Value,
577 _meta: Value,
578 ) -> AckResult {
579 tokio::time::sleep(self.delay).await;
580 self.order.lock().expect("order mutex").push(id.clone());
582 Ok(Value::String(id))
583 }
584 }
585
    /// Test handler that always fails with `BlockError::Bus("x")`.
    struct ErrHandler;

    #[async_trait]
    impl Handler for ErrHandler {
        /// Ignores every argument and returns the fixed bus error.
        async fn call(
            &self,
            _kind: String,
            _id: String,
            _payload: Value,
            _meta: Value,
        ) -> AckResult {
            Err(BlockError::Bus("x".into()))
        }
    }
602
603 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
606 async fn test_bus_event_serialization_arrival_order() {
607 const N: usize = 20;
608 let (tx, rx) = mpsc::channel::<Event>(N);
609 let mut bus = EventBus::new(rx);
610 let order = Arc::new(StdMutex::new(Vec::new()));
611 bus.on(
612 "k",
613 Arc::new(OrderingHandler {
614 order: Arc::clone(&order),
615 delay: Duration::from_millis(5),
618 }),
619 )
620 .unwrap();
621
622 let token = CancellationToken::new();
623 let token_clone = token.clone();
624 let handle = tokio::spawn(async move { bus.run(token_clone).await });
625
626 let mut expected = Vec::with_capacity(N);
627 let mut acks = Vec::with_capacity(N);
628 for i in 0..N {
629 let id = format!("e{i}");
630 expected.push(id.clone());
631 let (evt, rx) = Event::with_ack("k", id, json!({}), Value::Null);
632 tx.send(evt).await.expect("send");
633 acks.push(rx);
634 }
635
636 for (i, ack) in acks.into_iter().enumerate() {
638 let got = ack.await.expect("ack recv").expect("ack ok");
639 assert_eq!(got, Value::String(format!("e{i}")));
640 }
641
642 token.cancel();
643 drop(tx);
644 handle.await.unwrap().unwrap();
645
646 let recorded = order.lock().unwrap().clone();
647 assert_eq!(recorded, expected, "dispatcher must preserve arrival order");
648 }
649
650 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
652 async fn test_bus_graceful_shutdown_within_grace_ms() {
653 let (tx, rx) = mpsc::channel::<Event>(4);
654 let mut bus = EventBus::new(rx);
655 bus.on_any(Arc::new(RecordingHandler {
656 label: "any",
657 calls: Arc::new(AtomicUsize::new(0)),
658 }))
659 .unwrap();
660
661 let token = CancellationToken::new();
662 let token_clone = token.clone();
663 let handle = tokio::spawn(async move { bus.run(token_clone).await });
664
665 token.cancel();
667 let res = tokio::time::timeout(Duration::from_millis(1500), handle)
671 .await
672 .expect("bus.run must exit within grace window");
673 res.unwrap().unwrap();
674 drop(tx);
675 }
676
677 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
682 async fn test_bus_handler_panic_isolation_catch_unwind() {
683 let (tx, rx) = mpsc::channel::<Event>(4);
684 let mut bus = EventBus::new(rx);
685 let ok_calls = Arc::new(AtomicUsize::new(0));
686 bus.on("crash", Arc::new(PanickingHandler)).unwrap();
687 bus.on(
688 "normal",
689 Arc::new(RecordingHandler {
690 label: "normal",
691 calls: ok_calls.clone(),
692 }),
693 )
694 .unwrap();
695
696 let token = CancellationToken::new();
697 let token_clone = token.clone();
698 let handle = tokio::spawn(async move { bus.run(token_clone).await });
699
700 let ack = send_event(&tx, "crash", "e1");
702 let got = ack.await.unwrap();
703 assert!(matches!(got, Err(BlockError::Bus(_))), "panic must NACK");
704
705 let ack = send_event(&tx, "normal", "e2");
707 let got = ack.await.unwrap().unwrap();
708 assert_eq!(got, Value::String("normal".into()));
709 assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
710
711 token.cancel();
712 drop(tx);
713 handle.await.unwrap().unwrap();
714 }
715
716 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
721 async fn test_bus_backpressure_bounded_mpsc_capacity() {
722 let (tx, rx) = mpsc::channel::<Event>(1);
723 let mut bus = EventBus::new(rx);
724 let calls = Arc::new(AtomicUsize::new(0));
725 bus.on(
726 "k",
727 Arc::new(RecordingHandler {
728 label: "k",
729 calls: calls.clone(),
730 }),
731 )
732 .unwrap();
733
734 let (evt1, _r1) = Event::with_ack("k", "e1", json!({}), Value::Null);
736 tx.try_send(evt1).expect("first send fits");
737 let (evt2, _r2) = Event::with_ack("k", "e2", json!({}), Value::Null);
738 let err = tx.try_send(evt2).expect_err("capacity full");
739 assert!(
740 matches!(err, mpsc::error::TrySendError::Full(_)),
741 "expected Full (not drop), got {err:?}"
742 );
743
744 let token = CancellationToken::new();
747 let token_clone = token.clone();
748 let handle = tokio::spawn(async move { bus.run(token_clone).await });
749
750 let (evt3, r3) = Event::with_ack("k", "e3", json!({}), Value::Null);
751 tx.send(evt3).await.expect("send e3");
752 let (evt4, r4) = Event::with_ack("k", "e4", json!({}), Value::Null);
753 tx.send(evt4).await.expect("send e4");
754
755 r3.await.unwrap().unwrap();
759 r4.await.unwrap().unwrap();
760
761 token.cancel();
762 drop(tx);
763 handle.await.unwrap().unwrap();
764 assert!(calls.load(Ordering::SeqCst) >= 2);
765 }
766
767 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
772 async fn test_bus_oneshot_ack_timeout_30s() {
773 let (tx, rx) = oneshot::channel::<AckResult>();
776 drop(tx);
777 let start = tokio::time::Instant::now();
778 let got = tokio::time::timeout(Duration::from_secs(30), rx)
779 .await
780 .expect("should not hit 30s timeout");
781 assert!(got.is_err(), "expected RecvError, got {got:?}");
782 assert!(
783 start.elapsed() < Duration::from_secs(1),
784 "sender-drop must short-circuit the 30s timeout"
785 );
786 }
787
788 #[cfg(unix)]
793 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
794 async fn test_bus_sigterm_sigint_race_select() {
795 use tokio::signal::unix::{signal, SignalKind};
796
797 let token = CancellationToken::new();
798 let token_for_task = token.clone();
799 let task = tokio::spawn(async move {
800 let mut term = signal(SignalKind::terminate()).expect("install SIGTERM");
801 tokio::select! {
802 _ = term.recv() => token_for_task.cancel(),
803 _ = tokio::signal::ctrl_c() => token_for_task.cancel(),
804 }
805 });
806
807 tokio::time::sleep(Duration::from_millis(100)).await;
810
811 nix::sys::signal::kill(nix::unistd::Pid::this(), nix::sys::signal::Signal::SIGTERM)
813 .expect("kill(SIGTERM)");
814
815 tokio::time::timeout(Duration::from_secs(2), token.cancelled())
816 .await
817 .expect("cancel must fire within 2s after SIGTERM");
818 task.await.expect("signal task");
819 }
820
821 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
825 async fn test_bus_arc_tokio_mutex_no_await_while_held() {
826 let (_tx, rx) = mpsc::channel::<Event>(1);
827 let shared: Arc<TokioMutex<EventBus>> = Arc::new(TokioMutex::new(EventBus::new(rx)));
828
829 let a = Arc::clone(&shared);
830 let t1 = tokio::spawn(async move {
831 let guard = a.lock().await;
832 let _ = guard.handler_count();
834 drop(guard);
836 tokio::time::sleep(Duration::from_millis(10)).await;
837 });
838
839 let b = Arc::clone(&shared);
840 let t2 = tokio::spawn(async move {
841 let guard = b.lock().await;
842 let _ = guard.handler_count();
843 drop(guard);
844 });
845
846 tokio::time::timeout(Duration::from_secs(2), async {
848 t1.await.unwrap();
849 t2.await.unwrap();
850 })
851 .await
852 .expect("no deadlock");
853 }
854
855 #[test]
861 fn test_bus_catch_unwind_paniceq_abort_not_caught() {
862 use std::panic::{catch_unwind, AssertUnwindSafe};
863
864 let ok = catch_unwind(|| 42);
866 assert_eq!(ok.ok(), Some(42));
867
868 let mut v = 0i32;
873 let caught = catch_unwind(AssertUnwindSafe(|| {
874 v += 1;
875 panic!("boom");
876 }));
877 assert!(caught.is_err(), "expected caught panic");
878 assert_eq!(v, 1, "side effect before panic still observable");
879
880 }
885
886 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
889 async fn test_bus_spawn_signal_task_cancellation() {
890 let shutdown = CancellationToken::new();
891 let shutdown_clone = shutdown.clone();
892 let task = tokio::spawn(async move {
893 shutdown_clone.cancelled().await;
895 });
896
897 task.abort();
899 let res = task.await;
900 match res {
901 Err(e) => assert!(e.is_cancelled(), "expected cancelled JoinError, got {e:?}"),
902 Ok(()) => panic!("task should have been cancelled before completing"),
903 }
904
905 assert!(!shutdown.is_cancelled());
909 }
910
911 #[tokio::test(flavor = "current_thread", start_paused = true)]
915 async fn test_bus_timeout_ack_expiry_30s_match() {
916 let (_tx, rx) = oneshot::channel::<AckResult>();
917 let fut = tokio::time::timeout(Duration::from_secs(30), rx);
918 tokio::pin!(fut);
919
920 tokio::time::advance(Duration::from_secs(29)).await;
922 assert!(
923 futures_poll_once(&mut fut).is_none(),
924 "timeout must not fire before 30s"
925 );
926
927 tokio::time::advance(Duration::from_secs(2)).await;
929 let got = (&mut fut).await;
930 assert!(got.is_err(), "expected Elapsed, got {got:?}");
931 }
932
933 fn futures_poll_once<F: std::future::Future>(
936 fut: &mut std::pin::Pin<&mut F>,
937 ) -> Option<F::Output> {
938 use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
939
940 fn raw_waker() -> RawWaker {
942 fn no_op(_: *const ()) {}
943 fn clone(_: *const ()) -> RawWaker {
944 raw_waker()
945 }
946 static VT: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
947 RawWaker::new(std::ptr::null(), &VT)
948 }
949 let waker = unsafe { Waker::from_raw(raw_waker()) };
955 let mut cx = Context::from_waker(&waker);
956 match fut.as_mut().poll(&mut cx) {
957 Poll::Ready(v) => Some(v),
958 Poll::Pending => None,
959 }
960 }
961
962 #[test]
968 fn test_std_mutex_poison_on_handler_registration() {
969 let m: Arc<StdMutex<i32>> = Arc::new(StdMutex::new(0));
970 let m_panic = Arc::clone(&m);
971 let handle = std::thread::spawn(move || {
972 let _guard = m_panic.lock().expect("first lock");
973 panic!("poison me");
974 });
975 assert!(handle.join().is_err());
977
978 let err = m.lock().expect_err("mutex must be poisoned");
979 let _inner = err.into_inner();
984 }
985
986 #[tokio::test]
993 async fn general_on_any_fallback_vs_no_handler_warn() {
994 let (tx, rx) = mpsc::channel::<Event>(2);
996 let mut bus = EventBus::new(rx);
997 let token = CancellationToken::new();
998 let token_clone = token.clone();
999 let handle = tokio::spawn(async move { bus.run(token_clone).await });
1000 let ack = send_event(&tx, "kind-x", "id1");
1001 let got = ack.await.unwrap();
1002 assert!(matches!(got, Err(BlockError::Bus(_))));
1003 token.cancel();
1004 drop(tx);
1005 handle.await.unwrap().unwrap();
1006
1007 let (tx, rx) = mpsc::channel::<Event>(2);
1009 let mut bus = EventBus::new(rx);
1010 let any_calls = Arc::new(AtomicUsize::new(0));
1011 bus.on_any(Arc::new(RecordingHandler {
1012 label: "any",
1013 calls: any_calls.clone(),
1014 }))
1015 .unwrap();
1016 let token = CancellationToken::new();
1017 let token_clone = token.clone();
1018 let handle = tokio::spawn(async move { bus.run(token_clone).await });
1019 let ack = send_event(&tx, "anything", "id1");
1020 assert_eq!(ack.await.unwrap().unwrap(), Value::String("any".into()));
1021 assert_eq!(any_calls.load(Ordering::SeqCst), 1);
1022 token.cancel();
1023 drop(tx);
1024 handle.await.unwrap().unwrap();
1025 }
1026
1027 #[tokio::test]
1030 async fn general_specialized_wins_over_on_any() {
1031 let (tx, rx) = mpsc::channel::<Event>(2);
1032 let mut bus = EventBus::new(rx);
1033 let spec_calls = Arc::new(AtomicUsize::new(0));
1034 let any_calls = Arc::new(AtomicUsize::new(0));
1035 bus.on(
1036 "k",
1037 Arc::new(RecordingHandler {
1038 label: "spec",
1039 calls: spec_calls.clone(),
1040 }),
1041 )
1042 .unwrap();
1043 bus.on_any(Arc::new(RecordingHandler {
1044 label: "any",
1045 calls: any_calls.clone(),
1046 }))
1047 .unwrap();
1048
1049 let token = CancellationToken::new();
1050 let token_clone = token.clone();
1051 let handle = tokio::spawn(async move { bus.run(token_clone).await });
1052
1053 let ack = send_event(&tx, "k", "e1");
1054 let got = ack.await.unwrap().unwrap();
1055 assert_eq!(got, Value::String("spec".into()));
1056 assert_eq!(spec_calls.load(Ordering::SeqCst), 1);
1057 assert_eq!(any_calls.load(Ordering::SeqCst), 0);
1058
1059 token.cancel();
1060 drop(tx);
1061 handle.await.unwrap().unwrap();
1062 }
1063
1064 #[tokio::test]
1068 async fn general_handler_error_ack_and_loop_continues() {
1069 let (tx, rx) = mpsc::channel::<Event>(4);
1070 let mut bus = EventBus::new(rx);
1071 bus.on("err", Arc::new(ErrHandler)).unwrap();
1072 let ok_calls = Arc::new(AtomicUsize::new(0));
1073 bus.on(
1074 "ok",
1075 Arc::new(RecordingHandler {
1076 label: "ok",
1077 calls: ok_calls.clone(),
1078 }),
1079 )
1080 .unwrap();
1081
1082 let token = CancellationToken::new();
1083 let token_clone = token.clone();
1084 let handle = tokio::spawn(async move { bus.run(token_clone).await });
1085
1086 let ack = send_event(&tx, "err", "e1");
1087 let got = ack.await.unwrap();
1088 match got {
1089 Err(BlockError::Bus(msg)) => assert_eq!(msg, "x"),
1090 other => panic!("expected Bus err 'x', got {other:?}"),
1091 }
1092
1093 let ack = send_event(&tx, "ok", "e2");
1094 assert_eq!(ack.await.unwrap().unwrap(), Value::String("ok".into()));
1095 assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
1096
1097 token.cancel();
1098 drop(tx);
1099 handle.await.unwrap().unwrap();
1100 }
1101}