Skip to main content

bb_runtime/engine/
core.rs

1//! The `Engine` struct + test-only constructor + registration
2//! accessors. Hot-path poll cycle lives in `engine::poll`.
3
4use std::collections::HashMap;
5use std::marker::PhantomData;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use crate::bus::TypedBus;
10use crate::component::ErasedComponent;
11use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
12use crate::engine::graph_slot::GraphSlot;
13use crate::engine::invoke::{make_protocol_dispatcher, RoleDispatcher};
14use crate::framework::FrameworkComponents;
15use crate::ids::{CommandId, ComponentRef, ExecId, NodeSiteId, OpRef};
16use crate::ingress::IngressQueue;
17use crate::slot_value::SlotValue;
18use bb_ir::proto::onnx::{FunctionProto, NodeProto};
19
20/// Point-in-time hot-path counters for dashboards + saturation
21/// detection. Not synchronized against an in-flight poll cycle.
22#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
23pub struct EngineStats {
24    /// `(OpRef, ExecId)` pairs on the frontier. Climbing → poll
25    /// loop falling behind ingress.
26    pub frontier_len: usize,
27    /// Events queued on the typed bus awaiting routing.
28    pub bus_len: usize,
29    /// Suspended async commands. Approaching
30    /// `NodeConfig.max_pending_async` → cap pressure.
31    pub pending_async: usize,
32    /// `(NodeSiteId, ExecId)` entries holding a value.
33    pub slot_table_occupied: usize,
34    /// Approximate MPMC ingress depth.
35    pub ingress_depth: usize,
36    /// Envelopes queued for next outbound drain.
37    pub outbound_queue_depth: usize,
38    /// Event kinds with at least one subscriber.
39    pub event_subscriptions: usize,
40    /// Number of registered components.
41    pub registered_components: usize,
42    /// Number of installed graphs.
43    pub graph_slots: usize,
44}
45
46/// Single-Node engine state. Built via `Node::ensure_ready`;
47/// tests use `Engine::new()` and seed fields directly.
48pub struct Engine {
49    // --- Graph storage ---------------------------------------------
50    /// Installed graphs in insertion order. `OpRef::pack` puts the
51    /// slot index in the high 32 bits so `dispatch_for` resolves an
52    /// op via two direct array accesses (no HashMap probe).
53    pub(crate) graphs: Vec<GraphSlot>,
54
55    /// Graph name → `graphs[]` index. Lookup table for name-keyed
56    /// resolution (function-call targets, `Engine::graph(name)`
57    /// accessors). The index doubles as the value packed into every
58    /// `OpRef`.
59    pub(crate) graph_index: HashMap<String, u32>,
60
61    /// Sub-Module function registry by `(domain, name, overload)`.
62    /// Populated by `Node` at install time.
63    pub(crate) functions: HashMap<(String, String, String), FunctionProto>,
64
65    // --- Dispatch --------------------------------------------------
66    /// Unified syscall dispatch table — one lookup per op by
67    /// `(domain, op_type)` to its stateless invoke fn pointer.
68    /// Populated by `register_syscall` and `register_all_framework_syscalls`
69    /// at Engine construction; `resolve_dispatch` reads this to stamp
70    /// `OpDispatch::Stateless` for matching NodeProtos.
71    pub syscall_table: HashMap<(String, String), StatelessInvokeFn>,
72
73    // --- Component storage -----------------------------------------
74    /// All bound runtime impls indexed by `ComponentRef.as_usize()`.
75    /// The `Option<...>` wrapper lets `invoke_atomic` take the
76    /// dispatching component out of the Vec via `mem::take` so a
77    /// live [`crate::runtime::ComponentsView`] can borrow
78    /// `&self.components` for cross-component reads while the
79    /// dispatch closure runs - the dispatching slot is restored
80    /// after the closure returns.
81    pub(crate) components: Vec<Option<Box<dyn ErasedComponent>>>,
82
83    /// The Node's own `PeerId` (the one passed to `Node::new`).
84    /// Threaded into every `RuntimeResourceRef` so Components can
85    /// identify themselves in outbound envelopes.
86    pub self_peer: crate::ids::PeerId,
87
88    /// Per-Node framework primitive bundle (scheduler, peer_gate,
89    /// request_tracker, backoff_table, inbound_dedup, address_book).
90    /// Exposed as `pub` so cross-crate test fixtures in `bb-ops` can
91    /// construct `RuntimeResourceRef` from an Engine's framework
92    /// primitives. Engine internals proper stay encapsulated; this is
93    /// the dependency-injection boundary.
94    pub framework: FrameworkComponents,
95
96    /// The per-Node typed event bus.
97    pub bus: TypedBus,
98
99    // --- Execution state -------------------------------------------
100    /// Per-poll execution-state bundle: frontier, slot table, per-
101    /// execution liveness, parked async ops + in-cycle completions,
102    /// function-call invocation frames, inbound-envelope context
103    /// map, the timer scheduler, and the monotonic ID allocator.
104    /// See [`crate::exec_state::ExecState`].
105    pub exec: crate::exec_state::ExecState,
106
107    /// Reverse index from a fused `binding_id` (e.g.
108    /// `"BurnBackend#0"`) to the bound component's `ComponentRef`.
109    /// Populated by `Node` at install time; read by dispatch
110    /// resolution to bind NodeProtos that reference a backend by
111    /// binding_id.
112    pub(crate) binding_id_index: HashMap<String, ComponentRef>,
113
114    /// Per-`event_kind` subscription map. Each entry names the
115    /// `NodeSiteId`(s) of `EventSource` ops listening for that kind.
116    /// The bus-routing pass writes a `TriggerValue` into each
117    /// subscribed site at a fresh `ExecId` and pushes the site's
118    /// downstream consumers — matching the wire delivery semantics
119    /// per `docs/ADDRESSING.md` so bus + wire share one model.
120    pub(crate) event_subscriptions: HashMap<String, Vec<NodeSiteId>>,
121
122    /// Per-LifecyclePhase Op enrollments. `pub` for cross-crate
123    /// `bb-ops` test fixtures that exercise the LifecyclePhase
124    /// syscall.
125    pub lifecycle_table: HashMap<String, Vec<OpRef>>,
126
127    // --- Async + cross-thread --------------------------------------
128    /// Thread-safe inbox for external events. Producers (transport
129    /// adapters, host invocations) may push from any thread; the
130    /// Engine drains serially from its own (single) thread. Engine
131    /// holds an `Arc` so it can hand clones to producers without
132    /// surrendering ownership.
133    pub(crate) ingress: Arc<IngressQueue>,
134
135    /// Lifecycle phases queued for firing by `Engine::fire_lifecycle`
136    /// on the next poll.
137    pub(crate) fired_phases: Vec<String>,
138
139    /// Snapshot of the ingress queue depth at the start of the
140    /// engine's ingress drain. Used by the backpressure detection
141    /// hook in `process_ingress_event` to compare against the
142    /// configured high-water mark. Refreshed every poll cycle.
143    pub(crate) phase1_pre_drain_depth: usize,
144
145    // --- Bootstrap ---------------------------------------------------
146    /// Consolidated bootstrap state — every field the install path,
147    /// poll seeder, and body-op gate read goes through here. See
148    /// [`crate::engine::bootstrap::BootstrapState`] for field roles.
149    pub(crate) bootstrap: crate::engine::bootstrap::BootstrapState,
150
151    // --- Dispatcher registries (per-Engine) ------------------------
152    /// Concrete-type `ProtocolRuntime` dispatchers registered against
153    /// this Engine, indexed by `TypeId::of::<T>()`. Populated by
154    /// [`Engine::register_protocol_dispatcher`] at setup; consulted by
155    /// `invoke_atomic` via direct HashMap lookup (no linear scan).
156    pub(crate) role_dispatchers: HashMap<std::any::TypeId, RoleDispatcher>,
157
158    /// Concrete-type `Bootstrap` dispatchers, indexed by
159    /// `TypeId::of::<T>()`. Populated by
160    /// [`Engine::register_bootstrap_dispatcher`] at install time and
161    /// consulted by `fire_component_bootstrap` to dispatch the
162    /// synthetic single-op against the bound Component's
163    /// [`crate::contracts::bootstrap::Bootstrap::bootstrap`] impl.
164    pub(crate) bootstrap_dispatchers:
165        HashMap<std::any::TypeId, crate::engine::invoke::BootstrapDispatchFn>,
166
167    // --- Generic slot registry -------------------------------------
168    /// Slot-name → `ComponentRef` registry. Generic over component
169    /// role: indexes EVERY bound Component (backends, indexes,
170    /// models, peer selectors, custom) by binding slot name
171    /// (defaults to the field name; overridable via
172    /// `#[bb::slot("custom")]`). Components reach declared
173    /// dependencies through this map at dispatch time via
174    /// [`crate::runtime::ComponentsView::for_slot`].
175    ///
176    /// This is the canonical lookup table. Every install path
177    /// populates it; every dispatch path reads through it. No
178    /// per-role specialization above the slot abstraction.
179    pub(crate) slots: HashMap<String, ComponentRef>,
180
181    /// Parallel index: compiler-assigned slot id (the value of
182    /// `ai.bytesandbrains.slot_id` stamped on role NodeProtos by the
183    /// placeholders pass) → `ComponentRef`. Populated alongside
184    /// [`Self::slots`] at install time. `resolve_dispatch` reads
185    /// the role NodeProto's `slot_id`, looks it up here, and stamps
186    /// `OpDispatch::Atomic` against the resolved component. Single
187    /// source of truth: install populates both indexes from the same
188    /// `bb.binding.<target>.<slot>` metadata.
189    pub(crate) slot_id_to_cref: HashMap<u32, ComponentRef>,
190
191    /// Parallel index: compiler-assigned `slot_id` → `(role,
192    /// ComponentRef)`. Populated by [`Self::bind_slot_id_with_role`]
193    /// at install time from the same `binding.<target>.<slot>`
194    /// metadata that drives `slot_id_to_cref`; retains the
195    /// `ComponentRole` so `decode_typed_fill` can decide between the
196    /// framework-carrier path and the backend-mediated tensor path.
197    pub(crate) slot_id_to_role_ref: HashMap<u32, (crate::registry::ComponentRole, ComponentRef)>,
198
199    // --- Component role introspection ------------------------------
200    /// Per-component set of declared roles, sourced from
201    /// `inventory::iter::<ComponentRoleBinding>` keyed by
202    /// `T::TYPE_NAME`. Populated by `Node::ensure_ready` after
203    /// component registration; reported by [`Engine::roles_for`] for
204    /// introspection (engine tests, host tooling).
205    pub(crate) component_roles:
206        HashMap<ComponentRef, std::collections::HashSet<crate::registry::ComponentRole>>,
207
208    // --- Production-safety caps ------------------------------------
209    /// Soft per-poll-cycle op-invocation budget per
210    /// `NodeConfig.cycle_op_budget`. When set, `Engine::poll` yields
211    /// after this many invocations and surfaces
212    /// `EngineStep::CycleBudgetExceeded { ops_invoked }` so the host
213    /// can re-poll. `None` disables the budget.
214    pub(crate) cycle_op_budget: Option<usize>,
215
216    /// Cap on the number of in-flight `pending_async` entries per
217    /// `NodeConfig.max_pending_async`. When at cap, an Op returning
218    /// `DispatchResult::Async(_)` fails synchronously via the
219    /// existing `OpFailed` path. `None` disables the cap.
220    pub(crate) max_pending_async: Option<usize>,
221
222    /// Cumulative cap on in-flight ingress bytes held across the
223    /// ingress queue + slot table + pending async completion
224    /// buffers. Sourced from `NodeConfig::ingress_byte_budget`.
225    /// Boundary callers call [`Self::try_charge`] before installing
226    /// a payload; the slot-table writer calls [`Self::release`] on
227    /// overwrite / eviction.
228    pub(crate) ingress_byte_budget: usize,
229
230    /// Live count of ingress bytes the engine currently holds.
231    /// Incremented by [`Self::try_charge`] on successful admission;
232    /// decremented by [`Self::release`] on slot-table overwrite /
233    /// eviction / drop. The budget guard surfaces this as a
234    /// snapshot via [`Self::ingress_bytes_in_flight`].
235    pub(crate) ingress_bytes_in_flight: usize,
236
237    // --- Single-threaded anchor ------------------------------------
238    /// `PhantomData<*const ()>` makes `Engine` neither `Send` nor
239    /// `Sync` - the single-threaded sans-IO contract is enforced at
240    /// compile time. Producers can still push to `ingress` from other
241    /// threads because the `Arc<IngressQueue>` handle is independently
242    /// `Send + Sync`.
243    _not_send: PhantomData<*const ()>,
244}
245
246impl Default for Engine {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
252impl Engine {
253    /// Construct an empty engine with the default ingress capacity.
254    /// `Node::new` wraps this with `self_peer`, framework syscalls,
255    /// and config caps applied. For non-default `bus_capacity` use
256    /// [`Self::with_bus_capacity`] so the ingress queue sizes to
257    /// `bus_capacity * 4` per ENGINE.md §2.2.
258    pub fn new() -> Self {
259        Self::with_bus_capacity(crate::node::DEFAULT_BUS_CAPACITY)
260    }
261
262    /// Construct an empty engine whose ingress queue holds up to
263    /// `bus_capacity * 4` events (the ENGINE.md §2.2 ratio that
264    /// reserves headroom for async completions, app events, and
265    /// inbound envelopes between poll cycles).
266    pub fn with_bus_capacity(bus_capacity: usize) -> Self {
267        Self {
268            graphs: Vec::new(),
269            graph_index: HashMap::new(),
270            functions: HashMap::new(),
271            syscall_table: HashMap::new(),
272            slot_id_to_cref: HashMap::new(),
273            slot_id_to_role_ref: HashMap::new(),
274            components: Vec::new(),
275            self_peer: crate::ids::PeerId::from(0u64),
276            framework: FrameworkComponents::new(),
277            bus: TypedBus::new(),
278            exec: crate::exec_state::ExecState::new(),
279            binding_id_index: HashMap::new(),
280            event_subscriptions: HashMap::new(),
281            lifecycle_table: HashMap::new(),
282            ingress: Arc::new(IngressQueue::with_capacity(bus_capacity.saturating_mul(4))),
283            fired_phases: Vec::new(),
284            phase1_pre_drain_depth: 0,
285            bootstrap: crate::engine::bootstrap::BootstrapState::new(),
286            role_dispatchers: HashMap::new(),
287            bootstrap_dispatchers: HashMap::new(),
288            slots: HashMap::new(),
289            component_roles: HashMap::new(),
290            cycle_op_budget: crate::node::DEFAULT_CYCLE_OP_BUDGET,
291            max_pending_async: crate::node::DEFAULT_MAX_PENDING_ASYNC,
292            ingress_byte_budget: crate::node::DEFAULT_INGRESS_BYTE_BUDGET,
293            ingress_bytes_in_flight: 0,
294            _not_send: PhantomData,
295        }
296    }
297
298    /// wipe restorable engine state ahead of a
299    /// `Node::restore` call, leaving the install-time-stamped
300    /// surfaces (`graphs`, `functions`, `dispatch_table`,
301    /// `atomic_dispatch`, `components`, `self_peer`,
302    /// `syscall_index`, `role_dispatchers`, `binding_id_index`,
303    /// `lifecycle_table`, `event_subscriptions`,
304    /// `cycle_op_budget`, `max_pending_async`) intact. The Node
305    /// re-applies the snapshot's framework state, ID counters, and
306    /// pending async/completion queues on top of the cleared
307    /// state, so the post-restore Engine is the same install
308    /// re-seeded with the snapshot's restorable transient state.
309    ///
310    /// Restorable surfaces explicitly cleared:
311    /// - `frontier`, `slot_table`, `execution_state`,
312    ///   `pending_async`, `pending_completions`, `pending_calls`,
313    ///   `fired_phases`
314    /// - `framework` (FrameworkComponents reseeds from snapshot)
315    /// - `bus` (re-establishes subscriptions from snapshot)
316    /// - `ingress` queue (fresh; in-flight inbound is the host's
317    ///   responsibility to redeliver)
318    pub fn clear_for_restore(&mut self) {
319        self.exec.frontier.clear();
320        self.exec.slot_table.clear();
321        self.exec.execution_state.clear();
322        self.exec.pending_async.clear();
323        self.exec.pending_completions.clear();
324        self.exec.pending_calls.clear();
325        self.fired_phases.clear();
326        // Slot-table clear above dropped every charged carrier;
327        // reset the counter so the restored snapshot doesn't inherit
328        // an in-flight balance the new state doesn't own.
329        self.ingress_bytes_in_flight = 0;
330        // Restore deliberately suppresses bootstrap re-runs: the
331        // restored Node already executed its bootstrap call before
332        // the snapshot, and replaying would re-seed the address
333        // book, re-fire the first Announce, etc.
334        // `install_order` + `module_bootstraps` stay populated for
335        // introspection (multi-target installs surface every queued
336        // target via [`Self::bootstrap_function_keys`]); `pending`,
337        // `in_flight`, `pending_requests`, `waiting`, and `next_idx`
338        // reset so `Node::run_bootstrap` is a no-op on a restored Node —
339        // bumping the index to the end of `install_order` keeps the
340        // seeder from re-firing if the host nonetheless polls.
341        self.bootstrap.clear_for_restore();
342        self.framework = FrameworkComponents::new();
343        self.bus = TypedBus::new();
344        self.ingress = Arc::new(IngressQueue::new());
345        // ID counters reset to 0; the restore path re-applies the
346        // snapshot's persisted values so post-restore IDs continue
347        // from where the pre-snapshot Node left off ().
348        self.exec.ids.next_exec_id = 0;
349        self.exec.ids.next_command_id = 0;
350    }
351
352    /// Mint a fresh `ExecId`. Replaces the prior static counter
353    /// in `src/ids.rs` so allocation runs single-threaded under
354    /// the engine's borrow discipline.
355    pub fn allocate_exec_id(&mut self) -> ExecId {
356        let id = self.exec.ids.next_exec_id;
357        self.exec.ids.next_exec_id = self
358            .exec
359            .ids
360            .next_exec_id
361            .checked_add(1)
362            .expect("ExecId counter overflow");
363        ExecId::from(id)
364    }
365
366    /// Mint a fresh `CommandId`. Used by async-suspending syscall
367    /// ops via `RuntimeResourceRef::next_command_id`.
368    pub fn allocate_command_id(&mut self) -> CommandId {
369        let id = self.exec.ids.next_command_id;
370        self.exec.ids.next_command_id = self
371            .exec
372            .ids
373            .next_command_id
374            .checked_add(1)
375            .expect("CommandId counter overflow");
376        CommandId::from(id)
377    }
378
379    /// Mint a fresh `NodeSiteId`. Used by graph installation; sites
380    /// must be globally unique across installed graphs.
381    pub fn allocate_node_site_id(&mut self) -> NodeSiteId {
382        let id = self.exec.ids.next_node_site_id;
383        self.exec.ids.next_node_site_id = self
384            .exec
385            .ids
386            .next_node_site_id
387            .checked_add(1)
388            .expect("NodeSiteId counter overflow");
389        NodeSiteId::from(id)
390    }
391
392    /// Drop slot_table and execution_state entries belonging to
393    /// executions that have completed. An execution is complete
394    /// when it has no frontier entries, no pending_async entries,
395    /// and no pending_calls entry pointing at its `ExecId`. Called
396    /// at the end of every `poll()` cycle so a long-running Node
397    /// keeps a bounded slot_table.
398    pub(crate) fn gc_completed_executions(&mut self) {
399        if self.exec.execution_state.is_empty() {
400            return;
401        }
402        let mut live: std::collections::HashSet<ExecId> =
403            std::collections::HashSet::with_capacity(self.exec.execution_state.len());
404        for (_, exec_id) in &self.exec.frontier {
405            live.insert(*exec_id);
406        }
407        for p in self.exec.pending_async.values() {
408            live.insert(p.exec_id);
409        }
410        for exec_id in self.exec.pending_calls.keys() {
411            live.insert(*exec_id);
412        }
413        let dead: Vec<ExecId> = self
414            .exec
415            .execution_state
416            .keys()
417            .copied()
418            .filter(|e| !live.contains(e))
419            .collect();
420        if dead.is_empty() {
421            return;
422        }
423        let dead_set: std::collections::HashSet<ExecId> = dead.iter().copied().collect();
424        // Walk doomed slot entries once to release any charged
425        // ingress bytes the slot-table writer admitted, then drop
426        // the entries. `retain` would let us mutate-in-place, but
427        // it borrows the table mutably for the entire walk; the
428        // explicit collect-then-remove pattern lets us drain
429        // `charged_bytes()` from each prior carrier first.
430        let doomed_keys: Vec<(NodeSiteId, ExecId)> = self
431            .exec
432            .slot_table
433            .iter()
434            .filter_map(|(key, _)| dead_set.contains(&key.1).then_some(*key))
435            .collect();
436        for key in doomed_keys {
437            // `clear_slot` releases the prior carrier's
438            // `charged_bytes()` against `ingress_bytes_in_flight`.
439            // Non-ingress carriers report 0 — release is a no-op.
440            let _ = self.clear_slot(key.0, key.1);
441        }
442        for exec_id in &dead {
443            self.exec.execution_state.remove(exec_id);
444        }
445    }
446
447    /// Apply production-safety caps from a `NodeConfig`. Called by
448    /// `Node::ensure_ready` after constructing the Engine; tests can
449    /// invoke directly to exercise specific cap values.
450    pub fn apply_config_caps(&mut self, config: &crate::node::NodeConfig) {
451        self.cycle_op_budget = config.cycle_op_budget;
452        self.max_pending_async = config.max_pending_async;
453        self.ingress_byte_budget = config.ingress_byte_budget;
454        self.framework
455            .outbound_queue
456            .set_cap(config.max_outbound_queue);
457        self.bus.set_cap(Some(config.bus_capacity));
458        // The off-thread `CompletionSink::complete` path consults the
459        // ingress queue itself for its per-item cap; reseed the
460        // atomic so sinks created before this call see the configured
461        // value on their next push.
462        self.ingress
463            .set_completion_result_cap(config.max_completion_result_bytes);
464        // Reseed the BackpressureTracker with the configured knobs.
465        // `apply_config_caps` is the canonical entry the host calls
466        // before the first poll, so a fresh tracker reflecting the
467        // resolved knobs is the only state observers see.
468        self.framework.peer_state.backpressure = crate::framework::BackpressureTracker::with_config(
469            config.backpressure_high_water_pct,
470            config.backpressure_k_before_silent,
471            config.backpressure_min_notice_interval_ns,
472        );
473    }
474
475    /// Live count of ingress bytes the engine currently holds across
476    /// its ingress queue + slot table + pending async completion
477    /// buffers. Updated by every successful charge / release pair on
478    /// the ingress paths. Surfaced for observability (operator
479    /// dashboards) and assertions.
480    pub fn ingress_bytes_in_flight(&self) -> usize {
481        self.ingress_bytes_in_flight
482    }
483
484    /// Configured cap on cumulative in-flight ingress bytes,
485    /// sourced from `NodeConfig::ingress_byte_budget`. Constant
486    /// between `apply_config_caps` calls.
487    pub fn ingress_byte_budget(&self) -> usize {
488        self.ingress_byte_budget
489    }
490
491    /// Pre-admission budget guard for an ingress payload of length
492    /// `bytes`. On success the bytes are added to
493    /// `ingress_bytes_in_flight` and the caller may install the
494    /// resulting carrier into the slot table or pending-completion
495    /// queue. On overflow the counter is left unchanged and the
496    /// caller drops the payload, emitting the appropriate
497    /// `BudgetExceeded` `InfraEvent`.
498    ///
499    /// One saturating-add + one comparison; below the cost of the
500    /// prost decode that typically follows.
501    pub(crate) fn try_charge(&mut self, bytes: usize) -> Result<(), BudgetExceededReason> {
502        let after = self.ingress_bytes_in_flight.saturating_add(bytes);
503        if after > self.ingress_byte_budget {
504            return Err(BudgetExceededReason {
505                byte_count: bytes,
506                budget_remaining: self
507                    .ingress_byte_budget
508                    .saturating_sub(self.ingress_bytes_in_flight),
509            });
510        }
511        self.ingress_bytes_in_flight = after;
512        Ok(())
513    }
514
515    /// Decrement `ingress_bytes_in_flight` after a charged payload
516    /// leaves engine state — slot-table overwrite, slot clear,
517    /// eviction, or in-cycle drop. `saturating_sub` defends against
518    /// a release path that arrives without a paired charge (e.g. a
519    /// snapshot replay reseeding the slot table before any wire
520    /// traffic).
521    pub(crate) fn release(&mut self, bytes: usize) {
522        self.ingress_bytes_in_flight = self.ingress_bytes_in_flight.saturating_sub(bytes);
523    }
524
525    /// Write a value into the slot table at `(site, exec_id)`,
526    /// releasing the prior occupant's `charged_bytes` against
527    /// `ingress_bytes_in_flight`. The incoming carrier's
528    /// `charged_bytes` is NOT re-added — admission callers
529    /// (`decode_typed_fill`, `deliver_event`, etc.) have already run
530    /// `try_charge` against the wire-byte budget. This helper is the
531    /// slot-table-side bookkeeping that closes the loop on overwrite.
532    ///
533    /// Returns the prior boxed value (if any) so the caller can
534    /// run additional teardown.
535    pub(crate) fn slot_write(
536        &mut self,
537        site: NodeSiteId,
538        exec_id: ExecId,
539        value: Box<dyn SlotValue>,
540    ) -> Option<Box<dyn SlotValue>> {
541        let key = (site, exec_id);
542        let prior = self.exec.slot_table.insert(key, Some(value));
543        // `prior` is `Option<Option<Box<dyn SlotValue>>>`: outer None
544        // means the slot was untouched; inner None means the slot
545        // existed but was empty (cleared previously). Release only
546        // when the prior carrier was alive.
547        match prior {
548            Some(Some(prior_box)) => {
549                self.ingress_bytes_in_flight = self
550                    .ingress_bytes_in_flight
551                    .saturating_sub(prior_box.charged_bytes());
552                Some(prior_box)
553            }
554            _ => None,
555        }
556    }
557
558    /// Remove the slot at `(site, exec_id)`, releasing any
559    /// `charged_bytes` the prior carrier was holding against
560    /// `ingress_bytes_in_flight`. Returns the removed carrier so
561    /// downstream paths (graph reset, GC) can pass it onward.
562    pub(crate) fn clear_slot(
563        &mut self,
564        site: NodeSiteId,
565        exec_id: ExecId,
566    ) -> Option<Box<dyn SlotValue>> {
567        let key = (site, exec_id);
568        match self.exec.slot_table.remove(&key) {
569            Some(Some(prior_box)) => {
570                self.ingress_bytes_in_flight = self
571                    .ingress_bytes_in_flight
572                    .saturating_sub(prior_box.charged_bytes());
573                Some(prior_box)
574            }
575            _ => None,
576        }
577    }
578
579    /// Snapshot of the engine's hot-path state, sized for cheap
580    /// reads on every poll cycle (no allocation). Production
581    /// observability: operators see saturation building up before
582    /// the process locks up.
583    pub fn engine_stats(&self) -> EngineStats {
584        EngineStats {
585            frontier_len: self.exec.frontier.len(),
586            bus_len: self.bus.len(),
587            pending_async: self.exec.pending_async.len(),
588            slot_table_occupied: self.exec.slot_table.len(),
589            ingress_depth: self.ingress.len(),
590            outbound_queue_depth: self.framework.outbound_queue.len(),
591            event_subscriptions: self.event_subscriptions.len(),
592            registered_components: self.components.len(),
593            graph_slots: self.graphs.len(),
594        }
595    }
596
597    /// Register a `ProtocolRuntime` dispatcher for the concrete
598    /// component type `T`. Call once per `T` after constructing the
599    /// Engine - typically alongside `register_component` for any
600    /// component whose `dispatch_atomic` you want to drive. Indexed
601    /// by `TypeId::of::<T>()` so dispatch is one HashMap lookup,
602    /// not a linear scan across the registry.
603    pub fn register_protocol_dispatcher<T: crate::roles::ProtocolRuntime + 'static>(&mut self)
604    where
605        T::Error: std::fmt::Display,
606    {
607        let type_id = std::any::TypeId::of::<T>();
608        self.role_dispatchers
609            .insert(type_id, make_protocol_dispatcher::<T>());
610    }
611
612    /// Register a role dispatcher keyed by `TypeId::of::<T>()` for a
613    /// concrete `IndexRuntime` impl. Lets `Node::with_index(&value)`
614    /// wire atomic dispatch even when `T` does not implement
615    /// `ProtocolRuntime`. Calling this twice for the same `T`
616    /// silently overwrites; the dispatcher is idempotent because
617    /// `T::dispatch_atomic` is the only consumer.
618    pub fn register_index_dispatcher<T: crate::roles::IndexRuntime + 'static>(&mut self)
619    where
620        <T as crate::roles::IndexRuntime>::Error: std::fmt::Display,
621    {
622        let type_id = std::any::TypeId::of::<T>();
623        self.role_dispatchers
624            .insert(type_id, crate::engine::invoke::make_index_dispatcher::<T>());
625    }
626
627    /// Register an `AggregatorRuntime` dispatcher. See
628    /// [`Engine::register_index_dispatcher`] for the rationale.
629    pub fn register_aggregator_dispatcher<T: crate::roles::AggregatorRuntime + 'static>(&mut self)
630    where
631        <T as crate::roles::AggregatorRuntime>::Error: std::fmt::Display,
632    {
633        let type_id = std::any::TypeId::of::<T>();
634        self.role_dispatchers.insert(
635            type_id,
636            crate::engine::invoke::make_aggregator_dispatcher::<T>(),
637        );
638    }
639
640    /// Register a `ModelRuntime` dispatcher. See
641    /// [`Engine::register_index_dispatcher`] for the rationale.
642    pub fn register_model_dispatcher<T: crate::roles::ModelRuntime + 'static>(&mut self)
643    where
644        <T as crate::roles::ModelRuntime>::Error: std::fmt::Display,
645    {
646        let type_id = std::any::TypeId::of::<T>();
647        self.role_dispatchers
648            .insert(type_id, crate::engine::invoke::make_model_dispatcher::<T>());
649    }
650
651    /// Register a `CodecRuntime` dispatcher. See
652    /// [`Engine::register_index_dispatcher`] for the rationale.
653    pub fn register_codec_dispatcher<T: crate::roles::CodecRuntime + 'static>(&mut self)
654    where
655        <T as crate::roles::CodecRuntime>::Error: std::fmt::Display,
656    {
657        let type_id = std::any::TypeId::of::<T>();
658        self.role_dispatchers
659            .insert(type_id, crate::engine::invoke::make_codec_dispatcher::<T>());
660    }
661
662    /// Register a `DataSourceRuntime` dispatcher. See
663    /// [`Engine::register_index_dispatcher`] for the rationale.
664    pub fn register_data_source_dispatcher<T: crate::roles::DataSourceRuntime + 'static>(&mut self)
665    where
666        <T as crate::roles::DataSourceRuntime>::Error: std::fmt::Display,
667    {
668        let type_id = std::any::TypeId::of::<T>();
669        self.role_dispatchers.insert(
670            type_id,
671            crate::engine::invoke::make_data_source_dispatcher::<T>(),
672        );
673    }
674
675    /// Register a `PeerSelectorRuntime` dispatcher. See
676    /// [`Engine::register_index_dispatcher`] for the rationale.
677    pub fn register_peer_selector_dispatcher<T: crate::roles::PeerSelectorRuntime + 'static>(
678        &mut self,
679    ) where
680        <T as crate::roles::PeerSelectorRuntime>::Error: std::fmt::Display,
681    {
682        let type_id = std::any::TypeId::of::<T>();
683        self.role_dispatchers.insert(
684            type_id,
685            crate::engine::invoke::make_peer_selector_dispatcher::<T>(),
686        );
687    }
688
689    /// Register a `BackendRuntime` dispatcher. The `Backend` Contract
690    /// trait's per-atomic-op surface dispatches through this entry,
691    /// emitted from `#[derive(bb::Backend)]`.
692    pub fn register_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>(&mut self)
693    where
694        <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
695    {
696        let type_id = std::any::TypeId::of::<T>();
697        self.role_dispatchers.insert(
698            type_id,
699            crate::engine::invoke::make_backend_dispatcher::<T>(),
700        );
701    }
702
703    /// Register a `Bootstrap` dispatcher. The `Bootstrap` Contract
704    /// method dispatches through this entry — keyed on `TypeId::of::<T>()`
705    /// so the engine's `fire_ready_bootstrap` Component arm reaches
706    /// the concrete `T::bootstrap` impl via downcast without scanning
707    /// the registry. The derive bridge in F5 emits the call to this
708    /// method alongside the Component's other role registrations.
709    pub fn register_bootstrap_dispatcher<T: crate::contracts::bootstrap::Bootstrap + 'static>(
710        &mut self,
711    ) where
712        <T as crate::contracts::bootstrap::Bootstrap>::Error: std::fmt::Display,
713    {
714        let type_id = std::any::TypeId::of::<T>();
715        self.bootstrap_dispatchers.insert(
716            type_id,
717            crate::engine::invoke::make_bootstrap_dispatcher::<T>(),
718        );
719    }
720
721    /// Bind a Component bootstrap entry. Records `slot → cref` in
722    /// `bootstrap.component_bootstraps`; subsequent host-supplied
723    /// `BootstrapRequest`s targeting `slot` resolve the bound
724    /// `ComponentRef` through this map. Wires the Component-arm
725    /// seam the F5 install path will populate; F3 Commit 3 exposes
726    /// it under `test-components` so the integration tests in
727    /// `core_component_bootstrap_tests.rs` can register fixtures
728    /// without going through the install pipeline.
729    #[cfg(any(test, feature = "test-components"))]
730    pub fn register_component_bootstrap(&mut self, slot: &str, cref: ComponentRef) {
731        self.bootstrap.component_bootstraps.insert(
732            slot.to_string(),
733            crate::engine::bootstrap::ComponentBootstrap { cref },
734        );
735    }
736
737    /// Record the inventory-declared roles for a registered
738    /// component. `Node::ensure_ready` calls this once per
739    /// `ComponentRef` after registration, passing the set computed
740    /// from `crate::registry::roles_for_component(T::TYPE_NAME)`.
741    pub fn set_component_roles(
742        &mut self,
743        cref: ComponentRef,
744        roles: std::collections::HashSet<crate::registry::ComponentRole>,
745    ) {
746        self.component_roles.insert(cref, roles);
747    }
748
749    /// Return the inventory-declared roles for a registered
750    /// component, or an empty set if the component wasn't registered
751    /// through a derive emitting `ComponentRoleBinding` entries.
752    /// Introspection surface for engine tests + host tooling.
753    pub fn roles_for(
754        &self,
755        cref: ComponentRef,
756    ) -> std::collections::HashSet<crate::registry::ComponentRole> {
757        self.component_roles.get(&cref).cloned().unwrap_or_default()
758    }
759
760    /// Register a `binding_id → ComponentRef` mapping. Called by
761    /// `Node::ensure_ready` after binding resolution so the bound-
762    /// backend lookup can resolve a NodeProto's `binding_id`
763    /// metadata against an installed component.
764    pub fn register_binding_id(&mut self, binding_id: String, cref: ComponentRef) {
765        self.binding_id_index.insert(binding_id, cref);
766    }
767
768    /// Look up the `ComponentRef` bound at the given slot name -
769    /// the GENERIC dependency-resolution accessor. Components reach
770    /// declared dependencies through this method (typically via
771    /// [`crate::runtime::ComponentsView::for_slot`] at dispatch
772    /// time). Returns `None` when no slot of that name is bound.
773    pub fn slot(&self, slot: &str) -> Option<ComponentRef> {
774        self.slots.get(slot).copied()
775    }
776
777    /// Bind a `ComponentRef` at the given slot name. The
778    /// `install` facade in the `bytesandbrains` crate calls this
779    /// from the T8 chain; in-crate callers use it from the
780    /// transitional `Node::with_backend(slot, &b)` path. Returns
781    /// the previous binding if any.
782    pub fn bind_slot(&mut self, slot: String, cref: ComponentRef) -> Option<ComponentRef> {
783        self.slots.insert(slot, cref)
784    }
785
786    /// Register the compiler-assigned `slot_id` → `ComponentRef`
787    /// mapping. Called by `bb::install()` alongside
788    /// [`Self::bind_slot`]; the pair is read by
789    /// [`Self::resolve_dispatch`] when stamping
790    /// `OpDispatch::Atomic` against a role NodeProto's
791    /// `ai.bytesandbrains.slot_id` metadata. Returns the previous
792    /// binding, if any.
793    pub fn bind_slot_id(&mut self, slot_id: u32, cref: ComponentRef) -> Option<ComponentRef> {
794        self.slot_id_to_cref.insert(slot_id, cref)
795    }
796
797    /// Register the compiler-assigned `slot_id` → `(role,
798    /// ComponentRef)` mapping. Called by `bb::install()` alongside
799    /// [`Self::bind_slot_id`]; the role is required so
800    /// `decode_typed_fill` can branch between framework-carrier
801    /// decode (`Codec`, `Index`, …) and backend-mediated tensor
802    /// materialisation (`Backend`). Returns the previous binding, if
803    /// any.
804    pub fn bind_slot_id_with_role(
805        &mut self,
806        slot_id: u32,
807        role: crate::registry::ComponentRole,
808        cref: ComponentRef,
809    ) -> Option<(crate::registry::ComponentRole, ComponentRef)> {
810        self.slot_id_to_role_ref.insert(slot_id, (role, cref))
811    }
812
813    /// Look up the `(role, ComponentRef)` bound at a compiler-assigned
814    /// `slot_id`. Used by `decode_typed_fill` to discover whether an
815    /// inbound wire payload routes through a backend.
816    pub fn role_ref_for_slot_id(
817        &self,
818        slot_id: u32,
819    ) -> Option<(crate::registry::ComponentRole, ComponentRef)> {
820        self.slot_id_to_role_ref.get(&slot_id).copied()
821    }
822
823    /// Iterate every `(slot_name, ComponentRef)` pair currently
824    /// bound. Surfaces the registry to introspection tools + the
825    /// compiler's resolve-dependencies pass.
826    pub fn slots_iter(&self) -> impl Iterator<Item = (&str, ComponentRef)> {
827        self.slots.iter().map(|(k, v)| (k.as_str(), *v))
828    }
829
830    /// Subscribe a `NodeSiteId` to bus events of `event_kind` (the
831    /// discriminator returned by [`crate::bus::NodeEvent::kind`]).
832    /// The bus-routing pass writes a `TriggerValue` to each
833    /// subscribed site at a fresh `ExecId` and pushes the site's
834    /// downstream consumers onto the frontier — uniform with wire
835    /// delivery semantics per `docs/ADDRESSING.md`.
836    ///
837    /// `Node` calls this at install time for every
838    /// `EventSource` syscall op, passing the op's output `NodeSiteId`.
839    pub fn register_event_subscription(&mut self, event_kind: &str, site: NodeSiteId) {
840        let entry = self
841            .event_subscriptions
842            .entry(event_kind.to_string())
843            .or_default();
844        if !entry.contains(&site) {
845            entry.push(site);
846        }
847    }
848
849    /// Cheap clone of the shared `IngressQueue` handle. Test
850    /// harnesses + transport adapters push `IngressEvent`s through
851    /// this surface.
852    pub fn ingress_queue_handle(&self) -> Arc<IngressQueue> {
853        Arc::clone(&self.ingress)
854    }
855
856    /// Queue a lifecycle phase for firing on the next `poll()`
857    /// call. The framework emits `EngineStep::LifecycleFired { phase }`
858    /// for each queued phase and also pushes every `LifecyclePhase`
859    /// op enrolled under that phase name (via
860    /// [`Engine::register_lifecycle_op`]) onto the frontier with a
861    /// fresh `ExecId`.
862    pub fn fire_lifecycle(&mut self, phase: &str) {
863        self.fired_phases.push(phase.to_string());
864    }
865
866    /// Enroll `op_ref` under `phase` per IR_AND_DSL.md §5a.
867    /// Idempotent - the same `(phase, OpRef)` pair never enrolls
868    /// twice. `Node` calls this at install time after parsing
869    /// each `LifecyclePhase` NodeProto's `phase` attribute.
870    pub fn register_lifecycle_op(&mut self, phase: &str, op_ref: OpRef) {
871        let entry = self.lifecycle_table.entry(phase.to_string()).or_default();
872        if !entry.contains(&op_ref) {
873            entry.push(op_ref);
874        }
875    }
876
877    /// Register a stateless syscall op. Captures `TypeId::of::<T>()`
878    /// into both `dispatch_table` (TypeId → invoke fn) and
879    /// `syscall_index` ((domain, op_type) → TypeId) so
880    /// `resolve_dispatch` can stamp `OpDispatch::Stateless`.
881    ///
882    /// Register a stateless syscall by its `(domain, op_type)` key.
883    pub fn register_syscall(&mut self, domain: &str, op_type: &str, invoke_fn: StatelessInvokeFn) {
884        self.syscall_table
885            .insert((domain.to_string(), op_type.to_string()), invoke_fn);
886    }
887
888    /// Install every framework syscall shipped via inventory by
889    /// `bb-ops`. Each registration carries its own
890    /// `(domain, op_type)` + invoke pointer; the engine stamps them
891    /// into `syscall_table`.
892    pub fn register_all_framework_syscalls(&mut self) {
893        for reg in crate::registry::framework_syscalls() {
894            self.syscall_table.insert(
895                (reg.domain.to_string(), reg.op_type.to_string()),
896                reg.invoke,
897            );
898        }
899    }
900
901    /// Test-only installer. Inserts a fresh `GraphSlot` keyed by
902    /// `name` with empty per-node tables but with `op_dispatch`
903    /// pre-filled with `Unresolved` so subsequent `resolve_dispatch`
904    /// can stamp dispatch decisions. Use [`Engine::install_graph`]
905    /// for the canonical path that walks the FunctionProto.
906    #[cfg(any(test, feature = "test-components"))]
907    pub fn install_graph_for_test(
908        &mut self,
909        name: String,
910        function: FunctionProto,
911    ) -> &mut GraphSlot {
912        let mut g = GraphSlot::new_for_test(name.clone(), function);
913        g.op_dispatch = (0..g.function.node.len())
914            .map(|_| crate::engine::dispatch_entry::OpDispatch::Unresolved)
915            .collect();
916        let idx = self.push_graph_slot(name, g);
917        &mut self.graphs[idx as usize]
918    }
919
920    /// Push a `GraphSlot` onto the storage Vec and register its
921    /// name → index entry. Returns the assigned `graph_idx` (the
922    /// value that gets packed into `OpRef`). Same name twice
923    /// overwrites the existing slot but keeps the original
924    /// `graph_idx` — preserving OpRef stability across re-install.
925    pub(crate) fn push_graph_slot(&mut self, name: String, slot: GraphSlot) -> u32 {
926        if let Some(&idx) = self.graph_index.get(&name) {
927            self.graphs[idx as usize] = slot;
928            return idx;
929        }
930        let idx = self.graphs.len() as u32;
931        self.graph_index.insert(name, idx);
932        self.graphs.push(slot);
933        idx
934    }
935
936    /// Resolve a graph by name. Returns `None` when the name
937    /// isn't registered. Equivalent to `self.graphs.get(name)` on
938    /// the prior HashMap-keyed shape.
939    pub fn graph(&self, name: &str) -> Option<&GraphSlot> {
940        let idx = *self.graph_index.get(name)?;
941        self.graphs.get(idx as usize)
942    }
943
944    /// Resolve a graph by name for mutation. `None` when the name
945    /// isn't registered.
946    pub fn graph_mut(&mut self, name: &str) -> Option<&mut GraphSlot> {
947        let idx = *self.graph_index.get(name)?;
948        self.graphs.get_mut(idx as usize)
949    }
950
951    /// `true` when a graph with this name is installed.
952    pub fn has_graph(&self, name: &str) -> bool {
953        self.graph_index.contains_key(name)
954    }
955
956    /// Resolve a graph's positional index by name. Used by paths
957    /// that need to compute `OpRef::pack(idx, node_idx)` from a
958    /// graph name (function-call site resolution, etc.).
959    pub fn graph_idx(&self, name: &str) -> Option<u32> {
960        self.graph_index.get(name).copied()
961    }
962
963    /// Build an `OpRef` for the `node_idx`-th NodeProto of a graph
964    /// identified by name. Test-only convenience for tests that
965    /// used to fish the OpRef out of `GraphSlot.op_index`; with
966    /// positional `OpRef::pack(graph_idx, node_idx)` the lookup is
967    /// trivial.
968    #[cfg(any(test, feature = "test-components"))]
969    pub fn op_ref_at(&self, graph_name: &str, node_idx: u32) -> Option<OpRef> {
970        let gi = self.graph_idx(graph_name)?;
971        let g = self.graphs.get(gi as usize)?;
972        if (node_idx as usize) < g.function.node.len() {
973            Some(OpRef::pack(gi, node_idx))
974        } else {
975            None
976        }
977    }
978
979    /// Iterate every installed `GraphSlot` in install order.
980    pub fn graphs_iter(&self) -> impl Iterator<Item = &GraphSlot> {
981        self.graphs.iter()
982    }
983
984    /// Iterate every (`name`, `&GraphSlot`) pair in install order.
985    pub fn graphs_named(&self) -> impl Iterator<Item = (&str, &GraphSlot)> {
986        // graph_index maps name -> idx; rebuild idx -> name for the
987        // walk so the iteration order matches the storage Vec.
988        let mut by_idx: Vec<(u32, &str)> = self
989            .graph_index
990            .iter()
991            .map(|(n, i)| (*i, n.as_str()))
992            .collect();
993        by_idx.sort_by_key(|&(i, _)| i);
994        by_idx
995            .into_iter()
996            .filter_map(move |(i, n)| self.graphs.get(i as usize).map(|g| (n, g)))
997    }
998
999    /// Canonical install path: builds an
1000    /// [`GraphSlot`] from the FunctionProto (allocating
1001    /// `OpRef`s + `NodeSiteId`s for every node + produced value) and
1002    /// inserts it under `name`.
1003    ///
1004    /// Used by [`crate::node::Node::ready`] for each
1005    /// `ModelProto.functions[0]`. Returns a mutable reference
1006    /// for any subsequent setup (slot_bindings, local_event_subs).
1007    pub fn install_graph(&mut self, name: String, function: FunctionProto) -> &mut GraphSlot {
1008        let graph_idx = self.graphs.len() as u32;
1009        let mut g = GraphSlot::from_function(
1010            name.clone(),
1011            function,
1012            graph_idx,
1013            &mut self.exec.ids.next_node_site_id,
1014        );
1015        // Entry-point graphs (installed via `install_graph`, not
1016        // `install_function_library`) get a `NodeSiteId` registered
1017        // for every function input so `Engine::deliver_app_event`
1018        // can seed the input via ingress. Body functions used in
1019        // `OpDispatch::FunctionCall` deliberately route through
1020        // `input_aliases` and must NOT get input sites; that path
1021        // installs through `install_function_library` instead.
1022        register_function_input_sites(&mut g, &mut self.exec.ids.next_node_site_id, graph_idx);
1023        let idx = self.push_graph_slot(name, g);
1024        &mut self.graphs[idx as usize]
1025    }
1026
1027    /// Runtime-linker install: walk `model.functions[]` and install
1028    /// each FunctionProto as an `GraphSlot` keyed by its
1029    /// canonical `(domain, name, overload)`-derived string. Also
1030    /// populates the symbol-table index `functions` keyed on the same
1031    /// tuple, so call NodeProtos can be resolved at dispatch time.
1032    ///
1033    /// `entry_point_keys` lists the `FunctionKey`s for the registered
1034    /// Modules' main partition functions - those graphs get
1035    /// `is_entry_point = true` (their top-level outputs surface as
1036    /// `EngineStep::AppEvent`; sub-function bodies do not).
1037    ///
1038    /// A function stamped `MODULE_PHASE_KEY = "bootstrap"` registers
1039    /// its `FunctionKey` with the engine's bootstrap state (appends
1040    /// to `install_order`, populates `module_bootstraps`) without
1041    /// arming `pending` — install is pure. The host arms the queue by
1042    /// calling [`crate::node::Node::run_bootstrap`], which fans out
1043    /// each install-order target serially and emits one
1044    /// `BootstrapComplete` step per drained phase; multi-target
1045    /// installs surface their targets in slice order without further
1046    /// host action.
1047    ///
1048    /// Idempotent under ODR (same key + same body) - silently skips
1049    /// reinstall. Caller (Node linker) is responsible for the
1050    /// byte-equality check before calling.
1051    pub fn install_function_library(
1052        &mut self,
1053        functions: &[FunctionProto],
1054        entry_point_keys: &[FunctionKey],
1055    ) {
1056        let entry_set: std::collections::HashSet<&FunctionKey> = entry_point_keys.iter().collect();
1057        // Collect bootstrap keys + target names registered this call
1058        // so the post-pass can stamp touch sets once every function
1059        // in the batch is discoverable via `self.functions` —
1060        // necessary for forward references (a bootstrap body that
1061        // calls a sibling function declared later in `functions`).
1062        let mut new_bootstrap_targets: Vec<(FunctionKey, String)> = Vec::new();
1063        for f in functions {
1064            let key: FunctionKey = (f.domain.clone(), f.name.clone(), f.overload.clone());
1065            let graph_name = graph_name_for(&key);
1066            if self.has_graph(&graph_name) {
1067                continue;
1068            }
1069            let graph_idx = self.graphs.len() as u32;
1070            let mut g = GraphSlot::from_function(
1071                graph_name.clone(),
1072                f.clone(),
1073                graph_idx,
1074                &mut self.exec.ids.next_node_site_id,
1075            );
1076            // Only entry-point functions surface their outputs as
1077            // AppEvents. Sub-function bodies' outputs are forwarded
1078            // via output_forwarding at call sites.
1079            g.is_entry_point = entry_set.contains(&key);
1080            if !g.is_entry_point {
1081                g.top_level_outputs.clear();
1082            }
1083            let is_bootstrap = bb_ir::keys::read_function_module_phase(f)
1084                .is_some_and(|p| p == bb_ir::keys::MODULE_PHASE_BOOTSTRAP);
1085            // Bootstrap functions seed their inputs through the F5
1086            // host-driven staging path
1087            // (`Engine::enqueue_bootstrap_request`) rather than via a
1088            // FunctionCall splice. Mint a `NodeSiteId` per declared
1089            // input formal so the staging path can address the slot
1090            // via `(NodeSiteId, body_exec_id)` and the body ops can
1091            // resolve their input names through `resolve_site_name`.
1092            if is_bootstrap {
1093                register_function_input_sites(
1094                    &mut g,
1095                    &mut self.exec.ids.next_node_site_id,
1096                    graph_idx,
1097                );
1098            }
1099            self.push_graph_slot(graph_name, g);
1100            self.functions.insert(key.clone(), f.clone());
1101            if is_bootstrap {
1102                // Register the bootstrap target. A multi-target
1103                // install registers one bootstrap per target (in
1104                // the order [`crate::install::install`] iterates the
1105                // user-supplied `targets` slice). Seeding drains
1106                // `install_order` front-to-back so each target's
1107                // bootstrap fires in slice order.
1108                let target_name = key.1.clone();
1109                self.bootstrap.register_module(key.clone());
1110                new_bootstrap_targets.push((key, target_name));
1111            }
1112        }
1113        // Stamp touch sets for every bootstrap target registered in
1114        // this batch. Deferred until after the install loop so
1115        // forward-referenced FunctionCalls (callees declared later
1116        // in `functions`) resolve against the fully populated
1117        // `self.functions` registry.
1118        for (key, target_name) in new_bootstrap_targets {
1119            let touch_set = self.compute_touch_set(&key);
1120            if let Some(meta) = self.bootstrap.module_bootstraps.get_mut(&target_name) {
1121                meta.touch_set = touch_set;
1122            }
1123        }
1124    }
1125
1126    /// Closure of every `ComponentRef` referenced by `function_key`'s
1127    /// body (slot-id NodeProtos + transitive FunctionCall callees).
1128    /// Walks the function body once: each NodeProto carrying
1129    /// [`bb_ir::keys::SLOT_ID_KEY`] contributes the bound
1130    /// `ComponentRef` from [`Self::slot_id_to_cref`]; each NodeProto
1131    /// whose `(domain, op_type, overload)` resolves a sibling
1132    /// FunctionProto in [`Self::functions`] recurses on that callee.
1133    /// `visited_keys` defends against bootstrap recursion cycles
1134    /// (Module A bootstrap calls Module B body that calls Module A
1135    /// body) so the walk terminates even if the program graph
1136    /// contains a back-edge through FunctionCalls.
1137    pub(crate) fn compute_touch_set(
1138        &self,
1139        function_key: &FunctionKey,
1140    ) -> std::collections::HashSet<ComponentRef> {
1141        let mut touch_set: std::collections::HashSet<ComponentRef> =
1142            std::collections::HashSet::new();
1143        let mut visited_keys: std::collections::HashSet<FunctionKey> =
1144            std::collections::HashSet::new();
1145        self.collect_touch_set(function_key, &mut visited_keys, &mut touch_set);
1146        touch_set
1147    }
1148
1149    /// Recursive worker for [`Self::compute_touch_set`]. Skips
1150    /// already-visited keys (cycle defense) and missing-from-registry
1151    /// keys (defensive: the install path may discover a key whose
1152    /// callee was elided by upstream passes).
1153    fn collect_touch_set(
1154        &self,
1155        function_key: &FunctionKey,
1156        visited_keys: &mut std::collections::HashSet<FunctionKey>,
1157        touch_set: &mut std::collections::HashSet<ComponentRef>,
1158    ) {
1159        if !visited_keys.insert(function_key.clone()) {
1160            return;
1161        }
1162        let Some(function) = self.functions.get(function_key) else {
1163            return;
1164        };
1165        // `function` is borrowed off `self.functions`; the recursive
1166        // calls only read from other Engine fields (`slot_id_to_cref`,
1167        // `functions`) so the borrow holds across the loop.
1168        for node in &function.node {
1169            if let Some(slot_id) = node
1170                .metadata_props
1171                .iter()
1172                .find(|p| p.key == bb_ir::keys::SLOT_ID_KEY)
1173                .and_then(|p| p.value.parse::<u32>().ok())
1174            {
1175                if let Some(&cref) = self.slot_id_to_cref.get(&slot_id) {
1176                    touch_set.insert(cref);
1177                }
1178            }
1179            let callee_key: FunctionKey = (
1180                node.domain.clone(),
1181                node.op_type.clone(),
1182                node.overload.clone(),
1183            );
1184            if self.functions.contains_key(&callee_key) {
1185                self.collect_touch_set(&callee_key, visited_keys, touch_set);
1186            }
1187        }
1188    }
1189
1190    /// Allocate the next bootstrap call's body `ExecId` and push every
1191    /// body OpRef of the front of
1192    /// [`crate::engine::bootstrap::BootstrapState::install_order`]
1193    /// onto the frontier. Returns `true` when a bootstrap call was
1194    /// seeded; `false` when the engine has no remaining bootstrap
1195    /// functions or the previous call is still in flight.
1196    ///
1197    /// Host-driven (F4): `Node::run_bootstrap` invokes this once after
1198    /// arming `bootstrap.pending`; the poll cascade reseeds via
1199    /// `maybe_complete_bootstrap` after each phase drains so multi-
1200    /// target installs surface one `BootstrapComplete` per target in
1201    /// install order without further host action. Install itself no
1202    /// longer arms `pending`.
1203    pub(crate) fn seed_bootstrap_call(&mut self) -> bool {
1204        if self.bootstrap.module_exec_id().is_some() {
1205            return false;
1206        }
1207        if self.bootstrap.next_idx >= self.bootstrap.install_order.len() {
1208            // No further bootstraps to seed; the gate clears on the
1209            // next `maybe_complete_bootstrap` pass.
1210            self.bootstrap.pending = false;
1211            return false;
1212        }
1213        let target_name = self.bootstrap.install_order[self.bootstrap.next_idx].clone();
1214        let Some(meta) = self.bootstrap.module_bootstraps.get(&target_name) else {
1215            // Defensive skip: install_order names a target whose
1216            // metadata is missing. Advance past the stale entry
1217            // rather than wedging.
1218            self.bootstrap.next_idx += 1;
1219            return false;
1220        };
1221        let key = meta.function_key.clone();
1222        let touch_set = meta.touch_set.clone();
1223        if self
1224            .fire_module_bootstrap(target_name, &key, touch_set)
1225            .is_none()
1226        {
1227            self.bootstrap.next_idx += 1;
1228            return false;
1229        }
1230        true
1231    }
1232
1233    /// Seed a Module bootstrap body onto the frontier under a fresh
1234    /// ExecId and record it in `bootstrap.in_flight`. Shared by the
1235    /// install-order kick seeder ([`Self::seed_bootstrap_call`]) and
1236    /// the conflict-queue driver ([`Self::fire_ready_bootstrap`]).
1237    /// Returns the body ExecId on success or `None` when the graph
1238    /// name is missing (defensive — install populates it).
1239    fn fire_module_bootstrap(
1240        &mut self,
1241        target_name: String,
1242        key: &FunctionKey,
1243        touch_set: std::collections::HashSet<ComponentRef>,
1244    ) -> Option<ExecId> {
1245        let graph_name = graph_name_for(key);
1246        let graph_idx = self.graph_idx(&graph_name)?;
1247        let body_exec_id = self.allocate_exec_id();
1248        let node_count = self
1249            .graphs
1250            .get(graph_idx as usize)
1251            .map(|g| g.function.node.len())
1252            .unwrap_or(0);
1253        // Bootstrap takes no input formals and produces no outputs,
1254        // so no CallContext lives in `pending_calls`; the body-op
1255        // gate identifies bootstrap-descendant ExecIds by either
1256        // direct match against an in_flight ExecId or chain walk
1257        // through descendant FunctionCall CallContexts. Quiescence
1258        // resolves through `maybe_complete_bootstrap` once every
1259        // descendant frontier + pending_async entry clears.
1260        self.bootstrap
1261            .mark_module_in_flight(target_name, body_exec_id, touch_set);
1262        for node_idx in 0..node_count as u32 {
1263            let op_ref = OpRef::pack(graph_idx, node_idx);
1264            self.exec.frontier.push_back((op_ref, body_exec_id));
1265        }
1266        Some(body_exec_id)
1267    }
1268
1269    /// Drive a [`crate::engine::bootstrap::ReadyBootstrap`] returned
1270    /// by `BootstrapState::process_pending_requests` or
1271    /// `on_bootstrap_drained` onto the frontier. Module-kind entries
1272    /// allocate an ExecId and push body ops; Component-kind entries
1273    /// allocate an ExecId, lock the bound `ComponentRef` via the
1274    /// gate's touch set, and synchronously invoke the registered
1275    /// `Bootstrap::bootstrap` impl through the dispatcher registry.
1276    /// Returns the body ExecId on success.
1277    pub(crate) fn fire_ready_bootstrap(
1278        &mut self,
1279        ready: crate::engine::bootstrap::ReadyBootstrap,
1280    ) -> Option<ExecId> {
1281        match ready.kind {
1282            crate::engine::bootstrap::BootstrapKind::Module { target } => {
1283                let key = self
1284                    .bootstrap
1285                    .module_bootstraps
1286                    .get(&target)
1287                    .map(|m| m.function_key.clone())?;
1288                self.fire_module_bootstrap(target, &key, ready.touch_set)
1289            }
1290            crate::engine::bootstrap::BootstrapKind::Component { slot } => {
1291                self.fire_component_bootstrap(slot, ready.touch_set)
1292            }
1293        }
1294    }
1295
1296    /// Seed a Component bootstrap: resolve `slot → cref` via
1297    /// `bootstrap.component_bootstraps`, allocate an ExecId, lock the
1298    /// `{cref}` touch set on `bootstrap.in_flight` so body ops on
1299    /// disjoint Components keep firing, and synchronously invoke the
1300    /// concrete `Bootstrap::bootstrap` through the dispatcher
1301    /// registry. Returns the body ExecId on success.
1302    ///
1303    /// `DispatchResult::Immediate` retires the in-flight entry
1304    /// in-line via [`crate::engine::bootstrap::BootstrapState::on_bootstrap_drained`]
1305    /// and fires any promoted waiters so the conflict-queue path
1306    /// stays uniform across Module and Component kinds.
1307    /// `DispatchResult::Async` parks the ExecId on `pending_async`
1308    /// under a synthetic `OpRef`; the regular `handle_completion`
1309    /// path drives the eventual drain once the impl calls
1310    /// `ctx.complete_command(cmd_id, …)`.
1311    fn fire_component_bootstrap(
1312        &mut self,
1313        slot: String,
1314        touch_set: std::collections::HashSet<ComponentRef>,
1315    ) -> Option<ExecId> {
1316        let cref = self.bootstrap.component_bootstraps.get(&slot)?.cref;
1317        let body_exec_id = self.allocate_exec_id();
1318        // Use the supplied touch_set so caller-supplied lock semantics
1319        // win when present; F4 may extend the closure to include
1320        // declared dependencies. Falling back to `{cref}` keeps the
1321        // gate locking the dispatching Component even when the host
1322        // forgets to pass a touch_set (defensive — production
1323        // callers always populate it).
1324        let touch_set = if touch_set.is_empty() {
1325            let mut t = std::collections::HashSet::new();
1326            t.insert(cref);
1327            t
1328        } else {
1329            touch_set
1330        };
1331        self.bootstrap
1332            .in_flight
1333            .push(crate::engine::bootstrap::InFlightBootstrap {
1334                kind: crate::engine::bootstrap::BootstrapKind::Component { slot: slot.clone() },
1335                exec_id: body_exec_id,
1336                touch_set,
1337                staged_inputs: std::collections::HashSet::new(),
1338            });
1339        // Resolve dispatcher + invoke. Errors here surface as a
1340        // failed bootstrap step — the host sees an EngineStep
1341        // `OpFailed`-style emission once F5 wires the step plumbing;
1342        // for Commit 3 the synchronous-Immediate path leaves the
1343        // engine state ready for the next ready bootstrap and lets
1344        // the caller observe the success via `bootstrap.in_flight`
1345        // draining.
1346        let outcome = self.dispatch_component_bootstrap(cref);
1347        match outcome {
1348            Ok(crate::atomic::DispatchResult::Immediate(_outputs)) => {
1349                // Component bootstrap produces no slot outputs (the
1350                // Bootstrap trait returns `Result<(), Error>`). Retire
1351                // the in-flight entry by ExecId and promote any
1352                // waiters; the gate re-opens for the locked
1353                // ComponentRef once the in_flight retain step lands.
1354                let promoted = self.bootstrap.on_bootstrap_drained(body_exec_id);
1355                for r in promoted {
1356                    let _ = self.fire_ready_bootstrap(r);
1357                }
1358                Some(body_exec_id)
1359            }
1360            Ok(crate::atomic::DispatchResult::Async(cmd_id)) => {
1361                // Park the body ExecId on pending_async under a
1362                // synthetic OpRef. graph_idx = u32::MAX is reserved
1363                // for Component-bootstrap synthetic ops so the
1364                // regular `node_for` lookup returns `None` (no
1365                // NodeProto on the frontier) and downstream code
1366                // skips dispatch retries.
1367                let synthetic_op = OpRef::pack(u32::MAX, 0);
1368                self.exec.pending_async.insert(
1369                    cmd_id,
1370                    crate::engine::pending_async::PendingAsync {
1371                        op_ref: synthetic_op,
1372                        exec_id: body_exec_id,
1373                        output_sites: Vec::new(),
1374                        deadline_ns: None,
1375                    },
1376                );
1377                Some(body_exec_id)
1378            }
1379            Err(_) => {
1380                // Dispatcher missing or impl returned Err — retire
1381                // the in-flight entry so the queue does not wedge;
1382                // the host observes the unfinished work via the
1383                // missing BootstrapComplete step. F5 promotes this
1384                // path to an OpFailed-style EngineStep.
1385                let _ = self.bootstrap.on_bootstrap_drained(body_exec_id);
1386                None
1387            }
1388        }
1389    }
1390
1391    /// Look up + invoke the Bootstrap dispatcher for the Component at
1392    /// `cref`. Uses the same take-restore pattern as `invoke_atomic`
1393    /// so the dispatch closure has exclusive access to the concrete
1394    /// while the rest of `engine.components` stays borrowable for
1395    /// `RuntimeResourceRef`-style accessors (F5 plumbing — today the
1396    /// Bootstrap ctx carries only the cref).
1397    fn dispatch_component_bootstrap(
1398        &mut self,
1399        cref: ComponentRef,
1400    ) -> Result<crate::atomic::DispatchResult, String> {
1401        let mut taken = self
1402            .take_component(cref)
1403            .ok_or_else(|| "component missing".to_string())?;
1404        // `Box<dyn ErasedComponent>` coerces to `&mut dyn Any` via
1405        // the `Any` supertrait bound on `ErasedComponent`; mirrors
1406        // the dispatch path in `invoke_atomic` so the downcast in
1407        // the `BootstrapDispatchFn` stays consistent across surfaces.
1408        let any: &mut dyn std::any::Any = taken.as_mut();
1409        let tid = (*any).type_id();
1410        let dispatcher = self
1411            .bootstrap_dispatchers
1412            .get(&tid)
1413            .copied()
1414            .ok_or_else(|| "no Bootstrap dispatcher registered for component".to_string());
1415        let result = match dispatcher {
1416            Ok(f) => {
1417                let mut ctx = crate::contracts::bootstrap::BootstrapCtx::new(cref);
1418                f(any, &mut ctx)
1419            }
1420            Err(e) => Err(e),
1421        };
1422        self.restore_component(cref, taken);
1423        result
1424    }
1425
1426    /// Stage a host-supplied [`crate::engine::BootstrapRequest`] and
1427    /// fire the target Module's bootstrap immediately:
1428    ///
1429    /// 1. Resolve `request.target` → `FunctionKey` →
1430    ///    `graph_name` → `graph_idx`.
1431    /// 2. Read the bootstrap function's declared input port names
1432    ///    (the slots minted by `register_function_input_sites` against
1433    ///    `function.input`).
1434    /// 3. Validate the supplied `(input_name, bytes)` pairs against
1435    ///    the formal set — missing required → `MissingInput`, extra →
1436    ///    `UnknownInput`.
1437    /// 4. Allocate the body `ExecId`.
1438    /// 5. For each formal: charge against the ingress byte budget,
1439    ///    fallibly reserve a framework-owned `Vec<u8>` via the
1440    ///    `try_reserve_exact` seam, copy in the caller's borrowed
1441    ///    bytes, wrap as `BytesValue`, and write into the slot table
1442    ///    at `(site, body_exec_id)`. The caller's slices may drop the
1443    ///    moment this call returns — Principle 1a satisfied.
1444    /// 6. Mark the Module bootstrap in-flight + push the body's
1445    ///    `OpRef`s onto the frontier so the next `Engine::poll` drives
1446    ///    it to completion.
1447    ///
1448    /// Mid-flight failures (budget exhaustion, allocator fault) release
1449    /// every byte charge admitted earlier in the same call so the
1450    /// counter never leaks. The seed/in-flight bookkeeping only lands
1451    /// once every input stages successfully, so a failed request leaves
1452    /// the engine's bootstrap state untouched.
1453    pub fn enqueue_bootstrap_request(
1454        &mut self,
1455        request: crate::engine::bootstrap::BootstrapRequest<'_>,
1456    ) -> Result<(), crate::errors::BootstrapError> {
1457        // 1. Resolve target → function_key → graph_idx.
1458        let meta = self
1459            .bootstrap
1460            .module_bootstraps
1461            .get(request.target)
1462            .ok_or_else(|| crate::errors::BootstrapError::UnknownTarget {
1463                target_name: request.target.to_string(),
1464                available: self.bootstrap.install_order.clone(),
1465            })?;
1466        let function_key = meta.function_key.clone();
1467        let touch_set = meta.touch_set.clone();
1468        let graph_name = graph_name_for(&function_key);
1469        let graph_idx = self.graph_idx(&graph_name).ok_or_else(|| {
1470            crate::errors::BootstrapError::UnknownTarget {
1471                target_name: request.target.to_string(),
1472                available: self.bootstrap.install_order.clone(),
1473            }
1474        })?;
1475        let graph = &self.graphs[graph_idx as usize];
1476
1477        // 2. Read declared input formals from the GraphSlot.
1478        let declared: Vec<String> = graph.function.input.clone();
1479
1480        // 3. Validate. UnknownInput fires first (the supplied set is
1481        // the host's authoritative request shape; surfacing extras
1482        // before missing ones gives clearer diagnostics on typos).
1483        for (input_name, _) in request.inputs {
1484            if !declared.iter().any(|d| d == input_name) {
1485                return Err(crate::errors::BootstrapError::UnknownInput {
1486                    target_name: request.target.to_string(),
1487                    input_name: input_name.to_string(),
1488                    declared: declared.clone(),
1489                });
1490            }
1491        }
1492        for formal in &declared {
1493            if !request.inputs.iter().any(|(name, _)| *name == formal) {
1494                return Err(crate::errors::BootstrapError::MissingInput {
1495                    target_name: request.target.to_string(),
1496                    input_name: formal.clone(),
1497                });
1498            }
1499        }
1500
1501        // Resolve each formal to its NodeSiteId before allocating the
1502        // ExecId — a missing site at this stage is an install
1503        // invariant violation (register_function_input_sites runs
1504        // alongside register_module above), but defending against it
1505        // keeps the error path total.
1506        let mut sites: Vec<(crate::ids::NodeSiteId, &[u8])> =
1507            Vec::with_capacity(request.inputs.len());
1508        for (input_name, bytes) in request.inputs {
1509            let Some(&site) = graph.site_names.get(*input_name) else {
1510                return Err(crate::errors::BootstrapError::UnknownInput {
1511                    target_name: request.target.to_string(),
1512                    input_name: input_name.to_string(),
1513                    declared: declared.clone(),
1514                });
1515            };
1516            sites.push((site, *bytes));
1517        }
1518
1519        // 4. Allocate body ExecId. Done after validation so a rejected
1520        // request does not consume an ExecId counter slot.
1521        let body_exec_id = self.allocate_exec_id();
1522
1523        // 5. Per-input charge + Principle 1a copy. Track total
1524        // admitted bytes so a mid-loop failure releases the full
1525        // charge in one shot (none of the staged carriers ever
1526        // reached the slot table).
1527        let mut admitted: usize = 0;
1528        let mut staged_sites: Vec<crate::ids::NodeSiteId> = Vec::with_capacity(sites.len());
1529        for (site, bytes) in &sites {
1530            let byte_count = bytes.len();
1531            if let Err(reason) = self.try_charge(byte_count) {
1532                self.release(admitted);
1533                return Err(crate::errors::BootstrapError::AllocationFailed {
1534                    target_name: request.target.to_string(),
1535                    byte_count,
1536                    budget_remaining: reason.budget_remaining,
1537                });
1538            }
1539            admitted = admitted.saturating_add(byte_count);
1540            let mut owned: Vec<u8> = Vec::new();
1541            if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
1542                self.release(admitted);
1543                return Err(crate::errors::BootstrapError::AllocationFailed {
1544                    target_name: request.target.to_string(),
1545                    byte_count,
1546                    budget_remaining: self
1547                        .ingress_byte_budget
1548                        .saturating_sub(self.ingress_bytes_in_flight),
1549                });
1550            }
1551            owned.extend_from_slice(bytes);
1552            let value: Box<dyn crate::slot_value::SlotValue> =
1553                Box::new(crate::syscall::values::BytesValue(owned));
1554            self.slot_write(*site, body_exec_id, value);
1555            staged_sites.push(*site);
1556        }
1557
1558        // 6. Mark the Module bootstrap in-flight and push every body
1559        // OpRef onto the frontier. The body-op gate (`is_op_locked`)
1560        // recognises the freshly seeded ExecId via `module_exec_id`
1561        // and the touch-set so disjoint Components keep firing. Any
1562        // body op that consumes a staged input reads its value
1563        // through `resolve_site_name` → slot_table at
1564        // `(site, body_exec_id)`.
1565        self.bootstrap.pending = true;
1566        self.bootstrap
1567            .mark_module_in_flight(request.target.to_string(), body_exec_id, touch_set);
1568        let node_count = self.graphs[graph_idx as usize].function.node.len();
1569        for node_idx in 0..node_count as u32 {
1570            let op_ref = OpRef::pack(graph_idx, node_idx);
1571            self.exec.frontier.push_back((op_ref, body_exec_id));
1572        }
1573        Ok(())
1574    }
1575
1576    /// Drain `bootstrap.pending_requests` once + fire each
1577    /// non-conflicting target. Conflicting requests stay parked on
1578    /// `bootstrap.waiting` until a drain promotes them via
1579    /// [`Self::maybe_complete_bootstrap`]. Called from
1580    /// `Engine::poll` at the top of each cycle so requests staged
1581    /// between polls land on the frontier before the body phase
1582    /// resumes.
1583    pub(crate) fn drive_pending_bootstrap_requests(&mut self) {
1584        let ready = self.bootstrap.process_pending_requests();
1585        for r in ready {
1586            let _ = self.fire_ready_bootstrap(r);
1587        }
1588    }
1589
1590    /// Arm the install-order bootstrap queue + seed the next target.
1591    /// Host entry point invoked through [`crate::node::Node::run_bootstrap`].
1592    /// Returns `true` when arming queued work (the caller should poll);
1593    /// `false` when no install-order target remains (idempotent on a
1594    /// fully drained Node).
1595    ///
1596    /// Cascade-fires multiple targets via the same
1597    /// `maybe_complete_bootstrap` reseed path the body-poll cycle
1598    /// uses: one call here picks the next target, the regular drain
1599    /// advances `next_idx`, and the next `seed_bootstrap_call`
1600    /// (inside poll) picks the following target.
1601    pub fn run_bootstrap(&mut self) -> bool {
1602        if !self.bootstrap.arm_install_order() {
1603            return false;
1604        }
1605        self.seed_bootstrap_call()
1606    }
1607
1608    /// `(domain, name, overload)` of the first bootstrap function
1609    /// recorded at install time, or `None` when no
1610    /// `module_phase = "bootstrap"` FunctionProto reached the function
1611    /// library. Stable across `clear_for_restore` (which preserves
1612    /// install-order metadata but bumps `next_idx` past every queued
1613    /// target so the restored Node does not re-fire bootstraps it
1614    /// already ran). For the full ordered list (multi-target installs
1615    /// queue one key per target), use [`Self::bootstrap_function_keys`].
1616    pub fn bootstrap_function_key(&self) -> Option<FunctionKey> {
1617        self.bootstrap.first_function_key().cloned()
1618    }
1619
1620    /// All bootstrap function keys the engine has queued, in install
1621    /// order. Multi-target installs append one entry per target via
1622    /// [`Self::install_function_library`]; the seeder fires each in
1623    /// slice order. Stable across `clear_for_restore` for snapshot
1624    /// introspection.
1625    pub fn bootstrap_function_keys(&self) -> Vec<FunctionKey> {
1626        self.bootstrap.function_keys()
1627    }
1628
1629    /// `true` while a bootstrap call is outstanding. Armed by
1630    /// [`Self::run_bootstrap`] (host kick) or by the host's
1631    /// staged-formals path (`enqueue_bootstrap_request`); cleared
1632    /// once every queued phase drains.
1633    pub fn bootstrap_pending(&self) -> bool {
1634        self.bootstrap.pending
1635    }
1636
1637    /// `true` when a Component bootstrap is registered for `slot`.
1638    /// Today the Component bootstrap registry stays empty so this
1639    /// always returns `false`; F5 wires the registration path. Wired
1640    /// now so the host can introspect the Bootstrap Contract surface
1641    /// without reaching through internal accessors.
1642    pub fn has_component_bootstrap(&self, slot: &str) -> bool {
1643        self.bootstrap.component_bootstrap(slot).is_some()
1644    }
1645
1646    /// Lifecycle status for the host-facing
1647    /// [`crate::node::Node::bootstrap_status`] accessor. `Idle` when no
1648    /// bootstrap is queued or in-flight; `Running` when one or more
1649    /// bootstraps occupy `in_flight` (the body gate is parking touched
1650    /// ops); `WaitingForInput` when work is staged on `pending_requests`
1651    /// but no in-flight phase has been seeded yet (the host must drive
1652    /// the queue to advance).
1653    pub fn bootstrap_status(&self) -> crate::engine::bootstrap::BootstrapStatus {
1654        if !self.bootstrap.in_flight.is_empty() {
1655            return crate::engine::bootstrap::BootstrapStatus::Running;
1656        }
1657        if !self.bootstrap.pending_requests.is_empty() || !self.bootstrap.waiting.is_empty() {
1658            return crate::engine::bootstrap::BootstrapStatus::WaitingForInput;
1659        }
1660        if self.bootstrap.pending && self.bootstrap.next_idx < self.bootstrap.install_order.len() {
1661            return crate::engine::bootstrap::BootstrapStatus::WaitingForInput;
1662        }
1663        crate::engine::bootstrap::BootstrapStatus::Idle
1664    }
1665
1666    /// Fire a Component bootstrap by slot name. Resolves
1667    /// `slot → ComponentRef` via `bootstrap.component_bootstraps`,
1668    /// builds a `ReadyBootstrap` with the bound cref as the touch set,
1669    /// and dispatches through the engine's internal fire path. Returns
1670    /// the dispatching ComponentRef on success so the host can wire
1671    /// per-slot telemetry; surfaces `BootstrapError::UnknownTarget`
1672    /// when no Component bootstrap is registered at the slot.
1673    pub fn fire_component_bootstrap_by_slot(
1674        &mut self,
1675        slot: &str,
1676    ) -> Result<ComponentRef, crate::errors::BootstrapError> {
1677        let cref = self
1678            .bootstrap
1679            .component_bootstrap(slot)
1680            .map(|cb| cb.cref)
1681            .ok_or_else(|| crate::errors::BootstrapError::UnknownTarget {
1682                target_name: slot.to_string(),
1683                available: self
1684                    .bootstrap
1685                    .component_bootstraps
1686                    .keys()
1687                    .cloned()
1688                    .collect(),
1689            })?;
1690        let mut touch_set = std::collections::HashSet::new();
1691        touch_set.insert(cref);
1692        // `pending` flips on so the body-op gate parks touching ops
1693        // while the Component bootstrap dispatches. `fire_ready_bootstrap`
1694        // retires the in-flight slot inline on `Immediate` dispatches.
1695        self.bootstrap.pending = true;
1696        let _ = self.fire_ready_bootstrap(crate::engine::bootstrap::ReadyBootstrap {
1697            kind: crate::engine::bootstrap::BootstrapKind::Component {
1698                slot: slot.to_string(),
1699            },
1700            touch_set,
1701            staged_inputs: std::collections::HashSet::new(),
1702        });
1703        // Component bootstrap with no remaining queued work + nothing
1704        // in-flight (Immediate retired in_flight) means we can clear the
1705        // pending arming so subsequent `Node::poll` does not park body
1706        // ops indefinitely.
1707        if self.bootstrap.in_flight.is_empty()
1708            && self.bootstrap.pending_requests.is_empty()
1709            && self.bootstrap.waiting.is_empty()
1710            && self.bootstrap.next_idx >= self.bootstrap.install_order.len()
1711        {
1712            self.bootstrap.pending = false;
1713        }
1714        Ok(cref)
1715    }
1716
1717    /// `true` when `target_name` is already on the install-order Module
1718    /// bootstrap queue (so `Node::run_bootstrap` can surface
1719    /// `AlreadyTransitivelyQueued` before re-staging the same target).
1720    pub fn module_bootstrap_registered(&self, target_name: &str) -> bool {
1721        self.bootstrap.module_bootstraps.contains_key(target_name)
1722    }
1723
1724    /// Snapshot of every registered Module bootstrap target name in
1725    /// install order. Used by `Node::run_bootstrap` to enqueue an
1726    /// empty-input request per target without reaching into the
1727    /// engine's private `install_order` Vec.
1728    pub fn module_bootstrap_target_names(&self) -> Vec<String> {
1729        self.bootstrap.install_order.clone()
1730    }
1731
1732    /// Per-component body-op gate. Returns `true` when the op must
1733    /// park because an in-flight bootstrap locks the `ComponentRef`
1734    /// it touches; `false` when the op is fireable.
1735    ///
1736    /// Resolution order:
1737    /// 1. No in-flight bootstraps → fire (the gate is dormant).
1738    /// 2. `exec_id` descends from some in-flight bootstrap's ExecId
1739    ///    via the `pending_calls.parent_exec_id` chain → fire. The
1740    ///    bootstrap body itself (and its sub-FunctionCalls) is
1741    ///    allowed to invoke any op regardless of the lock set.
1742    /// 3. Resolve the touched `ComponentRef` from the op's NodeProto
1743    ///    via `SLOT_ID_KEY → slot_id_to_cref`. If the touched cref
1744    ///    is in some in-flight bootstrap's `touch_set` → park.
1745    ///    Stateless syscalls (no slot_id stamp, no role binding)
1746    ///    fire because they reach no component.
1747    ///
1748    /// O(call depth * in-flight) per call — Module composition
1749    /// depth multiplied by the active bootstrap count. In-flight is
1750    /// 1 today; F3 Commit 2 enables concurrent disjoint bootstraps.
1751    pub(crate) fn is_op_locked(&self, op_ref: OpRef, exec_id: ExecId) -> bool {
1752        if self.bootstrap.in_flight.is_empty() {
1753            return false;
1754        }
1755        // 2. Bootstrap-descendant exec ids fire freely. Walk the
1756        // call chain once and short-circuit if any in-flight ExecId
1757        // matches.
1758        let mut current = exec_id;
1759        loop {
1760            if self
1761                .bootstrap
1762                .in_flight
1763                .iter()
1764                .any(|b| b.exec_id == current)
1765            {
1766                return false;
1767            }
1768            match self.exec.pending_calls.get(&current) {
1769                Some(call) => current = call.parent_exec_id,
1770                None => break,
1771            }
1772        }
1773        // 3. Look up the touched ComponentRef from the op's slot_id
1774        // stamp. Ops without a slot_id (stateless syscalls,
1775        // FunctionCall nodes, unbound roles) reach no component, so
1776        // no in-flight touch set can lock them.
1777        let Some(node) = self.node_for(op_ref) else {
1778            return false;
1779        };
1780        let Some(slot_id) = node
1781            .metadata_props
1782            .iter()
1783            .find(|p| p.key == bb_ir::keys::SLOT_ID_KEY)
1784            .and_then(|p| p.value.parse::<u32>().ok())
1785        else {
1786            return false;
1787        };
1788        let Some(&touched) = self.slot_id_to_cref.get(&slot_id) else {
1789            return false;
1790        };
1791        self.bootstrap
1792            .in_flight
1793            .iter()
1794            .any(|b| b.touch_set.contains(&touched))
1795    }
1796
1797    /// Inspect engine state and pop the in-flight bootstrap key once
1798    /// every bootstrap-descendant frontier entry and `pending_async`
1799    /// entry has cleared. Returns `true` when one phase just drained
1800    /// (i.e. the caller poll cycle should append a `BootstrapComplete`
1801    /// step for that phase). With remaining queued keys, the next
1802    /// `seed_bootstrap_call` advances to the following target;
1803    /// `bootstrap.pending` flips off only after the last key drains.
1804    /// Called after each drain phase + the ingress completion drain.
1805    ///
1806    /// The descendant walk mirrors the body-op gate predicate: a
1807    /// frontier entry belongs to bootstrap when its ExecId chain
1808    /// terminates at the in-flight bootstrap's ExecId. Bootstrap
1809    /// itself never installs a `CallContext` (zero formals, zero
1810    /// outputs); child FunctionCalls that the bootstrap body invokes
1811    /// do, and the chain walk reaches the bootstrap ExecId through
1812    /// them.
1813    pub(crate) fn maybe_complete_bootstrap(&mut self) -> bool {
1814        if !self.bootstrap.pending {
1815            return false;
1816        }
1817        let Some(boot_exec) = self.bootstrap.module_exec_id() else {
1818            return false;
1819        };
1820        let descendant = |engine: &Engine, mut exec_id: ExecId| -> bool {
1821            loop {
1822                if exec_id == boot_exec {
1823                    return true;
1824                }
1825                match engine.exec.pending_calls.get(&exec_id) {
1826                    Some(call) => exec_id = call.parent_exec_id,
1827                    None => return false,
1828                }
1829            }
1830        };
1831        if self
1832            .exec
1833            .frontier
1834            .iter()
1835            .any(|(_, exec_id)| descendant(self, *exec_id))
1836        {
1837            return false;
1838        }
1839        if self
1840            .exec
1841            .pending_async
1842            .values()
1843            .any(|p| descendant(self, p.exec_id))
1844        {
1845            return false;
1846        }
1847        // The in-flight phase drained. Advance the install_order
1848        // pointer (the post-kick cascade) and retire the in-flight
1849        // entry by ExecId via `on_bootstrap_drained`, which also
1850        // promotes any
1851        // host-driven waiter whose touch set no longer conflicts
1852        // with the remaining in-flight set. `bootstrap.pending`
1853        // stays armed while install_order keys remain queued so
1854        // body ops touching a locked Component keep parking through
1855        // the next phase's run. The `install_order` Vec itself is
1856        // append-only — introspection still reports every queued
1857        // target.
1858        self.bootstrap.next_idx += 1;
1859        let promoted = self.bootstrap.on_bootstrap_drained(boot_exec);
1860        for ready in promoted {
1861            // Promoted waiters fire immediately on the same poll
1862            // cycle so the conflict-queue path doesn't wedge waiting
1863            // for a future poll. Defensive `let _` — the only failure
1864            // path is a missing graph for the target, which the
1865            // install path forbids.
1866            let _ = self.fire_ready_bootstrap(ready);
1867        }
1868        if self.bootstrap.next_idx >= self.bootstrap.install_order.len()
1869            && self.bootstrap.in_flight.is_empty()
1870            && self.bootstrap.pending_requests.is_empty()
1871            && self.bootstrap.waiting.is_empty()
1872        {
1873            self.bootstrap.pending = false;
1874        }
1875        true
1876    }
1877
1878    /// Resolve every NodeProto's dispatch kind into `op_dispatch[]`
1879    /// per ENGINE.md §8.1 + §8.4. Run after install completes (so all
1880    /// symbols are in `self.functions`) but before the first poll.
1881    ///
1882    /// Walk order: each GraphSlot in turn. For each NodeProto:
1883    /// - syscall (`syscall_index` hit) → `Stateless`
1884    /// - call to function in `self.functions` with domain
1885    ///   `ai.bytesandbrains.module` → `FunctionCall` with the target
1886    ///   key + input/output rename pairs from the call's value names
1887    ///   zipped against the target function's formals.
1888    /// - call to function with domain `ai.bytesandbrains.framework`
1889    ///   starting with `BackendSubgraph_` → `BackendSubgraph` with
1890    ///   bound backend resolved via `BINDING_ID_KEY` metadata against
1891    ///   the atomic-dispatch table.
1892    /// - else atomic dispatch by `(domain, op_type, instance)` →
1893    ///   `Atomic`.
1894    ///
1895    /// Returns the number of nodes that remained `Unresolved`. Caller
1896    /// should fail build if non-zero.
1897    pub fn resolve_dispatch(&mut self) -> usize {
1898        // Snapshot per-graph node lists so we don't hold a borrow on
1899        // self.graphs while reading other tables. Indices are the
1900        // positional graph_idx values packed into OpRefs.
1901        let graph_count = self.graphs.len();
1902        let mut unresolved = 0;
1903        for graph_idx in 0..graph_count {
1904            let (function_domain, nodes): (String, Vec<NodeProto>) = {
1905                let g = &self.graphs[graph_idx];
1906                (g.function.domain.clone(), g.function.node.clone())
1907            };
1908            // BackendSubgraph bodies (domain == ai.bytesandbrains.framework)
1909            // are handed wholesale to the bound Backend Contract impl per
1910            // ENGINE.md §8.5; their interior NodeProtos are never invoked
1911            // individually by the engine, so resolve_dispatch leaves the
1912            // body's op_dispatch as Unresolved without counting it as a
1913            // failure. Mirror the same skip already applied by
1914            // Node's unsupported-ops pre-flight in `src/node.rs`.
1915            if function_domain == "ai.bytesandbrains.framework" {
1916                let mut dispatch: Vec<OpDispatch> = Vec::with_capacity(nodes.len());
1917                for _ in &nodes {
1918                    dispatch.push(OpDispatch::Unresolved);
1919                }
1920                self.graphs[graph_idx].op_dispatch = dispatch;
1921                continue;
1922            }
1923            let mut dispatch: Vec<OpDispatch> = Vec::with_capacity(nodes.len());
1924            for node in &nodes {
1925                let resolved = self.resolve_one(node);
1926                if matches!(resolved, OpDispatch::Unresolved) {
1927                    unresolved += 1;
1928                }
1929                dispatch.push(resolved);
1930            }
1931            self.graphs[graph_idx].op_dispatch = dispatch;
1932        }
1933        unresolved
1934    }
1935
1936    fn resolve_one(&self, node: &NodeProto) -> OpDispatch {
1937        // 1) Syscall path — single lookup by (domain, op_type).
1938        if let Some(&fn_ptr) = self
1939            .syscall_table
1940            .get(&(node.domain.clone(), node.op_type.clone()))
1941        {
1942            return OpDispatch::Stateless(fn_ptr);
1943        }
1944        // `crate::registry` for custom ops registered via
1945        // `bb::register_op!{}`. DCE strips unreferenced entries,
1946        // so a binary that doesn't `use` a library's op never
1947        // pulls it into the link image.
1948        if let Some(reg) = crate::registry::find_op(&node.domain, &node.op_type) {
1949            return OpDispatch::Stateless(reg.invoke);
1950        }
1951        // 2) Function-call paths via the symbol table.
1952        let key: FunctionKey = (
1953            node.domain.clone(),
1954            node.op_type.clone(),
1955            node.overload.clone(),
1956        );
1957        if let Some(target_fn) = self.functions.get(&key) {
1958            if node.domain == "ai.bytesandbrains.module" {
1959                let input_rename: Rc<[(String, String)]> = node
1960                    .input
1961                    .iter()
1962                    .zip(target_fn.input.iter())
1963                    .map(|(caller, formal)| (caller.clone(), formal.clone()))
1964                    .collect();
1965                let output_rename: Rc<[(String, String)]> = target_fn
1966                    .output
1967                    .iter()
1968                    .zip(node.output.iter())
1969                    .map(|(formal, caller)| (formal.clone(), caller.clone()))
1970                    .collect();
1971                return OpDispatch::FunctionCall {
1972                    target: key,
1973                    input_rename,
1974                    output_rename,
1975                };
1976            }
1977        }
1978        // 3) Atomic role path. The placeholders pass stamps
1979        // `ai.bytesandbrains.slot_id` on every role NodeProto; install
1980        // populates `slot_id_to_cref` from the model's binding metadata.
1981        // Single lookup chain: NodeProto.slot_id → ComponentRef →
1982        // bound role dispatcher closure.
1983        let slot_id = node
1984            .metadata_props
1985            .iter()
1986            .find(|p| p.key == bb_ir::keys::SLOT_ID_KEY)
1987            .and_then(|p| p.value.parse::<u32>().ok());
1988        if let Some(slot_id) = slot_id {
1989            if let Some(&cref) = self.slot_id_to_cref.get(&slot_id) {
1990                if let Some(dispatch_fn) = self.dispatch_fn_for_component(cref) {
1991                    return OpDispatch::Atomic {
1992                        component_ref: cref,
1993                        dispatch_fn,
1994                    };
1995                }
1996            }
1997        }
1998        OpDispatch::Unresolved
1999    }
2000
2001    /// Look up the install-time-stamped `ProtocolDispatchFn` for a
2002    /// registered component by its `TypeId`. `resolve_dispatch`
2003    /// embeds the result into `OpDispatch::Atomic { dispatch_fn }`
2004    /// so runtime invoke skips the per-op TypeId probe.
2005    fn dispatch_fn_for_component(
2006        &self,
2007        cref: ComponentRef,
2008    ) -> Option<crate::engine::invoke::ProtocolDispatchFn> {
2009        let component = self.component(cref)?;
2010        let any: &dyn std::any::Any = component;
2011        let tid = (*any).type_id();
2012        self.role_dispatchers.get(&tid).map(|d| d.dispatch)
2013    }
2014
2015    /// Resolve a registered component by `ComponentRef`.
2016    /// `None` when no component lives at that index, or when the
2017    /// slot was `mem::take`-ed out during dispatch (the caller
2018    /// dispatching component is invisible to itself).
2019    pub fn component(&self, cref: ComponentRef) -> Option<&dyn ErasedComponent> {
2020        self.components.get(cref.as_u32() as usize)?.as_deref()
2021    }
2022
2023    /// Resolve a registered component by `ComponentRef` for
2024    /// mutation. Same null semantics as [`Self::component`].
2025    pub fn component_mut(&mut self, cref: ComponentRef) -> Option<&mut Box<dyn ErasedComponent>> {
2026        self.components.get_mut(cref.as_u32() as usize)?.as_mut()
2027    }
2028
2029    /// Take the component at `cref` out of the registry, leaving
2030    /// `None` in its slot. Returns `None` if the slot was empty (or
2031    /// out of range). Paired with [`Self::restore_component`] in
2032    /// `invoke_atomic` so a [`crate::runtime::ComponentsView`] can
2033    /// borrow the rest of `engine.components` while the dispatching
2034    /// component is held exclusively.
2035    pub(crate) fn take_component(
2036        &mut self,
2037        cref: ComponentRef,
2038    ) -> Option<Box<dyn ErasedComponent>> {
2039        self.components.get_mut(cref.as_u32() as usize)?.take()
2040    }
2041
2042    /// Put a component back into the registry slot it was taken
2043    /// from via [`Self::take_component`]. The slot index must be
2044    /// within range (i.e. the cref came from a prior registration).
2045    pub(crate) fn restore_component(
2046        &mut self,
2047        cref: ComponentRef,
2048        component: Box<dyn ErasedComponent>,
2049    ) {
2050        let idx = cref.as_u32() as usize;
2051        if let Some(slot) = self.components.get_mut(idx) {
2052            *slot = Some(component);
2053        }
2054    }
2055
2056    /// test-only registrar. Stores a bound component impl
2057    /// at `cref`. Grows the underlying Vec to fit the index, filling
2058    /// holes with `None` so out-of-order registration works.
2059    pub fn register_component(&mut self, cref: ComponentRef, component: Box<dyn ErasedComponent>) {
2060        let idx = cref.as_u32() as usize;
2061        if self.components.len() <= idx {
2062            self.components.resize_with(idx + 1, || None);
2063        }
2064        self.components[idx] = Some(component);
2065    }
2066
2067    /// Push an `(OpRef, ExecId)` onto the frontier.
2068    pub fn push_frontier(&mut self, op_ref: OpRef, exec_id: ExecId) {
2069        self.exec.frontier.push_back((op_ref, exec_id));
2070    }
2071
2072    /// Pop the next `(OpRef, ExecId)` off the frontier. Used by the
2073    /// poll cycle's drain phases.
2074    pub fn pop_frontier(&mut self) -> Option<(OpRef, ExecId)> {
2075        self.exec.frontier.pop_front()
2076    }
2077
2078    /// Pop the next fireable `(OpRef, ExecId)` off the frontier,
2079    /// honouring the per-component body-op gate. With no in-flight
2080    /// bootstrap the front of the queue fires unconditionally. With
2081    /// one or more in-flight bootstraps the gate parks any op
2082    /// touching a locked `ComponentRef`; this scan picks the first
2083    /// unparked entry so disjoint Components can keep firing while
2084    /// bootstrap runs against an unrelated slot.
2085    pub(crate) fn pop_frontier_fireable(&mut self) -> Option<(OpRef, ExecId)> {
2086        if self.bootstrap.in_flight.is_empty() {
2087            return self.exec.frontier.pop_front();
2088        }
2089        let idx = self
2090            .exec
2091            .frontier
2092            .iter()
2093            .position(|(op_ref, exec_id)| !self.is_op_locked(*op_ref, *exec_id))?;
2094        self.exec.frontier.remove(idx)
2095    }
2096
2097    /// Snapshot of the `(NodeSiteId, ExecId)` keys currently in the
2098    /// slot table. Test-only - used to assert wire-envelope delivery
2099    /// lands at the right site without exposing the full
2100    /// `Box<dyn SlotValue>` payload type.
2101    pub fn slot_table_keys(&self) -> Vec<(NodeSiteId, ExecId)> {
2102        self.exec.slot_table.keys().copied().collect()
2103    }
2104
2105    /// Iterate every `((NodeSiteId, ExecId), Option<&dyn SlotValue>)`
2106    /// pair currently in the slot table. Test-only.
2107    pub fn slot_table_iter(
2108        &self,
2109    ) -> impl Iterator<Item = (&(NodeSiteId, ExecId), Option<&dyn SlotValue>)> {
2110        self.exec
2111            .slot_table
2112            .iter()
2113            .map(|(k, v)| (k, v.as_ref().map(|b| b.as_ref())))
2114    }
2115
2116    /// Read a slot value by `(NodeSiteId, ExecId)`. Returns `None`
2117    /// if the slot is empty or not yet allocated.
2118    pub fn slot_at(&self, site: NodeSiteId, exec_id: ExecId) -> Option<&dyn SlotValue> {
2119        self.exec
2120            .slot_table
2121            .get(&(site, exec_id))
2122            .and_then(|s| s.as_ref())
2123            .map(|b| b.as_ref())
2124    }
2125}
2126
2127/// Register a `NodeSiteId` for every function input name on
2128/// `graph` so `Engine::deliver_app_event` can seed the input
2129/// via ingress. Pre-existing entries (where a node output
2130/// happens to share a name with a function input - rare but
2131/// possible) are left alone. After the input sites are minted,
2132/// re-walk the function's node inputs and add consumer entries
2133/// for any node consuming an input - `GraphSlot::from_function`
2134/// already populated consumers for node outputs but skipped
2135/// inputs because they weren't in `site_names` yet.
2136fn register_function_input_sites(
2137    graph: &mut crate::engine::graph_slot::GraphSlot,
2138    next_node_site_id: &mut u64,
2139    graph_idx: u32,
2140) {
2141    for input_name in graph.function.input.clone().iter() {
2142        if input_name.is_empty() {
2143            continue;
2144        }
2145        graph
2146            .site_names
2147            .entry(input_name.clone())
2148            .or_insert_with(|| {
2149                let r = NodeSiteId::from(*next_node_site_id);
2150                *next_node_site_id = next_node_site_id.saturating_add(1);
2151                r
2152            });
2153    }
2154    // Backfill consumers for nodes whose inputs reference function
2155    // inputs we just minted sites for. Positional OpRefs make every
2156    // node's ref `OpRef::pack(graph_idx, node_idx)`.
2157    let nodes_inputs: Vec<(OpRef, Vec<String>)> = graph
2158        .function
2159        .node
2160        .iter()
2161        .enumerate()
2162        .map(|(idx, node)| (OpRef::pack(graph_idx, idx as u32), node.input.clone()))
2163        .collect();
2164    for (op_ref, inputs) in nodes_inputs {
2165        for input in inputs {
2166            if input.is_empty() {
2167                continue;
2168            }
2169            if let Some(&site) = graph.site_names.get(&input) {
2170                let entry = graph.consumers.entry(site).or_default();
2171                if !entry.contains(&op_ref) {
2172                    entry.push(op_ref);
2173                }
2174            }
2175        }
2176    }
2177}
2178
2179/// Failure case returned by `Engine::try_charge` when admitting
2180/// `byte_count` more bytes against `ingress_byte_budget` would
2181/// exceed the cap. `budget_remaining` is the cap minus the live
2182/// `ingress_bytes_in_flight` at the time of the rejection — the
2183/// caller embeds both into the resulting `WireReceiveError` or
2184/// `AppIngressError` so subscribers see the magnitude of the
2185/// rejection without re-querying the engine.
2186#[derive(Clone, Copy, Debug, PartialEq, Eq)]
2187pub struct BudgetExceededReason {
2188    /// Bytes the caller tried to admit.
2189    pub byte_count: usize,
2190    /// Bytes still available under `ingress_byte_budget` at the time
2191    /// of the rejection.
2192    pub budget_remaining: usize,
2193}
2194
2195/// Stable graph-name key for `Engine.graphs` derived from a
2196/// `FunctionKey`. Joins the tuple parts so two distinct keys produce
2197/// distinct strings, preserving the symbol-table semantics.
2198pub(crate) fn graph_name_for(key: &FunctionKey) -> String {
2199    let (domain, name, overload) = key;
2200    if overload.is_empty() {
2201        format!("{domain}::{name}")
2202    } else {
2203        format!("{domain}::{name}#{overload}")
2204    }
2205}
2206
2207
2208#[cfg(test)]
2209#[path = "core_multi_bootstrap_tests.rs"]
2210mod multi_bootstrap_tests;
2211
2212#[cfg(test)]
2213#[path = "core_touch_set_tests.rs"]
2214mod touch_set_tests;
2215
2216#[cfg(test)]
2217#[path = "core_op_locked_tests.rs"]
2218mod op_locked_tests;
2219
2220#[cfg(test)]
2221#[path = "core_component_bootstrap_tests.rs"]
2222mod component_bootstrap_tests;
2223
2224#[cfg(test)]
2225#[path = "core_bootstrap_request_tests.rs"]
2226mod bootstrap_request_tests;