bb_runtime/engine/core.rs
1//! The `Engine` struct + test-only constructor + registration
2//! accessors. Hot-path poll cycle lives in `engine::poll`.
3
4use std::collections::HashMap;
5use std::marker::PhantomData;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use crate::bus::TypedBus;
10use crate::component::ErasedComponent;
11use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
12use crate::engine::graph_slot::GraphSlot;
13use crate::engine::invoke::{make_protocol_dispatcher, RoleDispatcher};
14use crate::framework::FrameworkComponents;
15use crate::ids::{CommandId, ComponentRef, ExecId, NodeSiteId, OpRef};
16use crate::ingress::IngressQueue;
17use crate::slot_value::SlotValue;
18use bb_ir::proto::onnx::{FunctionProto, NodeProto};
19
20/// Point-in-time hot-path counters for dashboards + saturation
21/// detection. Not synchronized against an in-flight poll cycle.
22#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
23pub struct EngineStats {
24 /// `(OpRef, ExecId)` pairs on the frontier. Climbing → poll
25 /// loop falling behind ingress.
26 pub frontier_len: usize,
27 /// Events queued on the typed bus awaiting routing.
28 pub bus_len: usize,
29 /// Suspended async commands. Approaching
30 /// `NodeConfig.max_pending_async` → cap pressure.
31 pub pending_async: usize,
32 /// `(NodeSiteId, ExecId)` entries holding a value.
33 pub slot_table_occupied: usize,
34 /// Approximate MPMC ingress depth.
35 pub ingress_depth: usize,
36 /// Envelopes queued for next outbound drain.
37 pub outbound_queue_depth: usize,
38 /// Event kinds with at least one subscriber.
39 pub event_subscriptions: usize,
40 /// Number of registered components.
41 pub registered_components: usize,
42 /// Number of installed graphs.
43 pub graph_slots: usize,
44}
45
46/// Single-Node engine state. Built via `Node::ensure_ready`;
47/// tests use `Engine::new()` and seed fields directly.
48pub struct Engine {
49 // --- Graph storage ---------------------------------------------
50 /// Installed graphs in insertion order. `OpRef::pack` puts the
51 /// slot index in the high 32 bits so `dispatch_for` resolves an
52 /// op via two direct array accesses (no HashMap probe).
53 pub(crate) graphs: Vec<GraphSlot>,
54
55 /// Graph name → `graphs[]` index. Lookup table for name-keyed
56 /// resolution (function-call targets, `Engine::graph(name)`
57 /// accessors). The index doubles as the value packed into every
58 /// `OpRef`.
59 pub(crate) graph_index: HashMap<String, u32>,
60
61 /// Sub-Module function registry by `(domain, name, overload)`.
62 /// Populated by `Node` at install time.
63 pub(crate) functions: HashMap<(String, String, String), FunctionProto>,
64
65 // --- Dispatch --------------------------------------------------
66 /// Unified syscall dispatch table — one lookup per op by
67 /// `(domain, op_type)` to its stateless invoke fn pointer.
68 /// Populated by `register_syscall` and `register_all_framework_syscalls`
69 /// at Engine construction; `resolve_dispatch` reads this to stamp
70 /// `OpDispatch::Stateless` for matching NodeProtos.
71 pub syscall_table: HashMap<(String, String), StatelessInvokeFn>,
72
73 // --- Component storage -----------------------------------------
74 /// All bound runtime impls indexed by `ComponentRef.as_usize()`.
75 /// The `Option<...>` wrapper lets `invoke_atomic` take the
76 /// dispatching component out of the Vec via `mem::take` so a
77 /// live [`crate::runtime::ComponentsView`] can borrow
78 /// `&self.components` for cross-component reads while the
79 /// dispatch closure runs - the dispatching slot is restored
80 /// after the closure returns.
81 pub(crate) components: Vec<Option<Box<dyn ErasedComponent>>>,
82
83 /// The Node's own `PeerId` (the one passed to `Node::new`).
84 /// Threaded into every `RuntimeResourceRef` so Components can
85 /// identify themselves in outbound envelopes.
86 pub self_peer: crate::ids::PeerId,
87
88 /// Per-Node framework primitive bundle (scheduler, peer_gate,
89 /// request_tracker, backoff_table, inbound_dedup, address_book).
90 /// Exposed as `pub` so cross-crate test fixtures in `bb-ops` can
91 /// construct `RuntimeResourceRef` from an Engine's framework
92 /// primitives. Engine internals proper stay encapsulated; this is
93 /// the dependency-injection boundary.
94 pub framework: FrameworkComponents,
95
96 /// The per-Node typed event bus.
97 pub bus: TypedBus,
98
99 // --- Execution state -------------------------------------------
100 /// Per-poll execution-state bundle: frontier, slot table, per-
101 /// execution liveness, parked async ops + in-cycle completions,
102 /// function-call invocation frames, inbound-envelope context
103 /// map, the timer scheduler, and the monotonic ID allocator.
104 /// See [`crate::exec_state::ExecState`].
105 pub exec: crate::exec_state::ExecState,
106
107 /// Reverse index from a fused `binding_id` (e.g.
108 /// `"BurnBackend#0"`) to the bound component's `ComponentRef`.
109 /// Populated by `Node` at install time; read by dispatch
110 /// resolution to bind NodeProtos that reference a backend by
111 /// binding_id.
112 pub(crate) binding_id_index: HashMap<String, ComponentRef>,
113
114 /// Per-`event_kind` subscription map. Each entry names the
115 /// `NodeSiteId`(s) of `EventSource` ops listening for that kind.
116 /// The bus-routing pass writes a `TriggerValue` into each
117 /// subscribed site at a fresh `ExecId` and pushes the site's
118 /// downstream consumers — matching the wire delivery semantics
119 /// per `docs/ADDRESSING.md` so bus + wire share one model.
120 pub(crate) event_subscriptions: HashMap<String, Vec<NodeSiteId>>,
121
122 /// Per-LifecyclePhase Op enrollments. `pub` for cross-crate
123 /// `bb-ops` test fixtures that exercise the LifecyclePhase
124 /// syscall.
125 pub lifecycle_table: HashMap<String, Vec<OpRef>>,
126
127 // --- Async + cross-thread --------------------------------------
128 /// Thread-safe inbox for external events. Producers (transport
129 /// adapters, host invocations) may push from any thread; the
130 /// Engine drains serially from its own (single) thread. Engine
131 /// holds an `Arc` so it can hand clones to producers without
132 /// surrendering ownership.
133 pub(crate) ingress: Arc<IngressQueue>,
134
135 /// Lifecycle phases queued for firing by `Engine::fire_lifecycle`
136 /// on the next poll.
137 pub(crate) fired_phases: Vec<String>,
138
139 /// Snapshot of the ingress queue depth at the start of the
140 /// engine's ingress drain. Used by the backpressure detection
141 /// hook in `process_ingress_event` to compare against the
142 /// configured high-water mark. Refreshed every poll cycle.
143 pub(crate) phase1_pre_drain_depth: usize,
144
145 // --- Bootstrap ---------------------------------------------------
146 /// Consolidated bootstrap state — every field the install path,
147 /// poll seeder, and body-op gate read goes through here. See
148 /// [`crate::engine::bootstrap::BootstrapState`] for field roles.
149 pub(crate) bootstrap: crate::engine::bootstrap::BootstrapState,
150
151 // --- Dispatcher registries (per-Engine) ------------------------
152 /// Concrete-type `ProtocolRuntime` dispatchers registered against
153 /// this Engine, indexed by `TypeId::of::<T>()`. Populated by
154 /// [`Engine::register_protocol_dispatcher`] at setup; consulted by
155 /// `invoke_atomic` via direct HashMap lookup (no linear scan).
156 pub(crate) role_dispatchers: HashMap<std::any::TypeId, RoleDispatcher>,
157
158 /// Concrete-type `Bootstrap` dispatchers, indexed by
159 /// `TypeId::of::<T>()`. Populated by
160 /// [`Engine::register_bootstrap_dispatcher`] at install time and
161 /// consulted by `fire_component_bootstrap` to dispatch the
162 /// synthetic single-op against the bound Component's
163 /// [`crate::contracts::bootstrap::Bootstrap::bootstrap`] impl.
164 pub(crate) bootstrap_dispatchers:
165 HashMap<std::any::TypeId, crate::engine::invoke::BootstrapDispatchFn>,
166
167 // --- Generic slot registry -------------------------------------
168 /// Slot-name → `ComponentRef` registry. Generic over component
169 /// role: indexes EVERY bound Component (backends, indexes,
170 /// models, peer selectors, custom) by binding slot name
171 /// (defaults to the field name; overridable via
172 /// `#[bb::slot("custom")]`). Components reach declared
173 /// dependencies through this map at dispatch time via
174 /// [`crate::runtime::ComponentsView::for_slot`].
175 ///
176 /// This is the canonical lookup table. Every install path
177 /// populates it; every dispatch path reads through it. No
178 /// per-role specialization above the slot abstraction.
179 pub(crate) slots: HashMap<String, ComponentRef>,
180
181 /// Parallel index: compiler-assigned slot id (the value of
182 /// `ai.bytesandbrains.slot_id` stamped on role NodeProtos by the
183 /// placeholders pass) → `ComponentRef`. Populated alongside
184 /// [`Self::slots`] at install time. `resolve_dispatch` reads
185 /// the role NodeProto's `slot_id`, looks it up here, and stamps
186 /// `OpDispatch::Atomic` against the resolved component. Single
187 /// source of truth: install populates both indexes from the same
188 /// `bb.binding.<target>.<slot>` metadata.
189 pub(crate) slot_id_to_cref: HashMap<u32, ComponentRef>,
190
191 /// Parallel index: compiler-assigned `slot_id` → `(role,
192 /// ComponentRef)`. Populated by [`Self::bind_slot_id_with_role`]
193 /// at install time from the same `binding.<target>.<slot>`
194 /// metadata that drives `slot_id_to_cref`; retains the
195 /// `ComponentRole` so `decode_typed_fill` can decide between the
196 /// framework-carrier path and the backend-mediated tensor path.
197 pub(crate) slot_id_to_role_ref: HashMap<u32, (crate::registry::ComponentRole, ComponentRef)>,
198
199 // --- Component role introspection ------------------------------
200 /// Per-component set of declared roles, sourced from
201 /// `inventory::iter::<ComponentRoleBinding>` keyed by
202 /// `T::TYPE_NAME`. Populated by `Node::ensure_ready` after
203 /// component registration; reported by [`Engine::roles_for`] for
204 /// introspection (engine tests, host tooling).
205 pub(crate) component_roles:
206 HashMap<ComponentRef, std::collections::HashSet<crate::registry::ComponentRole>>,
207
208 // --- Production-safety caps ------------------------------------
209 /// Soft per-poll-cycle op-invocation budget per
210 /// `NodeConfig.cycle_op_budget`. When set, `Engine::poll` yields
211 /// after this many invocations and surfaces
212 /// `EngineStep::CycleBudgetExceeded { ops_invoked }` so the host
213 /// can re-poll. `None` disables the budget.
214 pub(crate) cycle_op_budget: Option<usize>,
215
216 /// Cap on the number of in-flight `pending_async` entries per
217 /// `NodeConfig.max_pending_async`. When at cap, an Op returning
218 /// `DispatchResult::Async(_)` fails synchronously via the
219 /// existing `OpFailed` path. `None` disables the cap.
220 pub(crate) max_pending_async: Option<usize>,
221
222 /// Cumulative cap on in-flight ingress bytes held across the
223 /// ingress queue + slot table + pending async completion
224 /// buffers. Sourced from `NodeConfig::ingress_byte_budget`.
225 /// Boundary callers call [`Self::try_charge`] before installing
226 /// a payload; the slot-table writer calls [`Self::release`] on
227 /// overwrite / eviction.
228 pub(crate) ingress_byte_budget: usize,
229
230 /// Live count of ingress bytes the engine currently holds.
231 /// Incremented by [`Self::try_charge`] on successful admission;
232 /// decremented by [`Self::release`] on slot-table overwrite /
233 /// eviction / drop. The budget guard surfaces this as a
234 /// snapshot via [`Self::ingress_bytes_in_flight`].
235 pub(crate) ingress_bytes_in_flight: usize,
236
237 // --- Single-threaded anchor ------------------------------------
238 /// `PhantomData<*const ()>` makes `Engine` neither `Send` nor
239 /// `Sync` - the single-threaded sans-IO contract is enforced at
240 /// compile time. Producers can still push to `ingress` from other
241 /// threads because the `Arc<IngressQueue>` handle is independently
242 /// `Send + Sync`.
243 _not_send: PhantomData<*const ()>,
244}
245
246impl Default for Engine {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252impl Engine {
253 /// Construct an empty engine with the default ingress capacity.
254 /// `Node::new` wraps this with `self_peer`, framework syscalls,
255 /// and config caps applied. For non-default `bus_capacity` use
256 /// [`Self::with_bus_capacity`] so the ingress queue sizes to
257 /// `bus_capacity * 4` per ENGINE.md §2.2.
258 pub fn new() -> Self {
259 Self::with_bus_capacity(crate::node::DEFAULT_BUS_CAPACITY)
260 }
261
262 /// Construct an empty engine whose ingress queue holds up to
263 /// `bus_capacity * 4` events (the ENGINE.md §2.2 ratio that
264 /// reserves headroom for async completions, app events, and
265 /// inbound envelopes between poll cycles).
266 pub fn with_bus_capacity(bus_capacity: usize) -> Self {
267 Self {
268 graphs: Vec::new(),
269 graph_index: HashMap::new(),
270 functions: HashMap::new(),
271 syscall_table: HashMap::new(),
272 slot_id_to_cref: HashMap::new(),
273 slot_id_to_role_ref: HashMap::new(),
274 components: Vec::new(),
275 self_peer: crate::ids::PeerId::from(0u64),
276 framework: FrameworkComponents::new(),
277 bus: TypedBus::new(),
278 exec: crate::exec_state::ExecState::new(),
279 binding_id_index: HashMap::new(),
280 event_subscriptions: HashMap::new(),
281 lifecycle_table: HashMap::new(),
282 ingress: Arc::new(IngressQueue::with_capacity(bus_capacity.saturating_mul(4))),
283 fired_phases: Vec::new(),
284 phase1_pre_drain_depth: 0,
285 bootstrap: crate::engine::bootstrap::BootstrapState::new(),
286 role_dispatchers: HashMap::new(),
287 bootstrap_dispatchers: HashMap::new(),
288 slots: HashMap::new(),
289 component_roles: HashMap::new(),
290 cycle_op_budget: crate::node::DEFAULT_CYCLE_OP_BUDGET,
291 max_pending_async: crate::node::DEFAULT_MAX_PENDING_ASYNC,
292 ingress_byte_budget: crate::node::DEFAULT_INGRESS_BYTE_BUDGET,
293 ingress_bytes_in_flight: 0,
294 _not_send: PhantomData,
295 }
296 }
297
298 /// wipe restorable engine state ahead of a
299 /// `Node::restore` call, leaving the install-time-stamped
300 /// surfaces (`graphs`, `functions`, `dispatch_table`,
301 /// `atomic_dispatch`, `components`, `self_peer`,
302 /// `syscall_index`, `role_dispatchers`, `binding_id_index`,
303 /// `lifecycle_table`, `event_subscriptions`,
304 /// `cycle_op_budget`, `max_pending_async`) intact. The Node
305 /// re-applies the snapshot's framework state, ID counters, and
306 /// pending async/completion queues on top of the cleared
307 /// state, so the post-restore Engine is the same install
308 /// re-seeded with the snapshot's restorable transient state.
309 ///
310 /// Restorable surfaces explicitly cleared:
311 /// - `frontier`, `slot_table`, `execution_state`,
312 /// `pending_async`, `pending_completions`, `pending_calls`,
313 /// `fired_phases`
314 /// - `framework` (FrameworkComponents reseeds from snapshot)
315 /// - `bus` (re-establishes subscriptions from snapshot)
316 /// - `ingress` queue (fresh; in-flight inbound is the host's
317 /// responsibility to redeliver)
318 pub fn clear_for_restore(&mut self) {
319 self.exec.frontier.clear();
320 self.exec.slot_table.clear();
321 self.exec.execution_state.clear();
322 self.exec.pending_async.clear();
323 self.exec.pending_completions.clear();
324 self.exec.pending_calls.clear();
325 self.fired_phases.clear();
326 // Slot-table clear above dropped every charged carrier;
327 // reset the counter so the restored snapshot doesn't inherit
328 // an in-flight balance the new state doesn't own.
329 self.ingress_bytes_in_flight = 0;
330 // Restore deliberately suppresses bootstrap re-runs: the
331 // restored Node already executed its bootstrap call before
332 // the snapshot, and replaying would re-seed the address
333 // book, re-fire the first Announce, etc.
334 // `install_order` + `module_bootstraps` stay populated for
335 // introspection (multi-target installs surface every queued
336 // target via [`Self::bootstrap_function_keys`]); `pending`,
337 // `in_flight`, `pending_requests`, `waiting`, and `next_idx`
338 // reset so `Node::run_bootstrap` is a no-op on a restored Node —
339 // bumping the index to the end of `install_order` keeps the
340 // seeder from re-firing if the host nonetheless polls.
341 self.bootstrap.clear_for_restore();
342 self.framework = FrameworkComponents::new();
343 self.bus = TypedBus::new();
344 self.ingress = Arc::new(IngressQueue::new());
345 // ID counters reset to 0; the restore path re-applies the
346 // snapshot's persisted values so post-restore IDs continue
347 // from where the pre-snapshot Node left off ().
348 self.exec.ids.next_exec_id = 0;
349 self.exec.ids.next_command_id = 0;
350 }
351
352 /// Mint a fresh `ExecId`. Replaces the prior static counter
353 /// in `src/ids.rs` so allocation runs single-threaded under
354 /// the engine's borrow discipline.
355 pub fn allocate_exec_id(&mut self) -> ExecId {
356 let id = self.exec.ids.next_exec_id;
357 self.exec.ids.next_exec_id = self
358 .exec
359 .ids
360 .next_exec_id
361 .checked_add(1)
362 .expect("ExecId counter overflow");
363 ExecId::from(id)
364 }
365
366 /// Mint a fresh `CommandId`. Used by async-suspending syscall
367 /// ops via `RuntimeResourceRef::next_command_id`.
368 pub fn allocate_command_id(&mut self) -> CommandId {
369 let id = self.exec.ids.next_command_id;
370 self.exec.ids.next_command_id = self
371 .exec
372 .ids
373 .next_command_id
374 .checked_add(1)
375 .expect("CommandId counter overflow");
376 CommandId::from(id)
377 }
378
379 /// Mint a fresh `NodeSiteId`. Used by graph installation; sites
380 /// must be globally unique across installed graphs.
381 pub fn allocate_node_site_id(&mut self) -> NodeSiteId {
382 let id = self.exec.ids.next_node_site_id;
383 self.exec.ids.next_node_site_id = self
384 .exec
385 .ids
386 .next_node_site_id
387 .checked_add(1)
388 .expect("NodeSiteId counter overflow");
389 NodeSiteId::from(id)
390 }
391
392 /// Drop slot_table and execution_state entries belonging to
393 /// executions that have completed. An execution is complete
394 /// when it has no frontier entries, no pending_async entries,
395 /// and no pending_calls entry pointing at its `ExecId`. Called
396 /// at the end of every `poll()` cycle so a long-running Node
397 /// keeps a bounded slot_table.
398 pub(crate) fn gc_completed_executions(&mut self) {
399 if self.exec.execution_state.is_empty() {
400 return;
401 }
402 let mut live: std::collections::HashSet<ExecId> =
403 std::collections::HashSet::with_capacity(self.exec.execution_state.len());
404 for (_, exec_id) in &self.exec.frontier {
405 live.insert(*exec_id);
406 }
407 for p in self.exec.pending_async.values() {
408 live.insert(p.exec_id);
409 }
410 for exec_id in self.exec.pending_calls.keys() {
411 live.insert(*exec_id);
412 }
413 let dead: Vec<ExecId> = self
414 .exec
415 .execution_state
416 .keys()
417 .copied()
418 .filter(|e| !live.contains(e))
419 .collect();
420 if dead.is_empty() {
421 return;
422 }
423 let dead_set: std::collections::HashSet<ExecId> = dead.iter().copied().collect();
424 // Walk doomed slot entries once to release any charged
425 // ingress bytes the slot-table writer admitted, then drop
426 // the entries. `retain` would let us mutate-in-place, but
427 // it borrows the table mutably for the entire walk; the
428 // explicit collect-then-remove pattern lets us drain
429 // `charged_bytes()` from each prior carrier first.
430 let doomed_keys: Vec<(NodeSiteId, ExecId)> = self
431 .exec
432 .slot_table
433 .iter()
434 .filter_map(|(key, _)| dead_set.contains(&key.1).then_some(*key))
435 .collect();
436 for key in doomed_keys {
437 // `clear_slot` releases the prior carrier's
438 // `charged_bytes()` against `ingress_bytes_in_flight`.
439 // Non-ingress carriers report 0 — release is a no-op.
440 let _ = self.clear_slot(key.0, key.1);
441 }
442 for exec_id in &dead {
443 self.exec.execution_state.remove(exec_id);
444 }
445 }
446
447 /// Apply production-safety caps from a `NodeConfig`. Called by
448 /// `Node::ensure_ready` after constructing the Engine; tests can
449 /// invoke directly to exercise specific cap values.
450 pub fn apply_config_caps(&mut self, config: &crate::node::NodeConfig) {
451 self.cycle_op_budget = config.cycle_op_budget;
452 self.max_pending_async = config.max_pending_async;
453 self.ingress_byte_budget = config.ingress_byte_budget;
454 self.framework
455 .outbound_queue
456 .set_cap(config.max_outbound_queue);
457 self.bus.set_cap(Some(config.bus_capacity));
458 // The off-thread `CompletionSink::complete` path consults the
459 // ingress queue itself for its per-item cap; reseed the
460 // atomic so sinks created before this call see the configured
461 // value on their next push.
462 self.ingress
463 .set_completion_result_cap(config.max_completion_result_bytes);
464 // Reseed the BackpressureTracker with the configured knobs.
465 // `apply_config_caps` is the canonical entry the host calls
466 // before the first poll, so a fresh tracker reflecting the
467 // resolved knobs is the only state observers see.
468 self.framework.peer_state.backpressure = crate::framework::BackpressureTracker::with_config(
469 config.backpressure_high_water_pct,
470 config.backpressure_k_before_silent,
471 config.backpressure_min_notice_interval_ns,
472 );
473 }
474
475 /// Live count of ingress bytes the engine currently holds across
476 /// its ingress queue + slot table + pending async completion
477 /// buffers. Updated by every successful charge / release pair on
478 /// the ingress paths. Surfaced for observability (operator
479 /// dashboards) and assertions.
480 pub fn ingress_bytes_in_flight(&self) -> usize {
481 self.ingress_bytes_in_flight
482 }
483
484 /// Configured cap on cumulative in-flight ingress bytes,
485 /// sourced from `NodeConfig::ingress_byte_budget`. Constant
486 /// between `apply_config_caps` calls.
487 pub fn ingress_byte_budget(&self) -> usize {
488 self.ingress_byte_budget
489 }
490
491 /// Pre-admission budget guard for an ingress payload of length
492 /// `bytes`. On success the bytes are added to
493 /// `ingress_bytes_in_flight` and the caller may install the
494 /// resulting carrier into the slot table or pending-completion
495 /// queue. On overflow the counter is left unchanged and the
496 /// caller drops the payload, emitting the appropriate
497 /// `BudgetExceeded` `InfraEvent`.
498 ///
499 /// One saturating-add + one comparison; below the cost of the
500 /// prost decode that typically follows.
501 pub(crate) fn try_charge(&mut self, bytes: usize) -> Result<(), BudgetExceededReason> {
502 let after = self.ingress_bytes_in_flight.saturating_add(bytes);
503 if after > self.ingress_byte_budget {
504 return Err(BudgetExceededReason {
505 byte_count: bytes,
506 budget_remaining: self
507 .ingress_byte_budget
508 .saturating_sub(self.ingress_bytes_in_flight),
509 });
510 }
511 self.ingress_bytes_in_flight = after;
512 Ok(())
513 }
514
515 /// Decrement `ingress_bytes_in_flight` after a charged payload
516 /// leaves engine state — slot-table overwrite, slot clear,
517 /// eviction, or in-cycle drop. `saturating_sub` defends against
518 /// a release path that arrives without a paired charge (e.g. a
519 /// snapshot replay reseeding the slot table before any wire
520 /// traffic).
521 pub(crate) fn release(&mut self, bytes: usize) {
522 self.ingress_bytes_in_flight = self.ingress_bytes_in_flight.saturating_sub(bytes);
523 }
524
525 /// Write a value into the slot table at `(site, exec_id)`,
526 /// releasing the prior occupant's `charged_bytes` against
527 /// `ingress_bytes_in_flight`. The incoming carrier's
528 /// `charged_bytes` is NOT re-added — admission callers
529 /// (`decode_typed_fill`, `deliver_event`, etc.) have already run
530 /// `try_charge` against the wire-byte budget. This helper is the
531 /// slot-table-side bookkeeping that closes the loop on overwrite.
532 ///
533 /// Returns the prior boxed value (if any) so the caller can
534 /// run additional teardown.
535 pub(crate) fn slot_write(
536 &mut self,
537 site: NodeSiteId,
538 exec_id: ExecId,
539 value: Box<dyn SlotValue>,
540 ) -> Option<Box<dyn SlotValue>> {
541 let key = (site, exec_id);
542 let prior = self.exec.slot_table.insert(key, Some(value));
543 // `prior` is `Option<Option<Box<dyn SlotValue>>>`: outer None
544 // means the slot was untouched; inner None means the slot
545 // existed but was empty (cleared previously). Release only
546 // when the prior carrier was alive.
547 match prior {
548 Some(Some(prior_box)) => {
549 self.ingress_bytes_in_flight = self
550 .ingress_bytes_in_flight
551 .saturating_sub(prior_box.charged_bytes());
552 Some(prior_box)
553 }
554 _ => None,
555 }
556 }
557
558 /// Remove the slot at `(site, exec_id)`, releasing any
559 /// `charged_bytes` the prior carrier was holding against
560 /// `ingress_bytes_in_flight`. Returns the removed carrier so
561 /// downstream paths (graph reset, GC) can pass it onward.
562 pub(crate) fn clear_slot(
563 &mut self,
564 site: NodeSiteId,
565 exec_id: ExecId,
566 ) -> Option<Box<dyn SlotValue>> {
567 let key = (site, exec_id);
568 match self.exec.slot_table.remove(&key) {
569 Some(Some(prior_box)) => {
570 self.ingress_bytes_in_flight = self
571 .ingress_bytes_in_flight
572 .saturating_sub(prior_box.charged_bytes());
573 Some(prior_box)
574 }
575 _ => None,
576 }
577 }
578
579 /// Snapshot of the engine's hot-path state, sized for cheap
580 /// reads on every poll cycle (no allocation). Production
581 /// observability: operators see saturation building up before
582 /// the process locks up.
583 pub fn engine_stats(&self) -> EngineStats {
584 EngineStats {
585 frontier_len: self.exec.frontier.len(),
586 bus_len: self.bus.len(),
587 pending_async: self.exec.pending_async.len(),
588 slot_table_occupied: self.exec.slot_table.len(),
589 ingress_depth: self.ingress.len(),
590 outbound_queue_depth: self.framework.outbound_queue.len(),
591 event_subscriptions: self.event_subscriptions.len(),
592 registered_components: self.components.len(),
593 graph_slots: self.graphs.len(),
594 }
595 }
596
597 /// Register a `ProtocolRuntime` dispatcher for the concrete
598 /// component type `T`. Call once per `T` after constructing the
599 /// Engine - typically alongside `register_component` for any
600 /// component whose `dispatch_atomic` you want to drive. Indexed
601 /// by `TypeId::of::<T>()` so dispatch is one HashMap lookup,
602 /// not a linear scan across the registry.
603 pub fn register_protocol_dispatcher<T: crate::roles::ProtocolRuntime + 'static>(&mut self)
604 where
605 T::Error: std::fmt::Display,
606 {
607 let type_id = std::any::TypeId::of::<T>();
608 self.role_dispatchers
609 .insert(type_id, make_protocol_dispatcher::<T>());
610 }
611
612 /// Register a role dispatcher keyed by `TypeId::of::<T>()` for a
613 /// concrete `IndexRuntime` impl. Lets `Node::with_index(&value)`
614 /// wire atomic dispatch even when `T` does not implement
615 /// `ProtocolRuntime`. Calling this twice for the same `T`
616 /// silently overwrites; the dispatcher is idempotent because
617 /// `T::dispatch_atomic` is the only consumer.
618 pub fn register_index_dispatcher<T: crate::roles::IndexRuntime + 'static>(&mut self)
619 where
620 <T as crate::roles::IndexRuntime>::Error: std::fmt::Display,
621 {
622 let type_id = std::any::TypeId::of::<T>();
623 self.role_dispatchers
624 .insert(type_id, crate::engine::invoke::make_index_dispatcher::<T>());
625 }
626
627 /// Register an `AggregatorRuntime` dispatcher. See
628 /// [`Engine::register_index_dispatcher`] for the rationale.
629 pub fn register_aggregator_dispatcher<T: crate::roles::AggregatorRuntime + 'static>(&mut self)
630 where
631 <T as crate::roles::AggregatorRuntime>::Error: std::fmt::Display,
632 {
633 let type_id = std::any::TypeId::of::<T>();
634 self.role_dispatchers.insert(
635 type_id,
636 crate::engine::invoke::make_aggregator_dispatcher::<T>(),
637 );
638 }
639
640 /// Register a `ModelRuntime` dispatcher. See
641 /// [`Engine::register_index_dispatcher`] for the rationale.
642 pub fn register_model_dispatcher<T: crate::roles::ModelRuntime + 'static>(&mut self)
643 where
644 <T as crate::roles::ModelRuntime>::Error: std::fmt::Display,
645 {
646 let type_id = std::any::TypeId::of::<T>();
647 self.role_dispatchers
648 .insert(type_id, crate::engine::invoke::make_model_dispatcher::<T>());
649 }
650
651 /// Register a `CodecRuntime` dispatcher. See
652 /// [`Engine::register_index_dispatcher`] for the rationale.
653 pub fn register_codec_dispatcher<T: crate::roles::CodecRuntime + 'static>(&mut self)
654 where
655 <T as crate::roles::CodecRuntime>::Error: std::fmt::Display,
656 {
657 let type_id = std::any::TypeId::of::<T>();
658 self.role_dispatchers
659 .insert(type_id, crate::engine::invoke::make_codec_dispatcher::<T>());
660 }
661
662 /// Register a `DataSourceRuntime` dispatcher. See
663 /// [`Engine::register_index_dispatcher`] for the rationale.
664 pub fn register_data_source_dispatcher<T: crate::roles::DataSourceRuntime + 'static>(&mut self)
665 where
666 <T as crate::roles::DataSourceRuntime>::Error: std::fmt::Display,
667 {
668 let type_id = std::any::TypeId::of::<T>();
669 self.role_dispatchers.insert(
670 type_id,
671 crate::engine::invoke::make_data_source_dispatcher::<T>(),
672 );
673 }
674
675 /// Register a `PeerSelectorRuntime` dispatcher. See
676 /// [`Engine::register_index_dispatcher`] for the rationale.
677 pub fn register_peer_selector_dispatcher<T: crate::roles::PeerSelectorRuntime + 'static>(
678 &mut self,
679 ) where
680 <T as crate::roles::PeerSelectorRuntime>::Error: std::fmt::Display,
681 {
682 let type_id = std::any::TypeId::of::<T>();
683 self.role_dispatchers.insert(
684 type_id,
685 crate::engine::invoke::make_peer_selector_dispatcher::<T>(),
686 );
687 }
688
689 /// Register a `BackendRuntime` dispatcher. The `Backend` Contract
690 /// trait's per-atomic-op surface dispatches through this entry,
691 /// emitted from `#[derive(bb::Backend)]`.
692 pub fn register_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>(&mut self)
693 where
694 <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
695 {
696 let type_id = std::any::TypeId::of::<T>();
697 self.role_dispatchers.insert(
698 type_id,
699 crate::engine::invoke::make_backend_dispatcher::<T>(),
700 );
701 }
702
703 /// Register a `Bootstrap` dispatcher. The `Bootstrap` Contract
704 /// method dispatches through this entry — keyed on `TypeId::of::<T>()`
705 /// so the engine's `fire_ready_bootstrap` Component arm reaches
706 /// the concrete `T::bootstrap` impl via downcast without scanning
707 /// the registry. The derive bridge in F5 emits the call to this
708 /// method alongside the Component's other role registrations.
709 pub fn register_bootstrap_dispatcher<T: crate::contracts::bootstrap::Bootstrap + 'static>(
710 &mut self,
711 ) where
712 <T as crate::contracts::bootstrap::Bootstrap>::Error: std::fmt::Display,
713 {
714 let type_id = std::any::TypeId::of::<T>();
715 self.bootstrap_dispatchers.insert(
716 type_id,
717 crate::engine::invoke::make_bootstrap_dispatcher::<T>(),
718 );
719 }
720
721 /// Bind a Component bootstrap entry. Records `slot → cref` in
722 /// `bootstrap.component_bootstraps`; subsequent host-supplied
723 /// `BootstrapRequest`s targeting `slot` resolve the bound
724 /// `ComponentRef` through this map. Wires the Component-arm
725 /// seam the F5 install path will populate; F3 Commit 3 exposes
726 /// it under `test-components` so the integration tests in
727 /// `core_component_bootstrap_tests.rs` can register fixtures
728 /// without going through the install pipeline.
729 #[cfg(any(test, feature = "test-components"))]
730 pub fn register_component_bootstrap(&mut self, slot: &str, cref: ComponentRef) {
731 self.bootstrap.component_bootstraps.insert(
732 slot.to_string(),
733 crate::engine::bootstrap::ComponentBootstrap { cref },
734 );
735 }
736
737 /// Record the inventory-declared roles for a registered
738 /// component. `Node::ensure_ready` calls this once per
739 /// `ComponentRef` after registration, passing the set computed
740 /// from `crate::registry::roles_for_component(T::TYPE_NAME)`.
741 pub fn set_component_roles(
742 &mut self,
743 cref: ComponentRef,
744 roles: std::collections::HashSet<crate::registry::ComponentRole>,
745 ) {
746 self.component_roles.insert(cref, roles);
747 }
748
749 /// Return the inventory-declared roles for a registered
750 /// component, or an empty set if the component wasn't registered
751 /// through a derive emitting `ComponentRoleBinding` entries.
752 /// Introspection surface for engine tests + host tooling.
753 pub fn roles_for(
754 &self,
755 cref: ComponentRef,
756 ) -> std::collections::HashSet<crate::registry::ComponentRole> {
757 self.component_roles.get(&cref).cloned().unwrap_or_default()
758 }
759
760 /// Register a `binding_id → ComponentRef` mapping. Called by
761 /// `Node::ensure_ready` after binding resolution so the bound-
762 /// backend lookup can resolve a NodeProto's `binding_id`
763 /// metadata against an installed component.
764 pub fn register_binding_id(&mut self, binding_id: String, cref: ComponentRef) {
765 self.binding_id_index.insert(binding_id, cref);
766 }
767
768 /// Look up the `ComponentRef` bound at the given slot name -
769 /// the GENERIC dependency-resolution accessor. Components reach
770 /// declared dependencies through this method (typically via
771 /// [`crate::runtime::ComponentsView::for_slot`] at dispatch
772 /// time). Returns `None` when no slot of that name is bound.
773 pub fn slot(&self, slot: &str) -> Option<ComponentRef> {
774 self.slots.get(slot).copied()
775 }
776
777 /// Bind a `ComponentRef` at the given slot name. The
778 /// `install` facade in the `bytesandbrains` crate calls this
779 /// from the T8 chain; in-crate callers use it from the
780 /// transitional `Node::with_backend(slot, &b)` path. Returns
781 /// the previous binding if any.
782 pub fn bind_slot(&mut self, slot: String, cref: ComponentRef) -> Option<ComponentRef> {
783 self.slots.insert(slot, cref)
784 }
785
786 /// Register the compiler-assigned `slot_id` → `ComponentRef`
787 /// mapping. Called by `bb::install()` alongside
788 /// [`Self::bind_slot`]; the pair is read by
789 /// [`Self::resolve_dispatch`] when stamping
790 /// `OpDispatch::Atomic` against a role NodeProto's
791 /// `ai.bytesandbrains.slot_id` metadata. Returns the previous
792 /// binding, if any.
793 pub fn bind_slot_id(&mut self, slot_id: u32, cref: ComponentRef) -> Option<ComponentRef> {
794 self.slot_id_to_cref.insert(slot_id, cref)
795 }
796
797 /// Register the compiler-assigned `slot_id` → `(role,
798 /// ComponentRef)` mapping. Called by `bb::install()` alongside
799 /// [`Self::bind_slot_id`]; the role is required so
800 /// `decode_typed_fill` can branch between framework-carrier
801 /// decode (`Codec`, `Index`, …) and backend-mediated tensor
802 /// materialisation (`Backend`). Returns the previous binding, if
803 /// any.
804 pub fn bind_slot_id_with_role(
805 &mut self,
806 slot_id: u32,
807 role: crate::registry::ComponentRole,
808 cref: ComponentRef,
809 ) -> Option<(crate::registry::ComponentRole, ComponentRef)> {
810 self.slot_id_to_role_ref.insert(slot_id, (role, cref))
811 }
812
813 /// Look up the `(role, ComponentRef)` bound at a compiler-assigned
814 /// `slot_id`. Used by `decode_typed_fill` to discover whether an
815 /// inbound wire payload routes through a backend.
816 pub fn role_ref_for_slot_id(
817 &self,
818 slot_id: u32,
819 ) -> Option<(crate::registry::ComponentRole, ComponentRef)> {
820 self.slot_id_to_role_ref.get(&slot_id).copied()
821 }
822
823 /// Iterate every `(slot_name, ComponentRef)` pair currently
824 /// bound. Surfaces the registry to introspection tools + the
825 /// compiler's resolve-dependencies pass.
826 pub fn slots_iter(&self) -> impl Iterator<Item = (&str, ComponentRef)> {
827 self.slots.iter().map(|(k, v)| (k.as_str(), *v))
828 }
829
830 /// Subscribe a `NodeSiteId` to bus events of `event_kind` (the
831 /// discriminator returned by [`crate::bus::NodeEvent::kind`]).
832 /// The bus-routing pass writes a `TriggerValue` to each
833 /// subscribed site at a fresh `ExecId` and pushes the site's
834 /// downstream consumers onto the frontier — uniform with wire
835 /// delivery semantics per `docs/ADDRESSING.md`.
836 ///
837 /// `Node` calls this at install time for every
838 /// `EventSource` syscall op, passing the op's output `NodeSiteId`.
839 pub fn register_event_subscription(&mut self, event_kind: &str, site: NodeSiteId) {
840 let entry = self
841 .event_subscriptions
842 .entry(event_kind.to_string())
843 .or_default();
844 if !entry.contains(&site) {
845 entry.push(site);
846 }
847 }
848
849 /// Cheap clone of the shared `IngressQueue` handle. Test
850 /// harnesses + transport adapters push `IngressEvent`s through
851 /// this surface.
852 pub fn ingress_queue_handle(&self) -> Arc<IngressQueue> {
853 Arc::clone(&self.ingress)
854 }
855
856 /// Queue a lifecycle phase for firing on the next `poll()`
857 /// call. The framework emits `EngineStep::LifecycleFired { phase }`
858 /// for each queued phase and also pushes every `LifecyclePhase`
859 /// op enrolled under that phase name (via
860 /// [`Engine::register_lifecycle_op`]) onto the frontier with a
861 /// fresh `ExecId`.
862 pub fn fire_lifecycle(&mut self, phase: &str) {
863 self.fired_phases.push(phase.to_string());
864 }
865
866 /// Enroll `op_ref` under `phase` per IR_AND_DSL.md §5a.
867 /// Idempotent - the same `(phase, OpRef)` pair never enrolls
868 /// twice. `Node` calls this at install time after parsing
869 /// each `LifecyclePhase` NodeProto's `phase` attribute.
870 pub fn register_lifecycle_op(&mut self, phase: &str, op_ref: OpRef) {
871 let entry = self.lifecycle_table.entry(phase.to_string()).or_default();
872 if !entry.contains(&op_ref) {
873 entry.push(op_ref);
874 }
875 }
876
877 /// Register a stateless syscall op. Captures `TypeId::of::<T>()`
878 /// into both `dispatch_table` (TypeId → invoke fn) and
879 /// `syscall_index` ((domain, op_type) → TypeId) so
880 /// `resolve_dispatch` can stamp `OpDispatch::Stateless`.
881 ///
882 /// Register a stateless syscall by its `(domain, op_type)` key.
883 pub fn register_syscall(&mut self, domain: &str, op_type: &str, invoke_fn: StatelessInvokeFn) {
884 self.syscall_table
885 .insert((domain.to_string(), op_type.to_string()), invoke_fn);
886 }
887
888 /// Install every framework syscall shipped via inventory by
889 /// `bb-ops`. Each registration carries its own
890 /// `(domain, op_type)` + invoke pointer; the engine stamps them
891 /// into `syscall_table`.
892 pub fn register_all_framework_syscalls(&mut self) {
893 for reg in crate::registry::framework_syscalls() {
894 self.syscall_table.insert(
895 (reg.domain.to_string(), reg.op_type.to_string()),
896 reg.invoke,
897 );
898 }
899 }
900
901 /// Test-only installer. Inserts a fresh `GraphSlot` keyed by
902 /// `name` with empty per-node tables but with `op_dispatch`
903 /// pre-filled with `Unresolved` so subsequent `resolve_dispatch`
904 /// can stamp dispatch decisions. Use [`Engine::install_graph`]
905 /// for the canonical path that walks the FunctionProto.
906 #[cfg(any(test, feature = "test-components"))]
907 pub fn install_graph_for_test(
908 &mut self,
909 name: String,
910 function: FunctionProto,
911 ) -> &mut GraphSlot {
912 let mut g = GraphSlot::new_for_test(name.clone(), function);
913 g.op_dispatch = (0..g.function.node.len())
914 .map(|_| crate::engine::dispatch_entry::OpDispatch::Unresolved)
915 .collect();
916 let idx = self.push_graph_slot(name, g);
917 &mut self.graphs[idx as usize]
918 }
919
920 /// Push a `GraphSlot` onto the storage Vec and register its
921 /// name → index entry. Returns the assigned `graph_idx` (the
922 /// value that gets packed into `OpRef`). Same name twice
923 /// overwrites the existing slot but keeps the original
924 /// `graph_idx` — preserving OpRef stability across re-install.
925 pub(crate) fn push_graph_slot(&mut self, name: String, slot: GraphSlot) -> u32 {
926 if let Some(&idx) = self.graph_index.get(&name) {
927 self.graphs[idx as usize] = slot;
928 return idx;
929 }
930 let idx = self.graphs.len() as u32;
931 self.graph_index.insert(name, idx);
932 self.graphs.push(slot);
933 idx
934 }
935
936 /// Resolve a graph by name. Returns `None` when the name
937 /// isn't registered. Equivalent to `self.graphs.get(name)` on
938 /// the prior HashMap-keyed shape.
939 pub fn graph(&self, name: &str) -> Option<&GraphSlot> {
940 let idx = *self.graph_index.get(name)?;
941 self.graphs.get(idx as usize)
942 }
943
944 /// Resolve a graph by name for mutation. `None` when the name
945 /// isn't registered.
946 pub fn graph_mut(&mut self, name: &str) -> Option<&mut GraphSlot> {
947 let idx = *self.graph_index.get(name)?;
948 self.graphs.get_mut(idx as usize)
949 }
950
951 /// `true` when a graph with this name is installed.
952 pub fn has_graph(&self, name: &str) -> bool {
953 self.graph_index.contains_key(name)
954 }
955
956 /// Resolve a graph's positional index by name. Used by paths
957 /// that need to compute `OpRef::pack(idx, node_idx)` from a
958 /// graph name (function-call site resolution, etc.).
959 pub fn graph_idx(&self, name: &str) -> Option<u32> {
960 self.graph_index.get(name).copied()
961 }
962
963 /// Build an `OpRef` for the `node_idx`-th NodeProto of a graph
964 /// identified by name. Test-only convenience for tests that
965 /// used to fish the OpRef out of `GraphSlot.op_index`; with
966 /// positional `OpRef::pack(graph_idx, node_idx)` the lookup is
967 /// trivial.
968 #[cfg(any(test, feature = "test-components"))]
969 pub fn op_ref_at(&self, graph_name: &str, node_idx: u32) -> Option<OpRef> {
970 let gi = self.graph_idx(graph_name)?;
971 let g = self.graphs.get(gi as usize)?;
972 if (node_idx as usize) < g.function.node.len() {
973 Some(OpRef::pack(gi, node_idx))
974 } else {
975 None
976 }
977 }
978
979 /// Iterate every installed `GraphSlot` in install order.
980 pub fn graphs_iter(&self) -> impl Iterator<Item = &GraphSlot> {
981 self.graphs.iter()
982 }
983
984 /// Iterate every (`name`, `&GraphSlot`) pair in install order.
985 pub fn graphs_named(&self) -> impl Iterator<Item = (&str, &GraphSlot)> {
986 // graph_index maps name -> idx; rebuild idx -> name for the
987 // walk so the iteration order matches the storage Vec.
988 let mut by_idx: Vec<(u32, &str)> = self
989 .graph_index
990 .iter()
991 .map(|(n, i)| (*i, n.as_str()))
992 .collect();
993 by_idx.sort_by_key(|&(i, _)| i);
994 by_idx
995 .into_iter()
996 .filter_map(move |(i, n)| self.graphs.get(i as usize).map(|g| (n, g)))
997 }
998
999 /// Canonical install path: builds an
1000 /// [`GraphSlot`] from the FunctionProto (allocating
1001 /// `OpRef`s + `NodeSiteId`s for every node + produced value) and
1002 /// inserts it under `name`.
1003 ///
1004 /// Used by [`crate::node::Node::ready`] for each
1005 /// `ModelProto.functions[0]`. Returns a mutable reference
1006 /// for any subsequent setup (slot_bindings, local_event_subs).
1007 pub fn install_graph(&mut self, name: String, function: FunctionProto) -> &mut GraphSlot {
1008 let graph_idx = self.graphs.len() as u32;
1009 let mut g = GraphSlot::from_function(
1010 name.clone(),
1011 function,
1012 graph_idx,
1013 &mut self.exec.ids.next_node_site_id,
1014 );
1015 // Entry-point graphs (installed via `install_graph`, not
1016 // `install_function_library`) get a `NodeSiteId` registered
1017 // for every function input so `Engine::deliver_app_event`
1018 // can seed the input via ingress. Body functions used in
1019 // `OpDispatch::FunctionCall` deliberately route through
1020 // `input_aliases` and must NOT get input sites; that path
1021 // installs through `install_function_library` instead.
1022 register_function_input_sites(&mut g, &mut self.exec.ids.next_node_site_id, graph_idx);
1023 let idx = self.push_graph_slot(name, g);
1024 &mut self.graphs[idx as usize]
1025 }
1026
1027 /// Runtime-linker install: walk `model.functions[]` and install
1028 /// each FunctionProto as an `GraphSlot` keyed by its
1029 /// canonical `(domain, name, overload)`-derived string. Also
1030 /// populates the symbol-table index `functions` keyed on the same
1031 /// tuple, so call NodeProtos can be resolved at dispatch time.
1032 ///
1033 /// `entry_point_keys` lists the `FunctionKey`s for the registered
1034 /// Modules' main partition functions - those graphs get
1035 /// `is_entry_point = true` (their top-level outputs surface as
1036 /// `EngineStep::AppEvent`; sub-function bodies do not).
1037 ///
1038 /// A function stamped `MODULE_PHASE_KEY = "bootstrap"` registers
1039 /// its `FunctionKey` with the engine's bootstrap state (appends
1040 /// to `install_order`, populates `module_bootstraps`) without
1041 /// arming `pending` — install is pure. The host arms the queue by
1042 /// calling [`crate::node::Node::run_bootstrap`], which fans out
1043 /// each install-order target serially and emits one
1044 /// `BootstrapComplete` step per drained phase; multi-target
1045 /// installs surface their targets in slice order without further
1046 /// host action.
1047 ///
1048 /// Idempotent under ODR (same key + same body) - silently skips
1049 /// reinstall. Caller (Node linker) is responsible for the
1050 /// byte-equality check before calling.
1051 pub fn install_function_library(
1052 &mut self,
1053 functions: &[FunctionProto],
1054 entry_point_keys: &[FunctionKey],
1055 ) {
1056 let entry_set: std::collections::HashSet<&FunctionKey> = entry_point_keys.iter().collect();
1057 // Collect bootstrap keys + target names registered this call
1058 // so the post-pass can stamp touch sets once every function
1059 // in the batch is discoverable via `self.functions` —
1060 // necessary for forward references (a bootstrap body that
1061 // calls a sibling function declared later in `functions`).
1062 let mut new_bootstrap_targets: Vec<(FunctionKey, String)> = Vec::new();
1063 for f in functions {
1064 let key: FunctionKey = (f.domain.clone(), f.name.clone(), f.overload.clone());
1065 let graph_name = graph_name_for(&key);
1066 if self.has_graph(&graph_name) {
1067 continue;
1068 }
1069 let graph_idx = self.graphs.len() as u32;
1070 let mut g = GraphSlot::from_function(
1071 graph_name.clone(),
1072 f.clone(),
1073 graph_idx,
1074 &mut self.exec.ids.next_node_site_id,
1075 );
1076 // Only entry-point functions surface their outputs as
1077 // AppEvents. Sub-function bodies' outputs are forwarded
1078 // via output_forwarding at call sites.
1079 g.is_entry_point = entry_set.contains(&key);
1080 if !g.is_entry_point {
1081 g.top_level_outputs.clear();
1082 }
1083 let is_bootstrap = bb_ir::keys::read_function_module_phase(f)
1084 .is_some_and(|p| p == bb_ir::keys::MODULE_PHASE_BOOTSTRAP);
1085 // Bootstrap functions seed their inputs through the F5
1086 // host-driven staging path
1087 // (`Engine::enqueue_bootstrap_request`) rather than via a
1088 // FunctionCall splice. Mint a `NodeSiteId` per declared
1089 // input formal so the staging path can address the slot
1090 // via `(NodeSiteId, body_exec_id)` and the body ops can
1091 // resolve their input names through `resolve_site_name`.
1092 if is_bootstrap {
1093 register_function_input_sites(
1094 &mut g,
1095 &mut self.exec.ids.next_node_site_id,
1096 graph_idx,
1097 );
1098 }
1099 self.push_graph_slot(graph_name, g);
1100 self.functions.insert(key.clone(), f.clone());
1101 if is_bootstrap {
1102 // Register the bootstrap target. A multi-target
1103 // install registers one bootstrap per target (in
1104 // the order [`crate::install::install`] iterates the
1105 // user-supplied `targets` slice). Seeding drains
1106 // `install_order` front-to-back so each target's
1107 // bootstrap fires in slice order.
1108 let target_name = key.1.clone();
1109 self.bootstrap.register_module(key.clone());
1110 new_bootstrap_targets.push((key, target_name));
1111 }
1112 }
1113 // Stamp touch sets for every bootstrap target registered in
1114 // this batch. Deferred until after the install loop so
1115 // forward-referenced FunctionCalls (callees declared later
1116 // in `functions`) resolve against the fully populated
1117 // `self.functions` registry.
1118 for (key, target_name) in new_bootstrap_targets {
1119 let touch_set = self.compute_touch_set(&key);
1120 if let Some(meta) = self.bootstrap.module_bootstraps.get_mut(&target_name) {
1121 meta.touch_set = touch_set;
1122 }
1123 }
1124 }
1125
1126 /// Closure of every `ComponentRef` referenced by `function_key`'s
1127 /// body (slot-id NodeProtos + transitive FunctionCall callees).
1128 /// Walks the function body once: each NodeProto carrying
1129 /// [`bb_ir::keys::SLOT_ID_KEY`] contributes the bound
1130 /// `ComponentRef` from [`Self::slot_id_to_cref`]; each NodeProto
1131 /// whose `(domain, op_type, overload)` resolves a sibling
1132 /// FunctionProto in [`Self::functions`] recurses on that callee.
1133 /// `visited_keys` defends against bootstrap recursion cycles
1134 /// (Module A bootstrap calls Module B body that calls Module A
1135 /// body) so the walk terminates even if the program graph
1136 /// contains a back-edge through FunctionCalls.
1137 pub(crate) fn compute_touch_set(
1138 &self,
1139 function_key: &FunctionKey,
1140 ) -> std::collections::HashSet<ComponentRef> {
1141 let mut touch_set: std::collections::HashSet<ComponentRef> =
1142 std::collections::HashSet::new();
1143 let mut visited_keys: std::collections::HashSet<FunctionKey> =
1144 std::collections::HashSet::new();
1145 self.collect_touch_set(function_key, &mut visited_keys, &mut touch_set);
1146 touch_set
1147 }
1148
1149 /// Recursive worker for [`Self::compute_touch_set`]. Skips
1150 /// already-visited keys (cycle defense) and missing-from-registry
1151 /// keys (defensive: the install path may discover a key whose
1152 /// callee was elided by upstream passes).
1153 fn collect_touch_set(
1154 &self,
1155 function_key: &FunctionKey,
1156 visited_keys: &mut std::collections::HashSet<FunctionKey>,
1157 touch_set: &mut std::collections::HashSet<ComponentRef>,
1158 ) {
1159 if !visited_keys.insert(function_key.clone()) {
1160 return;
1161 }
1162 let Some(function) = self.functions.get(function_key) else {
1163 return;
1164 };
1165 // `function` is borrowed off `self.functions`; the recursive
1166 // calls only read from other Engine fields (`slot_id_to_cref`,
1167 // `functions`) so the borrow holds across the loop.
1168 for node in &function.node {
1169 if let Some(slot_id) = node
1170 .metadata_props
1171 .iter()
1172 .find(|p| p.key == bb_ir::keys::SLOT_ID_KEY)
1173 .and_then(|p| p.value.parse::<u32>().ok())
1174 {
1175 if let Some(&cref) = self.slot_id_to_cref.get(&slot_id) {
1176 touch_set.insert(cref);
1177 }
1178 }
1179 let callee_key: FunctionKey = (
1180 node.domain.clone(),
1181 node.op_type.clone(),
1182 node.overload.clone(),
1183 );
1184 if self.functions.contains_key(&callee_key) {
1185 self.collect_touch_set(&callee_key, visited_keys, touch_set);
1186 }
1187 }
1188 }
1189
1190 /// Allocate the next bootstrap call's body `ExecId` and push every
1191 /// body OpRef of the front of
1192 /// [`crate::engine::bootstrap::BootstrapState::install_order`]
1193 /// onto the frontier. Returns `true` when a bootstrap call was
1194 /// seeded; `false` when the engine has no remaining bootstrap
1195 /// functions or the previous call is still in flight.
1196 ///
1197 /// Host-driven (F4): `Node::run_bootstrap` invokes this once after
1198 /// arming `bootstrap.pending`; the poll cascade reseeds via
1199 /// `maybe_complete_bootstrap` after each phase drains so multi-
1200 /// target installs surface one `BootstrapComplete` per target in
1201 /// install order without further host action. Install itself no
1202 /// longer arms `pending`.
1203 pub(crate) fn seed_bootstrap_call(&mut self) -> bool {
1204 if self.bootstrap.module_exec_id().is_some() {
1205 return false;
1206 }
1207 if self.bootstrap.next_idx >= self.bootstrap.install_order.len() {
1208 // No further bootstraps to seed; the gate clears on the
1209 // next `maybe_complete_bootstrap` pass.
1210 self.bootstrap.pending = false;
1211 return false;
1212 }
1213 let target_name = self.bootstrap.install_order[self.bootstrap.next_idx].clone();
1214 let Some(meta) = self.bootstrap.module_bootstraps.get(&target_name) else {
1215 // Defensive skip: install_order names a target whose
1216 // metadata is missing. Advance past the stale entry
1217 // rather than wedging.
1218 self.bootstrap.next_idx += 1;
1219 return false;
1220 };
1221 let key = meta.function_key.clone();
1222 let touch_set = meta.touch_set.clone();
1223 if self
1224 .fire_module_bootstrap(target_name, &key, touch_set)
1225 .is_none()
1226 {
1227 self.bootstrap.next_idx += 1;
1228 return false;
1229 }
1230 true
1231 }
1232
1233 /// Seed a Module bootstrap body onto the frontier under a fresh
1234 /// ExecId and record it in `bootstrap.in_flight`. Shared by the
1235 /// install-order kick seeder ([`Self::seed_bootstrap_call`]) and
1236 /// the conflict-queue driver ([`Self::fire_ready_bootstrap`]).
1237 /// Returns the body ExecId on success or `None` when the graph
1238 /// name is missing (defensive — install populates it).
1239 fn fire_module_bootstrap(
1240 &mut self,
1241 target_name: String,
1242 key: &FunctionKey,
1243 touch_set: std::collections::HashSet<ComponentRef>,
1244 ) -> Option<ExecId> {
1245 let graph_name = graph_name_for(key);
1246 let graph_idx = self.graph_idx(&graph_name)?;
1247 let body_exec_id = self.allocate_exec_id();
1248 let node_count = self
1249 .graphs
1250 .get(graph_idx as usize)
1251 .map(|g| g.function.node.len())
1252 .unwrap_or(0);
1253 // Bootstrap takes no input formals and produces no outputs,
1254 // so no CallContext lives in `pending_calls`; the body-op
1255 // gate identifies bootstrap-descendant ExecIds by either
1256 // direct match against an in_flight ExecId or chain walk
1257 // through descendant FunctionCall CallContexts. Quiescence
1258 // resolves through `maybe_complete_bootstrap` once every
1259 // descendant frontier + pending_async entry clears.
1260 self.bootstrap
1261 .mark_module_in_flight(target_name, body_exec_id, touch_set);
1262 for node_idx in 0..node_count as u32 {
1263 let op_ref = OpRef::pack(graph_idx, node_idx);
1264 self.exec.frontier.push_back((op_ref, body_exec_id));
1265 }
1266 Some(body_exec_id)
1267 }
1268
1269 /// Drive a [`crate::engine::bootstrap::ReadyBootstrap`] returned
1270 /// by `BootstrapState::process_pending_requests` or
1271 /// `on_bootstrap_drained` onto the frontier. Module-kind entries
1272 /// allocate an ExecId and push body ops; Component-kind entries
1273 /// allocate an ExecId, lock the bound `ComponentRef` via the
1274 /// gate's touch set, and synchronously invoke the registered
1275 /// `Bootstrap::bootstrap` impl through the dispatcher registry.
1276 /// Returns the body ExecId on success.
1277 pub(crate) fn fire_ready_bootstrap(
1278 &mut self,
1279 ready: crate::engine::bootstrap::ReadyBootstrap,
1280 ) -> Option<ExecId> {
1281 match ready.kind {
1282 crate::engine::bootstrap::BootstrapKind::Module { target } => {
1283 let key = self
1284 .bootstrap
1285 .module_bootstraps
1286 .get(&target)
1287 .map(|m| m.function_key.clone())?;
1288 self.fire_module_bootstrap(target, &key, ready.touch_set)
1289 }
1290 crate::engine::bootstrap::BootstrapKind::Component { slot } => {
1291 self.fire_component_bootstrap(slot, ready.touch_set)
1292 }
1293 }
1294 }
1295
1296 /// Seed a Component bootstrap: resolve `slot → cref` via
1297 /// `bootstrap.component_bootstraps`, allocate an ExecId, lock the
1298 /// `{cref}` touch set on `bootstrap.in_flight` so body ops on
1299 /// disjoint Components keep firing, and synchronously invoke the
1300 /// concrete `Bootstrap::bootstrap` through the dispatcher
1301 /// registry. Returns the body ExecId on success.
1302 ///
1303 /// `DispatchResult::Immediate` retires the in-flight entry
1304 /// in-line via [`crate::engine::bootstrap::BootstrapState::on_bootstrap_drained`]
1305 /// and fires any promoted waiters so the conflict-queue path
1306 /// stays uniform across Module and Component kinds.
1307 /// `DispatchResult::Async` parks the ExecId on `pending_async`
1308 /// under a synthetic `OpRef`; the regular `handle_completion`
1309 /// path drives the eventual drain once the impl calls
1310 /// `ctx.complete_command(cmd_id, …)`.
1311 fn fire_component_bootstrap(
1312 &mut self,
1313 slot: String,
1314 touch_set: std::collections::HashSet<ComponentRef>,
1315 ) -> Option<ExecId> {
1316 let cref = self.bootstrap.component_bootstraps.get(&slot)?.cref;
1317 let body_exec_id = self.allocate_exec_id();
1318 // Use the supplied touch_set so caller-supplied lock semantics
1319 // win when present; F4 may extend the closure to include
1320 // declared dependencies. Falling back to `{cref}` keeps the
1321 // gate locking the dispatching Component even when the host
1322 // forgets to pass a touch_set (defensive — production
1323 // callers always populate it).
1324 let touch_set = if touch_set.is_empty() {
1325 let mut t = std::collections::HashSet::new();
1326 t.insert(cref);
1327 t
1328 } else {
1329 touch_set
1330 };
1331 self.bootstrap
1332 .in_flight
1333 .push(crate::engine::bootstrap::InFlightBootstrap {
1334 kind: crate::engine::bootstrap::BootstrapKind::Component { slot: slot.clone() },
1335 exec_id: body_exec_id,
1336 touch_set,
1337 staged_inputs: std::collections::HashSet::new(),
1338 });
1339 // Resolve dispatcher + invoke. Errors here surface as a
1340 // failed bootstrap step — the host sees an EngineStep
1341 // `OpFailed`-style emission once F5 wires the step plumbing;
1342 // for Commit 3 the synchronous-Immediate path leaves the
1343 // engine state ready for the next ready bootstrap and lets
1344 // the caller observe the success via `bootstrap.in_flight`
1345 // draining.
1346 let outcome = self.dispatch_component_bootstrap(cref);
1347 match outcome {
1348 Ok(crate::atomic::DispatchResult::Immediate(_outputs)) => {
1349 // Component bootstrap produces no slot outputs (the
1350 // Bootstrap trait returns `Result<(), Error>`). Retire
1351 // the in-flight entry by ExecId and promote any
1352 // waiters; the gate re-opens for the locked
1353 // ComponentRef once the in_flight retain step lands.
1354 let promoted = self.bootstrap.on_bootstrap_drained(body_exec_id);
1355 for r in promoted {
1356 let _ = self.fire_ready_bootstrap(r);
1357 }
1358 Some(body_exec_id)
1359 }
1360 Ok(crate::atomic::DispatchResult::Async(cmd_id)) => {
1361 // Park the body ExecId on pending_async under a
1362 // synthetic OpRef. graph_idx = u32::MAX is reserved
1363 // for Component-bootstrap synthetic ops so the
1364 // regular `node_for` lookup returns `None` (no
1365 // NodeProto on the frontier) and downstream code
1366 // skips dispatch retries.
1367 let synthetic_op = OpRef::pack(u32::MAX, 0);
1368 self.exec.pending_async.insert(
1369 cmd_id,
1370 crate::engine::pending_async::PendingAsync {
1371 op_ref: synthetic_op,
1372 exec_id: body_exec_id,
1373 output_sites: Vec::new(),
1374 deadline_ns: None,
1375 },
1376 );
1377 Some(body_exec_id)
1378 }
1379 Err(_) => {
1380 // Dispatcher missing or impl returned Err — retire
1381 // the in-flight entry so the queue does not wedge;
1382 // the host observes the unfinished work via the
1383 // missing BootstrapComplete step. F5 promotes this
1384 // path to an OpFailed-style EngineStep.
1385 let _ = self.bootstrap.on_bootstrap_drained(body_exec_id);
1386 None
1387 }
1388 }
1389 }
1390
1391 /// Look up + invoke the Bootstrap dispatcher for the Component at
1392 /// `cref`. Uses the same take-restore pattern as `invoke_atomic`
1393 /// so the dispatch closure has exclusive access to the concrete
1394 /// while the rest of `engine.components` stays borrowable for
1395 /// `RuntimeResourceRef`-style accessors (F5 plumbing — today the
1396 /// Bootstrap ctx carries only the cref).
1397 fn dispatch_component_bootstrap(
1398 &mut self,
1399 cref: ComponentRef,
1400 ) -> Result<crate::atomic::DispatchResult, String> {
1401 let mut taken = self
1402 .take_component(cref)
1403 .ok_or_else(|| "component missing".to_string())?;
1404 // `Box<dyn ErasedComponent>` coerces to `&mut dyn Any` via
1405 // the `Any` supertrait bound on `ErasedComponent`; mirrors
1406 // the dispatch path in `invoke_atomic` so the downcast in
1407 // the `BootstrapDispatchFn` stays consistent across surfaces.
1408 let any: &mut dyn std::any::Any = taken.as_mut();
1409 let tid = (*any).type_id();
1410 let dispatcher = self
1411 .bootstrap_dispatchers
1412 .get(&tid)
1413 .copied()
1414 .ok_or_else(|| "no Bootstrap dispatcher registered for component".to_string());
1415 let result = match dispatcher {
1416 Ok(f) => {
1417 let mut ctx = crate::contracts::bootstrap::BootstrapCtx::new(cref);
1418 f(any, &mut ctx)
1419 }
1420 Err(e) => Err(e),
1421 };
1422 self.restore_component(cref, taken);
1423 result
1424 }
1425
1426 /// Stage a host-supplied [`crate::engine::BootstrapRequest`] and
1427 /// fire the target Module's bootstrap immediately:
1428 ///
1429 /// 1. Resolve `request.target` → `FunctionKey` →
1430 /// `graph_name` → `graph_idx`.
1431 /// 2. Read the bootstrap function's declared input port names
1432 /// (the slots minted by `register_function_input_sites` against
1433 /// `function.input`).
1434 /// 3. Validate the supplied `(input_name, bytes)` pairs against
1435 /// the formal set — missing required → `MissingInput`, extra →
1436 /// `UnknownInput`.
1437 /// 4. Allocate the body `ExecId`.
1438 /// 5. For each formal: charge against the ingress byte budget,
1439 /// fallibly reserve a framework-owned `Vec<u8>` via the
1440 /// `try_reserve_exact` seam, copy in the caller's borrowed
1441 /// bytes, wrap as `BytesValue`, and write into the slot table
1442 /// at `(site, body_exec_id)`. The caller's slices may drop the
1443 /// moment this call returns — Principle 1a satisfied.
1444 /// 6. Mark the Module bootstrap in-flight + push the body's
1445 /// `OpRef`s onto the frontier so the next `Engine::poll` drives
1446 /// it to completion.
1447 ///
1448 /// Mid-flight failures (budget exhaustion, allocator fault) release
1449 /// every byte charge admitted earlier in the same call so the
1450 /// counter never leaks. The seed/in-flight bookkeeping only lands
1451 /// once every input stages successfully, so a failed request leaves
1452 /// the engine's bootstrap state untouched.
1453 pub fn enqueue_bootstrap_request(
1454 &mut self,
1455 request: crate::engine::bootstrap::BootstrapRequest<'_>,
1456 ) -> Result<(), crate::errors::BootstrapError> {
1457 // 1. Resolve target → function_key → graph_idx.
1458 let meta = self
1459 .bootstrap
1460 .module_bootstraps
1461 .get(request.target)
1462 .ok_or_else(|| crate::errors::BootstrapError::UnknownTarget {
1463 target_name: request.target.to_string(),
1464 available: self.bootstrap.install_order.clone(),
1465 })?;
1466 let function_key = meta.function_key.clone();
1467 let touch_set = meta.touch_set.clone();
1468 let graph_name = graph_name_for(&function_key);
1469 let graph_idx = self.graph_idx(&graph_name).ok_or_else(|| {
1470 crate::errors::BootstrapError::UnknownTarget {
1471 target_name: request.target.to_string(),
1472 available: self.bootstrap.install_order.clone(),
1473 }
1474 })?;
1475 let graph = &self.graphs[graph_idx as usize];
1476
1477 // 2. Read declared input formals from the GraphSlot.
1478 let declared: Vec<String> = graph.function.input.clone();
1479
1480 // 3. Validate. UnknownInput fires first (the supplied set is
1481 // the host's authoritative request shape; surfacing extras
1482 // before missing ones gives clearer diagnostics on typos).
1483 for (input_name, _) in request.inputs {
1484 if !declared.iter().any(|d| d == input_name) {
1485 return Err(crate::errors::BootstrapError::UnknownInput {
1486 target_name: request.target.to_string(),
1487 input_name: input_name.to_string(),
1488 declared: declared.clone(),
1489 });
1490 }
1491 }
1492 for formal in &declared {
1493 if !request.inputs.iter().any(|(name, _)| *name == formal) {
1494 return Err(crate::errors::BootstrapError::MissingInput {
1495 target_name: request.target.to_string(),
1496 input_name: formal.clone(),
1497 });
1498 }
1499 }
1500
1501 // Resolve each formal to its NodeSiteId before allocating the
1502 // ExecId — a missing site at this stage is an install
1503 // invariant violation (register_function_input_sites runs
1504 // alongside register_module above), but defending against it
1505 // keeps the error path total.
1506 let mut sites: Vec<(crate::ids::NodeSiteId, &[u8])> =
1507 Vec::with_capacity(request.inputs.len());
1508 for (input_name, bytes) in request.inputs {
1509 let Some(&site) = graph.site_names.get(*input_name) else {
1510 return Err(crate::errors::BootstrapError::UnknownInput {
1511 target_name: request.target.to_string(),
1512 input_name: input_name.to_string(),
1513 declared: declared.clone(),
1514 });
1515 };
1516 sites.push((site, *bytes));
1517 }
1518
1519 // 4. Allocate body ExecId. Done after validation so a rejected
1520 // request does not consume an ExecId counter slot.
1521 let body_exec_id = self.allocate_exec_id();
1522
1523 // 5. Per-input charge + Principle 1a copy. Track total
1524 // admitted bytes so a mid-loop failure releases the full
1525 // charge in one shot (none of the staged carriers ever
1526 // reached the slot table).
1527 let mut admitted: usize = 0;
1528 let mut staged_sites: Vec<crate::ids::NodeSiteId> = Vec::with_capacity(sites.len());
1529 for (site, bytes) in &sites {
1530 let byte_count = bytes.len();
1531 if let Err(reason) = self.try_charge(byte_count) {
1532 self.release(admitted);
1533 return Err(crate::errors::BootstrapError::AllocationFailed {
1534 target_name: request.target.to_string(),
1535 byte_count,
1536 budget_remaining: reason.budget_remaining,
1537 });
1538 }
1539 admitted = admitted.saturating_add(byte_count);
1540 let mut owned: Vec<u8> = Vec::new();
1541 if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
1542 self.release(admitted);
1543 return Err(crate::errors::BootstrapError::AllocationFailed {
1544 target_name: request.target.to_string(),
1545 byte_count,
1546 budget_remaining: self
1547 .ingress_byte_budget
1548 .saturating_sub(self.ingress_bytes_in_flight),
1549 });
1550 }
1551 owned.extend_from_slice(bytes);
1552 let value: Box<dyn crate::slot_value::SlotValue> =
1553 Box::new(crate::syscall::values::BytesValue(owned));
1554 self.slot_write(*site, body_exec_id, value);
1555 staged_sites.push(*site);
1556 }
1557
1558 // 6. Mark the Module bootstrap in-flight and push every body
1559 // OpRef onto the frontier. The body-op gate (`is_op_locked`)
1560 // recognises the freshly seeded ExecId via `module_exec_id`
1561 // and the touch-set so disjoint Components keep firing. Any
1562 // body op that consumes a staged input reads its value
1563 // through `resolve_site_name` → slot_table at
1564 // `(site, body_exec_id)`.
1565 self.bootstrap.pending = true;
1566 self.bootstrap
1567 .mark_module_in_flight(request.target.to_string(), body_exec_id, touch_set);
1568 let node_count = self.graphs[graph_idx as usize].function.node.len();
1569 for node_idx in 0..node_count as u32 {
1570 let op_ref = OpRef::pack(graph_idx, node_idx);
1571 self.exec.frontier.push_back((op_ref, body_exec_id));
1572 }
1573 Ok(())
1574 }
1575
1576 /// Drain `bootstrap.pending_requests` once + fire each
1577 /// non-conflicting target. Conflicting requests stay parked on
1578 /// `bootstrap.waiting` until a drain promotes them via
1579 /// [`Self::maybe_complete_bootstrap`]. Called from
1580 /// `Engine::poll` at the top of each cycle so requests staged
1581 /// between polls land on the frontier before the body phase
1582 /// resumes.
1583 pub(crate) fn drive_pending_bootstrap_requests(&mut self) {
1584 let ready = self.bootstrap.process_pending_requests();
1585 for r in ready {
1586 let _ = self.fire_ready_bootstrap(r);
1587 }
1588 }
1589
1590 /// Arm the install-order bootstrap queue + seed the next target.
1591 /// Host entry point invoked through [`crate::node::Node::run_bootstrap`].
1592 /// Returns `true` when arming queued work (the caller should poll);
1593 /// `false` when no install-order target remains (idempotent on a
1594 /// fully drained Node).
1595 ///
1596 /// Cascade-fires multiple targets via the same
1597 /// `maybe_complete_bootstrap` reseed path the body-poll cycle
1598 /// uses: one call here picks the next target, the regular drain
1599 /// advances `next_idx`, and the next `seed_bootstrap_call`
1600 /// (inside poll) picks the following target.
1601 pub fn run_bootstrap(&mut self) -> bool {
1602 if !self.bootstrap.arm_install_order() {
1603 return false;
1604 }
1605 self.seed_bootstrap_call()
1606 }
1607
1608 /// `(domain, name, overload)` of the first bootstrap function
1609 /// recorded at install time, or `None` when no
1610 /// `module_phase = "bootstrap"` FunctionProto reached the function
1611 /// library. Stable across `clear_for_restore` (which preserves
1612 /// install-order metadata but bumps `next_idx` past every queued
1613 /// target so the restored Node does not re-fire bootstraps it
1614 /// already ran). For the full ordered list (multi-target installs
1615 /// queue one key per target), use [`Self::bootstrap_function_keys`].
1616 pub fn bootstrap_function_key(&self) -> Option<FunctionKey> {
1617 self.bootstrap.first_function_key().cloned()
1618 }
1619
1620 /// All bootstrap function keys the engine has queued, in install
1621 /// order. Multi-target installs append one entry per target via
1622 /// [`Self::install_function_library`]; the seeder fires each in
1623 /// slice order. Stable across `clear_for_restore` for snapshot
1624 /// introspection.
1625 pub fn bootstrap_function_keys(&self) -> Vec<FunctionKey> {
1626 self.bootstrap.function_keys()
1627 }
1628
1629 /// `true` while a bootstrap call is outstanding. Armed by
1630 /// [`Self::run_bootstrap`] (host kick) or by the host's
1631 /// staged-formals path (`enqueue_bootstrap_request`); cleared
1632 /// once every queued phase drains.
1633 pub fn bootstrap_pending(&self) -> bool {
1634 self.bootstrap.pending
1635 }
1636
1637 /// `true` when a Component bootstrap is registered for `slot`.
1638 /// Today the Component bootstrap registry stays empty so this
1639 /// always returns `false`; F5 wires the registration path. Wired
1640 /// now so the host can introspect the Bootstrap Contract surface
1641 /// without reaching through internal accessors.
1642 pub fn has_component_bootstrap(&self, slot: &str) -> bool {
1643 self.bootstrap.component_bootstrap(slot).is_some()
1644 }
1645
1646 /// Lifecycle status for the host-facing
1647 /// [`crate::node::Node::bootstrap_status`] accessor. `Idle` when no
1648 /// bootstrap is queued or in-flight; `Running` when one or more
1649 /// bootstraps occupy `in_flight` (the body gate is parking touched
1650 /// ops); `WaitingForInput` when work is staged on `pending_requests`
1651 /// but no in-flight phase has been seeded yet (the host must drive
1652 /// the queue to advance).
1653 pub fn bootstrap_status(&self) -> crate::engine::bootstrap::BootstrapStatus {
1654 if !self.bootstrap.in_flight.is_empty() {
1655 return crate::engine::bootstrap::BootstrapStatus::Running;
1656 }
1657 if !self.bootstrap.pending_requests.is_empty() || !self.bootstrap.waiting.is_empty() {
1658 return crate::engine::bootstrap::BootstrapStatus::WaitingForInput;
1659 }
1660 if self.bootstrap.pending && self.bootstrap.next_idx < self.bootstrap.install_order.len() {
1661 return crate::engine::bootstrap::BootstrapStatus::WaitingForInput;
1662 }
1663 crate::engine::bootstrap::BootstrapStatus::Idle
1664 }
1665
1666 /// Fire a Component bootstrap by slot name. Resolves
1667 /// `slot → ComponentRef` via `bootstrap.component_bootstraps`,
1668 /// builds a `ReadyBootstrap` with the bound cref as the touch set,
1669 /// and dispatches through the engine's internal fire path. Returns
1670 /// the dispatching ComponentRef on success so the host can wire
1671 /// per-slot telemetry; surfaces `BootstrapError::UnknownTarget`
1672 /// when no Component bootstrap is registered at the slot.
1673 pub fn fire_component_bootstrap_by_slot(
1674 &mut self,
1675 slot: &str,
1676 ) -> Result<ComponentRef, crate::errors::BootstrapError> {
1677 let cref = self
1678 .bootstrap
1679 .component_bootstrap(slot)
1680 .map(|cb| cb.cref)
1681 .ok_or_else(|| crate::errors::BootstrapError::UnknownTarget {
1682 target_name: slot.to_string(),
1683 available: self
1684 .bootstrap
1685 .component_bootstraps
1686 .keys()
1687 .cloned()
1688 .collect(),
1689 })?;
1690 let mut touch_set = std::collections::HashSet::new();
1691 touch_set.insert(cref);
1692 // `pending` flips on so the body-op gate parks touching ops
1693 // while the Component bootstrap dispatches. `fire_ready_bootstrap`
1694 // retires the in-flight slot inline on `Immediate` dispatches.
1695 self.bootstrap.pending = true;
1696 let _ = self.fire_ready_bootstrap(crate::engine::bootstrap::ReadyBootstrap {
1697 kind: crate::engine::bootstrap::BootstrapKind::Component {
1698 slot: slot.to_string(),
1699 },
1700 touch_set,
1701 staged_inputs: std::collections::HashSet::new(),
1702 });
1703 // Component bootstrap with no remaining queued work + nothing
1704 // in-flight (Immediate retired in_flight) means we can clear the
1705 // pending arming so subsequent `Node::poll` does not park body
1706 // ops indefinitely.
1707 if self.bootstrap.in_flight.is_empty()
1708 && self.bootstrap.pending_requests.is_empty()
1709 && self.bootstrap.waiting.is_empty()
1710 && self.bootstrap.next_idx >= self.bootstrap.install_order.len()
1711 {
1712 self.bootstrap.pending = false;
1713 }
1714 Ok(cref)
1715 }
1716
1717 /// `true` when `target_name` is already on the install-order Module
1718 /// bootstrap queue (so `Node::run_bootstrap` can surface
1719 /// `AlreadyTransitivelyQueued` before re-staging the same target).
1720 pub fn module_bootstrap_registered(&self, target_name: &str) -> bool {
1721 self.bootstrap.module_bootstraps.contains_key(target_name)
1722 }
1723
1724 /// Snapshot of every registered Module bootstrap target name in
1725 /// install order. Used by `Node::run_bootstrap` to enqueue an
1726 /// empty-input request per target without reaching into the
1727 /// engine's private `install_order` Vec.
1728 pub fn module_bootstrap_target_names(&self) -> Vec<String> {
1729 self.bootstrap.install_order.clone()
1730 }
1731
1732 /// Per-component body-op gate. Returns `true` when the op must
1733 /// park because an in-flight bootstrap locks the `ComponentRef`
1734 /// it touches; `false` when the op is fireable.
1735 ///
1736 /// Resolution order:
1737 /// 1. No in-flight bootstraps → fire (the gate is dormant).
1738 /// 2. `exec_id` descends from some in-flight bootstrap's ExecId
1739 /// via the `pending_calls.parent_exec_id` chain → fire. The
1740 /// bootstrap body itself (and its sub-FunctionCalls) is
1741 /// allowed to invoke any op regardless of the lock set.
1742 /// 3. Resolve the touched `ComponentRef` from the op's NodeProto
1743 /// via `SLOT_ID_KEY → slot_id_to_cref`. If the touched cref
1744 /// is in some in-flight bootstrap's `touch_set` → park.
1745 /// Stateless syscalls (no slot_id stamp, no role binding)
1746 /// fire because they reach no component.
1747 ///
1748 /// O(call depth * in-flight) per call — Module composition
1749 /// depth multiplied by the active bootstrap count. In-flight is
1750 /// 1 today; F3 Commit 2 enables concurrent disjoint bootstraps.
1751 pub(crate) fn is_op_locked(&self, op_ref: OpRef, exec_id: ExecId) -> bool {
1752 if self.bootstrap.in_flight.is_empty() {
1753 return false;
1754 }
1755 // 2. Bootstrap-descendant exec ids fire freely. Walk the
1756 // call chain once and short-circuit if any in-flight ExecId
1757 // matches.
1758 let mut current = exec_id;
1759 loop {
1760 if self
1761 .bootstrap
1762 .in_flight
1763 .iter()
1764 .any(|b| b.exec_id == current)
1765 {
1766 return false;
1767 }
1768 match self.exec.pending_calls.get(¤t) {
1769 Some(call) => current = call.parent_exec_id,
1770 None => break,
1771 }
1772 }
1773 // 3. Look up the touched ComponentRef from the op's slot_id
1774 // stamp. Ops without a slot_id (stateless syscalls,
1775 // FunctionCall nodes, unbound roles) reach no component, so
1776 // no in-flight touch set can lock them.
1777 let Some(node) = self.node_for(op_ref) else {
1778 return false;
1779 };
1780 let Some(slot_id) = node
1781 .metadata_props
1782 .iter()
1783 .find(|p| p.key == bb_ir::keys::SLOT_ID_KEY)
1784 .and_then(|p| p.value.parse::<u32>().ok())
1785 else {
1786 return false;
1787 };
1788 let Some(&touched) = self.slot_id_to_cref.get(&slot_id) else {
1789 return false;
1790 };
1791 self.bootstrap
1792 .in_flight
1793 .iter()
1794 .any(|b| b.touch_set.contains(&touched))
1795 }
1796
1797 /// Inspect engine state and pop the in-flight bootstrap key once
1798 /// every bootstrap-descendant frontier entry and `pending_async`
1799 /// entry has cleared. Returns `true` when one phase just drained
1800 /// (i.e. the caller poll cycle should append a `BootstrapComplete`
1801 /// step for that phase). With remaining queued keys, the next
1802 /// `seed_bootstrap_call` advances to the following target;
1803 /// `bootstrap.pending` flips off only after the last key drains.
1804 /// Called after each drain phase + the ingress completion drain.
1805 ///
1806 /// The descendant walk mirrors the body-op gate predicate: a
1807 /// frontier entry belongs to bootstrap when its ExecId chain
1808 /// terminates at the in-flight bootstrap's ExecId. Bootstrap
1809 /// itself never installs a `CallContext` (zero formals, zero
1810 /// outputs); child FunctionCalls that the bootstrap body invokes
1811 /// do, and the chain walk reaches the bootstrap ExecId through
1812 /// them.
1813 pub(crate) fn maybe_complete_bootstrap(&mut self) -> bool {
1814 if !self.bootstrap.pending {
1815 return false;
1816 }
1817 let Some(boot_exec) = self.bootstrap.module_exec_id() else {
1818 return false;
1819 };
1820 let descendant = |engine: &Engine, mut exec_id: ExecId| -> bool {
1821 loop {
1822 if exec_id == boot_exec {
1823 return true;
1824 }
1825 match engine.exec.pending_calls.get(&exec_id) {
1826 Some(call) => exec_id = call.parent_exec_id,
1827 None => return false,
1828 }
1829 }
1830 };
1831 if self
1832 .exec
1833 .frontier
1834 .iter()
1835 .any(|(_, exec_id)| descendant(self, *exec_id))
1836 {
1837 return false;
1838 }
1839 if self
1840 .exec
1841 .pending_async
1842 .values()
1843 .any(|p| descendant(self, p.exec_id))
1844 {
1845 return false;
1846 }
1847 // The in-flight phase drained. Advance the install_order
1848 // pointer (the post-kick cascade) and retire the in-flight
1849 // entry by ExecId via `on_bootstrap_drained`, which also
1850 // promotes any
1851 // host-driven waiter whose touch set no longer conflicts
1852 // with the remaining in-flight set. `bootstrap.pending`
1853 // stays armed while install_order keys remain queued so
1854 // body ops touching a locked Component keep parking through
1855 // the next phase's run. The `install_order` Vec itself is
1856 // append-only — introspection still reports every queued
1857 // target.
1858 self.bootstrap.next_idx += 1;
1859 let promoted = self.bootstrap.on_bootstrap_drained(boot_exec);
1860 for ready in promoted {
1861 // Promoted waiters fire immediately on the same poll
1862 // cycle so the conflict-queue path doesn't wedge waiting
1863 // for a future poll. Defensive `let _` — the only failure
1864 // path is a missing graph for the target, which the
1865 // install path forbids.
1866 let _ = self.fire_ready_bootstrap(ready);
1867 }
1868 if self.bootstrap.next_idx >= self.bootstrap.install_order.len()
1869 && self.bootstrap.in_flight.is_empty()
1870 && self.bootstrap.pending_requests.is_empty()
1871 && self.bootstrap.waiting.is_empty()
1872 {
1873 self.bootstrap.pending = false;
1874 }
1875 true
1876 }
1877
1878 /// Resolve every NodeProto's dispatch kind into `op_dispatch[]`
1879 /// per ENGINE.md §8.1 + §8.4. Run after install completes (so all
1880 /// symbols are in `self.functions`) but before the first poll.
1881 ///
1882 /// Walk order: each GraphSlot in turn. For each NodeProto:
1883 /// - syscall (`syscall_index` hit) → `Stateless`
1884 /// - call to function in `self.functions` with domain
1885 /// `ai.bytesandbrains.module` → `FunctionCall` with the target
1886 /// key + input/output rename pairs from the call's value names
1887 /// zipped against the target function's formals.
1888 /// - call to function with domain `ai.bytesandbrains.framework`
1889 /// starting with `BackendSubgraph_` → `BackendSubgraph` with
1890 /// bound backend resolved via `BINDING_ID_KEY` metadata against
1891 /// the atomic-dispatch table.
1892 /// - else atomic dispatch by `(domain, op_type, instance)` →
1893 /// `Atomic`.
1894 ///
1895 /// Returns the number of nodes that remained `Unresolved`. Caller
1896 /// should fail build if non-zero.
1897 pub fn resolve_dispatch(&mut self) -> usize {
1898 // Snapshot per-graph node lists so we don't hold a borrow on
1899 // self.graphs while reading other tables. Indices are the
1900 // positional graph_idx values packed into OpRefs.
1901 let graph_count = self.graphs.len();
1902 let mut unresolved = 0;
1903 for graph_idx in 0..graph_count {
1904 let (function_domain, nodes): (String, Vec<NodeProto>) = {
1905 let g = &self.graphs[graph_idx];
1906 (g.function.domain.clone(), g.function.node.clone())
1907 };
1908 // BackendSubgraph bodies (domain == ai.bytesandbrains.framework)
1909 // are handed wholesale to the bound Backend Contract impl per
1910 // ENGINE.md §8.5; their interior NodeProtos are never invoked
1911 // individually by the engine, so resolve_dispatch leaves the
1912 // body's op_dispatch as Unresolved without counting it as a
1913 // failure. Mirror the same skip already applied by
1914 // Node's unsupported-ops pre-flight in `src/node.rs`.
1915 if function_domain == "ai.bytesandbrains.framework" {
1916 let mut dispatch: Vec<OpDispatch> = Vec::with_capacity(nodes.len());
1917 for _ in &nodes {
1918 dispatch.push(OpDispatch::Unresolved);
1919 }
1920 self.graphs[graph_idx].op_dispatch = dispatch;
1921 continue;
1922 }
1923 let mut dispatch: Vec<OpDispatch> = Vec::with_capacity(nodes.len());
1924 for node in &nodes {
1925 let resolved = self.resolve_one(node);
1926 if matches!(resolved, OpDispatch::Unresolved) {
1927 unresolved += 1;
1928 }
1929 dispatch.push(resolved);
1930 }
1931 self.graphs[graph_idx].op_dispatch = dispatch;
1932 }
1933 unresolved
1934 }
1935
1936 fn resolve_one(&self, node: &NodeProto) -> OpDispatch {
1937 // 1) Syscall path — single lookup by (domain, op_type).
1938 if let Some(&fn_ptr) = self
1939 .syscall_table
1940 .get(&(node.domain.clone(), node.op_type.clone()))
1941 {
1942 return OpDispatch::Stateless(fn_ptr);
1943 }
1944 // `crate::registry` for custom ops registered via
1945 // `bb::register_op!{}`. DCE strips unreferenced entries,
1946 // so a binary that doesn't `use` a library's op never
1947 // pulls it into the link image.
1948 if let Some(reg) = crate::registry::find_op(&node.domain, &node.op_type) {
1949 return OpDispatch::Stateless(reg.invoke);
1950 }
1951 // 2) Function-call paths via the symbol table.
1952 let key: FunctionKey = (
1953 node.domain.clone(),
1954 node.op_type.clone(),
1955 node.overload.clone(),
1956 );
1957 if let Some(target_fn) = self.functions.get(&key) {
1958 if node.domain == "ai.bytesandbrains.module" {
1959 let input_rename: Rc<[(String, String)]> = node
1960 .input
1961 .iter()
1962 .zip(target_fn.input.iter())
1963 .map(|(caller, formal)| (caller.clone(), formal.clone()))
1964 .collect();
1965 let output_rename: Rc<[(String, String)]> = target_fn
1966 .output
1967 .iter()
1968 .zip(node.output.iter())
1969 .map(|(formal, caller)| (formal.clone(), caller.clone()))
1970 .collect();
1971 return OpDispatch::FunctionCall {
1972 target: key,
1973 input_rename,
1974 output_rename,
1975 };
1976 }
1977 }
1978 // 3) Atomic role path. The placeholders pass stamps
1979 // `ai.bytesandbrains.slot_id` on every role NodeProto; install
1980 // populates `slot_id_to_cref` from the model's binding metadata.
1981 // Single lookup chain: NodeProto.slot_id → ComponentRef →
1982 // bound role dispatcher closure.
1983 let slot_id = node
1984 .metadata_props
1985 .iter()
1986 .find(|p| p.key == bb_ir::keys::SLOT_ID_KEY)
1987 .and_then(|p| p.value.parse::<u32>().ok());
1988 if let Some(slot_id) = slot_id {
1989 if let Some(&cref) = self.slot_id_to_cref.get(&slot_id) {
1990 if let Some(dispatch_fn) = self.dispatch_fn_for_component(cref) {
1991 return OpDispatch::Atomic {
1992 component_ref: cref,
1993 dispatch_fn,
1994 };
1995 }
1996 }
1997 }
1998 OpDispatch::Unresolved
1999 }
2000
2001 /// Look up the install-time-stamped `ProtocolDispatchFn` for a
2002 /// registered component by its `TypeId`. `resolve_dispatch`
2003 /// embeds the result into `OpDispatch::Atomic { dispatch_fn }`
2004 /// so runtime invoke skips the per-op TypeId probe.
2005 fn dispatch_fn_for_component(
2006 &self,
2007 cref: ComponentRef,
2008 ) -> Option<crate::engine::invoke::ProtocolDispatchFn> {
2009 let component = self.component(cref)?;
2010 let any: &dyn std::any::Any = component;
2011 let tid = (*any).type_id();
2012 self.role_dispatchers.get(&tid).map(|d| d.dispatch)
2013 }
2014
2015 /// Resolve a registered component by `ComponentRef`.
2016 /// `None` when no component lives at that index, or when the
2017 /// slot was `mem::take`-ed out during dispatch (the caller
2018 /// dispatching component is invisible to itself).
2019 pub fn component(&self, cref: ComponentRef) -> Option<&dyn ErasedComponent> {
2020 self.components.get(cref.as_u32() as usize)?.as_deref()
2021 }
2022
2023 /// Resolve a registered component by `ComponentRef` for
2024 /// mutation. Same null semantics as [`Self::component`].
2025 pub fn component_mut(&mut self, cref: ComponentRef) -> Option<&mut Box<dyn ErasedComponent>> {
2026 self.components.get_mut(cref.as_u32() as usize)?.as_mut()
2027 }
2028
2029 /// Take the component at `cref` out of the registry, leaving
2030 /// `None` in its slot. Returns `None` if the slot was empty (or
2031 /// out of range). Paired with [`Self::restore_component`] in
2032 /// `invoke_atomic` so a [`crate::runtime::ComponentsView`] can
2033 /// borrow the rest of `engine.components` while the dispatching
2034 /// component is held exclusively.
2035 pub(crate) fn take_component(
2036 &mut self,
2037 cref: ComponentRef,
2038 ) -> Option<Box<dyn ErasedComponent>> {
2039 self.components.get_mut(cref.as_u32() as usize)?.take()
2040 }
2041
2042 /// Put a component back into the registry slot it was taken
2043 /// from via [`Self::take_component`]. The slot index must be
2044 /// within range (i.e. the cref came from a prior registration).
2045 pub(crate) fn restore_component(
2046 &mut self,
2047 cref: ComponentRef,
2048 component: Box<dyn ErasedComponent>,
2049 ) {
2050 let idx = cref.as_u32() as usize;
2051 if let Some(slot) = self.components.get_mut(idx) {
2052 *slot = Some(component);
2053 }
2054 }
2055
2056 /// test-only registrar. Stores a bound component impl
2057 /// at `cref`. Grows the underlying Vec to fit the index, filling
2058 /// holes with `None` so out-of-order registration works.
2059 pub fn register_component(&mut self, cref: ComponentRef, component: Box<dyn ErasedComponent>) {
2060 let idx = cref.as_u32() as usize;
2061 if self.components.len() <= idx {
2062 self.components.resize_with(idx + 1, || None);
2063 }
2064 self.components[idx] = Some(component);
2065 }
2066
2067 /// Push an `(OpRef, ExecId)` onto the frontier.
2068 pub fn push_frontier(&mut self, op_ref: OpRef, exec_id: ExecId) {
2069 self.exec.frontier.push_back((op_ref, exec_id));
2070 }
2071
2072 /// Pop the next `(OpRef, ExecId)` off the frontier. Used by the
2073 /// poll cycle's drain phases.
2074 pub fn pop_frontier(&mut self) -> Option<(OpRef, ExecId)> {
2075 self.exec.frontier.pop_front()
2076 }
2077
2078 /// Pop the next fireable `(OpRef, ExecId)` off the frontier,
2079 /// honouring the per-component body-op gate. With no in-flight
2080 /// bootstrap the front of the queue fires unconditionally. With
2081 /// one or more in-flight bootstraps the gate parks any op
2082 /// touching a locked `ComponentRef`; this scan picks the first
2083 /// unparked entry so disjoint Components can keep firing while
2084 /// bootstrap runs against an unrelated slot.
2085 pub(crate) fn pop_frontier_fireable(&mut self) -> Option<(OpRef, ExecId)> {
2086 if self.bootstrap.in_flight.is_empty() {
2087 return self.exec.frontier.pop_front();
2088 }
2089 let idx = self
2090 .exec
2091 .frontier
2092 .iter()
2093 .position(|(op_ref, exec_id)| !self.is_op_locked(*op_ref, *exec_id))?;
2094 self.exec.frontier.remove(idx)
2095 }
2096
2097 /// Snapshot of the `(NodeSiteId, ExecId)` keys currently in the
2098 /// slot table. Test-only - used to assert wire-envelope delivery
2099 /// lands at the right site without exposing the full
2100 /// `Box<dyn SlotValue>` payload type.
2101 pub fn slot_table_keys(&self) -> Vec<(NodeSiteId, ExecId)> {
2102 self.exec.slot_table.keys().copied().collect()
2103 }
2104
2105 /// Iterate every `((NodeSiteId, ExecId), Option<&dyn SlotValue>)`
2106 /// pair currently in the slot table. Test-only.
2107 pub fn slot_table_iter(
2108 &self,
2109 ) -> impl Iterator<Item = (&(NodeSiteId, ExecId), Option<&dyn SlotValue>)> {
2110 self.exec
2111 .slot_table
2112 .iter()
2113 .map(|(k, v)| (k, v.as_ref().map(|b| b.as_ref())))
2114 }
2115
2116 /// Read a slot value by `(NodeSiteId, ExecId)`. Returns `None`
2117 /// if the slot is empty or not yet allocated.
2118 pub fn slot_at(&self, site: NodeSiteId, exec_id: ExecId) -> Option<&dyn SlotValue> {
2119 self.exec
2120 .slot_table
2121 .get(&(site, exec_id))
2122 .and_then(|s| s.as_ref())
2123 .map(|b| b.as_ref())
2124 }
2125}
2126
2127/// Register a `NodeSiteId` for every function input name on
2128/// `graph` so `Engine::deliver_app_event` can seed the input
2129/// via ingress. Pre-existing entries (where a node output
2130/// happens to share a name with a function input - rare but
2131/// possible) are left alone. After the input sites are minted,
2132/// re-walk the function's node inputs and add consumer entries
2133/// for any node consuming an input - `GraphSlot::from_function`
2134/// already populated consumers for node outputs but skipped
2135/// inputs because they weren't in `site_names` yet.
2136fn register_function_input_sites(
2137 graph: &mut crate::engine::graph_slot::GraphSlot,
2138 next_node_site_id: &mut u64,
2139 graph_idx: u32,
2140) {
2141 for input_name in graph.function.input.clone().iter() {
2142 if input_name.is_empty() {
2143 continue;
2144 }
2145 graph
2146 .site_names
2147 .entry(input_name.clone())
2148 .or_insert_with(|| {
2149 let r = NodeSiteId::from(*next_node_site_id);
2150 *next_node_site_id = next_node_site_id.saturating_add(1);
2151 r
2152 });
2153 }
2154 // Backfill consumers for nodes whose inputs reference function
2155 // inputs we just minted sites for. Positional OpRefs make every
2156 // node's ref `OpRef::pack(graph_idx, node_idx)`.
2157 let nodes_inputs: Vec<(OpRef, Vec<String>)> = graph
2158 .function
2159 .node
2160 .iter()
2161 .enumerate()
2162 .map(|(idx, node)| (OpRef::pack(graph_idx, idx as u32), node.input.clone()))
2163 .collect();
2164 for (op_ref, inputs) in nodes_inputs {
2165 for input in inputs {
2166 if input.is_empty() {
2167 continue;
2168 }
2169 if let Some(&site) = graph.site_names.get(&input) {
2170 let entry = graph.consumers.entry(site).or_default();
2171 if !entry.contains(&op_ref) {
2172 entry.push(op_ref);
2173 }
2174 }
2175 }
2176 }
2177}
2178
2179/// Failure case returned by `Engine::try_charge` when admitting
2180/// `byte_count` more bytes against `ingress_byte_budget` would
2181/// exceed the cap. `budget_remaining` is the cap minus the live
2182/// `ingress_bytes_in_flight` at the time of the rejection — the
2183/// caller embeds both into the resulting `WireReceiveError` or
2184/// `AppIngressError` so subscribers see the magnitude of the
2185/// rejection without re-querying the engine.
2186#[derive(Clone, Copy, Debug, PartialEq, Eq)]
2187pub struct BudgetExceededReason {
2188 /// Bytes the caller tried to admit.
2189 pub byte_count: usize,
2190 /// Bytes still available under `ingress_byte_budget` at the time
2191 /// of the rejection.
2192 pub budget_remaining: usize,
2193}
2194
2195/// Stable graph-name key for `Engine.graphs` derived from a
2196/// `FunctionKey`. Joins the tuple parts so two distinct keys produce
2197/// distinct strings, preserving the symbol-table semantics.
2198pub(crate) fn graph_name_for(key: &FunctionKey) -> String {
2199 let (domain, name, overload) = key;
2200 if overload.is_empty() {
2201 format!("{domain}::{name}")
2202 } else {
2203 format!("{domain}::{name}#{overload}")
2204 }
2205}
2206
2207
2208#[cfg(test)]
2209#[path = "core_multi_bootstrap_tests.rs"]
2210mod multi_bootstrap_tests;
2211
2212#[cfg(test)]
2213#[path = "core_touch_set_tests.rs"]
2214mod touch_set_tests;
2215
2216#[cfg(test)]
2217#[path = "core_op_locked_tests.rs"]
2218mod op_locked_tests;
2219
2220#[cfg(test)]
2221#[path = "core_component_bootstrap_tests.rs"]
2222mod component_bootstrap_tests;
2223
2224#[cfg(test)]
2225#[path = "core_bootstrap_request_tests.rs"]
2226mod bootstrap_request_tests;