Skip to main content

bb_runtime/engine/
core.rs

1//! The `Engine` struct + test-only constructor + registration
2//! accessors. Hot-path poll cycle lives in `engine::poll`.
3
4use std::collections::HashMap;
5use std::marker::PhantomData;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use crate::bus::TypedBus;
10use crate::component::ErasedComponent;
11use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
12use crate::engine::graph_slot::GraphSlot;
13use crate::engine::invoke::{make_protocol_dispatcher, RoleDispatcher};
14use crate::framework::FrameworkComponents;
15use crate::ids::{CommandId, ComponentRef, ExecId, NodeSiteId, OpRef};
16use crate::ingress::IngressQueue;
17use crate::slot_value::SlotValue;
18use bb_ir::proto::onnx::{FunctionProto, NodeProto};
19
/// Snapshot of the engine's hot-path counters, intended for dashboards
/// and saturation detection.
///
/// The values are read one after another and are not synchronized
/// against an in-flight poll cycle, so treat them as approximate.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct EngineStats {
    /// Number of `(OpRef, ExecId)` pairs currently on the frontier.
    /// A steadily climbing value means the poll loop is falling
    /// behind ingress.
    pub frontier_len: usize,
    /// Number of events queued on the typed bus, waiting to be routed.
    pub bus_len: usize,
    /// Number of suspended async commands. A value approaching
    /// `NodeConfig.max_pending_async` signals cap pressure.
    pub pending_async: usize,
    /// Number of `(NodeSiteId, ExecId)` slot-table entries holding a
    /// value.
    pub slot_table_occupied: usize,
    /// Approximate depth of the MPMC ingress queue.
    pub ingress_depth: usize,
    /// Number of envelopes queued for the next outbound drain.
    pub outbound_queue_depth: usize,
    /// Number of event kinds that have at least one subscriber.
    pub event_subscriptions: usize,
    /// Number of registered components.
    pub registered_components: usize,
    /// Number of installed graphs.
    pub graph_slots: usize,
}
45
/// Single-Node engine state. Built via `Node::ensure_ready`;
/// tests use `Engine::new()` and seed fields directly.
///
/// Fields are grouped by concern (graph storage, dispatch, component
/// storage, execution state, async/cross-thread, bootstrap, registries,
/// production-safety caps). The trailing `_not_send` marker keeps the
/// type `!Send + !Sync`.
pub struct Engine {
    // --- Graph storage ---------------------------------------------
    /// Installed graphs in insertion order. `OpRef::pack` puts the
    /// slot index in the high 32 bits so `dispatch_for` resolves an
    /// op via two direct array accesses (no HashMap probe).
    pub(crate) graphs: Vec<GraphSlot>,

    /// Graph name → `graphs[]` index. Lookup table for name-keyed
    /// resolution (function-call targets, `Engine::graph(name)`
    /// accessors). The index doubles as the value packed into every
    /// `OpRef`.
    pub(crate) graph_index: HashMap<String, u32>,

    /// Sub-Module function registry keyed by `(domain, name, overload)`.
    /// Populated by `Node` at install time.
    pub(crate) functions: HashMap<(String, String, String), FunctionProto>,

    // --- Dispatch --------------------------------------------------
    /// Unified syscall dispatch table — one lookup per op by
    /// `(domain, op_type)` to its stateless invoke fn pointer.
    /// Populated by `register_syscall` and `register_all_framework_syscalls`
    /// at Engine construction; `resolve_dispatch` reads this to stamp
    /// `OpDispatch::Stateless` for matching NodeProtos.
    pub syscall_table: HashMap<(String, String), StatelessInvokeFn>,

    // --- Component storage -----------------------------------------
    /// All bound runtime impls indexed by `ComponentRef.as_usize()`.
    /// The `Option<...>` wrapper lets `invoke_atomic` take the
    /// dispatching component out of the Vec via `mem::take` so a
    /// live [`crate::runtime::ComponentsView`] can borrow
    /// `&self.components` for cross-component reads while the
    /// dispatch closure runs - the dispatching slot is restored
    /// after the closure returns.
    pub(crate) components: Vec<Option<Box<dyn ErasedComponent>>>,

    /// The Node's own `PeerId` (the one passed to `Node::new`).
    /// Threaded into every `RuntimeResourceRef` so Components can
    /// identify themselves in outbound envelopes.
    pub self_peer: crate::ids::PeerId,

    /// Per-Node framework primitive bundle (scheduler, peer_gate,
    /// request_tracker, backoff_table, inbound_dedup, address_book).
    /// Exposed as `pub` so cross-crate test fixtures in `bb-ops` can
    /// construct `RuntimeResourceRef` from an Engine's framework
    /// primitives. Engine internals proper stay encapsulated; this is
    /// the dependency-injection boundary.
    pub framework: FrameworkComponents,

    /// The per-Node typed event bus.
    pub bus: TypedBus,

    // --- Execution state -------------------------------------------
    /// Per-poll execution-state bundle: frontier, slot table, per-
    /// execution liveness, parked async ops + in-cycle completions,
    /// function-call invocation frames, inbound-envelope context
    /// map, the timer scheduler, and the monotonic ID allocator.
    /// See [`crate::exec_state::ExecState`].
    pub exec: crate::exec_state::ExecState,

    /// Reverse index from a fused `binding_id` (e.g.
    /// `"BurnBackend#0"`) to the bound component's `ComponentRef`.
    /// Populated by `Node` at install time; read by dispatch
    /// resolution to bind NodeProtos that reference a backend by
    /// binding_id.
    pub(crate) binding_id_index: HashMap<String, ComponentRef>,

    /// Per-`event_kind` subscription map. Each entry names the
    /// `NodeSiteId`(s) of `EventSource` ops listening for that kind.
    /// The bus-routing pass writes a `TriggerValue` into each
    /// subscribed site at a fresh `ExecId` and pushes the site's
    /// downstream consumers — matching the wire delivery semantics
    /// per `docs/ADDRESSING.md` so bus + wire share one model.
    pub(crate) event_subscriptions: HashMap<String, Vec<NodeSiteId>>,

    /// Per-LifecyclePhase Op enrollments. `pub` for cross-crate
    /// `bb-ops` test fixtures that exercise the LifecyclePhase
    /// syscall.
    pub lifecycle_table: HashMap<String, Vec<OpRef>>,

    // --- Async + cross-thread --------------------------------------
    /// Thread-safe inbox for external events. Producers (transport
    /// adapters, host invocations) may push from any thread; the
    /// Engine drains serially from its own (single) thread. Engine
    /// holds an `Arc` so it can hand clones to producers without
    /// surrendering ownership.
    ///
    /// Note: `clear_for_restore` replaces this `Arc` with a fresh
    /// queue, so handles cloned before that call keep pointing at
    /// the previous queue.
    pub(crate) ingress: Arc<IngressQueue>,

    /// Lifecycle phases queued for firing by `Engine::fire_lifecycle`
    /// on the next poll.
    pub(crate) fired_phases: Vec<String>,

    /// Snapshot of the ingress queue depth at the start of the
    /// engine's ingress drain. Used by the backpressure detection
    /// hook in `process_ingress_event` to compare against the
    /// configured high-water mark. Refreshed every poll cycle.
    pub(crate) phase1_pre_drain_depth: usize,

    // --- Bootstrap ---------------------------------------------------
    /// Consolidated bootstrap state — every field the install path,
    /// poll seeder, and body-op gate read goes through here. See
    /// [`crate::engine::bootstrap::BootstrapState`] for field roles.
    pub(crate) bootstrap: crate::engine::bootstrap::BootstrapState,

    // --- Dispatcher registries (per-Engine) ------------------------
    /// Role dispatchers registered against this Engine, keyed by the
    /// concrete component type's `TypeId::of::<T>()`. Populated by
    /// the `register_*_dispatcher` methods (e.g.
    /// [`Engine::register_protocol_dispatcher`]) at setup; consulted by
    /// `invoke_atomic` via direct HashMap lookup (no linear scan).
    /// Each `T` has a single entry, so a later registration for the
    /// same `T` replaces the earlier one.
    pub(crate) role_dispatchers: HashMap<std::any::TypeId, RoleDispatcher>,

    // --- Generic slot registry -------------------------------------
    /// Slot-name → `ComponentRef` registry. Generic over component
    /// role: indexes EVERY bound Component (backends, indexes,
    /// models, peer selectors, custom) by binding slot name
    /// (defaults to the field name; overridable via
    /// `#[bb::slot("custom")]`). Components reach declared
    /// dependencies through this map at dispatch time via
    /// [`crate::runtime::ComponentsView::for_slot`].
    ///
    /// This is the canonical lookup table. Every install path
    /// populates it; every dispatch path reads through it. No
    /// per-role specialization above the slot abstraction.
    pub(crate) slots: HashMap<String, ComponentRef>,

    /// Parallel index: compiler-assigned slot id (the value of
    /// `ai.bytesandbrains.slot_id` stamped on role NodeProtos by the
    /// placeholders pass) → `ComponentRef`. Populated alongside
    /// [`Self::slots`] at install time. `resolve_dispatch` reads
    /// the role NodeProto's `slot_id`, looks it up here, and stamps
    /// `OpDispatch::Atomic` against the resolved component. Single
    /// source of truth: install populates both indexes from the same
    /// `bb.binding.<target>.<slot>` metadata.
    pub(crate) slot_id_to_cref: HashMap<u32, ComponentRef>,

    /// Parallel index: compiler-assigned `slot_id` → `(role,
    /// ComponentRef)`. Populated by [`Self::bind_slot_id_with_role`]
    /// at install time from the same `binding.<target>.<slot>`
    /// metadata that drives `slot_id_to_cref`; retains the
    /// `ComponentRole` so `decode_typed_fill` can decide between the
    /// framework-carrier path and the backend-mediated tensor path.
    ///
    /// NOTE(review): the metadata key is spelled `bb.binding.…` in
    /// the `slot_id_to_cref` doc and `binding.…` here — confirm which
    /// spelling is current.
    pub(crate) slot_id_to_role_ref: HashMap<u32, (crate::registry::ComponentRole, ComponentRef)>,

