Skip to main content

bb_runtime/engine/
invoke.rs

//! Op invocation lifecycle.
//!
//! Implements the `Atomic { component_ref }` dispatch path, routing
//! through `ProtocolRuntime::dispatch_atomic` (the universal-pair-only
//! role trait — see design call #4 in
//! `docs/internal/IMPLEMENTATION_PLAN.md`), along with the `Stateless`
//! syscall path and the function-call splice.
8
9use crate::atomic::DispatchResult;
10use crate::bus::{InfraEvent, NodeEvent, OpError};
11use crate::engine::call_context::CallContext;
12use crate::engine::core::{graph_name_for, Engine};
13use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
14use crate::engine::pending_async::PendingAsync;
15use crate::engine::step::EngineStep;
16use crate::ids::{ComponentRef, ExecId, NodeSiteId, OpRef};
17use crate::roles::ProtocolRuntime;
18use crate::runtime::RuntimeResourceRef;
19use crate::slot_value::SlotValue;
20use bb_ir::proto::onnx::NodeProto;
21
22impl Engine {
23    /// Dispatch a single `(op_ref, exec_id)` per ENGINE.md §8.1.
24    /// Returns one `EngineStep` describing the outcome
25    /// (`OpCompleted` / `AsyncSuspended` / `OpFailed`).
26    ///
27    /// Consult `dispatch_for` for the `OpDispatch` variant stamped at
28    /// install time by `Engine::resolve_dispatch` and route to the
29    /// dedicated helper. An op without a stamped entry - or stamped
30    /// `Unresolved` - is a build invariant violation that surfaced
31    /// past the `Node` pre-flight; fail the op so the caller
32    /// sees a clean `OpFailure` rather than a panic.
33    pub(crate) fn invoke_one(&mut self, op_ref: OpRef, exec_id: ExecId) -> EngineStep {
34        let node = match self.node_for(op_ref) {
35            Some(n) => n.clone(),
36            None => {
37                return self.fail_op(
38                    op_ref,
39                    exec_id,
40                    crate::bus::OpErrorKind::ExecutionFailed,
41                    "unknown_op_ref",
42                    "unknown op_ref".to_string(),
43                )
44            }
45        };
46
47        // Structured op fields on the per-invocation span let
48        // operators filter and group traces by op kind, domain, or
49        // a specific (exec_id, op_ref) instance — equivalent to the
50        // op-keyed metadata ONNX Runtime emits via EventRecord.args.
51        let _invoke_span = tracing::debug_span!(
52            "engine.invoke_one",
53            op.name = %node.name,
54            op.kind = %node.op_type,
55            op.domain = %node.domain,
56            exec_id = %exec_id,
57            op_ref = %op_ref,
58        )
59        .entered();
60
61        match self.dispatch_for(op_ref) {
62            Some(OpDispatch::Stateless(invoke_fn)) => {
63                self.invoke_stateless(op_ref, exec_id, &node, invoke_fn)
64            }
65            Some(OpDispatch::Atomic {
66                component_ref,
67                dispatch_fn,
68            }) => self.invoke_atomic(op_ref, exec_id, &node, component_ref, dispatch_fn),
69            Some(OpDispatch::FunctionCall {
70                target,
71                input_rename,
72                output_rename,
73            }) => {
74                self.invoke_function_call(op_ref, exec_id, &target, &input_rename, &output_rename)
75            }
76            Some(OpDispatch::Unresolved) | None => self.fail_op(
77                op_ref,
78                exec_id,
79                crate::bus::OpErrorKind::NotRegistered,
80                "unresolved_dispatch",
81                format!("unresolved dispatch for {}::{}", node.domain, node.op_type),
82            ),
83        }
84    }
85
86    /// Look up the install-time-stamped `OpDispatch` for an `OpRef`.
87    /// Positional resolution per C5: `OpRef::pack(graph_idx, node_idx)`
88    /// → two direct array accesses, no HashMap probes.
89    fn dispatch_for(&self, op_ref: OpRef) -> Option<OpDispatch> {
90        let (gi, ni) = op_ref.split();
91        self.graphs
92            .get(gi as usize)?
93            .op_dispatch
94            .get(ni as usize)
95            .cloned()
96    }
97
98    /// Dispatch through `ProtocolRuntime::dispatch_atomic` for an op
99    /// resolved to a bound component. Factored out of the legacy
100    /// `invoke_one` cascade so the new dispatch path can call it
101    /// directly without re-doing the syscall / atomic_dispatch lookup.
102    fn invoke_atomic(
103        &mut self,
104        op_ref: OpRef,
105        exec_id: ExecId,
106        node: &NodeProto,
107        component_ref: ComponentRef,
108        dispatch_fn: ProtocolDispatchFn,
109    ) -> EngineStep {
110        // Defensive cap check BEFORE dispatch — any atomic op that
111        // returns Async would need to insert into pending_async, so
112        // when we're already at cap we reject early. Components that
113        // side-effect (e.g. allocate wire requests, push outbound
114        // envelopes) inside dispatch don't see the cap rejection
115        // until AFTER their effects run, leaking that state into the
116        // engine. Pre-dispatch rejection covers Immediate ops too but
117        // engine-at-cap is the right backpressure signal regardless.
118        if let Some(cap) = self.max_pending_async {
119            if self.exec.pending_async.len() >= cap {
120                return self.fail_op(
121                    op_ref,
122                    exec_id,
123                    crate::bus::OpErrorKind::Cooldown,
124                    "pending_async_limit",
125                    "pending-async limit exceeded".to_string(),
126                );
127            }
128        }
129
130        // Alias-aware: when `exec_id` is a function-body's derived id,
131        // formal inputs route through `pending_calls[exec_id]` to read
132        // the caller's slot at `parent_exec_id` (zero-copy).
133        let input_pairs = self.resolve_input_pairs(node, exec_id);
134
135        // D2 take-and-restore: lift the dispatching component out
136        // of the Vec so a live ComponentsView can borrow the rest of
137        // engine.components while the closure runs. Restore the slot
138        // unconditionally on the way out so a panic doesn't leak the
139        // Box (Rust catches via Drop, but the slot would stay None
140        // and confuse subsequent dispatches).
141        let Some(mut taken) = self.take_component(component_ref) else {
142            return self.fail_op(
143                op_ref,
144                exec_id,
145                crate::bus::OpErrorKind::MissingSlot,
146                "component_missing",
147                "component missing".to_string(),
148            );
149        };
150
151        let result: Result<DispatchResult, String> = {
152            let mut input_refs: Vec<(String, &dyn SlotValue)> =
153                Vec::with_capacity(input_pairs.len());
154            for (site, name, read_exec_id) in &input_pairs {
155                if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
156                    input_refs.push((name.clone(), boxed.as_ref()));
157                }
158            }
159            let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
160                input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();
161
162            let (
163                envelope_src_peer,
164                inbound_correlation_wire_req_id,
165                inbound_arrival_ns,
166                inbound_remaining_deadline_ns,
167            ) = self
168                .framework
169                .inbound_contexts
170                .get(&exec_id)
171                .map(|c| {
172                    (
173                        c.src_peer,
174                        c.wire_req_id,
175                        c.arrival_ns,
176                        c.remaining_deadline_ns,
177                    )
178                })
179                .unwrap_or((None, None, None, None));
180            let mut ctx = RuntimeResourceRef {
181                peers: crate::runtime::PeerCtx {
182                    gate: &mut self.framework.peer_state.gate,
183                    backoff: &mut self.framework.peer_state.backoff,
184                    governor: &mut self.framework.peer_state.governor,
185                    addresses: &mut self.framework.address_book,
186                    backpressure: &mut self.framework.peer_state.backpressure,
187                },
188                net: crate::runtime::NetCtx {
189                    outbound: &mut self.framework.outbound_queue,
190                    rtt: &mut self.framework.rtt_tracker,
191                    requests: &mut self.framework.request_tracker,
192                    dedup: &mut self.framework.inbound_dedup,
193                    pending_peer_resolve_failures: &mut self
194                        .framework
195                        .pending_peer_resolve_failures,
196                },
197                time: crate::runtime::TimeCtx {
198                    scheduler: &mut self.framework.scheduler,
199                },
200                syscall: crate::runtime::SyscallCtx {
201                    serialize_queue: &mut self.framework.serialize_queue,
202                    hold_table: &mut self.framework.hold_table,
203                    record_buffer: &mut self.framework.record_buffer,
204                    event_source: &mut self.framework.event_source,
205                    counters: &mut self.framework.counters,
206                    any_fired_groups: &mut self.framework.any_fired_groups,
207                    deadline_match_fired: &mut self.framework.deadline_match_fired,
208                    rng: &mut *self.framework.rng,
209                    pending_app_events: &mut self.framework.pending_app_events,
210                },
211                bus: &mut self.bus,
212                ingress: std::sync::Arc::clone(&self.ingress),
213                components: crate::runtime::ComponentsView {
214                    instances: Some(&self.components),
215                    slots: Some(&self.slots),
216                },
217                current: crate::runtime::CurrentCallCtx {
218                    op_ref,
219                    exec_id,
220                    self_peer: self.self_peer,
221                    node_attributes: &node.attribute,
222                    node_metadata: &node.metadata_props,
223                    inbound: crate::runtime::InboundCtx {
224                        src_peer: envelope_src_peer,
225                        wire_req_id: inbound_correlation_wire_req_id,
226                        arrival_ns: inbound_arrival_ns,
227                        remaining_deadline_ns: inbound_remaining_deadline_ns,
228                    },
229                    pending_completions: Vec::new(),
230                    next_command_id: &mut self.exec.ids.next_command_id,
231                },
232            };
233
234            // The install-time-stamped dispatch_fn is the one
235            // canonical hot path — runtime calls the downcast
236            // closure directly, no per-op TypeId HashMap probe.
237            let any: &mut dyn std::any::Any = taken.as_mut();
238            let dispatch_result = dispatch_fn(any, &node.op_type, &inputs_for_dispatch, &mut ctx);
239
240            let captured = std::mem::take(&mut ctx.current.pending_completions);
241            drop(ctx);
242            self.exec.pending_completions.extend(captured);
243
244            dispatch_result
245        };
246
247        // Always restore - keeps the slot table consistent across
248        // dispatch outcomes (Immediate, Async, or error).
249        self.restore_component(component_ref, taken);
250
251        match result {
252            Ok(DispatchResult::Immediate(outputs)) => {
253                let sites = self.write_outputs(op_ref, exec_id, outputs);
254                EngineStep::OpCompleted {
255                    op_ref,
256                    exec_id,
257                    sites_written: sites,
258                }
259            }
260            Ok(DispatchResult::Async(cmd_id)) => {
261                let output_sites = self.op_output_sites(op_ref);
262                self.exec.pending_async.insert(
263                    cmd_id,
264                    PendingAsync {
265                        op_ref,
266                        exec_id,
267                        output_sites,
268                        deadline_ns: None,
269                    },
270                );
271                EngineStep::AsyncSuspended {
272                    op_ref,
273                    exec_id,
274                    cmd_id,
275                }
276            }
277            Err(detail) => self.fail_op(
278                op_ref,
279                exec_id,
280                crate::bus::OpErrorKind::ExecutionFailed,
281                "stateless_invoke",
282                detail,
283            ),
284        }
285    }
286
287    /// Splice a function-call invocation per ENGINE.md §8.4.
288    ///
289    /// Flat-frontier model: the body's `OpRef`s are shared (one
290    /// allocation in `engine.graphs[graph_name_for(target)]`); each call
291    /// allocates a fresh body `ExecId` and the body's nodes execute at
292    /// that derived id. Input slots are NOT copied - the body's formal
293    /// parameter reads route through `input_aliases` to the caller's
294    /// slot at `parent_exec_id` (zero-copy). Output forwarding is
295    /// applied in `write_outputs` (Phase 2b §2b.8).
296    ///
297    /// Returns `OpCompleted` for the call site itself with no
298    /// `sites_written` - the body's writes surface as separate events
299    /// as each body node fires.
300    pub(crate) fn invoke_function_call(
301        &mut self,
302        op_ref: OpRef,
303        parent_exec_id: ExecId,
304        target: &FunctionKey,
305        input_rename: &[(String, String)],
306        output_rename: &[(String, String)],
307    ) -> EngineStep {
308        let graph_name = graph_name_for(target);
309        if !self.has_graph(&graph_name) {
310            return self.fail_op(
311                op_ref,
312                parent_exec_id,
313                crate::bus::OpErrorKind::NotRegistered,
314                "function_target_missing",
315                format!("function-call target {graph_name} not installed"),
316            );
317        }
318
319        let body_exec_id = self.allocate_exec_id();
320        let body = self.graph(&graph_name).expect("checked above");
321
322        // Snapshot body OpRefs + formal-name → body-site map before
323        // releasing the borrow on self.graphs. With positional
324        // OpRefs the body's op_refs are `OpRef::pack(body_idx,
325        // node_idx)` for `node_idx in 0..body.function.node.len()`.
326        let body_idx = self.graph_idx(&graph_name).expect("graph just resolved");
327        let body_op_refs: Vec<OpRef> = (0..body.function.node.len() as u32)
328            .map(|ni| OpRef::pack(body_idx, ni))
329            .collect();
330        let body_site_for: std::collections::HashMap<String, NodeSiteId> = body.site_names.clone();
331
332        // input_rename pairs: (caller_value_name, formal_parameter_name)
333        // Caller-side names are scoped to the call op's owning graph -
334        // body-side names may coincidentally spell the same string.
335        let mut input_aliases: std::collections::HashMap<String, NodeSiteId> =
336            std::collections::HashMap::with_capacity(input_rename.len());
337        for (caller_name, formal_name) in input_rename {
338            let Some(caller_site) = self.resolve_site_in_op_graph(op_ref, caller_name) else {
339                return self.fail_op(
340                    op_ref,
341                    parent_exec_id,
342                    crate::bus::OpErrorKind::MissingSlot,
343                    "function_input_unbound",
344                    format!("function-call input {caller_name} not bound"),
345                );
346            };
347            input_aliases.insert(formal_name.clone(), caller_site);
348        }
349
350        // output_rename pairs: (formal_output_name, caller_value_name)
351        let mut output_forwarding: std::collections::HashMap<NodeSiteId, NodeSiteId> =
352            std::collections::HashMap::with_capacity(output_rename.len());
353        for (formal_out, caller_out) in output_rename {
354            let Some(&body_site) = body_site_for.get(formal_out) else {
355                return self.fail_op(
356                    op_ref,
357                    parent_exec_id,
358                    crate::bus::OpErrorKind::NotRegistered,
359                    "function_output_missing",
360                    format!("function-call output {formal_out} missing from body"),
361                );
362            };
363            let Some(caller_site) = self.resolve_site_in_op_graph(op_ref, caller_out) else {
364                return self.fail_op(
365                    op_ref,
366                    parent_exec_id,
367                    crate::bus::OpErrorKind::MissingSlot,
368                    "function_output_unbound",
369                    format!("function-call output {caller_out} not bound"),
370                );
371            };
372            output_forwarding.insert(body_site, caller_site);
373        }
374
375        let outputs_remaining = output_forwarding.len();
376        self.exec.pending_calls.insert(
377            body_exec_id,
378            CallContext {
379                parent_exec_id,
380                target: target.clone(),
381                input_aliases,
382                output_forwarding,
383                outputs_remaining,
384            },
385        );
386
387        // Push every body OpRef onto the frontier at body_exec_id.
388        // Alias-aware `all_inputs_ready` (Phase 2b §2b.7) gates each
389        // node until its formal inputs are populated at the caller's
390        // ExecId. Plan §2b.6 step 6: simple v1 - push all, gate.
391        for body_op in body_op_refs {
392            self.exec.frontier.push_back((body_op, body_exec_id));
393        }
394
395        // Zero-outputs corner case: with no forwarding to wait on, the
396        // call is conceptually complete the moment the body finishes.
397        // `write_outputs`' forwarding hook drops the entry once
398        // `outputs_remaining` hits zero - for a no-output call we drop
399        // immediately so we don't leak an entry.
400        if self
401            .exec
402            .pending_calls
403            .get(&body_exec_id)
404            .map(|c| c.outputs_remaining == 0)
405            .unwrap_or(false)
406        {
407            self.exec.pending_calls.remove(&body_exec_id);
408        }
409
410        EngineStep::OpCompleted {
411            op_ref,
412            exec_id: parent_exec_id,
413            sites_written: Vec::new(),
414        }
415    }
416
    /// Dispatch a stateless syscall op by calling its install-time
    /// `invoke_fn` pointer directly.
    ///
    /// Shares the pending-async cap check, input resolution,
    /// split-borrow `RuntimeResourceRef` construction, and result
    /// handling with `invoke_atomic`; differs in that no component is
    /// taken out of `components` (the context gets an empty
    /// `ComponentsView::default()`), and `invoke_fn` receives the whole
    /// `NodeProto`.
    ///
    /// Outcomes: `Immediate` → outputs written, `OpCompleted`;
    /// `Async(cmd_id)` → `PendingAsync` recorded, `AsyncSuspended`;
    /// `Err(OpError)` → an `InfraEvent::OpFailure` is published on the
    /// bus and `OpFailed` carries the same error.
    // NOTE(review): the error arm publishes/returns directly instead of
    // going through `fail_op` like the other failure paths in this file —
    // confirm `fail_op` does nothing extra that this path should share.
    pub(crate) fn invoke_stateless(
        &mut self,
        op_ref: OpRef,
        exec_id: ExecId,
        node: &NodeProto,
        invoke_fn: StatelessInvokeFn,
    ) -> EngineStep {
        // Same pre-dispatch pending_async cap check as invoke_atomic
        // — syscalls (wire.Send, Sleep, ...) routinely side-effect
        // before returning Async, so a post-dispatch rejection would
        // leak state.
        if let Some(cap) = self.max_pending_async {
            if self.exec.pending_async.len() >= cap {
                return self.fail_op(
                    op_ref,
                    exec_id,
                    crate::bus::OpErrorKind::Cooldown,
                    "pending_async_limit",
                    "pending-async limit exceeded".to_string(),
                );
            }
        }
        // Alias-aware input resolution - see `invoke_atomic` for the
        // function-body splice semantics.
        let input_pairs = self.resolve_input_pairs(node, exec_id);

        let result: Result<DispatchResult, OpError> = {
            // Borrow each ready input value out of the slot table;
            // inputs without a filled slot are skipped.
            let mut input_refs: Vec<(String, &dyn SlotValue)> =
                Vec::with_capacity(input_pairs.len());
            for (site, name, read_exec_id) in &input_pairs {
                if let Some(Some(boxed)) = self.exec.slot_table.get(&(*site, *read_exec_id)) {
                    input_refs.push((name.clone(), boxed.as_ref()));
                }
            }
            let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> =
                input_refs.iter().map(|(n, h)| (n.as_str(), *h)).collect();

            // Inbound envelope metadata for this execution, or all-`None`
            // when `inbound_contexts` has no entry for `exec_id`.
            let (
                envelope_src_peer,
                inbound_correlation_wire_req_id,
                inbound_arrival_ns,
                inbound_remaining_deadline_ns,
            ) = self
                .framework
                .inbound_contexts
                .get(&exec_id)
                .map(|c| {
                    (
                        c.src_peer,
                        c.wire_req_id,
                        c.arrival_ns,
                        c.remaining_deadline_ns,
                    )
                })
                .unwrap_or((None, None, None, None));
            // Split borrows of the engine's subsystems; syscalls get no
            // component view.
            let mut ctx = RuntimeResourceRef {
                peers: crate::runtime::PeerCtx {
                    gate: &mut self.framework.peer_state.gate,
                    backoff: &mut self.framework.peer_state.backoff,
                    governor: &mut self.framework.peer_state.governor,
                    addresses: &mut self.framework.address_book,
                    backpressure: &mut self.framework.peer_state.backpressure,
                },
                net: crate::runtime::NetCtx {
                    outbound: &mut self.framework.outbound_queue,
                    rtt: &mut self.framework.rtt_tracker,
                    requests: &mut self.framework.request_tracker,
                    dedup: &mut self.framework.inbound_dedup,
                    pending_peer_resolve_failures: &mut self
                        .framework
                        .pending_peer_resolve_failures,
                },
                time: crate::runtime::TimeCtx {
                    scheduler: &mut self.framework.scheduler,
                },
                syscall: crate::runtime::SyscallCtx {
                    serialize_queue: &mut self.framework.serialize_queue,
                    hold_table: &mut self.framework.hold_table,
                    record_buffer: &mut self.framework.record_buffer,
                    event_source: &mut self.framework.event_source,
                    counters: &mut self.framework.counters,
                    any_fired_groups: &mut self.framework.any_fired_groups,
                    deadline_match_fired: &mut self.framework.deadline_match_fired,
                    rng: &mut *self.framework.rng,
                    pending_app_events: &mut self.framework.pending_app_events,
                },
                bus: &mut self.bus,
                ingress: std::sync::Arc::clone(&self.ingress),
                components: crate::runtime::ComponentsView::default(),
                current: crate::runtime::CurrentCallCtx {
                    op_ref,
                    exec_id,
                    self_peer: self.self_peer,
                    node_attributes: &node.attribute,
                    node_metadata: &node.metadata_props,
                    inbound: crate::runtime::InboundCtx {
                        src_peer: envelope_src_peer,
                        wire_req_id: inbound_correlation_wire_req_id,
                        arrival_ns: inbound_arrival_ns,
                        remaining_deadline_ns: inbound_remaining_deadline_ns,
                    },
                    pending_completions: Vec::new(),
                    next_command_id: &mut self.exec.ids.next_command_id,
                },
            };

            let dispatch_result = invoke_fn(node, &inputs_for_dispatch, &mut ctx);

            // Move queued completions into the engine; `ctx` must be
            // dropped first to release its borrows of `self`.
            let captured = std::mem::take(&mut ctx.current.pending_completions);
            drop(ctx);
            self.exec.pending_completions.extend(captured);

            dispatch_result
        };

        match result {
            Ok(DispatchResult::Immediate(outputs)) => {
                let sites = self.write_outputs(op_ref, exec_id, outputs);
                EngineStep::OpCompleted {
                    op_ref,
                    exec_id,
                    sites_written: sites,
                }
            }
            Ok(DispatchResult::Async(cmd_id)) => {
                // Remember which sites the eventual completion fills.
                let output_sites = self.op_output_sites(op_ref);
                self.exec.pending_async.insert(
                    cmd_id,
                    PendingAsync {
                        op_ref,
                        exec_id,
                        output_sites,
                        deadline_ns: None,
                    },
                );
                EngineStep::AsyncSuspended {
                    op_ref,
                    exec_id,
                    cmd_id,
                }
            }
            Err(err) => {
                self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
                    op_ref,
                    error: err.clone(),
                }));
                EngineStep::OpFailed {
                    op_ref,
                    exec_id,
                    error: err,
                }
            }
        }
    }
