Skip to main content

axon/runtime/channels/
executor.rs

//! AXON Runtime — Rust executor integration for typed channels (Fase 13.l).
//!
//! The Python side (`axon/runtime/executor.py`) gained four dispatch
//! branches in Fase 13.i + 13.j (`emit_apply` / `publish_apply` /
//! `discover_apply` / `listen_apply`) so a flow's channel surface
//! executes end-to-end on the Python interpreter. The Rust crate
//! exposed `TypedEventBus` standalone in 13.f.2, but a Rust-native
//! flow runner that orchestrates IR steps had no equivalent
//! integration: a Rust adopter who wanted to drive an `IRProgram`
//! through the runtime had to wire the bus, value-ref resolution,
//! and capability/alias scope by hand.
//!
//! 13.l closes that gap. This module provides:
//!
//! - [`RunContext`]: mirror of Python's `ContextManager` for the
//!   typed-channel concern. Holds the per-unit `TypedEventBus`,
//!   `discovered_handles`, `variables`, `capabilities`, and step
//!   results. Implements `resolve_value_ref` with the same lookup
//!   order (discovered handles ▶ variables ▶ step results) and a
//!   dotted-access walk over both serde JSON objects and the fields
//!   of a `TypedChannelHandle`.
//! - [`dispatch_emit`] / [`dispatch_publish`] / [`dispatch_discover`]
//!   / [`dispatch_listen`]: async functions that consume an IR step
//!   plus a `&RunContext` and route through `TypedEventBus`.
//! - [`RunContext::from_ir_program`]: builds a `RunContext` from an
//!   `IRProgram` (registers every `IRChannel` on a fresh
//!   `TypedEventBus`).
//!
//! The dispatch surface is intentionally semantically identical to
//! the Python handlers: same lookup precedence for value_ref, same
//! one-shot capability consumption, same alias binding rules. Rust
//! adopters who want a fully-orchestrated `axon run` Rust binary can
//! compose these primitives directly; a future sub-phase wires them
//! into `axon-rs/src/runner.rs::execute_real`.
35
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex};
38
39use axon_frontend::ir_nodes::{IRDiscover, IREmit, IRListenStep, IRProgram, IRPublish};
40
41use super::typed::{
42    Capability, TypedChannelError, TypedChannelHandle, TypedEventBus, TypedPayload,
43};
44
/// Failure modes of the `dispatch_*` functions.
///
/// There is one variant per channel operation, so a caller can tell
/// which kind of IR step failed; the wrapped `String` is the
/// human-readable detail. This plays the role of the
/// `channel_op:{op}` `details` tag carried by the Python
/// `AxonRuntimeError`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DispatchError {
    /// An `emit` step failed: its `value_ref` did not resolve, or the
    /// bus rejected the payload.
    EmitFailure(String),
    /// A `publish` step failed at the bus level (D8 gate, missing
    /// shield, etc.).
    PublishFailure(String),
    /// A `discover` step failed: no capability was recorded for the
    /// channel, or the bus rejected the capability.
    DiscoverFailure(String),
    /// A `listen` step failed: the named channel could not be
    /// subscribed to or received from.
    ListenFailure(String),
}
60
61impl std::fmt::Display for DispatchError {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            DispatchError::EmitFailure(m) => write!(f, "channel_op:emit — {m}"),
65            DispatchError::PublishFailure(m) => write!(f, "channel_op:publish — {m}"),
66            DispatchError::DiscoverFailure(m) => write!(f, "channel_op:discover — {m}"),
67            DispatchError::ListenFailure(m) => write!(f, "channel_op:listen — {m}"),
68        }
69    }
70}
71
72impl std::error::Error for DispatchError {}
73
74impl From<TypedChannelError> for DispatchError {
75    fn from(e: TypedChannelError) -> Self {
76        // Default to EmitFailure when callers convert raw errors;
77        // each dispatch_* function re-wraps with the right variant.
78        DispatchError::EmitFailure(e.to_string())
79    }
80}
81
82/// A value reachable by `value_ref` during channel-op dispatch. Rust
83/// adopters fill the run context with whatever shape their step
84/// outputs use; we only require the two access modes the dotted-access
85/// resolver needs (mapping access by string key + nested handle/JSON).
86#[derive(Debug, Clone)]
87pub enum RunValue {
88    /// Primitive / structured payload; `serde_json::Value` covers
89    /// scalars, arrays, and recursive objects so dotted access can
90    /// walk arbitrary JSON.
91    Json(serde_json::Value),
92    /// A live channel handle from the registry. Returned from
93    /// `discover` and from `listen` on a second-order channel.
94    Handle(TypedChannelHandle),
95}
96
97impl RunValue {
98    pub fn as_json(&self) -> Option<&serde_json::Value> {
99        match self {
100            RunValue::Json(v) => Some(v),
101            RunValue::Handle(_) => None,
102        }
103    }
104    pub fn as_handle(&self) -> Option<&TypedChannelHandle> {
105        match self {
106            RunValue::Handle(h) => Some(h),
107            RunValue::Json(_) => None,
108        }
109    }
110}
111
112/// Per-unit run context — the Rust mirror of Python's
113/// `ContextManager` for the typed-channel concern.
114///
115/// All mutable state lives behind `Mutex` so the dispatch functions
116/// can be called from a multi-threaded executor. `Arc<TypedEventBus>`
117/// because the bus itself is already internally synchronised.
118pub struct RunContext {
119    bus: Arc<TypedEventBus>,
120    /// Discovered handle alias scope (`discover X as alias`). Lookup
121    /// order #1 in `resolve_value_ref`.
122    discovered_handles: Mutex<HashMap<String, TypedChannelHandle>>,
123    /// Variable scope (flow params, listen alias for scalar payloads).
124    variables: Mutex<HashMap<String, RunValue>>,
125    /// Step results — set after each step completes. Used by `emit`
126    /// when the value_ref points to a previously-completed step.
127    step_results: Mutex<HashMap<String, RunValue>>,
128    /// Capability tokens that `publish` produced, keyed by the channel
129    /// name they expose. Consumed by `discover` (one-shot).
130    capabilities: Mutex<HashMap<String, Capability>>,
131}
132
133impl RunContext {
134    /// Wrap an existing bus in a fresh context.
135    pub fn new(bus: Arc<TypedEventBus>) -> Self {
136        RunContext {
137            bus,
138            discovered_handles: Mutex::new(HashMap::new()),
139            variables: Mutex::new(HashMap::new()),
140            step_results: Mutex::new(HashMap::new()),
141            capabilities: Mutex::new(HashMap::new()),
142        }
143    }
144
145    pub fn bus(&self) -> &TypedEventBus {
146        &self.bus
147    }
148
149    /// Bootstrap a context from an `IRProgram`. Every `IRChannel` is
150    /// registered on a fresh `TypedEventBus` whose registry is the
151    /// canonical source for typed handles during the run.
152    pub fn from_ir_program(ir: &IRProgram) -> Self {
153        let bus = Arc::new(TypedEventBus::from_ir_program(ir));
154        Self::new(bus)
155    }
156
157    pub fn set_variable(&self, name: impl Into<String>, value: RunValue) {
158        self.variables.lock().unwrap().insert(name.into(), value);
159    }
160
161    pub fn get_variable(&self, name: &str) -> Option<RunValue> {
162        self.variables.lock().unwrap().get(name).cloned()
163    }
164
165    pub fn set_step_result(&self, name: impl Into<String>, value: RunValue) {
166        self.step_results.lock().unwrap().insert(name.into(), value);
167    }
168
169    pub fn get_step_result(&self, name: &str) -> Option<RunValue> {
170        self.step_results.lock().unwrap().get(name).cloned()
171    }
172
173    pub fn bind_discovered_handle(
174        &self, alias: impl Into<String>, handle: TypedChannelHandle,
175    ) {
176        self.discovered_handles
177            .lock()
178            .unwrap()
179            .insert(alias.into(), handle);
180    }
181
182    pub fn discovered_handles_snapshot(&self) -> HashMap<String, TypedChannelHandle> {
183        self.discovered_handles.lock().unwrap().clone()
184    }
185
186    pub fn record_capability(&self, channel: impl Into<String>, cap: Capability) {
187        self.capabilities.lock().unwrap().insert(channel.into(), cap);
188    }
189
190    pub fn take_capability(&self, channel: &str) -> Option<Capability> {
191        self.capabilities.lock().unwrap().remove(channel)
192    }
193
194    /// Resolve an `emit` value_ref against the live state.
195    ///
196    /// Lookup order #1 → #3, walking nested segments after dots:
197    ///   1. `discovered_handles[head]`
198    ///   2. `variables[head]`
199    ///   3. `step_results[head]`
200    ///
201    /// On dotted paths, after the head the remaining segments walk
202    /// either:
203    ///   - JSON object access (`Json`),
204    ///   - struct-like field access on the handle struct (very few
205    ///     fields; we expose the same surface the Python `getattr`
206    ///     reaches on a `TypedChannelHandle` — `name`, `message`,
207    ///     `qos`, `lifetime`, `persistence`, `shield_ref`).
208    pub fn resolve_value_ref(&self, value_ref: &str) -> Result<RunValue, DispatchError> {
209        if value_ref.is_empty() {
210            return Err(DispatchError::EmitFailure(
211                "value_ref is empty".to_string(),
212            ));
213        }
214        let mut segments = value_ref.split('.');
215        let head = segments.next().expect("at least one segment by split");
216        // Resolve the head against the three scopes in priority order
217        // (discovered handles ▶ variables ▶ step results). Each scope
218        // is acquired in its own block so the MutexGuard drops before
219        // the next acquire — temporary `if let Some(_) = lock().get(_)`
220        // patterns previously kept guards alive across the whole
221        // if-else chain, which deadlocked the error-path acquire of
222        // the same locks. Fixed (Fase 13.l).
223        let from_handles = {
224            let dh = self.discovered_handles.lock().unwrap();
225            dh.get(head).cloned()
226        };
227        let from_vars = if from_handles.is_none() {
228            let vars = self.variables.lock().unwrap();
229            vars.get(head).cloned()
230        } else {
231            None
232        };
233        let from_steps = if from_handles.is_none() && from_vars.is_none() {
234            let steps = self.step_results.lock().unwrap();
235            steps.get(head).cloned()
236        } else {
237            None
238        };
239        let mut current = if let Some(h) = from_handles {
240            RunValue::Handle(h)
241        } else if let Some(v) = from_vars {
242            v
243        } else if let Some(v) = from_steps {
244            v
245        } else {
246            // Snapshot every scope's keys for the error message — each
247            // acquire is its own block so the guards drop before the
248            // String formatting completes.
249            let vars: Vec<String> = self.variables.lock().unwrap().keys().cloned().collect();
250            let steps: Vec<String> = self.step_results.lock().unwrap().keys().cloned().collect();
251            let dh: Vec<String> = self.discovered_handles.lock().unwrap().keys().cloned().collect();
252            return Err(DispatchError::EmitFailure(format!(
253                "value_ref '{value_ref}' — head segment '{head}' is not a \
254                 variable, step result, or discovered handle. \
255                 Variables: {vars:?}; Step results: {steps:?}; \
256                 Discovered handles: {dh:?}",
257            )));
258        };
259
260        for seg in segments {
261            current = walk_one_segment(&current, seg, value_ref)?;
262        }
263        Ok(current)
264    }
265}
266
267fn walk_one_segment(
268    current: &RunValue, seg: &str, full_ref: &str,
269) -> Result<RunValue, DispatchError> {
270    match current {
271        RunValue::Json(v) => match v {
272            serde_json::Value::Object(map) => map.get(seg).cloned().map(RunValue::Json).ok_or_else(
273                || DispatchError::EmitFailure(format!(
274                    "value_ref '{full_ref}' — key '{seg}' missing on object value",
275                )),
276            ),
277            _ => Err(DispatchError::EmitFailure(format!(
278                "value_ref '{full_ref}' — cannot walk '{seg}' on JSON value of type {}",
279                json_type_name(v),
280            ))),
281        },
282        RunValue::Handle(h) => match seg {
283            "name" => Ok(RunValue::Json(serde_json::Value::String(h.name.clone()))),
284            "message" => Ok(RunValue::Json(serde_json::Value::String(h.message.clone()))),
285            "qos" => Ok(RunValue::Json(serde_json::Value::String(h.qos.clone()))),
286            "lifetime" => Ok(RunValue::Json(serde_json::Value::String(h.lifetime.clone()))),
287            "persistence" => Ok(RunValue::Json(serde_json::Value::String(h.persistence.clone()))),
288            "shield_ref" => Ok(RunValue::Json(serde_json::Value::String(h.shield_ref.clone()))),
289            other => Err(DispatchError::EmitFailure(format!(
290                "value_ref '{full_ref}' — handle has no field '{other}'. \
291                 Allowed: name, message, qos, lifetime, persistence, shield_ref",
292            ))),
293        },
294    }
295}
296
297fn json_type_name(v: &serde_json::Value) -> &'static str {
298    match v {
299        serde_json::Value::Null => "null",
300        serde_json::Value::Bool(_) => "bool",
301        serde_json::Value::Number(_) => "number",
302        serde_json::Value::String(_) => "string",
303        serde_json::Value::Array(_) => "array",
304        serde_json::Value::Object(_) => "object",
305    }
306}
307
308// ─── DISPATCH FUNCTIONS ───────────────────────────────────────────────
309
310/// Execute an `IREmit` step against the run context.
311///
312/// Resolves `value_ref` per the dotted-access rules, then routes to
313/// `bus.emit` either as scalar (`TypedPayload::Scalar`) or as
314/// mobility (`TypedPayload::Handle`) per the IR's `value_is_channel`
315/// flag.
316pub async fn dispatch_emit(
317    ir: &IREmit, ctx: &RunContext,
318) -> Result<(), DispatchError> {
319    if ir.value_is_channel {
320        // Resolve the handle. First check discovered_handles by name;
321        // fall back to the bus registry via get_handle (canonical
322        // declared channel).
323        let handle = if let Some(h) = ctx
324            .discovered_handles
325            .lock()
326            .unwrap()
327            .get(&ir.value_ref)
328            .cloned()
329        {
330            h
331        } else {
332            match ctx.bus.get_handle(&ir.value_ref) {
333                Ok(h) => h,
334                Err(_) => {
335                    return Err(DispatchError::EmitFailure(format!(
336                        "emit on '{}' carries a channel handle but '{}' is not in scope \
337                         (no discovered alias, no declared channel)",
338                        ir.channel_ref, ir.value_ref,
339                    )));
340                }
341            }
342        };
343        ctx.bus
344            .emit(&ir.channel_ref, TypedPayload::Handle(handle))
345            .await
346            .map_err(|e| DispatchError::EmitFailure(e.to_string()))?;
347        return Ok(());
348    }
349    // Scalar path — resolve via dotted-access rules.
350    let value = ctx.resolve_value_ref(&ir.value_ref)?;
351    match value {
352        RunValue::Json(j) => ctx
353            .bus
354            .emit(&ir.channel_ref, TypedPayload::Scalar(j))
355            .await
356            .map_err(|e| DispatchError::EmitFailure(e.to_string())),
357        RunValue::Handle(h) => Err(DispatchError::EmitFailure(format!(
358            "emit on '{}' is scalar (value_is_channel=false) but value_ref '{}' \
359             resolved to a TypedChannelHandle for '{}' — set value_is_channel=true \
360             at IR-generation time for mobility",
361            ir.channel_ref, ir.value_ref, h.name,
362        ))),
363    }
364}
365
366/// Execute an `IRPublish` step. Records the returned `Capability` in
367/// the context keyed by channel name so a later `IRDiscover` consumes it.
368pub async fn dispatch_publish(
369    ir: &IRPublish, ctx: &RunContext,
370) -> Result<Capability, DispatchError> {
371    let cap = ctx
372        .bus
373        .publish(&ir.channel_ref, &ir.shield_ref)
374        .await
375        .map_err(|e| DispatchError::PublishFailure(e.to_string()))?;
376    ctx.record_capability(ir.channel_ref.clone(), cap.clone());
377    Ok(cap)
378}
379
380/// Execute an `IRDiscover` step. Pops the capability the matching
381/// `publish` recorded earlier in the unit, hands it to `bus.discover`,
382/// and binds the resulting handle under `alias` in the discovered-
383/// handles scope so subsequent emits / value_refs resolve it.
384pub async fn dispatch_discover(
385    ir: &IRDiscover, ctx: &RunContext,
386) -> Result<TypedChannelHandle, DispatchError> {
387    let cap = ctx.take_capability(&ir.capability_ref).ok_or_else(|| {
388        DispatchError::DiscoverFailure(format!(
389            "no capability recorded for channel '{}'. Did a `publish {} within …` \
390             step run earlier in this unit?",
391            ir.capability_ref, ir.capability_ref,
392        ))
393    })?;
394    let handle = ctx
395        .bus
396        .discover(&cap)
397        .await
398        .map_err(|e| DispatchError::DiscoverFailure(e.to_string()))?;
399    ctx.bind_discovered_handle(ir.alias.clone(), handle.clone());
400    Ok(handle)
401}
402
403/// Execute an `IRListenStep` step (free-standing in flow body — single-
404/// event receive). Subscribes to the channel, awaits one event, binds
405/// the payload under `event_alias` in the right scope (discovered_
406/// handles for mobility, variables for scalar), and returns the
407/// payload so the caller can iterate `ir.children` (left to the
408/// outer orchestrator since IRListenStep.children is currently typed as
409/// `Vec<IRFlowNode>` and dispatch of arbitrary flow steps is the
410/// orchestrator's job, not this module's).
411pub async fn dispatch_listen(
412    ir: &IRListenStep, ctx: &RunContext,
413) -> Result<RunValue, DispatchError> {
414    if !ir.channel_is_ref {
415        // Legacy string-topic path. The Rust runtime bus only knows
416        // typed channels in 13.l; the legacy path is supported in
417        // Python via the broadcast EventBus but the Rust runtime
418        // doesn't yet expose that surface. Surface a clear error
419        // rather than misroute.
420        return Err(DispatchError::ListenFailure(format!(
421            "listen on legacy string-topic '{}' is not supported by the Rust \
422             runtime in 13.l — use a typed `channel` declaration (D4 canonical \
423             form) or the Python interpreter for D4 dual-mode programs",
424            ir.channel,
425        )));
426    }
427    let event = ctx
428        .bus
429        .receive(&ir.channel)
430        .await
431        .map_err(|e| DispatchError::ListenFailure(e.to_string()))?;
432    let bound = match event.payload {
433        TypedPayload::Handle(h) => {
434            ctx.bind_discovered_handle(ir.event_alias.clone(), h.clone());
435            RunValue::Handle(h)
436        }
437        TypedPayload::Scalar(j) => {
438            let v = RunValue::Json(j);
439            ctx.set_variable(ir.event_alias.clone(), v.clone());
440            v
441        }
442    };
443    Ok(bound)
444}
445
/// Unit tests for value_ref resolution and the four `dispatch_*`
/// functions, driven against a real `TypedEventBus`.
#[cfg(test)]
mod tests {
    use super::*;
    use axon_frontend::ir_nodes::{IRChannel, IRDiscover, IREmit, IRListenStep, IRPublish};

