Skip to main content

agent_block_core/bus/
dispatcher.rs

1//! EventBus dispatcher.
2//!
3//! Single-task, single-loop dispatcher. Events arrive on a bounded
4//! `mpsc::Receiver<Event>` and are dispatched serially to the registered
5//! handler (kind-specific first, then `any` fallback, otherwise the event
6//! is NACK'd with a `BlockError::Bus` and a `tracing::warn!`).
7//!
8//! ## Concurrency primitives in use (see `concurrency-analysis.md` §1)
9//!
10//! - `tokio::sync::mpsc` (bounded) for ingress — backpressure rather than
11//!   drop
12//! - `tokio::sync::oneshot` for ack (carried on the [`Event`] itself)
13//! - `tokio_util::sync::CancellationToken` for cooperative shutdown
14//! - `tokio::select!` races `rx.recv()` against `shutdown.cancelled()`
15//! - `tokio::task::spawn` + awaited `JoinHandle` is used to invoke
16//!   handlers. A panic inside the handler surfaces as
17//!   `JoinError::is_panic()` on the join handle, which the dispatcher
18//!   converts into a `BlockError::Bus` ack — no panic propagates out of
19//!   the loop. The dispatcher awaits the join handle immediately, so
20//!   handlers still run serially.
21//!
22//! ## Serial guarantee
23//!
24//! The loop awaits each handler to completion before pulling the next
25//! event. This gives Lua handlers the cooperative-serial model described
26//! in `plan.md` (§設計選択 A1 + mlua-isle single-thread VM).
27//!
28//! ## Shutdown policy
29//!
30//! On cancel:
31//! 1. The `tokio::select!` picks the cancel branch.
32//! 2. We `rx.close()` — further send attempts by sources will fail fast.
//! 3. A handler that was in flight when cancel fired is *not* pre-empted:
//!    the token is only polled between events, so that handler ran to
//!    completion before step 1 was taken. The loop then exits.
35//! 4. Events that had queued into the mpsc buffer are **not drained** —
36//!    their ack senders drop, and callers see `RecvError` on their
37//!    oneshot (documented in `plan.md` Risks).
38
39use std::collections::HashMap;
40use std::sync::Arc;
41
42use async_trait::async_trait;
43use serde_json::Value;
44use tokio::sync::mpsc;
45use tokio_util::sync::CancellationToken;
46
47use crate::bus::event::{AckResult, Event};
48use agent_block_types::error::BlockError;
49
/// Callable target for a registered handler.
///
/// Subtask 1 uses a trait object as a placeholder; Subtask 3 will plug in
/// an `mlua::RegistryKey`-backed implementation that dispatches into the
/// Isle Lua thread. The trait contract here intentionally mirrors what
/// Subtask 3 needs: take the owned pieces of an [`Event`] (minus its
/// `ack_tx`, which is managed by the dispatcher) and return a result that
/// becomes the ack.
///
/// The `Send + Sync + 'static` bounds exist because the dispatcher clones
/// the handler into a future that it hands to `tokio::spawn`; a spawned
/// future must be `'static`, i.e. it may not borrow from the stack of the
/// task that spawned it.
#[async_trait]
pub trait Handler: Send + Sync + 'static {
    /// Invoke the handler with the event's kind/id/payload/meta.
    ///
    /// All four arguments are passed by value, so the implementation owns
    /// them for the duration of the call.
    ///
    /// The handler implementation itself does not touch `ack_tx`. The
    /// dispatcher takes care of delivering the returned [`AckResult`] on
    /// the oneshot.
    async fn call(&self, kind: String, id: String, payload: Value, meta: Value) -> AckResult;
}
69
/// Shared, reference-counted handler stored inside [`EventBus`].
///
/// The dispatcher clones this `Arc` for every dispatched event so the
/// handler can be moved into the spawned handler task.
///
/// Named `HandlerKey` to stay aligned with `subtask-1.md` §Design,
/// where the field is called `HandlerKey`. In Subtask 3 this type will be
/// replaced with a concrete struct wrapping `mlua::RegistryKey`.
pub type HandlerKey = Arc<dyn Handler>;
76
/// The serial event dispatcher.
///
/// Handlers are registered with [`EventBus::on`] / [`EventBus::on_any`]
/// and events are then processed one at a time by [`EventBus::run`].
pub struct EventBus {
    /// Ingress queue. Sources push events into the paired `Sender` (held
    /// outside). Capacity is configured by the caller (default comes from
    /// `AGENT_BLOCK_BUS_CAPACITY`, wired in Subtask 2).
    rx: mpsc::Receiver<Event>,
    /// kind -> handler. Populated via [`EventBus::on`] before [`EventBus::run`]
    /// is awaited.
    handlers: HashMap<String, HandlerKey>,
    /// Fallback handler — fires only when no `handlers[kind]` is present.
    any: Option<HandlerKey>,
    /// Set once when `run` begins and never cleared afterwards. Used to
    /// reject `on` / `on_any` calls after dispatcher start (see plan.md
    /// §Constraints).
    running: bool,
}
92
93impl EventBus {
94    /// Build a new bus from an mpsc receiver. The paired `Sender` must be
95    /// held by callers (and shared to sources) so they can push events.
96    pub fn new(rx: mpsc::Receiver<Event>) -> Self {
97        Self {
98            rx,
99            handlers: HashMap::new(),
100            any: None,
101            running: false,
102        }
103    }
104
105    /// Register a kind-specific handler. Last write wins — re-registering
106    /// the same `kind` silently replaces the previous handler (documented
107    /// in plan.md §Phase 3 / wf-sim Counter-WF).
108    ///
109    /// Returns `Err(BlockError::Bus)` if called after [`EventBus::run`] has
110    /// begun.
111    pub fn on(&mut self, kind: impl Into<String>, handler: HandlerKey) -> Result<(), BlockError> {
112        if self.running {
113            return Err(BlockError::Bus(
114                "bus.on cannot be called after bus.serve() has started".into(),
115            ));
116        }
117        let kind = kind.into();
118        if self.handlers.insert(kind.clone(), handler).is_some() {
119            tracing::warn!(kind = %kind, "bus.on: duplicate registration (last-write-wins)");
120        }
121        Ok(())
122    }
123
124    /// Register the `on_any` fallback. Invoked only when no `on(kind)`
125    /// handler matches the event's `kind`. NOT a fan-out/tap.
126    pub fn on_any(&mut self, handler: HandlerKey) -> Result<(), BlockError> {
127        if self.running {
128            return Err(BlockError::Bus(
129                "bus.on_any cannot be called after bus.serve() has started".into(),
130            ));
131        }
132        if self.any.is_some() {
133            tracing::warn!("bus.on_any: duplicate registration (last-write-wins)");
134        }
135        self.any = Some(handler);
136        Ok(())
137    }
138
    /// Test-only accessor used by `#[cfg(test)] mod tests` to check the
    /// table contents without exposing internals to the rest of the crate.
    ///
    /// Returns the number of kind-specific handlers; the `on_any` fallback
    /// is not included in the count.
    #[cfg(test)]
    fn handler_count(&self) -> usize {
        self.handlers.len()
    }
145
    /// Drive the dispatcher loop until `shutdown` is cancelled or every
    /// `Sender` for the ingress channel has been dropped.
    ///
    /// Marks the bus as running (which makes later `on` / `on_any` calls
    /// fail); the flag is not cleared when the loop exits. As written this
    /// function always returns `Ok(())`.
    ///
    /// Cancel-safety: `mpsc::Receiver::recv` is cancel-safe; dropping the
    /// `select!` branch on the `recv` side loses no events (tokio docs).
    /// `CancellationToken::cancelled` is explicitly designed for this
    /// usage (tokio-util docs).
    pub async fn run(&mut self, shutdown: CancellationToken) -> Result<(), BlockError> {
        self.running = true;
        tracing::info!("bus: dispatcher loop starting");
        loop {
            tokio::select! {
                // `biased` polls the branches in source order, so a pending
                // shutdown is observed before another event is pulled.
                biased;
                _ = shutdown.cancelled() => {
                    tracing::info!("bus: shutdown signalled; closing receiver");
                    // Close the receiver; events still buffered are not drained.
                    self.rx.close();
                    break;
                }
                maybe_evt = self.rx.recv() => {
                    // `None` means the channel is closed and empty.
                    let Some(evt) = maybe_evt else {
                        tracing::info!("bus: all senders dropped; exiting loop");
                        break;
                    };
                    // Awaited inline: the next `select!` round (and thus the
                    // next shutdown check) only happens after this returns.
                    self.dispatch(evt).await;
                }
            }
        }
        tracing::info!("bus: dispatcher loop exited");
        Ok(())
    }
175
176    /// Dispatch a single event to the matching handler (or the fallback,
177    /// or nack).
178    async fn dispatch(&self, mut evt: Event) {
179        let handler = self
180            .handlers
181            .get(&evt.kind)
182            .cloned()
183            .or_else(|| self.any.clone());
184
185        let Some(handler) = handler else {
186            tracing::warn!(kind = %evt.kind, id = %evt.id, "bus: no handler for event; nacking");
187            let err = BlockError::Bus(format!("no handler for kind `{}`", evt.kind));
188            if let Err(e) = evt.deliver_ack(Err(err)) {
189                tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: failed to deliver nack");
190            }
191            return;
192        };
193
194        let kind = evt.kind.clone();
195        let id = evt.id.clone();
196        let payload = evt.payload.clone();
197        let meta = evt.meta.clone();
198
199        // Spawn the handler as its own task and await the `JoinHandle`.
200        // A panic inside the handler surfaces as `JoinError::is_panic()`
201        // and is converted into a `BlockError::Bus` ack — the dispatcher
202        // loop itself never panics (panic-in-product policy).
203        let join = tokio::spawn(async move { handler.call(kind, id, payload, meta).await });
204
205        let result: AckResult = match join.await {
206            Ok(ack) => ack,
207            Err(join_err) => {
208                let msg = if join_err.is_panic() {
209                    panic_message(join_err.into_panic())
210                } else {
211                    format!("handler task error: {join_err}")
212                };
213                tracing::error!(
214                    kind = %evt.kind,
215                    id = %evt.id,
216                    "bus: handler panicked: {}",
217                    msg
218                );
219                Err(BlockError::Bus(format!("handler panic: {msg}")))
220            }
221        };
222
223        if let Err(ref e) = result {
224            tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: handler returned error");
225        }
226
227        if let Err(e) = evt.deliver_ack(result) {
228            tracing::warn!(kind = %evt.kind, id = %evt.id, error = %e, "bus: ack delivery failed");
229        }
230    }
231}
232
/// Turn a panic payload into printable text on a best-effort basis.
///
/// `&'static str` and `String` payloads are returned as-is; any other
/// payload type yields the literal `"<non-string panic payload>"`.
fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_owned())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_owned())
}
245
246// ---------------------------------------------------------------------------
247// Tests
248// ---------------------------------------------------------------------------
249
250#[cfg(test)]
251mod tests {
252    use super::*;
253    use serde_json::json;
254    use std::sync::atomic::{AtomicUsize, Ordering};
255    use std::sync::Mutex as StdMutex;
256    use std::time::Duration;
257    use tokio::sync::{oneshot, Mutex as TokioMutex};
258
259    /// Test handler that records invocations and returns a fixed value.
260    struct RecordingHandler {
261        label: &'static str,
262        calls: Arc<AtomicUsize>,
263    }
264
265    #[async_trait]
266    impl Handler for RecordingHandler {
267        async fn call(
268            &self,
269            _kind: String,
270            _id: String,
271            _payload: Value,
272            _meta: Value,
273        ) -> AckResult {
274            self.calls.fetch_add(1, Ordering::SeqCst);
275            Ok(Value::String(self.label.to_string()))
276        }
277    }
278
    /// Test handler that panics with the message `"boom"` on every call.
    /// Used to check that a handler panic is turned into an error ack.
    struct PanickingHandler;

    #[async_trait]
    impl Handler for PanickingHandler {
        async fn call(
            &self,
            _kind: String,
            _id: String,
            _payload: Value,
            _meta: Value,
        ) -> AckResult {
            panic!("boom");
        }
    }
293
294    fn send_event(tx: &mpsc::Sender<Event>, kind: &str, id: &str) -> oneshot::Receiver<AckResult> {
295        let (evt, rx) = Event::with_ack(kind, id, json!({"hello": "world"}), Value::Null);
296        tx.try_send(evt).expect("mpsc send");
297        rx
298    }
299
300    #[tokio::test]
301    async fn kind_specific_dispatch_hits_specialized_handler() {
302        let (tx, rx) = mpsc::channel::<Event>(4);
303        let mut bus = EventBus::new(rx);
304        let mesh_calls = Arc::new(AtomicUsize::new(0));
305        let any_calls = Arc::new(AtomicUsize::new(0));
306        bus.on(
307            "mesh",
308            Arc::new(RecordingHandler {
309                label: "mesh",
310                calls: mesh_calls.clone(),
311            }),
312        )
313        .unwrap();
314        bus.on_any(Arc::new(RecordingHandler {
315            label: "any",
316            calls: any_calls.clone(),
317        }))
318        .unwrap();
319
320        let token = CancellationToken::new();
321        let token_clone = token.clone();
322        let handle = tokio::spawn(async move { bus.run(token_clone).await });
323
324        let ack = send_event(&tx, "mesh", "e1");
325        let got = ack.await.unwrap().unwrap();
326        assert_eq!(got, Value::String("mesh".into()));
327        assert_eq!(mesh_calls.load(Ordering::SeqCst), 1);
328        assert_eq!(any_calls.load(Ordering::SeqCst), 0);
329
330        token.cancel();
331        drop(tx);
332        handle.await.unwrap().unwrap();
333    }
334
335    #[tokio::test]
336    async fn on_any_fallback_fires_only_when_no_match() {
337        let (tx, rx) = mpsc::channel::<Event>(4);
338        let mut bus = EventBus::new(rx);
339        let any_calls = Arc::new(AtomicUsize::new(0));
340        bus.on_any(Arc::new(RecordingHandler {
341            label: "any",
342            calls: any_calls.clone(),
343        }))
344        .unwrap();
345
346        let token = CancellationToken::new();
347        let token_clone = token.clone();
348        let handle = tokio::spawn(async move { bus.run(token_clone).await });
349
350        let ack = send_event(&tx, "unknown_kind", "e1");
351        let got = ack.await.unwrap().unwrap();
352        assert_eq!(got, Value::String("any".into()));
353        assert_eq!(any_calls.load(Ordering::SeqCst), 1);
354
355        token.cancel();
356        drop(tx);
357        handle.await.unwrap().unwrap();
358    }
359
360    #[tokio::test]
361    async fn no_handler_produces_nack() {
362        let (tx, rx) = mpsc::channel::<Event>(4);
363        let mut bus = EventBus::new(rx);
364        // no handlers registered
365
366        let token = CancellationToken::new();
367        let token_clone = token.clone();
368        let handle = tokio::spawn(async move { bus.run(token_clone).await });
369
370        let ack = send_event(&tx, "mesh", "e1");
371        let got = ack.await.unwrap();
372        match got {
373            Err(BlockError::Bus(msg)) => {
374                assert!(msg.contains("no handler"), "unexpected msg: {msg}");
375            }
376            other => panic!("expected Bus err, got {other:?}"),
377        }
378
379        token.cancel();
380        drop(tx);
381        handle.await.unwrap().unwrap();
382    }
383
384    #[tokio::test]
385    async fn shutdown_token_breaks_loop() {
386        let (tx, rx) = mpsc::channel::<Event>(4);
387        let mut bus = EventBus::new(rx);
388        bus.on_any(Arc::new(RecordingHandler {
389            label: "any",
390            calls: Arc::new(AtomicUsize::new(0)),
391        }))
392        .unwrap();
393
394        let token = CancellationToken::new();
395        let token_clone = token.clone();
396        let handle = tokio::spawn(async move { bus.run(token_clone).await });
397
398        token.cancel();
399        // Expect the task to exit promptly.
400        let res = tokio::time::timeout(Duration::from_millis(500), handle)
401            .await
402            .expect("timeout");
403        res.unwrap().unwrap();
404        drop(tx);
405    }
406
407    #[tokio::test]
408    async fn handler_panic_is_isolated_and_loop_continues() {
409        let (tx, rx) = mpsc::channel::<Event>(4);
410        let mut bus = EventBus::new(rx);
411        let ok_calls = Arc::new(AtomicUsize::new(0));
412        bus.on("boom", Arc::new(PanickingHandler)).unwrap();
413        bus.on(
414            "ok",
415            Arc::new(RecordingHandler {
416                label: "ok",
417                calls: ok_calls.clone(),
418            }),
419        )
420        .unwrap();
421
422        let token = CancellationToken::new();
423        let token_clone = token.clone();
424        let handle = tokio::spawn(async move { bus.run(token_clone).await });
425
426        // First event: handler panics. Expect a Bus err ack.
427        let ack = send_event(&tx, "boom", "e1");
428        let got = ack.await.unwrap();
429        match got {
430            Err(BlockError::Bus(msg)) => {
431                assert!(
432                    msg.contains("panic") || msg.contains("boom"),
433                    "unexpected msg: {msg}"
434                );
435            }
436            other => panic!("expected Bus err, got {other:?}"),
437        }
438
439        // Second event after the panic: should still be handled.
440        let ack = send_event(&tx, "ok", "e2");
441        let got = ack.await.unwrap().unwrap();
442        assert_eq!(got, Value::String("ok".into()));
443        assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
444
445        token.cancel();
446        drop(tx);
447        handle.await.unwrap().unwrap();
448    }
449
450    #[tokio::test]
451    async fn bounded_mpsc_applies_backpressure_not_drop() {
452        // Capacity-1 channel; fill it, then a second send must wait (not
453        // drop). We verify by asserting try_send fails with Full while the
454        // dispatcher is paused.
455        let (tx, rx) = mpsc::channel::<Event>(1);
456        let mut bus = EventBus::new(rx);
457        bus.on(
458            "slow",
459            Arc::new(RecordingHandler {
460                label: "slow",
461                calls: Arc::new(AtomicUsize::new(0)),
462            }),
463        )
464        .unwrap();
465
466        // Do NOT start the dispatcher yet — we want the channel to fill.
467        let (evt1, _ack1_rx) = Event::with_ack("slow", "e1", json!({}), Value::Null);
468        tx.try_send(evt1).expect("first send fits capacity 1");
469
470        let (evt2, _ack2_rx) = Event::with_ack("slow", "e2", json!({}), Value::Null);
471        let err = tx.try_send(evt2).unwrap_err();
472        assert!(
473            matches!(err, mpsc::error::TrySendError::Full(_)),
474            "expected Full, got {err:?}"
475        );
476
477        // Now drain it to prove the receiver actually reads them.
478        let token = CancellationToken::new();
479        let token_clone = token.clone();
480        let handle = tokio::spawn(async move { bus.run(token_clone).await });
481        // Give dispatcher time to drain one event, then cancel.
482        tokio::time::sleep(Duration::from_millis(50)).await;
483        token.cancel();
484        drop(tx);
485        handle.await.unwrap().unwrap();
486    }
487
488    #[tokio::test]
489    async fn on_after_running_returns_err() {
490        let (_tx, rx) = mpsc::channel::<Event>(1);
491        let mut bus = EventBus::new(rx);
492        // Simulate "running" state without actually running the loop.
493        bus.running = true;
494        let err = bus
495            .on(
496                "mesh",
497                Arc::new(RecordingHandler {
498                    label: "x",
499                    calls: Arc::new(AtomicUsize::new(0)),
500                }),
501            )
502            .unwrap_err();
503        match err {
504            BlockError::Bus(msg) => assert!(msg.contains("bus.on")),
505            other => panic!("expected Bus err, got {other:?}"),
506        }
507        let err = bus
508            .on_any(Arc::new(RecordingHandler {
509                label: "x",
510                calls: Arc::new(AtomicUsize::new(0)),
511            }))
512            .unwrap_err();
513        match err {
514            BlockError::Bus(msg) => assert!(msg.contains("bus.on_any")),
515            other => panic!("expected Bus err, got {other:?}"),
516        }
517        assert_eq!(bus.handler_count(), 0);
518    }
519
520    #[tokio::test]
521    async fn duplicate_on_is_last_write_wins() {
522        let (tx, rx) = mpsc::channel::<Event>(2);
523        let mut bus = EventBus::new(rx);
524        let first_calls = Arc::new(AtomicUsize::new(0));
525        let second_calls = Arc::new(AtomicUsize::new(0));
526        bus.on(
527            "mesh",
528            Arc::new(RecordingHandler {
529                label: "first",
530                calls: first_calls.clone(),
531            }),
532        )
533        .unwrap();
534        bus.on(
535            "mesh",
536            Arc::new(RecordingHandler {
537                label: "second",
538                calls: second_calls.clone(),
539            }),
540        )
541        .unwrap();
542        assert_eq!(bus.handler_count(), 1);
543
544        let token = CancellationToken::new();
545        let token_clone = token.clone();
546        let handle = tokio::spawn(async move { bus.run(token_clone).await });
547
548        let ack = send_event(&tx, "mesh", "e1");
549        let got = ack.await.unwrap().unwrap();
550        assert_eq!(got, Value::String("second".into()));
551        assert_eq!(first_calls.load(Ordering::SeqCst), 0);
552        assert_eq!(second_calls.load(Ordering::SeqCst), 1);
553
554        token.cancel();
555        drop(tx);
556        handle.await.unwrap().unwrap();
557    }
558
559    // -----------------------------------------------------------------
560    // concurrency-analysis.md §2 — 11 concurrency tests
561    // -----------------------------------------------------------------
562
563    /// Handler that sleeps then records its id, used to prove serial
564    /// dispatch order under a multi-thread runtime.
565    struct OrderingHandler {
566        order: Arc<StdMutex<Vec<String>>>,
567        delay: Duration,
568    }
569
570    #[async_trait]
571    impl Handler for OrderingHandler {
572        async fn call(
573            &self,
574            _kind: String,
575            id: String,
576            _payload: Value,
577            _meta: Value,
578        ) -> AckResult {
579            tokio::time::sleep(self.delay).await;
580            // guard is dropped before the async return, not held across .await
581            self.order.lock().expect("order mutex").push(id.clone());
582            Ok(Value::String(id))
583        }
584    }
585
    /// Handler that always returns `Err(BlockError::Bus("x"))`. Used to
    /// prove the dispatcher continues after a handler error.
    struct ErrHandler;

    #[async_trait]
    impl Handler for ErrHandler {
        async fn call(
            &self,
            _kind: String,
            _id: String,
            _payload: Value,
            _meta: Value,
        ) -> AckResult {
            // All inputs are ignored; the error is unconditional.
            Err(BlockError::Bus("x".into()))
        }
    }
602
603    /// §2.1 — kind-specific mpsc ingress with a single receiver preserves
604    /// arrival order when the dispatcher runs on a multi-thread runtime.
605    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
606    async fn test_bus_event_serialization_arrival_order() {
607        const N: usize = 20;
608        let (tx, rx) = mpsc::channel::<Event>(N);
609        let mut bus = EventBus::new(rx);
610        let order = Arc::new(StdMutex::new(Vec::new()));
611        bus.on(
612            "k",
613            Arc::new(OrderingHandler {
614                order: Arc::clone(&order),
615                // Small sleep inside each handler so the test would fail
616                // if the dispatcher tried to run them concurrently.
617                delay: Duration::from_millis(5),
618            }),
619        )
620        .unwrap();
621
622        let token = CancellationToken::new();
623        let token_clone = token.clone();
624        let handle = tokio::spawn(async move { bus.run(token_clone).await });
625
626        let mut expected = Vec::with_capacity(N);
627        let mut acks = Vec::with_capacity(N);
628        for i in 0..N {
629            let id = format!("e{i}");
630            expected.push(id.clone());
631            let (evt, rx) = Event::with_ack("k", id, json!({}), Value::Null);
632            tx.send(evt).await.expect("send");
633            acks.push(rx);
634        }
635
636        // Wait for every ack — each returns the handler id in order.
637        for (i, ack) in acks.into_iter().enumerate() {
638            let got = ack.await.expect("ack recv").expect("ack ok");
639            assert_eq!(got, Value::String(format!("e{i}")));
640        }
641
642        token.cancel();
643        drop(tx);
644        handle.await.unwrap().unwrap();
645
646        let recorded = order.lock().unwrap().clone();
647        assert_eq!(recorded, expected, "dispatcher must preserve arrival order");
648    }
649
650    /// §2.2 — `shutdown.cancel()` breaks the loop within the grace window.
651    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
652    async fn test_bus_graceful_shutdown_within_grace_ms() {
653        let (tx, rx) = mpsc::channel::<Event>(4);
654        let mut bus = EventBus::new(rx);
655        bus.on_any(Arc::new(RecordingHandler {
656            label: "any",
657            calls: Arc::new(AtomicUsize::new(0)),
658        }))
659        .unwrap();
660
661        let token = CancellationToken::new();
662        let token_clone = token.clone();
663        let handle = tokio::spawn(async move { bus.run(token_clone).await });
664
665        // No in-flight handler; cancel and expect prompt exit.
666        token.cancel();
667        // Allow a generous envelope well above any plausible grace window
668        // (default grace_ms = 1000). The dispatcher should exit within
669        // tens of ms in practice.
670        let res = tokio::time::timeout(Duration::from_millis(1500), handle)
671            .await
672            .expect("bus.run must exit within grace window");
673        res.unwrap().unwrap();
674        drop(tx);
675    }
676
677    /// §2.3 — a panicking handler is isolated; the loop keeps dispatching
678    /// subsequent events. Mirrors the existing
679    /// `handler_panic_is_isolated_and_loop_continues` but under a
680    /// multi_thread runtime to stress the spawn+join path.
681    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
682    async fn test_bus_handler_panic_isolation_catch_unwind() {
683        let (tx, rx) = mpsc::channel::<Event>(4);
684        let mut bus = EventBus::new(rx);
685        let ok_calls = Arc::new(AtomicUsize::new(0));
686        bus.on("crash", Arc::new(PanickingHandler)).unwrap();
687        bus.on(
688            "normal",
689            Arc::new(RecordingHandler {
690                label: "normal",
691                calls: ok_calls.clone(),
692            }),
693        )
694        .unwrap();
695
696        let token = CancellationToken::new();
697        let token_clone = token.clone();
698        let handle = tokio::spawn(async move { bus.run(token_clone).await });
699
700        // Panicking event first.
701        let ack = send_event(&tx, "crash", "e1");
702        let got = ack.await.unwrap();
703        assert!(matches!(got, Err(BlockError::Bus(_))), "panic must NACK");
704
705        // Normal event after the panic — the loop must still run.
706        let ack = send_event(&tx, "normal", "e2");
707        let got = ack.await.unwrap().unwrap();
708        assert_eq!(got, Value::String("normal".into()));
709        assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
710
711        token.cancel();
712        drop(tx);
713        handle.await.unwrap().unwrap();
714    }
715
716    /// §2.4 — a bounded mpsc applies backpressure (no drops). A capacity-1
717    /// channel with the dispatcher paused lets one send succeed and the
718    /// second `try_send` return `TrySendError::Full` — never silently drop.
719    /// Once the dispatcher drains it, everything flows through in order.
720    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
721    async fn test_bus_backpressure_bounded_mpsc_capacity() {
722        let (tx, rx) = mpsc::channel::<Event>(1);
723        let mut bus = EventBus::new(rx);
724        let calls = Arc::new(AtomicUsize::new(0));
725        bus.on(
726            "k",
727            Arc::new(RecordingHandler {
728                label: "k",
729                calls: calls.clone(),
730            }),
731        )
732        .unwrap();
733
734        // Fill the channel (capacity 1). Dispatcher not started yet.
735        let (evt1, _r1) = Event::with_ack("k", "e1", json!({}), Value::Null);
736        tx.try_send(evt1).expect("first send fits");
737        let (evt2, _r2) = Event::with_ack("k", "e2", json!({}), Value::Null);
738        let err = tx.try_send(evt2).expect_err("capacity full");
739        assert!(
740            matches!(err, mpsc::error::TrySendError::Full(_)),
741            "expected Full (not drop), got {err:?}"
742        );
743
744        // Start the dispatcher and send two more events via `.await`
745        // — backpressure must let them through without loss.
746        let token = CancellationToken::new();
747        let token_clone = token.clone();
748        let handle = tokio::spawn(async move { bus.run(token_clone).await });
749
750        let (evt3, r3) = Event::with_ack("k", "e3", json!({}), Value::Null);
751        tx.send(evt3).await.expect("send e3");
752        let (evt4, r4) = Event::with_ack("k", "e4", json!({}), Value::Null);
753        tx.send(evt4).await.expect("send e4");
754
755        // The first event (still in the channel) and the new ones should
756        // all be dispatched. Only assert the new acks fire (the original
757        // `_r1`/`_r2` receivers were dropped, which is fine).
758        r3.await.unwrap().unwrap();
759        r4.await.unwrap().unwrap();
760
761        token.cancel();
762        drop(tx);
763        handle.await.unwrap().unwrap();
764        assert!(calls.load(Ordering::SeqCst) >= 2);
765    }
766
767    /// §2.5 — a source that drops `ack_tx` (receiver side drops) surfaces
768    /// as `oneshot::RecvError` immediately, not as a 30-second timeout.
769    /// Combined with `tokio::time::timeout`, the sender-drop semantic
770    /// short-circuits the timeout.
771    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
772    async fn test_bus_oneshot_ack_timeout_30s() {
773        // Set up an oneshot, drop the sender, and prove `timeout(30s,
774        // rx)` resolves to `Ok(Err(RecvError))` essentially instantly.
775        let (tx, rx) = oneshot::channel::<AckResult>();
776        drop(tx);
777        let start = tokio::time::Instant::now();
778        let got = tokio::time::timeout(Duration::from_secs(30), rx)
779            .await
780            .expect("should not hit 30s timeout");
781        assert!(got.is_err(), "expected RecvError, got {got:?}");
782        assert!(
783            start.elapsed() < Duration::from_secs(1),
784            "sender-drop must short-circuit the 30s timeout"
785        );
786    }
787
788    /// §2.6 — SIGTERM / SIGINT race inside `tokio::select!`. Sends SIGTERM
789    /// to the current process; the select! should pick the SIGTERM branch
790    /// without losing the SIGINT branch's registration (Signal::recv
791    /// cancel safety).
792    #[cfg(unix)]
793    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
794    async fn test_bus_sigterm_sigint_race_select() {
795        use tokio::signal::unix::{signal, SignalKind};
796
797        let token = CancellationToken::new();
798        let token_for_task = token.clone();
799        let task = tokio::spawn(async move {
800            let mut term = signal(SignalKind::terminate()).expect("install SIGTERM");
801            tokio::select! {
802                _ = term.recv() => token_for_task.cancel(),
803                _ = tokio::signal::ctrl_c() => token_for_task.cancel(),
804            }
805        });
806
807        // Give the signal handlers time to install before we deliver the
808        // signal — otherwise we race the install.
809        tokio::time::sleep(Duration::from_millis(100)).await;
810
811        // Deliver SIGTERM to self. `nix` is cfg(unix) dev-only.
812        nix::sys::signal::kill(nix::unistd::Pid::this(), nix::sys::signal::Signal::SIGTERM)
813            .expect("kill(SIGTERM)");
814
815        tokio::time::timeout(Duration::from_secs(2), token.cancelled())
816            .await
817            .expect("cancel must fire within 2s after SIGTERM");
818        task.await.expect("signal task");
819    }
820
821    /// §2.7 — `Arc<tokio::sync::Mutex<EventBus>>` allows two tasks to
822    /// acquire the lock in sequence without deadlock. Exercises the
823    /// "take before await" pattern used in `bridge::bus::serve`.
824    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
825    async fn test_bus_arc_tokio_mutex_no_await_while_held() {
826        let (_tx, rx) = mpsc::channel::<Event>(1);
827        let shared: Arc<TokioMutex<EventBus>> = Arc::new(TokioMutex::new(EventBus::new(rx)));
828
829        let a = Arc::clone(&shared);
830        let t1 = tokio::spawn(async move {
831            let guard = a.lock().await;
832            // Do some non-await work under the lock.
833            let _ = guard.handler_count();
834            // Guard dropped here; any .await after this is safe.
835            drop(guard);
836            tokio::time::sleep(Duration::from_millis(10)).await;
837        });
838
839        let b = Arc::clone(&shared);
840        let t2 = tokio::spawn(async move {
841            let guard = b.lock().await;
842            let _ = guard.handler_count();
843            drop(guard);
844        });
845
846        // Both tasks complete promptly — no deadlock, no await-while-held.
847        tokio::time::timeout(Duration::from_secs(2), async {
848            t1.await.unwrap();
849            t2.await.unwrap();
850        })
851        .await
852        .expect("no deadlock");
853    }
854
/// §2.8 — `catch_unwind` type-level and runtime contract.
///
/// `catch_unwind` requires its closure to be `UnwindSafe`. A closure
/// that captures a `&mut` borrow is not, so it has to be wrapped in
/// `AssertUnwindSafe` to compile. This test documents that constraint
/// and checks at runtime that an unwinding panic is intercepted.
///
/// `panic=abort` builds do not unwind, so `catch_unwind` cannot
/// intercept anything there. That path cannot be exercised from a test
/// (the process would abort); the original note defers it to
/// concurrency-analysis.md §2 (row 7).
#[test]
fn test_bus_catch_unwind_paniceq_abort_not_caught() {
    use std::panic::{self, AssertUnwindSafe};

    // A capture-free closure is `UnwindSafe` as written.
    let plain = panic::catch_unwind(|| 42);
    assert_eq!(plain.ok(), Some(42));

    // Mutating a local from inside the closure captures it by `&mut`.
    // The `AssertUnwindSafe` wrapper is what lets this type-check; that
    // compile-time requirement is the contract being documented.
    let mut counter = 0i32;
    let bump_then_panic = AssertUnwindSafe(|| {
        counter += 1;
        panic!("boom");
    });
    let outcome = panic::catch_unwind(bump_then_panic);

    assert!(outcome.is_err(), "expected caught panic");
    // The increment happened before the panic and is still visible.
    assert_eq!(counter, 1, "side effect before panic still observable");
}
885
886    /// §2.9 — a spawned signal-watching task can be aborted and its
887    /// JoinHandle resolves to a cancellation-flagged JoinError.
888    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
889    async fn test_bus_spawn_signal_task_cancellation() {
890        let shutdown = CancellationToken::new();
891        let shutdown_clone = shutdown.clone();
892        let task = tokio::spawn(async move {
893            // Simulate a signal-watching task that blocks until cancel.
894            shutdown_clone.cancelled().await;
895        });
896
897        // Abort before the token is ever cancelled.
898        task.abort();
899        let res = task.await;
900        match res {
901            Err(e) => assert!(e.is_cancelled(), "expected cancelled JoinError, got {e:?}"),
902            Ok(()) => panic!("task should have been cancelled before completing"),
903        }
904
905        // `shutdown` stays un-cancelled — abort of the task does not
906        // propagate to the token (by design: product code cancels
907        // explicitly from the signal branch).
908        assert!(!shutdown.is_cancelled());
909    }
910
911    /// §2.10 — `tokio::time::timeout` fires after the configured duration
912    /// under a paused clock. Verifies the 30s timeout contract used by
913    /// `BusRelayHandler` without actually sleeping 30 seconds.
914    #[tokio::test(flavor = "current_thread", start_paused = true)]
915    async fn test_bus_timeout_ack_expiry_30s_match() {
916        let (_tx, rx) = oneshot::channel::<AckResult>();
917        let fut = tokio::time::timeout(Duration::from_secs(30), rx);
918        tokio::pin!(fut);
919
920        // Before advance: the future has not resolved.
921        tokio::time::advance(Duration::from_secs(29)).await;
922        assert!(
923            futures_poll_once(&mut fut).is_none(),
924            "timeout must not fire before 30s"
925        );
926
927        // Cross the threshold.
928        tokio::time::advance(Duration::from_secs(2)).await;
929        let got = (&mut fut).await;
930        assert!(got.is_err(), "expected Elapsed, got {got:?}");
931    }
932
/// Poll a pinned future exactly once without awaiting it.
///
/// Returns `Some(output)` when the future is ready on this poll and
/// `None` when it is still pending. The poll uses a waker that does
/// nothing, so a pending future is not rescheduled by this call; the
/// caller is expected to poll or await it again later.
///
/// The no-op waker is built from the safe `std::task::Wake` trait
/// rather than a hand-written `RawWaker` vtable, so no `unsafe` is
/// needed.
fn futures_poll_once<F: std::future::Future>(
    fut: &mut std::pin::Pin<&mut F>,
) -> Option<F::Output> {
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};

    /// Waker target whose wake-up is deliberately a no-op.
    struct NoopWake;

    impl Wake for NoopWake {
        fn wake(self: Arc<Self>) {}
    }

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // `as_mut()` reborrows the pin, so the caller keeps ownership of the
    // pinned future and can await it afterwards.
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}
961
/// §2.11 — a `std::sync::Mutex` becomes poisoned when a thread panics
/// while holding its guard, and the next `lock()` returns an error.
///
/// Per the original note, the registration path in `bridge::bus` maps
/// that error to `BlockError::Runtime("bus mutex poisoned")`. This test
/// only shows that the poison signal is observable, so such a mapping
/// has something to react to.
#[test]
fn test_std_mutex_poison_on_handler_registration() {
    let shared: Arc<StdMutex<i32>> = Arc::new(StdMutex::new(0));

    // Worker thread: take the lock, then panic with the guard alive.
    let worker = std::thread::spawn({
        let shared = Arc::clone(&shared);
        move || {
            let _held = shared.lock().expect("first lock");
            panic!("poison me");
        }
    });

    // Joining a panicked thread yields `Err`; the mutex is poisoned by
    // the time the join returns.
    let joined = worker.join();
    assert!(joined.is_err());

    // The observable under test: `lock()` now fails with a `PoisonError`.
    let poisoned = shared.lock().expect_err("mutex must be poisoned");
    // `into_inner()` recovers the guard from the error.
    let _inner = poisoned.into_inner();
}
985
986    // -----------------------------------------------------------------
987    // General tests (plan.md §一般テスト)
988    // -----------------------------------------------------------------
989
990    /// on_any fires only when the event's kind has no specialized handler.
991    /// (plan.md §一般テスト — on_any フォールバック)
992    #[tokio::test]
993    async fn general_on_any_fallback_vs_no_handler_warn() {
994        // 1) No specialized, no on_any → nack.
995        let (tx, rx) = mpsc::channel::<Event>(2);
996        let mut bus = EventBus::new(rx);
997        let token = CancellationToken::new();
998        let token_clone = token.clone();
999        let handle = tokio::spawn(async move { bus.run(token_clone).await });
1000        let ack = send_event(&tx, "kind-x", "id1");
1001        let got = ack.await.unwrap();
1002        assert!(matches!(got, Err(BlockError::Bus(_))));
1003        token.cancel();
1004        drop(tx);
1005        handle.await.unwrap().unwrap();
1006
1007        // 2) Only on_any → on_any fires on any kind.
1008        let (tx, rx) = mpsc::channel::<Event>(2);
1009        let mut bus = EventBus::new(rx);
1010        let any_calls = Arc::new(AtomicUsize::new(0));
1011        bus.on_any(Arc::new(RecordingHandler {
1012            label: "any",
1013            calls: any_calls.clone(),
1014        }))
1015        .unwrap();
1016        let token = CancellationToken::new();
1017        let token_clone = token.clone();
1018        let handle = tokio::spawn(async move { bus.run(token_clone).await });
1019        let ack = send_event(&tx, "anything", "id1");
1020        assert_eq!(ack.await.unwrap().unwrap(), Value::String("any".into()));
1021        assert_eq!(any_calls.load(Ordering::SeqCst), 1);
1022        token.cancel();
1023        drop(tx);
1024        handle.await.unwrap().unwrap();
1025    }
1026
1027    /// When a specialized handler matches, on_any is NOT invoked.
1028    /// (plan.md §一般テスト — 優先順位)
1029    #[tokio::test]
1030    async fn general_specialized_wins_over_on_any() {
1031        let (tx, rx) = mpsc::channel::<Event>(2);
1032        let mut bus = EventBus::new(rx);
1033        let spec_calls = Arc::new(AtomicUsize::new(0));
1034        let any_calls = Arc::new(AtomicUsize::new(0));
1035        bus.on(
1036            "k",
1037            Arc::new(RecordingHandler {
1038                label: "spec",
1039                calls: spec_calls.clone(),
1040            }),
1041        )
1042        .unwrap();
1043        bus.on_any(Arc::new(RecordingHandler {
1044            label: "any",
1045            calls: any_calls.clone(),
1046        }))
1047        .unwrap();
1048
1049        let token = CancellationToken::new();
1050        let token_clone = token.clone();
1051        let handle = tokio::spawn(async move { bus.run(token_clone).await });
1052
1053        let ack = send_event(&tx, "k", "e1");
1054        let got = ack.await.unwrap().unwrap();
1055        assert_eq!(got, Value::String("spec".into()));
1056        assert_eq!(spec_calls.load(Ordering::SeqCst), 1);
1057        assert_eq!(any_calls.load(Ordering::SeqCst), 0);
1058
1059        token.cancel();
1060        drop(tx);
1061        handle.await.unwrap().unwrap();
1062    }
1063
1064    /// A handler returning `Err(...)` delivers an error ack and the loop
1065    /// continues to dispatch the next event.
1066    /// (plan.md §一般テスト — Handler error 継続)
1067    #[tokio::test]
1068    async fn general_handler_error_ack_and_loop_continues() {
1069        let (tx, rx) = mpsc::channel::<Event>(4);
1070        let mut bus = EventBus::new(rx);
1071        bus.on("err", Arc::new(ErrHandler)).unwrap();
1072        let ok_calls = Arc::new(AtomicUsize::new(0));
1073        bus.on(
1074            "ok",
1075            Arc::new(RecordingHandler {
1076                label: "ok",
1077                calls: ok_calls.clone(),
1078            }),
1079        )
1080        .unwrap();
1081
1082        let token = CancellationToken::new();
1083        let token_clone = token.clone();
1084        let handle = tokio::spawn(async move { bus.run(token_clone).await });
1085
1086        let ack = send_event(&tx, "err", "e1");
1087        let got = ack.await.unwrap();
1088        match got {
1089            Err(BlockError::Bus(msg)) => assert_eq!(msg, "x"),
1090            other => panic!("expected Bus err 'x', got {other:?}"),
1091        }
1092
1093        let ack = send_event(&tx, "ok", "e2");
1094        assert_eq!(ack.await.unwrap().unwrap(), Value::String("ok".into()));
1095        assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
1096
1097        token.cancel();
1098        drop(tx);
1099        handle.await.unwrap().unwrap();
1100    }
1101}