    // --- Component role introspection ------------------------------
    /// Per-component set of declared roles, sourced from
    /// `inventory::iter::<ComponentRoleBinding>` keyed by
    /// `T::TYPE_NAME`. Populated by `Node::ensure_ready` after
    /// component registration; reported by [`Engine::roles_for`] for
    /// introspection (engine tests, host tooling).
    pub(crate) component_roles:
        HashMap<ComponentRef, std::collections::HashSet<crate::registry::ComponentRole>>,

    // --- Production-safety caps ------------------------------------
    /// Soft per-poll-cycle op-invocation budget per
    /// `NodeConfig.cycle_op_budget`. When set, `Engine::poll` yields
    /// after this many invocations and surfaces
    /// `EngineStep::CycleBudgetExceeded { ops_invoked }` so the host
    /// can re-poll. `None` disables the budget.
    pub(crate) cycle_op_budget: Option<usize>,

    /// Cap on the number of in-flight `pending_async` entries per
    /// `NodeConfig.max_pending_async`. When at cap, an Op returning
    /// `DispatchResult::Async(_)` fails synchronously via the
    /// existing `OpFailed` path. `None` disables the cap.
    pub(crate) max_pending_async: Option<usize>,

    /// Cumulative cap on in-flight ingress bytes held across the
    /// ingress queue + slot table + pending async completion
    /// buffers. Sourced from `NodeConfig::ingress_byte_budget`.
    /// Boundary callers call [`Self::try_charge`] before installing
    /// a payload; the slot-table writer calls [`Self::release`] on
    /// overwrite / eviction.
    pub(crate) ingress_byte_budget: usize,

    /// Live count of ingress bytes the engine currently holds.
    /// Incremented by [`Self::try_charge`] on successful admission;
    /// decremented by [`Self::release`] on slot-table overwrite /
    /// eviction / drop, and zeroed by `clear_for_restore`. The budget
    /// guard surfaces this as a snapshot via
    /// [`Self::ingress_bytes_in_flight`].
    pub(crate) ingress_bytes_in_flight: usize,