    /// Build an `IRChannel` with fixed qos / lifetime / persistence;
    /// only the name, message type, and shield vary between tests.
    fn ir_channel(name: &str, message: &str, shield: &str) -> IRChannel {
        IRChannel {
            node_type: "IRChannel",
            source_line: 0,
            source_column: 0,
            name: name.to_string(),
            message: message.to_string(),
            qos: "at_least_once".to_string(),
            lifetime: "affine".to_string(),
            persistence: "ephemeral".to_string(),
            shield_ref: shield.to_string(),
        }
    }

    /// Build a context whose bus has exactly `channels` registered.
    fn make_ctx(channels: Vec<IRChannel>) -> RunContext {
        let bus = Arc::new(TypedEventBus::new());
        for ch in &channels {
            bus.register_from_ir(ch);
        }
        RunContext::new(bus)
    }

    /// Drive a future to completion on a throwaway current-thread runtime.
    fn block_on<F: std::future::Future>(f: F) -> F::Output {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all().build().unwrap();
        rt.block_on(f)
    }

    // ── resolve_value_ref ───────────────────────────────────────────

    #[test]
    fn resolve_bare_identifier_step_result() {
        let ctx = make_ctx(vec![]);
        ctx.set_step_result("Build", RunValue::Json(serde_json::json!({"output": "x"})));
        let v = ctx.resolve_value_ref("Build").unwrap();
        assert!(matches!(v, RunValue::Json(_)));
    }

