1use crate::atomic::DispatchResult;
10use crate::bus::{InfraEvent, NodeEvent, OpError};
11use crate::engine::call_context::CallContext;
12use crate::engine::core::{graph_name_for, Engine};
13use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
14use crate::engine::pending_async::PendingAsync;
15use crate::engine::step::EngineStep;
16use crate::ids::{ComponentRef, ExecId, NodeSiteId, OpRef};
17use crate::roles::ProtocolRuntime;
18use crate::runtime::RuntimeResourceRef;
19use crate::slot_value::SlotValue;
20use bb_ir::proto::onnx::NodeProto;
21
22impl Engine {
23 pub(crate) fn invoke_one(&mut self, op_ref: OpRef, exec_id: ExecId) -> EngineStep {
34 let node = match self.node_for(op_ref) {
35 Some(n) => n.clone(),
36 None => {
37 return self.fail_op(
38 op_ref,
39 exec_id,
40 crate::bus::OpErrorKind::ExecutionFailed,
41 "unknown_op_ref",
42 "unknown op_ref".to_string(),
43 )
44 }
45 };
46
47 let _invoke_span = tracing::debug_span!(
52 "engine.invoke_one",
53 op.name = %node.name,
54 op.kind = %node.op_type,
55 op.domain = %node.domain,
56 exec_id = %exec_id,
57 op_ref = %op_ref,
58 )
59 .entered();
60
61 match self.dispatch_for(op_ref) {
62 Some(OpDispatch::Stateless(invoke_fn)) => {
63 self.invoke_stateless(op_ref, exec_id, &node, invoke_fn)
64 }
65 Some(OpDispatch::Atomic {
66 component_ref,
67 dispatch_fn,
68 }) => self.invoke_atomic(op_ref, exec_id, &node, component_ref, dispatch_fn),
69 Some(OpDispatch::FunctionCall {
70 target,
71 input_rename,
72 output_rename,
73 }) => {
74 self.invoke_function_call(op_ref, exec_id, &target, &input_rename, &output_rename)
75 }
76 Some(OpDispatch::Unresolved) | None => self.fail_op(
77 op_ref,
78 exec_id,
79 crate::bus::OpErrorKind::NotRegistered,
80 "unresolved_dispatch",
81 format!("unresolved dispatch for {}::{}", node.domain, node.op_type),
82 ),
83 }
84 }
85
86 fn dispatch_for(&self, op_ref: OpRef) -> Option<OpDispatch> {
90 let (gi, ni) = op_ref.split();
91 self.graphs
92 .get(gi as usize)?
93 .op_dispatch
94 .get(ni as usize)
95 .cloned()
96 }
97
98 fn invoke_atomic(
101 &mut self,
102 op_ref: OpRef,
103 exec_id: ExecId,
104 node: &NodeProto,
105 component_ref: ComponentRef,
106 dispatch_fn: ProtocolDispatchFn,
107 ) -> EngineStep {
108 if let Some(cap) = self.max_pending_async {
117 if self.exec.pending_async.len() >= cap {
118 return self.fail_op(
119 op_ref,
120 exec_id,
121 crate::bus::OpErrorKind::Cooldown,
122 "pending_async_limit",
123 "pending-async limit exceeded".to_string(),
124 );
125 }
126 }
127
128 let input_pairs = self.resolve_input_pairs(node, exec_id);
132
133 let Some(mut taken) = self.take_component(component_ref) else {
140 return self.fail_op(
141 op_ref,
142 exec_id,
143 crate::bus::OpErrorKind::MissingSlot,
144 "component_missing",
145 "component missing".to_string(),
146 );
147 };
148
149 let result: Result<DispatchResult, String> = {
150 let mut input_refs: Vec<(String, &dyn SlotValue)> =
151 Vec::with_capacity(input_pairs.len());
152 for (site, name, read_exec_id) in &input_pairs {
153 if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
154 input_refs.push((name.clone(), boxed.as_ref()));
155 }
156 }
157 let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
158 input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
159
160 let (
161 envelope_src_peer,
162 inbound_correlation_wire_req_id,
163 inbound_arrival_ns,
164 inbound_remaining_deadline_ns,
165 ) = self
166 .framework
167 .inbound_contexts
168 .get(&exec_id)
169 .map(|c| {
170 (
171 c.src_peer,
172 c.wire_req_id,
173 c.arrival_ns,
174 c.remaining_deadline_ns,
175 )
176 })
177 .unwrap_or((None, None, None, None));
178 let mut ctx = RuntimeResourceRef {
179 peers: crate::runtime::PeerCtx {
180 gate: &mut self.framework.peer_state.gate,
181 backoff: &mut self.framework.peer_state.backoff,
182 governor: &mut self.framework.peer_state.governor,
183 addresses: &mut self.framework.address_book,
184 backpressure: &mut self.framework.peer_state.backpressure,
185 },
186 net: crate::runtime::NetCtx {
187 outbound: &mut self.framework.outbound_queue,
188 rtt: &mut self.framework.rtt_tracker,
189 requests: &mut self.framework.request_tracker,
190 dedup: &mut self.framework.inbound_dedup,
191 pending_peer_resolve_failures: &mut self
192 .framework
193 .pending_peer_resolve_failures,
194 },
195 time: crate::runtime::TimeCtx {
196 scheduler: &mut self.framework.scheduler,
197 },
198 syscall: crate::runtime::SyscallCtx {
199 serialize_queue: &mut self.framework.serialize_queue,
200 hold_table: &mut self.framework.hold_table,
201 record_buffer: &mut self.framework.record_buffer,
202 event_source: &mut self.framework.event_source,
203 counters: &mut self.framework.counters,
204 any_fired_groups: &mut self.framework.any_fired_groups,
205 deadline_match_fired: &mut self.framework.deadline_match_fired,
206 rng: &mut *self.framework.rng,
207 pending_app_events: &mut self.framework.pending_app_events,
208 },
209 bus: &mut self.bus,
210 ingress: std::sync::Arc::clone(&self.ingress),
211 components: crate::runtime::ComponentsView {
212 instances: Some(&self.components),
213 slots: Some(&self.slots),
214 },
215 current: crate::runtime::CurrentCallCtx {
216 op_ref,
217 exec_id,
218 self_peer: self.self_peer,
219 node_attributes: &node.attribute,
220 node_metadata: &node.metadata_props,
221 inbound: crate::runtime::InboundCtx {
222 src_peer: envelope_src_peer,
223 wire_req_id: inbound_correlation_wire_req_id,
224 arrival_ns: inbound_arrival_ns,
225 remaining_deadline_ns: inbound_remaining_deadline_ns,
226 },
227 pending_completions: Vec::new(),
228 next_command_id: &mut self.exec.ids.next_command_id,
229 },
230 };
231
232 let any: &mut dyn std::any::Any = taken.as_mut();
236 let dispatch_result = dispatch_fn(any, &node.op_type, &inputs_for_dispatch, &mut ctx);
237
238 let captured = std::mem::take(&mut ctx.current.pending_completions);
239 drop(ctx);
240 self.exec.pending_completions.extend(captured);
241
242 dispatch_result
243 };
244
245 self.restore_component(component_ref, taken);
248
249 match result {
250 Ok(DispatchResult::Immediate(outputs)) => {
251 let sites = self.write_outputs(op_ref, exec_id, outputs);
252 EngineStep::OpCompleted {
253 op_ref,
254 exec_id,
255 sites_written: sites,
256 }
257 }
258 Ok(DispatchResult::Async(cmd_id)) => {
259 let output_sites = self.op_output_sites(op_ref);
260 self.exec.pending_async.insert(
261 cmd_id,
262 PendingAsync {
263 op_ref,
264 exec_id,
265 output_sites,
266 deadline_ns: None,
267 },
268 );
269 EngineStep::AsyncSuspended {
270 op_ref,
271 exec_id,
272 cmd_id,
273 }
274 }
275 Err(detail) => self.fail_op(
276 op_ref,
277 exec_id,
278 crate::bus::OpErrorKind::ExecutionFailed,
279 "stateless_invoke",
280 detail,
281 ),
282 }
283 }
284
285 pub(crate) fn invoke_function_call(
299 &mut self,
300 op_ref: OpRef,
301 parent_exec_id: ExecId,
302 target: &FunctionKey,
303 input_rename: &[(String, String)],
304 output_rename: &[(String, String)],
305 ) -> EngineStep {
306 let graph_name = graph_name_for(target);
307 if !self.has_graph(&graph_name) {
308 return self.fail_op(
309 op_ref,
310 parent_exec_id,
311 crate::bus::OpErrorKind::NotRegistered,
312 "function_target_missing",
313 format!("function-call target {graph_name} not installed"),
314 );
315 }
316
317 let body_exec_id = self.allocate_exec_id();
318 let body = self.graph(&graph_name).expect("checked above");
319
320 let body_idx = self.graph_idx(&graph_name).expect("graph just resolved");
325 let body_op_refs: Vec<OpRef> = (0..body.function.node.len() as u32)
326 .map(|ni| OpRef::pack(body_idx, ni))
327 .collect();
328 let body_site_for: std::collections::HashMap<String, NodeSiteId> = body.site_names.clone();
329
330 let mut input_aliases: std::collections::HashMap<String, NodeSiteId> =
334 std::collections::HashMap::with_capacity(input_rename.len());
335 for (caller_name, formal_name) in input_rename {
336 let Some(caller_site) = self.resolve_site_in_op_graph(op_ref, caller_name) else {
337 return self.fail_op(
338 op_ref,
339 parent_exec_id,
340 crate::bus::OpErrorKind::MissingSlot,
341 "function_input_unbound",
342 format!("function-call input {caller_name} not bound"),
343 );
344 };
345 input_aliases.insert(formal_name.clone(), caller_site);
346 }
347
348 let mut output_forwarding: std::collections::HashMap<NodeSiteId, NodeSiteId> =
350 std::collections::HashMap::with_capacity(output_rename.len());
351 for (formal_out, caller_out) in output_rename {
352 let Some(&body_site) = body_site_for.get(formal_out) else {
353 return self.fail_op(
354 op_ref,
355 parent_exec_id,
356 crate::bus::OpErrorKind::NotRegistered,
357 "function_output_missing",
358 format!("function-call output {formal_out} missing from body"),
359 );
360 };
361 let Some(caller_site) = self.resolve_site_in_op_graph(op_ref, caller_out) else {
362 return self.fail_op(
363 op_ref,
364 parent_exec_id,
365 crate::bus::OpErrorKind::MissingSlot,
366 "function_output_unbound",
367 format!("function-call output {caller_out} not bound"),
368 );
369 };
370 output_forwarding.insert(body_site, caller_site);
371 }
372
373 let outputs_remaining = output_forwarding.len();
374 self.exec.pending_calls.insert(
375 body_exec_id,
376 CallContext {
377 parent_exec_id,
378 target: target.clone(),
379 input_aliases,
380 output_forwarding,
381 outputs_remaining,
382 },
383 );
384
385 for body_op in body_op_refs {
390 self.exec.frontier.push_back((body_op, body_exec_id));
391 }
392
393 if self
399 .exec
400 .pending_calls
401 .get(&body_exec_id)
402 .map(|c| c.outputs_remaining == 0)
403 .unwrap_or(false)
404 {
405 self.exec.pending_calls.remove(&body_exec_id);
406 }
407
408 EngineStep::OpCompleted {
409 op_ref,
410 exec_id: parent_exec_id,
411 sites_written: Vec::new(),
412 }
413 }
414
415 pub(crate) fn invoke_stateless(
420 &mut self,
421 op_ref: OpRef,
422 exec_id: ExecId,
423 node: &NodeProto,
424 invoke_fn: StatelessInvokeFn,
425 ) -> EngineStep {
426 if let Some(cap) = self.max_pending_async {
431 if self.exec.pending_async.len() >= cap {
432 return self.fail_op(
433 op_ref,
434 exec_id,
435 crate::bus::OpErrorKind::Cooldown,
436 "pending_async_limit",
437 "pending-async limit exceeded".to_string(),
438 );
439 }
440 }
441 let input_pairs = self.resolve_input_pairs(node, exec_id);
444
445 let result: Result<DispatchResult, OpError> = {
446 let mut input_refs: Vec<(String, &dyn SlotValue)> =
447 Vec::with_capacity(input_pairs.len());
448 for (site, name, read_exec_id) in &input_pairs {
449 if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
450 input_refs.push((name.clone(), boxed.as_ref()));
451 }
452 }
453 let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
454 input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
455
456 let (
457 envelope_src_peer,
458 inbound_correlation_wire_req_id,
459 inbound_arrival_ns,
460 inbound_remaining_deadline_ns,
461 ) = self
462 .framework
463 .inbound_contexts
464 .get(&exec_id)
465 .map(|c| {
466 (
467 c.src_peer,
468 c.wire_req_id,
469 c.arrival_ns,
470 c.remaining_deadline_ns,
471 )
472 })
473 .unwrap_or((None, None, None, None));
474 let mut ctx = RuntimeResourceRef {
475 peers: crate::runtime::PeerCtx {
476 gate: &mut self.framework.peer_state.gate,
477 backoff: &mut self.framework.peer_state.backoff,
478 governor: &mut self.framework.peer_state.governor,
479 addresses: &mut self.framework.address_book,
480 backpressure: &mut self.framework.peer_state.backpressure,
481 },
482 net: crate::runtime::NetCtx {
483 outbound: &mut self.framework.outbound_queue,
484 rtt: &mut self.framework.rtt_tracker,
485 requests: &mut self.framework.request_tracker,
486 dedup: &mut self.framework.inbound_dedup,
487 pending_peer_resolve_failures: &mut self
488 .framework
489 .pending_peer_resolve_failures,
490 },
491 time: crate::runtime::TimeCtx {
492 scheduler: &mut self.framework.scheduler,
493 },
494 syscall: crate::runtime::SyscallCtx {
495 serialize_queue: &mut self.framework.serialize_queue,
496 hold_table: &mut self.framework.hold_table,
497 record_buffer: &mut self.framework.record_buffer,
498 event_source: &mut self.framework.event_source,
499 counters: &mut self.framework.counters,
500 any_fired_groups: &mut self.framework.any_fired_groups,
501 deadline_match_fired: &mut self.framework.deadline_match_fired,
502 rng: &mut *self.framework.rng,
503 pending_app_events: &mut self.framework.pending_app_events,
504 },
505 bus: &mut self.bus,
506 ingress: std::sync::Arc::clone(&self.ingress),
507 components: crate::runtime::ComponentsView::default(),
508 current: crate::runtime::CurrentCallCtx {
509 op_ref,
510 exec_id,
511 self_peer: self.self_peer,
512 node_attributes: &node.attribute,
513 node_metadata: &node.metadata_props,
514 inbound: crate::runtime::InboundCtx {
515 src_peer: envelope_src_peer,
516 wire_req_id: inbound_correlation_wire_req_id,
517 arrival_ns: inbound_arrival_ns,
518 remaining_deadline_ns: inbound_remaining_deadline_ns,
519 },
520 pending_completions: Vec::new(),
521 next_command_id: &mut self.exec.ids.next_command_id,
522 },
523 };
524
525 let dispatch_result = invoke_fn(node, &inputs_for_dispatch, &mut ctx);
526
527 let captured = std::mem::take(&mut ctx.current.pending_completions);
528 drop(ctx);
529 self.exec.pending_completions.extend(captured);
530
531 dispatch_result
532 };
533
534 match result {
535 Ok(DispatchResult::Immediate(outputs)) => {
536 let sites = self.write_outputs(op_ref, exec_id, outputs);
537 EngineStep::OpCompleted {
538 op_ref,
539 exec_id,
540 sites_written: sites,
541 }
542 }
543 Ok(DispatchResult::Async(cmd_id)) => {
544 let output_sites = self.op_output_sites(op_ref);
545 self.exec.pending_async.insert(
546 cmd_id,
547 PendingAsync {
548 op_ref,
549 exec_id,
550 output_sites,
551 deadline_ns: None,
552 },
553 );
554 EngineStep::AsyncSuspended {
555 op_ref,
556 exec_id,
557 cmd_id,
558 }
559 }
560 Err(err) => {
561 self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
562 op_ref,
563 error: err.clone(),
564 }));
565 EngineStep::OpFailed {
566 op_ref,
567 exec_id,
568 error: err,
569 }
570 }
571 }
572 }
573
574 pub(crate) fn node_for(&self, op_ref: OpRef) -> Option<&NodeProto> {
577 let (gi, ni) = op_ref.split();
578 self.graphs.get(gi as usize)?.function.node.get(ni as usize)
579 }
580
581 pub(crate) fn op_output_sites(&self, op_ref: OpRef) -> Vec<NodeSiteId> {
586 let (gi, ni) = op_ref.split();
587 let Some(g) = self.graphs.get(gi as usize) else {
588 return Vec::new();
589 };
590 let Some(node) = g.function.node.get(ni as usize) else {
591 return Vec::new();
592 };
593 node.output
594 .iter()
595 .enumerate()
596 .map(|(i, name)| {
597 g.site_names
598 .get(name)
599 .copied()
600 .unwrap_or_else(|| synthesize_site_id(op_ref, i))
601 })
602 .collect()
603 }
604
605 pub(crate) fn resolve_site_name(&self, name: &str) -> Option<NodeSiteId> {
608 for g in self.graphs_iter() {
609 if let Some(&site) = g.site_names.get(name) {
610 return Some(site);
611 }
612 }
613 None
614 }
615
616 pub(crate) fn resolve_site_in_op_graph(&self, op_ref: OpRef, name: &str) -> Option<NodeSiteId> {
624 let (gi, _) = op_ref.split();
625 self.graphs.get(gi as usize)?.site_names.get(name).copied()
626 }
627
628 pub(crate) fn resolve_input_pairs(
644 &self,
645 node: &NodeProto,
646 exec_id: ExecId,
647 ) -> Vec<(NodeSiteId, String, ExecId)> {
648 let cc = self.exec.pending_calls.get(&exec_id);
649 let mut out = Vec::new();
650 for name in &node.input {
651 if name.is_empty() {
652 continue;
653 }
654 if let Some(cc) = cc {
655 if let Some(&alias_site) = cc.input_aliases.get(name) {
656 out.push((alias_site, name.clone(), cc.parent_exec_id));
657 continue;
658 }
659 }
660 if let Some(site) = self.resolve_site_name(name) {
661 out.push((site, name.clone(), exec_id));
662 }
663 }
664 out
665 }
666
667 pub(crate) fn write_outputs(
671 &mut self,
672 op_ref: OpRef,
673 exec_id: ExecId,
674 outputs: Vec<(String, Box<dyn SlotValue>)>,
675 ) -> Vec<NodeSiteId> {
676 let output_sites = self.op_output_sites(op_ref);
677 for ((site, _name), value) in output_sites
678 .iter()
679 .zip(outputs.iter().map(|(n, _)| n))
680 .zip(outputs.iter().map(|(_, v)| v.as_ref()))
681 {
682 let _ = (site, _name, value);
686 }
687 let mut sites_written: Vec<NodeSiteId> = Vec::new();
689 for (i, (_name, value)) in outputs.into_iter().enumerate() {
690 if let Some(site) = output_sites.get(i).copied() {
691 self.exec.slot_table.insert((site, exec_id), Some(value));
692 sites_written.push(site);
693 }
694 }
695
696 self.exec
698 .execution_state
699 .entry(exec_id)
700 .or_default()
701 .outputs_written += sites_written.len() as u32;
702
703 self.push_ready_consumers(&sites_written, exec_id);
705
706 self.forward_outputs_to_caller(&sites_written, exec_id);
711
712 self.surface_top_level_outputs(&sites_written, exec_id);
713 sites_written
714 }
715
716 pub(crate) fn forward_outputs_to_caller(
730 &mut self,
731 sites_written: &[NodeSiteId],
732 exec_id: ExecId,
733 ) {
734 let Some(cc) = self.exec.pending_calls.get(&exec_id) else {
735 return;
736 };
737 let mut pairs: Vec<(NodeSiteId, NodeSiteId)> = Vec::new();
740 for &body_site in sites_written {
741 if let Some(&caller_site) = cc.output_forwarding.get(&body_site) {
742 pairs.push((body_site, caller_site));
743 }
744 }
745 let parent_exec_id = cc.parent_exec_id;
746 if pairs.is_empty() {
747 return;
748 }
749 tracing::trace!(
750 target: "engine.function_call.forward",
751 call_target = ?cc.target,
752 body_exec_id = exec_id.as_u64(),
753 parent_exec_id = parent_exec_id.as_u64(),
754 pair_count = pairs.len(),
755 "forwarding body outputs to caller slots",
756 );
757
758 let mut caller_sites: Vec<NodeSiteId> = Vec::with_capacity(pairs.len());
759 for (body_site, caller_site) in &pairs {
760 let value = self
764 .exec
765 .slot_table
766 .get_mut(&(*body_site, exec_id))
767 .and_then(|opt| opt.take());
768 if let Some(value) = value {
769 self.exec
770 .slot_table
771 .insert((*caller_site, parent_exec_id), Some(value));
772 caller_sites.push(*caller_site);
773 }
774 }
775
776 if let Some(cc) = self.exec.pending_calls.get_mut(&exec_id) {
779 cc.outputs_remaining = cc.outputs_remaining.saturating_sub(pairs.len());
780 if cc.outputs_remaining == 0 {
781 self.exec.pending_calls.remove(&exec_id);
782 }
783 }
784
785 self.push_ready_consumers(&caller_sites, parent_exec_id);
788 self.surface_top_level_outputs(&caller_sites, parent_exec_id);
789 }
790
791 pub(crate) fn surface_top_level_outputs(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
809 for site in sites {
810 let consumer_count = self
811 .graphs_iter()
812 .map(|g| g.consumers.get(site).map(|v| v.len()).unwrap_or(0))
813 .sum::<usize>();
814 if consumer_count > 0 {
815 continue;
816 }
817 let name_opt = self
818 .graphs_iter()
819 .find_map(|g| g.top_level_outputs.get(site).cloned());
820 let Some(name) = name_opt else { continue };
821 let value_bytes = self
822 .exec
823 .slot_table
824 .get(&(*site, exec_id))
825 .and_then(|slot| slot.as_ref())
826 .map(|boxed| encode_for_host(boxed.as_ref()))
827 .unwrap_or_default();
828 self.framework
829 .pending_app_events
830 .push(crate::bus::AppEvent::Emit { name, value_bytes });
831 }
832 }
833
834 pub(crate) fn push_ready_consumers(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
838 let mut candidates: Vec<OpRef> = Vec::new();
842 for site in sites {
843 for g in self.graphs_iter() {
844 if let Some(consumers) = g.consumers.get(site) {
845 candidates.extend(consumers.iter().copied());
846 }
847 }
848 }
849 for op_ref in candidates {
850 if self.all_inputs_ready(op_ref, exec_id) {
851 self.exec.frontier.push_back((op_ref, exec_id));
852 }
853 }
854 }
855
856 pub(crate) fn all_inputs_ready(&self, op_ref: OpRef, exec_id: ExecId) -> bool {
860 let Some(node) = self.node_for(op_ref) else {
861 return false;
862 };
863 let cc = self.exec.pending_calls.get(&exec_id);
867 for name in &node.input {
868 if name.is_empty() {
869 continue; }
871 let (site, read_exec_id) = if let Some(cc) = cc {
872 if let Some(&alias_site) = cc.input_aliases.get(name) {
873 (alias_site, cc.parent_exec_id)
874 } else {
875 let Some(site) = self.resolve_site_name(name) else {
876 return false;
877 };
878 (site, exec_id)
879 }
880 } else {
881 let Some(site) = self.resolve_site_name(name) else {
882 return false;
883 };
884 (site, exec_id)
885 };
886 let has_value = self
887 .exec
888 .slot_table
889 .get(&(site, read_exec_id))
890 .map(|s| s.is_some())
891 .unwrap_or(false);
892 if !has_value {
893 return false;
894 }
895 }
896 true
897 }
898
899 pub(crate) fn fail_op(
905 &mut self,
906 op_ref: OpRef,
907 exec_id: ExecId,
908 kind: crate::bus::OpErrorKind,
909 reason: &'static str,
910 detail: String,
911 ) -> EngineStep {
912 let error = OpError {
913 kind,
914 reason,
915 detail,
916 };
917 self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
918 op_ref,
919 error: error.clone(),
920 }));
921 EngineStep::OpFailed {
922 op_ref,
923 exec_id,
924 error,
925 }
926 }
927}
928
929fn encode_for_host(value: &dyn crate::slot_value::SlotValue) -> Vec<u8> {
942 if let Some(b) = value
943 .as_any()
944 .downcast_ref::<crate::syscall::values::BytesValue>()
945 {
946 return b.0.clone();
947 }
948 match value.to_wire_bytes() {
949 Ok(bytes) => bytes,
950 Err(e) => {
951 tracing::warn!(error = %e, "encode_for_host: dropping host emit on encode failure");
952 Vec::new()
953 }
954 }
955}
956
957fn synthesize_site_id(op_ref: OpRef, output_index: usize) -> NodeSiteId {
961 NodeSiteId::from((op_ref.as_u64() << 8) | (output_index as u64 & 0xff))
962}
963
964pub(crate) fn call_protocol_dispatch_atomic(
973 component: &mut dyn crate::component::ErasedComponent,
974 op_type: &str,
975 inputs: &[(&str, &dyn SlotValue)],
976 ctx: &mut RuntimeResourceRef<'_>,
977 dispatchers: &std::collections::HashMap<std::any::TypeId, RoleDispatcher>,
978) -> Result<DispatchResult, String> {
979 let any: &mut dyn std::any::Any = component;
980 let tid = (*any).type_id();
981 if let Some(dispatcher) = dispatchers.get(&tid) {
982 (dispatcher.dispatch)(any, op_type, inputs, ctx)
983 } else {
984 Err("no ProtocolRuntime dispatcher registered for component".to_string())
985 }
986}
987
988pub type ProtocolDispatchFn = fn(
991 &mut dyn std::any::Any,
992 &str,
993 &[(&str, &dyn SlotValue)],
994 &mut RuntimeResourceRef<'_>,
995) -> Result<DispatchResult, String>;
996
997pub type BackendMaterializeFn =
1003 fn(
1004 &mut dyn std::any::Any,
1005 u64,
1006 Vec<u8>,
1007 ) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError>;
1008
1009pub fn no_materialize(
1015 _any: &mut dyn std::any::Any,
1016 _type_hash: u64,
1017 _bytes: Vec<u8>,
1018) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError> {
1019 Err(crate::slot_value::BackendMaterializeError {
1020 summary: "component is not a Backend; materialize_from_wire not supported".to_string(),
1021 })
1022}
1023
1024pub struct RoleDispatcher {
1032 pub(crate) dispatch: ProtocolDispatchFn,
1033 pub(crate) materialize: BackendMaterializeFn,
1034}
1035
1036pub fn make_protocol_dispatcher<T: ProtocolRuntime + 'static>() -> RoleDispatcher
1041where
1042 T::Error: std::fmt::Display,
1043{
1044 RoleDispatcher {
1045 dispatch: |any: &mut dyn std::any::Any,
1046 op_type: &str,
1047 inputs: &[(&str, &dyn SlotValue)],
1048 ctx: &mut RuntimeResourceRef<'_>| {
1049 let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1050 concrete
1051 .dispatch_atomic(op_type, inputs, ctx)
1052 .map_err(|e| e.to_string())
1053 },
1054 materialize: no_materialize,
1055 }
1056}
1057
1058macro_rules! emit_role_dispatcher_factory {
1065 ($factory_name:ident, $runtime_trait:path) => {
1066 #[doc = concat!("Build a `RoleDispatcher` for a concrete impl of ", stringify!($runtime_trait), ". Used by `Engine::register_*_dispatcher` chain (`Node::with_<role>(&value)`) so single-role components dispatch through the same `TypeId`-keyed registry as multi-role / Protocol-bearing components.")]
1067 pub fn $factory_name<T: $runtime_trait + 'static>() -> RoleDispatcher
1068 where
1069 <T as $runtime_trait>::Error: std::fmt::Display,
1070 {
1071 RoleDispatcher {
1072 dispatch: |any: &mut dyn std::any::Any,
1073 op_type: &str,
1074 inputs: &[(&str, &dyn SlotValue)],
1075 ctx: &mut RuntimeResourceRef<'_>| {
1076 let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1077 <T as $runtime_trait>::dispatch_atomic(concrete, op_type, inputs, ctx)
1078 .map_err(|e| e.to_string())
1079 },
1080 materialize: no_materialize,
1081 }
1082 }
1083 };
1084}
1085
1086emit_role_dispatcher_factory!(make_index_dispatcher, crate::roles::IndexRuntime);
1087emit_role_dispatcher_factory!(make_aggregator_dispatcher, crate::roles::AggregatorRuntime);
1088emit_role_dispatcher_factory!(make_model_dispatcher, crate::roles::ModelRuntime);
1089emit_role_dispatcher_factory!(make_codec_dispatcher, crate::roles::CodecRuntime);
1090emit_role_dispatcher_factory!(make_data_source_dispatcher, crate::roles::DataSourceRuntime);
1091emit_role_dispatcher_factory!(
1092 make_peer_selector_dispatcher,
1093 crate::roles::PeerSelectorRuntime
1094);
1095
1096pub fn make_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>() -> RoleDispatcher
1102where
1103 <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
1104{
1105 RoleDispatcher {
1106 dispatch: |any: &mut dyn std::any::Any,
1107 op_type: &str,
1108 inputs: &[(&str, &dyn SlotValue)],
1109 ctx: &mut RuntimeResourceRef<'_>| {
1110 let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1111 <T as crate::roles::BackendRuntime>::dispatch_atomic(concrete, op_type, inputs, ctx)
1112 .map_err(|e| e.to_string())
1113 },
1114 materialize: |any: &mut dyn std::any::Any, type_hash: u64, bytes: Vec<u8>| {
1115 let concrete = any.downcast_ref::<T>().expect("is_match guaranteed");
1116 <T as crate::roles::BackendRuntime>::materialize_from_wire(concrete, type_hash, bytes)
1117 },
1118 }
1119}
1120
1121#[cfg(test)]
1122#[path = "invoke_function_call_tests.rs"]
1123mod function_call_tests;