Skip to main content

bb_runtime/engine/
invoke.rs

1//! Op invocation lifecycle
2//!
//! Implements the `Atomic { component_ref }` dispatch path,
//! routing through `ProtocolRuntime::dispatch_atomic` (the
//! universal-pair-only role trait - see design call #4 in
//! `docs/internal/IMPLEMENTATION_PLAN.md`). The `Stateless`
//! syscall path lands here as well.
8
9use crate::atomic::DispatchResult;
10use crate::bus::{InfraEvent, NodeEvent, OpError};
11use crate::engine::call_context::CallContext;
12use crate::engine::core::{graph_name_for, Engine};
13use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
14use crate::engine::pending_async::PendingAsync;
15use crate::engine::step::EngineStep;
16use crate::ids::{ComponentRef, ExecId, NodeSiteId, OpRef};
17use crate::roles::ProtocolRuntime;
18use crate::runtime::RuntimeResourceRef;
19use crate::slot_value::SlotValue;
20use bb_ir::proto::onnx::NodeProto;
21
22impl Engine {
23    /// Dispatch a single `(op_ref, exec_id)` per ENGINE.md §8.1.
24    /// Returns one `EngineStep` describing the outcome
25    /// (`OpCompleted` / `AsyncSuspended` / `OpFailed`).
26    ///
27    /// Consult `dispatch_for` for the `OpDispatch` variant stamped at
28    /// install time by `Engine::resolve_dispatch` and route to the
29    /// dedicated helper. An op without a stamped entry - or stamped
30    /// `Unresolved` - is a build invariant violation that surfaced
31    /// past the `Node` pre-flight; fail the op so the caller
32    /// sees a clean `OpFailure` rather than a panic.
33    pub(crate) fn invoke_one(&mut self, op_ref: OpRef, exec_id: ExecId) -> EngineStep {
34        let node = match self.node_for(op_ref) {
35            Some(n) => n.clone(),
36            None => {
37                return self.fail_op(
38                    op_ref,
39                    exec_id,
40                    crate::bus::OpErrorKind::ExecutionFailed,
41                    "unknown_op_ref",
42                    "unknown op_ref".to_string(),
43                )
44            }
45        };
46
47        // Structured op fields on the per-invocation span let
48        // operators filter and group traces by op kind, domain, or
49        // a specific (exec_id, op_ref) instance — equivalent to the
50        // op-keyed metadata ONNX Runtime emits via EventRecord.args.
51        let _invoke_span = tracing::debug_span!(
52            "engine.invoke_one",
53            op.name = %node.name,
54            op.kind = %node.op_type,
55            op.domain = %node.domain,
56            exec_id = %exec_id,
57            op_ref = %op_ref,
58        )
59        .entered();
60
61        match self.dispatch_for(op_ref) {
62            Some(OpDispatch::Stateless(invoke_fn)) => {
63                self.invoke_stateless(op_ref, exec_id, &node, invoke_fn)
64            }
65            Some(OpDispatch::Atomic {
66                component_ref,
67                dispatch_fn,
68            }) => self.invoke_atomic(op_ref, exec_id, &node, component_ref, dispatch_fn),
69            Some(OpDispatch::FunctionCall {
70                target,
71                input_rename,
72                output_rename,
73            }) => {
74                self.invoke_function_call(op_ref, exec_id, &target, &input_rename, &output_rename)
75            }
76            Some(OpDispatch::Unresolved) | None => self.fail_op(
77                op_ref,
78                exec_id,
79                crate::bus::OpErrorKind::NotRegistered,
80                "unresolved_dispatch",
81                format!("unresolved dispatch for {}::{}", node.domain, node.op_type),
82            ),
83        }
84    }
85
86    /// Look up the install-time-stamped `OpDispatch` for an `OpRef`.
87    /// Positional resolution per C5: `OpRef::pack(graph_idx, node_idx)`
88    /// → two direct array accesses, no HashMap probes.
89    fn dispatch_for(&self, op_ref: OpRef) -> Option<OpDispatch> {
90        let (gi, ni) = op_ref.split();
91        self.graphs
92            .get(gi as usize)?
93            .op_dispatch
94            .get(ni as usize)
95            .cloned()
96    }
97
98    /// Dispatch through `ProtocolRuntime::dispatch_atomic` for an op
99    /// resolved to a bound component.
100    fn invoke_atomic(
101        &mut self,
102        op_ref: OpRef,
103        exec_id: ExecId,
104        node: &NodeProto,
105        component_ref: ComponentRef,
106        dispatch_fn: ProtocolDispatchFn,
107    ) -> EngineStep {
108        // Defensive cap check BEFORE dispatch — any atomic op that
109        // returns Async would need to insert into pending_async, so
110        // when we're already at cap we reject early. Components that
111        // side-effect (e.g. allocate wire requests, push outbound
112        // envelopes) inside dispatch don't see the cap rejection
113        // until AFTER their effects run, leaking that state into the
114        // engine. Pre-dispatch rejection covers Immediate ops too but
115        // engine-at-cap is the right backpressure signal regardless.
116        if let Some(cap) = self.max_pending_async {
117            if self.exec.pending_async.len() >= cap {
118                return self.fail_op(
119                    op_ref,
120                    exec_id,
121                    crate::bus::OpErrorKind::Cooldown,
122                    "pending_async_limit",
123                    "pending-async limit exceeded".to_string(),
124                );
125            }
126        }
127
128        // Alias-aware: when `exec_id` is a function-body's derived id,
129        // formal inputs route through `pending_calls[exec_id]` to read
130        // the caller's slot at `parent_exec_id` (zero-copy).
131        let input_pairs = self.resolve_input_pairs(node, exec_id);
132
133        // D2 take-and-restore: lift the dispatching component out
134        // of the Vec so a live ComponentsView can borrow the rest of
135        // engine.components while the closure runs. Restore the slot
136        // unconditionally on the way out so a panic doesn't leak the
137        // Box (Rust catches via Drop, but the slot would stay None
138        // and confuse subsequent dispatches).
139        let Some(mut taken) = self.take_component(component_ref) else {
140            return self.fail_op(
141                op_ref,
142                exec_id,
143                crate::bus::OpErrorKind::MissingSlot,
144                "component_missing",
145                "component missing".to_string(),
146            );
147        };
148
149        let result: Result<DispatchResult, String> = {
150            let mut input_refs: Vec<(String, &dyn SlotValue)> =
151                Vec::with_capacity(input_pairs.len());
152            for (site, name, read_exec_id) in &input_pairs {
153                if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
154                    input_refs.push((name.clone(), boxed.as_ref()));
155                }
156            }
157            let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
158                input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
159
160            let (
161                envelope_src_peer,
162                inbound_correlation_wire_req_id,
163                inbound_arrival_ns,
164                inbound_remaining_deadline_ns,
165            ) = self
166                .framework
167                .inbound_contexts
168                .get(&exec_id)
169                .map(|c| {
170                    (
171                        c.src_peer,
172                        c.wire_req_id,
173                        c.arrival_ns,
174                        c.remaining_deadline_ns,
175                    )
176                })
177                .unwrap_or((None, None, None, None));
178            let mut ctx = RuntimeResourceRef {
179                peers: crate::runtime::PeerCtx {
180                    gate: &mut self.framework.peer_state.gate,
181                    backoff: &mut self.framework.peer_state.backoff,
182                    governor: &mut self.framework.peer_state.governor,
183                    addresses: &mut self.framework.address_book,
184                    backpressure: &mut self.framework.peer_state.backpressure,
185                },
186                net: crate::runtime::NetCtx {
187                    outbound: &mut self.framework.outbound_queue,
188                    rtt: &mut self.framework.rtt_tracker,
189                    requests: &mut self.framework.request_tracker,
190                    dedup: &mut self.framework.inbound_dedup,
191                    pending_peer_resolve_failures: &mut self
192                        .framework
193                        .pending_peer_resolve_failures,
194                },
195                time: crate::runtime::TimeCtx {
196                    scheduler: &mut self.framework.scheduler,
197                },
198                syscall: crate::runtime::SyscallCtx {
199                    serialize_queue: &mut self.framework.serialize_queue,
200                    hold_table: &mut self.framework.hold_table,
201                    record_buffer: &mut self.framework.record_buffer,
202                    event_source: &mut self.framework.event_source,
203                    counters: &mut self.framework.counters,
204                    any_fired_groups: &mut self.framework.any_fired_groups,
205                    deadline_match_fired: &mut self.framework.deadline_match_fired,
206                    rng: &mut *self.framework.rng,
207                    pending_app_events: &mut self.framework.pending_app_events,
208                },
209                bus: &mut self.bus,
210                ingress: std::sync::Arc::clone(&self.ingress),
211                components: crate::runtime::ComponentsView {
212                    instances: Some(&self.components),
213                    slots: Some(&self.slots),
214                },
215                current: crate::runtime::CurrentCallCtx {
216                    op_ref,
217                    exec_id,
218                    self_peer: self.self_peer,
219                    node_attributes: &node.attribute,
220                    node_metadata: &node.metadata_props,
221                    inbound: crate::runtime::InboundCtx {
222                        src_peer: envelope_src_peer,
223                        wire_req_id: inbound_correlation_wire_req_id,
224                        arrival_ns: inbound_arrival_ns,
225                        remaining_deadline_ns: inbound_remaining_deadline_ns,
226                    },
227                    pending_completions: Vec::new(),
228                    next_command_id: &mut self.exec.ids.next_command_id,
229                },
230            };
231
232            // The install-time-stamped dispatch_fn is the one
233            // canonical hot path — runtime calls the downcast
234            // closure directly, no per-op TypeId HashMap probe.
235            let any: &mut dyn std::any::Any = taken.as_mut();
236            let dispatch_result = dispatch_fn(any, &node.op_type, &inputs_for_dispatch, &mut ctx);
237
238            let captured = std::mem::take(&mut ctx.current.pending_completions);
239            drop(ctx);
240            self.exec.pending_completions.extend(captured);
241
242            dispatch_result
243        };
244
245        // Always restore - keeps the slot table consistent across
246        // dispatch outcomes (Immediate, Async, or error).
247        self.restore_component(component_ref, taken);
248
249        match result {
250            Ok(DispatchResult::Immediate(outputs)) => {
251                let sites = self.write_outputs(op_ref, exec_id, outputs);
252                EngineStep::OpCompleted {
253                    op_ref,
254                    exec_id,
255                    sites_written: sites,
256                }
257            }
258            Ok(DispatchResult::Async(cmd_id)) => {
259                let output_sites = self.op_output_sites(op_ref);
260                self.exec.pending_async.insert(
261                    cmd_id,
262                    PendingAsync {
263                        op_ref,
264                        exec_id,
265                        output_sites,
266                        deadline_ns: None,
267                    },
268                );
269                EngineStep::AsyncSuspended {
270                    op_ref,
271                    exec_id,
272                    cmd_id,
273                }
274            }
275            Err(detail) => self.fail_op(
276                op_ref,
277                exec_id,
278                crate::bus::OpErrorKind::ExecutionFailed,
279                "stateless_invoke",
280                detail,
281            ),
282        }
283    }
284
285    /// Splice a function-call invocation per ENGINE.md §8.4.
286    ///
287    /// Flat-frontier model: the body's `OpRef`s are shared (one
288    /// allocation in `engine.graphs[graph_name_for(target)]`); each call
289    /// allocates a fresh body `ExecId` and the body's nodes execute at
290    /// that derived id. Input slots are NOT copied - the body's formal
291    /// parameter reads route through `input_aliases` to the caller's
292    /// slot at `parent_exec_id` (zero-copy). Output forwarding is
293    /// applied in `write_outputs` (Phase 2b §2b.8).
294    ///
295    /// Returns `OpCompleted` for the call site itself with no
296    /// `sites_written` - the body's writes surface as separate events
297    /// as each body node fires.
298    pub(crate) fn invoke_function_call(
299        &mut self,
300        op_ref: OpRef,
301        parent_exec_id: ExecId,
302        target: &FunctionKey,
303        input_rename: &[(String, String)],
304        output_rename: &[(String, String)],
305    ) -> EngineStep {
306        let graph_name = graph_name_for(target);
307        if !self.has_graph(&graph_name) {
308            return self.fail_op(
309                op_ref,
310                parent_exec_id,
311                crate::bus::OpErrorKind::NotRegistered,
312                "function_target_missing",
313                format!("function-call target {graph_name} not installed"),
314            );
315        }
316
317        let body_exec_id = self.allocate_exec_id();
318        let body = self.graph(&graph_name).expect("checked above");
319
320        // Snapshot body OpRefs + formal-name → body-site map before
321        // releasing the borrow on self.graphs. With positional
322        // OpRefs the body's op_refs are `OpRef::pack(body_idx,
323        // node_idx)` for `node_idx in 0..body.function.node.len()`.
324        let body_idx = self.graph_idx(&graph_name).expect("graph just resolved");
325        let body_op_refs: Vec<OpRef> = (0..body.function.node.len() as u32)
326            .map(|ni| OpRef::pack(body_idx, ni))
327            .collect();
328        let body_site_for: std::collections::HashMap<String, NodeSiteId> = body.site_names.clone();
329
330        // input_rename pairs: (caller_value_name, formal_parameter_name)
331        // Caller-side names are scoped to the call op's owning graph -
332        // body-side names may coincidentally spell the same string.
333        let mut input_aliases: std::collections::HashMap<String, NodeSiteId> =
334            std::collections::HashMap::with_capacity(input_rename.len());
335        for (caller_name, formal_name) in input_rename {
336            let Some(caller_site) = self.resolve_site_in_op_graph(op_ref, caller_name) else {
337                return self.fail_op(
338                    op_ref,
339                    parent_exec_id,
340                    crate::bus::OpErrorKind::MissingSlot,
341                    "function_input_unbound",
342                    format!("function-call input {caller_name} not bound"),
343                );
344            };
345            input_aliases.insert(formal_name.clone(), caller_site);
346        }
347
348        // output_rename pairs: (formal_output_name, caller_value_name)
349        let mut output_forwarding: std::collections::HashMap<NodeSiteId, NodeSiteId> =
350            std::collections::HashMap::with_capacity(output_rename.len());
351        for (formal_out, caller_out) in output_rename {
352            let Some(&body_site) = body_site_for.get(formal_out) else {
353                return self.fail_op(
354                    op_ref,
355                    parent_exec_id,
356                    crate::bus::OpErrorKind::NotRegistered,
357                    "function_output_missing",
358                    format!("function-call output {formal_out} missing from body"),
359                );
360            };
361            let Some(caller_site) = self.resolve_site_in_op_graph(op_ref, caller_out) else {
362                return self.fail_op(
363                    op_ref,
364                    parent_exec_id,
365                    crate::bus::OpErrorKind::MissingSlot,
366                    "function_output_unbound",
367                    format!("function-call output {caller_out} not bound"),
368                );
369            };
370            output_forwarding.insert(body_site, caller_site);
371        }
372
373        let outputs_remaining = output_forwarding.len();
374        self.exec.pending_calls.insert(
375            body_exec_id,
376            CallContext {
377                parent_exec_id,
378                target: target.clone(),
379                input_aliases,
380                output_forwarding,
381                outputs_remaining,
382            },
383        );
384
385        // Push every body OpRef onto the frontier at body_exec_id.
386        // Alias-aware `all_inputs_ready` (Phase 2b §2b.7) gates each
387        // node until its formal inputs are populated at the caller's
388        // ExecId. Plan §2b.6 step 6: simple v1 - push all, gate.
389        for body_op in body_op_refs {
390            self.exec.frontier.push_back((body_op, body_exec_id));
391        }
392
393        // Zero-outputs corner case: with no forwarding to wait on, the
394        // call is conceptually complete the moment the body finishes.
395        // `write_outputs`' forwarding hook drops the entry once
396        // `outputs_remaining` hits zero - for a no-output call we drop
397        // immediately so we don't leak an entry.
398        if self
399            .exec
400            .pending_calls
401            .get(&body_exec_id)
402            .map(|c| c.outputs_remaining == 0)
403            .unwrap_or(false)
404        {
405            self.exec.pending_calls.remove(&body_exec_id);
406        }
407
408        EngineStep::OpCompleted {
409            op_ref,
410            exec_id: parent_exec_id,
411            sites_written: Vec::new(),
412        }
413    }
414
415    /// Dispatch a stateless syscall op. Shares the input-resolution,
416    /// split-borrow, and result-handling shape with `invoke_one`'s
417    /// atomic path; differs in calling the fn pointer directly rather
418    /// than going through `components[component_ref]`.
419    pub(crate) fn invoke_stateless(
420        &mut self,
421        op_ref: OpRef,
422        exec_id: ExecId,
423        node: &NodeProto,
424        invoke_fn: StatelessInvokeFn,
425    ) -> EngineStep {
426        // Same pre-dispatch pending_async cap check as invoke_atomic
427        // — syscalls (wire.Send, Sleep, ...) routinely side-effect
428        // before returning Async, so a post-dispatch rejection would
429        // leak state.
430        if let Some(cap) = self.max_pending_async {
431            if self.exec.pending_async.len() >= cap {
432                return self.fail_op(
433                    op_ref,
434                    exec_id,
435                    crate::bus::OpErrorKind::Cooldown,
436                    "pending_async_limit",
437                    "pending-async limit exceeded".to_string(),
438                );
439            }
440        }
441        // Alias-aware input resolution - see `invoke_atomic` for the
442        // function-body splice semantics.
443        let input_pairs = self.resolve_input_pairs(node, exec_id);
444
445        let result: Result<DispatchResult, OpError> = {
446            let mut input_refs: Vec<(String, &dyn SlotValue)> =
447                Vec::with_capacity(input_pairs.len());
448            for (site, name, read_exec_id) in &input_pairs {
449                if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
450                    input_refs.push((name.clone(), boxed.as_ref()));
451                }
452            }
453            let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
454                input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
455
456            let (
457                envelope_src_peer,
458                inbound_correlation_wire_req_id,
459                inbound_arrival_ns,
460                inbound_remaining_deadline_ns,
461            ) = self
462                .framework
463                .inbound_contexts
464                .get(&exec_id)
465                .map(|c| {
466                    (
467                        c.src_peer,
468                        c.wire_req_id,
469                        c.arrival_ns,
470                        c.remaining_deadline_ns,
471                    )
472                })
473                .unwrap_or((None, None, None, None));
474            let mut ctx = RuntimeResourceRef {
475                peers: crate::runtime::PeerCtx {
476                    gate: &mut self.framework.peer_state.gate,
477                    backoff: &mut self.framework.peer_state.backoff,
478                    governor: &mut self.framework.peer_state.governor,
479                    addresses: &mut self.framework.address_book,
480                    backpressure: &mut self.framework.peer_state.backpressure,
481                },
482                net: crate::runtime::NetCtx {
483                    outbound: &mut self.framework.outbound_queue,
484                    rtt: &mut self.framework.rtt_tracker,
485                    requests: &mut self.framework.request_tracker,
486                    dedup: &mut self.framework.inbound_dedup,
487                    pending_peer_resolve_failures: &mut self
488                        .framework
489                        .pending_peer_resolve_failures,
490                },
491                time: crate::runtime::TimeCtx {
492                    scheduler: &mut self.framework.scheduler,
493                },
494                syscall: crate::runtime::SyscallCtx {
495                    serialize_queue: &mut self.framework.serialize_queue,
496                    hold_table: &mut self.framework.hold_table,
497                    record_buffer: &mut self.framework.record_buffer,
498                    event_source: &mut self.framework.event_source,
499                    counters: &mut self.framework.counters,
500                    any_fired_groups: &mut self.framework.any_fired_groups,
501                    deadline_match_fired: &mut self.framework.deadline_match_fired,
502                    rng: &mut *self.framework.rng,
503                    pending_app_events: &mut self.framework.pending_app_events,
504                },
505                bus: &mut self.bus,
506                ingress: std::sync::Arc::clone(&self.ingress),
507                components: crate::runtime::ComponentsView::default(),
508                current: crate::runtime::CurrentCallCtx {
509                    op_ref,
510                    exec_id,
511                    self_peer: self.self_peer,
512                    node_attributes: &node.attribute,
513                    node_metadata: &node.metadata_props,
514                    inbound: crate::runtime::InboundCtx {
515                        src_peer: envelope_src_peer,
516                        wire_req_id: inbound_correlation_wire_req_id,
517                        arrival_ns: inbound_arrival_ns,
518                        remaining_deadline_ns: inbound_remaining_deadline_ns,
519                    },
520                    pending_completions: Vec::new(),
521                    next_command_id: &mut self.exec.ids.next_command_id,
522                },
523            };
524
525            let dispatch_result = invoke_fn(node, &inputs_for_dispatch, &mut ctx);
526
527            let captured = std::mem::take(&mut ctx.current.pending_completions);
528            drop(ctx);
529            self.exec.pending_completions.extend(captured);
530
531            dispatch_result
532        };
533
534        match result {
535            Ok(DispatchResult::Immediate(outputs)) => {
536                let sites = self.write_outputs(op_ref, exec_id, outputs);
537                EngineStep::OpCompleted {
538                    op_ref,
539                    exec_id,
540                    sites_written: sites,
541                }
542            }
543            Ok(DispatchResult::Async(cmd_id)) => {
544                let output_sites = self.op_output_sites(op_ref);
545                self.exec.pending_async.insert(
546                    cmd_id,
547                    PendingAsync {
548                        op_ref,
549                        exec_id,
550                        output_sites,
551                        deadline_ns: None,
552                    },
553                );
554                EngineStep::AsyncSuspended {
555                    op_ref,
556                    exec_id,
557                    cmd_id,
558                }
559            }
560            Err(err) => {
561                self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
562                    op_ref,
563                    error: err.clone(),
564                }));
565                EngineStep::OpFailed {
566                    op_ref,
567                    exec_id,
568                    error: err,
569                }
570            }
571        }
572    }
573
574    /// Look up a NodeProto by op_ref. Positional resolution via
575    /// `OpRef::pack(graph_idx, node_idx)` → two direct array indexes.
576    pub(crate) fn node_for(&self, op_ref: OpRef) -> Option<&NodeProto> {
577        let (gi, ni) = op_ref.split();
578        self.graphs.get(gi as usize)?.function.node.get(ni as usize)
579    }
580
581    /// Output sites the given Op declared. Positional resolution
582    /// looks up the NodeProto directly; each declared output name
583    /// resolves through the graph's `site_names` map (with a
584    /// deterministic synth as a fallback for test setups).
585    pub(crate) fn op_output_sites(&self, op_ref: OpRef) -> Vec<NodeSiteId> {
586        let (gi, ni) = op_ref.split();
587        let Some(g) = self.graphs.get(gi as usize) else {
588            return Vec::new();
589        };
590        let Some(node) = g.function.node.get(ni as usize) else {
591            return Vec::new();
592        };
593        node.output
594            .iter()
595            .enumerate()
596            .map(|(i, name)| {
597                g.site_names
598                    .get(name)
599                    .copied()
600                    .unwrap_or_else(|| synthesize_site_id(op_ref, i))
601            })
602            .collect()
603    }
604
605    /// Resolve a value-name to its `NodeSiteId` across every
606    /// GraphSlot. Returns `None` if no graph has a binding.
607    pub(crate) fn resolve_site_name(&self, name: &str) -> Option<NodeSiteId> {
608        for g in self.graphs_iter() {
609            if let Some(&site) = g.site_names.get(name) {
610                return Some(site);
611            }
612        }
613        None
614    }
615
616    /// Resolve a value-name to its `NodeSiteId` within the GraphSlot
617    /// that owns `op_ref`. Used by `invoke_function_call` to disambiguate
618    /// caller-side value names from formal-parameter names that happen
619    /// to spell the same string in the callee body (hoisted bodies
620    /// canonicalize to `__hoist_*` so this clash is rare in production,
621    /// but graph-scoped lookup is the principled fix). Positional
622    /// `OpRef::pack` makes the owning graph a direct index.
623    pub(crate) fn resolve_site_in_op_graph(&self, op_ref: OpRef, name: &str) -> Option<NodeSiteId> {
624        let (gi, _) = op_ref.split();
625        self.graphs.get(gi as usize)?.site_names.get(name).copied()
626    }
627
628    /// Resolve a NodeProto's inputs into `(NodeSiteId, value_name,
629    /// read_exec_id)` triples per ENGINE.md §8.4.
630    ///
631    /// Function-call splice: when `exec_id` is a body's derived
632    /// `ExecId` (i.e. present in `pending_calls`), formal inputs that
633    /// appear in `input_aliases` re-route reads to the caller's
634    /// `parent_exec_id` and the caller-side `NodeSiteId` - zero-copy
635    /// aliasing, the body never copies the caller's slot. Inputs not
636    /// in the alias map fall back to the standard
637    /// `resolve_site_name(name) → exec_id` path so body-internal
638    /// values keep flowing at `body_exec_id`.
639    ///
640    /// Empty input names (ONNX optional-arg convention) and inputs
641    /// that fail to resolve are skipped - the caller checks readiness
642    /// separately via `all_inputs_ready`.
643    pub(crate) fn resolve_input_pairs(
644        &self,
645        node: &NodeProto,
646        exec_id: ExecId,
647    ) -> Vec<(NodeSiteId, String, ExecId)> {
648        let cc = self.exec.pending_calls.get(&exec_id);
649        let mut out = Vec::new();
650        for name in &node.input {
651            if name.is_empty() {
652                continue;
653            }
654            if let Some(cc) = cc {
655                if let Some(&alias_site) = cc.input_aliases.get(name) {
656                    out.push((alias_site, name.clone(), cc.parent_exec_id));
657                    continue;
658                }
659            }
660            if let Some(site) = self.resolve_site_name(name) {
661                out.push((site, name.clone(), exec_id));
662            }
663        }
664        out
665    }
666
667    /// Write Op outputs to the slot_table + push ready downstream
668    /// consumers onto the frontier. Returns the list of sites
669    /// written (for `EngineStep::OpCompleted`).
670    pub(crate) fn write_outputs(
671        &mut self,
672        op_ref: OpRef,
673        exec_id: ExecId,
674        outputs: Vec<(String, Box<dyn SlotValue>)>,
675    ) -> Vec<NodeSiteId> {
676        let output_sites = self.op_output_sites(op_ref);
677        for ((site, _name), value) in output_sites
678            .iter()
679            .zip(outputs.iter().map(|(n, _)| n))
680            .zip(outputs.iter().map(|(_, v)| v.as_ref()))
681        {
682            // Place each value into its slot. We can't move out of
683            // `outputs` while iterating borrows, so we collect in
684            // two passes. (See below.)
685            let _ = (site, _name, value);
686        }
687        // Two-pass placement: zip site_index → outputs by position.
688        let mut sites_written: Vec<NodeSiteId> = Vec::new();
689        for (i, (_name, value)) in outputs.into_iter().enumerate() {
690            if let Some(site) = output_sites.get(i).copied() {
691                self.exec.slot_table.insert((site, exec_id), Some(value));
692                sites_written.push(site);
693            }
694        }
695
696        // Bump execution_state.
697        self.exec
698            .execution_state
699            .entry(exec_id)
700            .or_default()
701            .outputs_written += sites_written.len() as u32;
702
703        // Push consumers whose inputs are now all ready.
704        self.push_ready_consumers(&sites_written, exec_id);
705
706        // Function-call splice: when `exec_id` is a body's derived id,
707        // forward any matching outputs back to the caller's slots at
708        // `parent_exec_id` and push the caller's downstream consumers.
709        // No-op when there's no pending call context.
710        self.forward_outputs_to_caller(&sites_written, exec_id);
711
712        self.surface_top_level_outputs(&sites_written, exec_id);
713        sites_written
714    }
715
716    /// Phase 2b §2b.8 - output forwarding for the function-call splice.
717    ///
718    /// If `exec_id` keys a `CallContext` in `pending_calls`, every
719    /// `sites_written` entry that appears in `output_forwarding` gets
720    /// its `SlotValue` MOVED from the body's slot at `exec_id` to
721    /// the caller's slot at `parent_exec_id`. `SlotValue` is not
722    /// `Clone`; a body output is one-shot, so moving the value is
723    /// semantically correct.
724    ///
725    /// After moving values, pushes the caller-side consumers onto the
726    /// frontier at `parent_exec_id` and surfaces top-level outputs.
727    /// Drops the `pending_calls` entry once `outputs_remaining`
728    /// reaches zero.
729    pub(crate) fn forward_outputs_to_caller(
730        &mut self,
731        sites_written: &[NodeSiteId],
732        exec_id: ExecId,
733    ) {
734        let Some(cc) = self.exec.pending_calls.get(&exec_id) else {
735            return;
736        };
737        // Collect the body_site → caller_site pairs we'll forward,
738        // dropping the immutable borrow before mutating slot_table.
739        let mut pairs: Vec<(NodeSiteId, NodeSiteId)> = Vec::new();
740        for &body_site in sites_written {
741            if let Some(&caller_site) = cc.output_forwarding.get(&body_site) {
742                pairs.push((body_site, caller_site));
743            }
744        }
745        let parent_exec_id = cc.parent_exec_id;
746        if pairs.is_empty() {
747            return;
748        }
749        tracing::trace!(
750            target: "engine.function_call.forward",
751            call_target = ?cc.target,
752            body_exec_id = exec_id.as_u64(),
753            parent_exec_id = parent_exec_id.as_u64(),
754            pair_count = pairs.len(),
755            "forwarding body outputs to caller slots",
756        );
757
758        let mut caller_sites: Vec<NodeSiteId> = Vec::with_capacity(pairs.len());
759        for (body_site, caller_site) in &pairs {
760            // MOVE the value: body's slot loses the value, caller's
761            // slot owns it. Skip if the body slot is empty (e.g. the
762            // op produced fewer outputs than declared).
763            let value = self
764                .exec
765                .slot_table
766                .get_mut(&(*body_site, exec_id))
767                .and_then(|opt| opt.take());
768            if let Some(value) = value {
769                self.exec
770                    .slot_table
771                    .insert((*caller_site, parent_exec_id), Some(value));
772                caller_sites.push(*caller_site);
773            }
774        }
775
776        // Decrement `outputs_remaining` and drop the entry on
777        // completion in a single mut borrow.
778        if let Some(cc) = self.exec.pending_calls.get_mut(&exec_id) {
779            cc.outputs_remaining = cc.outputs_remaining.saturating_sub(pairs.len());
780            if cc.outputs_remaining == 0 {
781                self.exec.pending_calls.remove(&exec_id);
782            }
783        }
784
785        // Push caller-side consumers + surface top-level outputs
786        // against the caller's `parent_exec_id`.
787        self.push_ready_consumers(&caller_sites, parent_exec_id);
788        self.surface_top_level_outputs(&caller_sites, parent_exec_id);
789    }
790
791    /// Inspect each freshly-written site: when it corresponds to a
792    /// declared `function.output` AND no downstream Op consumes it
793    /// inside the function, push an `AppEvent::Emit` onto
794    /// `framework.pending_app_events` carrying the slot's serialized
795    /// bytes. Phase 8 drains the queue into `EngineStep::AppEvent`
796    /// for the host.
797    ///
798    /// This is the "function signature is the engine I/O contract"
799    /// path. Coexists with the explicit `AppEmit` / `AppNotify`
800    /// syscall ops from `src/syscall/telemetry/`; both surface as the
801    /// same `EngineStep::AppEvent` variant.
802    ///
803    /// Encode the slot's value for host consumption: a `BytesValue`
804    /// surfaces its inner `Vec<u8>` directly (the byte-level
805    /// contract), any other carrier surfaces its bincode-encoded
806    /// form. Push a `Emit { name, value_bytes }` for every top-
807    /// level output site with no in-graph consumer.
808    pub(crate) fn surface_top_level_outputs(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
809        for site in sites {
810            let consumer_count = self
811                .graphs_iter()
812                .map(|g| g.consumers.get(site).map(|v| v.len()).unwrap_or(0))
813                .sum::<usize>();
814            if consumer_count > 0 {
815                continue;
816            }
817            let name_opt = self
818                .graphs_iter()
819                .find_map(|g| g.top_level_outputs.get(site).cloned());
820            let Some(name) = name_opt else { continue };
821            let value_bytes = self
822                .exec
823                .slot_table
824                .get(&(*site, exec_id))
825                .and_then(|slot| slot.as_ref())
826                .map(|boxed| encode_for_host(boxed.as_ref()))
827                .unwrap_or_default();
828            self.framework
829                .pending_app_events
830                .push(crate::bus::AppEvent::Emit { name, value_bytes });
831        }
832    }
833
834    /// Push consumer Ops onto the frontier when all their inputs are
835    /// satisfied. minimum-viable: iterates each graph's
836    /// `consumers` map for each newly-written site.
837    pub(crate) fn push_ready_consumers(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
838        // Collect candidate (consumer_op) refs across graphs first,
839        // then filter by readiness, then push - avoids borrow
840        // conflicts.
841        let mut candidates: Vec<OpRef> = Vec::new();
842        for site in sites {
843            for g in self.graphs_iter() {
844                if let Some(consumers) = g.consumers.get(site) {
845                    candidates.extend(consumers.iter().copied());
846                }
847            }
848        }
849        for op_ref in candidates {
850            if self.all_inputs_ready(op_ref, exec_id) {
851                self.exec.frontier.push_back((op_ref, exec_id));
852            }
853        }
854    }
855
856    /// Per ENGINE.md §8.3 - all `Required` inputs must be filled.
857    /// treats every input as `Required` (no `AnyOf` yet -
858    /// 's syscall opset introduces optional-input ops).
859    pub(crate) fn all_inputs_ready(&self, op_ref: OpRef, exec_id: ExecId) -> bool {
860        let Some(node) = self.node_for(op_ref) else {
861            return false;
862        };
863        // Mirror `resolve_input_pairs`' alias-aware lookup: when
864        // `exec_id` is a body's derived id, formal inputs read from
865        // the caller's slot at `parent_exec_id` (per ENGINE.md §8.4).
866        let cc = self.exec.pending_calls.get(&exec_id);
867        for name in &node.input {
868            if name.is_empty() {
869                continue; // ONNX optional-arg convention.
870            }
871            let (site, read_exec_id) = if let Some(cc) = cc {
872                if let Some(&alias_site) = cc.input_aliases.get(name) {
873                    (alias_site, cc.parent_exec_id)
874                } else {
875                    let Some(site) = self.resolve_site_name(name) else {
876                        return false;
877                    };
878                    (site, exec_id)
879                }
880            } else {
881                let Some(site) = self.resolve_site_name(name) else {
882                    return false;
883                };
884                (site, exec_id)
885            };
886            let has_value = self
887                .exec
888                .slot_table
889                .get(&(site, read_exec_id))
890                .map(|s| s.is_some())
891                .unwrap_or(false);
892            if !has_value {
893                return false;
894            }
895        }
896        true
897    }
898
899    /// Surface an Op failure: emit `InfraEvent::OpFailure` onto the
900    /// bus AND return `EngineStep::OpFailed`. Every internal call
901    /// site classifies the failure so operators can match on the
902    /// `OpErrorKind` taxonomy for retry/report/drop policy without
903    /// parsing freeform detail strings.
904    pub(crate) fn fail_op(
905        &mut self,
906        op_ref: OpRef,
907        exec_id: ExecId,
908        kind: crate::bus::OpErrorKind,
909        reason: &'static str,
910        detail: String,
911    ) -> EngineStep {
912        let error = OpError {
913            kind,
914            reason,
915            detail,
916        };
917        self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
918            op_ref,
919            error: error.clone(),
920        }));
921        EngineStep::OpFailed {
922            op_ref,
923            exec_id,
924            error,
925        }
926    }
927}
928
929/// Encode a slot value for host-visible delivery. `BytesValue`
930/// surfaces its inner bytes raw - the byte-level contract callers
931/// expect when their function output is already wire-formatted.
932/// Any other carrier surfaces its bincode-encoded form via
933/// `SlotValue::to_wire_bytes`.
934///
935/// Encode a slot value's wire bytes for host delivery. `BytesValue`
936/// short-circuits to its stored bytes. Other values go through
937/// `to_wire_bytes`; encode failures here are non-fatal — the host
938/// gets empty bytes and a `tracing::warn` records the failure so an
939/// observer can attribute the drop to the value rather than to a
940/// missing emit.
941fn encode_for_host(value: &dyn crate::slot_value::SlotValue) -> Vec<u8> {
942    if let Some(b) = value
943        .as_any()
944        .downcast_ref::<crate::syscall::values::BytesValue>()
945    {
946        return b.0.clone();
947    }
948    match value.to_wire_bytes() {
949        Ok(bytes) => bytes,
950        Err(e) => {
951            tracing::warn!(error = %e, "encode_for_host: dropping host emit on encode failure");
952            Vec::new()
953        }
954    }
955}
956
957/// deterministic NodeSiteId synthesizer for test graphs
958/// that don't populate `site_names`. 's Node
959/// populates the real map.
960fn synthesize_site_id(op_ref: OpRef, output_index: usize) -> NodeSiteId {
961    NodeSiteId::from((op_ref.as_u64() << 8) | (output_index as u64 & 0xff))
962}
963
964/// Walk a `ProtocolRuntime` dispatcher slice, find the entry that
965/// matches the bound component's concrete type, and route the call
966/// through its `dispatch_atomic` impl.
967///
968/// The dispatcher registry is per-Engine (stored on
969/// `Engine.role_dispatchers`); the caller passes the slice in so
970/// borrow-splitting at the call site can keep `&mut self.components`
971/// and `&mut self.framework` independent.
972pub(crate) fn call_protocol_dispatch_atomic(
973    component: &mut dyn crate::component::ErasedComponent,
974    op_type: &str,
975    inputs: &[(&str, &dyn SlotValue)],
976    ctx: &mut RuntimeResourceRef<'_>,
977    dispatchers: &std::collections::HashMap<std::any::TypeId, RoleDispatcher>,
978) -> Result<DispatchResult, String> {
979    let any: &mut dyn std::any::Any = component;
980    let tid = (*any).type_id();
981    if let Some(dispatcher) = dispatchers.get(&tid) {
982        (dispatcher.dispatch)(any, op_type, inputs, ctx)
983    } else {
984        Err("no ProtocolRuntime dispatcher registered for component".to_string())
985    }
986}
987
988/// Type alias for the ProtocolRuntime downcast-dispatch fn pointer
989/// stored in the dispatcher registry.
990pub type ProtocolDispatchFn = fn(
991    &mut dyn std::any::Any,
992    &str,
993    &[(&str, &dyn SlotValue)],
994    &mut RuntimeResourceRef<'_>,
995) -> Result<DispatchResult, String>;
996
997/// Type alias for the `BackendRuntime::materialize_from_wire`
998/// downcast-dispatch fn pointer. Mirrors [`ProtocolDispatchFn`]'s
999/// erased-`Any` shape so the per-T closure stays callable from the
1000/// engine's wire-decode hot path without re-doing the downcast lookup
1001/// on every fill.
1002pub type BackendMaterializeFn =
1003    fn(
1004        &mut dyn std::any::Any,
1005        u64,
1006        Vec<u8>,
1007    ) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError>;
1008
1009/// Backend-only no-op materialize entry used by non-Backend roles.
1010/// Roles other than `Backend` never reach the materialize entry
1011/// (decode_typed_fill only consults it after confirming the slot
1012/// binds to `ComponentRole::Backend`), so the unused fn pointer
1013/// returns a descriptive error rather than panicking.
1014pub fn no_materialize(
1015    _any: &mut dyn std::any::Any,
1016    _type_hash: u64,
1017    _bytes: Vec<u8>,
1018) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError> {
1019    Err(crate::slot_value::BackendMaterializeError {
1020        summary: "component is not a Backend; materialize_from_wire not supported".to_string(),
1021    })
1022}
1023
/// One registered `ProtocolRuntime` / `<Role>Runtime` dispatcher.
///
/// Holds a pair of plain fn pointers that operate on a component
/// erased to `dyn Any`. The factories in this module build it per
/// concrete type `T`; the registry that stores it is keyed by
/// `TypeId`, which is what makes the downcast inside each entry
/// succeed.
///
/// Non-Backend roles register [`no_materialize`] for `materialize`
/// so the engine's wire-decode hot path always finds an entry
/// without a per-role conditional.
pub struct RoleDispatcher {
    /// Downcasts to the concrete type and delegates to its
    /// `dispatch_atomic`.
    pub(crate) dispatch: ProtocolDispatchFn,
    /// Parallel entry for
    /// [`crate::roles::BackendRuntime::materialize_from_wire`];
    /// [`no_materialize`] for every role other than Backend.
    pub(crate) materialize: BackendMaterializeFn,
}
1035
1036/// Build a `RoleDispatcher` for a concrete `ProtocolRuntime` impl.
1037/// Called from `Engine::register_protocol_dispatcher` and from any
1038/// test/production setup that needs to register dispatcher entries on
1039/// a fresh Engine.
1040pub fn make_protocol_dispatcher<T: ProtocolRuntime + 'static>() -> RoleDispatcher
1041where
1042    T::Error: std::fmt::Display,
1043{
1044    RoleDispatcher {
1045        dispatch: |any: &mut dyn std::any::Any,
1046                   op_type: &str,
1047                   inputs: &[(&str, &dyn SlotValue)],
1048                   ctx: &mut RuntimeResourceRef<'_>| {
1049            let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1050            concrete
1051                .dispatch_atomic(op_type, inputs, ctx)
1052                .map_err(|e| e.to_string())
1053        },
1054        materialize: no_materialize,
1055    }
1056}
1057
/// Macro: emit a `make_<role>_dispatcher` factory for one
/// non-Protocol role trait.
///
/// Arguments:
/// - `$factory_name` - identifier of the generated `pub fn`.
/// - `$runtime_trait` - path of the role trait whose
///   `dispatch_atomic` the factory should call.
///
/// The generated function is generic over `T: $runtime_trait +
/// 'static` (with a `Display` error type) and returns a
/// `RoleDispatcher` whose `dispatch` entry downcasts to `T` and calls
/// that trait's `dispatch_atomic`, and whose `materialize` entry is
/// `no_materialize`. This lets authors deriving a single role (e.g.
/// `#[derive(bb::Index)]`) get atomic dispatch wired without a manual
/// `ProtocolRuntime` shell.
macro_rules! emit_role_dispatcher_factory {
    ($factory_name:ident, $runtime_trait:path) => {
        // The doc string is assembled at expansion time so each
        // generated factory names its own role trait.
        #[doc = concat!("Build a `RoleDispatcher` for a concrete impl of ", stringify!($runtime_trait), ". Used by `Engine::register_*_dispatcher` chain (`Node::with_<role>(&value)`) so single-role components dispatch through the same `TypeId`-keyed registry as multi-role / Protocol-bearing components.")]
        pub fn $factory_name<T: $runtime_trait + 'static>() -> RoleDispatcher
        where
            <T as $runtime_trait>::Error: std::fmt::Display,
        {
            RoleDispatcher {
                dispatch: |any: &mut dyn std::any::Any,
                           op_type: &str,
                           inputs: &[(&str, &dyn SlotValue)],
                           ctx: &mut RuntimeResourceRef<'_>| {
                    // Panics if the erased component is not a `T`.
                    let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
                    // Fully-qualified call: selects this role's
                    // `dispatch_atomic` explicitly.
                    <T as $runtime_trait>::dispatch_atomic(concrete, op_type, inputs, ctx)
                        .map_err(|e| e.to_string())
                },
                // Non-Backend roles have no wire materialization.
                materialize: no_materialize,
            }
        }
    };
}
1085
// Generate one public `make_<role>_dispatcher::<T>() -> RoleDispatcher`
// factory per single-role runtime trait. Each registers
// `no_materialize` as its materialize entry.
emit_role_dispatcher_factory!(make_index_dispatcher, crate::roles::IndexRuntime);
emit_role_dispatcher_factory!(make_aggregator_dispatcher, crate::roles::AggregatorRuntime);
emit_role_dispatcher_factory!(make_model_dispatcher, crate::roles::ModelRuntime);
emit_role_dispatcher_factory!(make_codec_dispatcher, crate::roles::CodecRuntime);
emit_role_dispatcher_factory!(make_data_source_dispatcher, crate::roles::DataSourceRuntime);
emit_role_dispatcher_factory!(
    make_peer_selector_dispatcher,
    crate::roles::PeerSelectorRuntime
);
1095
1096/// Build a `RoleDispatcher` for a concrete `BackendRuntime` impl —
1097/// the per-`T` `dispatch_atomic` closure plus the `materialize_from_wire`
1098/// bridge the derive emits. Backend dispatchers are the only ones
1099/// that wire a real `materialize` entry; every other role registers
1100/// [`no_materialize`].
1101pub fn make_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>() -> RoleDispatcher
1102where
1103    <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
1104{
1105    RoleDispatcher {
1106        dispatch: |any: &mut dyn std::any::Any,
1107                   op_type: &str,
1108                   inputs: &[(&str, &dyn SlotValue)],
1109                   ctx: &mut RuntimeResourceRef<'_>| {
1110            let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1111            <T as crate::roles::BackendRuntime>::dispatch_atomic(concrete, op_type, inputs, ctx)
1112                .map_err(|e| e.to_string())
1113        },
1114        materialize: |any: &mut dyn std::any::Any, type_hash: u64, bytes: Vec<u8>| {
1115            let concrete = any.downcast_ref::<T>().expect("is_match guaranteed");
1116            <T as crate::roles::BackendRuntime>::materialize_from_wire(concrete, type_hash, bytes)
1117        },
1118    }
1119}
1120
1121#[cfg(test)]
1122#[path = "invoke_function_call_tests.rs"]
1123mod function_call_tests;