    #[test]
    fn resolve_dotted_walk_json_object() {
        let ctx = make_ctx(vec![]);
        ctx.set_step_result("Build", RunValue::Json(serde_json::json!({
            "output": {"value": 42}
        })));
        let v = ctx.resolve_value_ref("Build.output.value").unwrap();
        match v {
            RunValue::Json(serde_json::Value::Number(n)) => {
                assert_eq!(n.as_i64(), Some(42));
            }
            other => panic!("expected number, got {other:?}"),
        }
    }

    #[test]
    fn resolve_handle_field_access() {
        let ctx = make_ctx(vec![ir_channel("Inner", "Bytes", "")]);
        let h = ctx.bus.get_handle("Inner").unwrap();
        ctx.bind_discovered_handle("alias", h);
        let v = ctx.resolve_value_ref("alias.message").unwrap();
        match v {
            RunValue::Json(serde_json::Value::String(s)) => assert_eq!(s, "Bytes"),
            other => panic!("expected string, got {other:?}"),
        }
    }

    #[test]
    fn resolve_unknown_head_lists_candidates() {
        let ctx = make_ctx(vec![]);
        ctx.set_step_result("Build", RunValue::Json(serde_json::json!({})));
        ctx.set_variable("v", RunValue::Json(serde_json::json!(0)));
        let err = ctx.resolve_value_ref("Missing.field").unwrap_err();
        let s = err.to_string();
        // The candidate lists are rendered with `{:?}`, so each key is
        // wrapped in double quotes. Matching the quoted form avoids a
        // vacuous pass: a bare 'v' also occurs in the fixed text
        // ("value_ref", "variable").
        assert!(s.contains("\"Build\"") && s.contains("\"v\""),
            "candidates list missing: {s}");
    }