575
576    /// Look up a NodeProto by op_ref. Positional resolution via
577    /// `OpRef::pack(graph_idx, node_idx)` → two direct array indexes.
578    pub(crate) fn node_for(&self, op_ref: OpRef) -> Option<&NodeProto> {
579        let (gi, ni) = op_ref.split();
580        self.graphs.get(gi as usize)?.function.node.get(ni as usize)
581    }
582
583    /// Output sites the given Op declared. Positional resolution
584    /// looks up the NodeProto directly; each declared output name
585    /// resolves through the graph's `site_names` map (with a
586    /// deterministic synth as a fallback for test setups).
587    pub(crate) fn op_output_sites(&self, op_ref: OpRef) -> Vec<NodeSiteId> {
588        let (gi, ni) = op_ref.split();
589        let Some(g) = self.graphs.get(gi as usize) else {
590            return Vec::new();
591        };
592        let Some(node) = g.function.node.get(ni as usize) else {
593            return Vec::new();
594        };
595        node.output
596            .iter()
597            .enumerate()
598            .map(|(i, name)| {
599                g.site_names
600                    .get(name)
601                    .copied()
602                    .unwrap_or_else(|| synthesize_site_id(op_ref, i))
603            })
604            .collect()
605    }
606
607    /// Resolve a value-name to its `NodeSiteId` across every
608    /// GraphSlot. Returns `None` if no graph has a binding.
609    pub(crate) fn resolve_site_name(&self, name: &str) -> Option<NodeSiteId> {
610        for g in self.graphs_iter() {
611            if let Some(&site) = g.site_names.get(name) {
612                return Some(site);
613            }
614        }
615        None
616    }
617
618    /// Resolve a value-name to its `NodeSiteId` within the GraphSlot
619    /// that owns `op_ref`. Used by `invoke_function_call` to disambiguate
620    /// caller-side value names from formal-parameter names that happen
621    /// to spell the same string in the callee body (hoisted bodies
622    /// canonicalize to `__hoist_*` so this clash is rare in production,
623    /// but graph-scoped lookup is the principled fix). Positional
624    /// `OpRef::pack` makes the owning graph a direct index.
625    pub(crate) fn resolve_site_in_op_graph(&self, op_ref: OpRef, name: &str) -> Option<NodeSiteId> {
626        let (gi, _) = op_ref.split();
627        self.graphs.get(gi as usize)?.site_names.get(name).copied()
628    }
629
630    /// Resolve a NodeProto's inputs into `(NodeSiteId, value_name,
631    /// read_exec_id)` triples per ENGINE.md §8.4.
632    ///
633    /// Function-call splice: when `exec_id` is a body's derived
634    /// `ExecId` (i.e. present in `pending_calls`), formal inputs that
635    /// appear in `input_aliases` re-route reads to the caller's
636    /// `parent_exec_id` and the caller-side `NodeSiteId` - zero-copy
637    /// aliasing, the body never copies the caller's slot. Inputs not
638    /// in the alias map fall back to the standard
639    /// `resolve_site_name(name) → exec_id` path so body-internal
640    /// values keep flowing at `body_exec_id`.
641    ///
642    /// Empty input names (ONNX optional-arg convention) and inputs
643    /// that fail to resolve are skipped - the caller checks readiness
644    /// separately via `all_inputs_ready`.
645    pub(crate) fn resolve_input_pairs(
646        &self,
647        node: &NodeProto,
648        exec_id: ExecId,
649    ) -> Vec<(NodeSiteId, String, ExecId)> {
650        let cc = self.exec.pending_calls.get(&exec_id);
651        let mut out = Vec::new();
652        for name in &node.input {
653            if name.is_empty() {
654                continue;
655            }
656            if let Some(cc) = cc {
657                if let Some(&alias_site) = cc.input_aliases.get(name) {
658                    out.push((alias_site, name.clone(), cc.parent_exec_id));
659                    continue;
660                }
661            }
662            if let Some(site) = self.resolve_site_name(name) {
663                out.push((site, name.clone(), exec_id));
664            }
665        }
666        out
667    }
668
669    /// Write Op outputs to the slot_table + push ready downstream
670    /// consumers onto the frontier. Returns the list of sites
671    /// written (for `EngineStep::OpCompleted`).
672    pub(crate) fn write_outputs(
673        &mut self,
674        op_ref: OpRef,
675        exec_id: ExecId,
676        outputs: Vec<(String, Box<dyn SlotValue>)>,
677    ) -> Vec<NodeSiteId> {
678        let output_sites = self.op_output_sites(op_ref);
679        for ((site, _name), value) in output_sites
680            .iter()
681            .zip(outputs.iter().map(|(n, _)| n))
682            .zip(outputs.iter().map(|(_, v)| v.as_ref()))
683        {
684            // Place each value into its slot. We can't move out of
685            // `outputs` while iterating borrows, so we collect in
686            // two passes. (See below.)
687            let _ = (site, _name, value);
688        }
689        // Two-pass placement: zip site_index → outputs by position.
690        let mut sites_written: Vec<NodeSiteId> = Vec::new();
691        for (i, (_name, value)) in outputs.into_iter().enumerate() {
692            if let Some(site) = output_sites.get(i).copied() {
693                self.exec.slot_table.insert((site, exec_id), Some(value));
694                sites_written.push(site);
695            }
696        }
697
698        // Bump execution_state.
699        self.exec
700            .execution_state
701            .entry(exec_id)
702            .or_default()
703            .outputs_written += sites_written.len() as u32;
704
705        // Push consumers whose inputs are now all ready.
706        self.push_ready_consumers(&sites_written, exec_id);
707
708        // Function-call splice: when `exec_id` is a body's derived id,
709        // forward any matching outputs back to the caller's slots at
710        // `parent_exec_id` and push the caller's downstream consumers.
711        // No-op when there's no pending call context.
712        self.forward_outputs_to_caller(&sites_written, exec_id);
713
714        self.surface_top_level_outputs(&sites_written, exec_id);
715        sites_written
716    }
717
718    /// Phase 2b §2b.8 - output forwarding for the function-call splice.
719    ///
720    /// If `exec_id` keys a `CallContext` in `pending_calls`, every
721    /// `sites_written` entry that appears in `output_forwarding` gets
722    /// its `SlotValue` MOVED from the body's slot at `exec_id` to
723    /// the caller's slot at `parent_exec_id`. `SlotValue` is not
724    /// `Clone`; a body output is one-shot, so moving the value is
725    /// semantically correct.
726    ///
727    /// After moving values, pushes the caller-side consumers onto the
728    /// frontier at `parent_exec_id` and surfaces top-level outputs.
729    /// Drops the `pending_calls` entry once `outputs_remaining`
730    /// reaches zero.
731    pub(crate) fn forward_outputs_to_caller(
732        &mut self,
733        sites_written: &[NodeSiteId],
734        exec_id: ExecId,
735    ) {
736        let Some(cc) = self.exec.pending_calls.get(&exec_id) else {
737            return;
738        };
739        // Collect the body_site → caller_site pairs we'll forward,
740        // dropping the immutable borrow before mutating slot_table.
741        let mut pairs: Vec<(NodeSiteId, NodeSiteId)> = Vec::new();
742        for &body_site in sites_written {
743            if let Some(&caller_site) = cc.output_forwarding.get(&body_site) {
744                pairs.push((body_site, caller_site));
745            }
746        }
747        let parent_exec_id = cc.parent_exec_id;
748        if pairs.is_empty() {
749            return;
750        }
751        tracing::trace!(
752            target: "engine.function_call.forward",
753            call_target = ?cc.target,
754            body_exec_id = exec_id.as_u64(),
755            parent_exec_id = parent_exec_id.as_u64(),
756            pair_count = pairs.len(),
757            "forwarding body outputs to caller slots",
758        );
759
760        let mut caller_sites: Vec<NodeSiteId> = Vec::with_capacity(pairs.len());
761        for (body_site, caller_site) in &pairs {
762            // MOVE the value: body's slot loses the value, caller's
763            // slot owns it. Skip if the body slot is empty (e.g. the
764            // op produced fewer outputs than declared).
765            let value = self
766                .exec
767                .slot_table
768                .get_mut(&(*body_site, exec_id))
769                .and_then(|opt| opt.take());
770            if let Some(value) = value {
771                self.exec
772                    .slot_table
773                    .insert((*caller_site, parent_exec_id), Some(value));
774                caller_sites.push(*caller_site);
775            }
776        }
777
778        // Decrement `outputs_remaining` and drop the entry on
779        // completion in a single mut borrow.
780        if let Some(cc) = self.exec.pending_calls.get_mut(&exec_id) {
781            cc.outputs_remaining = cc.outputs_remaining.saturating_sub(pairs.len());
782            if cc.outputs_remaining == 0 {
783                self.exec.pending_calls.remove(&exec_id);
784            }
785        }
786
787        // Push caller-side consumers + surface top-level outputs
788        // against the caller's `parent_exec_id`.
789        self.push_ready_consumers(&caller_sites, parent_exec_id);
790        self.surface_top_level_outputs(&caller_sites, parent_exec_id);
791    }
792
793    /// Inspect each freshly-written site: when it corresponds to a
794    /// declared `function.output` AND no downstream Op consumes it
795    /// inside the function, push an `AppEvent::Emit` onto
796    /// `framework.pending_app_events` carrying the slot's serialized
797    /// bytes. Phase 8 drains the queue into `EngineStep::AppEvent`
798    /// for the host.
799    ///
800    /// This is the "function signature is the engine I/O contract"
801    /// path. Coexists with the explicit `AppEmit` / `AppNotify`
802    /// syscall ops from `src/syscall/telemetry/`; both surface as the
803    /// same `EngineStep::AppEvent` variant.
804    ///
805    /// Encode the slot's value for host consumption: a `BytesValue`
806    /// surfaces its inner `Vec<u8>` directly (the byte-level
807    /// contract), any other carrier surfaces its bincode-encoded
808    /// form. Push a `Emit { name, value_bytes }` for every top-
809    /// level output site with no in-graph consumer.
810    pub(crate) fn surface_top_level_outputs(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
811        for site in sites {
812            let consumer_count = self
813                .graphs_iter()
814                .map(|g| g.consumers.get(site).map(|v| v.len()).unwrap_or(0))
815                .sum::<usize>();
816            if consumer_count > 0 {
817                continue;
818            }
819            let name_opt = self
820                .graphs_iter()
821                .find_map(|g| g.top_level_outputs.get(site).cloned());
822            let Some(name) = name_opt else { continue };
823            let value_bytes = self
824                .exec
825                .slot_table
826                .get(&(*site, exec_id))
827                .and_then(|slot| slot.as_ref())
828                .map(|boxed| encode_for_host(boxed.as_ref()))
829                .unwrap_or_default();
830            self.framework
831                .pending_app_events
832                .push(crate::bus::AppEvent::Emit { name, value_bytes });
833        }
834    }
835
836    /// Push consumer Ops onto the frontier when all their inputs are
837    /// satisfied. minimum-viable: iterates each graph's
838    /// `consumers` map for each newly-written site.
839    pub(crate) fn push_ready_consumers(&mut self, sites: &[NodeSiteId], exec_id: ExecId) {
840        // Collect candidate (consumer_op) refs across graphs first,
841        // then filter by readiness, then push - avoids borrow
842        // conflicts.
843        let mut candidates: Vec<OpRef> = Vec::new();
844        for site in sites {
845            for g in self.graphs_iter() {
846                if let Some(consumers) = g.consumers.get(site) {
847                    candidates.extend(consumers.iter().copied());
848                }
849            }
850        }
851        for op_ref in candidates {
852            if self.all_inputs_ready(op_ref, exec_id) {
853                self.exec.frontier.push_back((op_ref, exec_id));
854            }
855        }
856    }
857
858    /// Per ENGINE.md §8.3 - all `Required` inputs must be filled.
859    /// treats every input as `Required` (no `AnyOf` yet -
860    /// 's syscall opset introduces optional-input ops).
861    pub(crate) fn all_inputs_ready(&self, op_ref: OpRef, exec_id: ExecId) -> bool {
862        let Some(node) = self.node_for(op_ref) else {
863            return false;
864        };
865        // Mirror `resolve_input_pairs`' alias-aware lookup: when
866        // `exec_id` is a body's derived id, formal inputs read from
867        // the caller's slot at `parent_exec_id` (per ENGINE.md §8.4).
868        let cc = self.exec.pending_calls.get(&exec_id);
869        for name in &node.input {
870            if name.is_empty() {
871                continue; // ONNX optional-arg convention.
872            }
873            let (site, read_exec_id) = if let Some(cc) = cc {
874                if let Some(&alias_site) = cc.input_aliases.get(name) {
875                    (alias_site, cc.parent_exec_id)
876                } else {
877                    let Some(site) = self.resolve_site_name(name) else {
878                        return false;
879                    };
880                    (site, exec_id)
881                }
882            } else {
883                let Some(site) = self.resolve_site_name(name) else {
884                    return false;
885                };
886                (site, exec_id)
887            };
888            let has_value = self
889                .exec
890                .slot_table
891                .get(&(site, read_exec_id))
892                .map(|s| s.is_some())
893                .unwrap_or(false);
894            if !has_value {
895                return false;
896            }
897        }
898        true
899    }
900
901    /// Surface an Op failure: emit `InfraEvent::OpFailure` onto the
902    /// bus AND return `EngineStep::OpFailed`. Every internal call
903    /// site classifies the failure so operators can match on the
904    /// `OpErrorKind` taxonomy for retry/report/drop policy without
905    /// parsing freeform detail strings.
906    pub(crate) fn fail_op(
907        &mut self,
908        op_ref: OpRef,
909        exec_id: ExecId,
910        kind: crate::bus::OpErrorKind,
911        reason: &'static str,
912        detail: String,
913    ) -> EngineStep {
914        let error = OpError {
915            kind,
916            reason,
917            detail,
918        };
919        self.bus.publish(NodeEvent::Infra(InfraEvent::OpFailure {
920            op_ref,
921            error: error.clone(),
922        }));
923        EngineStep::OpFailed {
924            op_ref,
925            exec_id,
926            error,
927        }
928    }
929}
930
931/// Encode a slot value for host-visible delivery. `BytesValue`
932/// surfaces its inner bytes raw - the byte-level contract callers
933/// expect when their function output is already wire-formatted.
934/// Any other carrier surfaces its bincode-encoded form via
935/// `SlotValue::to_wire_bytes`.
936///
937/// Encode a slot value's wire bytes for host delivery. `BytesValue`
938/// short-circuits to its stored bytes. Other values go through
939/// `to_wire_bytes`; encode failures here are non-fatal — the host
940/// gets empty bytes and a `tracing::warn` records the failure so an
941/// observer can attribute the drop to the value rather than to a
942/// missing emit.
943fn encode_for_host(value: &dyn crate::slot_value::SlotValue) -> Vec<u8> {
944    if let Some(b) = value
945        .as_any()
946        .downcast_ref::<crate::syscall::values::BytesValue>()
947    {
948        return b.0.clone();
949    }
950    match value.to_wire_bytes() {
951        Ok(bytes) => bytes,
952        Err(e) => {
953            tracing::warn!(error = %e, "encode_for_host: dropping host emit on encode failure");
954            Vec::new()
955        }
956    }
957}
958
959/// deterministic NodeSiteId synthesizer for test graphs
960/// that don't populate `site_names`. 's Node
961/// populates the real map.
962fn synthesize_site_id(op_ref: OpRef, output_index: usize) -> NodeSiteId {
963    NodeSiteId::from((op_ref.as_u64() << 8) | (output_index as u64 & 0xff))
964}
965
966/// Walk a `ProtocolRuntime` dispatcher slice, find the entry that
967/// matches the bound component's concrete type, and route the call
968/// through its `dispatch_atomic` impl.
969///
970/// The dispatcher registry is per-Engine (stored on
971/// `Engine.role_dispatchers`); the caller passes the slice in so
972/// borrow-splitting at the call site can keep `&mut self.components`
973/// and `&mut self.framework` independent.
974pub(crate) fn call_protocol_dispatch_atomic(
975    component: &mut dyn crate::component::ErasedComponent,
976    op_type: &str,
977    inputs: &[(&str, &dyn SlotValue)],
978    ctx: &mut RuntimeResourceRef<'_>,
979    dispatchers: &std::collections::HashMap<std::any::TypeId, RoleDispatcher>,
980) -> Result<DispatchResult, String> {
981    let any: &mut dyn std::any::Any = component;
982    let tid = (*any).type_id();
983    if let Some(dispatcher) = dispatchers.get(&tid) {
984        (dispatcher.dispatch)(any, op_type, inputs, ctx)
985    } else {
986        Err("no ProtocolRuntime dispatcher registered for component".to_string())
987    }
988}
989
990/// Type alias for the ProtocolRuntime downcast-dispatch fn pointer
991/// stored in the dispatcher registry.
992pub type ProtocolDispatchFn = fn(
993    &mut dyn std::any::Any,
994    &str,
995    &[(&str, &dyn SlotValue)],
996    &mut RuntimeResourceRef<'_>,
997) -> Result<DispatchResult, String>;
998
999/// Type alias for the `BackendRuntime::materialize_from_wire`
1000/// downcast-dispatch fn pointer. Mirrors [`ProtocolDispatchFn`]'s
1001/// erased-`Any` shape so the per-T closure stays callable from the
1002/// engine's wire-decode hot path without re-doing the downcast lookup
1003/// on every fill.
1004pub type BackendMaterializeFn =
1005    fn(
1006        &mut dyn std::any::Any,
1007        u64,
1008        Vec<u8>,
1009    ) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError>;
1010
1011/// Type alias for the `Bootstrap::bootstrap` downcast-dispatch fn
1012/// pointer the engine stores per concrete Bootstrap impl. Mirrors
1013/// [`ProtocolDispatchFn`]'s erased-`Any` shape so the F3 Component
1014/// bootstrap fire path can invoke the impl without a per-TypeId
1015/// downcast on every call. The closure downcasts `any` to `T`,
1016/// runs `T::bootstrap(&mut BootstrapCtx)`, and reports the
1017/// `DispatchResult` (Immediate or Async) for the synthetic single-
1018/// op dispatch.
1019pub type BootstrapDispatchFn = fn(
1020    &mut dyn std::any::Any,
1021    &mut crate::contracts::bootstrap::BootstrapCtx,
1022) -> Result<DispatchResult, String>;
1023
1024/// Build a [`BootstrapDispatchFn`] for a concrete `T: Bootstrap`.
1025/// Called from `Engine::register_bootstrap_dispatcher` so the
1026/// engine's `fire_component_bootstrap` lookup keys on `TypeId::of::<T>()`
1027/// and the synthetic op invokes the user's `T::bootstrap` directly.
1028///
1029/// The Contract method's return is `Result<(), T::Error>`; this
1030/// helper converts the `Ok(())` to `DispatchResult::Immediate(Vec::new())`
1031/// because Component bootstrap declares no output slots — the
1032/// F3 spec lists `Async` as a future option a Component's
1033/// override surfaces via a wrapping `CommandId`, but Commit 3
1034/// lands the synchronous-Immediate path that every default-no-op
1035/// Bootstrap takes.
1036pub fn make_bootstrap_dispatcher<T: crate::contracts::bootstrap::Bootstrap + 'static>(
1037) -> BootstrapDispatchFn
1038where
1039    T::Error: std::fmt::Display,
1040{
1041    |any, ctx| {
1042        let concrete = any
1043            .downcast_mut::<T>()
1044            .expect("type-erased lookup matched T");
1045        concrete
1046            .bootstrap(ctx)
1047            .map(|_| DispatchResult::Immediate(Vec::new()))
1048            .map_err(|e| e.to_string())
1049    }
1050}
1051
1052/// Backend-only no-op materialize entry used by non-Backend roles.
1053/// Roles other than `Backend` never reach the materialize entry
1054/// (decode_typed_fill only consults it after confirming the slot
1055/// binds to `ComponentRole::Backend`), so the unused fn pointer
1056/// returns a descriptive error rather than panicking.
1057pub fn no_materialize(
1058    _any: &mut dyn std::any::Any,
1059    _type_hash: u64,
1060    _bytes: Vec<u8>,
1061) -> Result<Box<dyn SlotValue>, crate::slot_value::BackendMaterializeError> {
1062    Err(crate::slot_value::BackendMaterializeError {
1063        summary: "component is not a Backend; materialize_from_wire not supported".to_string(),
1064    })
1065}
1066
/// One registered `ProtocolRuntime` / `<Role>Runtime` dispatcher entry,
/// keyed by the component's `TypeId` in the per-Engine registry.
///
/// Every role fills in both fields, so the engine's wire-decode path always
/// finds a `materialize` fn without a per-role conditional. Non-Backend
/// roles register [`no_materialize`] there.
pub struct RoleDispatcher {
    /// Downcasts the erased component to its concrete `T` (guaranteed by the
    /// `TypeId`-keyed registry) and delegates to `T::dispatch_atomic`.
    pub(crate) dispatch: ProtocolDispatchFn,
    /// Bridge to [`crate::roles::BackendRuntime::materialize_from_wire`] for
    /// Backend components; [`no_materialize`] for every other role.
    pub(crate) materialize: BackendMaterializeFn,
}
1078
1079/// Build a `RoleDispatcher` for a concrete `ProtocolRuntime` impl.
1080/// Called from `Engine::register_protocol_dispatcher` and from any
1081/// test/production setup that needs to register dispatcher entries on
1082/// a fresh Engine.
1083pub fn make_protocol_dispatcher<T: ProtocolRuntime + 'static>() -> RoleDispatcher
1084where
1085    T::Error: std::fmt::Display,
1086{
1087    RoleDispatcher {
1088        dispatch: |any: &mut dyn std::any::Any,
1089                   op_type: &str,
1090                   inputs: &[(&str, &dyn SlotValue)],
1091                   ctx: &mut RuntimeResourceRef<'_>| {
1092            let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1093            concrete
1094                .dispatch_atomic(op_type, inputs, ctx)
1095                .map_err(|e| e.to_string())
1096        },
1097        materialize: no_materialize,
1098    }
1099}
1100
/// Macro: emit a `make_<role>_dispatcher` factory for one non-Protocol role
/// trait.
///
/// Each generated factory wraps the role's own `dispatch_atomic` in a
/// non-capturing closure. The method is called via fully-qualified syntax
/// on `$runtime_trait`, and the closure is stored as the
/// `RoleDispatcher::dispatch` fn pointer.
///
/// Authors deriving a single role (e.g. `#[derive(bb::Index)]`) thus get
/// atomic dispatch through the shared `TypeId`-keyed table without a manual
/// `ProtocolRuntime` shell. The generated `materialize` entry is always
/// [`no_materialize`].
macro_rules! emit_role_dispatcher_factory {
    ($factory_name:ident, $runtime_trait:path) => {
        #[doc = concat!("Build a `RoleDispatcher` for a concrete impl of ", stringify!($runtime_trait), ". Used by `Engine::register_*_dispatcher` chain (`Node::with_<role>(&value)`) so single-role components dispatch through the same `TypeId`-keyed registry as multi-role / Protocol-bearing components.")]
        pub fn $factory_name<T: $runtime_trait + 'static>() -> RoleDispatcher
        where
            <T as $runtime_trait>::Error: std::fmt::Display,
        {
            let dispatch: ProtocolDispatchFn = |erased: &mut dyn std::any::Any,
                                                op_type: &str,
                                                inputs: &[(&str, &dyn SlotValue)],
                                                ctx: &mut RuntimeResourceRef<'_>| {
                let component = erased.downcast_mut::<T>().expect("is_match guaranteed");
                <T as $runtime_trait>::dispatch_atomic(component, op_type, inputs, ctx)
                    .map_err(|e| e.to_string())
            };
            RoleDispatcher {
                dispatch,
                materialize: no_materialize,
            }
        }
    };
}
1128
1129emit_role_dispatcher_factory!(make_index_dispatcher, crate::roles::IndexRuntime);
1130emit_role_dispatcher_factory!(make_aggregator_dispatcher, crate::roles::AggregatorRuntime);
1131emit_role_dispatcher_factory!(make_model_dispatcher, crate::roles::ModelRuntime);
1132emit_role_dispatcher_factory!(make_codec_dispatcher, crate::roles::CodecRuntime);
1133emit_role_dispatcher_factory!(make_data_source_dispatcher, crate::roles::DataSourceRuntime);
1134emit_role_dispatcher_factory!(
1135    make_peer_selector_dispatcher,
1136    crate::roles::PeerSelectorRuntime
1137);
1138
1139/// Build a `RoleDispatcher` for a concrete `BackendRuntime` impl —
1140/// the per-`T` `dispatch_atomic` closure plus the `materialize_from_wire`
1141/// bridge the derive emits. Backend dispatchers are the only ones
1142/// that wire a real `materialize` entry; every other role registers
1143/// [`no_materialize`].
1144pub fn make_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>() -> RoleDispatcher
1145where
1146    <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
1147{
1148    RoleDispatcher {
1149        dispatch: |any: &mut dyn std::any::Any,
1150                   op_type: &str,
1151                   inputs: &[(&str, &dyn SlotValue)],
1152                   ctx: &mut RuntimeResourceRef<'_>| {
1153            let concrete = any.downcast_mut::<T>().expect("is_match guaranteed");
1154            <T as crate::roles::BackendRuntime>::dispatch_atomic(concrete, op_type, inputs, ctx)
1155                .map_err(|e| e.to_string())
1156        },
1157        materialize: |any: &mut dyn std::any::Any, type_hash: u64, bytes: Vec<u8>| {
1158            let concrete = any.downcast_ref::<T>().expect("is_match guaranteed");
1159            <T as crate::roles::BackendRuntime>::materialize_from_wire(concrete, type_hash, bytes)
1160        },
1161    }
1162}
1163
// Function-call tests live in the sibling file `invoke_function_call_tests.rs`
// and are compiled only under `cfg(test)`.
#[cfg(test)]
#[path = "invoke_function_call_tests.rs"]
mod function_call_tests;