bb_runtime/engine/core.rs
1//! The `Engine` struct + test-only constructor + registration
2//! accessors. Hot-path poll cycle lives in `engine::poll`.
3
4use std::collections::HashMap;
5use std::marker::PhantomData;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use crate::bus::TypedBus;
10use crate::component::ErasedComponent;
11use crate::engine::dispatch_entry::{FunctionKey, OpDispatch, StatelessInvokeFn};
12use crate::engine::graph_slot::GraphSlot;
13use crate::engine::invoke::{make_protocol_dispatcher, RoleDispatcher};
14use crate::framework::FrameworkComponents;
15use crate::ids::{CommandId, ComponentRef, ExecId, NodeSiteId, OpRef};
16use crate::ingress::IngressQueue;
17use crate::slot_value::SlotValue;
18use bb_ir::proto::onnx::{FunctionProto, NodeProto};
19
/// Point-in-time hot-path counters for dashboards + saturation
/// detection. Not synchronized against an in-flight poll cycle.
///
/// Produced by `Engine::engine_stats`; every field is a plain `usize`
/// length read off one of the engine's collections.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct EngineStats {
    /// `(OpRef, ExecId)` pairs on the frontier. Climbing → poll
    /// loop falling behind ingress.
    pub frontier_len: usize,
    /// Events queued on the typed bus awaiting routing.
    pub bus_len: usize,
    /// Suspended async commands. Approaching
    /// `NodeConfig.max_pending_async` → cap pressure.
    pub pending_async: usize,
    /// `(NodeSiteId, ExecId)` entries holding a value.
    ///
    /// NOTE(review): `Engine::engine_stats` fills this from
    /// `slot_table.len()`, which presumably also counts entries whose
    /// value is currently `None` — confirm whether that is intended.
    pub slot_table_occupied: usize,
    /// Approximate MPMC ingress depth.
    pub ingress_depth: usize,
    /// Envelopes queued for next outbound drain.
    pub outbound_queue_depth: usize,
    /// Event kinds with at least one subscriber (the length of the
    /// engine's per-kind subscription map).
    pub event_subscriptions: usize,
    /// Number of registered components (length of the component
    /// vector, i.e. the number of `ComponentRef` slots).
    pub registered_components: usize,
    /// Number of installed graphs.
    pub graph_slots: usize,
}
45
/// Single-Node engine state. Built via `Node::ensure_ready`;
/// tests use `Engine::new()` and seed fields directly.
///
/// The struct is deliberately `!Send + !Sync` (see `_not_send`);
/// the only cross-thread surface is the `Arc<IngressQueue>` handle.
pub struct Engine {
    // --- Graph storage ---------------------------------------------
    /// Installed graphs in insertion order. `OpRef::pack` puts the
    /// slot index in the high 32 bits so `dispatch_for` resolves an
    /// op via two direct array accesses (no HashMap probe).
    pub(crate) graphs: Vec<GraphSlot>,

    /// Graph name → `graphs[]` index. Lookup table for name-keyed
    /// resolution (function-call targets, `Engine::graph(name)`
    /// accessors). The index doubles as the value packed into every
    /// `OpRef`.
    pub(crate) graph_index: HashMap<String, u32>,

    /// Sub-Module function registry by `(domain, name, overload)`.
    /// Populated by `Node` at install time.
    pub(crate) functions: HashMap<(String, String, String), FunctionProto>,

    // --- Dispatch --------------------------------------------------
    /// Unified syscall dispatch table — one lookup per op by
    /// `(domain, op_type)` to its stateless invoke fn pointer.
    /// Populated by `register_syscall` and `register_all_framework_syscalls`
    /// at Engine construction; `resolve_dispatch` reads this to stamp
    /// `OpDispatch::Stateless` for matching NodeProtos.
    pub syscall_table: HashMap<(String, String), StatelessInvokeFn>,

    // --- Component storage -----------------------------------------
    /// All bound runtime impls indexed by `ComponentRef.as_usize()`.
    /// The `Option<...>` wrapper lets `invoke_atomic` take the
    /// dispatching component out of the Vec via `mem::take` so a
    /// live [`crate::runtime::ComponentsView`] can borrow
    /// `&self.components` for cross-component reads while the
    /// dispatch closure runs - the dispatching slot is restored
    /// after the closure returns.
    pub(crate) components: Vec<Option<Box<dyn ErasedComponent>>>,

    /// The Node's own `PeerId` (the one passed to `Node::new`).
    /// Threaded into every `RuntimeResourceRef` so Components can
    /// identify themselves in outbound envelopes. A bare
    /// `Engine::with_bus_capacity` seeds this with `PeerId::from(0u64)`.
    pub self_peer: crate::ids::PeerId,

    /// Per-Node framework primitive bundle (scheduler, peer_gate,
    /// request_tracker, backoff_table, inbound_dedup, address_book).
    /// Exposed as `pub` so cross-crate test fixtures in `bb-ops` can
    /// construct `RuntimeResourceRef` from an Engine's framework
    /// primitives. Engine internals proper stay encapsulated; this is
    /// the dependency-injection boundary.
    pub framework: FrameworkComponents,

    /// The per-Node typed event bus.
    pub bus: TypedBus,

    // --- Execution state -------------------------------------------
    /// Per-poll execution-state bundle: frontier, slot table, per-
    /// execution liveness, parked async ops + in-cycle completions,
    /// function-call invocation frames, inbound-envelope context
    /// map, the timer scheduler, and the monotonic ID allocator.
    /// See [`crate::exec_state::ExecState`].
    pub exec: crate::exec_state::ExecState,

    /// Reverse index from a fused `binding_id` (e.g.
    /// `"BurnBackend#0"`) to the bound component's `ComponentRef`.
    /// Populated by `Node` at install time; read by dispatch
    /// resolution to bind NodeProtos that reference a backend by
    /// binding_id.
    pub(crate) binding_id_index: HashMap<String, ComponentRef>,

    /// Per-`event_kind` subscription map. Each entry names the
    /// `NodeSiteId`(s) of `EventSource` ops listening for that kind.
    /// The bus-routing pass writes a `TriggerValue` into each
    /// subscribed site at a fresh `ExecId` and pushes the site's
    /// downstream consumers — matching the wire delivery semantics
    /// per `docs/ADDRESSING.md` so bus + wire share one model.
    pub(crate) event_subscriptions: HashMap<String, Vec<NodeSiteId>>,

    /// Per-LifecyclePhase Op enrollments, keyed by phase name. `pub`
    /// for cross-crate `bb-ops` test fixtures that exercise the
    /// LifecyclePhase syscall.
    pub lifecycle_table: HashMap<String, Vec<OpRef>>,

    // --- Async + cross-thread --------------------------------------
    /// Thread-safe inbox for external events. Producers (transport
    /// adapters, host invocations) may push from any thread; the
    /// Engine drains serially from its own (single) thread. Engine
    /// holds an `Arc` so it can hand clones to producers without
    /// surrendering ownership.
    pub(crate) ingress: Arc<IngressQueue>,

    /// Lifecycle phases queued for firing by `Engine::fire_lifecycle`
    /// on the next poll.
    pub(crate) fired_phases: Vec<String>,

    /// Snapshot of the ingress queue depth at the start of the
    /// engine's ingress drain. Used by the backpressure detection
    /// hook in `process_ingress_event` to compare against the
    /// configured high-water mark. Refreshed every poll cycle.
    pub(crate) phase1_pre_drain_depth: usize,

    // --- Bootstrap ---------------------------------------------------
    /// Consolidated bootstrap state — every field the install path,
    /// poll seeder, and body-op gate read goes through here. See
    /// [`crate::engine::bootstrap::BootstrapState`] for field roles.
    pub(crate) bootstrap: crate::engine::bootstrap::BootstrapState,

    // --- Dispatcher registries (per-Engine) ------------------------
    /// Concrete-type role dispatchers registered against this Engine,
    /// indexed by `TypeId::of::<T>()`. Populated by the
    /// `register_*_dispatcher` family (e.g.
    /// [`Engine::register_protocol_dispatcher`]) at setup; consulted by
    /// `invoke_atomic` via direct HashMap lookup (no linear scan).
    /// There is one entry per concrete type, so a later registration
    /// for the same `T` replaces the earlier one.
    pub(crate) role_dispatchers: HashMap<std::any::TypeId, RoleDispatcher>,

    // --- Generic slot registry -------------------------------------
    /// Slot-name → `ComponentRef` registry. Generic over component
    /// role: indexes EVERY bound Component (backends, indexes,
    /// models, peer selectors, custom) by binding slot name
    /// (defaults to the field name; overridable via
    /// `#[bb::slot("custom")]`). Components reach declared
    /// dependencies through this map at dispatch time via
    /// [`crate::runtime::ComponentsView::for_slot`].
    ///
    /// This is the canonical lookup table. Every install path
    /// populates it; every dispatch path reads through it. No
    /// per-role specialization above the slot abstraction.
    pub(crate) slots: HashMap<String, ComponentRef>,

    /// Parallel index: compiler-assigned slot id (the value of
    /// `ai.bytesandbrains.slot_id` stamped on role NodeProtos by the
    /// placeholders pass) → `ComponentRef`. Populated alongside
    /// [`Self::slots`] at install time. `resolve_dispatch` reads
    /// the role NodeProto's `slot_id`, looks it up here, and stamps
    /// `OpDispatch::Atomic` against the resolved component. Single
    /// source of truth: install populates both indexes from the same
    /// `bb.binding.<target>.<slot>` metadata.
    pub(crate) slot_id_to_cref: HashMap<u32, ComponentRef>,

    /// Parallel index: compiler-assigned `slot_id` → `(role,
    /// ComponentRef)`. Populated by [`Self::bind_slot_id_with_role`]
    /// at install time from the same `binding.<target>.<slot>`
    /// metadata that drives `slot_id_to_cref`; retains the
    /// `ComponentRole` so `decode_typed_fill` can decide between the
    /// framework-carrier path and the backend-mediated tensor path.
    pub(crate) slot_id_to_role_ref: HashMap<u32, (crate::registry::ComponentRole, ComponentRef)>,

