bb_runtime/node/mod.rs
1//! `Node` — the framework's runtime construct, installed via
2//! [`bytesandbrains::install`] and driven through `Node::poll`.
3//!
4//! Lifecycle:
5//!
6//! ```ignore
7//! let model = MyModule.build()?;
8//! let compiled = bb::Compiler::new()
9//! .bind_backend::<CpuBackend>("compute")
10//! .compile(model)?;
11//! let mut node = bb::install(peer_id, addr, compiled, &["MyModule"], bb::Config::new())?;
12//! loop { node.poll(&mut cx); }
13//! ```
14//!
15//! Construction-time validation (passport check, binding-table
16//! parse, concrete construction) surfaces `InstallError` at install
17//! time — the engine is ready to dispatch by the time `Node::poll`
18//! sees it.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use prost::Message;
24
25use crate::concrete::ComponentHandle;
26use crate::engine::Engine;
27use crate::errors::delivery::DeliveryError;
28use crate::errors::restore::RestoreError;
29use crate::snapshot::transient::TransientSnapshot;
30use crate::snapshot::{NamedComponentSnapshot, NamedGraphSnapshot, NodeConfigSnapshot};
31use bb_ir::proto::onnx::ModelProto;
32
33/// Type-erased `EngineStep` observer installed onto a `Node` via
34/// [`Node::set_telemetry_tap`]. Each `Node::poll` call invokes the
35/// closure once per produced step.
36pub type TelemetryTap = Box<dyn FnMut(&crate::engine::EngineStep)>;
37
38/// Selector for [`Node::run_bootstrap`]. One enum, four variants
39/// covering every bootstrap-kick shape — install-order, by Module
40/// target name, by Module target name with staged inputs, and by
41/// Component slot.
42pub enum BootstrapTarget<'a> {
43 /// Drive every install-order Module bootstrap target on this Node.
44 /// Equivalent to the F4 host kick: arms + seeds the install-order
45 /// queue, then drives the engine until every queued target reaches
46 /// `BootstrapComplete` or one suspends on async.
47 All,
48 /// Drive specific Module bootstrap targets by name (with empty
49 /// inputs). Surfaces `BootstrapError::UnknownTarget` when any name
50 /// is not a registered Module bootstrap; the batch validates
51 /// atomically before any staging happens. Useful when the host
52 /// knows every target's bootstrap takes no formals.
53 ModuleNames(&'a [&'a str]),
54 /// Drive Module bootstrap targets with explicit inputs. Each
55 /// `BootstrapRequest`'s `inputs` are validated against the
56 /// target's declared formals; the framework copies each
57 /// borrowed `&[u8]` into engine-owned storage (Principle 1a).
58 /// On any validation failure the engine's bootstrap state stays
59 /// untouched.
60 ModuleRequests(&'a [crate::engine::BootstrapRequest<'a>]),
61 /// Drive Component bootstraps by slot name. Each slot must resolve
62 /// to a registered Component bootstrap; the batch validates
63 /// atomically before any dispatch fires. Slots fire in slice
64 /// order through the registered Bootstrap dispatcher.
65 Slots(&'a [&'a str]),
66}
67
68/// Constructed BB Node ready to drive ML work. Produced by
69/// [`bytesandbrains::install`] — by the time the host holds one,
70/// the engine has resolved its dispatch table, registered every
71/// bound concrete, and installed the target function as the root
72/// graph.
73pub struct Node {
74 pub(crate) engine: Engine,
75 pub(crate) config: NodeConfig,
76 pub(crate) incarnation: u64,
77 /// Registered target names → the shared compiled artifact. Every
78 /// entry references the same underlying `ModelProto` so a
79 /// multi-target install (e.g. `Client` + `Server` partitions on
80 /// one peer) stores the proto bytes exactly once. The `Arc` is
81 /// installed at [`Self::register_module`]; both `deliver_event`
82 /// and `invoke` consult `contains_key` for routing, and future
83 /// per-target input-name lookups can read through this shared
84 /// handle without cloning the proto.
85 pub(crate) module_index: HashMap<String, Arc<ModelProto>>,
86 /// Single shared handle to the installed model. Equal to the
87 /// `Arc` every `module_index` entry references; held separately
88 /// so callers (and Debug) can reach the proto without first
89 /// resolving a target name. `None` before
90 /// [`Self::register_module`] runs.
91 pub(crate) model: Option<Arc<ModelProto>>,
92 pub(crate) component_handles: Vec<ComponentHandle>,
93 /// Optional observer fed every `EngineStep` produced by
94 /// `Engine::poll`. Set via [`Node::set_telemetry_tap`]; when
95 /// `None` the engine's outputs pass through unchanged.
96 pub(crate) telemetry_tap: Option<TelemetryTap>,
97}
98
99impl std::fmt::Debug for Node {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 f.debug_struct("Node")
102 .field("incarnation", &self.incarnation)
103 .field("modules", &self.module_index.keys().collect::<Vec<_>>())
104 .field("component_count", &self.component_handles.len())
105 .finish_non_exhaustive()
106 }
107}
108
109// --- Construction + chain methods --------------
110
111impl Node {
112 /// Bare constructor used by [`bytesandbrains::install`]. Not
113 /// part of the public surface — every external caller routes
114 /// through `install` so the compilation passport, binding-table
115 /// parse, and concrete construction surface a typed
116 /// `InstallError` instead of leaving the Node in a half-built
117 /// state. Doc-hidden to match the rest of the install-only
118 /// surface (`engine_install_handle`, `push_linked_component`,
119 /// `set_model`, `register_module`); the facade lives outside
120 /// `bb-runtime`, so a `pub(crate)` constructor would not reach
121 /// it — `#[doc(hidden)]` keeps the symbol off the rendered
122 /// docs while letting the canonical `install` path call it.
123 ///
124 /// `addresses` is the ordered local-address bag registered in
125 /// the engine's AddressBook against `peer_id`. The wire syscall
126 /// resolves `peer = self.peer_id()` against the bag without an
127 /// explicit `add_peer` call. An empty vec skips self-registration
128 /// so downstream "no addresses" errors surface at the protocol
129 /// level; passing `vec![Address::empty()]` registers a single
130 /// placeholder entry that filters out as empty downstream.
131 #[doc(hidden)]
132 pub fn new(peer_id: crate::ids::PeerId, addresses: Vec<crate::framework::Address>) -> Self {
133 let config = NodeConfig::new(peer_id);
134 let mut engine = Engine::with_bus_capacity(config.bus_capacity);
135 engine.self_peer = peer_id;
136 engine.apply_config_caps(&config);
137 engine.register_all_framework_syscalls();
138 let real_addresses: Vec<crate::framework::Address> = addresses
139 .into_iter()
140 .filter(|a| a != &crate::framework::Address::empty())
141 .collect();
142 if !real_addresses.is_empty() {
143 let _ = engine
144 .framework
145 .address_book
146 .add_peer(peer_id, real_addresses);
147 }
148 Self {
149 engine,
150 config,
151 incarnation: 0,
152 module_index: HashMap::new(),
153 model: None,
154 component_handles: Vec::new(),
155 telemetry_tap: None,
156 }
157 }
158
159 /// Override the default [`NodeConfig`] (the cycle / async /
160 /// outbound caps). Must be called before [`crate::install`]
161 /// for the new caps to apply at build time.
162 pub fn with_config(mut self, cfg: NodeConfig) -> Self {
163 self.config = cfg.clone();
164 self.engine.apply_config_caps(&cfg);
165 self
166 }
167}
168
169// --- Snapshot / restore / runtime methods -----
170
171impl Node {
172 /// Capture the snapshottable state. refuses to
173 /// proceed when the bus still carries un-drained events that a
174 /// restore would silently drop or re-fire stale; callers drive
175 /// `Node::poll` to quiescence first and retry.
176 pub fn snapshot(&self) -> Result<crate::snapshot::NodeSnapshot, crate::errors::SnapshotError> {
177 // Bus quiescence guard. `len()` reads the queue without
178 // draining; `dropped_since_last_drain` would only be non-zero
179 // if a publish overflowed the cap since the last poll. Both
180 // are surfaced so the host can decide whether the loss is
181 // tolerable.
182 let queued = self.engine.bus.len();
183 if queued > 0 {
184 return Err(crate::errors::SnapshotError::BusNotDrained { queued, dropped: 0 });
185 }
186 Ok(self.snapshot_inner())
187 }
188
189 fn snapshot_inner(&self) -> crate::snapshot::NodeSnapshot {
190 let mut components: Vec<NamedComponentSnapshot> = Vec::new();
191 for handle in &self.component_handles {
192 let cref = crate::ids::ComponentRef::from(handle.instance_id);
193 let Some(instance) = self.engine.component(cref) else {
194 continue;
195 };
196 let state_bytes = (handle.serialize_fn)(instance);
197 components.push(NamedComponentSnapshot {
198 type_name: handle.type_name.to_string(),
199 instance_id: handle.instance_id,
200 package: handle.package,
201 state_bytes,
202 });
203 }
204
205 let mut graphs: Vec<NamedGraphSnapshot> = Vec::new();
206 for (name, installed) in self.engine.graphs_named() {
207 let function_proto_bytes = installed.function.encode_to_vec();
208 graphs.push(NamedGraphSnapshot {
209 name: name.to_string(),
210 function_proto_bytes,
211 });
212 }
213
214 let counters: std::collections::HashMap<String, u64> =
215 self.engine.framework.counters.clone();
216
217 let event_subscriptions: std::collections::HashMap<String, Vec<u64>> = self
218 .engine
219 .event_subscriptions
220 .iter()
221 .map(|(kind, sites)| {
222 (
223 kind.clone(),
224 sites.iter().map(|s| s.as_u64()).collect::<Vec<u64>>(),
225 )
226 })
227 .collect();
228
229 let address_book: Vec<crate::snapshot::transient::AddressBookEntrySnapshot> = self
230 .engine
231 .framework
232 .address_book
233 .iter()
234 .map(
235 |(peer, addrs, ref_count)| crate::snapshot::transient::AddressBookEntrySnapshot {
236 peer_id: peer.to_bytes(),
237 addresses: addrs.iter().map(|a| a.to_bytes()).collect(),
238 ref_count,
239 },
240 )
241 .collect();
242
243 let peer_governor = capture_peer_governor(&self.engine.framework.peer_state.governor);
244 let backoff_table = self
245 .engine
246 .framework
247 .peer_state
248 .backoff
249 .iter()
250 .map(|(p, s)| crate::snapshot::transient::BackoffEntry {
251 peer: p.to_bytes(),
252 attempts: s.attempts,
253 last_attempt_ns: s.last_attempt_ns,
254 next_retry_ns: s.next_retry_ns,
255 })
256 .collect();
257 let pending_async = self
258 .engine
259 .exec
260 .pending_async
261 .iter()
262 .map(|(cmd, p)| {
263 (
264 cmd.as_u64(),
265 crate::snapshot::transient::PendingAsyncSnapshot {
266 op_ref: p.op_ref.as_u64(),
267 exec_id: p.exec_id.as_u64(),
268 output_sites: p.output_sites.iter().map(|s| s.as_u64()).collect(),
269 deadline_ns: p.deadline_ns,
270 },
271 )
272 })
273 .collect();
274 let pending_outbound = self
275 .engine
276 .framework
277 .outbound_queue
278 .iter_for_snapshot()
279 .map(|env| crate::snapshot::transient::PendingOutboundEntry {
280 envelope_bytes: crate::envelope::EnvelopeCodec::encode(env),
281 redelivered: true,
282 })
283 .collect();
284
285 let transient = TransientSnapshot {
286 framework: crate::snapshot::transient::FrameworkSnapshot {
287 counters,
288 fired_phases: self.engine.fired_phases.clone(),
289 address_book,
290 peer_governor,
291 backoff_table,
292 pending_outbound,
293 // multihash bytes + counter
294 // persistence. The peer_id_bytes lets a multihash
295 // PeerId survive restore; the counters prevent
296 // CommandId / ExecId collisions on restart.
297 peer_id_bytes: self.engine.self_peer.to_bytes(),
298 next_command_id: self.engine.exec.ids.next_command_id,
299 next_exec_id: self.engine.exec.ids.next_exec_id,
300 spec_version: crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION,
301 },
302 bus: crate::snapshot::transient::TypedBusSnapshot {
303 event_subscriptions,
304 },
305 pending_async,
306 ..Default::default()
307 };
308
309 crate::snapshot::NodeSnapshot {
310 incarnation: self.incarnation,
311 config: NodeConfigSnapshot::from(&self.config),
312 graphs,
313 components,
314 transient,
315 }
316 }
317
318 /// Restore a Node's state from a snapshot.
319 pub fn restore(
320 &mut self,
321 snap: crate::snapshot::NodeSnapshot,
322 ) -> Result<(), crate::errors::restore::RestoreError> {
323 if snap.transient.framework.spec_version
324 != crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION
325 {
326 return Err(RestoreError::SpecVersionMismatch {
327 got: snap.transient.framework.spec_version,
328 expected: crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION,
329 });
330 }
331
332 // Reinstall each captured graph. The function bytes are
333 // prost-encoded FunctionProto blobs from the source Node's
334 // snapshot path; decode failures fail the restore loudly
335 // so callers don't silently lose a graph.
336 for graph_snap in snap.graphs {
337 let function = bb_ir::proto::onnx::FunctionProto::decode(
338 graph_snap.function_proto_bytes.as_slice(),
339 )
340 .map_err(|e| {
341 RestoreError::SnapshotMismatch(format!(
342 "restore: failed to decode graph `{}`: {e}",
343 graph_snap.name,
344 ))
345 })?;
346 self.engine.install_graph(graph_snap.name, function);
347 }
348
349 for comp_snap in snap.components {
350 let cref = crate::ids::ComponentRef::from(comp_snap.instance_id);
351 let Some(handle) = self.component_handles.iter().find(|h| {
352 h.type_name == comp_snap.type_name && h.instance_id == comp_snap.instance_id
353 }) else {
354 return Err(RestoreError::SnapshotMismatch(format!(
355 "no handle on live Node for component {}@{}",
356 comp_snap.type_name, comp_snap.instance_id,
357 )));
358 };
359 let restored = (handle.restore_fn)(&comp_snap.state_bytes).map_err(|source| {
360 RestoreError::ComponentRestoreFailed {
361 type_name: comp_snap.type_name.clone(),
362 source,
363 }
364 })?;
365 self.engine.register_component(cref, restored);
366 }
367
368 self.engine.framework.counters.clear();
369 for (name, value) in snap.transient.framework.counters {
370 self.engine.framework.counters.insert(name, value);
371 }
372 self.engine.fired_phases = snap.transient.framework.fired_phases;
373 self.engine.event_subscriptions.clear();
374 for (kind, sites) in snap.transient.bus.event_subscriptions {
375 self.engine.event_subscriptions.insert(
376 kind,
377 sites
378 .into_iter()
379 .map(crate::ids::NodeSiteId::from)
380 .collect(),
381 );
382 }
383
384 for entry in snap.transient.framework.address_book {
385 let mut decoded: Vec<crate::framework::Address> =
386 Vec::with_capacity(entry.addresses.len());
387 let mut malformed = false;
388 for bytes in &entry.addresses {
389 match crate::framework::Address::from_bytes(bytes) {
390 Ok(a) => decoded.push(a),
391 Err(_) => {
392 malformed = true;
393 break;
394 }
395 }
396 }
397 if malformed || decoded.is_empty() {
398 continue;
399 }
400 let Ok(peer) = crate::ids::PeerId::from_bytes(&entry.peer_id) else {
401 continue;
402 };
403 self.engine
404 .framework
405 .address_book
406 .restore_entry(peer, decoded, entry.ref_count);
407 }
408 self.engine.framework.peer_state.governor = crate::framework::PeerGovernor::new();
409 let governor_snap = snap.transient.framework.peer_governor;
410 self.engine
411 .framework
412 .peer_state
413 .governor
414 .set_failure_threshold(governor_snap.failure_threshold);
415 for peer_bytes in governor_snap.blocklist {
416 let Ok(peer) = crate::ids::PeerId::from_bytes(&peer_bytes) else {
417 continue;
418 };
419 self.engine.framework.peer_state.governor.block(peer);
420 }
421 if let Some(allow) = governor_snap.allowlist {
422 let set: std::collections::HashSet<_> = allow
423 .into_iter()
424 .filter_map(|b| crate::ids::PeerId::from_bytes(&b).ok())
425 .collect();
426 self.engine
427 .framework
428 .peer_state
429 .governor
430 .set_allowlist(Some(set));
431 }
432 for (peer_bytes, consecutive_failures, last_event_ns, down) in governor_snap.health {
433 let Ok(peer) = crate::ids::PeerId::from_bytes(&peer_bytes) else {
434 continue;
435 };
436 self.engine.framework.peer_state.governor.restore_health(
437 peer,
438 crate::framework::PeerHealth {
439 consecutive_failures,
440 last_event_ns,
441 down,
442 },
443 );
444 }
445 for entry in snap.transient.framework.backoff_table {
446 let Ok(peer) = crate::ids::PeerId::from_bytes(&entry.peer) else {
447 continue;
448 };
449 self.engine.framework.peer_state.backoff.restore_state(
450 peer,
451 crate::framework::backoff_table::BackoffState {
452 attempts: entry.attempts,
453 last_attempt_ns: entry.last_attempt_ns,
454 next_retry_ns: entry.next_retry_ns,
455 },
456 );
457 }
458 for (cmd_u64, snap_p) in snap.transient.pending_async {
459 self.engine.exec.pending_async.insert(
460 crate::ids::CommandId::from(cmd_u64),
461 crate::engine::PendingAsync {
462 op_ref: crate::ids::OpRef::from(snap_p.op_ref),
463 exec_id: crate::ids::ExecId::from(snap_p.exec_id),
464 output_sites: snap_p
465 .output_sites
466 .into_iter()
467 .map(crate::ids::NodeSiteId::from)
468 .collect(),
469 deadline_ns: snap_p.deadline_ns,
470 },
471 );
472 }
473 for entry in snap.transient.framework.pending_outbound {
474 if let Ok(env) = crate::envelope::EnvelopeCodec::decode_capped(
475 &entry.envelope_bytes,
476 &self.config.envelope_caps,
477 ) {
478 self.engine.framework.outbound_queue.push(env);
479 }
480 }
481
482 self.incarnation = snap.incarnation + 1;
483
484 Ok(())
485 }
486
487 /// Reset transient runtime state without dropping the bound
488 /// components or installed graphs.
489 pub fn clear(&mut self) {
490 self.engine.exec.frontier.clear();
491 self.engine.exec.slot_table.clear();
492 self.engine.exec.execution_state.clear();
493 self.engine.exec.pending_async.clear();
494 self.engine.exec.pending_completions.clear();
495 let _ = self.engine.ingress.drain_all();
496 self.engine.fired_phases.clear();
497 self.engine.framework.counters.clear();
498 }
499
500 /// Current incarnation count. Bumped on every `restore()`.
501 pub fn incarnation(&self) -> u64 {
502 self.incarnation
503 }
504
505 /// Names of every module registered at construction time.
506 pub fn loaded_modules(&self) -> Vec<&str> {
507 self.module_index.keys().map(|s| s.as_str()).collect()
508 }
509
510 /// References to every owned component handle.
511 pub fn linked_components(&self) -> Vec<&ComponentHandle> {
512 self.component_handles.iter().collect()
513 }
514
515 /// Local peer identity.
516 pub fn peer_id(&self) -> crate::ids::PeerId {
517 self.config.peer_id
518 }
519
520 /// Read-only execution-state lookup.
521 pub fn execution_state(
522 &self,
523 exec_id: crate::ids::ExecId,
524 ) -> Option<&crate::engine::ExecutionState> {
525 self.engine.exec.execution_state.get(&exec_id)
526 }
527
528 /// Number of `AsyncSuspended` Ops currently awaiting completion.
529 pub fn pending_async_count(&self) -> usize {
530 self.engine.exec.pending_async.len()
531 }
532
533 /// Snapshot of the engine's hot-path counters.
534 pub fn engine_stats(&self) -> crate::engine::EngineStats {
535 self.engine.engine_stats()
536 }
537
538 /// Mutable handle to the engine for install-time setup
539 /// (registering components, binding slots, installing the
540 /// target graph, resolving dispatch). Doc-hidden because it
541 /// crosses the `pub(crate)` field boundary: the user-facing
542 /// entry point is `bb::install()`, which calls this accessor
543 /// to drive `engine.register_component` / `engine.bind_slot` /
544 /// `engine.install_graph` / `engine.resolve_dispatch`.
545 #[doc(hidden)]
546 pub fn engine_install_handle(&mut self) -> &mut crate::engine::Engine {
547 &mut self.engine
548 }
549
550 /// Push a `ComponentHandle` onto the linked-components list.
551 /// Called from `bb::install()` per supplied component so
552 /// `snapshot()` can capture state via the recorded
553 /// `serialize_fn`. Doc-hidden — the public surface for setting
554 /// up a Node is `bb::install()`.
555 #[doc(hidden)]
556 pub fn push_linked_component(&mut self, handle: crate::concrete::ComponentHandle) {
557 self.component_handles.push(handle);
558 }
559
560 /// Set the shared compiled artifact this Node was installed with.
561 /// Called once per `bb::install()` from the install path before
562 /// the per-target `register_module` calls run; downstream
563 /// `register_module` calls share the same `Arc` instead of
564 /// cloning the proto per target. Doc-hidden — the user surface
565 /// is `bb::install()`.
566 #[doc(hidden)]
567 pub fn set_model(&mut self, model: ModelProto) {
568 self.model = Some(Arc::new(model));
569 }
570
571 /// Register `module_name` as a valid target for `deliver_event` /
572 /// `invoke`. Called from `bb::install()` once per resolved
573 /// target name after the engine's root graph is installed; the
574 /// stored `Arc` is the same handle [`Self::set_model`] minted
575 /// for every target, so multi-target installs hold the proto
576 /// bytes exactly once. Doc-hidden — the user surface is
577 /// `bb::install()`.
578 #[doc(hidden)]
579 pub fn register_module(&mut self, module_name: String) {
580 let model = self
581 .model
582 .clone()
583 .expect("Node::set_model must run before register_module");
584 self.module_index.insert(module_name, model);
585 }
586
587 /// The compiled `ModelProto` the Node was installed with, or
588 /// `None` if the Node was built internally by a crate-private
589 /// path that bypassed [`crate::install`] (the public entry point
590 /// always sets a model). Returns a shared `Arc` so callers can
591 /// read per-target metadata without cloning the proto.
592 pub fn model(&self) -> Option<Arc<ModelProto>> {
593 self.model.clone()
594 }
595
596 /// Look up the `ComponentRef` bound at `slot_name` in the
597 /// engine's generic slot registry. Returns `None` when no slot
598 /// of that name is bound.
599 pub fn slot(&self, slot_name: &str) -> Option<crate::ids::ComponentRef> {
600 self.engine.slot(slot_name)
601 }
602
603 /// Iterate every `(slot_name, ComponentRef)` pair bound in the
604 /// engine's slot registry.
605 pub fn slots_iter(&self) -> impl Iterator<Item = (&str, crate::ids::ComponentRef)> {
606 self.engine.slots_iter()
607 }
608
609 /// Inventory-declared roles for a registered component. Read
610 /// from the per-Engine `component_roles` map populated by
611 /// `ensure_ready` from
612 /// `crate::registry::roles_for_component(T::TYPE_NAME)`. Returns
613 /// the empty set when the component wasn't registered via a
614 /// `#[derive(bb::<Role>)]` chain (test fixtures that hand-implement
615 /// role traits surface here as empty).
616 pub fn roles_for(
617 &self,
618 cref: crate::ids::ComponentRef,
619 ) -> std::collections::HashSet<crate::registry::ComponentRole> {
620 self.engine.roles_for(cref)
621 }
622
623 /// Add `peer` to the inbound + outbound blocklist.
624 pub fn block_peer(&mut self, peer: crate::ids::PeerId) {
625 self.engine.framework.peer_state.governor.block(peer);
626 }
627
628 /// Remove `peer` from the blocklist.
629 pub fn unblock_peer(&mut self, peer: crate::ids::PeerId) {
630 self.engine.framework.peer_state.governor.unblock(peer);
631 }
632
633 /// Configure an allowlist for inbound + outbound delivery.
634 pub fn set_allowlist(
635 &mut self,
636 allowlist: Option<std::collections::HashSet<crate::ids::PeerId>>,
637 ) {
638 self.engine
639 .framework
640 .peer_state
641 .governor
642 .set_allowlist(allowlist);
643 }
644
645 /// Per-peer health snapshot.
646 pub fn peer_health(&self, peer: crate::ids::PeerId) -> Option<crate::framework::PeerHealth> {
647 self.engine.framework.peer_state.governor.peer_health(peer)
648 }
649
650 /// Resolve a `PeerId` to its ordered address list (cloned).
651 pub fn resolve_peer_addresses(
652 &self,
653 peer: crate::ids::PeerId,
654 ) -> Option<Vec<crate::framework::Address>> {
655 self.engine
656 .framework
657 .address_book
658 .lookup(peer)
659 .map(|addrs| addrs.to_vec())
660 }
661
662 /// Install an `EngineStep` observer onto the live Node.
663 pub fn set_telemetry_tap<F>(&mut self, tap: F)
664 where
665 F: FnMut(&crate::engine::EngineStep) + 'static,
666 {
667 self.telemetry_tap = Some(Box::new(tap));
668 }
669
670 /// Announce a peer with one or more reachable addresses.
671 pub fn add_peer(
672 &mut self,
673 peer: crate::ids::PeerId,
674 addresses: Vec<crate::framework::Address>,
675 ) -> Result<(), crate::framework::AddressBookError> {
676 self.engine.framework.address_book.add_peer(peer, addresses)
677 }
678
679 /// Concern 2 - cheap-clone handle to the shared
680 /// `IngressQueue`. Used by off-thread transport / clock
681 /// adapters to push events into the Node.
682 pub fn ingress_handle(&self) -> crate::ingress::IngressQueueRef {
683 crate::ingress::IngressQueueRef::new(self.engine.ingress_queue_handle())
684 }
685
686 /// Read-only view on the registered NodeConfig.
687 pub fn config(&self) -> &NodeConfig {
688 &self.config
689 }
690
691 /// First entry of the local-address bag, or [`Address::empty`]
692 /// when none registered. The AddressBook entry for
693 /// `self.peer_id()` is the source of truth.
694 pub fn peer_address(&self) -> crate::framework::Address {
695 self.local_addresses()
696 .first()
697 .cloned()
698 .unwrap_or_else(crate::framework::Address::empty)
699 }
700
701 /// Ordered local-address bag for this Node. Reads directly from
702 /// the AddressBook entry keyed by `self.peer_id()`; returns an
703 /// empty slice when the Node was installed with no addresses or
704 /// the bag was pruned to zero entries.
705 pub fn local_addresses(&self) -> &[crate::framework::Address] {
706 self.engine
707 .framework
708 .address_book
709 .lookup(self.engine.self_peer)
710 .unwrap_or(&[])
711 }
712
713 /// Append `address` to the local-address bag. Creates the
714 /// AddressBook entry if none exists; idempotent on duplicates.
715 pub fn add_local_address(
716 &mut self,
717 address: crate::framework::Address,
718 ) -> Result<(), crate::framework::AddressBookError> {
719 let self_peer = self.engine.self_peer;
720 let book = &mut self.engine.framework.address_book;
721 if book.lookup(self_peer).is_some() {
722 book.register_address(self_peer, address)
723 } else {
724 book.add_peer(self_peer, vec![address])
725 }
726 }
727
728 /// Prune `address` from the local-address bag. Errors when no
729 /// entry exists; succeeds (no-op) when the bag has no matching
730 /// address.
731 pub fn forget_local_address(
732 &mut self,
733 address: &crate::framework::Address,
734 ) -> Result<(), crate::framework::AddressBookError> {
735 let self_peer = self.engine.self_peer;
736 self.engine
737 .framework
738 .address_book
739 .forget_address(self_peer, address)
740 }
741
742 /// Drive bootstrap targets to completion. Returns every
743 /// `EngineStep` the bootstrap path emitted, including each
744 /// drained phase's `BootstrapComplete` or the yielding
745 /// `WaitingOnBootstrap`. Idempotent: a Node whose bootstrap
746 /// already completed (or a Node with no install-order recording)
747 /// returns an empty vec.
748 ///
749 /// Variant semantics:
750 ///
751 /// - [`BootstrapTarget::All`] arms + seeds the install-order
752 /// queue, then polls until every queued target reaches
753 /// `BootstrapComplete` or one suspends on an async completion.
754 /// Equivalent to the F4 host kick.
755 /// - [`BootstrapTarget::ModuleNames`] / [`BootstrapTarget::ModuleRequests`]
756 /// validate target names + input formals atomically — the whole
757 /// batch rejects with `BootstrapError::UnknownTarget` /
758 /// `BootstrapError::AlreadyTransitivelyQueued` /
759 /// `BootstrapError::UnknownInput` / `BootstrapError::MissingInput`
760 /// before any staging happens. On success the framework copies
761 /// each request's borrowed `&[u8]` inputs into engine-owned
762 /// storage (Principle 1a: caller slices may drop the moment
763 /// this call returns), then drives the staged bodies to
764 /// completion.
765 /// - [`BootstrapTarget::Slots`] resolves each slot to a registered
766 /// Component bootstrap, validates the whole batch atomically,
767 /// then fires each in slice order through the dispatcher. The
768 /// drain loop runs after all slots fire so synchronous
769 /// `Immediate` dispatches retire inline and async ones surface
770 /// `WaitingOnBootstrap`.
771 ///
772 /// On async suspension the host posts the matured completion via
773 /// the ingress and re-invokes `run_bootstrap` to drain the
774 /// resumed op.
775 ///
776 /// Body-phase ops touching a locked `ComponentRef` do not fire
777 /// during this call; the `is_op_locked` gate in
778 /// `pop_frontier_fireable` keeps them parked until the bootstrap
779 /// in-flight set drops.
780 pub fn run_bootstrap(
781 &mut self,
782 target: BootstrapTarget<'_>,
783 ) -> Result<Vec<crate::engine::EngineStep>, crate::errors::BootstrapError> {
784 match target {
785 BootstrapTarget::All => {
786 // Arm + seed the install-order queue (no-op when no
787 // targets remain). `Engine::run_bootstrap` returns
788 // false on a fully drained Node so the drain loop
789 // exits immediately.
790 let armed = self.engine.run_bootstrap();
791 if !armed && !self.engine.bootstrap_pending() {
792 return Ok(Vec::new());
793 }
794 }
795 BootstrapTarget::ModuleNames(targets) => {
796 let requests: Vec<crate::engine::BootstrapRequest<'_>> = targets
797 .iter()
798 .map(|t| crate::engine::BootstrapRequest {
799 target: t,
800 inputs: &[],
801 })
802 .collect();
803 self.stage_module_requests(&requests)?;
804 }
805 BootstrapTarget::ModuleRequests(requests) => {
806 self.stage_module_requests(requests)?;
807 }
808 BootstrapTarget::Slots(slots) => {
809 // Pre-flight: every slot must resolve to a registered
810 // Component bootstrap. Atomic — no firing happens
811 // until every slot in the batch passes.
812 for slot in slots {
813 if !self.engine.has_component_bootstrap(slot) {
814 return Err(crate::errors::BootstrapError::UnknownTarget {
815 target_name: (*slot).to_string(),
816 available: self
817 .engine
818 .module_bootstrap_target_names()
819 .into_iter()
820 .chain(std::iter::empty())
821 .collect(),
822 });
823 }
824 }
825 for slot in slots {
826 self.engine.fire_component_bootstrap_by_slot(slot)?;
827 }
828 }
829 }
830 Ok(self.drain_bootstrap())
831 }
832
833 /// Pre-flight + stage a batch of `BootstrapRequest`s through the
834 /// engine's F5 immediate-fire entry point. Validates target
835 /// registration and duplicate detection atomically before any
836 /// per-input copy happens; on any error path the engine's
837 /// bootstrap state stays untouched.
838 fn stage_module_requests(
839 &mut self,
840 requests: &[crate::engine::BootstrapRequest<'_>],
841 ) -> Result<(), crate::errors::BootstrapError> {
842 let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
843 for req in requests {
844 if !self.engine.module_bootstrap_registered(req.target) {
845 return Err(crate::errors::BootstrapError::UnknownTarget {
846 target_name: req.target.to_string(),
847 available: self.engine.module_bootstrap_target_names(),
848 });
849 }
850 if !seen.insert(req.target) {
851 return Err(crate::errors::BootstrapError::AlreadyTransitivelyQueued {
852 target_name: req.target.to_string(),
853 });
854 }
855 }
856 for req in requests {
857 // Borrow each request through its declared lifetime; the
858 // engine copies bytes into framework-owned storage so the
859 // borrow ends when this call returns.
860 self.enqueue_bootstrap_request(crate::engine::BootstrapRequest {
861 target: req.target,
862 inputs: req.inputs,
863 })?;
864 }
865 Ok(())
866 }
867
868 /// Drive the engine's poll loop until every staged bootstrap
869 /// drains to `BootstrapComplete` or one suspends with
870 /// `WaitingOnBootstrap`. Forwards each step through the
871 /// telemetry tap (matching `Node::poll` semantics).
872 fn drain_bootstrap(&mut self) -> Vec<crate::engine::EngineStep> {
873 let mut steps = Vec::new();
874 loop {
875 let batch = self.engine.poll();
876 let waiting = matches!(
877 batch.last(),
878 Some(crate::engine::EngineStep::WaitingOnBootstrap)
879 );
880 if let Some(tap) = self.telemetry_tap.as_mut() {
881 for step in &batch {
882 tap(step);
883 }
884 }
885 steps.extend(batch);
886 if !self.engine.bootstrap_pending() || waiting {
887 break;
888 }
889 }
890 steps
891 }
892
893 /// Stage a host-supplied [`crate::engine::BootstrapRequest`] +
894 /// fire the addressed Module bootstrap immediately. Crate-internal
895 /// helper for [`Self::run_bootstrap`]'s
896 /// [`BootstrapTarget::ModuleRequests`] arm. Forwards to the
897 /// engine's F5 immediate-fire entry point: validates
898 /// `request.inputs` against the target's declared formals, runs
899 /// the Principle 1a copy (`try_charge` → `try_reserve_exact` →
900 /// `extend_from_slice`) against the engine's ingress byte budget,
901 /// and pushes the body ops onto the frontier. The caller's
902 /// borrowed `&[u8]` slices may drop the moment this call returns.
903 ///
904 /// Returns `BootstrapError::UnknownTarget` /
905 /// `BootstrapError::UnknownInput` / `BootstrapError::MissingInput`
906 /// when the request fails validation, and
907 /// `BootstrapError::AllocationFailed` when the engine's budget /
908 /// allocator rejects a staged payload. On any error path the
909 /// engine's bootstrap state stays untouched — partially admitted
910 /// byte charges are released before the error surfaces.
911 pub(crate) fn enqueue_bootstrap_request(
912 &mut self,
913 request: crate::engine::BootstrapRequest<'_>,
914 ) -> Result<(), crate::errors::BootstrapError> {
915 self.engine.enqueue_bootstrap_request(request)
916 }
917
918 /// Snapshot of the engine's bootstrap lifecycle. Returns
919 /// `BootstrapStatus::Idle` when no bootstrap is queued or
920 /// in-flight, `Running` when at least one bootstrap is in-flight
921 /// (the body gate is parking touched ops), and `WaitingForInput`
922 /// when the install-order queue still has unseeded targets or
923 /// host-staged requests sit on `pending_requests` / `waiting`.
924 /// Pure read — does not advance any queue.
925 pub fn bootstrap_status(&self) -> crate::engine::bootstrap::BootstrapStatus {
926 self.engine.bootstrap_status()
927 }
928
929 /// Drive one poll cycle. Returns `Pending` when the engine
930 /// drains to quiescence (the ingress waker is registered with
931 /// `cx`); otherwise yields the steps the engine made progress
932 /// on. Construction-time validation lives in [`crate::node::Node`]'s
933 /// installation path (see `bytesandbrains::install`) and surfaces
934 /// `InstallError` before the first poll.
935 pub fn poll(
936 &mut self,
937 cx: &mut std::task::Context<'_>,
938 ) -> std::task::Poll<Vec<crate::engine::EngineStep>> {
939 let steps = self.engine.poll();
940 if let Some(tap) = self.telemetry_tap.as_mut() {
941 for step in &steps {
942 tap(step);
943 }
944 }
945 if steps.is_empty() {
946 self.engine.ingress.register_waker(cx.waker());
947 return std::task::Poll::Pending;
948 }
949 std::task::Poll::Ready(steps)
950 }
951
952 /// Decode and push inbound wire bytes onto the ingress queue.
953 /// Routes through `EnvelopeCodec::decode_capped` so malformed,
954 /// schema-mismatched, or oversize buffers fail with
955 /// `DeliveryError::InvalidEnvelope` BEFORE any prost allocation.
956 /// The transport layer supplies `src_peer` so the engine can
957 /// consult `PeerGovernor::check_inbound` before routing.
958 pub fn deliver_inbound(
959 &mut self,
960 src_peer: crate::ids::PeerId,
961 bytes: &[u8],
962 ) -> Result<(), crate::errors::delivery::DeliveryError> {
963 let envelope =
964 crate::envelope::EnvelopeCodec::decode_capped(bytes, &self.config.envelope_caps)
965 .map_err(|e| {
966 crate::errors::delivery::DeliveryError::InvalidEnvelope(e.to_string())
967 })?;
968 self.engine
969 .ingress
970 .push(crate::ingress::IngressEvent::EnvelopeFrom {
971 src_peer,
972 envelope,
973 // The raw-bytes deliver path doesn't surface an
974 // observed address — host adapters that want
975 // reflexive-address discovery push via the typed
976 // ingress queue directly.
977 src_observed_address: None,
978 })
979 .map_err(|_| crate::errors::delivery::DeliveryError::IngressClosed)
980 }
981
982 /// Push an app-event onto the ingress queue.
983 ///
984 /// Per Principle 1a (`docs/internal/superpowers/specs/2026-06-24-engine-boundary-fallibility-and-backend-owned-tensors.md`)
985 /// the byte payload crosses the engine boundary as a BORROWED
986 /// slice: the framework caps `value_bytes.len()` against
987 /// `NodeConfig::max_app_event_bytes`, charges the length against
988 /// `NodeConfig::ingress_byte_budget`, fallibly reserves a fresh
989 /// framework-owned `Vec<u8>`, and copies the caller's bytes in.
990 /// The caller may free `value_bytes` the moment this call
991 /// returns. Cap / budget / alloc failures return a synchronous
992 /// `DeliveryError::*` AND publish a matching
993 /// `InfraEvent::AppIngressError` on the bus for observers.
994 pub fn deliver_event(
995 &mut self,
996 module: &str,
997 input: &str,
998 value_bytes: &[u8],
999 ) -> Result<(), crate::errors::delivery::DeliveryError> {
1000 if !self.module_index.contains_key(module) {
1001 return Err(DeliveryError::UnknownModule(module.to_string()));
1002 }
1003 let byte_count = value_bytes.len();
1004 let cap = self.config.max_app_event_bytes;
1005 let source = || crate::bus::AppIngressSource::AppEvent {
1006 module: module.to_string(),
1007 input: input.to_string(),
1008 };
1009 if byte_count > cap {
1010 self.emit_app_ingress_error(
1011 source(),
1012 byte_count,
1013 crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap },
1014 );
1015 return Err(DeliveryError::OversizePayload { byte_count, cap });
1016 }
1017 if let Err(reason) = self.engine.try_charge(byte_count) {
1018 self.emit_app_ingress_error(
1019 source(),
1020 byte_count,
1021 crate::bus::AppIngressErrorKind::BudgetExceeded {
1022 budget_remaining: reason.budget_remaining,
1023 },
1024 );
1025 return Err(DeliveryError::BudgetExceeded {
1026 byte_count,
1027 budget_remaining: reason.budget_remaining,
1028 });
1029 }
1030 let mut owned: Vec<u8> = Vec::new();
1031 // Route through `crate::fallible::try_reserve_exact` so the
1032 // thread-local fault seam (used by `tests/fallible_ingress.rs`)
1033 // intercepts the boundary reservation.
1034 if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
1035 // Release the budget charge before surfacing the alloc
1036 // failure — the bytes never made it past the boundary.
1037 self.engine.release(byte_count);
1038 self.emit_app_ingress_error(
1039 source(),
1040 byte_count,
1041 crate::bus::AppIngressErrorKind::AllocationFailed {
1042 reason: crate::bus::AllocFailReason::HeapExhausted,
1043 },
1044 );
1045 return Err(DeliveryError::AllocationFailed {
1046 byte_count,
1047 reason: crate::bus::AllocFailReason::HeapExhausted,
1048 });
1049 }
1050 owned.extend_from_slice(value_bytes);
1051 self.engine
1052 .ingress
1053 .push(crate::ingress::IngressEvent::AppEvent {
1054 module_name: module.to_string(),
1055 input_name: input.to_string(),
1056 value_bytes: owned,
1057 })
1058 .map_err(|_| {
1059 // Ingress queue closed — release the charge so the
1060 // counter does not leak the rejected push.
1061 self.engine.release(byte_count);
1062 DeliveryError::IngressClosed
1063 })
1064 }
1065
1066 /// Invoke a Module with the given pre-encoded inputs.
1067 ///
1068 /// Inputs cross as borrowed `&[(&str, &[u8])]` per Principle 1a;
1069 /// the framework caps count + cumulative bytes, charges against
1070 /// `ingress_byte_budget`, and per-input fallibly reserves +
1071 /// copies into framework-owned `Vec<u8>` storage. Any failure
1072 /// releases prior charges and emits the matching
1073 /// `InfraEvent::AppIngressError`.
1074 pub fn invoke(
1075 &mut self,
1076 module: &str,
1077 inputs: &[(&str, &[u8])],
1078 ) -> Result<crate::ids::ExecId, crate::errors::delivery::DeliveryError> {
1079 if !self.module_index.contains_key(module) {
1080 return Err(DeliveryError::UnknownModule(module.to_string()));
1081 }
1082 let input_count = inputs.len();
1083 let input_cap = self.config.max_invoke_inputs;
1084 let source = || crate::bus::AppIngressSource::Invoke {
1085 module: module.to_string(),
1086 input_count,
1087 };
1088 if input_count > input_cap {
1089 // `byte_count` reports the requested input count for the
1090 // count-cap rejection — the byte cap will surface
1091 // separately below for cumulative-byte rejections.
1092 self.emit_app_ingress_error(
1093 source(),
1094 input_count,
1095 crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap: input_cap },
1096 );
1097 return Err(DeliveryError::TooManyInputs {
1098 count: input_count,
1099 cap: input_cap,
1100 });
1101 }
1102 let total_bytes: usize = inputs
1103 .iter()
1104 .fold(0usize, |acc, (_, b)| acc.saturating_add(b.len()));
1105 let bytes_cap = self.config.max_invoke_bytes;
1106 if total_bytes > bytes_cap {
1107 self.emit_app_ingress_error(
1108 source(),
1109 total_bytes,
1110 crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap: bytes_cap },
1111 );
1112 return Err(DeliveryError::OversizePayload {
1113 byte_count: total_bytes,
1114 cap: bytes_cap,
1115 });
1116 }
1117 if let Err(reason) = self.engine.try_charge(total_bytes) {
1118 self.emit_app_ingress_error(
1119 source(),
1120 total_bytes,
1121 crate::bus::AppIngressErrorKind::BudgetExceeded {
1122 budget_remaining: reason.budget_remaining,
1123 },
1124 );
1125 return Err(DeliveryError::BudgetExceeded {
1126 byte_count: total_bytes,
1127 budget_remaining: reason.budget_remaining,
1128 });
1129 }
1130 // Per-input fallible reservation. Track how many bytes we've
1131 // already copied so a mid-loop failure can release the full
1132 // `total_bytes` charge in one shot (the slot-table never
1133 // observed any of these payloads).
1134 let mut owned: Vec<(String, Vec<u8>)> = Vec::new();
1135 if crate::fallible::try_reserve_exact(&mut owned, input_count).is_err() {
1136 self.engine.release(total_bytes);
1137 self.emit_app_ingress_error(
1138 source(),
1139 total_bytes,
1140 crate::bus::AppIngressErrorKind::AllocationFailed {
1141 reason: crate::bus::AllocFailReason::HeapExhausted,
1142 },
1143 );
1144 return Err(DeliveryError::AllocationFailed {
1145 byte_count: total_bytes,
1146 reason: crate::bus::AllocFailReason::HeapExhausted,
1147 });
1148 }
1149 for (name, bytes) in inputs.iter() {
1150 let mut buf: Vec<u8> = Vec::new();
1151 if crate::fallible::try_reserve_exact(&mut buf, bytes.len()).is_err() {
1152 self.engine.release(total_bytes);
1153 self.emit_app_ingress_error(
1154 source(),
1155 total_bytes,
1156 crate::bus::AppIngressErrorKind::AllocationFailed {
1157 reason: crate::bus::AllocFailReason::HeapExhausted,
1158 },
1159 );
1160 return Err(DeliveryError::AllocationFailed {
1161 byte_count: total_bytes,
1162 reason: crate::bus::AllocFailReason::HeapExhausted,
1163 });
1164 }
1165 buf.extend_from_slice(bytes);
1166 owned.push(((*name).to_string(), buf));
1167 }
1168 let exec_id = self.engine.allocate_exec_id();
1169 self.engine
1170 .ingress
1171 .push(crate::ingress::IngressEvent::Invoke {
1172 module_name: module.to_string(),
1173 inputs: owned,
1174 exec_id,
1175 })
1176 .map_err(|_| {
1177 self.engine.release(total_bytes);
1178 DeliveryError::IngressClosed
1179 })?;
1180 Ok(exec_id)
1181 }
1182
1183 /// Publish a freshly-built [`crate::bus::InfraEvent::AppIngressError`]
1184 /// onto the in-Node bus. Internal helper used by the
1185 /// application-ingress entry points (`deliver_event`, `invoke`)
1186 /// to mirror their synchronous `DeliveryError` returns on the
1187 /// observer surface.
1188 fn emit_app_ingress_error(
1189 &mut self,
1190 source: crate::bus::AppIngressSource,
1191 byte_count: usize,
1192 kind: crate::bus::AppIngressErrorKind,
1193 ) {
1194 self.engine.bus.publish(crate::bus::NodeEvent::Infra(
1195 crate::bus::InfraEvent::AppIngressError {
1196 source,
1197 byte_count,
1198 kind,
1199 },
1200 ));
1201 }
1202}
1203
1204/// Snapshot the PeerGovernor's policy + health state.
1205fn capture_peer_governor(
1206 governor: &crate::framework::PeerGovernor,
1207) -> crate::snapshot::transient::PeerGovernorSnapshot {
1208 crate::snapshot::transient::PeerGovernorSnapshot {
1209 blocklist: governor.blocklist().iter().map(|p| p.to_bytes()).collect(),
1210 allowlist: governor
1211 .allowlist()
1212 .map(|s| s.iter().map(|p| p.to_bytes()).collect()),
1213 health: governor
1214 .iter_health()
1215 .map(|(p, h)| {
1216 (
1217 p.to_bytes(),
1218 h.consecutive_failures,
1219 h.last_event_ns,
1220 h.down,
1221 )
1222 })
1223 .collect(),
1224 failure_threshold: governor.failure_threshold(),
1225 }
1226}
1227
1228pub mod config;
1229pub mod derivation;
1230pub use config::{
1231 NodeConfig, DEFAULT_BUS_CAPACITY, DEFAULT_CYCLE_OP_BUDGET, DEFAULT_INGRESS_BYTE_BUDGET,
1232 DEFAULT_MAX_APP_EVENT_BYTES, DEFAULT_MAX_COMPLETION_RESULT_BYTES, DEFAULT_MAX_INVOKE_BYTES,
1233 DEFAULT_MAX_INVOKE_INPUTS, DEFAULT_MAX_OUTBOUND_QUEUE, DEFAULT_MAX_PENDING_ASYNC,
1234 EDGE_INGRESS_BYTE_BUDGET, EDGE_MAX_APP_EVENT_BYTES, EDGE_MAX_COMPLETION_RESULT_BYTES,
1235 EDGE_MAX_INVOKE_BYTES, EDGE_MAX_INVOKE_INPUTS,
1236};
1237
1238
1239#[cfg(test)]
1240#[path = "snapshot_fidelity_tests.rs"]
1241mod snapshot_fidelity_tests;
1242
1243#[cfg(test)]
1244#[path = "shared_model_tests.rs"]
1245mod shared_model_tests;