    #[test]
    fn resolve_discovered_handle_shadows_variable() {
        let ctx = make_ctx(vec![ir_channel("Real", "Bytes", "")]);
        ctx.set_variable("alias", RunValue::Json(serde_json::json!("shadowed")));
        let h = ctx.bus.get_handle("Real").unwrap();
        ctx.bind_discovered_handle("alias", h);
        let v = ctx.resolve_value_ref("alias").unwrap();
        assert!(matches!(v, RunValue::Handle(_)));
    }

    // ── dispatch_emit ──────────────────────────────────────────────

    #[test]
    fn emit_scalar_dispatches_through_bus() {
        let ctx = make_ctx(vec![ir_channel("Orders", "Bytes", "")]);
        ctx.set_step_result(
            "Build",
            RunValue::Json(serde_json::json!({"output": {"id": 7}})),
        );
        let ir = IREmit {
            node_type: "IREmit", source_line: 0, source_column: 0,
            channel_ref: "Orders".to_string(),
            value_ref: "Build.output".to_string(),
            value_is_channel: false,
        };
        block_on(dispatch_emit(&ir, &ctx)).unwrap();
        let event = block_on(ctx.bus.receive("Orders")).unwrap();
        match event.payload {
            TypedPayload::Scalar(v) => assert_eq!(v["id"], 7),
            other => panic!("expected scalar, got {other:?}"),
        }
    }

