1use crate::atomic::DispatchResult;
10use crate::bus::{InfraEvent, NodeEvent, OpError};
11use crate::engine::call_context::CallContext;
12use crate::engine::core::{graph_name_for, Engine};
13use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
14use crate::engine::pending_async::PendingAsync;
15use crate::engine::step::EngineStep;
16use crate::ids::{ComponentRef, ExecId, NodeSiteId, OpRef};
17use crate::roles::ProtocolRuntime;
18use crate::runtime::RuntimeResourceRef;
19use crate::slot_value::SlotValue;
20use bb_ir::proto::onnx::NodeProto;
21
22impl Engine {
23 pub(crate) fn invoke_one(&mut self, op_ref: OpRef, exec_id: ExecId) -> EngineStep {
34 let node = match self.node_for(op_ref) {
35 Some(n) => n.clone(),
36 None => {
37 return self.fail_op(
38 op_ref,
39 exec_id,
40 crate::bus::OpErrorKind::ExecutionFailed,
41 "unknown_op_ref",
42 "unknown op_ref".to_string(),
43 )
44 }
45 };
46
47 let _invoke_span = tracing::debug_span!(
52 "engine.invoke_one",
53 op.name = %node.name,
54 op.kind = %node.op_type,
55 op.domain = %node.domain,
56 exec_id = %exec_id,
57 op_ref = %op_ref,
58 )
59 .entered();
60
61 match self.dispatch_for(op_ref) {
62 Some(OpDispatch::Stateless(invoke_fn)) => {
63 self.invoke_stateless(op_ref, exec_id, &node, invoke_fn)
64 }
65 Some(OpDispatch::Atomic {
66 component_ref,
67 dispatch_fn,
68 }) => self.invoke_atomic(op_ref, exec_id, &node, component_ref, dispatch_fn),
69 Some(OpDispatch::FunctionCall {
70 target,
71 input_rename,
72 output_rename,
73 }) => {
74 self.invoke_function_call(op_ref, exec_id, &target, &input_rename, &output_rename)
75 }
76 Some(OpDispatch::Unresolved) | None => self.fail_op(
77 op_ref,
78 exec_id,
79 crate::bus::OpErrorKind::NotRegistered,
80 "unresolved_dispatch",
81 format!("unresolved dispatch for {}::{}", node.domain, node.op_type),
82 ),
83 }
84 }
85
86 fn dispatch_for(&self, op_ref: OpRef) -> Option<OpDispatch> {
90 let (gi, ni) = op_ref.split();
91 self.graphs
92 .get(gi as usize)?
93 .op_dispatch
94 .get(ni as usize)
95 .cloned()
96 }
97
98 fn invoke_atomic(
103 &mut self,
104 op_ref: OpRef,
105 exec_id: ExecId,
106 node: &NodeProto,
107 component_ref: ComponentRef,
108 dispatch_fn: ProtocolDispatchFn,
109 ) -> EngineStep {
110 if let Some(cap) = self.max_pending_async {
119 if self.exec.pending_async.len() >= cap {
120 return self.fail_op(
121 op_ref,
122 exec_id,
123 crate::bus::OpErrorKind::Cooldown,
124 "pending_async_limit",
125 "pending-async limit exceeded".to_string(),
126 );
127 }
128 }
129
130 let input_pairs = self.resolve_input_pairs(node, exec_id);
134
135 let Some(mut taken) = self.take_component(component_ref) else {
142 return self.fail_op(
143 op_ref,
144 exec_id,
145 crate::bus::OpErrorKind::MissingSlot,
146 "component_missing",
147 "component missing".to_string(),
148 );
149 };
150
151 let result: Result<DispatchResult, String> = {
152 let mut input_refs: Vec<(String, &dyn SlotValue)> =
153 Vec::with_capacity(input_pairs.len());
154 for (site, name, read_exec_id) in &input_pairs {
155 if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
156 input_refs.push((name.clone(), boxed.as_ref()));
157 }
158 }
159 let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
160 input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
161
162 let (
163 envelope_src_peer,
164 inbound_correlation_wire_req_id,
165 inbound_arrival_ns,
166 inbound_remaining_deadline_ns,
167 ) = self
168 .framework
169 .inbound_contexts
170 .get(&exec_id)
171 .map(|c| {
172 (
173 c.src_peer,
174 c.wire_req_id,
175 c.arrival_ns,
176 c.remaining_deadline_ns,
177 )
178 })
179 .unwrap_or((None, None, None, None));
180 let mut ctx = RuntimeResourceRef {
181 peers: crate::runtime::PeerCtx {
182 gate: &mut self.framework.peer_state.gate,
183 backoff: &mut self.framework.peer_state.backoff,
184 governor: &mut self.framework.peer_state.governor,
185 addresses: &mut self.framework.address_book,
186 backpressure: &mut self.framework.peer_state.backpressure,
187 },
188 net: crate::runtime::NetCtx {
189 outbound: &mut self.framework.outbound_queue,
190 rtt: &mut self.framework.rtt_tracker,
191 requests: &mut self.framework.request_tracker,
192 dedup: &mut self.framework.inbound_dedup,
193 pending_peer_resolve_failures: &mut self
194 .framework
195 .pending_peer_resolve_failures,
196 },
197 time: crate::runtime::TimeCtx {
198 scheduler: &mut self.framework.scheduler,
199 },
200 syscall: crate::runtime::SyscallCtx {
201 serialize_queue: &mut self.framework.serialize_queue,
202 hold_table: &mut self.framework.hold_table,
203 record_buffer: &mut self.framework.record_buffer,
204 event_source: &mut self.framework.event_source,
205 counters: &mut self.framework.counters,
206 any_fired_groups: &mut self.framework.any_fired_groups,
207 deadline_match_fired: &mut self.framework.deadline_match_fired,
208 rng: &mut *self.framework.rng,
209 pending_app_events: &mut self.framework.pending_app_events,
210 },
211 bus: &mut self.bus,
212 ingress: std::sync::Arc::clone(&self.ingress),
213 components: crate::runtime::ComponentsView {
214 instances: Some(&self.components),
215 slots: Some(&self.slots),
216 },
217 current: crate::runtime::CurrentCallCtx {
218 op_ref,
219 exec_id,
220 self_peer: self.self_peer,
221 node_attributes: &node.attribute,
222 node_metadata: &node.metadata_props,
223 inbound: crate::runtime::InboundCtx {
224 src_peer: envelope_src_peer,
225 wire_req_id: inbound_correlation_wire_req_id,
226 arrival_ns: inbound_arrival_ns,
227 remaining_deadline_ns: inbound_remaining_deadline_ns,
228 },
229 pending_completions: Vec::new(),
230 next_command_id: &mut self.exec.ids.next_command_id,
231 },
232 };
233
234 let any: &mut dyn std::any::Any = taken.as_mut();
238 let dispatch_result = dispatch_fn(any, &node.op_type, &inputs_for_dispatch, &mut ctx);
239
240 let captured = std::mem::take(&mut ctx.current.pending_completions);
241 drop(ctx);
242 self.exec.pending_completions.extend(captured);
243
244 dispatch_result
245 };
246
247 self.restore_component(component_ref, taken);
250
251 match result {
252 Ok(DispatchResult::Immediate(outputs)) => {
253 let sites = self.write_outputs(op_ref, exec_id, outputs);
254 EngineStep::OpCompleted {
255 op_ref,
256 exec_id,
257 sites_written: sites,
258 }
259 }
260 Ok(DispatchResult::Async(cmd_id)) => {
261 let output_sites = self.op_output_sites(op_ref);
262 self.exec.pending_async.insert(
263 cmd_id,
264 PendingAsync {
265 op_ref,
266 exec_id,
267 output_sites,
268 deadline_ns: None,
269 },
270 );
271 EngineStep::AsyncSuspended {
272 op_ref,
273 exec_id,
274 cmd_id,
275 }
276 }
277 Err(detail) => self.fail_op(
278 op_ref,
279 exec_id,
280 crate::bus::OpErrorKind::ExecutionFailed,
281 "stateless_invoke",
282 detail,
283 ),
284 }
285 }
286
287 pub(crate) fn invoke_function_call(
301 &mut self,
302 op_ref: OpRef,
303 parent_exec_id: ExecId,
304 target: &FunctionKey,
305 input_rename: &[(String, String)],
306 output_rename: &[(String, String)],
307 ) -> EngineStep {
308 let graph_name = graph_name_for(target);
309 if !self.has_graph(&graph_name) {
310 return self.fail_op(
311 op_ref,
312 parent_exec_id,
313 crate::bus::OpErrorKind::NotRegistered,
314 "function_target_missing",
315 format!("function-call target {graph_name} not installed"),
316 );
317 }
318
319 let body_exec_id = self.allocate_exec_id();
320 let body = self.graph(&graph_name).expect("checked above");
321
322 let body_idx = self.graph_idx(&graph_name).expect("graph just resolved");
327 let body_op_refs: Vec<OpRef> = (0..body.function.node.len() as u32)
328 .map(|ni| OpRef::pack(body_idx, ni))
329 .collect();
330 let body_site_for: std::collections::HashMap<String, NodeSiteId> = body.site_names.clone();
331
332 let mut input_aliases: std::collections::HashMap<String, NodeSiteId> =
336 std::collections::HashMap::with_capacity(input_rename.len());
337 for (caller_name, formal_name) in input_rename {
338 let Some(caller_site) = self.resolve_site_in_op_graph(op_ref, caller_name) else {
339 return self.fail_op(
340 op_ref,
341 parent_exec_id,
342 crate::bus::OpErrorKind::MissingSlot,
343 "function_input_unbound",
344 format!("function-call input {caller_name} not bound"),
345 );
346 };
347 input_aliases.insert(formal_name.clone(), caller_site);
348 }
349
350 let mut output_forwarding: std::collections::HashMap<NodeSiteId, NodeSiteId> =
352 std::collections::HashMap::with_capacity(output_rename.len());
353 for (formal_out, caller_out) in output_rename {
354 let Some(&body_site) = body_site_for.get(formal_out) else {
355 return self.fail_op(
356 op_ref,
357 parent_exec_id,
358 crate::bus::OpErrorKind::NotRegistered,
359 "function_output_missing",
360 format!("function-call output {formal_out} missing from body"),
361 );
362 };
363 let Some(caller_site) = self.resolve_site_in_op_graph(op_ref, caller_out) else {
364 return self.fail_op(
365 op_ref,
366 parent_exec_id,
367 crate::bus::OpErrorKind::MissingSlot,
368 "function_output_unbound",
369 format!("function-call output {caller_out} not bound"),
370 );
371 };
372 output_forwarding.insert(body_site, caller_site);
373 }
374
375 let outputs_remaining = output_forwarding.len();
376 self.exec.pending_calls.insert(
377 body_exec_id,
378 CallContext {
379 parent_exec_id,
380 target: target.clone(),
381 input_aliases,
382 output_forwarding,
383 outputs_remaining,
384 },
385 );
386
387 for body_op in body_op_refs {
392 self.exec.frontier.push_back((body_op, body_exec_id));
393 }
394
395 if self
401 .exec
402 .pending_calls
403 .get(&body_exec_id)
404 .map(|c| c.outputs_remaining == 0)
405 .unwrap_or(false)
406 {
407 self.exec.pending_calls.remove(&body_exec_id);
408 }
409
410 EngineStep::OpCompleted {
411 op_ref,
412 exec_id: parent_exec_id,
413 sites_written: Vec::new(),
414 }
415 }
416
417 pub(crate) fn invoke_stateless(
422 &mut self,
423 op_ref: OpRef,
424 exec_id: ExecId,
425 node: &NodeProto,
426 invoke_fn: StatelessInvokeFn,
427 ) -> EngineStep {
428 if let Some(cap) = self.max_pending_async {
433 if self.exec.pending_async.len() >= cap {
434 return self.fail_op(
435 op_ref,
436 exec_id,
437 crate::bus::OpErrorKind::Cooldown,
438 "pending_async_limit",
439 "pending-async limit exceeded".to_string(),
440 );
441 }
442 }
443 let input_pairs = self.resolve_input_pairs(node, exec_id);
446
447 let result: Result<DispatchResult, OpError> = {
448 let mut input_refs: Vec<(String, &dyn SlotValue)> =
449 Vec::with_capacity(input_pairs.len());
450 for (site, name, read_exec_id) in &input_pairs {
451 if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
452 input_refs.push((name.clone(), boxed.as_ref()));
453 }
454 }
455 let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
456 input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
457
458 let (
459 envelope_src_peer,
460 inbound_correlation_wire_req_id,
461 inbound_arrival_ns,
462 inbound_remaining_deadline_ns,
463 ) = self
464 .framework
465 .inbound_contexts
466 .get(&exec_id)
467 .map(|c| {
468 (
469 c.src_peer,
470 c.wire_req_id,
471 c.arrival_ns,
472 c.remaining_deadline_ns,
473 )
474 })
475 .unwrap_or((None, None, None, None));
476 let mut ctx = RuntimeResourceRef {
477 peers: crate::runtime::PeerCtx {
478 gate: &mut self.framework.peer_state.gate,
479 backoff: &mut self.framework.peer_state.backoff,
480 governor: &mut self.framework.peer_state.governor,
481 addresses: &mut self.framework.address_book,
482 backpressure: &mut self.framework.peer_state.backpressure,
483 },
484 net: crate::runtime::NetCtx {
485 outbound: &mut self.framework.outbound_queue,
486 rtt: &mut self.framework.rtt_tracker,
487 requests: &mut self.framework.request_tracker,
488 dedup: &mut self.framework.inbound_dedup,
489 pending_peer_resolve_failures: &mut self
490 .framework
491 .pending_peer_resolve_failures,
492 },
493 time: crate::runtime::TimeCtx {
494 scheduler: &mut self.framework.scheduler,
495 },
496 syscall: crate::runtime::SyscallCtx {
497 serialize_queue: &mut self.framework.serialize_queue,
498 hold_table: &mut self.framework.hold_table,
499 record_buffer: &mut self.framework.record_buffer,
500 event_source: &mut self.framework.event_source,
501 counters: &mut self.framework.counters,
502 any_fired_groups: &mut self.framework.any_fired_groups,
503 deadline_match_fired: &mut self.framework.deadline_match_fired,
504 rng: &mut *self.framework.rng,
505 pending_app_events: &mut self.framework.pending_app_events,
506 },
507 bus: &mut self.bus,
508 ingress: std::sync::Arc::clone(&self.ingress),
509 components: crate::runtime::ComponentsView::default(),
510 current: crate::runtime::CurrentCallCtx {
511 op_ref,
512 exec_id,
513 self_peer: self.self_peer,
514 node_attributes: &node.attribute,
515 node_metadata: &node.metadata_props,
516 inbound: crate::runtime::InboundCtx {
517 src_peer: envelope_src_peer,
518 wire_req_id: inbound_correlation_wire_req_id,
519 arrival_ns: inbound_arrival_ns,
520 remaining_deadline_ns: inbound_remaining_deadline_ns,
521 },
522 pending_completions: Vec::new(),
523 next_command_id: &mut self.exec.ids.next_command_id,
524 },
525 };
526
527 let dispatch_result = invoke_fn(node, &inputs_for_dispatch, &mut ctx);
528
529 let captured = std::mem::take(&mut ctx.current.pending_completions);
530 drop(ctx);
531 self.exec.pending_completions.extend(captured);
532
533 dispatch_result
534 };
535
536 match result {
537 Ok(DispatchResult::Immediate(outputs)) => {
538 let sites = self.write_outputs(op_ref, exec_id, outputs);
539 EngineStep::OpCompleted {
540 op_ref,
541 exec_id,
542 sites_written: sites,
543 }
544 }
545 Ok(DispatchResult::Async(cmd_id)) => {
546 let output_sites = self.op_output_sites(op_ref);
547 self.exec.pending_async.insert(
548 cmd_id,
549 PendingAsync {
550 op_ref,
551 exec_id,
552 output_sites,
553 deadline_ns: None,
554 },
555 );
556 EngineStep::AsyncSuspended {
557 op_ref,
558 exec_id,
559 cmd_id,
560 }
561 }
562 Err(err) => {
563 self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
564 op_ref,
565 error: err.clone(),
566 }));
567 EngineStep::OpFailed {
568 op_ref,
569 exec_id,
570 error: err,
571 }
572 }
573 }
574 }
575
576 pub(crate) fn node_for(&self, op_ref: OpRef) -> Option<&NodeProto> {
579 let (gi, ni) = op_ref.split();
580 self.graphs.get(gi as usize)?.function.node.get(ni as usize)
581 }
582
583 pub(crate) fn op_output_sites(&self, op_ref: OpRef) -> Vec<NodeSiteId> {
588 let (gi, ni) = op_ref.split();
589 let Some(g) = self.graphs.get(gi as usize) else {
590 return Vec::new();
591 };
592 let Some(node) = g.function.node.get(ni as usize) else {
593 return Vec::new();
594 };
595 node.output
596 .iter()
597 .enumerate()
598 .map(|(i, name)| {
599 g.site_names
600 .get(name)
601 .copied()
602 .unwrap_or_else(|| synthesize_site_id(op_ref, i))
603 })
604 .collect()
605 }
606
607 pub(crate) fn resolve_site_name(&self, name: &str) -> Option<NodeSiteId> {
610 for g in self.graphs_iter() {
611 if let Some(&site) = g.site_names.get(name) {
612 return Some(site);
613 }
614 }
615 None
616 }
617
618 pub(crate) fn resolve_site_in_op_graph(&self, op_ref: OpRef, name: &str) -> Option<NodeSiteId> {
626 let (gi, _) = op_ref.split();
627 self.graphs.get(gi as usize)?.site_names.get(name).copied()
628 }
629
630 pub(crate) fn resolve_input_pairs(
646 &self,
647 node: &NodeProto,
648 exec_id: ExecId,
649 ) -> Vec<(NodeSiteId, String, ExecId)> {
650 let cc = self.exec.pending_calls.get(&exec_id);
651 let mut out = Vec::new();
652 for name in &node.input {
653 if name.is_empty() {
654 continue;
655 }
656 if let Some(cc) = cc {
657 if let Some(&alias_site) = cc.input_aliases.get(name) {
658 out.push((alias_site, name.clone(), cc.parent_exec_id));
659 continue;
660 }
661 }
662 if let Some(site) = self.resolve_site_name(name) {
663 out.push((site, name.clone(), exec_id));
664 }
665 }
666 out
667 }
668
669 pub(crate) fn write_outputs(
673 &mut self,
674 op_ref: OpRef,
675 exec_id: ExecId,
676 outputs: Vec<(String, Box<dyn SlotValue>)>,
677 ) -> Vec<NodeSiteId> {
678 let output_sites = self.op_output_sites(op_ref);
679 for ((site, _name), value) in output_sites
680 .iter()
681 .zip(outputs.iter().map(|(n, _)| n))
682 .zip(outputs.iter().map(|(_, v)| v.as_ref()))
683 {
684 let _ = (site, _name, value);
688 }
689 let mut sites_written: Vec<NodeSiteId> = Vec::new();
691 for (i, (_name, value)) in outputs.into_iter().enumerate() {
692 if let Some(site) = output_sites.get(i).copied() {
693 self.exec.slot_table.insert((site, exec_id), Some(value));
694 sites_written.push(site);
695 }
696 }
697
698 self.exec
700 .execution_state
701 .entry(exec_id)
702 .or_default()
703 .outputs_written += sites_written.len() as u32;
704
705 self.push_ready_consumers(&sites_written, exec_id);
707
708 self.forward_outputs_to_caller(&sites_written, exec_id);
713
714 self.surface_top_level_outputs(&sites_written, exec_id);
715 sites_written
716 }
717
718 pub(crate) fn forward_outputs_to_caller(
732 &mut self,
733 sites_written: &[NodeSiteId],
734 exec_id: ExecId,
735 ) {
736 let Some(cc) = self.exec.pending_calls.get(&exec_id) else {
737 return;
738 };
739 let mut pairs: Vec<(NodeSiteId, NodeSiteId)> = Vec::new();
742 for &body_site in sites_written {
743 if let Some(&caller_site) = cc.output_forwarding.get(&body_site) {
744 pairs.push((body_site, caller_site));
745 }
746 }
747 let parent_exec_id = cc.parent_exec_id;
748 if pairs.is_empty() {
749 return;
750 }
751 tracing::trace!(
752 target: "engine.function_call.forward",
753 call_target = ?cc.target,
754 body_exec_id = exec_id.as_u64(),
755 parent_exec_id = parent_exec_id.as_u64(),
756 pair_count = pairs.len(),
757 "forwarding body outputs to caller slots",
758 );
759
760 let mut caller_sites: Vec<NodeSiteId> = Vec::with_capacity(pairs.len());
761 for (body_site, caller_site) in &pairs {
762 let value = self
766 .exec
767 .slot_table
768 .get_mut(&(*body_site, exec_id))
769 .and_then(|opt| opt.take());
770 if let Some(value) = value {
771 self.exec
772 .slot_table
773 .insert((*caller_site, parent_exec_id), Some(value));
774 caller_sites.push(*caller_site);
775 }
776 }
777
778 if let Some(cc) = self.exec.pending_calls.get_mut(&exec_id) {
781 cc.outputs_remaining = cc.outputs_remaining.saturating_sub(pairs.len());
782 if cc.outputs_remaining == 0 {
783 self.exec.pending_calls.remove(&exec_id);
784 }
785 }
786
787 self.push_ready_consumers(&caller_sites, parent_exec_id);
790 self.surface_top_level_outputs(&caller_sites, parent_exec_id);
791 }
792
793 pub(crate) fn surface_top_level_outputs(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
811 for site in sites {
812 let consumer_count = self
813 .graphs_iter()
814 .map(|g| g.consumers.get(site).map(|v| v.len()).unwrap_or(0))
815 .sum::<usize>();
816 if consumer_count > 0 {
817 continue;
818 }
819 let name_opt = self
820 .graphs_iter()
821 .find_map(|g| g.top_level_outputs.get(site).cloned());
822 let Some(name) = name_opt else { continue };
823 let value_bytes = self
824 .exec
825 .slot_table
826 .get(&(*site, exec_id))
827 .and_then(|slot| slot.as_ref())
828 .map(|boxed| encode_for_host(boxed.as_ref()))
829 .unwrap_or_default();
830 self.framework
831 .pending_app_events
832 .push(crate::bus::AppEvent::Emit { name, value_bytes });
833 }
834 }
835
836 pub(crate) fn push_ready_consumers(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
840 let mut candidates: Vec<OpRef> = Vec::new();
844 for site in sites {
845 for g in self.graphs_iter() {
846 if let Some(consumers) = g.consumers.get(site) {
847 candidates.extend(consumers.iter().copied());
848 }
849 }
850 }
851 for op_ref in candidates {
852 if self.all_inputs_ready(op_ref, exec_id) {
853 self.exec.frontier.push_back((op_ref, exec_id));
854 }
855 }
856 }
857
858 pub(crate) fn all_inputs_ready(&self, op_ref: OpRef, exec_id: ExecId) -> bool {
862 let Some(node) = self.node_for(op_ref) else {
863 return false;
864 };
865 let cc = self.exec.pending_calls.get(&exec_id);
869 for name in &node.input {
870 if name.is_empty() {
871 continue; }
873 let (site, read_exec_id) = if let Some(cc) = cc {
874 if let Some(&alias_site) = cc.input_aliases.get(name) {
875 (alias_site, cc.parent_exec_id)
876 } else {
877 let Some(site) = self.resolve_site_name(name) else {
878 return false;
879 };
880 (site, exec_id)
881 }
882 } else {
883 let Some(site) = self.resolve_site_name(name) else {
884 return false;
885 };
886 (site, exec_id)
887 };
888 let has_value = self
889 .exec
890 .slot_table
891 .get(&(site, read_exec_id))
892 .map(|s| s.is_some())
893 .unwrap_or(false);
894 if !has_value {
895 return false;
896 }
897 }
898 true
899 }
900
901 pub(crate) fn fail_op(
907 &mut self,
908 op_ref: OpRef,
909 exec_id: ExecId,
910 kind: crate::bus::OpErrorKind,
911 reason: &'static str,
912 detail: String,
913 ) -> EngineStep {
914 let error = OpError {
915 kind,
916 reason,
917 detail,
918 };
919 self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
920 op_ref,
921 error: error.clone(),
922 }));
923 EngineStep::OpFailed {
924 op_ref,
925 exec_id,
926 error,
927 }
928 }
929}
930
931fn encode_for_host(value: &dyn crate::slot_value::SlotValue) -> Vec<u8> {
944 if let Some(b) = value
945 .as_any()
946 .downcast_ref::<crate::syscall::values::BytesValue>()
947 {
948 return b.0.clone();
949 }
950 match value.to_wire_bytes() {
951 Ok(bytes) => bytes,
952 Err(e) => {
953 tracing::warn!(error = %e, "encode_for_host: dropping host emit on encode failure");
954 Vec::new()
955 }
956 }
957}
958
959fn synthesize_site_id(op_ref: OpRef, output_index: usize) -> NodeSiteId {
963 NodeSiteId::from((op_ref.as_u64() << 8) | (output_index as u64 & 0xff))
964}
965
966pub(crate) fn call_protocol_dispatch_atomic(
975 component: &mut dyn crate::component::ErasedComponent,
976 op_type: &str,
977 inputs: &[(&str, &dyn SlotValue)],
978 ctx: &mut RuntimeResourceRef<'_>,
979 dispatchers: &std::collections::HashMap<std::any::TypeId, RoleDispatcher>,
980) -> Result<DispatchResult, String> {
981 let any: &mut dyn std::any::Any = component;
982 let tid = (*any).type_id();
983 if let Some(dispatcher) = dispatchers.get(&tid) {
984 (dispatcher.dispatch)(any, op_type, inputs, ctx)
985 } else {
986 Err("no ProtocolRuntime dispatcher registered for component".to_string())
987 }
988}
989
990pub type ProtocolDispatchFn = fn(
993 &mut dyn std::any::Any,
994 &str,
995 &[(&str, &dyn SlotValue)],
996 &mut RuntimeResourceRef<'_>,
997) -> Result<DispatchResult, String>;
998
999pub type BackendMaterializeFn =
1005 fn(
1006 &mut dyn std::any::Any,
1007 u64,
1008 Vec<u8>,
1009 ) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError>;
1010
1011pub type BootstrapDispatchFn = fn(
1020 &mut dyn std::any::Any,
1021 &mut crate::contracts::bootstrap::BootstrapCtx,
1022) -> Result<DispatchResult, String>;
1023
1024pub fn make_bootstrap_dispatcher<T: crate::contracts::bootstrap::Bootstrap + 'static>(
1037) -> BootstrapDispatchFn
1038where
1039 T::Error: std::fmt::Display,
1040{
1041 |any, ctx| {
1042 let concrete = any
1043 .downcast_mut::<T>()
1044 .expect("type-erased lookup matched T");
1045 concrete
1046 .bootstrap(ctx)
1047 .map(|_| DispatchResult::Immediate(Vec::new()))
1048 .map_err(|e| e.to_string())
1049 }
1050}
1051
1052pub fn no_materialize(
1058 _any: &mut dyn std::any::Any,
1059 _type_hash: u64,
1060 _bytes: Vec<u8>,
1061) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError> {
1062 Err(crate::slot_value::BackendMaterializeError {
1063 summary: "component is not a Backend; materialize_from_wire not supported".to_string(),
1064 })
1065}
1066
/// Pair of type-erased entry points registered for one component type.
///
/// `call_protocol_dispatch_atomic` looks these up by the component's
/// `TypeId`.
pub struct RoleDispatcher {
    /// Runs an op on the type-erased component.
    pub(crate) dispatch: ProtocolDispatchFn,
    /// Builds a slot value from a type hash and bytes; the factories in this
    /// file set it to `no_materialize` for every role except the backend.
    pub(crate) materialize: BackendMaterializeFn,
}
1078
1079pub fn make_protocol_dispatcher<T: ProtocolRuntime + 'static>() -> RoleDispatcher
1084where
1085 T::Error: std::fmt::Display,
1086{
1087 RoleDispatcher {
1088 dispatch: |any: &mut dyn std::any::Any,
1089 op_type: &str,
1090 inputs: &[(&str, &dyn SlotValue)],
1091 ctx: &mut RuntimeResourceRef<'_>| {
1092 let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1093 concrete
1094 .dispatch_atomic(op_type, inputs, ctx)
1095 .map_err(|e| e.to_string())
1096 },
1097 materialize: no_materialize,
1098 }
1099}
1100
1101macro_rules! emit_role_dispatcher_factory {
1108 ($factory_name:ident, $runtime_trait:path) => {
1109 #[doc = concat!("Build a `RoleDispatcher` for a concrete impl of ", stringify!($runtime_trait), ". Used by `Engine::register_*_dispatcher` chain (`Node::with_<role>(&value)`) so single-role components dispatch through the same `TypeId`-keyed registry as multi-role / Protocol-bearing components.")]
1110 pub fn $factory_name<T: $runtime_trait + 'static>() -> RoleDispatcher
1111 where
1112 <T as $runtime_trait>::Error: std::fmt::Display,
1113 {
1114 RoleDispatcher {
1115 dispatch: |any: &mut dyn std::any::Any,
1116 op_type: &str,
1117 inputs: &[(&str, &dyn SlotValue)],
1118 ctx: &mut RuntimeResourceRef<'_>| {
1119 let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1120 <T as $runtime_trait>::dispatch_atomic(concrete, op_type, inputs, ctx)
1121 .map_err(|e| e.to_string())
1122 },
1123 materialize: no_materialize,
1124 }
1125 }
1126 };
1127}
1128
1129emit_role_dispatcher_factory!(make_index_dispatcher, crate::roles::IndexRuntime);
1130emit_role_dispatcher_factory!(make_aggregator_dispatcher, crate::roles::AggregatorRuntime);
1131emit_role_dispatcher_factory!(make_model_dispatcher, crate::roles::ModelRuntime);
1132emit_role_dispatcher_factory!(make_codec_dispatcher, crate::roles::CodecRuntime);
1133emit_role_dispatcher_factory!(make_data_source_dispatcher, crate::roles::DataSourceRuntime);
1134emit_role_dispatcher_factory!(
1135 make_peer_selector_dispatcher,
1136 crate::roles::PeerSelectorRuntime
1137);
1138
1139pub fn make_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>() -> RoleDispatcher
1145where
1146 <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
1147{
1148 RoleDispatcher {
1149 dispatch: |any: &mut dyn std::any::Any,
1150 op_type: &str,
1151 inputs: &[(&str, &dyn SlotValue)],
1152 ctx: &mut RuntimeResourceRef<'_>| {
1153 let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1154 <T as crate::roles::BackendRuntime>::dispatch_atomic(concrete, op_type, inputs, ctx)
1155 .map_err(|e| e.to_string())
1156 },
1157 materialize: |any: &mut dyn std::any::Any, type_hash: u64, bytes: Vec<u8>| {
1158 let concrete = any.downcast_ref::<T>().expect("is_match guaranteed");
1159 <T as crate::roles::BackendRuntime>::materialize_from_wire(concrete, type_hash, bytes)
1160 },
1161 }
1162}
1163
// Tests for this module live in `invoke_function_call_tests.rs` and are only
// compiled for test builds.
#[cfg(test)]
#[path = "invoke_function_call_tests.rs"]
mod function_call_tests;