use crate::atomic::DispatchResult;
use crate::bus::{InfraEvent, NodeEvent, OpError};
use crate::engine::call_context::CallContext;
use crate::engine::core::{graph_name_for, Engine};
use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
use crate::engine::pending_async::PendingAsync;
use crate::engine::step::EngineStep;
use crate::ids::{ComponentRef, ExecId, NodeSiteId, OpRef};
use crate::roles::ProtocolRuntime;
use crate::runtime::RuntimeResourceRef;
use crate::slot_value::SlotValue;
use bb_ir::proto::onnx::NodeProto;
impl Engine {
pub(crate) fn invoke_one(&mut self, op_ref: OpRef, exec_id: ExecId) -> EngineStep {
let node = match self.node_for(op_ref) {
Some(n) => n.clone(),
None => {
return self.fail_op(
op_ref,
exec_id,
crate::bus::OpErrorKind::ExecutionFailed,
"unknown_op_ref",
"unknown op_ref".to_string(),
)
}
};
let _invoke_span = tracing::debug_span!(
"engine.invoke_one",
op.name = %node.name,
op.kind = %node.op_type,
op.domain = %node.domain,
exec_id = %exec_id,
op_ref = %op_ref,
)
.entered();
match self.dispatch_for(op_ref) {
Some(OpDispatch::Stateless(invoke_fn)) => {
self.invoke_stateless(op_ref, exec_id, &node, invoke_fn)
}
Some(OpDispatch::Atomic {
component_ref,
dispatch_fn,
}) => self.invoke_atomic(op_ref, exec_id, &node, component_ref, dispatch_fn),
Some(OpDispatch::FunctionCall {
target,
input_rename,
output_rename,
}) => {
self.invoke_function_call(op_ref, exec_id, &target, &input_rename, &output_rename)
}
Some(OpDispatch::Unresolved) | None => self.fail_op(
op_ref,
exec_id,
crate::bus::OpErrorKind::NotRegistered,
"unresolved_dispatch",
format!("unresolved dispatch for {}::{}", node.domain, node.op_type),
),
}
}
/// Return a clone of the dispatch entry recorded for `op_ref`, or `None`
/// when either the graph index or the node index is out of range.
fn dispatch_for(&self, op_ref: OpRef) -> Option<OpDispatch> {
    let (graph_idx, node_idx) = op_ref.split();
    let graph = self.graphs.get(graph_idx as usize)?;
    graph.op_dispatch.get(node_idx as usize).cloned()
}
/// Invoke an op that is backed by a registered component.
///
/// Steps, in order:
/// 1. If `max_pending_async` is set and the pending-async table is already at
///    that size, fail with `Cooldown` / `"pending_async_limit"`. The check runs
///    before dispatch, so it applies whether or not the op ends up async.
/// 2. Resolve the node's inputs and temporarily take the component out of the
///    engine (`take_component`); a missing component fails with `MissingSlot`.
/// 3. Build a `RuntimeResourceRef` over the engine's framework state and call
///    `dispatch_fn` with the component (as `&mut dyn Any`), the node's
///    `op_type`, and the resolved inputs.
/// 4. Move any completions the call queued on the context into
///    `self.exec.pending_completions`, then put the component back.
/// 5. Translate the result: `Immediate` outputs are written and yield
///    `OpCompleted`; `Async` registers a `PendingAsync` entry and yields
///    `AsyncSuspended`; an error string is routed through `fail_op`.
fn invoke_atomic(
&mut self,
op_ref: OpRef,
exec_id: ExecId,
node: &NodeProto,
component_ref: ComponentRef,
dispatch_fn: ProtocolDispatchFn,
) -> EngineStep {
// Back-pressure: refuse to start the op while the pending-async table is full.
if let Some(cap) = self.max_pending_async {
if self.exec.pending_async.len() >= cap {
return self.fail_op(
op_ref,
exec_id,
crate::bus::OpErrorKind::Cooldown,
"pending_async_limit",
"pending-async limit exceeded".to_string(),
);
}
}
let input_pairs = self.resolve_input_pairs(node, exec_id);
// The component is moved out of the engine for the duration of the call and
// handed back via `restore_component` below.
let Some(mut taken) = self.take_component(component_ref) else {
return self.fail_op(
op_ref,
exec_id,
crate::bus::OpErrorKind::MissingSlot,
"component_missing",
"component missing".to_string(),
);
};
// Scoped block: every borrow of `self` taken for the dispatch context ends
// here, before the `&mut self` calls that follow.
let result: Result<DispatchResult, String> = {
let mut input_refs: Vec<(String, &dyn SlotValue)> =
Vec::with_capacity(input_pairs.len());
// Inputs whose slot is absent or currently empty are skipped silently, so
// the callee may receive fewer inputs than the node declares.
for (site, name, read_exec_id) in &input_pairs {
if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
input_refs.push((name.clone(), boxed.as_ref()));
}
}
let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
// Inbound metadata recorded for this exec id; all four fields are `None`
// when no inbound context exists.
let (
envelope_src_peer,
inbound_correlation_wire_req_id,
inbound_arrival_ns,
inbound_remaining_deadline_ns,
) = self
.framework
.inbound_contexts
.get(&exec_id)
.map(|c| {
(
c.src_peer,
c.wire_req_id,
c.arrival_ns,
c.remaining_deadline_ns,
)
})
.unwrap_or((None, None, None, None));
// Each field is borrowed individually so the context can hold disjoint
// mutable borrows of the framework state at the same time.
let mut ctx = RuntimeResourceRef {
peers: crate::runtime::PeerCtx {
gate: &mut self.framework.peer_state.gate,
backoff: &mut self.framework.peer_state.backoff,
governor: &mut self.framework.peer_state.governor,
addresses: &mut self.framework.address_book,
backpressure: &mut self.framework.peer_state.backpressure,
},
net: crate::runtime::NetCtx {
outbound: &mut self.framework.outbound_queue,
rtt: &mut self.framework.rtt_tracker,
requests: &mut self.framework.request_tracker,
dedup: &mut self.framework.inbound_dedup,
pending_peer_resolve_failures: &mut self
.framework
.pending_peer_resolve_failures,
},
time: crate::runtime::TimeCtx {
scheduler: &mut self.framework.scheduler,
},
syscall: crate::runtime::SyscallCtx {
serialize_queue: &mut self.framework.serialize_queue,
hold_table: &mut self.framework.hold_table,
record_buffer: &mut self.framework.record_buffer,
event_source: &mut self.framework.event_source,
counters: &mut self.framework.counters,
any_fired_groups: &mut self.framework.any_fired_groups,
deadline_match_fired: &mut self.framework.deadline_match_fired,
rng: &mut *self.framework.rng,
pending_app_events: &mut self.framework.pending_app_events,
},
bus: &mut self.bus,
ingress: std::sync::Arc::clone(&self.ingress),
// Unlike the stateless path, the atomic path exposes the engine's
// component instances and slots to the callee.
components: crate::runtime::ComponentsView {
instances: Some(&self.components),
slots: Some(&self.slots),
},
current: crate::runtime::CurrentCallCtx {
op_ref,
exec_id,
self_peer: self.self_peer,
node_attributes: &node.attribute,
node_metadata: &node.metadata_props,
inbound: crate::runtime::InboundCtx {
src_peer: envelope_src_peer,
wire_req_id: inbound_correlation_wire_req_id,
arrival_ns: inbound_arrival_ns,
remaining_deadline_ns: inbound_remaining_deadline_ns,
},
pending_completions: Vec::new(),
next_command_id: &mut self.exec.ids.next_command_id,
},
};
let any: &mut dyn std::any::Any = taken.as_mut();
let dispatch_result = dispatch_fn(any, &node.op_type, &inputs_for_dispatch, &mut ctx);
// Pull out completions queued during the call, release the context's
// borrows, then append them to the engine-wide list.
let captured = std::mem::take(&mut ctx.current.pending_completions);
drop(ctx);
self.exec.pending_completions.extend(captured);
dispatch_result
};
// The component is restored regardless of whether the dispatch succeeded.
self.restore_component(component_ref, taken);
match result {
Ok(DispatchResult::Immediate(outputs)) => {
let sites = self.write_outputs(op_ref, exec_id, outputs);
EngineStep::OpCompleted {
op_ref,
exec_id,
sites_written: sites,
}
}
Ok(DispatchResult::Async(cmd_id)) => {
// Remember where the outputs belong so they can be written when the
// command identified by `cmd_id` completes; no deadline is recorded here.
let output_sites = self.op_output_sites(op_ref);
self.exec.pending_async.insert(
cmd_id,
PendingAsync {
op_ref,
exec_id,
output_sites,
deadline_ns: None,
},
);
EngineStep::AsyncSuspended {
op_ref,
exec_id,
cmd_id,
}
}
// NOTE(review): the reason tag reads "stateless_invoke" although this is the
// atomic path — confirm whether that is intentional before relying on it.
Err(detail) => self.fail_op(
op_ref,
exec_id,
crate::bus::OpErrorKind::ExecutionFailed,
"stateless_invoke",
detail,
),
}
}
/// Start executing the body graph of a function-call op.
///
/// A fresh exec id is allocated for the body, and a `CallContext` is stored
/// under it that maps the body's formal input names to caller sites and the
/// body's output sites to caller sites. Every node of the body graph is then
/// queued on the frontier under the new exec id.
///
/// Failure cases (all reported through `fail_op` against the calling op):
/// * the target graph is not installed (`NotRegistered`);
/// * a caller-side input or output name has no site in the caller's graph
///   (`MissingSlot`);
/// * a formal output name has no site in the body graph (`NotRegistered`).
///
/// On success the calling op itself is reported as completed with no sites
/// written; caller outputs are filled in later as the body writes them.
pub(crate) fn invoke_function_call(
    &mut self,
    op_ref: OpRef,
    parent_exec_id: ExecId,
    target: &FunctionKey,
    input_rename: &[(String, String)],
    output_rename: &[(String, String)],
) -> EngineStep {
    use std::collections::HashMap;

    let graph_name = graph_name_for(target);
    if !self.has_graph(&graph_name) {
        let detail = format!("function-call target {graph_name} not installed");
        return self.fail_op(
            op_ref,
            parent_exec_id,
            crate::bus::OpErrorKind::NotRegistered,
            "function_target_missing",
            detail,
        );
    }

    let body_exec_id = self.allocate_exec_id();

    // Snapshot what is needed from the body graph up front so no borrow of it
    // is held across the `fail_op` calls below.
    let callee = self.graph(&graph_name).expect("checked above");
    let callee_idx = self.graph_idx(&graph_name).expect("graph just resolved");
    let body_op_refs: Vec<OpRef> = (0..callee.function.node.len() as u32)
        .map(|node_idx| OpRef::pack(callee_idx, node_idx))
        .collect();
    let callee_sites: HashMap<String, NodeSiteId> = callee.site_names.clone();

    // Formal input name -> caller site that supplies it.
    let mut input_aliases: HashMap<String, NodeSiteId> =
        HashMap::with_capacity(input_rename.len());
    for (caller_name, formal_name) in input_rename {
        match self.resolve_site_in_op_graph(op_ref, caller_name) {
            Some(caller_site) => {
                input_aliases.insert(formal_name.clone(), caller_site);
            }
            None => {
                return self.fail_op(
                    op_ref,
                    parent_exec_id,
                    crate::bus::OpErrorKind::MissingSlot,
                    "function_input_unbound",
                    format!("function-call input {caller_name} not bound"),
                );
            }
        }
    }

    // Body output site -> caller site that receives it.
    let mut output_forwarding: HashMap<NodeSiteId, NodeSiteId> =
        HashMap::with_capacity(output_rename.len());
    for (formal_out, caller_out) in output_rename {
        let body_site = match callee_sites.get(formal_out) {
            Some(&site) => site,
            None => {
                return self.fail_op(
                    op_ref,
                    parent_exec_id,
                    crate::bus::OpErrorKind::NotRegistered,
                    "function_output_missing",
                    format!("function-call output {formal_out} missing from body"),
                );
            }
        };
        let caller_site = match self.resolve_site_in_op_graph(op_ref, caller_out) {
            Some(site) => site,
            None => {
                return self.fail_op(
                    op_ref,
                    parent_exec_id,
                    crate::bus::OpErrorKind::MissingSlot,
                    "function_output_unbound",
                    format!("function-call output {caller_out} not bound"),
                );
            }
        };
        output_forwarding.insert(body_site, caller_site);
    }

    let outputs_remaining = output_forwarding.len();
    let call_ctx = CallContext {
        parent_exec_id,
        target: target.clone(),
        input_aliases,
        output_forwarding,
        outputs_remaining,
    };
    self.exec.pending_calls.insert(body_exec_id, call_ctx);

    for body_op in body_op_refs {
        self.exec.frontier.push_back((body_op, body_exec_id));
    }

    // A call that forwards no outputs has nothing left to wait for, so its
    // context is dropped straight away.
    // NOTE(review): once it is gone, lookups keyed on `pending_calls` for this
    // body exec id no longer find the input aliases — confirm that is intended
    // for output-less functions.
    if outputs_remaining == 0 {
        self.exec.pending_calls.remove(&body_exec_id);
    }

    EngineStep::OpCompleted {
        op_ref,
        exec_id: parent_exec_id,
        sites_written: Vec::new(),
    }
}
/// Invoke an op implemented by a free function (no component instance).
///
/// Steps, in order:
/// 1. If `max_pending_async` is set and the pending-async table is already at
///    that size, fail with `Cooldown` / `"pending_async_limit"`.
/// 2. Resolve the node's inputs and build a `RuntimeResourceRef` over the
///    engine's framework state.
/// 3. Call `invoke_fn` with the node, the resolved inputs, and the context.
/// 4. Move any completions the call queued on the context into
///    `self.exec.pending_completions`.
/// 5. Translate the result: `Immediate` outputs are written and yield
///    `OpCompleted`; `Async` registers a `PendingAsync` entry and yields
///    `AsyncSuspended`; an `OpError` is published on the bus as an
///    `OpFailure` event and returned inside `OpFailed`.
pub(crate) fn invoke_stateless(
&mut self,
op_ref: OpRef,
exec_id: ExecId,
node: &NodeProto,
invoke_fn: StatelessInvokeFn,
) -> EngineStep {
// Back-pressure: refuse to start the op while the pending-async table is full.
if let Some(cap) = self.max_pending_async {
if self.exec.pending_async.len() >= cap {
return self.fail_op(
op_ref,
exec_id,
crate::bus::OpErrorKind::Cooldown,
"pending_async_limit",
"pending-async limit exceeded".to_string(),
);
}
}
let input_pairs = self.resolve_input_pairs(node, exec_id);
// Scoped block: every borrow of `self` taken for the dispatch context ends
// here, before the `&mut self` calls that follow.
let result: Result<DispatchResult, OpError> = {
let mut input_refs: Vec<(String, &dyn SlotValue)> =
Vec::with_capacity(input_pairs.len());
// Inputs whose slot is absent or currently empty are skipped silently, so
// the callee may receive fewer inputs than the node declares.
for (site, name, read_exec_id) in &input_pairs {
if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
input_refs.push((name.clone(), boxed.as_ref()));
}
}
let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
// Inbound metadata recorded for this exec id; all four fields are `None`
// when no inbound context exists.
let (
envelope_src_peer,
inbound_correlation_wire_req_id,
inbound_arrival_ns,
inbound_remaining_deadline_ns,
) = self
.framework
.inbound_contexts
.get(&exec_id)
.map(|c| {
(
c.src_peer,
c.wire_req_id,
c.arrival_ns,
c.remaining_deadline_ns,
)
})
.unwrap_or((None, None, None, None));
// Each field is borrowed individually so the context can hold disjoint
// mutable borrows of the framework state at the same time.
let mut ctx = RuntimeResourceRef {
peers: crate::runtime::PeerCtx {
gate: &mut self.framework.peer_state.gate,
backoff: &mut self.framework.peer_state.backoff,
governor: &mut self.framework.peer_state.governor,
addresses: &mut self.framework.address_book,
backpressure: &mut self.framework.peer_state.backpressure,
},
net: crate::runtime::NetCtx {
outbound: &mut self.framework.outbound_queue,
rtt: &mut self.framework.rtt_tracker,
requests: &mut self.framework.request_tracker,
dedup: &mut self.framework.inbound_dedup,
pending_peer_resolve_failures: &mut self
.framework
.pending_peer_resolve_failures,
},
time: crate::runtime::TimeCtx {
scheduler: &mut self.framework.scheduler,
},
syscall: crate::runtime::SyscallCtx {
serialize_queue: &mut self.framework.serialize_queue,
hold_table: &mut self.framework.hold_table,
record_buffer: &mut self.framework.record_buffer,
event_source: &mut self.framework.event_source,
counters: &mut self.framework.counters,
any_fired_groups: &mut self.framework.any_fired_groups,
deadline_match_fired: &mut self.framework.deadline_match_fired,
rng: &mut *self.framework.rng,
pending_app_events: &mut self.framework.pending_app_events,
},
bus: &mut self.bus,
ingress: std::sync::Arc::clone(&self.ingress),
// Stateless ops get the default components view rather than the engine's
// component instances and slots.
components: crate::runtime::ComponentsView::default(),
current: crate::runtime::CurrentCallCtx {
op_ref,
exec_id,
self_peer: self.self_peer,
node_attributes: &node.attribute,
node_metadata: &node.metadata_props,
inbound: crate::runtime::InboundCtx {
src_peer: envelope_src_peer,
wire_req_id: inbound_correlation_wire_req_id,
arrival_ns: inbound_arrival_ns,
remaining_deadline_ns: inbound_remaining_deadline_ns,
},
pending_completions: Vec::new(),
next_command_id: &mut self.exec.ids.next_command_id,
},
};
let dispatch_result = invoke_fn(node, &inputs_for_dispatch, &mut ctx);
// Pull out completions queued during the call, release the context's
// borrows, then append them to the engine-wide list.
let captured = std::mem::take(&mut ctx.current.pending_completions);
drop(ctx);
self.exec.pending_completions.extend(captured);
dispatch_result
};
match result {
Ok(DispatchResult::Immediate(outputs)) => {
let sites = self.write_outputs(op_ref, exec_id, outputs);
EngineStep::OpCompleted {
op_ref,
exec_id,
sites_written: sites,
}
}
Ok(DispatchResult::Async(cmd_id)) => {
// Remember where the outputs belong so they can be written when the
// command identified by `cmd_id` completes; no deadline is recorded here.
let output_sites = self.op_output_sites(op_ref);
self.exec.pending_async.insert(
cmd_id,
PendingAsync {
op_ref,
exec_id,
output_sites,
deadline_ns: None,
},
);
EngineStep::AsyncSuspended {
op_ref,
exec_id,
cmd_id,
}
}
// The op already produced a structured `OpError`, so it is published and
// returned as-is instead of being rebuilt through `fail_op`.
Err(err) => {
self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
op_ref,
error: err.clone(),
}));
EngineStep::OpFailed {
op_ref,
exec_id,
error: err,
}
}
}
}
/// Look up the node that `op_ref` points at; `None` when the graph index or
/// the node index is out of range.
pub(crate) fn node_for(&self, op_ref: OpRef) -> Option<&NodeProto> {
    let (graph_idx, node_idx) = op_ref.split();
    self.graphs
        .get(graph_idx as usize)
        .and_then(|graph| graph.function.node.get(node_idx as usize))
}
/// List the site ids for the outputs of the op at `op_ref`, in declaration
/// order.
///
/// An output whose name is not in the graph's `site_names` table gets an id
/// from `synthesize_site_id`. An `op_ref` that does not resolve to a node
/// yields an empty list.
pub(crate) fn op_output_sites(&self, op_ref: OpRef) -> Vec<NodeSiteId> {
    let (graph_idx, node_idx) = op_ref.split();
    let located = self.graphs.get(graph_idx as usize).and_then(|graph| {
        graph
            .function
            .node
            .get(node_idx as usize)
            .map(|node| (graph, node))
    });
    match located {
        None => Vec::new(),
        Some((graph, node)) => {
            let mut sites = Vec::with_capacity(node.output.len());
            for (index, output_name) in node.output.iter().enumerate() {
                let site = match graph.site_names.get(output_name) {
                    Some(&known) => known,
                    None => synthesize_site_id(op_ref, index),
                };
                sites.push(site);
            }
            sites
        }
    }
}
/// Find a site by name across all graphs, returning the match from the first
/// graph (in `graphs_iter` order) that knows the name.
pub(crate) fn resolve_site_name(&self, name: &str) -> Option<NodeSiteId> {
    self.graphs_iter()
        .find_map(|graph| graph.site_names.get(name).copied())
}
/// Find a site by name, searching only the graph that contains `op_ref`.
pub(crate) fn resolve_site_in_op_graph(&self, op_ref: OpRef, name: &str) -> Option<NodeSiteId> {
    let (graph_idx, _node_idx) = op_ref.split();
    let graph = self.graphs.get(graph_idx as usize)?;
    graph.site_names.get(name).copied()
}
/// Work out where each input of `node` should be read from.
///
/// Returns one `(site, input name, exec id to read under)` triple per input
/// that could be resolved. Empty input names are skipped. When `exec_id` has
/// a pending call context and the name is one of its input aliases, the
/// aliased site is read under the parent's exec id; otherwise the name is
/// resolved with `resolve_site_name` and read under `exec_id`. Names that
/// resolve to nothing are dropped from the result.
pub(crate) fn resolve_input_pairs(
    &self,
    node: &NodeProto,
    exec_id: ExecId,
) -> Vec<(NodeSiteId, String, ExecId)> {
    let call_ctx = self.exec.pending_calls.get(&exec_id);
    let mut pairs = Vec::new();
    for input_name in node.input.iter().filter(|n| !n.is_empty()) {
        // Aliases from an enclosing function call take priority.
        let aliased = call_ctx.and_then(|ctx| {
            ctx.input_aliases
                .get(input_name)
                .map(|&site| (site, ctx.parent_exec_id))
        });
        let resolved = match aliased {
            Some(hit) => Some(hit),
            None => self
                .resolve_site_name(input_name)
                .map(|site| (site, exec_id)),
        };
        if let Some((site, read_exec_id)) = resolved {
            pairs.push((site, input_name.clone(), read_exec_id));
        }
    }
    pairs
}
/// Store an op's outputs in the slot table and propagate the consequences.
///
/// Outputs are matched to the op's output sites by position; the names carried
/// in `outputs` are not consulted. Outputs beyond the number of output sites
/// are discarded. After the slots are filled this:
/// * adds the number of written sites to the execution's `outputs_written`;
/// * queues consumers whose inputs are now all present;
/// * forwards values to the caller when `exec_id` is a function-call body;
/// * surfaces top-level outputs.
///
/// Returns the sites that were written, in output order.
pub(crate) fn write_outputs(
    &mut self,
    op_ref: OpRef,
    exec_id: ExecId,
    outputs: Vec<(String, Box<dyn SlotValue>)>,
) -> Vec<NodeSiteId> {
    let output_sites = self.op_output_sites(op_ref);
    let mut sites_written: Vec<NodeSiteId> =
        Vec::with_capacity(output_sites.len().min(outputs.len()));
    // `zip` stops at the shorter side, which matches positional pairing with
    // surplus outputs dropped.
    for (&site, (_name, value)) in output_sites.iter().zip(outputs) {
        self.exec.slot_table.insert((site, exec_id), Some(value));
        sites_written.push(site);
    }
    self.exec
        .execution_state
        .entry(exec_id)
        .or_default()
        .outputs_written += sites_written.len() as u32;
    self.push_ready_consumers(&sites_written, exec_id);
    self.forward_outputs_to_caller(&sites_written, exec_id);
    self.surface_top_level_outputs(&sites_written, exec_id);
    sites_written
}
/// Move freshly written function-body outputs into the caller's slots.
///
/// Does nothing unless `exec_id` has a pending call context and at least one
/// of `sites_written` appears in that context's output-forwarding map. For
/// each such site the value is taken out of the body's slot (leaving it
/// empty) and stored at the mapped caller site under the parent's exec id.
/// The context's `outputs_remaining` is then reduced by the number of matched
/// pairs, and the context is removed once it reaches zero. Finally the
/// caller-side sites go through the usual consumer wake-up and top-level
/// surfacing.
pub(crate) fn forward_outputs_to_caller(
    &mut self,
    sites_written: &[NodeSiteId],
    exec_id: ExecId,
) {
    let Some(call_ctx) = self.exec.pending_calls.get(&exec_id) else {
        return;
    };
    let parent_exec_id = call_ctx.parent_exec_id;
    let pairs: Vec<(NodeSiteId, NodeSiteId)> = sites_written
        .iter()
        .filter_map(|body_site| {
            call_ctx
                .output_forwarding
                .get(body_site)
                .map(|&caller_site| (*body_site, caller_site))
        })
        .collect();
    if pairs.is_empty() {
        return;
    }
    tracing::trace!(
        target: "engine.function_call.forward",
        call_target = ?call_ctx.target,
        body_exec_id = exec_id.as_u64(),
        parent_exec_id = parent_exec_id.as_u64(),
        pair_count = pairs.len(),
        "forwarding body outputs to caller slots",
    );

    let mut caller_sites: Vec<NodeSiteId> = Vec::with_capacity(pairs.len());
    for &(body_site, caller_site) in &pairs {
        // Take the value out of the body slot; the slot entry stays but is empty.
        let moved = match self.exec.slot_table.get_mut(&(body_site, exec_id)) {
            Some(slot) => slot.take(),
            None => None,
        };
        if let Some(value) = moved {
            self.exec
                .slot_table
                .insert((caller_site, parent_exec_id), Some(value));
            caller_sites.push(caller_site);
        }
    }

    // The countdown uses the number of matched pairs, not the number of values
    // that were actually moved.
    let call_finished = match self.exec.pending_calls.get_mut(&exec_id) {
        Some(ctx) => {
            ctx.outputs_remaining = ctx.outputs_remaining.saturating_sub(pairs.len());
            ctx.outputs_remaining == 0
        }
        None => false,
    };
    if call_finished {
        self.exec.pending_calls.remove(&exec_id);
    }

    self.push_ready_consumers(&caller_sites, parent_exec_id);
    self.surface_top_level_outputs(&caller_sites, parent_exec_id);
}
/// Queue an `AppEvent::Emit` for every site in `sites` that is a top-level
/// output with no consumers.
///
/// A site is skipped when any graph lists at least one consumer for it, or
/// when no graph names it in `top_level_outputs`. The emitted payload is the
/// host encoding of the slot value under `exec_id`; when the slot is missing
/// or empty the payload is an empty byte vector.
pub(crate) fn surface_top_level_outputs(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
    for site in sites {
        let has_consumer = self
            .graphs_iter()
            .any(|graph| graph.consumers.get(site).map_or(false, |c| !c.is_empty()));
        if has_consumer {
            continue;
        }

        let Some(name) = self
            .graphs_iter()
            .find_map(|graph| graph.top_level_outputs.get(site).cloned())
        else {
            continue;
        };

        let value_bytes = match self.exec.slot_table.get(&(*site, exec_id)) {
            Some(Some(boxed)) => encode_for_host(boxed.as_ref()),
            _ => Vec::new(),
        };
        let event = crate::bus::AppEvent::Emit { name, value_bytes };
        self.framework.pending_app_events.push(event);
    }
}
/// Queue every consumer of `sites` whose inputs are all available under
/// `exec_id`.
///
/// Consumers are gathered site by site, and within a site graph by graph, so
/// an op that consumes several of the sites is checked (and possibly queued)
/// once per occurrence.
pub(crate) fn push_ready_consumers(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
    // Gather first: the readiness check and the frontier push below need
    // `self` again, so the graph borrow has to end before them.
    let mut woken: Vec<OpRef> = Vec::new();
    for site in sites {
        let per_graph = self
            .graphs_iter()
            .filter_map(|graph| graph.consumers.get(site));
        for consumers in per_graph {
            woken.extend(consumers.iter().copied());
        }
    }

    for consumer in woken {
        let ready = self.all_inputs_ready(consumer, exec_id);
        if ready {
            self.exec.frontier.push_back((consumer, exec_id));
        }
    }
}
/// Report whether every non-empty input of the op at `op_ref` currently holds
/// a value.
///
/// Inputs are located the same way for each name: an alias from the pending
/// call context of `exec_id` (read under the parent's exec id) wins,
/// otherwise the name is resolved with `resolve_site_name` and read under
/// `exec_id`. Returns `false` when the op is unknown, when a name cannot be
/// resolved, or when a located slot is missing or empty.
pub(crate) fn all_inputs_ready(&self, op_ref: OpRef, exec_id: ExecId) -> bool {
    let Some(node) = self.node_for(op_ref) else {
        return false;
    };
    let call_ctx = self.exec.pending_calls.get(&exec_id);

    for input_name in node.input.iter().filter(|n| !n.is_empty()) {
        let aliased = call_ctx.and_then(|ctx| {
            ctx.input_aliases
                .get(input_name)
                .map(|&site| (site, ctx.parent_exec_id))
        });
        let (site, read_exec_id) = match aliased {
            Some(hit) => hit,
            None => match self.resolve_site_name(input_name) {
                Some(site) => (site, exec_id),
                None => return false,
            },
        };

        let filled = matches!(
            self.exec.slot_table.get(&(site, read_exec_id)),
            Some(Some(_))
        );
        if !filled {
            return false;
        }
    }
    true
}
/// Build an `OpError` from the given parts, publish it on the bus as an
/// `OpFailure` infra event, and return the matching `EngineStep::OpFailed`.
pub(crate) fn fail_op(
    &mut self,
    op_ref: OpRef,
    exec_id: ExecId,
    kind: crate::bus::OpErrorKind,
    reason: &'static str,
    detail: String,
) -> EngineStep {
    let error = OpError {
        kind,
        reason,
        detail,
    };
    // The bus gets its own copy; the original travels back in the step.
    let failure = InfraEvent::OpFailure {
        op_ref,
        error: error.clone(),
    };
    self.bus.publish(NodeEvent::Infra(failure));
    EngineStep::OpFailed {
        op_ref,
        exec_id,
        error,
    }
}
}
/// Turn a slot value into the bytes handed to the host.
///
/// A `BytesValue` contributes a copy of its inner bytes unchanged. Any other
/// value goes through `to_wire_bytes`; if that fails, a warning is logged and
/// an empty vector is returned.
fn encode_for_host(value: &dyn crate::slot_value::SlotValue) -> Vec<u8> {
    let as_raw_bytes = value
        .as_any()
        .downcast_ref::<crate::syscall::values::BytesValue>();
    match as_raw_bytes {
        Some(raw) => raw.0.clone(),
        None => value.to_wire_bytes().unwrap_or_else(|e| {
            tracing::warn!(error = %e, "encode_for_host: dropping host emit on encode failure");
            Vec::new()
        }),
    }
}
/// Derive a site id for an output that has no named site: the op reference
/// shifted left by eight bits, with the low eight bits of the output index in
/// the bottom byte.
///
/// Only the low eight bits of `output_index` are kept, so indices that differ
/// by a multiple of 256 map to the same id.
fn synthesize_site_id(op_ref: OpRef, output_index: usize) -> NodeSiteId {
    let index_bits = (output_index as u64) & 0xff;
    let op_bits = op_ref.as_u64() << 8;
    NodeSiteId::from(op_bits | index_bits)
}
/// Dispatch an atomic op to a type-erased component.
///
/// The dispatcher is selected from `dispatchers` by the `TypeId` of the
/// component's concrete type. When no dispatcher is registered for that type
/// an error string is returned instead.
pub(crate) fn call_protocol_dispatch_atomic(
    component: &mut dyn crate::component::ErasedComponent,
    op_type: &str,
    inputs: &[(&str, &dyn SlotValue)],
    ctx: &mut RuntimeResourceRef<'_>,
    dispatchers: &std::collections::HashMap<std::any::TypeId, RoleDispatcher>,
) -> Result<DispatchResult, String> {
    let any: &mut dyn std::any::Any = component;
    // Explicit deref so `type_id` is resolved on the trait object itself, not
    // on the reference to it.
    let concrete_type = (*any).type_id();
    match dispatchers.get(&concrete_type) {
        Some(entry) => (entry.dispatch)(any, op_type, inputs, ctx),
        None => Err("no ProtocolRuntime dispatcher registered for component".to_string()),
    }
}
/// Function pointer that dispatches an atomic op to a type-erased component.
///
/// Arguments, in order: the component as `&mut dyn Any`, the op type name,
/// the named input values, and the runtime resource context. Returns the
/// dispatch result, or an error rendered as a string.
pub type ProtocolDispatchFn = fn(
&mut dyn std::any::Any,
&str,
&[(&str, &dyn SlotValue)],
&mut RuntimeResourceRef<'_>,
) -> Result<DispatchResult, String>;
/// Function pointer that asks a type-erased component to build a slot value
/// from wire bytes.
///
/// Arguments, in order: the component as `&mut dyn Any`, a `u64` type hash,
/// and the raw bytes. Returns the materialized value or a
/// `BackendMaterializeError`.
pub type BackendMaterializeFn =
fn(
&mut dyn std::any::Any,
u64,
Vec<u8>,
) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError>;
/// Function pointer that runs the bootstrap step of a type-erased component.
///
/// Takes the component as `&mut dyn Any` plus the bootstrap context, and
/// returns a dispatch result or an error rendered as a string.
pub type BootstrapDispatchFn = fn(
&mut dyn std::any::Any,
&mut crate::contracts::bootstrap::BootstrapCtx,
) -> Result<DispatchResult, String>;
/// Build a [`BootstrapDispatchFn`] for the concrete component type `T`.
///
/// The returned function downcasts its `&mut dyn Any` argument to `T`, calls
/// `T::bootstrap`, and maps success to an `Immediate` result with no outputs
/// and failure to the error's `Display` text.
///
/// # Panics
///
/// The returned function panics if it is handed a value that is not a `T`.
pub fn make_bootstrap_dispatcher<T: crate::contracts::bootstrap::Bootstrap + 'static>(
) -> BootstrapDispatchFn
where
    T::Error: std::fmt::Display,
{
    |any, ctx| {
        let component = any
            .downcast_mut::<T>()
            .expect("type-erased lookup matched T");
        match component.bootstrap(ctx) {
            Ok(_) => Ok(DispatchResult::Immediate(Vec::new())),
            Err(e) => Err(e.to_string()),
        }
    }
}
/// Materialize function for components that are not backends: ignores all of
/// its arguments and always returns a `BackendMaterializeError`.
pub fn no_materialize(
    _any: &mut dyn std::any::Any,
    _type_hash: u64,
    _bytes: Vec<u8>,
) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError> {
    let summary =
        "component is not a Backend; materialize_from_wire not supported".to_string();
    Err(crate::slot_value::BackendMaterializeError { summary })
}
/// Pair of type-erased entry points registered for one concrete component type.
pub struct RoleDispatcher {
/// Runs an atomic op on the component.
pub(crate) dispatch: ProtocolDispatchFn,
/// Builds a slot value from wire bytes; `no_materialize` for components that
/// do not support it.
pub(crate) materialize: BackendMaterializeFn,
}
/// Build a [`RoleDispatcher`] for a concrete [`ProtocolRuntime`] type `T`.
///
/// The `dispatch` entry downcasts the component to `T` and forwards to
/// `dispatch_atomic`, rendering any error with `Display`. The `materialize`
/// entry is [`no_materialize`].
///
/// # Panics
///
/// The `dispatch` entry panics if it is handed a value that is not a `T`.
pub fn make_protocol_dispatcher<T: ProtocolRuntime + 'static>() -> RoleDispatcher
where
    T::Error: std::fmt::Display,
{
    RoleDispatcher {
        dispatch: |any: &mut dyn std::any::Any,
                   op_type: &str,
                   inputs: &[(&str, &dyn SlotValue)],
                   ctx: &mut RuntimeResourceRef<'_>| {
            let component = any.downcast_mut::<T>().expect("is_match guaranteed");
            match component.dispatch_atomic(op_type, inputs, ctx) {
                Ok(outcome) => Ok(outcome),
                Err(e) => Err(e.to_string()),
            }
        },
        materialize: no_materialize,
    }
}
// Generates a public factory function named `$factory_name` that builds a
// `RoleDispatcher` for any `T: $runtime_trait + 'static` whose error type
// implements `Display`.
//
// The generated `dispatch` entry downcasts the component to `T` (panicking on
// a type mismatch) and forwards to `<T as $runtime_trait>::dispatch_atomic`,
// rendering errors as strings. The `materialize` entry is `no_materialize`.
macro_rules! emit_role_dispatcher_factory {
($factory_name:ident, $runtime_trait:path) => {
#[doc = concat!("Build a `RoleDispatcher` for a concrete impl of ", stringify!($runtime_trait), ". Used by `Engine::register_*_dispatcher` chain (`Node::with_<role>(&value)`) so single-role components dispatch through the same `TypeId`-keyed registry as multi-role / Protocol-bearing components.")]
pub fn $factory_name<T: $runtime_trait + 'static>() -> RoleDispatcher
where
<T as $runtime_trait>::Error: std::fmt::Display,
{
RoleDispatcher {
dispatch: |any: &mut dyn std::any::Any,
op_type: &str,
inputs: &[(&str, &dyn SlotValue)],
ctx: &mut RuntimeResourceRef<'_>| {
// Fully qualified call so the method is taken from `$runtime_trait`.
let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
<T as $runtime_trait>::dispatch_atomic(concrete, op_type, inputs, ctx)
.map_err(|e| e.to_string())
},
materialize: no_materialize,
}
}
};
}
// One dispatcher factory per single-role runtime trait. Each expands to a
// `pub fn <name><T>() -> RoleDispatcher` whose `materialize` entry is
// `no_materialize`.
emit_role_dispatcher_factory!(make_index_dispatcher, crate::roles::IndexRuntime);
emit_role_dispatcher_factory!(make_aggregator_dispatcher, crate::roles::AggregatorRuntime);
emit_role_dispatcher_factory!(make_model_dispatcher, crate::roles::ModelRuntime);
emit_role_dispatcher_factory!(make_codec_dispatcher, crate::roles::CodecRuntime);
emit_role_dispatcher_factory!(make_data_source_dispatcher, crate::roles::DataSourceRuntime);
emit_role_dispatcher_factory!(
make_peer_selector_dispatcher,
crate::roles::PeerSelectorRuntime
);
/// Build a [`RoleDispatcher`] for a concrete `BackendRuntime` type `T`.
///
/// Both entries downcast the type-erased component to `T` first. `dispatch`
/// forwards to `BackendRuntime::dispatch_atomic` and renders any error with
/// `Display`; `materialize` forwards to `BackendRuntime::materialize_from_wire`
/// and returns its result unchanged.
///
/// # Panics
///
/// Either entry panics if it is handed a value that is not a `T`.
pub fn make_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>() -> RoleDispatcher
where
    <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
{
    RoleDispatcher {
        dispatch: |any: &mut dyn std::any::Any,
                   op_type: &str,
                   inputs: &[(&str, &dyn SlotValue)],
                   ctx: &mut RuntimeResourceRef<'_>| {
            let backend = any.downcast_mut::<T>().expect("is_match guaranteed");
            let outcome =
                <T as crate::roles::BackendRuntime>::dispatch_atomic(backend, op_type, inputs, ctx);
            match outcome {
                Ok(result) => Ok(result),
                Err(e) => Err(e.to_string()),
            }
        },
        materialize: |any: &mut dyn std::any::Any, type_hash: u64, bytes: Vec<u8>| {
            // Only a shared reference is needed for materialization.
            let backend = any.downcast_ref::<T>().expect("is_match guaranteed");
            <T as crate::roles::BackendRuntime>::materialize_from_wire(backend, type_hash, bytes)
        },
    }
}
// Test-only module; its source is loaded from `invoke_function_call_tests.rs`
// via the `#[path]` attribute rather than from the default module location.
#[cfg(test)]
#[path = "invoke_function_call_tests.rs"]
mod function_call_tests;