    #[test]
    fn emit_unknown_value_ref_yields_dispatch_error() {
        let ctx = make_ctx(vec![ir_channel("Orders", "Bytes", "")]);
        let ir = IREmit {
            node_type: "IREmit", source_line: 0, source_column: 0,
            channel_ref: "Orders".to_string(),
            value_ref: "Missing".to_string(),
            value_is_channel: false,
        };
        let err = block_on(dispatch_emit(&ir, &ctx)).unwrap_err();
        assert!(matches!(err, DispatchError::EmitFailure(_)));
    }

    // ── publish + discover ─────────────────────────────────────────

    #[test]
    fn publish_records_capability_and_discover_consumes_it() {
        let ctx = make_ctx(vec![ir_channel("Topic", "Bytes", "Gate")]);
        let pub_ir = IRPublish {
            node_type: "IRPublish", source_line: 0, source_column: 0,
            channel_ref: "Topic".to_string(),
            shield_ref: "Gate".to_string(),
        };
        let cap = block_on(dispatch_publish(&pub_ir, &ctx)).unwrap();
        assert_eq!(cap.channel_name, "Topic");
        // ctx has a capability for Topic.
        let disc_ir = IRDiscover {
            node_type: "IRDiscover", source_line: 0, source_column: 0,
            capability_ref: "Topic".to_string(),
            alias: "live".to_string(),
        };
        let h = block_on(dispatch_discover(&disc_ir, &ctx)).unwrap();
        assert_eq!(h.name, "Topic");
        // Alias is now bound.
        assert!(ctx.discovered_handles_snapshot().contains_key("live"));
    }