    // --- Single-threaded anchor ------------------------------------
    /// `PhantomData<*const ()>` makes `Engine` neither `Send` nor
    /// `Sync` - the single-threaded sans-IO contract is enforced at
    /// compile time. Producers can still push to `ingress` from other
    /// threads because the `Arc<IngressQueue>` handle is independently
    /// `Send + Sync`.
    _not_send: PhantomData<*const ()>,
}
236
237impl Default for Engine {
238    fn default() -> Self {
239        Self::new()
240    }
241}
242
243impl Engine {
244    /// Construct an empty engine with the default ingress capacity.
245    /// `Node::new` wraps this with `self_peer`, framework syscalls,
246    /// and config caps applied. For non-default `bus_capacity` use
247    /// [`Self::with_bus_capacity`] so the ingress queue sizes to
248    /// `bus_capacity * 4` per ENGINE.md §2.2.
249    pub fn new() -> Self {
250        Self::with_bus_capacity(crate::node::DEFAULT_BUS_CAPACITY)
251    }
252
253    /// Construct an empty engine whose ingress queue holds up to
254    /// `bus_capacity * 4` events (the ENGINE.md §2.2 ratio that
255    /// reserves headroom for async completions, app events, and
256    /// inbound envelopes between poll cycles).
257    pub fn with_bus_capacity(bus_capacity: usize) -> Self {
258        Self {
259            graphs: Vec::new(),
260            graph_index: HashMap::new(),
261            functions: HashMap::new(),
262            syscall_table: HashMap::new(),
263            slot_id_to_cref: HashMap::new(),
264            slot_id_to_role_ref: HashMap::new(),
265            components: Vec::new(),
266            self_peer: crate::ids::PeerId::from(0u64),
267            framework: FrameworkComponents::new(),
268            bus: TypedBus::new(),
269            exec: crate::exec_state::ExecState::new(),
270            binding_id_index: HashMap::new(),
271            event_subscriptions: HashMap::new(),
272            lifecycle_table: HashMap::new(),
273            ingress: Arc::new(IngressQueue::with_capacity(bus_capacity.saturating_mul(4))),
274            fired_phases: Vec::new(),
275            phase1_pre_drain_depth: 0,
276            bootstrap: crate::engine::bootstrap::BootstrapState::new(),
277            role_dispatchers: HashMap::new(),
278            slots: HashMap::new(),
279            component_roles: HashMap::new(),
280            cycle_op_budget: crate::node::DEFAULT_CYCLE_OP_BUDGET,
281            max_pending_async: crate::node::DEFAULT_MAX_PENDING_ASYNC,
282            ingress_byte_budget: crate::node::DEFAULT_INGRESS_BYTE_BUDGET,
283            ingress_bytes_in_flight: 0,
284            _not_send: PhantomData,
285        }
286    }
287
    /// Wipe the restorable engine state ahead of a `Node::restore`
    /// call.
    ///
    /// Reset by this method:
    /// - `exec.frontier`, `exec.slot_table`, `exec.execution_state`,
    ///   `exec.pending_async`, `exec.pending_completions`,
    ///   `exec.pending_calls`, and `fired_phases` are cleared.
    /// - `ingress_bytes_in_flight` is zeroed.
    /// - `bootstrap` is reset through its own `clear_for_restore`.
    /// - `framework`, `bus`, and `ingress` are replaced with freshly
    ///   constructed instances.
    /// - `exec.ids.next_exec_id` and `exec.ids.next_command_id` are
    ///   set to 0 (`next_node_site_id` is left as-is).
    ///
    /// Every other field (`graphs`, `graph_index`, `functions`,
    /// `syscall_table`, `components`, `self_peer`,
    /// `role_dispatchers`, `binding_id_index`, `lifecycle_table`,
    /// `event_subscriptions`, the slot indexes, `component_roles`,
    /// and the configured caps) is not touched here. The Node is
    /// expected to re-apply the snapshot's framework state, ID
    /// counters, and pending async/completion queues on top of the
    /// cleared state; redelivering in-flight inbound traffic is the
    /// host's responsibility.
    ///
    /// NOTE(review): `ingress` is rebuilt with `IngressQueue::new()`
    /// rather than `with_capacity(bus_capacity * 4)`, and the `Arc`
    /// itself is swapped. Confirm that (a) the default capacity and
    /// completion-result cap are acceptable after a restore, and
    /// (b) producers holding clones of the previous `Arc` are
    /// re-issued handles.
    pub fn clear_for_restore(&mut self) {
        self.exec.frontier.clear();
        self.exec.slot_table.clear();
        self.exec.execution_state.clear();
        self.exec.pending_async.clear();
        self.exec.pending_completions.clear();
        self.exec.pending_calls.clear();
        self.fired_phases.clear();
        // Slot-table clear above dropped every charged carrier;
        // reset the counter so the restored snapshot doesn't inherit
        // an in-flight balance the new state doesn't own.
        self.ingress_bytes_in_flight = 0;
        // Restore deliberately suppresses bootstrap re-runs: the
        // restored Node already executed its bootstrap call before
        // the snapshot, and replaying would re-seed the address
        // book, re-fire the first Announce, etc.
        // `install_order` + `module_bootstraps` stay populated for
        // introspection (multi-target installs surface every queued
        // target via [`Self::bootstrap_function_keys`]); `pending`,
        // `current_exec_id`, and `next_idx` reset so
        // `Node::run_bootstrap` is a no-op on a restored Node —
        // bumping the index to the end of `install_order` keeps the
        // seeder from re-firing if the host nonetheless polls.
        self.bootstrap.clear_for_restore();
        self.framework = FrameworkComponents::new();
        self.bus = TypedBus::new();
        self.ingress = Arc::new(IngressQueue::new());
        // ID counters reset to 0; the restore path re-applies the
        // snapshot's persisted values so post-restore IDs continue
        // from where the pre-snapshot Node left off.
        self.exec.ids.next_exec_id = 0;
        self.exec.ids.next_command_id = 0;
    }
341
342    /// Mint a fresh `ExecId`. Replaces the prior static counter
343    /// in `src/ids.rs` so allocation runs single-threaded under
344    /// the engine's borrow discipline.
345    pub fn allocate_exec_id(&mut self) -> ExecId {
346        let id = self.exec.ids.next_exec_id;
347        self.exec.ids.next_exec_id = self
348            .exec
349            .ids
350            .next_exec_id
351            .checked_add(1)
352            .expect("ExecId counter overflow");
353        ExecId::from(id)
354    }
355
356    /// Mint a fresh `CommandId`. Used by async-suspending syscall
357    /// ops via `RuntimeResourceRef::next_command_id`.
358    pub fn allocate_command_id(&mut self) -> CommandId {
359        let id = self.exec.ids.next_command_id;
360        self.exec.ids.next_command_id = self
361            .exec
362            .ids
363            .next_command_id
364            .checked_add(1)
365            .expect("CommandId counter overflow");
366        CommandId::from(id)
367    }
368
369    /// Mint a fresh `NodeSiteId`. Used by graph installation; sites
370    /// must be globally unique across installed graphs.
371    pub fn allocate_node_site_id(&mut self) -> NodeSiteId {
372        let id = self.exec.ids.next_node_site_id;
373        self.exec.ids.next_node_site_id = self
374            .exec
375            .ids
376            .next_node_site_id
377            .checked_add(1)
378            .expect("NodeSiteId counter overflow");
379        NodeSiteId::from(id)
380    }
381
382    /// Drop slot_table and execution_state entries belonging to
383    /// executions that have completed. An execution is complete
384    /// when it has no frontier entries, no pending_async entries,
385    /// and no pending_calls entry pointing at its `ExecId`. Called
386    /// at the end of every `poll()` cycle so a long-running Node
387    /// keeps a bounded slot_table.
388    pub(crate) fn gc_completed_executions(&mut self) {
389        if self.exec.execution_state.is_empty() {
390            return;
391        }
392        let mut live: std::collections::HashSet<ExecId> =
393            std::collections::HashSet::with_capacity(self.exec.execution_state.len());
394        for (_, exec_id) in &self.exec.frontier {
395            live.insert(*exec_id);
396        }
397        for p in self.exec.pending_async.values() {
398            live.insert(p.exec_id);
399        }
400        for exec_id in self.exec.pending_calls.keys() {
401            live.insert(*exec_id);
402        }
403        let dead: Vec<ExecId> = self
404            .exec
405            .execution_state
406            .keys()
407            .copied()
408            .filter(|e| !live.contains(e))
409            .collect();
410        if dead.is_empty() {
411            return;
412        }
413        let dead_set: std::collections::HashSet<ExecId> = dead.iter().copied().collect();
414        // Walk doomed slot entries once to release any charged
415        // ingress bytes the slot-table writer admitted, then drop
416        // the entries. `retain` would let us mutate-in-place, but
417        // it borrows the table mutably for the entire walk; the
418        // explicit collect-then-remove pattern lets us drain
419        // `charged_bytes()` from each prior carrier first.
420        let doomed_keys: Vec<(NodeSiteId, ExecId)> = self
421            .exec
422            .slot_table
423            .iter()
424            .filter_map(|(key, _)| dead_set.contains(&key.1).then_some(*key))
425            .collect();
426        for key in doomed_keys {
427            // `clear_slot` releases the prior carrier's
428            // `charged_bytes()` against `ingress_bytes_in_flight`.
429            // Non-ingress carriers report 0 — release is a no-op.
430            let _ = self.clear_slot(key.0, key.1);
431        }
432        for exec_id in &dead {
433            self.exec.execution_state.remove(exec_id);
434        }
435    }
436
437    /// Apply production-safety caps from a `NodeConfig`. Called by
438    /// `Node::ensure_ready` after constructing the Engine; tests can
439    /// invoke directly to exercise specific cap values.
440    pub fn apply_config_caps(&mut self, config: &crate::node::NodeConfig) {
441        self.cycle_op_budget = config.cycle_op_budget;
442        self.max_pending_async = config.max_pending_async;
443        self.ingress_byte_budget = config.ingress_byte_budget;
444        self.framework
445            .outbound_queue
446            .set_cap(config.max_outbound_queue);
447        self.bus.set_cap(Some(config.bus_capacity));
448        // The off-thread `CompletionSink::complete` path consults the
449        // ingress queue itself for its per-item cap; reseed the
450        // atomic so sinks created before this call see the configured
451        // value on their next push.
452        self.ingress
453            .set_completion_result_cap(config.max_completion_result_bytes);
454        // Reseed the BackpressureTracker with the configured knobs.
455        // `apply_config_caps` is the canonical entry the host calls
456        // before the first poll, so a fresh tracker reflecting the
457        // resolved knobs is the only state observers see.
458        self.framework.peer_state.backpressure = crate::framework::BackpressureTracker::with_config(
459            config.backpressure_high_water_pct,
460            config.backpressure_k_before_silent,
461            config.backpressure_min_notice_interval_ns,
462        );
463    }
464
465    /// Live count of ingress bytes the engine currently holds across
466    /// its ingress queue + slot table + pending async completion
467    /// buffers. Updated by every successful charge / release pair on
468    /// the ingress paths. Surfaced for observability (operator
469    /// dashboards) and assertions.
470    pub fn ingress_bytes_in_flight(&self) -> usize {
471        self.ingress_bytes_in_flight
472    }
473
474    /// Configured cap on cumulative in-flight ingress bytes,
475    /// sourced from `NodeConfig::ingress_byte_budget`. Constant
476    /// between `apply_config_caps` calls.
477    pub fn ingress_byte_budget(&self) -> usize {
478        self.ingress_byte_budget
479    }
480
481    /// Pre-admission budget guard for an ingress payload of length
482    /// `bytes`. On success the bytes are added to
483    /// `ingress_bytes_in_flight` and the caller may install the
484    /// resulting carrier into the slot table or pending-completion
485    /// queue. On overflow the counter is left unchanged and the
486    /// caller drops the payload, emitting the appropriate
487    /// `BudgetExceeded` `InfraEvent`.
488    ///
489    /// One saturating-add + one comparison; below the cost of the
490    /// prost decode that typically follows.
491    pub(crate) fn try_charge(&mut self, bytes: usize) -> Result<(), BudgetExceededReason> {
492        let after = self.ingress_bytes_in_flight.saturating_add(bytes);
493        if after > self.ingress_byte_budget {
494            return Err(BudgetExceededReason {
495                byte_count: bytes,
496                budget_remaining: self
497                    .ingress_byte_budget
498                    .saturating_sub(self.ingress_bytes_in_flight),
499            });
500        }
501        self.ingress_bytes_in_flight = after;
502        Ok(())
503    }
504
505    /// Decrement `ingress_bytes_in_flight` after a charged payload
506    /// leaves engine state — slot-table overwrite, slot clear,
507    /// eviction, or in-cycle drop. `saturating_sub` defends against
508    /// a release path that arrives without a paired charge (e.g. a
509    /// snapshot replay reseeding the slot table before any wire
510    /// traffic).
511    pub(crate) fn release(&mut self, bytes: usize) {
512        self.ingress_bytes_in_flight = self.ingress_bytes_in_flight.saturating_sub(bytes);
513    }
514
515    /// Write a value into the slot table at `(site, exec_id)`,
516    /// releasing the prior occupant's `charged_bytes` against
517    /// `ingress_bytes_in_flight`. The incoming carrier's
518    /// `charged_bytes` is NOT re-added — admission callers
519    /// (`decode_typed_fill`, `deliver_event`, etc.) have already run
520    /// `try_charge` against the wire-byte budget. This helper is the
521    /// slot-table-side bookkeeping that closes the loop on overwrite.
522    ///
523    /// Returns the prior boxed value (if any) so the caller can
524    /// run additional teardown.
525    pub(crate) fn slot_write(
526        &mut self,
527        site: NodeSiteId,
528        exec_id: ExecId,
529        value: Box<dyn SlotValue>,
530    ) -> Option<Box<dyn SlotValue>> {
531        let key = (site, exec_id);
532        let prior = self.exec.slot_table.insert(key, Some(value));
533        // `prior` is `Option<Option<Box<dyn SlotValue>>>`: outer None
534        // means the slot was untouched; inner None means the slot
535        // existed but was empty (cleared previously). Release only
536        // when the prior carrier was alive.
537        match prior {
538            Some(Some(prior_box)) => {
539                self.ingress_bytes_in_flight = self
540                    .ingress_bytes_in_flight
541                    .saturating_sub(prior_box.charged_bytes());
542                Some(prior_box)
543            }
544            _ => None,
545        }
546    }
547
548    /// Remove the slot at `(site, exec_id)`, releasing any
549    /// `charged_bytes` the prior carrier was holding against
550    /// `ingress_bytes_in_flight`. Returns the removed carrier so
551    /// downstream paths (graph reset, GC) can pass it onward.
552    pub(crate) fn clear_slot(
553        &mut self,
554        site: NodeSiteId,
555        exec_id: ExecId,
556    ) -> Option<Box<dyn SlotValue>> {
557        let key = (site, exec_id);
558        match self.exec.slot_table.remove(&key) {
559            Some(Some(prior_box)) => {
560                self.ingress_bytes_in_flight = self
561                    .ingress_bytes_in_flight
562                    .saturating_sub(prior_box.charged_bytes());
563                Some(prior_box)
564            }
565            _ => None,
566        }
567    }
568
569    /// Snapshot of the engine's hot-path state, sized for cheap
570    /// reads on every poll cycle (no allocation). Production
571    /// observability: operators see saturation building up before
572    /// the process locks up.
573    pub fn engine_stats(&self) -> EngineStats {
574        EngineStats {
575            frontier_len: self.exec.frontier.len(),
576            bus_len: self.bus.len(),
577            pending_async: self.exec.pending_async.len(),
578            slot_table_occupied: self.exec.slot_table.len(),
579            ingress_depth: self.ingress.len(),
580            outbound_queue_depth: self.framework.outbound_queue.len(),
581            event_subscriptions: self.event_subscriptions.len(),
582            registered_components: self.components.len(),
583            graph_slots: self.graphs.len(),
584        }
585    }
586
587    /// Register a `ProtocolRuntime` dispatcher for the concrete
588    /// component type `T`. Call once per `T` after constructing the
589    /// Engine - typically alongside `register_component` for any
590    /// component whose `dispatch_atomic` you want to drive. Indexed
591    /// by `TypeId::of::<T>()` so dispatch is one HashMap lookup,
592    /// not a linear scan across the registry.
593    pub fn register_protocol_dispatcher<T: crate::roles::ProtocolRuntime + 'static>(&mut self)
594    where
595        T::Error: std::fmt::Display,
596    {
597        let type_id = std::any::TypeId::of::<T>();
598        self.role_dispatchers
599            .insert(type_id, make_protocol_dispatcher::<T>());
600    }
601
602    /// Register a role dispatcher keyed by `TypeId::of::<T>()` for a
603    /// concrete `IndexRuntime` impl. Lets `Node::with_index(&value)`
604    /// wire atomic dispatch even when `T` does not implement
605    /// `ProtocolRuntime`. Calling this twice for the same `T`
606    /// silently overwrites; the dispatcher is idempotent because
607    /// `T::dispatch_atomic` is the only consumer.
608    pub fn register_index_dispatcher<T: crate::roles::IndexRuntime + 'static>(&mut self)
609    where
610        <T as crate::roles::IndexRuntime>::Error: std::fmt::Display,
611    {
612        let type_id = std::any::TypeId::of::<T>();
613        self.role_dispatchers
614            .insert(type_id, crate::engine::invoke::make_index_dispatcher::<T>());
615    }
616
617    /// Register an `AggregatorRuntime` dispatcher. See
618    /// [`Engine::register_index_dispatcher`] for the rationale.
619    pub fn register_aggregator_dispatcher<T: crate::roles::AggregatorRuntime + 'static>(&mut self)
620    where
621        <T as crate::roles::AggregatorRuntime>::Error: std::fmt::Display,
622    {
623        let type_id = std::any::TypeId::of::<T>();
624        self.role_dispatchers.insert(
625            type_id,
626            crate::engine::invoke::make_aggregator_dispatcher::<T>(),
627        );
628    }
629
630    /// Register a `ModelRuntime` dispatcher. See
631    /// [`Engine::register_index_dispatcher`] for the rationale.
632    pub fn register_model_dispatcher<T: crate::roles::ModelRuntime + 'static>(&mut self)
633    where
634        <T as crate::roles::ModelRuntime>::Error: std::fmt::Display,
635    {
636        let type_id = std::any::TypeId::of::<T>();
637        self.role_dispatchers
638            .insert(type_id, crate::engine::invoke::make_model_dispatcher::<T>());
639    }
640
641    /// Register a `CodecRuntime` dispatcher. See
642    /// [`Engine::register_index_dispatcher`] for the rationale.
643    pub fn register_codec_dispatcher<T: crate::roles::CodecRuntime + 'static>(&mut self)
644    where
645        <T as crate::roles::CodecRuntime>::Error: std::fmt::Display,
646    {
647        let type_id = std::any::TypeId::of::<T>();
648        self.role_dispatchers
649            .insert(type_id, crate::engine::invoke::make_codec_dispatcher::<T>());
650    }
651
652    /// Register a `DataSourceRuntime` dispatcher. See
653    /// [`Engine::register_index_dispatcher`] for the rationale.
654    pub fn register_data_source_dispatcher<T: crate::roles::DataSourceRuntime + 'static>(&mut self)
655    where
656        <T as crate::roles::DataSourceRuntime>::Error: std::fmt::Display,
657    {
658        let type_id = std::any::TypeId::of::<T>();
659        self.role_dispatchers.insert(
660            type_id,
661            crate::engine::invoke::make_data_source_dispatcher::<T>(),
662        );
663    }
664
665    /// Register a `PeerSelectorRuntime` dispatcher. See
666    /// [`Engine::register_index_dispatcher`] for the rationale.
667    pub fn register_peer_selector_dispatcher<T: crate::roles::PeerSelectorRuntime + 'static>(
668        &mut self,
669    ) where
670        <T as crate::roles::PeerSelectorRuntime>::Error: std::fmt::Display,
671    {
672        let type_id = std::any::TypeId::of::<T>();
673        self.role_dispatchers.insert(
674            type_id,
675            crate::engine::invoke::make_peer_selector_dispatcher::<T>(),
676        );
677    }
678
679    /// Register a `BackendRuntime` dispatcher. The `Backend` Contract
680    /// trait's per-atomic-op surface dispatches through this entry,
681    /// emitted from `#[derive(bb::Backend)]`.
682    pub fn register_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>(&mut self)
683    where
684        <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
685    {
686        let type_id = std::any::TypeId::of::<T>();
687        self.role_dispatchers.insert(
688            type_id,
689            crate::engine::invoke::make_backend_dispatcher::<T>(),
690        );
691    }
692
693    /// Record the inventory-declared roles for a registered
694    /// component. `Node::ensure_ready` calls this once per
695    /// `ComponentRef` after registration, passing the set computed
696    /// from `crate::registry::roles_for_component(T::TYPE_NAME)`.
697    pub fn set_component_roles(
698        &mut self,
699        cref: ComponentRef,
700        roles: std::collections::HashSet<crate::registry::ComponentRole>,
701    ) {
702        self.component_roles.insert(cref, roles);
703    }
704
705    /// Return the inventory-declared roles for a registered
706    /// component, or an empty set if the component wasn't registered
707    /// through a derive emitting `ComponentRoleBinding` entries.
708    /// Introspection surface for engine tests + host tooling.
709    pub fn roles_for(
710        &self,
711        cref: ComponentRef,
712    ) -> std::collections::HashSet<crate::registry::ComponentRole> {
713        self.component_roles.get(&cref).cloned().unwrap_or_default()
714    }
715
716    /// Register a `binding_id → ComponentRef` mapping. Called by
717    /// `Node::ensure_ready` after binding resolution so the bound-
718    /// backend lookup can resolve a NodeProto's `binding_id`
719    /// metadata against an installed component.
720    pub fn register_binding_id(&mut self, binding_id: String, cref: ComponentRef) {
721        self.binding_id_index.insert(binding_id, cref);
722    }
723
724    /// Look up the `ComponentRef` bound at the given slot name -
725    /// the GENERIC dependency-resolution accessor. Components reach
726    /// declared dependencies through this method (typically via
727    /// [`crate::runtime::ComponentsView::for_slot`] at dispatch
728    /// time). Returns `None` when no slot of that name is bound.
729    pub fn slot(&self, slot: &str) -> Option<ComponentRef> {
730        self.slots.get(slot).copied()
731    }
732
733    /// Bind a `ComponentRef` at the given slot name. The
734    /// `install` facade in the `bytesandbrains` crate calls this
735    /// from the T8 chain; in-crate callers use it from the
736    /// transitional `Node::with_backend(slot, &b)` path. Returns
737    /// the previous binding if any.
738    pub fn bind_slot(&mut self, slot: String, cref: ComponentRef) -> Option<ComponentRef> {
739        self.slots.insert(slot, cref)
740    }
741
742    /// Register the compiler-assigned `slot_id` → `ComponentRef`
743    /// mapping. Called by `bb::install()` alongside
744    /// [`Self::bind_slot`]; the pair is read by
745    /// [`Self::resolve_dispatch`] when stamping
746    /// `OpDispatch::Atomic` against a role NodeProto's
747    /// `ai.bytesandbrains.slot_id` metadata. Returns the previous
748    /// binding, if any.
749    pub fn bind_slot_id(&mut self, slot_id: u32, cref: ComponentRef) -> Option<ComponentRef> {
750        self.slot_id_to_cref.insert(slot_id, cref)
751    }
752
753    /// Register the compiler-assigned `slot_id` → `(role,
754    /// ComponentRef)` mapping. Called by `bb::install()` alongside
755    /// [`Self::bind_slot_id`]; the role is required so
756    /// `decode_typed_fill` can branch between framework-carrier
757    /// decode (`Codec`, `Index`, …) and backend-mediated tensor
758    /// materialisation (`Backend`). Returns the previous binding, if
759    /// any.
760    pub fn bind_slot_id_with_role(
761        &mut self,
762        slot_id: u32,
763        role: crate::registry::ComponentRole,
764        cref: ComponentRef,
765    ) -> Option<(crate::registry::ComponentRole, ComponentRef)> {
766        self.slot_id_to_role_ref.insert(slot_id, (role, cref))
767    }
768
769    /// Look up the `(role, ComponentRef)` bound at a compiler-assigned
770    /// `slot_id`. Used by `decode_typed_fill` to discover whether an
771    /// inbound wire payload routes through a backend.
772    pub fn role_ref_for_slot_id(
773        &self,
774        slot_id: u32,
775    ) -> Option<(crate::registry::ComponentRole, ComponentRef)> {
776        self.slot_id_to_role_ref.get(&slot_id).copied()
777    }
778
779    /// Iterate every `(slot_name, ComponentRef)` pair currently
780    /// bound. Surfaces the registry to introspection tools + the
781    /// compiler's resolve-dependencies pass.
782    pub fn slots_iter(&self) -> impl Iterator<Item = (&str, ComponentRef)> {
783        self.slots.iter().map(|(k, v)| (k.as_str(), *v))
784    }
785
786    /// Subscribe a `NodeSiteId` to bus events of `event_kind` (the
787    /// discriminator returned by [`crate::bus::NodeEvent::kind`]).
788    /// The bus-routing pass writes a `TriggerValue` to each
789    /// subscribed site at a fresh `ExecId` and pushes the site's
790    /// downstream consumers onto the frontier — uniform with wire
791    /// delivery semantics per `docs/ADDRESSING.md`.
792    ///
793    /// `Node` calls this at install time for every
794    /// `EventSource` syscall op, passing the op's output `NodeSiteId`.
795    pub fn register_event_subscription(&mut self, event_kind: &str, site: NodeSiteId) {
796        let entry = self
797            .event_subscriptions
798            .entry(event_kind.to_string())
799            .or_default();
800        if !entry.contains(&site) {
801            entry.push(site);
802        }
803    }
804
805    /// Cheap clone of the shared `IngressQueue` handle. Test
806    /// harnesses + transport adapters push `IngressEvent`s through
807    /// this surface.
808    pub fn ingress_queue_handle(&self) -> Arc<IngressQueue> {
809        Arc::clone(&self.ingress)
810    }
811
812    /// Queue a lifecycle phase for firing on the next `poll()`
813    /// call. The framework emits `EngineStep::LifecycleFired { phase }`
814    /// for each queued phase and also pushes every `LifecyclePhase`
815    /// op enrolled under that phase name (via
816    /// [`Engine::register_lifecycle_op`]) onto the frontier with a
817    /// fresh `ExecId`.
818    pub fn fire_lifecycle(&mut self, phase: &str) {
819        self.fired_phases.push(phase.to_string());
820    }
821
822    /// Enroll `op_ref` under `phase` per IR_AND_DSL.md §5a.
823    /// Idempotent - the same `(phase, OpRef)` pair never enrolls
824    /// twice. `Node` calls this at install time after parsing
825    /// each `LifecyclePhase` NodeProto's `phase` attribute.
826    pub fn register_lifecycle_op(&mut self, phase: &str, op_ref: OpRef) {
827        let entry = self.lifecycle_table.entry(phase.to_string()).or_default();
828        if !entry.contains(&op_ref) {
829            entry.push(op_ref);
830        }
831    }
832
833    /// Register a stateless syscall op. Captures `TypeId::of::<T>()`
834    /// into both `dispatch_table` (TypeId → invoke fn) and
835    /// `syscall_index` ((domain, op_type) → TypeId) so
836    /// `resolve_dispatch` can stamp `OpDispatch::Stateless`.
837    ///
838    /// Register a stateless syscall by its `(domain, op_type)` key.
839    pub fn register_syscall(&mut self, domain: &str, op_type: &str, invoke_fn: StatelessInvokeFn) {
840        self.syscall_table
841            .insert((domain.to_string(), op_type.to_string()), invoke_fn);
842    }
843
844    /// Install every framework syscall shipped via inventory by
845    /// `bb-ops`. Each registration carries its own
846    /// `(domain, op_type)` + invoke pointer; the engine stamps them
847    /// into `syscall_table`.
848    pub fn register_all_framework_syscalls(&mut self) {
849        for reg in crate::registry::framework_syscalls() {
850            self.syscall_table.insert(
851                (reg.domain.to_string(), reg.op_type.to_string()),
852                reg.invoke,
853            );
854        }
855    }
856
857    /// Test-only installer. Inserts a fresh `GraphSlot` keyed by
858    /// `name` with empty per-node tables but with `op_dispatch`
859    /// pre-filled with `Unresolved` so subsequent `resolve_dispatch`
860    /// can stamp dispatch decisions. Use [`Engine::install_graph`]
861    /// for the canonical path that walks the FunctionProto.
862    #[cfg(any(test, feature = "test-components"))]
863    pub fn install_graph_for_test(
864        &mut self,
865        name: String,
866        function: FunctionProto,
867    ) -> &mut GraphSlot {
868        let mut g = GraphSlot::new_for_test(name.clone(), function);
869        g.op_dispatch = (0..g.function.node.len())
870            .map(|_| crate::engine::dispatch_entry::OpDispatch::Unresolved)
871            .collect();
872        let idx = self.push_graph_slot(name, g);
873        &mut self.graphs[idx as usize]
874    }
875
876    /// Push a `GraphSlot` onto the storage Vec and register its
877    /// name → index entry. Returns the assigned `graph_idx` (the
878    /// value that gets packed into `OpRef`). Same name twice
879    /// overwrites the existing slot but keeps the original
880    /// `graph_idx` — preserving OpRef stability across re-install.
881    pub(crate) fn push_graph_slot(&mut self, name: String, slot: GraphSlot) -> u32 {
882        if let Some(&idx) = self.graph_index.get(&name) {
883            self.graphs[idx as usize] = slot;
884            return idx;
885        }
886        let idx = self.graphs.len() as u32;
887        self.graph_index.insert(name, idx);
888        self.graphs.push(slot);
889        idx
890    }
891
892    /// Resolve a graph by name. Returns `None` when the name
893    /// isn't registered. Equivalent to `self.graphs.get(name)` on
894    /// the prior HashMap-keyed shape.
895    pub fn graph(&self, name: &str) -> Option<&GraphSlot> {
896        let idx = *self.graph_index.get(name)?;
897        self.graphs.get(idx as usize)
898    }
899
900    /// Resolve a graph by name for mutation. `None` when the name
901    /// isn't registered.
902    pub fn graph_mut(&mut self, name: &str) -> Option<&mut GraphSlot> {
903        let idx = *self.graph_index.get(name)?;
904        self.graphs.get_mut(idx as usize)
905    }
906
907    /// `true` when a graph with this name is installed.
908    pub fn has_graph(&self, name: &str) -> bool {
909        self.graph_index.contains_key(name)
910    }
911
912    /// Resolve a graph's positional index by name. Used by paths
913    /// that need to compute `OpRef::pack(idx, node_idx)` from a
914    /// graph name (function-call site resolution, etc.).
915    pub fn graph_idx(&self, name: &str) -> Option<u32> {
916        self.graph_index.get(name).copied()
917    }
918
919    /// Build an `OpRef` for the `node_idx`-th NodeProto of a graph
920    /// identified by name. Test-only convenience for tests that
921    /// used to fish the OpRef out of `GraphSlot.op_index`; with
922    /// positional `OpRef::pack(graph_idx, node_idx)` the lookup is
923    /// trivial.
924    #[cfg(any(test, feature = "test-components"))]
925    pub fn op_ref_at(&self, graph_name: &str, node_idx: u32) -> Option<OpRef> {
926        let gi = self.graph_idx(graph_name)?;
927        let g = self.graphs.get(gi as usize)?;
928        if (node_idx as usize) < g.function.node.len() {
929            Some(OpRef::pack(gi, node_idx))
930        } else {
931            None
932        }
933    }
934
935    /// Iterate every installed `GraphSlot` in install order.
936    pub fn graphs_iter(&self) -> impl Iterator<Item = &GraphSlot> {
937        self.graphs.iter()
938    }
939
940    /// Iterate every (`name`, `&GraphSlot`) pair in install order.
941    pub fn graphs_named(&self) -> impl Iterator<Item = (&str, &GraphSlot)> {
942        // graph_index maps name -> idx; rebuild idx -> name for the
943        // walk so the iteration order matches the storage Vec.
944        let mut by_idx: Vec<(u32, &str)> = self
945            .graph_index
946            .iter()
947            .map(|(n, i)| (*i, n.as_str()))
948            .collect();
949        by_idx.sort_by_key(|&(i, _)| i);
950        by_idx
951            .into_iter()
952            .filter_map(move |(i, n)| self.graphs.get(i as usize).map(|g| (n, g)))
953    }
954
955    /// Canonical install path: builds an
956    /// [`GraphSlot`] from the FunctionProto (allocating
957    /// `OpRef`s + `NodeSiteId`s for every node + produced value) and
958    /// inserts it under `name`.
959    ///
960    /// Used by [`crate::node::Node::ready`] for each
961    /// `ModelProto.functions[0]`. Returns a mutable reference
962    /// for any subsequent setup (slot_bindings, local_event_subs).
963    pub fn install_graph(&mut self, name: String, function: FunctionProto) -> &mut GraphSlot {
964        let graph_idx = self.graphs.len() as u32;
965        let mut g = GraphSlot::from_function(
966            name.clone(),
967            function,
968            graph_idx,
969            &mut self.exec.ids.next_node_site_id,
970        );
971        // Entry-point graphs (installed via `install_graph`, not
972        // `install_function_library`) get a `NodeSiteId` registered
973        // for every function input so `Engine::deliver_app_event`
974        // can seed the input via ingress. Body functions used in
975        // `OpDispatch::FunctionCall` deliberately route through
976        // `input_aliases` and must NOT get input sites; that path
977        // installs through `install_function_library` instead.
978        register_function_input_sites(&mut g, &mut self.exec.ids.next_node_site_id, graph_idx);
979        let idx = self.push_graph_slot(name, g);
980        &mut self.graphs[idx as usize]
981    }
982
983    /// Runtime-linker install: walk `model.functions[]` and install
984    /// each FunctionProto as an `GraphSlot` keyed by its
985    /// canonical `(domain, name, overload)`-derived string. Also
986    /// populates the symbol-table index `functions` keyed on the same
987    /// tuple, so call NodeProtos can be resolved at dispatch time.
988    ///
989    /// `entry_point_keys` lists the `FunctionKey`s for the registered
990    /// Modules' main partition functions - those graphs get
991    /// `is_entry_point = true` (their top-level outputs surface as
992    /// `EngineStep::AppEvent`; sub-function bodies do not).
993    ///
994    /// A function stamped `MODULE_PHASE_KEY = "bootstrap"` registers
995    /// its `FunctionKey` with the engine's bootstrap state (appends
996    /// to `install_order`, populates `module_bootstraps`) without
997    /// arming `pending` — install is pure. The host arms the queue by
998    /// calling [`crate::node::Node::run_bootstrap`], which fans out
999    /// each install-order target serially and emits one
1000    /// `BootstrapComplete` step per drained phase; multi-target
1001    /// installs surface their targets in slice order without further
1002    /// host action.
1003    ///
1004    /// Idempotent under ODR (same key + same body) - silently skips
1005    /// reinstall. Caller (Node linker) is responsible for the
1006    /// byte-equality check before calling.
1007    pub fn install_function_library(
1008        &mut self,
1009        functions: &[FunctionProto],
1010        entry_point_keys: &[FunctionKey],
1011    ) {
1012        let entry_set: std::collections::HashSet<&FunctionKey> = entry_point_keys.iter().collect();
1013        for f in functions {
1014            let key: FunctionKey = (f.domain.clone(), f.name.clone(), f.overload.clone());
1015            let graph_name = graph_name_for(&key);
1016            if self.has_graph(&graph_name) {
1017                continue;
1018            }
1019            let graph_idx = self.graphs.len() as u32;
1020            let mut g = GraphSlot::from_function(
1021                graph_name.clone(),
1022                f.clone(),
1023                graph_idx,
1024                &mut self.exec.ids.next_node_site_id,
1025            );
1026            // Only entry-point functions surface their outputs as
1027            // AppEvents. Sub-function bodies' outputs are forwarded
1028            // via output_forwarding at call sites.
1029            g.is_entry_point = entry_set.contains(&key);
1030            if !g.is_entry_point {
1031                g.top_level_outputs.clear();
1032            }
1033            let is_bootstrap = bb_ir::keys::read_function_module_phase(f)
1034                .is_some_and(|p| p == bb_ir::keys::MODULE_PHASE_BOOTSTRAP);
1035            // Bootstrap functions seed their inputs through the
1036            // host-driven staging path
1037            // (`Node::run_bootstrap(&[BootstrapInput])`) rather than
1038            // via a FunctionCall splice. Mint a `NodeSiteId` per
1039            // declared input formal so the staging path can address
1040            // the slot via `(NodeSiteId, body_exec_id)` and the body
1041            // ops can resolve their input names through
1042            // `resolve_site_name`.
1043            if is_bootstrap {
1044                register_function_input_sites(
1045                    &mut g,
1046                    &mut self.exec.ids.next_node_site_id,
1047                    graph_idx,
1048                );
1049            }
1050            self.push_graph_slot(graph_name, g);
1051            self.functions.insert(key.clone(), f.clone());
1052            if is_bootstrap {
1053                // Register the bootstrap target. A multi-target
1054                // install registers one bootstrap per target (in
1055                // the order [`crate::install::install`] iterates the
1056                // user-supplied `targets` slice). Seeding drains
1057                // `install_order` front-to-back so each target's
1058                // bootstrap fires in slice order.
1059                self.bootstrap.register_module(key);
1060            }
1061        }
1062    }
1063
1064    /// Allocate the next bootstrap call's body `ExecId` and push every
1065    /// body OpRef of the front of
1066    /// [`crate::engine::bootstrap::BootstrapState::install_order`]
1067    /// onto the frontier. Returns `true` when a bootstrap call was
1068    /// seeded; `false` when the engine has no remaining bootstrap
1069    /// functions or the previous call is still in flight.
1070    ///
1071    /// Host-driven: `Node::run_bootstrap(&[])` invokes this once after
1072    /// arming `bootstrap.pending`; the poll cascade reseeds via
1073    /// `maybe_complete_bootstrap` after each phase drains so multi-
1074    /// target installs surface one `BootstrapComplete` per target in
1075    /// install order without further host action. Install itself no
1076    /// longer arms `pending`.
1077    pub(crate) fn seed_bootstrap_call(&mut self) -> bool {
1078        if self.bootstrap.current_exec_id.is_some() {
1079            return false;
1080        }
1081        if self.bootstrap.next_idx >= self.bootstrap.install_order.len() {
1082            // No further bootstraps to seed; the gate clears on the
1083            // next `maybe_complete_bootstrap` pass.
1084            self.bootstrap.pending = false;
1085            return false;
1086        }
1087        let target_name = self.bootstrap.install_order[self.bootstrap.next_idx].clone();
1088        let Some(meta) = self.bootstrap.module_bootstraps.get(&target_name) else {
1089            // Defensive skip: install_order names a target whose
1090            // metadata is missing. Advance past the stale entry
1091            // rather than wedging.
1092            self.bootstrap.next_idx += 1;
1093            return false;
1094        };
1095        let key = meta.function_key.clone();
1096        if self.fire_module_bootstrap(target_name, &key).is_none() {
1097            self.bootstrap.next_idx += 1;
1098            return false;
1099        }
1100        true
1101    }
1102
1103    /// Seed a Module bootstrap body onto the frontier under a fresh
1104    /// ExecId and record its ExecId in `bootstrap.current_exec_id`.
1105    /// Returns the body ExecId on success or `None` when the graph
1106    /// name is missing (defensive — install populates it).
1107    fn fire_module_bootstrap(&mut self, target_name: String, key: &FunctionKey) -> Option<ExecId> {
1108        let graph_name = graph_name_for(key);
1109        let graph_idx = self.graph_idx(&graph_name)?;
1110        let body_exec_id = self.allocate_exec_id();
1111        let node_count = self
1112            .graphs
1113            .get(graph_idx as usize)
1114            .map(|g| g.function.node.len())
1115            .unwrap_or(0);
1116        // Bootstrap takes no input formals and produces no outputs,
1117        // so no CallContext lives in `pending_calls`; the body-op
1118        // gate identifies bootstrap-descendant ExecIds by either
1119        // direct match against the current bootstrap ExecId or chain
1120        // walk through descendant FunctionCall CallContexts.
1121        // Quiescence resolves through `maybe_complete_bootstrap` once
1122        // every descendant frontier + pending_async entry clears.
1123        self.bootstrap
1124            .mark_module_in_flight(target_name, body_exec_id);
1125        for node_idx in 0..node_count as u32 {
1126            let op_ref = OpRef::pack(graph_idx, node_idx);
1127            self.exec.frontier.push_back((op_ref, body_exec_id));
1128        }
1129        Some(body_exec_id)
1130    }
1131
1132    /// Stage one [`crate::engine::BootstrapInput`] against its target's
1133    /// declared formal inputs, copy the bytes via Principle 1a, and
1134    /// seed the body onto the frontier. Helper called by
1135    /// [`Self::run_bootstrap`] per non-empty target.
1136    fn enqueue_module_bootstrap(
1137        &mut self,
1138        request: crate::engine::bootstrap::BootstrapInput<'_>,
1139    ) -> Result<(), crate::errors::BootstrapError> {
1140        // 1. Resolve target → function_key → graph_idx.
1141        let meta = self
1142            .bootstrap
1143            .module_bootstraps
1144            .get(request.target)
1145            .ok_or_else(|| crate::errors::BootstrapError::UnknownTarget {
1146                target_name: request.target.to_string(),
1147                available: self.bootstrap.install_order.clone(),
1148            })?;
1149        let function_key = meta.function_key.clone();
1150        let graph_name = graph_name_for(&function_key);
1151        let graph_idx = self.graph_idx(&graph_name).ok_or_else(|| {
1152            crate::errors::BootstrapError::UnknownTarget {
1153                target_name: request.target.to_string(),
1154                available: self.bootstrap.install_order.clone(),
1155            }
1156        })?;
1157        let graph = &self.graphs[graph_idx as usize];
1158
1159        // 2. Read declared input formals from the GraphSlot.
1160        let declared: Vec<String> = graph.function.input.clone();
1161
1162        // 3. Validate. UnknownInput fires first (the supplied set is
1163        // the host's authoritative request shape; surfacing extras
1164        // before missing ones gives clearer diagnostics on typos).
1165        for (input_name, _) in request.inputs {
1166            if !declared.iter().any(|d| d == input_name) {
1167                return Err(crate::errors::BootstrapError::UnknownInput {
1168                    target_name: request.target.to_string(),
1169                    input_name: input_name.to_string(),
1170                    declared: declared.clone(),
1171                });
1172            }
1173        }
1174        for formal in &declared {
1175            if !request.inputs.iter().any(|(name, _)| *name == formal) {
1176                return Err(crate::errors::BootstrapError::MissingInput {
1177                    target_name: request.target.to_string(),
1178                    input_name: formal.clone(),
1179                });
1180            }
1181        }
1182
1183        // Resolve each formal to its NodeSiteId before allocating the
1184        // ExecId — a missing site at this stage is an install
1185        // invariant violation, but defending against it keeps the
1186        // error path total.
1187        let mut sites: Vec<(crate::ids::NodeSiteId, &[u8])> =
1188            Vec::with_capacity(request.inputs.len());
1189        for (input_name, bytes) in request.inputs {
1190            let Some(&site) = graph.site_names.get(*input_name) else {
1191                return Err(crate::errors::BootstrapError::UnknownInput {
1192                    target_name: request.target.to_string(),
1193                    input_name: input_name.to_string(),
1194                    declared: declared.clone(),
1195                });
1196            };
1197            sites.push((site, *bytes));
1198        }
1199
1200        // 4. Allocate body ExecId. Done after validation so a rejected
1201        // request does not consume an ExecId counter slot.
1202        let body_exec_id = self.allocate_exec_id();
1203
1204        // 5. Per-input charge + Principle 1a copy. Track total
1205        // admitted bytes so a mid-loop failure releases the full
1206        // charge in one shot.
1207        let mut admitted: usize = 0;
1208        for (site, bytes) in &sites {
1209            let byte_count = bytes.len();
1210            if let Err(reason) = self.try_charge(byte_count) {
1211                self.release(admitted);
1212                return Err(crate::errors::BootstrapError::AllocationFailed {
1213                    target_name: request.target.to_string(),
1214                    byte_count,
1215                    budget_remaining: reason.budget_remaining,
1216                });
1217            }
1218            admitted = admitted.saturating_add(byte_count);
1219            let mut owned: Vec<u8> = Vec::new();
1220            if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
1221                self.release(admitted);
1222                return Err(crate::errors::BootstrapError::AllocationFailed {
1223                    target_name: request.target.to_string(),
1224                    byte_count,
1225                    budget_remaining: self
1226                        .ingress_byte_budget
1227                        .saturating_sub(self.ingress_bytes_in_flight),
1228                });
1229            }
1230            owned.extend_from_slice(bytes);
1231            let value: Box<dyn crate::slot_value::SlotValue> =
1232                Box::new(crate::syscall::values::BytesValue(owned));
1233            self.slot_write(*site, body_exec_id, value);
1234        }
1235
1236        // 6. Mark the Module bootstrap in-flight and push every body
1237        // OpRef onto the frontier. The body-op gate (`is_op_locked`)
1238        // recognises the freshly seeded ExecId via `current_exec_id`
1239        // so descendant ops keep firing.
1240        self.bootstrap.pending = true;
1241        self.bootstrap
1242            .mark_module_in_flight(request.target.to_string(), body_exec_id);
1243        let node_count = self.graphs[graph_idx as usize].function.node.len();
1244        for node_idx in 0..node_count as u32 {
1245            let op_ref = OpRef::pack(graph_idx, node_idx);
1246            self.exec.frontier.push_back((op_ref, body_exec_id));
1247        }
1248        Ok(())
1249    }
1250
1251    /// Flat host-facing bootstrap entry point. Empty slice fires the
1252    /// install-order kick (arming + seeding every queued target);
1253    /// non-empty slice stages each [`BootstrapInput`] in slice order
1254    /// using the same validation + Principle 1a copy + frontier seed
1255    /// as `enqueue_module_bootstrap`.
1256    pub fn run_bootstrap(
1257        &mut self,
1258        targets: &[crate::engine::bootstrap::BootstrapInput<'_>],
1259    ) -> Result<bool, crate::errors::BootstrapError> {
1260        if targets.is_empty() {
1261            if !self.bootstrap.arm_install_order() {
1262                return Ok(false);
1263            }
1264            return Ok(self.seed_bootstrap_call());
1265        }
1266        for req in targets {
1267            self.enqueue_module_bootstrap(crate::engine::bootstrap::BootstrapInput {
1268                target: req.target,
1269                inputs: req.inputs,
1270            })?;
1271        }
1272        Ok(true)
1273    }
1274
1275    /// `(domain, name, overload)` of the first bootstrap function
1276    /// recorded at install time, or `None` when no
1277    /// `module_phase = "bootstrap"` FunctionProto reached the function
1278    /// library. Stable across `clear_for_restore` (which preserves
1279    /// install-order metadata but bumps `next_idx` past every queued
1280    /// target so the restored Node does not re-fire bootstraps it
1281    /// already ran). For the full ordered list (multi-target installs
1282    /// queue one key per target), use [`Self::bootstrap_function_keys`].
1283    pub fn bootstrap_function_key(&self) -> Option<FunctionKey> {
1284        self.bootstrap.first_function_key().cloned()
1285    }
1286
1287    /// All bootstrap function keys the engine has queued, in install
1288    /// order. Multi-target installs append one entry per target via
1289    /// [`Self::install_function_library`]; the seeder fires each in
1290    /// slice order. Stable across `clear_for_restore` for snapshot
1291    /// introspection.
1292    pub fn bootstrap_function_keys(&self) -> Vec<FunctionKey> {
1293        self.bootstrap.function_keys()
1294    }
1295
1296    /// `true` while a bootstrap call is outstanding. Armed by
1297    /// [`Self::run_bootstrap`]; cleared once every queued phase
1298    /// drains.
1299    pub fn bootstrap_pending(&self) -> bool {
1300        self.bootstrap.pending
1301    }
1302
1303    /// Lifecycle status for the host-facing
1304    /// [`crate::node::Node::bootstrap_status`] accessor. `Idle` when no
1305    /// bootstrap is queued or in-flight; `Running` when a bootstrap
1306    /// body is currently in-flight; `WaitingForInput` when the
1307    /// install-order queue still has unseeded targets but no body is
1308    /// active yet (the host must drive the queue to advance).
1309    pub fn bootstrap_status(&self) -> crate::engine::bootstrap::BootstrapStatus {
1310        if self.bootstrap.current_exec_id.is_some() {
1311            return crate::engine::bootstrap::BootstrapStatus::Running;
1312        }
1313        if self.bootstrap.pending && self.bootstrap.next_idx < self.bootstrap.install_order.len() {
1314            return crate::engine::bootstrap::BootstrapStatus::WaitingForInput;
1315        }
1316        crate::engine::bootstrap::BootstrapStatus::Idle
1317    }
1318
1319    /// `true` when `target_name` is already on the install-order Module
1320    /// bootstrap queue. Used by `Node::run_bootstrap` validation.
1321    pub fn module_bootstrap_registered(&self, target_name: &str) -> bool {
1322        self.bootstrap.module_bootstraps.contains_key(target_name)
1323    }
1324
1325    /// Snapshot of every registered Module bootstrap target name in
1326    /// install order. Returned by `BootstrapError::UnknownTarget` so
1327    /// callers see the legal set.
1328    pub fn module_bootstrap_target_names(&self) -> Vec<String> {
1329        self.bootstrap.install_order.clone()
1330    }
1331
1332    /// Body-op gate. Returns `true` when the op must park because a
1333    /// bootstrap body is in-flight and the op's ExecId is not a
1334    /// descendant of the bootstrap ExecId; `false` when the op is
1335    /// fireable.
1336    ///
1337    /// Resolution order:
1338    /// 1. `bootstrap.pending` clear → fire (gate dormant).
1339    /// 2. `exec_id` descends from the in-flight bootstrap ExecId via
1340    ///    the `pending_calls.parent_exec_id` chain → fire. Bootstrap
1341    ///    body + its sub-FunctionCalls keep firing while the body
1342    ///    runs.
1343    /// 3. Otherwise → park. The collapsed gate denies every body-op
1344    ///    until the bootstrap drains.
1345    pub(crate) fn is_op_locked(&self, _op_ref: OpRef, exec_id: ExecId) -> bool {
1346        if !self.bootstrap.pending {
1347            return false;
1348        }
1349        let Some(boot_exec) = self.bootstrap.current_exec_id else {
1350            return false;
1351        };
1352        // 2. Bootstrap-descendant exec ids fire freely. Walk the
1353        // call chain once and short-circuit if the in-flight ExecId
1354        // matches anywhere along the chain.
1355        let mut current = exec_id;
1356        loop {
1357            if current == boot_exec {
1358                return false;
1359            }
1360            match self.exec.pending_calls.get(&current) {
1361                Some(call) => current = call.parent_exec_id,
1362                None => break,
1363            }
1364        }
1365        true
1366    }
1367
1368    /// Inspect engine state and pop the in-flight bootstrap key once
1369    /// every bootstrap-descendant frontier entry and `pending_async`
1370    /// entry has cleared. Returns `true` when one phase just drained
1371    /// (i.e. the caller poll cycle should append a `BootstrapComplete`
1372    /// step for that phase). With remaining queued keys, the next
1373    /// `seed_bootstrap_call` advances to the following target;
1374    /// `bootstrap.pending` flips off only after the last key drains.
1375    /// Called after each drain phase + the ingress completion drain.
1376    pub(crate) fn maybe_complete_bootstrap(&mut self) -> bool {
1377        if !self.bootstrap.pending {
1378            return false;
1379        }
1380        let Some(boot_exec) = self.bootstrap.current_exec_id else {
1381            return false;
1382        };
1383        let descendant = |engine: &Engine, mut exec_id: ExecId| -> bool {
1384            loop {
1385                if exec_id == boot_exec {
1386                    return true;
1387                }
1388                match engine.exec.pending_calls.get(&exec_id) {
1389                    Some(call) => exec_id = call.parent_exec_id,
1390                    None => return false,
1391                }
1392            }
1393        };
1394        if self
1395            .exec
1396            .frontier
1397            .iter()
1398            .any(|(_, exec_id)| descendant(self, *exec_id))
1399        {
1400            return false;
1401        }
1402        if self
1403            .exec
1404            .pending_async
1405            .values()
1406            .any(|p| descendant(self, p.exec_id))
1407        {
1408            return false;
1409        }
1410        // The in-flight phase drained. Advance the install_order
1411        // pointer (the post-kick cascade) and retire the in-flight
1412        // ExecId. `bootstrap.pending` clears once the cursor reaches
1413        // the end of `install_order` AND no in-flight body remains.
1414        self.bootstrap.next_idx += 1;
1415        self.bootstrap.clear_in_flight();
1416        if self.bootstrap.next_idx >= self.bootstrap.install_order.len()
1417            && self.bootstrap.current_exec_id.is_none()
1418        {
1419            self.bootstrap.pending = false;
1420        }
1421        true
1422    }
1423
1424    /// Resolve every NodeProto's dispatch kind into `op_dispatch[]`
1425    /// per ENGINE.md §8.1 + §8.4. Run after install completes (so all
1426    /// symbols are in `self.functions`) but before the first poll.
1427    ///
1428    /// Walk order: each GraphSlot in turn. For each NodeProto:
1429    /// - syscall (`syscall_index` hit) → `Stateless`
1430    /// - call to function in `self.functions` with domain
1431    ///   `ai.bytesandbrains.module` → `FunctionCall` with the target
1432    ///   key + input/output rename pairs from the call's value names
1433    ///   zipped against the target function's formals.
1434    /// - call to function with domain `ai.bytesandbrains.framework`
1435    ///   starting with `BackendSubgraph_` → `BackendSubgraph` with
1436    ///   bound backend resolved via `BINDING_ID_KEY` metadata against
1437    ///   the atomic-dispatch table.
1438    /// - else atomic dispatch by `(domain, op_type, instance)` →
1439    ///   `Atomic`.
1440    ///
1441    /// Returns the number of nodes that remained `Unresolved`. Caller
1442    /// should fail build if non-zero.
1443    pub fn resolve_dispatch(&mut self) -> usize {
1444        // Snapshot per-graph node lists so we don't hold a borrow on
1445        // self.graphs while reading other tables. Indices are the
1446        // positional graph_idx values packed into OpRefs.
1447        let graph_count = self.graphs.len();
1448        let mut unresolved = 0;
1449        for graph_idx in 0..graph_count {
1450            let (function_domain, nodes): (String, Vec<NodeProto>) = {
1451                let g = &self.graphs[graph_idx];
1452                (g.function.domain.clone(), g.function.node.clone())
1453            };
1454            // BackendSubgraph bodies (domain == ai.bytesandbrains.framework)
1455            // are handed wholesale to the bound Backend Contract impl per
1456            // ENGINE.md §8.5; their interior NodeProtos are never invoked
1457            // individually by the engine, so resolve_dispatch leaves the
1458            // body's op_dispatch as Unresolved without counting it as a
1459            // failure. Mirror the same skip already applied by
1460            // Node's unsupported-ops pre-flight in `src/node.rs`.
1461            if function_domain == "ai.bytesandbrains.framework" {
1462                let mut dispatch: Vec<OpDispatch> = Vec::with_capacity(nodes.len());
1463                for _ in &nodes {
1464                    dispatch.push(OpDispatch::Unresolved);
1465                }
1466                self.graphs[graph_idx].op_dispatch = dispatch;
1467                continue;
1468            }
1469            let mut dispatch: Vec<OpDispatch> = Vec::with_capacity(nodes.len());
1470            for node in &nodes {
1471                let resolved = self.resolve_one(node);
1472                if matches!(resolved, OpDispatch::Unresolved) {
1473                    unresolved += 1;
1474                }
1475                dispatch.push(resolved);
1476            }
1477            self.graphs[graph_idx].op_dispatch = dispatch;
1478        }
1479        unresolved
1480    }
1481
1482    fn resolve_one(&self, node: &NodeProto) -> OpDispatch {
1483        // 1) Syscall path — single lookup by (domain, op_type).
1484        if let Some(&fn_ptr) = self
1485            .syscall_table
1486            .get(&(node.domain.clone(), node.op_type.clone()))
1487        {
1488            return OpDispatch::Stateless(fn_ptr);
1489        }
1490        // `crate::registry` for custom ops registered via
1491        // `bb::register_op!{}`. DCE strips unreferenced entries,
1492        // so a binary that doesn't `use` a library's op never
1493        // pulls it into the link image.
1494        if let Some(reg) = crate::registry::find_op(&node.domain, &node.op_type) {
1495            return OpDispatch::Stateless(reg.invoke);
1496        }
1497        // 2) Function-call paths via the symbol table.
1498        let key: FunctionKey = (
1499            node.domain.clone(),
1500            node.op_type.clone(),
1501            node.overload.clone(),
1502        );
1503        if let Some(target_fn) = self.functions.get(&key) {
1504            if node.domain == "ai.bytesandbrains.module" {
1505                let input_rename: Rc<[(String, String)]> = node
1506                    .input
1507                    .iter()
1508                    .zip(target_fn.input.iter())
1509                    .map(|(caller, formal)| (caller.clone(), formal.clone()))
1510                    .collect();
1511                let output_rename: Rc<[(String, String)]> = target_fn
1512                    .output
1513                    .iter()
1514                    .zip(node.output.iter())
1515                    .map(|(formal, caller)| (formal.clone(), caller.clone()))
1516                    .collect();
1517                return OpDispatch::FunctionCall {
1518                    target: key,
1519                    input_rename,
1520                    output_rename,
1521                };
1522            }
1523        }
1524        // 3) Atomic role path. The placeholders pass stamps
1525        // `ai.bytesandbrains.slot_id` on every role NodeProto; install
1526        // populates `slot_id_to_cref` from the model's binding metadata.
1527        // Single lookup chain: NodeProto.slot_id → ComponentRef →
1528        // bound role dispatcher closure.
1529        let slot_id = node
1530            .metadata_props
1531            .iter()
1532            .find(|p| p.key == bb_ir::keys::SLOT_ID_KEY)
1533            .and_then(|p| p.value.parse::<u32>().ok());
1534        if let Some(slot_id) = slot_id {
1535            if let Some(&cref) = self.slot_id_to_cref.get(&slot_id) {
1536                if let Some(dispatch_fn) = self.dispatch_fn_for_component(cref) {
1537                    return OpDispatch::Atomic {
1538                        component_ref: cref,
1539                        dispatch_fn,
1540                    };
1541                }
1542            }
1543        }
1544        OpDispatch::Unresolved
1545    }
1546
1547    /// Look up the install-time-stamped `ProtocolDispatchFn` for a
1548    /// registered component by its `TypeId`. `resolve_dispatch`
1549    /// embeds the result into `OpDispatch::Atomic { dispatch_fn }`
1550    /// so runtime invoke skips the per-op TypeId probe.
1551    fn dispatch_fn_for_component(
1552        &self,
1553        cref: ComponentRef,
1554    ) -> Option<crate::engine::invoke::ProtocolDispatchFn> {
1555        let component = self.component(cref)?;
1556        let any: &dyn std::any::Any = component;
1557        let tid = (*any).type_id();
1558        self.role_dispatchers.get(&tid).map(|d| d.dispatch)
1559    }
1560
1561    /// Resolve a registered component by `ComponentRef`.
1562    /// `None` when no component lives at that index, or when the
1563    /// slot was `mem::take`-ed out during dispatch (the caller
1564    /// dispatching component is invisible to itself).
1565    pub fn component(&self, cref: ComponentRef) -> Option<&dyn ErasedComponent> {
1566        self.components.get(cref.as_u32() as usize)?.as_deref()
1567    }
1568
1569    /// Resolve a registered component by `ComponentRef` for
1570    /// mutation. Same null semantics as [`Self::component`].
1571    pub fn component_mut(&mut self, cref: ComponentRef) -> Option<&mut Box<dyn ErasedComponent>> {
1572        self.components.get_mut(cref.as_u32() as usize)?.as_mut()
1573    }
1574
1575    /// Take the component at `cref` out of the registry, leaving
1576    /// `None` in its slot. Returns `None` if the slot was empty (or
1577    /// out of range). Paired with [`Self::restore_component`] in
1578    /// `invoke_atomic` so a [`crate::runtime::ComponentsView`] can
1579    /// borrow the rest of `engine.components` while the dispatching
1580    /// component is held exclusively.
1581    pub(crate) fn take_component(
1582        &mut self,
1583        cref: ComponentRef,
1584    ) -> Option<Box<dyn ErasedComponent>> {
1585        self.components.get_mut(cref.as_u32() as usize)?.take()
1586    }
1587
1588    /// Put a component back into the registry slot it was taken
1589    /// from via [`Self::take_component`]. The slot index must be
1590    /// within range (i.e. the cref came from a prior registration).
1591    pub(crate) fn restore_component(
1592        &mut self,
1593        cref: ComponentRef,
1594        component: Box<dyn ErasedComponent>,
1595    ) {
1596        let idx = cref.as_u32() as usize;
1597        if let Some(slot) = self.components.get_mut(idx) {
1598            *slot = Some(component);
1599        }
1600    }
1601
1602    /// test-only registrar. Stores a bound component impl
1603    /// at `cref`. Grows the underlying Vec to fit the index, filling
1604    /// holes with `None` so out-of-order registration works.
1605    pub fn register_component(&mut self, cref: ComponentRef, component: Box<dyn ErasedComponent>) {
1606        let idx = cref.as_u32() as usize;
1607        if self.components.len() <= idx {
1608            self.components.resize_with(idx + 1, || None);
1609        }
1610        self.components[idx] = Some(component);
1611    }
1612
1613    /// Push an `(OpRef, ExecId)` onto the frontier.
1614    pub fn push_frontier(&mut self, op_ref: OpRef, exec_id: ExecId) {
1615        self.exec.frontier.push_back((op_ref, exec_id));
1616    }
1617
1618    /// Pop the next `(OpRef, ExecId)` off the frontier. Used by the
1619    /// poll cycle's drain phases.
1620    pub fn pop_frontier(&mut self) -> Option<(OpRef, ExecId)> {
1621        self.exec.frontier.pop_front()
1622    }
1623
1624    /// Pop the next fireable `(OpRef, ExecId)` off the frontier,
1625    /// honouring the per-component body-op gate. With no in-flight
1626    /// bootstrap the front of the queue fires unconditionally. With
1627    /// one or more in-flight bootstraps the gate parks any op
1628    /// touching a locked `ComponentRef`; this scan picks the first
1629    /// unparked entry so disjoint Components can keep firing while
1630    /// bootstrap runs against an unrelated slot.
1631    pub(crate) fn pop_frontier_fireable(&mut self) -> Option<(OpRef, ExecId)> {
1632        if !self.bootstrap.pending {
1633            return self.exec.frontier.pop_front();
1634        }
1635        let idx = self
1636            .exec
1637            .frontier
1638            .iter()
1639            .position(|(op_ref, exec_id)| !self.is_op_locked(*op_ref, *exec_id))?;
1640        self.exec.frontier.remove(idx)
1641    }
1642
1643    /// Snapshot of the `(NodeSiteId, ExecId)` keys currently in the
1644    /// slot table. Test-only - used to assert wire-envelope delivery
1645    /// lands at the right site without exposing the full
1646    /// `Box<dyn SlotValue>` payload type.
1647    pub fn slot_table_keys(&self) -> Vec<(NodeSiteId, ExecId)> {
1648        self.exec.slot_table.keys().copied().collect()
1649    }
1650
1651    /// Iterate every `((NodeSiteId, ExecId), Option<&dyn SlotValue>)`
1652    /// pair currently in the slot table. Test-only.
1653    pub fn slot_table_iter(
1654        &self,
1655    ) -> impl Iterator<Item = (&(NodeSiteId, ExecId), Option<&dyn SlotValue>)> {
1656        self.exec
1657            .slot_table
1658            .iter()
1659            .map(|(k, v)| (k, v.as_ref().map(|b| b.as_ref())))
1660    }
1661
1662    /// Read a slot value by `(NodeSiteId, ExecId)`. Returns `None`
1663    /// if the slot is empty or not yet allocated.
1664    pub fn slot_at(&self, site: NodeSiteId, exec_id: ExecId) -> Option<&dyn SlotValue> {
1665        self.exec
1666            .slot_table
1667            .get(&(site, exec_id))
1668            .and_then(|s| s.as_ref())
1669            .map(|b| b.as_ref())
1670    }
1671}
1672
1673/// Register a `NodeSiteId` for every function input name on
1674/// `graph` so `Engine::deliver_app_event` can seed the input
1675/// via ingress. Pre-existing entries (where a node output
1676/// happens to share a name with a function input - rare but
1677/// possible) are left alone. After the input sites are minted,
1678/// re-walk the function's node inputs and add consumer entries
1679/// for any node consuming an input - `GraphSlot::from_function`
1680/// already populated consumers for node outputs but skipped
1681/// inputs because they weren't in `site_names` yet.
1682fn register_function_input_sites(
1683    graph: &mut crate::engine::graph_slot::GraphSlot,
1684    next_node_site_id: &mut u64,
1685    graph_idx: u32,
1686) {
1687    for input_name in graph.function.input.clone().iter() {
1688        if input_name.is_empty() {
1689            continue;
1690        }
1691        graph
1692            .site_names
1693            .entry(input_name.clone())
1694            .or_insert_with(|| {
1695                let r = NodeSiteId::from(*next_node_site_id);
1696                *next_node_site_id = next_node_site_id.saturating_add(1);
1697                r
1698            });
1699    }
1700    // Backfill consumers for nodes whose inputs reference function
1701    // inputs we just minted sites for. Positional OpRefs make every
1702    // node's ref `OpRef::pack(graph_idx, node_idx)`.
1703    let nodes_inputs: Vec<(OpRef, Vec<String>)> = graph
1704        .function
1705        .node
1706        .iter()
1707        .enumerate()
1708        .map(|(idx, node)| (OpRef::pack(graph_idx, idx as u32), node.input.clone()))
1709        .collect();
1710    for (op_ref, inputs) in nodes_inputs {
1711        for input in inputs {
1712            if input.is_empty() {
1713                continue;
1714            }
1715            if let Some(&site) = graph.site_names.get(&input) {
1716                let entry = graph.consumers.entry(site).or_default();
1717                if !entry.contains(&op_ref) {
1718                    entry.push(op_ref);
1719                }
1720            }
1721        }
1722    }
1723}
1724
/// Why `Engine::try_charge` refused to admit more ingress bytes.
///
/// Produced when admitting `byte_count` additional bytes against
/// `ingress_byte_budget` would exceed the cap. `budget_remaining` is
/// the cap minus the live `ingress_bytes_in_flight` at the moment of
/// the rejection. Callers embed both numbers in the resulting
/// `WireReceiveError` or `AppIngressError`, so subscribers can see
/// how large the rejection was without querying the engine again.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BudgetExceededReason {
    /// Number of bytes the caller attempted to admit.
    pub byte_count: usize,
    /// Bytes that were still available under `ingress_byte_budget`
    /// when the request was rejected.
    pub budget_remaining: usize,
}
1740
1741/// Stable graph-name key for `Engine.graphs` derived from a
1742/// `FunctionKey`. Joins the tuple parts so two distinct keys produce
1743/// distinct strings, preserving the symbol-table semantics.
1744pub(crate) fn graph_name_for(key: &FunctionKey) -> String {
1745    let (domain, name, overload) = key;
1746    if overload.is_empty() {
1747        format!("{domain}::{name}")
1748    } else {
1749        format!("{domain}::{name}#{overload}")
1750    }
1751}
1752
1753
1754#[cfg(test)]
1755#[path = "core_multi_bootstrap_tests.rs"]
1756mod multi_bootstrap_tests;
1757
1758#[cfg(test)]
1759#[path = "core_op_locked_tests.rs"]
1760mod op_locked_tests;
1761
1762#[cfg(test)]
1763#[path = "core_bootstrap_input_tests.rs"]
1764mod bootstrap_input_tests;