    // --- Component role introspection ------------------------------
    /// Per-component set of declared roles, sourced from
    /// `inventory::iter::<ComponentRoleBinding>` keyed by
    /// `T::TYPE_NAME`. Populated by `Node::ensure_ready` after
    /// component registration; reported by [`Engine::roles_for`] for
    /// introspection (engine tests, host tooling).
    pub(crate) component_roles:
        HashMap<ComponentRef, std::collections::HashSet<crate::registry::ComponentRole>>,

    // --- Production-safety caps ------------------------------------
    /// Soft per-poll-cycle op-invocation budget per
    /// `NodeConfig.cycle_op_budget`. When set, `Engine::poll` yields
    /// after this many invocations and surfaces
    /// `EngineStep::CycleBudgetExceeded { ops_invoked }` so the host
    /// can re-poll. `None` disables the budget.
    pub(crate) cycle_op_budget: Option<usize>,

    /// Cap on the number of in-flight `pending_async` entries per
    /// `NodeConfig.max_pending_async`. When at cap, an Op returning
    /// `DispatchResult::Async(_)` fails synchronously via the
    /// existing `OpFailed` path. `None` disables the cap.
    pub(crate) max_pending_async: Option<usize>,

    /// Cumulative cap on in-flight ingress bytes held across the
    /// ingress queue + slot table + pending async completion
    /// buffers. Sourced from `NodeConfig::ingress_byte_budget`.
    /// Boundary callers call [`Self::try_charge`] before installing
    /// a payload; the slot-table writer calls [`Self::release`] on
    /// overwrite / eviction.
    pub(crate) ingress_byte_budget: usize,

    /// Live count of ingress bytes the engine currently holds.
    /// Incremented by [`Self::try_charge`] on successful admission;
    /// decremented by [`Self::release`] on slot-table overwrite /
    /// eviction / drop. The budget guard surfaces this as a
    /// snapshot via [`Self::ingress_bytes_in_flight`].
    pub(crate) ingress_bytes_in_flight: usize,

