1use std::collections::HashMap;
37use std::sync::{Arc, Mutex};
38
39use axon_frontend::ir_nodes::{IRDiscover, IREmit, IRListenStep, IRProgram, IRPublish};
40
41use super::typed::{
42 Capability, TypedChannelError, TypedChannelHandle, TypedEventBus, TypedPayload,
43};
44
45#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum DispatchError {
51 EmitFailure(String),
53 PublishFailure(String),
55 DiscoverFailure(String),
57 ListenFailure(String),
59}
60
61impl std::fmt::Display for DispatchError {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match self {
64 DispatchError::EmitFailure(m) => write!(f, "channel_op:emit — {m}"),
65 DispatchError::PublishFailure(m) => write!(f, "channel_op:publish — {m}"),
66 DispatchError::DiscoverFailure(m) => write!(f, "channel_op:discover — {m}"),
67 DispatchError::ListenFailure(m) => write!(f, "channel_op:listen — {m}"),
68 }
69 }
70}
71
72impl std::error::Error for DispatchError {}
73
74impl From<TypedChannelError> for DispatchError {
75 fn from(e: TypedChannelError) -> Self {
76 DispatchError::EmitFailure(e.to_string())
79 }
80}
81
82#[derive(Debug, Clone)]
87pub enum RunValue {
88 Json(serde_json::Value),
92 Handle(TypedChannelHandle),
95}
96
97impl RunValue {
98 pub fn as_json(&self) -> Option<&serde_json::Value> {
99 match self {
100 RunValue::Json(v) => Some(v),
101 RunValue::Handle(_) => None,
102 }
103 }
104 pub fn as_handle(&self) -> Option<&TypedChannelHandle> {
105 match self {
106 RunValue::Handle(h) => Some(h),
107 RunValue::Json(_) => None,
108 }
109 }
110}
111
112pub struct RunContext {
119 bus: Arc<TypedEventBus>,
120 discovered_handles: Mutex<HashMap<String, TypedChannelHandle>>,
123 variables: Mutex<HashMap<String, RunValue>>,
125 step_results: Mutex<HashMap<String, RunValue>>,
128 capabilities: Mutex<HashMap<String, Capability>>,
131}
132
133impl RunContext {
134 pub fn new(bus: Arc<TypedEventBus>) -> Self {
136 RunContext {
137 bus,
138 discovered_handles: Mutex::new(HashMap::new()),
139 variables: Mutex::new(HashMap::new()),
140 step_results: Mutex::new(HashMap::new()),
141 capabilities: Mutex::new(HashMap::new()),
142 }
143 }
144
145 pub fn bus(&self) -> &TypedEventBus {
146 &self.bus
147 }
148
149 pub fn from_ir_program(ir: &IRProgram) -> Self {
153 let bus = Arc::new(TypedEventBus::from_ir_program(ir));
154 Self::new(bus)
155 }
156
157 pub fn set_variable(&self, name: impl Into<String>, value: RunValue) {
158 self.variables.lock().unwrap().insert(name.into(), value);
159 }
160
161 pub fn get_variable(&self, name: &str) -> Option<RunValue> {
162 self.variables.lock().unwrap().get(name).cloned()
163 }
164
165 pub fn set_step_result(&self, name: impl Into<String>, value: RunValue) {
166 self.step_results.lock().unwrap().insert(name.into(), value);
167 }
168
169 pub fn get_step_result(&self, name: &str) -> Option<RunValue> {
170 self.step_results.lock().unwrap().get(name).cloned()
171 }
172
173 pub fn bind_discovered_handle(
174 &self, alias: impl Into<String>, handle: TypedChannelHandle,
175 ) {
176 self.discovered_handles
177 .lock()
178 .unwrap()
179 .insert(alias.into(), handle);
180 }
181
182 pub fn discovered_handles_snapshot(&self) -> HashMap<String, TypedChannelHandle> {
183 self.discovered_handles.lock().unwrap().clone()
184 }
185
186 pub fn record_capability(&self, channel: impl Into<String>, cap: Capability) {
187 self.capabilities.lock().unwrap().insert(channel.into(), cap);
188 }
189
190 pub fn take_capability(&self, channel: &str) -> Option<Capability> {
191 self.capabilities.lock().unwrap().remove(channel)
192 }
193
194 pub fn resolve_value_ref(&self, value_ref: &str) -> Result<RunValue, DispatchError> {
209 if value_ref.is_empty() {
210 return Err(DispatchError::EmitFailure(
211 "value_ref is empty".to_string(),
212 ));
213 }
214 let mut segments = value_ref.split('.');
215 let head = segments.next().expect("at least one segment by split");
216 let from_handles = {
224 let dh = self.discovered_handles.lock().unwrap();
225 dh.get(head).cloned()
226 };
227 let from_vars = if from_handles.is_none() {
228 let vars = self.variables.lock().unwrap();
229 vars.get(head).cloned()
230 } else {
231 None
232 };
233 let from_steps = if from_handles.is_none() && from_vars.is_none() {
234 let steps = self.step_results.lock().unwrap();
235 steps.get(head).cloned()
236 } else {
237 None
238 };
239 let mut current = if let Some(h) = from_handles {
240 RunValue::Handle(h)
241 } else if let Some(v) = from_vars {
242 v
243 } else if let Some(v) = from_steps {
244 v
245 } else {
246 let vars: Vec<String> = self.variables.lock().unwrap().keys().cloned().collect();
250 let steps: Vec<String> = self.step_results.lock().unwrap().keys().cloned().collect();
251 let dh: Vec<String> = self.discovered_handles.lock().unwrap().keys().cloned().collect();
252 return Err(DispatchError::EmitFailure(format!(
253 "value_ref '{value_ref}' — head segment '{head}' is not a \
254 variable, step result, or discovered handle. \
255 Variables: {vars:?}; Step results: {steps:?}; \
256 Discovered handles: {dh:?}",
257 )));
258 };
259
260 for seg in segments {
261 current = walk_one_segment(¤t, seg, value_ref)?;
262 }
263 Ok(current)
264 }
265}
266
267fn walk_one_segment(
268 current: &RunValue, seg: &str, full_ref: &str,
269) -> Result<RunValue, DispatchError> {
270 match current {
271 RunValue::Json(v) => match v {
272 serde_json::Value::Object(map) => map.get(seg).cloned().map(RunValue::Json).ok_or_else(
273 || DispatchError::EmitFailure(format!(
274 "value_ref '{full_ref}' — key '{seg}' missing on object value",
275 )),
276 ),
277 _ => Err(DispatchError::EmitFailure(format!(
278 "value_ref '{full_ref}' — cannot walk '{seg}' on JSON value of type {}",
279 json_type_name(v),
280 ))),
281 },
282 RunValue::Handle(h) => match seg {
283 "name" => Ok(RunValue::Json(serde_json::Value::String(h.name.clone()))),
284 "message" => Ok(RunValue::Json(serde_json::Value::String(h.message.clone()))),
285 "qos" => Ok(RunValue::Json(serde_json::Value::String(h.qos.clone()))),
286 "lifetime" => Ok(RunValue::Json(serde_json::Value::String(h.lifetime.clone()))),
287 "persistence" => Ok(RunValue::Json(serde_json::Value::String(h.persistence.clone()))),
288 "shield_ref" => Ok(RunValue::Json(serde_json::Value::String(h.shield_ref.clone()))),
289 other => Err(DispatchError::EmitFailure(format!(
290 "value_ref '{full_ref}' — handle has no field '{other}'. \
291 Allowed: name, message, qos, lifetime, persistence, shield_ref",
292 ))),
293 },
294 }
295}
296
297fn json_type_name(v: &serde_json::Value) -> &'static str {
298 match v {
299 serde_json::Value::Null => "null",
300 serde_json::Value::Bool(_) => "bool",
301 serde_json::Value::Number(_) => "number",
302 serde_json::Value::String(_) => "string",
303 serde_json::Value::Array(_) => "array",
304 serde_json::Value::Object(_) => "object",
305 }
306}
307
308pub async fn dispatch_emit(
317 ir: &IREmit, ctx: &RunContext,
318) -> Result<(), DispatchError> {
319 if ir.value_is_channel {
320 let handle = if let Some(h) = ctx
324 .discovered_handles
325 .lock()
326 .unwrap()
327 .get(&ir.value_ref)
328 .cloned()
329 {
330 h
331 } else {
332 match ctx.bus.get_handle(&ir.value_ref) {
333 Ok(h) => h,
334 Err(_) => {
335 return Err(DispatchError::EmitFailure(format!(
336 "emit on '{}' carries a channel handle but '{}' is not in scope \
337 (no discovered alias, no declared channel)",
338 ir.channel_ref, ir.value_ref,
339 )));
340 }
341 }
342 };
343 ctx.bus
344 .emit(&ir.channel_ref, TypedPayload::Handle(handle))
345 .await
346 .map_err(|e| DispatchError::EmitFailure(e.to_string()))?;
347 return Ok(());
348 }
349 let value = ctx.resolve_value_ref(&ir.value_ref)?;
351 match value {
352 RunValue::Json(j) => ctx
353 .bus
354 .emit(&ir.channel_ref, TypedPayload::Scalar(j))
355 .await
356 .map_err(|e| DispatchError::EmitFailure(e.to_string())),
357 RunValue::Handle(h) => Err(DispatchError::EmitFailure(format!(
358 "emit on '{}' is scalar (value_is_channel=false) but value_ref '{}' \
359 resolved to a TypedChannelHandle for '{}' — set value_is_channel=true \
360 at IR-generation time for mobility",
361 ir.channel_ref, ir.value_ref, h.name,
362 ))),
363 }
364}
365
366pub async fn dispatch_publish(
369 ir: &IRPublish, ctx: &RunContext,
370) -> Result<Capability, DispatchError> {
371 let cap = ctx
372 .bus
373 .publish(&ir.channel_ref, &ir.shield_ref)
374 .await
375 .map_err(|e| DispatchError::PublishFailure(e.to_string()))?;
376 ctx.record_capability(ir.channel_ref.clone(), cap.clone());
377 Ok(cap)
378}
379
380pub async fn dispatch_discover(
385 ir: &IRDiscover, ctx: &RunContext,
386) -> Result<TypedChannelHandle, DispatchError> {
387 let cap = ctx.take_capability(&ir.capability_ref).ok_or_else(|| {
388 DispatchError::DiscoverFailure(format!(
389 "no capability recorded for channel '{}'. Did a `publish {} within …` \
390 step run earlier in this unit?",
391 ir.capability_ref, ir.capability_ref,
392 ))
393 })?;
394 let handle = ctx
395 .bus
396 .discover(&cap)
397 .await
398 .map_err(|e| DispatchError::DiscoverFailure(e.to_string()))?;
399 ctx.bind_discovered_handle(ir.alias.clone(), handle.clone());
400 Ok(handle)
401}
402
403pub async fn dispatch_listen(
412 ir: &IRListenStep, ctx: &RunContext,
413) -> Result<RunValue, DispatchError> {
414 if !ir.channel_is_ref {
415 return Err(DispatchError::ListenFailure(format!(
421 "listen on legacy string-topic '{}' is not supported by the Rust \
422 runtime in 13.l — use a typed `channel` declaration (D4 canonical \
423 form) or the Python interpreter for D4 dual-mode programs",
424 ir.channel,
425 )));
426 }
427 let event = ctx
428 .bus
429 .receive(&ir.channel)
430 .await
431 .map_err(|e| DispatchError::ListenFailure(e.to_string()))?;
432 let bound = match event.payload {
433 TypedPayload::Handle(h) => {
434 ctx.bind_discovered_handle(ir.event_alias.clone(), h.clone());
435 RunValue::Handle(h)
436 }
437 TypedPayload::Scalar(j) => {
438 let v = RunValue::Json(j);
439 ctx.set_variable(ir.event_alias.clone(), v.clone());
440 v
441 }
442 };
443 Ok(bound)
444}
445
446#[cfg(test)]
447mod tests {
448 use super::*;
449 use axon_frontend::ir_nodes::{IRChannel, IRDiscover, IREmit, IRListenStep, IRPublish};
450
451 fn ir_channel(name: &str, message: &str, shield: &str) -> IRChannel {
452 IRChannel {
453 node_type: "IRChannel",
454 source_line: 0,
455 source_column: 0,
456 name: name.to_string(),
457 message: message.to_string(),
458 qos: "at_least_once".to_string(),
459 lifetime: "affine".to_string(),
460 persistence: "ephemeral".to_string(),
461 shield_ref: shield.to_string(),
462 }
463 }
464
465 fn make_ctx(channels: Vec<IRChannel>) -> RunContext {
466 let bus = Arc::new(TypedEventBus::new());
467 for ch in &channels {
468 bus.register_from_ir(ch);
469 }
470 RunContext::new(bus)
471 }
472
473 fn block_on<F: std::future::Future>(f: F) -> F::Output {
474 let rt = tokio::runtime::Builder::new_current_thread()
475 .enable_all().build().unwrap();
476 rt.block_on(f)
477 }
478
479 #[test]
482 fn resolve_bare_identifier_step_result() {
483 let ctx = make_ctx(vec![]);
484 ctx.set_step_result("Build", RunValue::Json(serde_json::json!({"output": "x"})));
485 let v = ctx.resolve_value_ref("Build").unwrap();
486 assert!(matches!(v, RunValue::Json(_)));
487 }
488
489 #[test]
490 fn resolve_dotted_walk_json_object() {
491 let ctx = make_ctx(vec![]);
492 ctx.set_step_result("Build", RunValue::Json(serde_json::json!({
493 "output": {"value": 42}
494 })));
495 let v = ctx.resolve_value_ref("Build.output.value").unwrap();
496 match v {
497 RunValue::Json(serde_json::Value::Number(n)) => {
498 assert_eq!(n.as_i64(), Some(42));
499 }
500 other => panic!("expected number, got {other:?}"),
501 }
502 }
503
504 #[test]
505 fn resolve_handle_field_access() {
506 let ctx = make_ctx(vec![ir_channel("Inner", "Bytes", "")]);
507 let h = ctx.bus.get_handle("Inner").unwrap();
508 ctx.bind_discovered_handle("alias", h);
509 let v = ctx.resolve_value_ref("alias.message").unwrap();
510 match v {
511 RunValue::Json(serde_json::Value::String(s)) => assert_eq!(s, "Bytes"),
512 other => panic!("expected string, got {other:?}"),
513 }
514 }
515
516 #[test]
517 fn resolve_unknown_head_lists_candidates() {
518 let ctx = make_ctx(vec![]);
519 ctx.set_step_result("Build", RunValue::Json(serde_json::json!({})));
520 ctx.set_variable("v", RunValue::Json(serde_json::json!(0)));
521 let err = ctx.resolve_value_ref("Missing.field").unwrap_err();
522 let s = err.to_string();
523 assert!(s.contains("Build") && s.contains('v'),
524 "candidates list missing: {s}");
525 }
526
527 #[test]
528 fn resolve_discovered_handle_shadows_variable() {
529 let ctx = make_ctx(vec![ir_channel("Real", "Bytes", "")]);
530 ctx.set_variable("alias", RunValue::Json(serde_json::json!("shadowed")));
531 let h = ctx.bus.get_handle("Real").unwrap();
532 ctx.bind_discovered_handle("alias", h);
533 let v = ctx.resolve_value_ref("alias").unwrap();
534 assert!(matches!(v, RunValue::Handle(_)));
535 }
536
537 #[test]
540 fn emit_scalar_dispatches_through_bus() {
541 let ctx = make_ctx(vec![ir_channel("Orders", "Bytes", "")]);
542 ctx.set_step_result(
543 "Build",
544 RunValue::Json(serde_json::json!({"output": {"id": 7}})),
545 );
546 let ir = IREmit {
547 node_type: "IREmit", source_line: 0, source_column: 0,
548 channel_ref: "Orders".to_string(),
549 value_ref: "Build.output".to_string(),
550 value_is_channel: false,
551 };
552 block_on(dispatch_emit(&ir, &ctx)).unwrap();
553 let event = block_on(ctx.bus.receive("Orders")).unwrap();
554 match event.payload {
555 TypedPayload::Scalar(v) => assert_eq!(v["id"], 7),
556 other => panic!("expected scalar, got {other:?}"),
557 }
558 }
559
560 #[test]
561 fn emit_unknown_value_ref_yields_dispatch_error() {
562 let ctx = make_ctx(vec![ir_channel("Orders", "Bytes", "")]);
563 let ir = IREmit {
564 node_type: "IREmit", source_line: 0, source_column: 0,
565 channel_ref: "Orders".to_string(),
566 value_ref: "Missing".to_string(),
567 value_is_channel: false,
568 };
569 let err = block_on(dispatch_emit(&ir, &ctx)).unwrap_err();
570 assert!(matches!(err, DispatchError::EmitFailure(_)));
571 }
572
573 #[test]
576 fn publish_records_capability_and_discover_consumes_it() {
577 let ctx = make_ctx(vec![ir_channel("Topic", "Bytes", "Gate")]);
578 let pub_ir = IRPublish {
579 node_type: "IRPublish", source_line: 0, source_column: 0,
580 channel_ref: "Topic".to_string(),
581 shield_ref: "Gate".to_string(),
582 };
583 let cap = block_on(dispatch_publish(&pub_ir, &ctx)).unwrap();
584 assert_eq!(cap.channel_name, "Topic");
585 let disc_ir = IRDiscover {
587 node_type: "IRDiscover", source_line: 0, source_column: 0,
588 capability_ref: "Topic".to_string(),
589 alias: "live".to_string(),
590 };
591 let h = block_on(dispatch_discover(&disc_ir, &ctx)).unwrap();
592 assert_eq!(h.name, "Topic");
593 assert!(ctx.discovered_handles_snapshot().contains_key("live"));
595 }
596
597 #[test]
598 fn discover_without_publish_yields_dispatch_error() {
599 let ctx = make_ctx(vec![ir_channel("Topic", "Bytes", "Gate")]);
600 let disc_ir = IRDiscover {
601 node_type: "IRDiscover", source_line: 0, source_column: 0,
602 capability_ref: "Topic".to_string(),
603 alias: "x".to_string(),
604 };
605 let err = block_on(dispatch_discover(&disc_ir, &ctx)).unwrap_err();
606 assert!(matches!(err, DispatchError::DiscoverFailure(_)));
607 }
608
609 #[test]
610 fn publish_unpublishable_channel_surfaces_failure() {
611 let ctx = make_ctx(vec![ir_channel("Topic", "Bytes", "")]); let ir = IRPublish {
613 node_type: "IRPublish", source_line: 0, source_column: 0,
614 channel_ref: "Topic".to_string(),
615 shield_ref: "Gate".to_string(),
616 };
617 let err = block_on(dispatch_publish(&ir, &ctx)).unwrap_err();
618 assert!(matches!(err, DispatchError::PublishFailure(_)));
619 }
620
621 #[test]
624 fn listen_typed_receives_scalar_and_binds_variable() {
625 let ctx = make_ctx(vec![ir_channel("Orders", "Bytes", "")]);
626 block_on(ctx.bus.emit(
628 "Orders", TypedPayload::Scalar(serde_json::json!({"id": 9})),
629 )).unwrap();
630 let ir = IRListenStep {
631 node_type: "IRListenStep", source_line: 0, source_column: 0,
632 channel: "Orders".to_string(),
633 channel_is_ref: true,
634 event_alias: "ev".to_string(),
635 };
636 let v = block_on(dispatch_listen(&ir, &ctx)).unwrap();
637 assert!(matches!(v, RunValue::Json(_)));
638 assert!(ctx.get_variable("ev").is_some());
640 }
641
642 #[test]
643 fn listen_legacy_string_topic_rejected_with_clear_message() {
644 let ctx = make_ctx(vec![]);
645 let ir = IRListenStep {
646 node_type: "IRListenStep", source_line: 0, source_column: 0,
647 channel: "orders".to_string(),
648 channel_is_ref: false,
649 event_alias: "ev".to_string(),
650 };
651 let err = block_on(dispatch_listen(&ir, &ctx)).unwrap_err();
652 let msg = err.to_string();
653 assert!(matches!(err, DispatchError::ListenFailure(_)));
654 assert!(msg.contains("legacy string-topic"));
655 }
656
657 #[test]
660 fn from_ir_program_registers_all_channels() {
661 let mut ir = IRProgram::new();
662 ir.channels.push(ir_channel("A", "Bytes", ""));
663 ir.channels.push(ir_channel("B", "Channel<Bytes>", "Gate"));
664 let ctx = RunContext::from_ir_program(&ir);
665 let names = ctx.bus.channel_names();
666 assert_eq!(names, vec!["A".to_string(), "B".to_string()]);
667 }
668}