Skip to main content

axon/runtime/channels/
typed.rs

1//! AXON Runtime — Typed Channels (Fase 13.f.2 — Rust runtime parity).
2//!
3//! Direct port of `axon/runtime/channels/typed.py` (Fase 13.d). Provides
4//! the runtime layer for `Channel<τ, q, ℓ, π>` — first-class affine
5//! resources with π-calculus mobility (paper_mobile_channels.md).
6//!
7//! Layered alongside the existing daemon-supervisor `EventBus`
8//! (broadcast semantics for lifecycle events) — this module owns its
9//! own per-channel transport with FIFO single-consumer semantics
10//! matching the Python reference exactly. The two layers do not share
11//! underlying queues; both can coexist in a single process.
12//!
13//! Public surface:
14//! - [`TypedChannelHandle`]   — runtime materialisation of an `IRChannel`
15//! - [`Capability`]           — opaque token returned by `publish`,
16//!                              consumed by `discover` (Publish-Ext)
17//! - [`TypedChannelRegistry`] — name → handle map; bootstraps from
18//!                              `axon_frontend::ir_nodes::IRProgram`
19//! - [`TypedEventBus`]        — `emit` / `publish` / `discover`
20//!                              orchestrator with schema validation,
21//!                              QoS, capability gating
22//!
23//! Errors mirror compile-time diagnostics (Fase 13.b type checker) so a
24//! misconfigured runtime cannot silently diverge from the static
25//! guarantees.
26
27use std::collections::{HashMap, HashSet};
28use std::fmt;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::time::{SystemTime, UNIX_EPOCH};
32
33use tokio::sync::{mpsc, Mutex as AsyncMutex};
34
35use axon_frontend::ir_nodes::{IRChannel, IRProgram};
36
37// ═══════════════════════════════════════════════════════════════════
38//  ERRORS — runtime-side mirrors of compile-time guarantees
39// ═══════════════════════════════════════════════════════════════════
40
41/// Runtime errors raised by the typed-channel layer.
42///
43/// Each variant mirrors a compile-time diagnostic so a program that
44/// would have been rejected by the Fase 13.b type checker is also
45/// rejected here as defence-in-depth (relevant for cross-process
46/// publish/discover where the receiver cannot rerun static analysis).
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub enum TypedChannelError {
49    /// Channel name not in the registry.
50    ChannelNotFound {
51        name: String,
52        registered: Vec<String>,
53    },
54    /// Payload type does not match the channel schema.
55    ///
56    /// The compile-time check (Fase 13.b `_check_emit`) catches this
57    /// for statically-known programs; this runtime check is
58    /// defence-in-depth.
59    SchemaMismatch(String),
60    /// `publish` lacks a shield (D8) or `discover` targets an
61    /// unpublishable channel / forged capability.
62    CapabilityGate(String),
63    /// An affine/linear handle was used after consumption.
64    ///
65    /// Affine: at most one consumption (use OK; drop OK; reuse rejected).
66    /// Linear: exactly one consumption (use required; reuse rejected).
67    /// Persistent (`!Channel`): unrestricted reuse (no enforcement).
68    LifetimeViolation { name: String, count: u32 },
69    /// Underlying transport failure (closed channel, dropped sender).
70    Transport(String),
71}
72
73impl fmt::Display for TypedChannelError {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        match self {
76            TypedChannelError::ChannelNotFound { name, registered } => write!(
77                f,
78                "channel '{name}' not in TypedChannelRegistry (registered: {registered:?})"
79            ),
80            TypedChannelError::SchemaMismatch(msg) => write!(f, "{msg}"),
81            TypedChannelError::CapabilityGate(msg) => write!(f, "{msg}"),
82            TypedChannelError::LifetimeViolation { name, count } => write!(
83                f,
84                "channel '{name}' is linear but has been consumed {count} times (linear ⇒ exactly once)"
85            ),
86            TypedChannelError::Transport(msg) => write!(f, "transport: {msg}"),
87        }
88    }
89}
90
91impl std::error::Error for TypedChannelError {}
92
93/// `Result` alias used throughout the typed-channel layer.
94pub type Result<T> = std::result::Result<T, TypedChannelError>;
95
96// ═══════════════════════════════════════════════════════════════════
97//  CAPABILITY — Publish-Ext token (paper §4.3)
98// ═══════════════════════════════════════════════════════════════════
99
100/// Opaque, single-extrusion-hop witness of a published channel.
101///
102/// A `publish c within σ` reduction returns one of these. The bearer
103/// can `discover` the underlying handle through the bus. The runtime
104/// attenuates the bearer's certainty envelope by `delta_pub` per hop
105/// so published-then-republished handles strictly lose certainty on
106/// every traversal — paper §6.2 ("no certainty laundering").
107///
108/// Capabilities are immutable (no setters); per-hop bookkeeping
109/// happens in the bus when the capability is created (`publish`) and
110/// consumed (`discover`).
111#[derive(Debug, Clone, PartialEq)]
112pub struct Capability {
113    /// uuid4 — opaque to consumers.
114    pub capability_id: String,
115    /// The `IRChannel.name` being exposed.
116    pub channel_name: String,
117    /// σ-shield that mediated extrusion.
118    pub shield_ref: String,
119    /// Certainty penalty per hop (paper §3.4 lower bound: 0.05).
120    pub delta_pub: f64,
121    /// Wall-clock seconds since the Unix epoch.
122    pub issued_at: f64,
123}
124
125// ═══════════════════════════════════════════════════════════════════
126//  HANDLE — runtime materialisation of an IRChannel
127// ═══════════════════════════════════════════════════════════════════
128
129/// A live, typed channel handle — wraps the static schema (message
130/// type, QoS, lifetime, persistence, shield ref) for runtime
131/// enforcement.
132///
133/// The handle's `consumed_count` lets the bus enforce lifetime rules:
134/// - linear     → must reach exactly 1 over the lifetime of the handle
135/// - affine     → may stay at 0 (drop) but never exceed 1 per holder
136/// - persistent → unbounded
137///
138/// At Fase 13.f.2 scope, the bus tracks consumption counters at the
139/// handle level. Per-binding tracking (when `discover` yields fresh
140/// aliases) follows the Python reference and is deferred to a
141/// future sub-phase aligned with cross-process replay tokens.
142#[derive(Debug, Clone, PartialEq)]
143pub struct TypedChannelHandle {
144    pub name: String,
145    /// Surface spelling — `Order` | `Channel<Order>` | …
146    pub message: String,
147    pub qos: String,
148    pub lifetime: String,
149    pub persistence: String,
150    pub shield_ref: String,
151    /// Incremented per emit/publish/discover.
152    pub consumed_count: u32,
153}
154
155impl TypedChannelHandle {
156    /// Construct a handle with Fase 13 defaults
157    /// (`qos=at_least_once`, `lifetime=affine`, `persistence=ephemeral`,
158    /// no shield).
159    pub fn new(name: impl Into<String>, message: impl Into<String>) -> Self {
160        Self {
161            name: name.into(),
162            message: message.into(),
163            qos: "at_least_once".to_string(),
164            lifetime: "affine".to_string(),
165            persistence: "ephemeral".to_string(),
166            shield_ref: String::new(),
167            consumed_count: 0,
168        }
169    }
170
171    /// D8 — a channel is publishable iff it declared a shield gate.
172    pub fn is_publishable(&self) -> bool {
173        !self.shield_ref.is_empty()
174    }
175
176    /// Second-order: this channel transports another channel handle.
177    pub fn carries_channel(&self) -> bool {
178        self.message.starts_with("Channel<") && self.message.ends_with('>')
179    }
180
181    /// Unwrap one level of `Channel<…>` to find the carried type.
182    ///
183    /// Returns the leaf type for plain channels, or the immediately
184    /// nested type for second-order channels. For triple-nesting
185    /// `Channel<Channel<T>>`, returns `Channel<T>` (one unwrap).
186    pub fn inner_message_type(&self) -> &str {
187        if !self.carries_channel() {
188            return &self.message;
189        }
190        &self.message["Channel<".len()..self.message.len() - 1]
191    }
192
193    /// Build a runtime handle from a lowered `IRChannel` (post-13.c).
194    pub fn from_ir(ir: &IRChannel) -> Self {
195        Self {
196            name: ir.name.clone(),
197            message: ir.message.clone(),
198            qos: ir.qos.clone(),
199            lifetime: ir.lifetime.clone(),
200            persistence: ir.persistence.clone(),
201            shield_ref: ir.shield_ref.clone(),
202            consumed_count: 0,
203        }
204    }
205}
206
207// ═══════════════════════════════════════════════════════════════════
208//  PAYLOAD + EVENT
209// ═══════════════════════════════════════════════════════════════════
210
211/// A typed payload — either a scalar value (JSON) or a channel handle
212/// (mobility). The discriminator replaces Python's
213/// `payload_is_handle: bool` keyword argument with a sum type that the
214/// type system enforces.
215#[derive(Debug, Clone)]
216pub enum TypedPayload {
217    Scalar(serde_json::Value),
218    Handle(TypedChannelHandle),
219}
220
221impl TypedPayload {
222    /// Convenience constructor for scalar payloads.
223    pub fn scalar<V: Into<serde_json::Value>>(v: V) -> Self {
224        TypedPayload::Scalar(v.into())
225    }
226
227    /// Convenience constructor for mobility (channel-as-value) payloads.
228    pub fn handle(h: TypedChannelHandle) -> Self {
229        TypedPayload::Handle(h)
230    }
231
232    pub fn is_handle(&self) -> bool {
233        matches!(self, TypedPayload::Handle(_))
234    }
235}
236
237/// One event flowing through a typed channel.
238#[derive(Debug, Clone)]
239pub struct TypedEvent {
240    pub channel: String,
241    pub payload: TypedPayload,
242    pub event_id: String,
243    pub timestamp_secs: f64,
244}
245
246// ═══════════════════════════════════════════════════════════════════
247//  REGISTRY — name → handle map
248// ═══════════════════════════════════════════════════════════════════
249
250/// Authoritative map of channel name → [`TypedChannelHandle`].
251///
252/// Bootstraps from an `IRProgram` (post-13.c) so the registry is a
253/// faithful runtime projection of the compiler's view. Hand-rolled
254/// registration is also supported for tests and embedded runtimes.
255#[derive(Debug, Default)]
256pub struct TypedChannelRegistry {
257    handles: HashMap<String, TypedChannelHandle>,
258}
259
260impl TypedChannelRegistry {
261    pub fn new() -> Self {
262        Self::default()
263    }
264
265    /// Add a handle to the registry. Re-registering with the same name
266    /// overwrites — useful for hot-reloads in dev workflows.
267    pub fn register(&mut self, handle: TypedChannelHandle) {
268        self.handles.insert(handle.name.clone(), handle);
269    }
270
271    /// Instantiate a handle from an `IRChannel` and register it.
272    pub fn register_from_ir(&mut self, ir: &IRChannel) -> TypedChannelHandle {
273        let handle = TypedChannelHandle::from_ir(ir);
274        self.handles.insert(handle.name.clone(), handle.clone());
275        handle
276    }
277
278    pub fn get(&self, name: &str) -> Result<&TypedChannelHandle> {
279        self.handles
280            .get(name)
281            .ok_or_else(|| TypedChannelError::ChannelNotFound {
282                name: name.to_string(),
283                registered: self.names(),
284            })
285    }
286
287    fn get_mut(&mut self, name: &str) -> Result<&mut TypedChannelHandle> {
288        let registered = self.names();
289        self.handles
290            .get_mut(name)
291            .ok_or_else(|| TypedChannelError::ChannelNotFound {
292                name: name.to_string(),
293                registered,
294            })
295    }
296
297    pub fn has(&self, name: &str) -> bool {
298        self.handles.contains_key(name)
299    }
300
301    pub fn names(&self) -> Vec<String> {
302        let mut v: Vec<String> = self.handles.keys().cloned().collect();
303        v.sort();
304        v
305    }
306
307    pub fn len(&self) -> usize {
308        self.handles.len()
309    }
310
311    pub fn is_empty(&self) -> bool {
312        self.handles.is_empty()
313    }
314}
315
316// ═══════════════════════════════════════════════════════════════════
317//  SHIELD CHECKER PROTOCOL — minimal interface to ESK
318// ═══════════════════════════════════════════════════════════════════
319
320/// Predicate `(shield_name, handle) → covers?`.
321///
322/// Default impl returns `true` (no enforcement). Production callers
323/// inject an ESK-aware checker that consults the actual
324/// `ShieldDefinition` and looks up `κ(message_type)` for the given
325/// handle. Delegating κ-extraction to the predicate keeps the typed-
326/// channel layer agnostic of the ESK module (which holds
327/// `TypeDefinition.compliance` metadata).
328pub type ShieldComplianceFn = Arc<dyn Fn(&str, &TypedChannelHandle) -> bool + Send + Sync>;
329
330fn default_compliance_check() -> ShieldComplianceFn {
331    Arc::new(|_, _| true)
332}
333
334// ═══════════════════════════════════════════════════════════════════
335//  PER-CHANNEL TRANSPORT — single-consumer FIFO
336// ═══════════════════════════════════════════════════════════════════
337
338/// Per-channel single-consumer FIFO queue (parallel to Python's
339/// `InMemoryChannel(asyncio.Queue)`).
340///
341/// Sender and receiver are split so the bus owns both — `emit()`
342/// pushes via the sender, `receive()` awaits via the receiver. The
343/// receiver is held behind a `tokio::sync::Mutex` so concurrent
344/// callers serialise into the queue (single-consumer FIFO matches
345/// `at_least_once` / `queue` semantics from the paper).
346struct ChannelTransport {
347    tx: mpsc::UnboundedSender<TypedEvent>,
348    rx: AsyncMutex<mpsc::UnboundedReceiver<TypedEvent>>,
349    closed: AtomicBool,
350}
351
352impl ChannelTransport {
353    fn new() -> Self {
354        let (tx, rx) = mpsc::unbounded_channel();
355        Self {
356            tx,
357            rx: AsyncMutex::new(rx),
358            closed: AtomicBool::new(false),
359        }
360    }
361
362    fn send(&self, event: TypedEvent) -> Result<()> {
363        if self.closed.load(Ordering::Acquire) {
364            return Err(TypedChannelError::Transport(
365                "channel is closed".to_string(),
366            ));
367        }
368        self.tx
369            .send(event)
370            .map_err(|e| TypedChannelError::Transport(format!("send failed: {e}")))
371    }
372
373    async fn recv(&self) -> Result<TypedEvent> {
374        let mut rx = self.rx.lock().await;
375        rx.recv().await.ok_or_else(|| {
376            TypedChannelError::Transport("channel sender dropped".to_string())
377        })
378    }
379
380    fn close(&self) {
381        self.closed.store(true, Ordering::Release);
382    }
383}
384
385// ═══════════════════════════════════════════════════════════════════
386//  TYPED EVENT BUS — emit / publish / discover orchestrator
387// ═══════════════════════════════════════════════════════════════════
388
389/// Schema-aware, capability-gated event bus.
390///
391/// Owns its own per-channel transport (FIFO mpsc queues for
392/// `at_least_once`/`queue`/`at_most_once`/`exactly_once`, multi-
393/// subscriber for `broadcast`). Coexists with the daemon-supervisor
394/// `EventBus` (`crate::event_bus`) without sharing transport — both
395/// can run in the same process for different concerns.
396///
397/// Usage:
398///
399/// ```no_run
400/// # use axon::runtime::channels::{TypedEventBus, TypedChannelHandle, TypedPayload};
401/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
402/// let bus = TypedEventBus::new();
403/// bus.register({
404///     let mut h = TypedChannelHandle::new("OrdersCreated", "Order");
405///     h.shield_ref = "PublicBroker".into();
406///     h
407/// });
408/// bus.emit("OrdersCreated", TypedPayload::scalar(serde_json::json!({"id": 1}))).await?;
409/// let cap = bus.publish("OrdersCreated", "PublicBroker").await?;
410/// let _handle = bus.discover(&cap).await?;
411/// # Ok(())
412/// # }
413/// ```
414pub struct TypedEventBus {
415    registry: Mutex<TypedChannelRegistry>,
416    transports: Mutex<HashMap<String, Arc<ChannelTransport>>>,
417    broadcast_subs: Mutex<HashMap<String, Vec<mpsc::UnboundedSender<TypedEvent>>>>,
418    capabilities: Mutex<HashMap<String, Capability>>,
419    delivered_ids: Mutex<HashMap<String, HashSet<String>>>,
420    compliance_check: ShieldComplianceFn,
421}
422
423impl Default for TypedEventBus {
424    fn default() -> Self {
425        Self::new()
426    }
427}
428
429impl TypedEventBus {
430    /// Empty bus with the permissive default compliance check.
431    pub fn new() -> Self {
432        Self::with_compliance_check(default_compliance_check())
433    }
434
435    /// Empty bus with a caller-supplied compliance predicate.
436    pub fn with_compliance_check(check: ShieldComplianceFn) -> Self {
437        Self {
438            registry: Mutex::new(TypedChannelRegistry::new()),
439            transports: Mutex::new(HashMap::new()),
440            broadcast_subs: Mutex::new(HashMap::new()),
441            capabilities: Mutex::new(HashMap::new()),
442            delivered_ids: Mutex::new(HashMap::new()),
443            compliance_check: check,
444        }
445    }
446
447    /// Bootstrap a bus from a fully-lowered `IRProgram` (post-13.c).
448    /// Every `IRChannel` becomes a registered runtime handle.
449    pub fn from_ir_program(ir: &IRProgram) -> Self {
450        Self::from_ir_program_with(ir, default_compliance_check())
451    }
452
453    /// Same as [`from_ir_program`](Self::from_ir_program) but with a
454    /// caller-supplied compliance predicate.
455    pub fn from_ir_program_with(ir: &IRProgram, check: ShieldComplianceFn) -> Self {
456        let bus = Self::with_compliance_check(check);
457        {
458            let mut reg = bus.registry.lock().unwrap();
459            for ch in &ir.channels {
460                reg.register_from_ir(ch);
461            }
462        }
463        bus
464    }
465
466    pub fn register(&self, handle: TypedChannelHandle) {
467        self.registry.lock().unwrap().register(handle);
468    }
469
470    pub fn register_from_ir(&self, ir: &IRChannel) -> TypedChannelHandle {
471        self.registry.lock().unwrap().register_from_ir(ir)
472    }
473
474    /// Snapshot of the handle for a channel.
475    pub fn get_handle(&self, name: &str) -> Result<TypedChannelHandle> {
476        self.registry.lock().unwrap().get(name).cloned()
477    }
478
479    pub fn channel_names(&self) -> Vec<String> {
480        self.registry.lock().unwrap().names()
481    }
482
483    // ── EMIT (Chan-Output / Chan-Mobility, paper §3.1, §3.2) ──────
484
485    /// Emit a value (or a channel handle for mobility) on a typed
486    /// channel.
487    ///
488    /// Schema enforcement mirrors Fase 13.b's `_check_emit`:
489    /// - second-order channel + scalar payload → [`TypedChannelError::SchemaMismatch`]
490    /// - first-order channel + handle payload  → [`TypedChannelError::SchemaMismatch`]
491    /// - second-order schema mismatch          → [`TypedChannelError::SchemaMismatch`]
492    pub async fn emit(&self, channel: &str, payload: TypedPayload) -> Result<()> {
493        let handle = self.get_handle(channel)?;
494        Self::check_emit_schema(&handle, &payload)?;
495
496        let event = TypedEvent {
497            channel: channel.to_string(),
498            payload,
499            event_id: gen_uuid(),
500            timestamp_secs: now_secs(),
501        };
502
503        self.dispatch(&handle, event)?;
504        self.consume(channel)?;
505        Ok(())
506    }
507
508    fn check_emit_schema(handle: &TypedChannelHandle, payload: &TypedPayload) -> Result<()> {
509        match payload {
510            TypedPayload::Handle(inner) => {
511                if !handle.carries_channel() {
512                    return Err(TypedChannelError::SchemaMismatch(format!(
513                        "emit on '{}' (message: {}) received a channel handle, but the channel is not second-order — expected scalar payload",
514                        handle.name, handle.message,
515                    )));
516                }
517                let expected_inner = handle.inner_message_type();
518                if inner.message != expected_inner {
519                    return Err(TypedChannelError::SchemaMismatch(format!(
520                        "emit on '{}' expects Channel<{}> but received handle for '{}' (second-order schema mismatch, paper §3.2)",
521                        handle.name, expected_inner, inner.message,
522                    )));
523                }
524                Ok(())
525            }
526            TypedPayload::Scalar(_) => {
527                if handle.carries_channel() {
528                    return Err(TypedChannelError::SchemaMismatch(format!(
529                        "emit on '{}' (message: {}) requires a channel handle but received scalar — pass TypedPayload::Handle(handle) for mobility",
530                        handle.name, handle.message,
531                    )));
532                }
533                Ok(())
534            }
535        }
536    }
537
538    fn dispatch(&self, handle: &TypedChannelHandle, event: TypedEvent) -> Result<()> {
539        match handle.qos.as_str() {
540            "broadcast" => {
541                let subs = self.broadcast_subs.lock().unwrap();
542                if let Some(queues) = subs.get(&handle.name) {
543                    for queue in queues {
544                        // Best-effort fan-out — a dropped subscriber
545                        // queue must not poison the publish path.
546                        let _ = queue.send(event.clone());
547                    }
548                }
549                Ok(())
550            }
551            "at_most_once" => {
552                let transport = self.transport_for(&handle.name);
553                // Best-effort: ignore put failures (drop silently per AMO).
554                let _ = transport.send(event);
555                Ok(())
556            }
557            "exactly_once" => {
558                {
559                    let mut delivered = self.delivered_ids.lock().unwrap();
560                    let seen = delivered.entry(handle.name.clone()).or_default();
561                    if seen.contains(&event.event_id) {
562                        return Ok(());
563                    }
564                    seen.insert(event.event_id.clone());
565                }
566                let transport = self.transport_for(&handle.name);
567                transport.send(event)
568            }
569            _ => {
570                // at_least_once (default) and queue both delegate to
571                // the single-consumer FIFO transport. Difference is
572                // per-handle, not per-event, and surfaces in
573                // subscribe semantics for `queue` (single-consumer).
574                let transport = self.transport_for(&handle.name);
575                transport.send(event)
576            }
577        }
578    }
579
580    fn transport_for(&self, channel: &str) -> Arc<ChannelTransport> {
581        let mut transports = self.transports.lock().unwrap();
582        transports
583            .entry(channel.to_string())
584            .or_insert_with(|| Arc::new(ChannelTransport::new()))
585            .clone()
586    }
587
588    fn consume(&self, channel: &str) -> Result<()> {
589        let mut reg = self.registry.lock().unwrap();
590        let handle = reg.get_mut(channel)?;
591        handle.consumed_count += 1;
592        if handle.lifetime == "linear" && handle.consumed_count > 1 {
593            return Err(TypedChannelError::LifetimeViolation {
594                name: handle.name.clone(),
595                count: handle.consumed_count,
596            });
597        }
598        // affine and persistent impose no upper bound on emits per
599        // handle definition; per-binding affinity is tracked
600        // separately (deferred — parity with Python 13.d note).
601        Ok(())
602    }
603
604    // ── PUBLISH (Publish-Ext, paper §4.3) ─────────────────────────
605
606    /// Extrude a channel handle through a shield, returning a
607    /// [`Capability`] that downstream callers can `discover`.
608    ///
609    /// Compile-time D8 already requires `within Shield`; the runtime
610    /// also rejects empty/missing shields so an embedded program
611    /// cannot bypass the gate by clearing the field.
612    ///
613    /// The compliance predicate (injected via constructor) verifies
614    /// that `shield.compliance ⊇ κ(channel.message_type)`. Default is
615    /// permissive; production hooks an ESK-aware checker.
616    pub async fn publish(&self, channel: &str, shield: &str) -> Result<Capability> {
617        if shield.is_empty() {
618            return Err(TypedChannelError::CapabilityGate(format!(
619                "publish '{channel}' requires a non-empty shield (D8 — capability extrusion is shield-mediated)"
620            )));
621        }
622        let handle = self.get_handle(channel)?;
623        if !handle.is_publishable() {
624            return Err(TypedChannelError::CapabilityGate(format!(
625                "channel '{channel}' is not publishable: its definition declares no shield_ref (D8)"
626            )));
627        }
628        if shield != handle.shield_ref {
629            return Err(TypedChannelError::CapabilityGate(format!(
630                "publish '{channel}' requires shield '{}' (declared on the channel) but received '{shield}'",
631                handle.shield_ref
632            )));
633        }
634        if !(self.compliance_check)(shield, &handle) {
635            return Err(TypedChannelError::CapabilityGate(format!(
636                "shield '{shield}' does not cover compliance required by channel '{channel}'"
637            )));
638        }
639
640        let cap = Capability {
641            capability_id: gen_uuid(),
642            channel_name: channel.to_string(),
643            shield_ref: shield.to_string(),
644            delta_pub: 0.05,
645            issued_at: now_secs(),
646        };
647        self.capabilities
648            .lock()
649            .unwrap()
650            .insert(cap.capability_id.clone(), cap.clone());
651        Ok(cap)
652    }
653
654    // ── DISCOVER (paper §3.4 dual) ────────────────────────────────
655
656    /// Consume a [`Capability`] and return the underlying handle.
657    /// One-shot: subsequent calls with the same capability are
658    /// rejected.
659    pub async fn discover(&self, capability: &Capability) -> Result<TypedChannelHandle> {
660        let removed = {
661            let mut caps = self.capabilities.lock().unwrap();
662            caps.remove(&capability.capability_id)
663        };
664        if removed.is_none() {
665            return Err(TypedChannelError::CapabilityGate(format!(
666                "capability '{}' has been revoked or was never issued by this bus",
667                capability.capability_id,
668            )));
669        }
670        self.get_handle(&capability.channel_name)
671    }
672
673    // ── SUBSCRIBE — broadcast and queue ──────────────────────────
674
675    /// Register a fresh queue for a `qos: broadcast` channel; returns
676    /// a receiver so the consumer can `recv().await` per event.
677    /// Multiple subscribers each receive every emitted event.
678    pub fn subscribe_broadcast(
679        &self,
680        channel: &str,
681    ) -> Result<mpsc::UnboundedReceiver<TypedEvent>> {
682        let handle = self.get_handle(channel)?;
683        if handle.qos != "broadcast" {
684            return Err(TypedChannelError::SchemaMismatch(format!(
685                "subscribe_broadcast called on '{channel}' but its qos is {}, not broadcast",
686                handle.qos,
687            )));
688        }
689        let (tx, rx) = mpsc::unbounded_channel();
690        self.broadcast_subs
691            .lock()
692            .unwrap()
693            .entry(channel.to_string())
694            .or_default()
695            .push(tx);
696        Ok(rx)
697    }
698
699    /// Receive the next event on a non-broadcast channel.
700    ///
701    /// For `qos: broadcast`, use [`subscribe_broadcast`](Self::subscribe_broadcast)
702    /// instead — the bus does not maintain a default queue for
703    /// broadcast since each subscriber needs its own.
704    pub async fn receive(&self, channel: &str) -> Result<TypedEvent> {
705        let handle = self.get_handle(channel)?;
706        if handle.qos == "broadcast" {
707            return Err(TypedChannelError::SchemaMismatch(format!(
708                "channel '{channel}' has qos=broadcast — call subscribe_broadcast() to get a per-subscriber queue"
709            )));
710        }
711        let transport = self.transport_for(channel);
712        transport.recv().await
713    }
714
715    // ── INTROSPECTION + CLEANUP ──────────────────────────────────
716
717    /// Count of live (not yet discovered) capabilities.
718    pub fn issued_capabilities(&self) -> usize {
719        self.capabilities.lock().unwrap().len()
720    }
721
722    /// Drain caps, close transports, clear broadcast queues. Mirrors
723    /// Python `close_all`.
724    pub fn close_all(&self) {
725        self.capabilities.lock().unwrap().clear();
726        self.broadcast_subs.lock().unwrap().clear();
727        self.delivered_ids.lock().unwrap().clear();
728        let transports = self.transports.lock().unwrap();
729        for t in transports.values() {
730            t.close();
731        }
732    }
733}
734
735// ═══════════════════════════════════════════════════════════════════
736//  helpers
737// ═══════════════════════════════════════════════════════════════════
738
739fn gen_uuid() -> String {
740    uuid::Uuid::new_v4().to_string()
741}
742
743fn now_secs() -> f64 {
744    SystemTime::now()
745        .duration_since(UNIX_EPOCH)
746        .map(|d| d.as_secs_f64())
747        .unwrap_or(0.0)
748}
749
750// ═══════════════════════════════════════════════════════════════════
751//  tests — paridad con tests/test_typed_channels.py (Fase 13.d)
752// ═══════════════════════════════════════════════════════════════════
753
754#[cfg(test)]
755mod tests {
756    use super::*;
757    use axon_frontend::ir_nodes::{IRChannel, IRProgram};
758    use serde_json::json;
759
760    // ── helpers ───────────────────────────────────────────────────
761
762    fn ir_channel(
763        name: &str,
764        message: &str,
765        qos: &str,
766        lifetime: &str,
767        persistence: &str,
768        shield: &str,
769    ) -> IRChannel {
770        IRChannel {
771            node_type: "IRChannel",
772            source_line: 0,
773            source_column: 0,
774            name: name.to_string(),
775            message: message.to_string(),
776            qos: qos.to_string(),
777            lifetime: lifetime.to_string(),
778            persistence: persistence.to_string(),
779            shield_ref: shield.to_string(),
780        }
781    }
782
783    fn handle(name: &str, message: &str) -> TypedChannelHandle {
784        TypedChannelHandle::new(name, message)
785    }
786
787    // ── HANDLE ────────────────────────────────────────────────────
788
789    #[test]
790    fn handle_defaults_match_d1() {
791        let h = handle("Orders", "Order");
792        assert_eq!(h.qos, "at_least_once");
793        assert_eq!(h.lifetime, "affine");
794        assert_eq!(h.persistence, "ephemeral");
795        assert_eq!(h.shield_ref, "");
796        assert_eq!(h.consumed_count, 0);
797    }
798
799    #[test]
800    fn handle_is_publishable_iff_shield() {
801        let mut h = handle("Orders", "Order");
802        assert!(!h.is_publishable());
803        h.shield_ref = "PublicBroker".into();
804        assert!(h.is_publishable());
805    }
806
807    #[test]
808    fn handle_carries_channel_second_order() {
809        let h = handle("Broker", "Channel<Order>");
810        assert!(h.carries_channel());
811        let h_first = handle("Orders", "Order");
812        assert!(!h_first.carries_channel());
813    }
814
815    #[test]
816    fn handle_inner_message_type_unwrap() {
817        let h_so = handle("Broker", "Channel<Order>");
818        assert_eq!(h_so.inner_message_type(), "Order");
819        let h_first = handle("Orders", "Order");
820        assert_eq!(h_first.inner_message_type(), "Order");
821        let h_third = handle("Outer", "Channel<Channel<Order>>");
822        assert_eq!(h_third.inner_message_type(), "Channel<Order>");
823    }
824
825    #[test]
826    fn handle_from_ir_round_trip() {
827        let ir = ir_channel(
828            "Orders",
829            "Order",
830            "exactly_once",
831            "linear",
832            "persistent",
833            "PublicBroker",
834        );
835        let h = TypedChannelHandle::from_ir(&ir);
836        assert_eq!(h.name, "Orders");
837        assert_eq!(h.message, "Order");
838        assert_eq!(h.qos, "exactly_once");
839        assert_eq!(h.lifetime, "linear");
840        assert_eq!(h.persistence, "persistent");
841        assert_eq!(h.shield_ref, "PublicBroker");
842        assert_eq!(h.consumed_count, 0);
843    }
844
845    // ── REGISTRY ──────────────────────────────────────────────────
846
847    #[test]
848    fn registry_register_and_get() {
849        let mut reg = TypedChannelRegistry::new();
850        reg.register(handle("Orders", "Order"));
851        assert!(reg.has("Orders"));
852        assert_eq!(reg.get("Orders").unwrap().message, "Order");
853    }
854
855    #[test]
856    fn registry_unknown_returns_error_with_registered() {
857        let mut reg = TypedChannelRegistry::new();
858        reg.register(handle("Orders", "Order"));
859        let err = reg.get("Missing").unwrap_err();
860        match err {
861            TypedChannelError::ChannelNotFound { name, registered } => {
862                assert_eq!(name, "Missing");
863                assert_eq!(registered, vec!["Orders".to_string()]);
864            }
865            other => panic!("expected ChannelNotFound, got {other:?}"),
866        }
867    }
868
869    #[test]
870    fn registry_overwrite_replaces() {
871        let mut reg = TypedChannelRegistry::new();
872        reg.register(handle("Orders", "Order"));
873        let mut h2 = handle("Orders", "OrderV2");
874        h2.qos = "exactly_once".into();
875        reg.register(h2);
876        let stored = reg.get("Orders").unwrap();
877        assert_eq!(stored.message, "OrderV2");
878        assert_eq!(stored.qos, "exactly_once");
879    }
880
881    #[test]
882    fn registry_names_sorted() {
883        let mut reg = TypedChannelRegistry::new();
884        reg.register(handle("ZetaOrders", "Order"));
885        reg.register(handle("Alpha", "Alpha"));
886        reg.register(handle("Mu", "Mu"));
887        assert_eq!(
888            reg.names(),
889            vec!["Alpha".to_string(), "Mu".to_string(), "ZetaOrders".to_string()]
890        );
891    }
892
893    #[test]
894    fn registry_register_from_ir_returns_handle() {
895        let mut reg = TypedChannelRegistry::new();
896        let ir = ir_channel(
897            "Orders",
898            "Order",
899            "at_least_once",
900            "affine",
901            "ephemeral",
902            "Σ",
903        );
904        let h = reg.register_from_ir(&ir);
905        assert_eq!(h.shield_ref, "Σ");
906        assert_eq!(reg.get("Orders").unwrap().shield_ref, "Σ");
907    }
908
909    // ── BUS BOOTSTRAP ─────────────────────────────────────────────
910
911    fn empty_ir_program() -> IRProgram {
912        IRProgram::new()
913    }
914
915    #[test]
916    fn bus_from_ir_program_registers_channels() {
917        let mut ir = empty_ir_program();
918        ir.channels.push(ir_channel(
919            "Orders",
920            "Order",
921            "at_least_once",
922            "affine",
923            "ephemeral",
924            "",
925        ));
926        ir.channels.push(ir_channel(
927            "Broker",
928            "Channel<Order>",
929            "exactly_once",
930            "affine",
931            "ephemeral",
932            "PublicBroker",
933        ));
934        let bus = TypedEventBus::from_ir_program(&ir);
935        let names = bus.channel_names();
936        assert_eq!(names, vec!["Broker".to_string(), "Orders".to_string()]);
937        assert!(bus.get_handle("Broker").unwrap().is_publishable());
938    }
939
940    #[test]
941    fn bus_default_compliance_is_permissive() {
942        // Default compliance always returns true; publish succeeds
943        // even when the underlying ESK metadata would have rejected.
944        let bus = TypedEventBus::new();
945        let mut h = handle("Orders", "Order");
946        h.shield_ref = "Σ".into();
947        bus.register(h);
948        let cap = futures_executor_block_on(bus.publish("Orders", "Σ")).unwrap();
949        assert_eq!(cap.channel_name, "Orders");
950    }
951
952    // Tiny block-on shim so non-async tests can trigger async paths.
953    fn futures_executor_block_on<F: std::future::Future>(f: F) -> F::Output {
954        let runtime = tokio::runtime::Builder::new_current_thread()
955            .enable_all()
956            .build()
957            .unwrap();
958        runtime.block_on(f)
959    }
960
961    // ── EMIT ──────────────────────────────────────────────────────
962
963    #[tokio::test]
964    async fn emit_scalar_round_trips() {
965        let bus = TypedEventBus::new();
966        bus.register(handle("Orders", "Order"));
967        bus.emit("Orders", TypedPayload::scalar(json!({"id": 1})))
968            .await
969            .unwrap();
970        let event = bus.receive("Orders").await.unwrap();
971        match event.payload {
972            TypedPayload::Scalar(v) => assert_eq!(v["id"], 1),
973            _ => panic!("expected scalar"),
974        }
975    }
976
977    #[tokio::test]
978    async fn emit_unknown_channel_errors() {
979        let bus = TypedEventBus::new();
980        let err = bus
981            .emit("Nope", TypedPayload::scalar(json!(null)))
982            .await
983            .unwrap_err();
984        assert!(matches!(err, TypedChannelError::ChannelNotFound { .. }));
985    }
986
987    #[tokio::test]
988    async fn emit_event_has_id_and_timestamp() {
989        let bus = TypedEventBus::new();
990        bus.register(handle("Orders", "Order"));
991        bus.emit("Orders", TypedPayload::scalar(json!(0)))
992            .await
993            .unwrap();
994        let e = bus.receive("Orders").await.unwrap();
995        assert!(!e.event_id.is_empty());
996        assert!(e.timestamp_secs > 0.0);
997    }
998
999    // ── EMIT MOBILITY (second-order) ──────────────────────────────
1000
1001    #[tokio::test]
1002    async fn emit_handle_through_second_order() {
1003        let bus = TypedEventBus::new();
1004        bus.register(handle("Orders", "Order"));
1005        bus.register(handle("Broker", "Channel<Order>"));
1006        let inner = bus.get_handle("Orders").unwrap();
1007        bus.emit("Broker", TypedPayload::handle(inner))
1008            .await
1009            .unwrap();
1010        let e = bus.receive("Broker").await.unwrap();
1011        match e.payload {
1012            TypedPayload::Handle(h) => assert_eq!(h.name, "Orders"),
1013            _ => panic!("expected handle"),
1014        }
1015    }
1016
1017    #[tokio::test]
1018    async fn emit_mobility_schema_mismatch_inner() {
1019        let bus = TypedEventBus::new();
1020        bus.register(handle("Orders", "Order"));
1021        bus.register(handle("Wrong", "Different"));
1022        bus.register(handle("Broker", "Channel<Order>"));
1023        let wrong = bus.get_handle("Wrong").unwrap();
1024        let err = bus
1025            .emit("Broker", TypedPayload::handle(wrong))
1026            .await
1027            .unwrap_err();
1028        match err {
1029            TypedChannelError::SchemaMismatch(msg) => {
1030                assert!(msg.contains("Channel<Order>"));
1031                assert!(msg.contains("Different"));
1032            }
1033            other => panic!("expected SchemaMismatch, got {other:?}"),
1034        }
1035    }
1036
1037    #[tokio::test]
1038    async fn emit_scalar_to_second_order_rejected() {
1039        let bus = TypedEventBus::new();
1040        bus.register(handle("Broker", "Channel<Order>"));
1041        let err = bus
1042            .emit("Broker", TypedPayload::scalar(json!("oops")))
1043            .await
1044            .unwrap_err();
1045        assert!(matches!(err, TypedChannelError::SchemaMismatch(_)));
1046    }
1047
1048    #[tokio::test]
1049    async fn emit_handle_to_first_order_rejected() {
1050        let bus = TypedEventBus::new();
1051        bus.register(handle("Orders", "Order"));
1052        bus.register(handle("FirstOrder", "Order"));
1053        let h = bus.get_handle("Orders").unwrap();
1054        let err = bus
1055            .emit("FirstOrder", TypedPayload::handle(h))
1056            .await
1057            .unwrap_err();
1058        assert!(matches!(err, TypedChannelError::SchemaMismatch(_)));
1059    }
1060
1061    // ── PUBLISH ──────────────────────────────────────────────────
1062
1063    fn publishable_handle(name: &str, message: &str, shield: &str) -> TypedChannelHandle {
1064        let mut h = handle(name, message);
1065        h.shield_ref = shield.into();
1066        h
1067    }
1068
1069    #[tokio::test]
1070    async fn publish_returns_capability() {
1071        let bus = TypedEventBus::new();
1072        bus.register(publishable_handle("Orders", "Order", "Σ"));
1073        let cap = bus.publish("Orders", "Σ").await.unwrap();
1074        assert_eq!(cap.channel_name, "Orders");
1075        assert_eq!(cap.shield_ref, "Σ");
1076        assert!(!cap.capability_id.is_empty());
1077        assert_eq!(bus.issued_capabilities(), 1);
1078    }
1079
1080    #[tokio::test]
1081    async fn publish_empty_shield_rejected() {
1082        let bus = TypedEventBus::new();
1083        bus.register(publishable_handle("Orders", "Order", "Σ"));
1084        let err = bus.publish("Orders", "").await.unwrap_err();
1085        assert!(matches!(err, TypedChannelError::CapabilityGate(_)));
1086    }
1087
1088    #[tokio::test]
1089    async fn publish_unpublishable_rejected() {
1090        let bus = TypedEventBus::new();
1091        // No shield_ref → not publishable.
1092        bus.register(handle("Orders", "Order"));
1093        let err = bus.publish("Orders", "Σ").await.unwrap_err();
1094        match err {
1095            TypedChannelError::CapabilityGate(msg) => {
1096                assert!(msg.contains("not publishable"));
1097            }
1098            other => panic!("expected CapabilityGate, got {other:?}"),
1099        }
1100    }
1101
1102    #[tokio::test]
1103    async fn publish_wrong_shield_rejected() {
1104        let bus = TypedEventBus::new();
1105        bus.register(publishable_handle("Orders", "Order", "Σ"));
1106        let err = bus.publish("Orders", "Other").await.unwrap_err();
1107        match err {
1108            TypedChannelError::CapabilityGate(msg) => {
1109                assert!(msg.contains("Σ"));
1110                assert!(msg.contains("Other"));
1111            }
1112            other => panic!("expected CapabilityGate, got {other:?}"),
1113        }
1114    }
1115
1116    #[tokio::test]
1117    async fn publish_unknown_channel_errors() {
1118        let bus = TypedEventBus::new();
1119        let err = bus.publish("Missing", "Σ").await.unwrap_err();
1120        assert!(matches!(err, TypedChannelError::ChannelNotFound { .. }));
1121    }
1122
1123    #[tokio::test]
1124    async fn publish_default_delta_pub_is_paper_lower_bound() {
1125        let bus = TypedEventBus::new();
1126        bus.register(publishable_handle("Orders", "Order", "Σ"));
1127        let cap = bus.publish("Orders", "Σ").await.unwrap();
1128        assert!((cap.delta_pub - 0.05).abs() < f64::EPSILON);
1129    }
1130
1131    #[tokio::test]
1132    async fn publish_compliance_predicate_can_veto() {
1133        let veto: ShieldComplianceFn = Arc::new(|_, _| false);
1134        let bus = TypedEventBus::with_compliance_check(veto);
1135        bus.register(publishable_handle("Orders", "Order", "Σ"));
1136        let err = bus.publish("Orders", "Σ").await.unwrap_err();
1137        match err {
1138            TypedChannelError::CapabilityGate(msg) => {
1139                assert!(msg.contains("does not cover compliance"));
1140            }
1141            other => panic!("expected CapabilityGate, got {other:?}"),
1142        }
1143    }
1144
1145    #[tokio::test]
1146    async fn publish_compliance_predicate_inspects_handle() {
1147        let inspected: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
1148        let captured = inspected.clone();
1149        let check: ShieldComplianceFn = Arc::new(move |shield, h| {
1150            captured.lock().unwrap().push(format!("{shield}/{}", h.name));
1151            true
1152        });
1153        let bus = TypedEventBus::with_compliance_check(check);
1154        bus.register(publishable_handle("Orders", "Order", "Σ"));
1155        bus.publish("Orders", "Σ").await.unwrap();
1156        let calls = inspected.lock().unwrap();
1157        assert_eq!(*calls, vec!["Σ/Orders".to_string()]);
1158    }
1159
1160    // ── DISCOVER ─────────────────────────────────────────────────
1161
1162    #[tokio::test]
1163    async fn discover_returns_handle_and_consumes_capability() {
1164        let bus = TypedEventBus::new();
1165        bus.register(publishable_handle("Orders", "Order", "Σ"));
1166        let cap = bus.publish("Orders", "Σ").await.unwrap();
1167        assert_eq!(bus.issued_capabilities(), 1);
1168        let found = bus.discover(&cap).await.unwrap();
1169        assert_eq!(found.name, "Orders");
1170        assert_eq!(bus.issued_capabilities(), 0);
1171        // Second discover with the same capability is rejected.
1172        let err = bus.discover(&cap).await.unwrap_err();
1173        assert!(matches!(err, TypedChannelError::CapabilityGate(_)));
1174    }
1175
1176    #[tokio::test]
1177    async fn discover_forged_capability_rejected() {
1178        let bus = TypedEventBus::new();
1179        bus.register(publishable_handle("Orders", "Order", "Σ"));
1180        let forged = Capability {
1181            capability_id: "forged".to_string(),
1182            channel_name: "Orders".to_string(),
1183            shield_ref: "Σ".to_string(),
1184            delta_pub: 0.05,
1185            issued_at: 0.0,
1186        };
1187        let err = bus.discover(&forged).await.unwrap_err();
1188        assert!(matches!(err, TypedChannelError::CapabilityGate(_)));
1189    }
1190
1191    #[tokio::test]
1192    async fn capability_from_other_bus_rejected() {
1193        let bus_a = TypedEventBus::new();
1194        let bus_b = TypedEventBus::new();
1195        bus_a.register(publishable_handle("Orders", "Order", "Σ"));
1196        bus_b.register(publishable_handle("Orders", "Order", "Σ"));
1197        let cap = bus_a.publish("Orders", "Σ").await.unwrap();
1198        let err = bus_b.discover(&cap).await.unwrap_err();
1199        assert!(matches!(err, TypedChannelError::CapabilityGate(_)));
1200    }
1201
1202    // ── QoS ──────────────────────────────────────────────────────
1203
1204    #[tokio::test]
1205    async fn qos_at_least_once_default_delivers() {
1206        let bus = TypedEventBus::new();
1207        bus.register(handle("Orders", "Order")); // default at_least_once
1208        bus.emit("Orders", TypedPayload::scalar(json!({"id": 1})))
1209            .await
1210            .unwrap();
1211        bus.emit("Orders", TypedPayload::scalar(json!({"id": 2})))
1212            .await
1213            .unwrap();
1214        let e1 = bus.receive("Orders").await.unwrap();
1215        let e2 = bus.receive("Orders").await.unwrap();
1216        match (&e1.payload, &e2.payload) {
1217            (TypedPayload::Scalar(v1), TypedPayload::Scalar(v2)) => {
1218                assert_eq!(v1["id"], 1);
1219                assert_eq!(v2["id"], 2);
1220            }
1221            _ => panic!("expected scalars"),
1222        }
1223    }
1224
1225    #[tokio::test]
1226    async fn qos_at_most_once_delivers_once_then_drops_silently() {
1227        let bus = TypedEventBus::new();
1228        let mut h = handle("Telemetry", "Tick");
1229        h.qos = "at_most_once".into();
1230        bus.register(h);
1231        bus.emit("Telemetry", TypedPayload::scalar(json!(1)))
1232            .await
1233            .unwrap();
1234        // Close transport so the second emit hits the silent-drop path.
1235        // Direct close: grab the transport and mark closed.
1236        let transport = bus.transport_for("Telemetry");
1237        transport.close();
1238        // Should NOT raise — at_most_once swallows transport errors.
1239        bus.emit("Telemetry", TypedPayload::scalar(json!(2)))
1240            .await
1241            .unwrap();
1242    }
1243
1244    #[tokio::test]
1245    async fn qos_exactly_once_dedups_event_ids() {
1246        let bus = TypedEventBus::new();
1247        let mut h = handle("EO", "Tick");
1248        h.qos = "exactly_once".into();
1249        bus.register(h.clone());
1250
1251        // First emit — assert event flows. Then synthesise an event
1252        // with the same event_id and dispatch directly to confirm the
1253        // dedup set short-circuits the second send.
1254        bus.emit("EO", TypedPayload::scalar(json!(1)))
1255            .await
1256            .unwrap();
1257        let _e1 = bus.receive("EO").await.unwrap();
1258
1259        let manual = TypedEvent {
1260            channel: "EO".to_string(),
1261            payload: TypedPayload::scalar(json!("dup")),
1262            event_id: "fixed-id".to_string(),
1263            timestamp_secs: now_secs(),
1264        };
1265        bus.dispatch(&h, manual.clone()).unwrap();
1266        // Second dispatch with the same event_id is dropped.
1267        bus.dispatch(&h, manual).unwrap();
1268        let received = bus.receive("EO").await.unwrap();
1269        assert_eq!(received.event_id, "fixed-id");
1270        // No more events queued.
1271        let try_more =
1272            tokio::time::timeout(std::time::Duration::from_millis(20), bus.receive("EO")).await;
1273        assert!(try_more.is_err(), "expected dedup to block second event");
1274    }
1275
1276    #[tokio::test]
1277    async fn qos_broadcast_fan_out_to_subscribers() {
1278        let bus = TypedEventBus::new();
1279        let mut h = handle("Bus", "Tick");
1280        h.qos = "broadcast".into();
1281        bus.register(h);
1282        let mut s1 = bus.subscribe_broadcast("Bus").unwrap();
1283        let mut s2 = bus.subscribe_broadcast("Bus").unwrap();
1284        bus.emit("Bus", TypedPayload::scalar(json!("hi")))
1285            .await
1286            .unwrap();
1287        let e1 = s1.recv().await.unwrap();
1288        let e2 = s2.recv().await.unwrap();
1289        assert_eq!(e1.event_id, e2.event_id);
1290    }
1291
1292    #[tokio::test]
1293    async fn qos_broadcast_subscribe_check_rejects_non_broadcast() {
1294        let bus = TypedEventBus::new();
1295        bus.register(handle("Plain", "X"));
1296        let err = bus.subscribe_broadcast("Plain").unwrap_err();
1297        assert!(matches!(err, TypedChannelError::SchemaMismatch(_)));
1298    }
1299
1300    #[tokio::test]
1301    async fn qos_broadcast_receive_rejection() {
1302        let bus = TypedEventBus::new();
1303        let mut h = handle("Bus", "Tick");
1304        h.qos = "broadcast".into();
1305        bus.register(h);
1306        let err = bus.receive("Bus").await.unwrap_err();
1307        assert!(matches!(err, TypedChannelError::SchemaMismatch(_)));
1308    }
1309
1310    #[tokio::test]
1311    async fn qos_queue_fifo_ordering() {
1312        let bus = TypedEventBus::new();
1313        let mut h = handle("Q", "Job");
1314        h.qos = "queue".into();
1315        bus.register(h);
1316        bus.emit("Q", TypedPayload::scalar(json!(1))).await.unwrap();
1317        bus.emit("Q", TypedPayload::scalar(json!(2))).await.unwrap();
1318        bus.emit("Q", TypedPayload::scalar(json!(3))).await.unwrap();
1319        let mut seen = vec![];
1320        for _ in 0..3 {
1321            let e = bus.receive("Q").await.unwrap();
1322            if let TypedPayload::Scalar(v) = e.payload {
1323                seen.push(v.as_i64().unwrap());
1324            }
1325        }
1326        assert_eq!(seen, vec![1, 2, 3]);
1327    }
1328
1329    // ── LIFETIME ─────────────────────────────────────────────────
1330
1331    #[tokio::test]
1332    async fn lifetime_affine_allows_multi_emit() {
1333        let bus = TypedEventBus::new();
1334        bus.register(handle("Orders", "Order")); // affine default
1335        for i in 0..3 {
1336            bus.emit("Orders", TypedPayload::scalar(json!(i)))
1337                .await
1338                .unwrap();
1339        }
1340        // affine has no upper bound on emits per handle (per-handle
1341        // tracking; per-binding deferred — parity with Python 13.d).
1342    }
1343
1344    #[tokio::test]
1345    async fn lifetime_linear_second_emit_violates() {
1346        let bus = TypedEventBus::new();
1347        let mut h = handle("Once", "Order");
1348        h.lifetime = "linear".into();
1349        bus.register(h);
1350        bus.emit("Once", TypedPayload::scalar(json!(0)))
1351            .await
1352            .unwrap();
1353        let err = bus
1354            .emit("Once", TypedPayload::scalar(json!(1)))
1355            .await
1356            .unwrap_err();
1357        match err {
1358            TypedChannelError::LifetimeViolation { name, count } => {
1359                assert_eq!(name, "Once");
1360                assert_eq!(count, 2);
1361            }
1362            other => panic!("expected LifetimeViolation, got {other:?}"),
1363        }
1364    }
1365
1366    #[tokio::test]
1367    async fn lifetime_persistent_unrestricted() {
1368        let bus = TypedEventBus::new();
1369        let mut h = handle("Ledger", "Entry");
1370        h.lifetime = "persistent".into();
1371        bus.register(h);
1372        for i in 0..16 {
1373            bus.emit("Ledger", TypedPayload::scalar(json!(i)))
1374                .await
1375                .unwrap();
1376        }
1377    }
1378
1379    // ── PAPER §9 END-TO-END ──────────────────────────────────────
1380
1381    #[tokio::test]
1382    async fn paper_section9_e2e_producer_publish_discover_receive() {
1383        // Models the worked example from paper_mobile_channels.md §9
1384        // (paper §9): typed producer emits an Order, then publishes
1385        // its OrdersCreated channel through a shield, then a separate
1386        // consumer discovers and receives.
1387        let bus = TypedEventBus::new();
1388        let mut orders = handle("OrdersCreated", "Order");
1389        orders.shield_ref = "PublicBroker".into();
1390        bus.register(orders);
1391
1392        // Producer emits an order.
1393        bus.emit(
1394            "OrdersCreated",
1395            TypedPayload::scalar(json!({"id": 42, "total": 19.99})),
1396        )
1397        .await
1398        .unwrap();
1399
1400        // Producer publishes the channel through the shield.
1401        let cap = bus
1402            .publish("OrdersCreated", "PublicBroker")
1403            .await
1404            .unwrap();
1405
1406        // Consumer discovers and receives the queued event.
1407        let handle = bus.discover(&cap).await.unwrap();
1408        assert_eq!(handle.name, "OrdersCreated");
1409        let event = bus.receive("OrdersCreated").await.unwrap();
1410        match event.payload {
1411            TypedPayload::Scalar(v) => {
1412                assert_eq!(v["id"], 42);
1413                assert_eq!(v["total"], 19.99);
1414            }
1415            _ => panic!("expected scalar Order payload"),
1416        }
1417    }
1418
1419    // ── ERROR DISPLAY ────────────────────────────────────────────
1420
1421    #[test]
1422    fn error_display_includes_useful_context() {
1423        let err = TypedChannelError::ChannelNotFound {
1424            name: "X".to_string(),
1425            registered: vec!["A".to_string(), "B".to_string()],
1426        };
1427        let s = format!("{err}");
1428        assert!(s.contains("'X'"));
1429        assert!(s.contains("[\"A\", \"B\"]"));
1430
1431        let err = TypedChannelError::LifetimeViolation {
1432            name: "Once".to_string(),
1433            count: 2,
1434        };
1435        let s = format!("{err}");
1436        assert!(s.contains("Once"));
1437        assert!(s.contains("2"));
1438    }
1439
1440    // ── EDGE CASES ───────────────────────────────────────────────
1441
1442    #[tokio::test]
1443    async fn capability_ids_are_unique() {
1444        let bus = TypedEventBus::new();
1445        bus.register(publishable_handle("Orders", "Order", "Σ"));
1446        let cap1 = bus.publish("Orders", "Σ").await.unwrap();
1447        let cap2 = bus.publish("Orders", "Σ").await.unwrap();
1448        assert_ne!(cap1.capability_id, cap2.capability_id);
1449        assert_eq!(bus.issued_capabilities(), 2);
1450    }
1451
1452    #[tokio::test]
1453    async fn close_all_drains_state() {
1454        let bus = TypedEventBus::new();
1455        bus.register(publishable_handle("Orders", "Order", "Σ"));
1456        let mut bcast = handle("Bus", "Tick");
1457        bcast.qos = "broadcast".into();
1458        bus.register(bcast);
1459        let _sub = bus.subscribe_broadcast("Bus").unwrap();
1460        let _cap = bus.publish("Orders", "Σ").await.unwrap();
1461        assert_eq!(bus.issued_capabilities(), 1);
1462
1463        bus.close_all();
1464
1465        assert_eq!(bus.issued_capabilities(), 0);
1466        // Broadcast subs cleared — next emit fans out to nobody.
1467        bus.emit("Bus", TypedPayload::scalar(json!("after-close")))
1468            .await
1469            .unwrap();
1470    }
1471}