    // --- Single-threaded anchor ------------------------------------
    /// `PhantomData<*const ()>` makes `Engine` neither `Send` nor
    /// `Sync` - the single-threaded sans-IO contract is enforced at
    /// compile time. Producers can still push to `ingress` from other
    /// threads because the `Arc<IngressQueue>` handle is independently
    /// `Send + Sync`.
    _not_send: PhantomData<*const ()>,
}
236
237impl Default for Engine {
238 fn default() -> Self {
239 Self::new()
240 }
241}
242
243impl Engine {
244 /// Construct an empty engine with the default ingress capacity.
245 /// `Node::new` wraps this with `self_peer`, framework syscalls,
246 /// and config caps applied. For non-default `bus_capacity` use
247 /// [`Self::with_bus_capacity`] so the ingress queue sizes to
248 /// `bus_capacity * 4` per ENGINE.md §2.2.
249 pub fn new() -> Self {
250 Self::with_bus_capacity(crate::node::DEFAULT_BUS_CAPACITY)
251 }
252
253 /// Construct an empty engine whose ingress queue holds up to
254 /// `bus_capacity * 4` events (the ENGINE.md §2.2 ratio that
255 /// reserves headroom for async completions, app events, and
256 /// inbound envelopes between poll cycles).
257 pub fn with_bus_capacity(bus_capacity: usize) -> Self {
258 Self {
259 graphs: Vec::new(),
260 graph_index: HashMap::new(),
261 functions: HashMap::new(),
262 syscall_table: HashMap::new(),
263 slot_id_to_cref: HashMap::new(),
264 slot_id_to_role_ref: HashMap::new(),
265 components: Vec::new(),
266 self_peer: crate::ids::PeerId::from(0u64),
267 framework: FrameworkComponents::new(),
268 bus: TypedBus::new(),
269 exec: crate::exec_state::ExecState::new(),
270 binding_id_index: HashMap::new(),
271 event_subscriptions: HashMap::new(),
272 lifecycle_table: HashMap::new(),
273 ingress: Arc::new(IngressQueue::with_capacity(bus_capacity.saturating_mul(4))),
274 fired_phases: Vec::new(),
275 phase1_pre_drain_depth: 0,
276 bootstrap: crate::engine::bootstrap::BootstrapState::new(),
277 role_dispatchers: HashMap::new(),
278 slots: HashMap::new(),
279 component_roles: HashMap::new(),
280 cycle_op_budget: crate::node::DEFAULT_CYCLE_OP_BUDGET,
281 max_pending_async: crate::node::DEFAULT_MAX_PENDING_ASYNC,
282 ingress_byte_budget: crate::node::DEFAULT_INGRESS_BYTE_BUDGET,
283 ingress_bytes_in_flight: 0,
284 _not_send: PhantomData,
285 }
286 }
287
288 /// wipe restorable engine state ahead of a
289 /// `Node::restore` call, leaving the install-time-stamped
290 /// surfaces (`graphs`, `functions`, `dispatch_table`,
291 /// `atomic_dispatch`, `components`, `self_peer`,
292 /// `syscall_index`, `role_dispatchers`, `binding_id_index`,
293 /// `lifecycle_table`, `event_subscriptions`,
294 /// `cycle_op_budget`, `max_pending_async`) intact. The Node
295 /// re-applies the snapshot's framework state, ID counters, and
296 /// pending async/completion queues on top of the cleared
297 /// state, so the post-restore Engine is the same install
298 /// re-seeded with the snapshot's restorable transient state.
299 ///
300 /// Restorable surfaces explicitly cleared:
301 /// - `frontier`, `slot_table`, `execution_state`,
302 /// `pending_async`, `pending_completions`, `pending_calls`,
303 /// `fired_phases`
304 /// - `framework` (FrameworkComponents reseeds from snapshot)
305 /// - `bus` (re-establishes subscriptions from snapshot)
306 /// - `ingress` queue (fresh; in-flight inbound is the host's
307 /// responsibility to redeliver)
308 pub fn clear_for_restore(&mut self) {
309 self.exec.frontier.clear();
310 self.exec.slot_table.clear();
311 self.exec.execution_state.clear();
312 self.exec.pending_async.clear();
313 self.exec.pending_completions.clear();
314 self.exec.pending_calls.clear();
315 self.fired_phases.clear();
316 // Slot-table clear above dropped every charged carrier;
317 // reset the counter so the restored snapshot doesn't inherit
318 // an in-flight balance the new state doesn't own.
319 self.ingress_bytes_in_flight = 0;
320 // Restore deliberately suppresses bootstrap re-runs: the
321 // restored Node already executed its bootstrap call before
322 // the snapshot, and replaying would re-seed the address
323 // book, re-fire the first Announce, etc.
324 // `install_order` + `module_bootstraps` stay populated for
325 // introspection (multi-target installs surface every queued
326 // target via [`Self::bootstrap_function_keys`]); `pending`,
327 // `current_exec_id`, and `next_idx` reset so
328 // `Node::run_bootstrap` is a no-op on a restored Node —
329 // bumping the index to the end of `install_order` keeps the
330 // seeder from re-firing if the host nonetheless polls.
331 self.bootstrap.clear_for_restore();
332 self.framework = FrameworkComponents::new();
333 self.bus = TypedBus::new();
334 self.ingress = Arc::new(IngressQueue::new());
335 // ID counters reset to 0; the restore path re-applies the
336 // snapshot's persisted values so post-restore IDs continue
337 // from where the pre-snapshot Node left off ().
338 self.exec.ids.next_exec_id = 0;
339 self.exec.ids.next_command_id = 0;
340 }
341
342 /// Mint a fresh `ExecId`. Replaces the prior static counter
343 /// in `src/ids.rs` so allocation runs single-threaded under
344 /// the engine's borrow discipline.
345 pub fn allocate_exec_id(&mut self) -> ExecId {
346 let id = self.exec.ids.next_exec_id;
347 self.exec.ids.next_exec_id = self
348 .exec
349 .ids
350 .next_exec_id
351 .checked_add(1)
352 .expect("ExecId counter overflow");
353 ExecId::from(id)
354 }
355
356 /// Mint a fresh `CommandId`. Used by async-suspending syscall
357 /// ops via `RuntimeResourceRef::next_command_id`.
358 pub fn allocate_command_id(&mut self) -> CommandId {
359 let id = self.exec.ids.next_command_id;
360 self.exec.ids.next_command_id = self
361 .exec
362 .ids
363 .next_command_id
364 .checked_add(1)
365 .expect("CommandId counter overflow");
366 CommandId::from(id)
367 }
368
369 /// Mint a fresh `NodeSiteId`. Used by graph installation; sites
370 /// must be globally unique across installed graphs.
371 pub fn allocate_node_site_id(&mut self) -> NodeSiteId {
372 let id = self.exec.ids.next_node_site_id;
373 self.exec.ids.next_node_site_id = self
374 .exec
375 .ids
376 .next_node_site_id
377 .checked_add(1)
378 .expect("NodeSiteId counter overflow");
379 NodeSiteId::from(id)
380 }
381
382 /// Drop slot_table and execution_state entries belonging to
383 /// executions that have completed. An execution is complete
384 /// when it has no frontier entries, no pending_async entries,
385 /// and no pending_calls entry pointing at its `ExecId`. Called
386 /// at the end of every `poll()` cycle so a long-running Node
387 /// keeps a bounded slot_table.
388 pub(crate) fn gc_completed_executions(&mut self) {
389 if self.exec.execution_state.is_empty() {
390 return;
391 }
392 let mut live: std::collections::HashSet<ExecId> =
393 std::collections::HashSet::with_capacity(self.exec.execution_state.len());
394 for (_, exec_id) in &self.exec.frontier {
395 live.insert(*exec_id);
396 }
397 for p in self.exec.pending_async.values() {
398 live.insert(p.exec_id);
399 }
400 for exec_id in self.exec.pending_calls.keys() {
401 live.insert(*exec_id);
402 }
403 let dead: Vec<ExecId> = self
404 .exec
405 .execution_state
406 .keys()
407 .copied()
408 .filter(|e| !live.contains(e))
409 .collect();
410 if dead.is_empty() {
411 return;
412 }
413 let dead_set: std::collections::HashSet<ExecId> = dead.iter().copied().collect();
414 // Walk doomed slot entries once to release any charged
415 // ingress bytes the slot-table writer admitted, then drop
416 // the entries. `retain` would let us mutate-in-place, but
417 // it borrows the table mutably for the entire walk; the
418 // explicit collect-then-remove pattern lets us drain
419 // `charged_bytes()` from each prior carrier first.
420 let doomed_keys: Vec<(NodeSiteId, ExecId)> = self
421 .exec
422 .slot_table
423 .iter()
424 .filter_map(|(key, _)| dead_set.contains(&key.1).then_some(*key))
425 .collect();
426 for key in doomed_keys {
427 // `clear_slot` releases the prior carrier's
428 // `charged_bytes()` against `ingress_bytes_in_flight`.
429 // Non-ingress carriers report 0 — release is a no-op.
430 let _ = self.clear_slot(key.0, key.1);
431 }
432 for exec_id in &dead {
433 self.exec.execution_state.remove(exec_id);
434 }
435 }
436
437 /// Apply production-safety caps from a `NodeConfig`. Called by
438 /// `Node::ensure_ready` after constructing the Engine; tests can
439 /// invoke directly to exercise specific cap values.
440 pub fn apply_config_caps(&mut self, config: &crate::node::NodeConfig) {
441 self.cycle_op_budget = config.cycle_op_budget;
442 self.max_pending_async = config.max_pending_async;
443 self.ingress_byte_budget = config.ingress_byte_budget;
444 self.framework
445 .outbound_queue
446 .set_cap(config.max_outbound_queue);
447 self.bus.set_cap(Some(config.bus_capacity));
448 // The off-thread `CompletionSink::complete` path consults the
449 // ingress queue itself for its per-item cap; reseed the
450 // atomic so sinks created before this call see the configured
451 // value on their next push.
452 self.ingress
453 .set_completion_result_cap(config.max_completion_result_bytes);
454 // Reseed the BackpressureTracker with the configured knobs.
455 // `apply_config_caps` is the canonical entry the host calls
456 // before the first poll, so a fresh tracker reflecting the
457 // resolved knobs is the only state observers see.
458 self.framework.peer_state.backpressure = crate::framework::BackpressureTracker::with_config(
459 config.backpressure_high_water_pct,
460 config.backpressure_k_before_silent,
461 config.backpressure_min_notice_interval_ns,
462 );
463 }
464
/// Live count of ingress bytes the engine currently holds across
/// its ingress queue + slot table + pending async completion
/// buffers. Updated by every successful charge / release pair on
/// the ingress paths. Surfaced for observability (operator
/// dashboards) and assertions.
///
/// Returns a copy of the counter; the value is only as fresh as the
/// moment of the call.
pub fn ingress_bytes_in_flight(&self) -> usize {
    self.ingress_bytes_in_flight
}
473
/// Configured cap on cumulative in-flight ingress bytes,
/// sourced from `NodeConfig::ingress_byte_budget`. Constant
/// between `apply_config_caps` calls.
///
/// This is the limit [`Self::try_charge`] compares against.
pub fn ingress_byte_budget(&self) -> usize {
    self.ingress_byte_budget
}
480
481 /// Pre-admission budget guard for an ingress payload of length
482 /// `bytes`. On success the bytes are added to
483 /// `ingress_bytes_in_flight` and the caller may install the
484 /// resulting carrier into the slot table or pending-completion
485 /// queue. On overflow the counter is left unchanged and the
486 /// caller drops the payload, emitting the appropriate
487 /// `BudgetExceeded` `InfraEvent`.
488 ///
489 /// One saturating-add + one comparison; below the cost of the
490 /// prost decode that typically follows.
491 pub(crate) fn try_charge(&mut self, bytes: usize) -> Result<(), BudgetExceededReason> {
492 let after = self.ingress_bytes_in_flight.saturating_add(bytes);
493 if after > self.ingress_byte_budget {
494 return Err(BudgetExceededReason {
495 byte_count: bytes,
496 budget_remaining: self
497 .ingress_byte_budget
498 .saturating_sub(self.ingress_bytes_in_flight),
499 });
500 }
501 self.ingress_bytes_in_flight = after;
502 Ok(())
503 }
504
505 /// Decrement `ingress_bytes_in_flight` after a charged payload
506 /// leaves engine state — slot-table overwrite, slot clear,
507 /// eviction, or in-cycle drop. `saturating_sub` defends against
508 /// a release path that arrives without a paired charge (e.g. a
509 /// snapshot replay reseeding the slot table before any wire
510 /// traffic).
511 pub(crate) fn release(&mut self, bytes: usize) {
512 self.ingress_bytes_in_flight = self.ingress_bytes_in_flight.saturating_sub(bytes);
513 }
514
515 /// Write a value into the slot table at `(site, exec_id)`,
516 /// releasing the prior occupant's `charged_bytes` against
517 /// `ingress_bytes_in_flight`. The incoming carrier's
518 /// `charged_bytes` is NOT re-added — admission callers
519 /// (`decode_typed_fill`, `deliver_event`, etc.) have already run
520 /// `try_charge` against the wire-byte budget. This helper is the
521 /// slot-table-side bookkeeping that closes the loop on overwrite.
522 ///
523 /// Returns the prior boxed value (if any) so the caller can
524 /// run additional teardown.
525 pub(crate) fn slot_write(
526 &mut self,
527 site: NodeSiteId,
528 exec_id: ExecId,
529 value: Box<dyn SlotValue>,
530 ) -> Option<Box<dyn SlotValue>> {
531 let key = (site, exec_id);
532 let prior = self.exec.slot_table.insert(key, Some(value));
533 // `prior` is `Option<Option<Box<dyn SlotValue>>>`: outer None
534 // means the slot was untouched; inner None means the slot
535 // existed but was empty (cleared previously). Release only
536 // when the prior carrier was alive.
537 match prior {
538 Some(Some(prior_box)) => {
539 self.ingress_bytes_in_flight = self
540 .ingress_bytes_in_flight
541 .saturating_sub(prior_box.charged_bytes());
542 Some(prior_box)
543 }
544 _ => None,
545 }
546 }
547
548 /// Remove the slot at `(site, exec_id)`, releasing any
549 /// `charged_bytes` the prior carrier was holding against
550 /// `ingress_bytes_in_flight`. Returns the removed carrier so
551 /// downstream paths (graph reset, GC) can pass it onward.
552 pub(crate) fn clear_slot(
553 &mut self,
554 site: NodeSiteId,
555 exec_id: ExecId,
556 ) -> Option<Box<dyn SlotValue>> {
557 let key = (site, exec_id);
558 match self.exec.slot_table.remove(&key) {
559 Some(Some(prior_box)) => {
560 self.ingress_bytes_in_flight = self
561 .ingress_bytes_in_flight
562 .saturating_sub(prior_box.charged_bytes());
563 Some(prior_box)
564 }
565 _ => None,
566 }
567 }
568
569 /// Snapshot of the engine's hot-path state, sized for cheap
570 /// reads on every poll cycle (no allocation). Production
571 /// observability: operators see saturation building up before
572 /// the process locks up.
573 pub fn engine_stats(&self) -> EngineStats {
574 EngineStats {
575 frontier_len: self.exec.frontier.len(),
576 bus_len: self.bus.len(),
577 pending_async: self.exec.pending_async.len(),
578 slot_table_occupied: self.exec.slot_table.len(),
579 ingress_depth: self.ingress.len(),
580 outbound_queue_depth: self.framework.outbound_queue.len(),
581 event_subscriptions: self.event_subscriptions.len(),
582 registered_components: self.components.len(),
583 graph_slots: self.graphs.len(),
584 }
585 }
586
587 /// Register a `ProtocolRuntime` dispatcher for the concrete
588 /// component type `T`. Call once per `T` after constructing the
589 /// Engine - typically alongside `register_component` for any
590 /// component whose `dispatch_atomic` you want to drive. Indexed
591 /// by `TypeId::of::<T>()` so dispatch is one HashMap lookup,
592 /// not a linear scan across the registry.
593 pub fn register_protocol_dispatcher<T: crate::roles::ProtocolRuntime + 'static>(&mut self)
594 where
595 T::Error: std::fmt::Display,
596 {
597 let type_id = std::any::TypeId::of::<T>();
598 self.role_dispatchers
599 .insert(type_id, make_protocol_dispatcher::<T>());
600 }
601
602 /// Register a role dispatcher keyed by `TypeId::of::<T>()` for a
603 /// concrete `IndexRuntime` impl. Lets `Node::with_index(&value)`
604 /// wire atomic dispatch even when `T` does not implement
605 /// `ProtocolRuntime`. Calling this twice for the same `T`
606 /// silently overwrites; the dispatcher is idempotent because
607 /// `T::dispatch_atomic` is the only consumer.
608 pub fn register_index_dispatcher<T: crate::roles::IndexRuntime + 'static>(&mut self)
609 where
610 <T as crate::roles::IndexRuntime>::Error: std::fmt::Display,
611 {
612 let type_id = std::any::TypeId::of::<T>();
613 self.role_dispatchers
614 .insert(type_id, crate::engine::invoke::make_index_dispatcher::<T>());
615 }
616
617 /// Register an `AggregatorRuntime` dispatcher. See
618 /// [`Engine::register_index_dispatcher`] for the rationale.
619 pub fn register_aggregator_dispatcher<T: crate::roles::AggregatorRuntime + 'static>(&mut self)
620 where
621 <T as crate::roles::AggregatorRuntime>::Error: std::fmt::Display,
622 {
623 let type_id = std::any::TypeId::of::<T>();
624 self.role_dispatchers.insert(
625 type_id,
626 crate::engine::invoke::make_aggregator_dispatcher::<T>(),
627 );
628 }
629
630 /// Register a `ModelRuntime` dispatcher. See
631 /// [`Engine::register_index_dispatcher`] for the rationale.
632 pub fn register_model_dispatcher<T: crate::roles::ModelRuntime + 'static>(&mut self)
633 where
634 <T as crate::roles::ModelRuntime>::Error: std::fmt::Display,
635 {
636 let type_id = std::any::TypeId::of::<T>();
637 self.role_dispatchers
638 .insert(type_id, crate::engine::invoke::make_model_dispatcher::<T>());
639 }
640
641 /// Register a `CodecRuntime` dispatcher. See
642 /// [`Engine::register_index_dispatcher`] for the rationale.
643 pub fn register_codec_dispatcher<T: crate::roles::CodecRuntime + 'static>(&mut self)
644 where
645 <T as crate::roles::CodecRuntime>::Error: std::fmt::Display,
646 {
647 let type_id = std::any::TypeId::of::<T>();
648 self.role_dispatchers
649 .insert(type_id, crate::engine::invoke::make_codec_dispatcher::<T>());
650 }
651
652 /// Register a `DataSourceRuntime` dispatcher. See
653 /// [`Engine::register_index_dispatcher`] for the rationale.
654 pub fn register_data_source_dispatcher<T: crate::roles::DataSourceRuntime + 'static>(&mut self)
655 where
656 <T as crate::roles::DataSourceRuntime>::Error: std::fmt::Display,
657 {
658 let type_id = std::any::TypeId::of::<T>();
659 self.role_dispatchers.insert(
660 type_id,
661 crate::engine::invoke::make_data_source_dispatcher::<T>(),
662 );
663 }
664
665 /// Register a `PeerSelectorRuntime` dispatcher. See
666 /// [`Engine::register_index_dispatcher`] for the rationale.
667 pub fn register_peer_selector_dispatcher<T: crate::roles::PeerSelectorRuntime + 'static>(
668 &mut self,
669 ) where
670 <T as crate::roles::PeerSelectorRuntime>::Error: std::fmt::Display,
671 {
672 let type_id = std::any::TypeId::of::<T>();
673 self.role_dispatchers.insert(
674 type_id,
675 crate::engine::invoke::make_peer_selector_dispatcher::<T>(),
676 );
677 }
678
679 /// Register a `BackendRuntime` dispatcher. The `Backend` Contract
680 /// trait's per-atomic-op surface dispatches through this entry,
681 /// emitted from `#[derive(bb::Backend)]`.
682 pub fn register_backend_dispatcher<T: crate::roles::BackendRuntime + 'static>(&mut self)
683 where
684 <T as crate::roles::BackendRuntime>::Error: std::fmt::Display,
685 {
686 let type_id = std::any::TypeId::of::<T>();
687 self.role_dispatchers.insert(
688 type_id,
689 crate::engine::invoke::make_backend_dispatcher::<T>(),
690 );
691 }
692
/// Record the inventory-declared roles for a registered
/// component. `Node::ensure_ready` calls this once per
/// `ComponentRef` after registration, passing the set computed
/// from `crate::registry::roles_for_component(T::TYPE_NAME)`.
///
/// A second call for the same `cref` replaces the stored set; the
/// previous set is discarded.
pub fn set_component_roles(
    &mut self,
    cref: ComponentRef,
    roles: std::collections::HashSet<crate::registry::ComponentRole>,
) {
    self.component_roles.insert(cref, roles);
}
704
705 /// Return the inventory-declared roles for a registered
706 /// component, or an empty set if the component wasn't registered
707 /// through a derive emitting `ComponentRoleBinding` entries.
708 /// Introspection surface for engine tests + host tooling.
709 pub fn roles_for(
710 &self,
711 cref: ComponentRef,
712 ) -> std::collections::HashSet<crate::registry::ComponentRole> {
713 self.component_roles.get(&cref).cloned().unwrap_or_default()
714 }
715
/// Register a `binding_id → ComponentRef` mapping. Called by
/// `Node::ensure_ready` after binding resolution so the bound-
/// backend lookup can resolve a NodeProto's `binding_id`
/// metadata against an installed component.
///
/// Registering an existing `binding_id` again replaces its
/// `ComponentRef`; the previous value is discarded.
pub fn register_binding_id(&mut self, binding_id: String, cref: ComponentRef) {
    self.binding_id_index.insert(binding_id, cref);
}
723
724 /// Look up the `ComponentRef` bound at the given slot name -
725 /// the GENERIC dependency-resolution accessor. Components reach
726 /// declared dependencies through this method (typically via
727 /// [`crate::runtime::ComponentsView::for_slot`] at dispatch
728 /// time). Returns `None` when no slot of that name is bound.
729 pub fn slot(&self, slot: &str) -> Option<ComponentRef> {
730 self.slots.get(slot).copied()
731 }
732
/// Bind a `ComponentRef` at the given slot name. The
/// `install` facade in the `bytesandbrains` crate calls this
/// from the T8 chain; in-crate callers use it from the
/// transitional `Node::with_backend(slot, &b)` path.
///
/// Returns the previous binding for `slot`, or `None` when the
/// slot name was not bound before.
pub fn bind_slot(&mut self, slot: String, cref: ComponentRef) -> Option<ComponentRef> {
    self.slots.insert(slot, cref)
}
741
742 /// Register the compiler-assigned `slot_id` → `ComponentRef`
743 /// mapping. Called by `bb::install()` alongside
744 /// [`Self::bind_slot`]; the pair is read by
745 /// [`Self::resolve_dispatch`] when stamping
746 /// `OpDispatch::Atomic` against a role NodeProto's
747 /// `ai.bytesandbrains.slot_id` metadata. Returns the previous
748 /// binding, if any.
749 pub fn bind_slot_id(&mut self, slot_id: u32, cref: ComponentRef) -> Option<ComponentRef> {
750 self.slot_id_to_cref.insert(slot_id, cref)
751 }
752
753 /// Register the compiler-assigned `slot_id` → `(role,
754 /// ComponentRef)` mapping. Called by `bb::install()` alongside
755 /// [`Self::bind_slot_id`]; the role is required so
756 /// `decode_typed_fill` can branch between framework-carrier
757 /// decode (`Codec`, `Index`, …) and backend-mediated tensor
758 /// materialisation (`Backend`). Returns the previous binding, if
759 /// any.
760 pub fn bind_slot_id_with_role(
761 &mut self,
762 slot_id: u32,
763 role: crate::registry::ComponentRole,
764 cref: ComponentRef,
765 ) -> Option<(crate::registry::ComponentRole, ComponentRef)> {
766 self.slot_id_to_role_ref.insert(slot_id, (role, cref))
767 }
768
769 /// Look up the `(role, ComponentRef)` bound at a compiler-assigned
770 /// `slot_id`. Used by `decode_typed_fill` to discover whether an
771 /// inbound wire payload routes through a backend.
772 pub fn role_ref_for_slot_id(
773 &self,
774 slot_id: u32,
775 ) -> Option<(crate::registry::ComponentRole, ComponentRef)> {
776 self.slot_id_to_role_ref.get(&slot_id).copied()
777 }
778
779 /// Iterate every `(slot_name, ComponentRef)` pair currently
780 /// bound. Surfaces the registry to introspection tools + the
781 /// compiler's resolve-dependencies pass.
782 pub fn slots_iter(&self) -> impl Iterator<Item = (&str, ComponentRef)> {
783 self.slots.iter().map(|(k, v)| (k.as_str(), *v))
784 }
785
786 /// Subscribe a `NodeSiteId` to bus events of `event_kind` (the
787 /// discriminator returned by [`crate::bus::NodeEvent::kind`]).
788 /// The bus-routing pass writes a `TriggerValue` to each
789 /// subscribed site at a fresh `ExecId` and pushes the site's
790 /// downstream consumers onto the frontier — uniform with wire
791 /// delivery semantics per `docs/ADDRESSING.md`.
792 ///
793 /// `Node` calls this at install time for every
794 /// `EventSource` syscall op, passing the op's output `NodeSiteId`.
795 pub fn register_event_subscription(&mut self, event_kind: &str, site: NodeSiteId) {
796 let entry = self
797 .event_subscriptions
798 .entry(event_kind.to_string())
799 .or_default();
800 if !entry.contains(&site) {
801 entry.push(site);
802 }
803 }
804
805 /// Cheap clone of the shared `IngressQueue` handle. Test
806 /// harnesses + transport adapters push `IngressEvent`s through
807 /// this surface.
808 pub fn ingress_queue_handle(&self) -> Arc<IngressQueue> {
809 Arc::clone(&self.ingress)
810 }
811
812 /// Queue a lifecycle phase for firing on the next `poll()`
813 /// call. The framework emits `EngineStep::LifecycleFired { phase }`
814 /// for each queued phase and also pushes every `LifecyclePhase`
815 /// op enrolled under that phase name (via
816 /// [`Engine::register_lifecycle_op`]) onto the frontier with a
817 /// fresh `ExecId`.
818 pub fn fire_lifecycle(&mut self, phase: &str) {
819 self.fired_phases.push(phase.to_string());
820 }
821
822 /// Enroll `op_ref` under `phase` per IR_AND_DSL.md §5a.
823 /// Idempotent - the same `(phase, OpRef)` pair never enrolls
824 /// twice. `Node` calls this at install time after parsing
825 /// each `LifecyclePhase` NodeProto's `phase` attribute.
826 pub fn register_lifecycle_op(&mut self, phase: &str, op_ref: OpRef) {
827 let entry = self.lifecycle_table.entry(phase.to_string()).or_default();
828 if !entry.contains(&op_ref) {
829 entry.push(op_ref);
830 }
831 }
832
833 /// Register a stateless syscall op. Captures `TypeId::of::<T>()`
834 /// into both `dispatch_table` (TypeId → invoke fn) and
835 /// `syscall_index` ((domain, op_type) → TypeId) so
836 /// `resolve_dispatch` can stamp `OpDispatch::Stateless`.
837 ///
838 /// Register a stateless syscall by its `(domain, op_type)` key.
839 pub fn register_syscall(&mut self, domain: &str, op_type: &str, invoke_fn: StatelessInvokeFn) {
840 self.syscall_table
841 .insert((domain.to_string(), op_type.to_string()), invoke_fn);
842 }
843
844 /// Install every framework syscall shipped via inventory by
845 /// `bb-ops`. Each registration carries its own
846 /// `(domain, op_type)` + invoke pointer; the engine stamps them
847 /// into `syscall_table`.
848 pub fn register_all_framework_syscalls(&mut self) {
849 for reg in crate::registry::framework_syscalls() {
850 self.syscall_table.insert(
851 (reg.domain.to_string(), reg.op_type.to_string()),
852 reg.invoke,
853 );
854 }
855 }
856
/// Test-only installer.
///
/// Builds a `GraphSlot` via `GraphSlot::new_for_test` (empty per-node
/// tables), fills `op_dispatch` with one `Unresolved` entry per
/// NodeProto so a later `resolve_dispatch` can stamp dispatch
/// decisions, and stores the slot under `name`. Use
/// [`Engine::install_graph`] for the canonical path that walks the
/// FunctionProto.
///
/// Returns a mutable reference to the stored slot.
#[cfg(any(test, feature = "test-components"))]
pub fn install_graph_for_test(
    &mut self,
    name: String,
    function: FunctionProto,
) -> &mut GraphSlot {
    let mut slot = GraphSlot::new_for_test(name.clone(), function);
    let node_count = slot.function.node.len();
    slot.op_dispatch =
        std::iter::repeat_with(|| crate::engine::dispatch_entry::OpDispatch::Unresolved)
            .take(node_count)
            .collect();
    let stored_at = self.push_graph_slot(name, slot) as usize;
    &mut self.graphs[stored_at]
}
875
876 /// Push a `GraphSlot` onto the storage Vec and register its
877 /// name → index entry. Returns the assigned `graph_idx` (the
878 /// value that gets packed into `OpRef`). Same name twice
879 /// overwrites the existing slot but keeps the original
880 /// `graph_idx` — preserving OpRef stability across re-install.
881 pub(crate) fn push_graph_slot(&mut self, name: String, slot: GraphSlot) -> u32 {
882 if let Some(&idx) = self.graph_index.get(&name) {
883 self.graphs[idx as usize] = slot;
884 return idx;
885 }
886 let idx = self.graphs.len() as u32;
887 self.graph_index.insert(name, idx);
888 self.graphs.push(slot);
889 idx
890 }
891
892 /// Resolve a graph by name. Returns `None` when the name
893 /// isn't registered. Equivalent to `self.graphs.get(name)` on
894 /// the prior HashMap-keyed shape.
895 pub fn graph(&self, name: &str) -> Option<&GraphSlot> {
896 let idx = *self.graph_index.get(name)?;
897 self.graphs.get(idx as usize)
898 }
899
900 /// Resolve a graph by name for mutation. `None` when the name
901 /// isn't registered.
902 pub fn graph_mut(&mut self, name: &str) -> Option<&mut GraphSlot> {
903 let idx = *self.graph_index.get(name)?;
904 self.graphs.get_mut(idx as usize)
905 }
906
907 /// `true` when a graph with this name is installed.
908 pub fn has_graph(&self, name: &str) -> bool {
909 self.graph_index.contains_key(name)
910 }
911
912 /// Resolve a graph's positional index by name. Used by paths
913 /// that need to compute `OpRef::pack(idx, node_idx)` from a
914 /// graph name (function-call site resolution, etc.).
915 pub fn graph_idx(&self, name: &str) -> Option<u32> {
916 self.graph_index.get(name).copied()
917 }
918
/// Test-only convenience: the `OpRef` of the `node_idx`-th NodeProto
/// of the graph named `graph_name`.
///
/// With positional `OpRef::pack(graph_idx, node_idx)` the lookup is
/// a name → index translation plus a bounds check. Returns `None`
/// when the graph is unknown or `node_idx` is past the end of the
/// graph's node list.
#[cfg(any(test, feature = "test-components"))]
pub fn op_ref_at(&self, graph_name: &str, node_idx: u32) -> Option<OpRef> {
    let graph_idx = self.graph_idx(graph_name)?;
    let node_total = self.graphs.get(graph_idx as usize)?.function.node.len();
    let in_bounds = (node_idx as usize) < node_total;
    in_bounds.then(|| OpRef::pack(graph_idx, node_idx))
}
934
/// Iterate every installed `GraphSlot` in install order.
///
/// Walks the `graphs` storage Vec front to back; a slot's position in
/// that Vec is its `graph_idx`.
pub fn graphs_iter(&self) -> impl Iterator<Item = &GraphSlot> {
    self.graphs.iter()
}
939
940 /// Iterate every (`name`, `&GraphSlot`) pair in install order.
941 pub fn graphs_named(&self) -> impl Iterator<Item = (&str, &GraphSlot)> {
942 // graph_index maps name -> idx; rebuild idx -> name for the
943 // walk so the iteration order matches the storage Vec.
944 let mut by_idx: Vec<(u32, &str)> = self
945 .graph_index
946 .iter()
947 .map(|(n, i)| (*i, n.as_str()))
948 .collect();
949 by_idx.sort_by_key(|&(i, _)| i);
950 by_idx
951 .into_iter()
952 .filter_map(move |(i, n)| self.graphs.get(i as usize).map(|g| (n, g)))
953 }
954
955 /// Canonical install path: builds an
956 /// [`GraphSlot`] from the FunctionProto (allocating
957 /// `OpRef`s + `NodeSiteId`s for every node + produced value) and
958 /// inserts it under `name`.
959 ///
960 /// Used by [`crate::node::Node::ready`] for each
961 /// `ModelProto.functions[0]`. Returns a mutable reference
962 /// for any subsequent setup (slot_bindings, local_event_subs).
963 pub fn install_graph(&mut self, name: String, function: FunctionProto) -> &mut GraphSlot {
964 let graph_idx = self.graphs.len() as u32;
965 let mut g = GraphSlot::from_function(
966 name.clone(),
967 function,
968 graph_idx,
969 &mut self.exec.ids.next_node_site_id,
970 );
971 // Entry-point graphs (installed via `install_graph`, not
972 // `install_function_library`) get a `NodeSiteId` registered
973 // for every function input so `Engine::deliver_app_event`
974 // can seed the input via ingress. Body functions used in
975 // `OpDispatch::FunctionCall` deliberately route through
976 // `input_aliases` and must NOT get input sites; that path
977 // installs through `install_function_library` instead.
978 register_function_input_sites(&mut g, &mut self.exec.ids.next_node_site_id, graph_idx);
979 let idx = self.push_graph_slot(name, g);
980 &mut self.graphs[idx as usize]
981 }
982
983 /// Runtime-linker install: walk `model.functions[]` and install
984 /// each FunctionProto as an `GraphSlot` keyed by its
985 /// canonical `(domain, name, overload)`-derived string. Also
986 /// populates the symbol-table index `functions` keyed on the same
987 /// tuple, so call NodeProtos can be resolved at dispatch time.
988 ///
989 /// `entry_point_keys` lists the `FunctionKey`s for the registered
990 /// Modules' main partition functions - those graphs get
991 /// `is_entry_point = true` (their top-level outputs surface as
992 /// `EngineStep::AppEvent`; sub-function bodies do not).
993 ///
994 /// A function stamped `MODULE_PHASE_KEY = "bootstrap"` registers
995 /// its `FunctionKey` with the engine's bootstrap state (appends
996 /// to `install_order`, populates `module_bootstraps`) without
997 /// arming `pending` — install is pure. The host arms the queue by
998 /// calling [`crate::node::Node::run_bootstrap`], which fans out
999 /// each install-order target serially and emits one
1000 /// `BootstrapComplete` step per drained phase; multi-target
1001 /// installs surface their targets in slice order without further
1002 /// host action.
1003 ///
1004 /// Idempotent under ODR (same key + same body) - silently skips
1005 /// reinstall. Caller (Node linker) is responsible for the
1006 /// byte-equality check before calling.
1007 pub fn install_function_library(
1008 &mut self,
1009 functions: &[FunctionProto],
1010 entry_point_keys: &[FunctionKey],
1011 ) {
1012 let entry_set: std::collections::HashSet<&FunctionKey> = entry_point_keys.iter().collect();
1013 for f in functions {
1014 let key: FunctionKey = (f.domain.clone(), f.name.clone(), f.overload.clone());
1015 let graph_name = graph_name_for(&key);
1016 if self.has_graph(&graph_name) {
1017 continue;
1018 }
1019 let graph_idx = self.graphs.len() as u32;
1020 let mut g = GraphSlot::from_function(
1021 graph_name.clone(),
1022 f.clone(),
1023 graph_idx,
1024 &mut self.exec.ids.next_node_site_id,
1025 );
1026 // Only entry-point functions surface their outputs as
1027 // AppEvents. Sub-function bodies' outputs are forwarded
1028 // via output_forwarding at call sites.
1029 g.is_entry_point = entry_set.contains(&key);
1030 if !g.is_entry_point {
1031 g.top_level_outputs.clear();
1032 }
1033 let is_bootstrap = bb_ir::keys::read_function_module_phase(f)
1034 .is_some_and(|p| p == bb_ir::keys::MODULE_PHASE_BOOTSTRAP);
1035 // Bootstrap functions seed their inputs through the
1036 // host-driven staging path
1037 // (`Node::run_bootstrap(&[BootstrapInput])`) rather than
1038 // via a FunctionCall splice. Mint a `NodeSiteId` per
1039 // declared input formal so the staging path can address
1040 // the slot via `(NodeSiteId, body_exec_id)` and the body
1041 // ops can resolve their input names through
1042 // `resolve_site_name`.
1043 if is_bootstrap {
1044 register_function_input_sites(
1045 &mut g,
1046 &mut self.exec.ids.next_node_site_id,
1047 graph_idx,
1048 );
1049 }
1050 self.push_graph_slot(graph_name, g);
1051 self.functions.insert(key.clone(), f.clone());
1052 if is_bootstrap {
1053 // Register the bootstrap target. A multi-target
1054 // install registers one bootstrap per target (in
1055 // the order [`crate::install::install`] iterates the
1056 // user-supplied `targets` slice). Seeding drains
1057 // `install_order` front-to-back so each target's
1058 // bootstrap fires in slice order.
1059 self.bootstrap.register_module(key);
1060 }
1061 }
1062 }
1063
1064 /// Allocate the next bootstrap call's body `ExecId` and push every
1065 /// body OpRef of the front of
1066 /// [`crate::engine::bootstrap::BootstrapState::install_order`]
1067 /// onto the frontier. Returns `true` when a bootstrap call was
1068 /// seeded; `false` when the engine has no remaining bootstrap
1069 /// functions or the previous call is still in flight.
1070 ///
1071 /// Host-driven: `Node::run_bootstrap(&[])` invokes this once after
1072 /// arming `bootstrap.pending`; the poll cascade reseeds via
1073 /// `maybe_complete_bootstrap` after each phase drains so multi-
1074 /// target installs surface one `BootstrapComplete` per target in
1075 /// install order without further host action. Install itself no
1076 /// longer arms `pending`.
1077 pub(crate) fn seed_bootstrap_call(&mut self) -> bool {
1078 if self.bootstrap.current_exec_id.is_some() {
1079 return false;
1080 }
1081 if self.bootstrap.next_idx >= self.bootstrap.install_order.len() {
1082 // No further bootstraps to seed; the gate clears on the
1083 // next `maybe_complete_bootstrap` pass.
1084 self.bootstrap.pending = false;
1085 return false;
1086 }
1087 let target_name = self.bootstrap.install_order[self.bootstrap.next_idx].clone();
1088 let Some(meta) = self.bootstrap.module_bootstraps.get(&target_name) else {
1089 // Defensive skip: install_order names a target whose
1090 // metadata is missing. Advance past the stale entry
1091 // rather than wedging.
1092 self.bootstrap.next_idx += 1;
1093 return false;
1094 };
1095 let key = meta.function_key.clone();
1096 if self.fire_module_bootstrap(target_name, &key).is_none() {
1097 self.bootstrap.next_idx += 1;
1098 return false;
1099 }
1100 true
1101 }
1102
1103 /// Seed a Module bootstrap body onto the frontier under a fresh
1104 /// ExecId and record its ExecId in `bootstrap.current_exec_id`.
1105 /// Returns the body ExecId on success or `None` when the graph
1106 /// name is missing (defensive — install populates it).
1107 fn fire_module_bootstrap(&mut self, target_name: String, key: &FunctionKey) -> Option<ExecId> {
1108 let graph_name = graph_name_for(key);
1109 let graph_idx = self.graph_idx(&graph_name)?;
1110 let body_exec_id = self.allocate_exec_id();
1111 let node_count = self
1112 .graphs
1113 .get(graph_idx as usize)
1114 .map(|g| g.function.node.len())
1115 .unwrap_or(0);
1116 // Bootstrap takes no input formals and produces no outputs,
1117 // so no CallContext lives in `pending_calls`; the body-op
1118 // gate identifies bootstrap-descendant ExecIds by either
1119 // direct match against the current bootstrap ExecId or chain
1120 // walk through descendant FunctionCall CallContexts.
1121 // Quiescence resolves through `maybe_complete_bootstrap` once
1122 // every descendant frontier + pending_async entry clears.
1123 self.bootstrap
1124 .mark_module_in_flight(target_name, body_exec_id);
1125 for node_idx in 0..node_count as u32 {
1126 let op_ref = OpRef::pack(graph_idx, node_idx);
1127 self.exec.frontier.push_back((op_ref, body_exec_id));
1128 }
1129 Some(body_exec_id)
1130 }
1131
/// Stage one [`crate::engine::BootstrapInput`] against its target's
/// declared formal inputs, copy the bytes via Principle 1a, and
/// seed the body onto the frontier. Helper called by
/// [`Self::run_bootstrap`] once per entry of a non-empty `targets`
/// slice.
///
/// # Errors
///
/// - `BootstrapError::UnknownTarget` when `request.target` has no
///   bootstrap metadata or its graph is not installed.
/// - `BootstrapError::UnknownInput` when a supplied input name is not
///   a declared formal, or has no registered site.
/// - `BootstrapError::MissingInput` when a declared formal was not
///   supplied.
/// - `BootstrapError::AllocationFailed` when the byte charge or the
///   buffer reservation for an input fails; the bytes charged so far
///   by this call are released before returning.
fn enqueue_module_bootstrap(
    &mut self,
    request: crate::engine::bootstrap::BootstrapInput<'_>,
) -> Result<(), crate::errors::BootstrapError> {
    // 1. Resolve target → function_key → graph_idx. Both failure
    //    modes report the full install-order list as the legal set.
    let meta = self
        .bootstrap
        .module_bootstraps
        .get(request.target)
        .ok_or_else(|| crate::errors::BootstrapError::UnknownTarget {
            target_name: request.target.to_string(),
            available: self.bootstrap.install_order.clone(),
        })?;
    let function_key = meta.function_key.clone();
    let graph_name = graph_name_for(&function_key);
    let graph_idx = self.graph_idx(&graph_name).ok_or_else(|| {
        crate::errors::BootstrapError::UnknownTarget {
            target_name: request.target.to_string(),
            available: self.bootstrap.install_order.clone(),
        }
    })?;
    let graph = &self.graphs[graph_idx as usize];

    // 2. Read declared input formals from the GraphSlot. Cloned so
    //    the list can be embedded in error values below.
    let declared: Vec<String> = graph.function.input.clone();

    // 3. Validate. UnknownInput fires first (the supplied set is
    //    the host's authoritative request shape; surfacing extras
    //    before missing ones gives clearer diagnostics on typos).
    for (input_name, _) in request.inputs {
        if !declared.iter().any(|d| d == input_name) {
            return Err(crate::errors::BootstrapError::UnknownInput {
                target_name: request.target.to_string(),
                input_name: input_name.to_string(),
                declared: declared.clone(),
            });
        }
    }
    // Every declared formal must have been supplied.
    for formal in &declared {
        if !request.inputs.iter().any(|(name, _)| *name == formal) {
            return Err(crate::errors::BootstrapError::MissingInput {
                target_name: request.target.to_string(),
                input_name: formal.clone(),
            });
        }
    }

    // Resolve each formal to its NodeSiteId before allocating the
    // ExecId — a missing site at this stage is an install
    // invariant violation, but defending against it keeps the
    // error path total.
    // NOTE(review): an input name supplied twice passes validation
    // and is staged (and charged) twice — confirm callers never do
    // this.
    let mut sites: Vec<(crate::ids::NodeSiteId, &[u8])> =
        Vec::with_capacity(request.inputs.len());
    for (input_name, bytes) in request.inputs {
        let Some(&site) = graph.site_names.get(*input_name) else {
            return Err(crate::errors::BootstrapError::UnknownInput {
                target_name: request.target.to_string(),
                input_name: input_name.to_string(),
                declared: declared.clone(),
            });
        };
        sites.push((site, *bytes));
    }

    // 4. Allocate body ExecId. Done after validation so a rejected
    //    request does not consume an ExecId counter slot.
    let body_exec_id = self.allocate_exec_id();

    // 5. Per-input charge + Principle 1a copy. Track total
    //    admitted bytes so a mid-loop failure releases the full
    //    charge in one shot.
    // NOTE(review): on a mid-loop failure the values already written
    // through `slot_write` for earlier inputs are not removed in this
    // function — confirm they are reclaimed elsewhere.
    let mut admitted: usize = 0;
    for (site, bytes) in &sites {
        let byte_count = bytes.len();
        if let Err(reason) = self.try_charge(byte_count) {
            // This input was never charged; release only the earlier ones.
            self.release(admitted);
            return Err(crate::errors::BootstrapError::AllocationFailed {
                target_name: request.target.to_string(),
                byte_count,
                budget_remaining: reason.budget_remaining,
            });
        }
        admitted = admitted.saturating_add(byte_count);
        let mut owned: Vec<u8> = Vec::new();
        if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
            // `admitted` already includes this input's charge, so the
            // release covers it as well.
            self.release(admitted);
            return Err(crate::errors::BootstrapError::AllocationFailed {
                target_name: request.target.to_string(),
                byte_count,
                budget_remaining: self
                    .ingress_byte_budget
                    .saturating_sub(self.ingress_bytes_in_flight),
            });
        }
        owned.extend_from_slice(bytes);
        let value: Box<dyn crate::slot_value::SlotValue> =
            Box::new(crate::syscall::values::BytesValue(owned));
        self.slot_write(*site, body_exec_id, value);
    }

    // 6. Mark the Module bootstrap in-flight and push every body
    //    OpRef onto the frontier. The body-op gate (`is_op_locked`)
    //    recognises the freshly seeded ExecId via `current_exec_id`
    //    so descendant ops keep firing.
    self.bootstrap.pending = true;
    self.bootstrap
        .mark_module_in_flight(request.target.to_string(), body_exec_id);
    let node_count = self.graphs[graph_idx as usize].function.node.len();
    for node_idx in 0..node_count as u32 {
        let op_ref = OpRef::pack(graph_idx, node_idx);
        self.exec.frontier.push_back((op_ref, body_exec_id));
    }
    Ok(())
}
1250
1251 /// Flat host-facing bootstrap entry point. Empty slice fires the
1252 /// install-order kick (arming + seeding every queued target);
1253 /// non-empty slice stages each [`BootstrapInput`] in slice order
1254 /// using the same validation + Principle 1a copy + frontier seed
1255 /// as `enqueue_module_bootstrap`.
1256 pub fn run_bootstrap(
1257 &mut self,
1258 targets: &[crate::engine::bootstrap::BootstrapInput<'_>],
1259 ) -> Result<bool, crate::errors::BootstrapError> {
1260 if targets.is_empty() {
1261 if !self.bootstrap.arm_install_order() {
1262 return Ok(false);
1263 }
1264 return Ok(self.seed_bootstrap_call());
1265 }
1266 for req in targets {
1267 self.enqueue_module_bootstrap(crate::engine::bootstrap::BootstrapInput {
1268 target: req.target,
1269 inputs: req.inputs,
1270 })?;
1271 }
1272 Ok(true)
1273 }
1274
1275 /// `(domain, name, overload)` of the first bootstrap function
1276 /// recorded at install time, or `None` when no
1277 /// `module_phase = "bootstrap"` FunctionProto reached the function
1278 /// library. Stable across `clear_for_restore` (which preserves
1279 /// install-order metadata but bumps `next_idx` past every queued
1280 /// target so the restored Node does not re-fire bootstraps it
1281 /// already ran). For the full ordered list (multi-target installs
1282 /// queue one key per target), use [`Self::bootstrap_function_keys`].
1283 pub fn bootstrap_function_key(&self) -> Option<FunctionKey> {
1284 self.bootstrap.first_function_key().cloned()
1285 }
1286
/// All bootstrap function keys the engine has queued, in install
/// order. Multi-target installs append one entry per target via
/// [`Self::install_function_library`]; the seeder fires each in
/// slice order. Stable across `clear_for_restore` for snapshot
/// introspection.
///
/// Returns an owned `Vec`, as produced by the bootstrap state's
/// `function_keys()`.
pub fn bootstrap_function_keys(&self) -> Vec<FunctionKey> {
    self.bootstrap.function_keys()
}
1295
/// `true` while a bootstrap call is outstanding. Armed by
/// [`Self::run_bootstrap`]; cleared once every queued phase
/// drains.
///
/// Reads the `bootstrap.pending` flag directly; no other state is
/// consulted.
pub fn bootstrap_pending(&self) -> bool {
    self.bootstrap.pending
}
1302
1303 /// Lifecycle status for the host-facing
1304 /// [`crate::node::Node::bootstrap_status`] accessor. `Idle` when no
1305 /// bootstrap is queued or in-flight; `Running` when a bootstrap
1306 /// body is currently in-flight; `WaitingForInput` when the
1307 /// install-order queue still has unseeded targets but no body is
1308 /// active yet (the host must drive the queue to advance).
1309 pub fn bootstrap_status(&self) -> crate::engine::bootstrap::BootstrapStatus {
1310 if self.bootstrap.current_exec_id.is_some() {
1311 return crate::engine::bootstrap::BootstrapStatus::Running;
1312 }
1313 if self.bootstrap.pending && self.bootstrap.next_idx < self.bootstrap.install_order.len() {
1314 return crate::engine::bootstrap::BootstrapStatus::WaitingForInput;
1315 }
1316 crate::engine::bootstrap::BootstrapStatus::Idle
1317 }
1318
1319 /// `true` when `target_name` is already on the install-order Module
1320 /// bootstrap queue. Used by `Node::run_bootstrap` validation.
1321 pub fn module_bootstrap_registered(&self, target_name: &str) -> bool {
1322 self.bootstrap.module_bootstraps.contains_key(target_name)
1323 }
1324
1325 /// Snapshot of every registered Module bootstrap target name in
1326 /// install order. Returned by `BootstrapError::UnknownTarget` so
1327 /// callers see the legal set.
1328 pub fn module_bootstrap_target_names(&self) -> Vec<String> {
1329 self.bootstrap.install_order.clone()
1330 }
1331
1332 /// Body-op gate. Returns `true` when the op must park because a
1333 /// bootstrap body is in-flight and the op's ExecId is not a
1334 /// descendant of the bootstrap ExecId; `false` when the op is
1335 /// fireable.
1336 ///
1337 /// Resolution order:
1338 /// 1. `bootstrap.pending` clear → fire (gate dormant).
1339 /// 2. `exec_id` descends from the in-flight bootstrap ExecId via
1340 /// the `pending_calls.parent_exec_id` chain → fire. Bootstrap
1341 /// body + its sub-FunctionCalls keep firing while the body
1342 /// runs.
1343 /// 3. Otherwise → park. The collapsed gate denies every body-op
1344 /// until the bootstrap drains.
1345 pub(crate) fn is_op_locked(&self, _op_ref: OpRef, exec_id: ExecId) -> bool {
1346 if !self.bootstrap.pending {
1347 return false;
1348 }
1349 let Some(boot_exec) = self.bootstrap.current_exec_id else {
1350 return false;
1351 };
1352 // 2. Bootstrap-descendant exec ids fire freely. Walk the
1353 // call chain once and short-circuit if the in-flight ExecId
1354 // matches anywhere along the chain.
1355 let mut current = exec_id;
1356 loop {
1357 if current == boot_exec {
1358 return false;
1359 }
1360 match self.exec.pending_calls.get(¤t) {
1361 Some(call) => current = call.parent_exec_id,
1362 None => break,
1363 }
1364 }
1365 true
1366 }
1367
1368 /// Inspect engine state and pop the in-flight bootstrap key once
1369 /// every bootstrap-descendant frontier entry and `pending_async`
1370 /// entry has cleared. Returns `true` when one phase just drained
1371 /// (i.e. the caller poll cycle should append a `BootstrapComplete`
1372 /// step for that phase). With remaining queued keys, the next
1373 /// `seed_bootstrap_call` advances to the following target;
1374 /// `bootstrap.pending` flips off only after the last key drains.
1375 /// Called after each drain phase + the ingress completion drain.
1376 pub(crate) fn maybe_complete_bootstrap(&mut self) -> bool {
1377 if !self.bootstrap.pending {
1378 return false;
1379 }
1380 let Some(boot_exec) = self.bootstrap.current_exec_id else {
1381 return false;
1382 };
1383 let descendant = |engine: &Engine, mut exec_id: ExecId| -> bool {
1384 loop {
1385 if exec_id == boot_exec {
1386 return true;
1387 }
1388 match engine.exec.pending_calls.get(&exec_id) {
1389 Some(call) => exec_id = call.parent_exec_id,
1390 None => return false,
1391 }
1392 }
1393 };
1394 if self
1395 .exec
1396 .frontier
1397 .iter()
1398 .any(|(_, exec_id)| descendant(self, *exec_id))
1399 {
1400 return false;
1401 }
1402 if self
1403 .exec
1404 .pending_async
1405 .values()
1406 .any(|p| descendant(self, p.exec_id))
1407 {
1408 return false;
1409 }
1410 // The in-flight phase drained. Advance the install_order
1411 // pointer (the post-kick cascade) and retire the in-flight
1412 // ExecId. `bootstrap.pending` clears once the cursor reaches
1413 // the end of `install_order` AND no in-flight body remains.
1414 self.bootstrap.next_idx += 1;
1415 self.bootstrap.clear_in_flight();
1416 if self.bootstrap.next_idx >= self.bootstrap.install_order.len()
1417 && self.bootstrap.current_exec_id.is_none()
1418 {
1419 self.bootstrap.pending = false;
1420 }
1421 true
1422 }
1423
1424 /// Resolve every NodeProto's dispatch kind into `op_dispatch[]`
1425 /// per ENGINE.md §8.1 + §8.4. Run after install completes (so all
1426 /// symbols are in `self.functions`) but before the first poll.
1427 ///
1428 /// Walk order: each GraphSlot in turn. For each NodeProto:
1429 /// - syscall (`syscall_index` hit) → `Stateless`
1430 /// - call to function in `self.functions` with domain
1431 /// `ai.bytesandbrains.module` → `FunctionCall` with the target
1432 /// key + input/output rename pairs from the call's value names
1433 /// zipped against the target function's formals.
1434 /// - call to function with domain `ai.bytesandbrains.framework`
1435 /// starting with `BackendSubgraph_` → `BackendSubgraph` with
1436 /// bound backend resolved via `BINDING_ID_KEY` metadata against
1437 /// the atomic-dispatch table.
1438 /// - else atomic dispatch by `(domain, op_type, instance)` →
1439 /// `Atomic`.
1440 ///
1441 /// Returns the number of nodes that remained `Unresolved`. Caller
1442 /// should fail build if non-zero.
1443 pub fn resolve_dispatch(&mut self) -> usize {
1444 // Snapshot per-graph node lists so we don't hold a borrow on
1445 // self.graphs while reading other tables. Indices are the
1446 // positional graph_idx values packed into OpRefs.
1447 let graph_count = self.graphs.len();
1448 let mut unresolved = 0;
1449 for graph_idx in 0..graph_count {
1450 let (function_domain, nodes): (String, Vec<NodeProto>) = {
1451 let g = &self.graphs[graph_idx];
1452 (g.function.domain.clone(), g.function.node.clone())
1453 };
1454 // BackendSubgraph bodies (domain == ai.bytesandbrains.framework)
1455 // are handed wholesale to the bound Backend Contract impl per
1456 // ENGINE.md §8.5; their interior NodeProtos are never invoked
1457 // individually by the engine, so resolve_dispatch leaves the
1458 // body's op_dispatch as Unresolved without counting it as a
1459 // failure. Mirror the same skip already applied by
1460 // Node's unsupported-ops pre-flight in `src/node.rs`.
1461 if function_domain == "ai.bytesandbrains.framework" {
1462 let mut dispatch: Vec<OpDispatch> = Vec::with_capacity(nodes.len());
1463 for _ in &nodes {
1464 dispatch.push(OpDispatch::Unresolved);
1465 }
1466 self.graphs[graph_idx].op_dispatch = dispatch;
1467 continue;
1468 }
1469 let mut dispatch: Vec<OpDispatch> = Vec::with_capacity(nodes.len());
1470 for node in &nodes {
1471 let resolved = self.resolve_one(node);
1472 if matches!(resolved, OpDispatch::Unresolved) {
1473 unresolved += 1;
1474 }
1475 dispatch.push(resolved);
1476 }
1477 self.graphs[graph_idx].op_dispatch = dispatch;
1478 }
1479 unresolved
1480 }
1481
1482 fn resolve_one(&self, node: &NodeProto) -> OpDispatch {
1483 // 1) Syscall path — single lookup by (domain, op_type).
1484 if let Some(&fn_ptr) = self
1485 .syscall_table
1486 .get(&(node.domain.clone(), node.op_type.clone()))
1487 {
1488 return OpDispatch::Stateless(fn_ptr);
1489 }
1490 // `crate::registry` for custom ops registered via
1491 // `bb::register_op!{}`. DCE strips unreferenced entries,
1492 // so a binary that doesn't `use` a library's op never
1493 // pulls it into the link image.
1494 if let Some(reg) = crate::registry::find_op(&node.domain, &node.op_type) {
1495 return OpDispatch::Stateless(reg.invoke);
1496 }
1497 // 2) Function-call paths via the symbol table.
1498 let key: FunctionKey = (
1499 node.domain.clone(),
1500 node.op_type.clone(),
1501 node.overload.clone(),
1502 );
1503 if let Some(target_fn) = self.functions.get(&key) {
1504 if node.domain == "ai.bytesandbrains.module" {
1505 let input_rename: Rc<[(String, String)]> = node
1506 .input
1507 .iter()
1508 .zip(target_fn.input.iter())
1509 .map(|(caller, formal)| (caller.clone(), formal.clone()))
1510 .collect();
1511 let output_rename: Rc<[(String, String)]> = target_fn
1512 .output
1513 .iter()
1514 .zip(node.output.iter())
1515 .map(|(formal, caller)| (formal.clone(), caller.clone()))
1516 .collect();
1517 return OpDispatch::FunctionCall {
1518 target: key,
1519 input_rename,
1520 output_rename,
1521 };
1522 }
1523 }
1524 // 3) Atomic role path. The placeholders pass stamps
1525 // `ai.bytesandbrains.slot_id` on every role NodeProto; install
1526 // populates `slot_id_to_cref` from the model's binding metadata.
1527 // Single lookup chain: NodeProto.slot_id → ComponentRef →
1528 // bound role dispatcher closure.
1529 let slot_id = node
1530 .metadata_props
1531 .iter()
1532 .find(|p| p.key == bb_ir::keys::SLOT_ID_KEY)
1533 .and_then(|p| p.value.parse::<u32>().ok());
1534 if let Some(slot_id) = slot_id {
1535 if let Some(&cref) = self.slot_id_to_cref.get(&slot_id) {
1536 if let Some(dispatch_fn) = self.dispatch_fn_for_component(cref) {
1537 return OpDispatch::Atomic {
1538 component_ref: cref,
1539 dispatch_fn,
1540 };
1541 }
1542 }
1543 }
1544 OpDispatch::Unresolved
1545 }
1546
1547 /// Look up the install-time-stamped `ProtocolDispatchFn` for a
1548 /// registered component by its `TypeId`. `resolve_dispatch`
1549 /// embeds the result into `OpDispatch::Atomic { dispatch_fn }`
1550 /// so runtime invoke skips the per-op TypeId probe.
1551 fn dispatch_fn_for_component(
1552 &self,
1553 cref: ComponentRef,
1554 ) -> Option<crate::engine::invoke::ProtocolDispatchFn> {
1555 let component = self.component(cref)?;
1556 let any: &dyn std::any::Any = component;
1557 let tid = (*any).type_id();
1558 self.role_dispatchers.get(&tid).map(|d| d.dispatch)
1559 }
1560
1561 /// Resolve a registered component by `ComponentRef`.
1562 /// `None` when no component lives at that index, or when the
1563 /// slot was `mem::take`-ed out during dispatch (the caller
1564 /// dispatching component is invisible to itself).
1565 pub fn component(&self, cref: ComponentRef) -> Option<&dyn ErasedComponent> {
1566 self.components.get(cref.as_u32() as usize)?.as_deref()
1567 }
1568
1569 /// Resolve a registered component by `ComponentRef` for
1570 /// mutation. Same null semantics as [`Self::component`].
1571 pub fn component_mut(&mut self, cref: ComponentRef) -> Option<&mut Box<dyn ErasedComponent>> {
1572 self.components.get_mut(cref.as_u32() as usize)?.as_mut()
1573 }
1574
1575 /// Take the component at `cref` out of the registry, leaving
1576 /// `None` in its slot. Returns `None` if the slot was empty (or
1577 /// out of range). Paired with [`Self::restore_component`] in
1578 /// `invoke_atomic` so a [`crate::runtime::ComponentsView`] can
1579 /// borrow the rest of `engine.components` while the dispatching
1580 /// component is held exclusively.
1581 pub(crate) fn take_component(
1582 &mut self,
1583 cref: ComponentRef,
1584 ) -> Option<Box<dyn ErasedComponent>> {
1585 self.components.get_mut(cref.as_u32() as usize)?.take()
1586 }
1587
1588 /// Put a component back into the registry slot it was taken
1589 /// from via [`Self::take_component`]. The slot index must be
1590 /// within range (i.e. the cref came from a prior registration).
1591 pub(crate) fn restore_component(
1592 &mut self,
1593 cref: ComponentRef,
1594 component: Box<dyn ErasedComponent>,
1595 ) {
1596 let idx = cref.as_u32() as usize;
1597 if let Some(slot) = self.components.get_mut(idx) {
1598 *slot = Some(component);
1599 }
1600 }
1601
1602 /// test-only registrar. Stores a bound component impl
1603 /// at `cref`. Grows the underlying Vec to fit the index, filling
1604 /// holes with `None` so out-of-order registration works.
1605 pub fn register_component(&mut self, cref: ComponentRef, component: Box<dyn ErasedComponent>) {
1606 let idx = cref.as_u32() as usize;
1607 if self.components.len() <= idx {
1608 self.components.resize_with(idx + 1, || None);
1609 }
1610 self.components[idx] = Some(component);
1611 }
1612
1613 /// Push an `(OpRef, ExecId)` onto the frontier.
1614 pub fn push_frontier(&mut self, op_ref: OpRef, exec_id: ExecId) {
1615 self.exec.frontier.push_back((op_ref, exec_id));
1616 }
1617
1618 /// Pop the next `(OpRef, ExecId)` off the frontier. Used by the
1619 /// poll cycle's drain phases.
1620 pub fn pop_frontier(&mut self) -> Option<(OpRef, ExecId)> {
1621 self.exec.frontier.pop_front()
1622 }
1623
1624 /// Pop the next fireable `(OpRef, ExecId)` off the frontier,
1625 /// honouring the per-component body-op gate. With no in-flight
1626 /// bootstrap the front of the queue fires unconditionally. With
1627 /// one or more in-flight bootstraps the gate parks any op
1628 /// touching a locked `ComponentRef`; this scan picks the first
1629 /// unparked entry so disjoint Components can keep firing while
1630 /// bootstrap runs against an unrelated slot.
1631 pub(crate) fn pop_frontier_fireable(&mut self) -> Option<(OpRef, ExecId)> {
1632 if !self.bootstrap.pending {
1633 return self.exec.frontier.pop_front();
1634 }
1635 let idx = self
1636 .exec
1637 .frontier
1638 .iter()
1639 .position(|(op_ref, exec_id)| !self.is_op_locked(*op_ref, *exec_id))?;
1640 self.exec.frontier.remove(idx)
1641 }
1642
1643 /// Snapshot of the `(NodeSiteId, ExecId)` keys currently in the
1644 /// slot table. Test-only - used to assert wire-envelope delivery
1645 /// lands at the right site without exposing the full
1646 /// `Box<dyn SlotValue>` payload type.
1647 pub fn slot_table_keys(&self) -> Vec<(NodeSiteId, ExecId)> {
1648 self.exec.slot_table.keys().copied().collect()
1649 }
1650
1651 /// Iterate every `((NodeSiteId, ExecId), Option<&dyn SlotValue>)`
1652 /// pair currently in the slot table. Test-only.
1653 pub fn slot_table_iter(
1654 &self,
1655 ) -> impl Iterator<Item = (&(NodeSiteId, ExecId), Option<&dyn SlotValue>)> {
1656 self.exec
1657 .slot_table
1658 .iter()
1659 .map(|(k, v)| (k, v.as_ref().map(|b| b.as_ref())))
1660 }
1661
1662 /// Read a slot value by `(NodeSiteId, ExecId)`. Returns `None`
1663 /// if the slot is empty or not yet allocated.
1664 pub fn slot_at(&self, site: NodeSiteId, exec_id: ExecId) -> Option<&dyn SlotValue> {
1665 self.exec
1666 .slot_table
1667 .get(&(site, exec_id))
1668 .and_then(|s| s.as_ref())
1669 .map(|b| b.as_ref())
1670 }
1671}
1672
1673/// Register a `NodeSiteId` for every function input name on
1674/// `graph` so `Engine::deliver_app_event` can seed the input
1675/// via ingress. Pre-existing entries (where a node output
1676/// happens to share a name with a function input - rare but
1677/// possible) are left alone. After the input sites are minted,
1678/// re-walk the function's node inputs and add consumer entries
1679/// for any node consuming an input - `GraphSlot::from_function`
1680/// already populated consumers for node outputs but skipped
1681/// inputs because they weren't in `site_names` yet.
1682fn register_function_input_sites(
1683 graph: &mut crate::engine::graph_slot::GraphSlot,
1684 next_node_site_id: &mut u64,
1685 graph_idx: u32,
1686) {
1687 for input_name in graph.function.input.clone().iter() {
1688 if input_name.is_empty() {
1689 continue;
1690 }
1691 graph
1692 .site_names
1693 .entry(input_name.clone())
1694 .or_insert_with(|| {
1695 let r = NodeSiteId::from(*next_node_site_id);
1696 *next_node_site_id = next_node_site_id.saturating_add(1);
1697 r
1698 });
1699 }
1700 // Backfill consumers for nodes whose inputs reference function
1701 // inputs we just minted sites for. Positional OpRefs make every
1702 // node's ref `OpRef::pack(graph_idx, node_idx)`.
1703 let nodes_inputs: Vec<(OpRef, Vec<String>)> = graph
1704 .function
1705 .node
1706 .iter()
1707 .enumerate()
1708 .map(|(idx, node)| (OpRef::pack(graph_idx, idx as u32), node.input.clone()))
1709 .collect();
1710 for (op_ref, inputs) in nodes_inputs {
1711 for input in inputs {
1712 if input.is_empty() {
1713 continue;
1714 }
1715 if let Some(&site) = graph.site_names.get(&input) {
1716 let entry = graph.consumers.entry(site).or_default();
1717 if !entry.contains(&op_ref) {
1718 entry.push(op_ref);
1719 }
1720 }
1721 }
1722 }
1723}
1724
/// Why `Engine::try_charge` refused an admission: accepting
/// `byte_count` more bytes would have pushed usage past
/// `ingress_byte_budget`.
///
/// `budget_remaining` is the cap minus the live
/// `ingress_bytes_in_flight` at the moment of rejection. The caller
/// embeds both numbers into the resulting `WireReceiveError` or
/// `AppIngressError`, so subscribers see the magnitude of the
/// rejection without re-querying the engine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BudgetExceededReason {
    /// Number of bytes the caller attempted to admit.
    pub byte_count: usize,
    /// Headroom left under `ingress_byte_budget` when the request
    /// was rejected.
    pub budget_remaining: usize,
}
1740
1741/// Stable graph-name key for `Engine.graphs` derived from a
1742/// `FunctionKey`. Joins the tuple parts so two distinct keys produce
1743/// distinct strings, preserving the symbol-table semantics.
1744pub(crate) fn graph_name_for(key: &FunctionKey) -> String {
1745 let (domain, name, overload) = key;
1746 if overload.is_empty() {
1747 format!("{domain}::{name}")
1748 } else {
1749 format!("{domain}::{name}#{overload}")
1750 }
1751}
1752
1753
1754#[cfg(test)]
1755#[path = "core_multi_bootstrap_tests.rs"]
1756mod multi_bootstrap_tests;
1757
1758#[cfg(test)]
1759#[path = "core_op_locked_tests.rs"]
1760mod op_locked_tests;
1761
1762#[cfg(test)]
1763#[path = "core_bootstrap_input_tests.rs"]
1764mod bootstrap_input_tests;