    #[test]
    fn discover_without_publish_yields_dispatch_error() {
        let ctx = make_ctx(vec![ir_channel("Topic", "Bytes", "Gate")]);
        let disc_ir = IRDiscover {
            node_type: "IRDiscover", source_line: 0, source_column: 0,
            capability_ref: "Topic".to_string(),
            alias: "x".to_string(),
        };
        let err = block_on(dispatch_discover(&disc_ir, &ctx)).unwrap_err();
        assert!(matches!(err, DispatchError::DiscoverFailure(_)));
    }

    #[test]
    fn publish_unpublishable_channel_surfaces_failure() {
        let ctx = make_ctx(vec![ir_channel("Topic", "Bytes", "")]); // no shield
        let ir = IRPublish {
            node_type: "IRPublish", source_line: 0, source_column: 0,
            channel_ref: "Topic".to_string(),
            shield_ref: "Gate".to_string(),
        };
        let err = block_on(dispatch_publish(&ir, &ctx)).unwrap_err();
        assert!(matches!(err, DispatchError::PublishFailure(_)));
    }

    // ── listen ──────────────────────────────────────────────────────

    #[test]
    fn listen_typed_receives_scalar_and_binds_variable() {
        let ctx = make_ctx(vec![ir_channel("Orders", "Bytes", "")]);
        // Pre-seed an event so receive resolves immediately.
        block_on(ctx.bus.emit(
            "Orders", TypedPayload::Scalar(serde_json::json!({"id": 9})),
        )).unwrap();
        let ir = IRListenStep {
            node_type: "IRListenStep", source_line: 0, source_column: 0,
            channel: "Orders".to_string(),
            channel_is_ref: true,
            event_alias: "ev".to_string(),
        };
        let v = block_on(dispatch_listen(&ir, &ctx)).unwrap();
        assert!(matches!(v, RunValue::Json(_)));
        // Alias landed in variables (scalar payload).
        assert!(ctx.get_variable("ev").is_some());
    }

    #[test]
    fn listen_legacy_string_topic_rejected_with_clear_message() {
        let ctx = make_ctx(vec![]);
        let ir = IRListenStep {
            node_type: "IRListenStep", source_line: 0, source_column: 0,
            channel: "orders".to_string(),
            channel_is_ref: false,
            event_alias: "ev".to_string(),
        };
        let err = block_on(dispatch_listen(&ir, &ctx)).unwrap_err();
        let msg = err.to_string();
        assert!(matches!(err, DispatchError::ListenFailure(_)));
        assert!(msg.contains("legacy string-topic"));
    }

    // ── from_ir_program ──────────────────────────────────────────────

    #[test]
    fn from_ir_program_registers_all_channels() {
        let mut ir = IRProgram::new();
        ir.channels.push(ir_channel("A", "Bytes", ""));
        ir.channels.push(ir_channel("B", "Channel<Bytes>", "Gate"));
        let ctx = RunContext::from_ir_program(&ir);
        let names = ctx.bus.channel_names();
        assert_eq!(names, vec!["A".to_string(), "B".to_string()]);
    }
}