Skip to main content

bb_runtime/node/
mod.rs

1//! `Node` — the framework's runtime construct, installed via
2//! [`bytesandbrains::install`] and driven through `Node::poll`.
3//!
4//! Lifecycle:
5//!
6//! ```ignore
7//! let model    = MyModule.build()?;
8//! let compiled = bb::Compiler::new()
9//!     .bind_backend::<CpuBackend>("compute")
10//!     .compile(model)?;
11//! let mut node = bb::install(peer_id, addr, compiled, &["MyModule"], bb::Config::new())?;
12//! loop { node.poll(&mut cx); }
13//! ```
14//!
15//! Construction-time validation (passport check, binding-table
16//! parse, concrete construction) surfaces `InstallError` at install
17//! time — the engine is ready to dispatch by the time `Node::poll`
18//! sees it.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use prost::Message;
24
25use crate::concrete::ComponentHandle;
26use crate::engine::Engine;
27use crate::errors::delivery::DeliveryError;
28use crate::errors::restore::RestoreError;
29use crate::snapshot::transient::TransientSnapshot;
30use crate::snapshot::{NamedComponentSnapshot, NamedGraphSnapshot, NodeConfigSnapshot};
31use bb_ir::proto::onnx::ModelProto;
32
33/// Type-erased `EngineStep` observer installed onto a `Node` via
34/// [`Node::set_telemetry_tap`]. Each `Node::poll` call invokes the
35/// closure once per produced step.
36pub type TelemetryTap = Box<dyn FnMut(&crate::engine::EngineStep)>;
37
38/// Constructed BB Node ready to drive ML work. Produced by
39/// [`bytesandbrains::install`] — by the time the host holds one,
40/// the engine has resolved its dispatch table, registered every
41/// bound concrete, and installed the target function as the root
42/// graph.
43pub struct Node {
44    pub(crate) engine: Engine,
45    pub(crate) config: NodeConfig,
46    pub(crate) incarnation: u64,
47    /// Registered target names → the shared compiled artifact. Every
48    /// entry references the same underlying `ModelProto` so a
49    /// multi-target install (e.g. `Client` + `Server` partitions on
50    /// one peer) stores the proto bytes exactly once. The `Arc` is
51    /// installed at [`Self::register_module`]; both `deliver_event`
52    /// and `invoke` consult `contains_key` for routing, and future
53    /// per-target input-name lookups can read through this shared
54    /// handle without cloning the proto.
55    pub(crate) module_index: HashMap<String, Arc<ModelProto>>,
56    /// Single shared handle to the installed model. Equal to the
57    /// `Arc` every `module_index` entry references; held separately
58    /// so callers (and Debug) can reach the proto without first
59    /// resolving a target name. `None` before
60    /// [`Self::register_module`] runs.
61    pub(crate) model: Option<Arc<ModelProto>>,
62    pub(crate) component_handles: Vec<ComponentHandle>,
63    /// Optional observer fed every `EngineStep` produced by
64    /// `Engine::poll`. Set via [`Node::set_telemetry_tap`]; when
65    /// `None` the engine's outputs pass through unchanged.
66    pub(crate) telemetry_tap: Option<TelemetryTap>,
67}
68
69impl std::fmt::Debug for Node {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("Node")
72            .field("incarnation", &self.incarnation)
73            .field("modules", &self.module_index.keys().collect::<Vec<_>>())
74            .field("component_count", &self.component_handles.len())
75            .finish_non_exhaustive()
76    }
77}
78
79// --- Construction + chain methods --------------
80
81impl Node {
82    /// Bare constructor used by [`bytesandbrains::install`]. Not
83    /// part of the public surface — every external caller routes
84    /// through `install` so the compilation passport, binding-table
85    /// parse, and concrete construction surface a typed
86    /// `InstallError` instead of leaving the Node in a half-built
87    /// state. Doc-hidden to match the rest of the install-only
88    /// surface (`engine_install_handle`, `push_linked_component`,
89    /// `set_model`, `register_module`); the facade lives outside
90    /// `bb-runtime`, so a `pub(crate)` constructor would not reach
91    /// it — `#[doc(hidden)]` keeps the symbol off the rendered
92    /// docs while letting the canonical `install` path call it.
93    ///
94    /// `addresses` is the ordered local-address bag registered in
95    /// the engine's AddressBook against `peer_id`. The wire syscall
96    /// resolves `peer = self.peer_id()` against the bag without an
97    /// explicit `add_peer` call. An empty vec skips self-registration
98    /// so downstream "no addresses" errors surface at the protocol
99    /// level; passing `vec![Address::empty()]` registers a single
100    /// placeholder entry that filters out as empty downstream.
101    #[doc(hidden)]
102    pub fn new(peer_id: crate::ids::PeerId, addresses: Vec<crate::framework::Address>) -> Self {
103        let config = NodeConfig::new(peer_id);
104        let mut engine = Engine::with_bus_capacity(config.bus_capacity);
105        engine.self_peer = peer_id;
106        engine.apply_config_caps(&config);
107        engine.register_all_framework_syscalls();
108        let real_addresses: Vec<crate::framework::Address> = addresses
109            .into_iter()
110            .filter(|a| a != &crate::framework::Address::empty())
111            .collect();
112        if !real_addresses.is_empty() {
113            let _ = engine
114                .framework
115                .address_book
116                .add_peer(peer_id, real_addresses);
117        }
118        Self {
119            engine,
120            config,
121            incarnation: 0,
122            module_index: HashMap::new(),
123            model: None,
124            component_handles: Vec::new(),
125            telemetry_tap: None,
126        }
127    }
128
129    /// Override the default [`NodeConfig`] (the cycle / async /
130    /// outbound caps). Must be called before [`crate::install`]
131    /// for the new caps to apply at build time.
132    pub fn with_config(mut self, cfg: NodeConfig) -> Self {
133        self.config = cfg.clone();
134        self.engine.apply_config_caps(&cfg);
135        self
136    }
137}
138
139// --- Snapshot / restore / runtime methods -----
140
141impl Node {
142    /// Capture the snapshottable state. refuses to
143    /// proceed when the bus still carries un-drained events that a
144    /// restore would silently drop or re-fire stale; callers drive
145    /// `Node::poll` to quiescence first and retry.
146    pub fn snapshot(&self) -> Result<crate::snapshot::NodeSnapshot, crate::errors::SnapshotError> {
147        // Bus quiescence guard. `len()` reads the queue without
148        // draining; `dropped_since_last_drain` would only be non-zero
149        // if a publish overflowed the cap since the last poll. Both
150        // are surfaced so the host can decide whether the loss is
151        // tolerable.
152        let queued = self.engine.bus.len();
153        if queued > 0 {
154            return Err(crate::errors::SnapshotError::BusNotDrained { queued, dropped: 0 });
155        }
156        Ok(self.snapshot_inner())
157    }
158
159    fn snapshot_inner(&self) -> crate::snapshot::NodeSnapshot {
160        let mut components: Vec<NamedComponentSnapshot> = Vec::new();
161        for handle in &self.component_handles {
162            let cref = crate::ids::ComponentRef::from(handle.instance_id);
163            let Some(instance) = self.engine.component(cref) else {
164                continue;
165            };
166            let state_bytes = (handle.serialize_fn)(instance);
167            components.push(NamedComponentSnapshot {
168                type_name: handle.type_name.to_string(),
169                instance_id: handle.instance_id,
170                package: handle.package,
171                state_bytes,
172            });
173        }
174
175        let mut graphs: Vec<NamedGraphSnapshot> = Vec::new();
176        for (name, installed) in self.engine.graphs_named() {
177            let function_proto_bytes = installed.function.encode_to_vec();
178            graphs.push(NamedGraphSnapshot {
179                name: name.to_string(),
180                function_proto_bytes,
181            });
182        }
183
184        let counters: std::collections::HashMap<String, u64> =
185            self.engine.framework.counters.clone();
186
187        let event_subscriptions: std::collections::HashMap<String, Vec<u64>> = self
188            .engine
189            .event_subscriptions
190            .iter()
191            .map(|(kind, sites)| {
192                (
193                    kind.clone(),
194                    sites.iter().map(|s| s.as_u64()).collect::<Vec<u64>>(),
195                )
196            })
197            .collect();
198
199        let address_book: Vec<crate::snapshot::transient::AddressBookEntrySnapshot> = self
200            .engine
201            .framework
202            .address_book
203            .iter()
204            .map(
205                |(peer, addrs, ref_count)| crate::snapshot::transient::AddressBookEntrySnapshot {
206                    peer_id: peer.to_bytes(),
207                    addresses: addrs.iter().map(|a| a.to_bytes()).collect(),
208                    ref_count,
209                },
210            )
211            .collect();
212
213        let peer_governor = capture_peer_governor(&self.engine.framework.peer_state.governor);
214        let backoff_table = self
215            .engine
216            .framework
217            .peer_state
218            .backoff
219            .iter()
220            .map(|(p, s)| crate::snapshot::transient::BackoffEntry {
221                peer: p.to_bytes(),
222                attempts: s.attempts,
223                last_attempt_ns: s.last_attempt_ns,
224                next_retry_ns: s.next_retry_ns,
225            })
226            .collect();
227        let pending_async = self
228            .engine
229            .exec
230            .pending_async
231            .iter()
232            .map(|(cmd, p)| {
233                (
234                    cmd.as_u64(),
235                    crate::snapshot::transient::PendingAsyncSnapshot {
236                        op_ref: p.op_ref.as_u64(),
237                        exec_id: p.exec_id.as_u64(),
238                        output_sites: p.output_sites.iter().map(|s| s.as_u64()).collect(),
239                        deadline_ns: p.deadline_ns,
240                    },
241                )
242            })
243            .collect();
244        let pending_outbound = self
245            .engine
246            .framework
247            .outbound_queue
248            .iter_for_snapshot()
249            .map(|env| crate::snapshot::transient::PendingOutboundEntry {
250                envelope_bytes: crate::envelope::EnvelopeCodec::encode(env),
251                redelivered: true,
252            })
253            .collect();
254
255        let transient = TransientSnapshot {
256            framework: crate::snapshot::transient::FrameworkSnapshot {
257                counters,
258                fired_phases: self.engine.fired_phases.clone(),
259                address_book,
260                peer_governor,
261                backoff_table,
262                pending_outbound,
263                // multihash bytes + counter
264                // persistence. The peer_id_bytes lets a multihash
265                // PeerId survive restore; the counters prevent
266                // CommandId / ExecId collisions on restart.
267                peer_id_bytes: self.engine.self_peer.to_bytes(),
268                next_command_id: self.engine.exec.ids.next_command_id,
269                next_exec_id: self.engine.exec.ids.next_exec_id,
270                spec_version: crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION,
271            },
272            bus: crate::snapshot::transient::TypedBusSnapshot {
273                event_subscriptions,
274            },
275            pending_async,
276            ..Default::default()
277        };
278
279        crate::snapshot::NodeSnapshot {
280            incarnation: self.incarnation,
281            config: NodeConfigSnapshot::from(&self.config),
282            graphs,
283            components,
284            transient,
285        }
286    }
287
288    /// Restore a Node's state from a snapshot.
289    pub fn restore(
290        &mut self,
291        snap: crate::snapshot::NodeSnapshot,
292    ) -> Result<(), crate::errors::restore::RestoreError> {
293        if snap.transient.framework.spec_version
294            != crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION
295        {
296            return Err(RestoreError::SpecVersionMismatch {
297                got: snap.transient.framework.spec_version,
298                expected: crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION,
299            });
300        }
301
302        // Reinstall each captured graph. The function bytes are
303        // prost-encoded FunctionProto blobs from the source Node's
304        // snapshot path; decode failures fail the restore loudly
305        // so callers don't silently lose a graph.
306        for graph_snap in snap.graphs {
307            let function = bb_ir::proto::onnx::FunctionProto::decode(
308                graph_snap.function_proto_bytes.as_slice(),
309            )
310            .map_err(|e| {
311                RestoreError::SnapshotMismatch(format!(
312                    "restore: failed to decode graph `{}`: {e}",
313                    graph_snap.name,
314                ))
315            })?;
316            self.engine.install_graph(graph_snap.name, function);
317        }
318
319        for comp_snap in snap.components {
320            let cref = crate::ids::ComponentRef::from(comp_snap.instance_id);
321            let Some(handle) = self.component_handles.iter().find(|h| {
322                h.type_name == comp_snap.type_name && h.instance_id == comp_snap.instance_id
323            }) else {
324                return Err(RestoreError::SnapshotMismatch(format!(
325                    "no handle on live Node for component {}@{}",
326                    comp_snap.type_name, comp_snap.instance_id,
327                )));
328            };
329            let restored = (handle.restore_fn)(&comp_snap.state_bytes).map_err(|source| {
330                RestoreError::ComponentRestoreFailed {
331                    type_name: comp_snap.type_name.clone(),
332                    source,
333                }
334            })?;
335            self.engine.register_component(cref, restored);
336        }
337
338        self.engine.framework.counters.clear();
339        for (name, value) in snap.transient.framework.counters {
340            self.engine.framework.counters.insert(name, value);
341        }
342        self.engine.fired_phases = snap.transient.framework.fired_phases;
343        self.engine.event_subscriptions.clear();
344        for (kind, sites) in snap.transient.bus.event_subscriptions {
345            self.engine.event_subscriptions.insert(
346                kind,
347                sites
348                    .into_iter()
349                    .map(crate::ids::NodeSiteId::from)
350                    .collect(),
351            );
352        }
353
354        for entry in snap.transient.framework.address_book {
355            let mut decoded: Vec<crate::framework::Address> =
356                Vec::with_capacity(entry.addresses.len());
357            let mut malformed = false;
358            for bytes in &entry.addresses {
359                match crate::framework::Address::from_bytes(bytes) {
360                    Ok(a) => decoded.push(a),
361                    Err(_) => {
362                        malformed = true;
363                        break;
364                    }
365                }
366            }
367            if malformed || decoded.is_empty() {
368                continue;
369            }
370            let Ok(peer) = crate::ids::PeerId::from_bytes(&entry.peer_id) else {
371                continue;
372            };
373            self.engine
374                .framework
375                .address_book
376                .restore_entry(peer, decoded, entry.ref_count);
377        }
378        self.engine.framework.peer_state.governor = crate::framework::PeerGovernor::new();
379        let governor_snap = snap.transient.framework.peer_governor;
380        self.engine
381            .framework
382            .peer_state
383            .governor
384            .set_failure_threshold(governor_snap.failure_threshold);
385        for peer_bytes in governor_snap.blocklist {
386            let Ok(peer) = crate::ids::PeerId::from_bytes(&peer_bytes) else {
387                continue;
388            };
389            self.engine.framework.peer_state.governor.block(peer);
390        }
391        if let Some(allow) = governor_snap.allowlist {
392            let set: std::collections::HashSet<_> = allow
393                .into_iter()
394                .filter_map(|b| crate::ids::PeerId::from_bytes(&b).ok())
395                .collect();
396            self.engine
397                .framework
398                .peer_state
399                .governor
400                .set_allowlist(Some(set));
401        }
402        for (peer_bytes, consecutive_failures, last_event_ns, down) in governor_snap.health {
403            let Ok(peer) = crate::ids::PeerId::from_bytes(&peer_bytes) else {
404                continue;
405            };
406            self.engine.framework.peer_state.governor.restore_health(
407                peer,
408                crate::framework::PeerHealth {
409                    consecutive_failures,
410                    last_event_ns,
411                    down,
412                },
413            );
414        }
415        for entry in snap.transient.framework.backoff_table {
416            let Ok(peer) = crate::ids::PeerId::from_bytes(&entry.peer) else {
417                continue;
418            };
419            self.engine.framework.peer_state.backoff.restore_state(
420                peer,
421                crate::framework::backoff_table::BackoffState {
422                    attempts: entry.attempts,
423                    last_attempt_ns: entry.last_attempt_ns,
424                    next_retry_ns: entry.next_retry_ns,
425                },
426            );
427        }
428        for (cmd_u64, snap_p) in snap.transient.pending_async {
429            self.engine.exec.pending_async.insert(
430                crate::ids::CommandId::from(cmd_u64),
431                crate::engine::PendingAsync {
432                    op_ref: crate::ids::OpRef::from(snap_p.op_ref),
433                    exec_id: crate::ids::ExecId::from(snap_p.exec_id),
434                    output_sites: snap_p
435                        .output_sites
436                        .into_iter()
437                        .map(crate::ids::NodeSiteId::from)
438                        .collect(),
439                    deadline_ns: snap_p.deadline_ns,
440                },
441            );
442        }
443        for entry in snap.transient.framework.pending_outbound {
444            if let Ok(env) = crate::envelope::EnvelopeCodec::decode_capped(
445                &entry.envelope_bytes,
446                &self.config.envelope_caps,
447            ) {
448                self.engine.framework.outbound_queue.push(env);
449            }
450        }
451
452        self.incarnation = snap.incarnation + 1;
453
454        Ok(())
455    }
456
457    /// Reset transient runtime state without dropping the bound
458    /// components or installed graphs.
459    pub fn clear(&mut self) {
460        self.engine.exec.frontier.clear();
461        self.engine.exec.slot_table.clear();
462        self.engine.exec.execution_state.clear();
463        self.engine.exec.pending_async.clear();
464        self.engine.exec.pending_completions.clear();
465        let _ = self.engine.ingress.drain_all();
466        self.engine.fired_phases.clear();
467        self.engine.framework.counters.clear();
468    }
469
470    /// Current incarnation count. Bumped on every `restore()`.
471    pub fn incarnation(&self) -> u64 {
472        self.incarnation
473    }
474
475    /// Names of every module registered at construction time.
476    pub fn loaded_modules(&self) -> Vec<&str> {
477        self.module_index.keys().map(|s| s.as_str()).collect()
478    }
479
480    /// References to every owned component handle.
481    pub fn linked_components(&self) -> Vec<&ComponentHandle> {
482        self.component_handles.iter().collect()
483    }
484
485    /// Local peer identity.
486    pub fn peer_id(&self) -> crate::ids::PeerId {
487        self.config.peer_id
488    }
489
490    /// Read-only execution-state lookup.
491    pub fn execution_state(
492        &self,
493        exec_id: crate::ids::ExecId,
494    ) -> Option<&crate::engine::ExecutionState> {
495        self.engine.exec.execution_state.get(&exec_id)
496    }
497
498    /// Number of `AsyncSuspended` Ops currently awaiting completion.
499    pub fn pending_async_count(&self) -> usize {
500        self.engine.exec.pending_async.len()
501    }
502
503    /// Snapshot of the engine's hot-path counters.
504    pub fn engine_stats(&self) -> crate::engine::EngineStats {
505        self.engine.engine_stats()
506    }
507
508    /// Mutable handle to the engine for install-time setup
509    /// (registering components, binding slots, installing the
510    /// target graph, resolving dispatch). Doc-hidden because it
511    /// crosses the `pub(crate)` field boundary: the user-facing
512    /// entry point is `bb::install()`, which calls this accessor
513    /// to drive `engine.register_component` / `engine.bind_slot` /
514    /// `engine.install_graph` / `engine.resolve_dispatch`.
515    #[doc(hidden)]
516    pub fn engine_install_handle(&mut self) -> &mut crate::engine::Engine {
517        &mut self.engine
518    }
519
520    /// Push a `ComponentHandle` onto the linked-components list.
521    /// Called from `bb::install()` per supplied component so
522    /// `snapshot()` can capture state via the recorded
523    /// `serialize_fn`. Doc-hidden — the public surface for setting
524    /// up a Node is `bb::install()`.
525    #[doc(hidden)]
526    pub fn push_linked_component(&mut self, handle: crate::concrete::ComponentHandle) {
527        self.component_handles.push(handle);
528    }
529
530    /// Set the shared compiled artifact this Node was installed with.
531    /// Called once per `bb::install()` from the install path before
532    /// the per-target `register_module` calls run; downstream
533    /// `register_module` calls share the same `Arc` instead of
534    /// cloning the proto per target. Doc-hidden — the user surface
535    /// is `bb::install()`.
536    #[doc(hidden)]
537    pub fn set_model(&mut self, model: ModelProto) {
538        self.model = Some(Arc::new(model));
539    }
540
541    /// Register `module_name` as a valid target for `deliver_event` /
542    /// `invoke`. Called from `bb::install()` once per resolved
543    /// target name after the engine's root graph is installed; the
544    /// stored `Arc` is the same handle [`Self::set_model`] minted
545    /// for every target, so multi-target installs hold the proto
546    /// bytes exactly once. Doc-hidden — the user surface is
547    /// `bb::install()`.
548    #[doc(hidden)]
549    pub fn register_module(&mut self, module_name: String) {
550        let model = self
551            .model
552            .clone()
553            .expect("Node::set_model must run before register_module");
554        self.module_index.insert(module_name, model);
555    }
556
557    /// The compiled `ModelProto` the Node was installed with, or
558    /// `None` if the Node was built internally by a crate-private
559    /// path that bypassed [`crate::install`] (the public entry point
560    /// always sets a model). Returns a shared `Arc` so callers can
561    /// read per-target metadata without cloning the proto.
562    pub fn model(&self) -> Option<Arc<ModelProto>> {
563        self.model.clone()
564    }
565
566    /// Look up the `ComponentRef` bound at `slot_name` in the
567    /// engine's generic slot registry. Returns `None` when no slot
568    /// of that name is bound.
569    pub fn slot(&self, slot_name: &str) -> Option<crate::ids::ComponentRef> {
570        self.engine.slot(slot_name)
571    }
572
573    /// Iterate every `(slot_name, ComponentRef)` pair bound in the
574    /// engine's slot registry.
575    pub fn slots_iter(&self) -> impl Iterator<Item = (&str, crate::ids::ComponentRef)> {
576        self.engine.slots_iter()
577    }
578
579    /// Inventory-declared roles for a registered component. Read
580    /// from the per-Engine `component_roles` map populated by
581    /// `ensure_ready` from
582    /// `crate::registry::roles_for_component(T::TYPE_NAME)`. Returns
583    /// the empty set when the component wasn't registered via a
584    /// `#[derive(bb::<Role>)]` chain (test fixtures that hand-implement
585    /// role traits surface here as empty).
586    pub fn roles_for(
587        &self,
588        cref: crate::ids::ComponentRef,
589    ) -> std::collections::HashSet<crate::registry::ComponentRole> {
590        self.engine.roles_for(cref)
591    }
592
593    /// Add `peer` to the inbound + outbound blocklist.
594    pub fn block_peer(&mut self, peer: crate::ids::PeerId) {
595        self.engine.framework.peer_state.governor.block(peer);
596    }
597
598    /// Remove `peer` from the blocklist.
599    pub fn unblock_peer(&mut self, peer: crate::ids::PeerId) {
600        self.engine.framework.peer_state.governor.unblock(peer);
601    }
602
603    /// Configure an allowlist for inbound + outbound delivery.
604    pub fn set_allowlist(
605        &mut self,
606        allowlist: Option<std::collections::HashSet<crate::ids::PeerId>>,
607    ) {
608        self.engine
609            .framework
610            .peer_state
611            .governor
612            .set_allowlist(allowlist);
613    }
614
615    /// Per-peer health snapshot.
616    pub fn peer_health(&self, peer: crate::ids::PeerId) -> Option<crate::framework::PeerHealth> {
617        self.engine.framework.peer_state.governor.peer_health(peer)
618    }
619
620    /// Resolve a `PeerId` to its ordered address list (cloned).
621    pub fn resolve_peer_addresses(
622        &self,
623        peer: crate::ids::PeerId,
624    ) -> Option<Vec<crate::framework::Address>> {
625        self.engine
626            .framework
627            .address_book
628            .lookup(peer)
629            .map(|addrs| addrs.to_vec())
630    }
631
632    /// Install an `EngineStep` observer onto the live Node.
633    pub fn set_telemetry_tap<F>(&mut self, tap: F)
634    where
635        F: FnMut(&crate::engine::EngineStep) + 'static,
636    {
637        self.telemetry_tap = Some(Box::new(tap));
638    }
639
640    /// Announce a peer with one or more reachable addresses.
641    pub fn add_peer(
642        &mut self,
643        peer: crate::ids::PeerId,
644        addresses: Vec<crate::framework::Address>,
645    ) -> Result<(), crate::framework::AddressBookError> {
646        self.engine.framework.address_book.add_peer(peer, addresses)
647    }
648
649    /// Concern 2 - cheap-clone handle to the shared
650    /// `IngressQueue`. Used by off-thread transport / clock
651    /// adapters to push events into the Node.
652    pub fn ingress_handle(&self) -> crate::ingress::IngressQueueRef {
653        crate::ingress::IngressQueueRef::new(self.engine.ingress_queue_handle())
654    }
655
656    /// Read-only view on the registered NodeConfig.
657    pub fn config(&self) -> &NodeConfig {
658        &self.config
659    }
660
661    /// First entry of the local-address bag, or [`Address::empty`]
662    /// when none registered. The AddressBook entry for
663    /// `self.peer_id()` is the source of truth.
664    pub fn peer_address(&self) -> crate::framework::Address {
665        self.local_addresses()
666            .first()
667            .cloned()
668            .unwrap_or_else(crate::framework::Address::empty)
669    }
670
671    /// Ordered local-address bag for this Node. Reads directly from
672    /// the AddressBook entry keyed by `self.peer_id()`; returns an
673    /// empty slice when the Node was installed with no addresses or
674    /// the bag was pruned to zero entries.
675    pub fn local_addresses(&self) -> &[crate::framework::Address] {
676        self.engine
677            .framework
678            .address_book
679            .lookup(self.engine.self_peer)
680            .unwrap_or(&[])
681    }
682
683    /// Append `address` to the local-address bag. Creates the
684    /// AddressBook entry if none exists; idempotent on duplicates.
685    pub fn add_local_address(
686        &mut self,
687        address: crate::framework::Address,
688    ) -> Result<(), crate::framework::AddressBookError> {
689        let self_peer = self.engine.self_peer;
690        let book = &mut self.engine.framework.address_book;
691        if book.lookup(self_peer).is_some() {
692            book.register_address(self_peer, address)
693        } else {
694            book.add_peer(self_peer, vec![address])
695        }
696    }
697
698    /// Prune `address` from the local-address bag. Errors when no
699    /// entry exists; succeeds (no-op) when the bag has no matching
700    /// address.
701    pub fn forget_local_address(
702        &mut self,
703        address: &crate::framework::Address,
704    ) -> Result<(), crate::framework::AddressBookError> {
705        let self_peer = self.engine.self_peer;
706        self.engine
707            .framework
708            .address_book
709            .forget_address(self_peer, address)
710    }
711
712    /// Drive bootstrap targets to completion. Returns every
713    /// `EngineStep` the bootstrap path emitted, including each
714    /// drained phase's `BootstrapComplete` or the yielding
715    /// `WaitingOnBootstrap`.
716    ///
717    /// Semantics by slice shape:
718    ///
719    /// - Empty slice `&[]` — arms + seeds the install-order queue,
720    ///   then polls until every queued target reaches
721    ///   `BootstrapComplete` or one suspends on async. Idempotent on
722    ///   a fully drained Node (returns an empty vec).
723    /// - Non-empty slice — each [`BootstrapInput`]'s `inputs` are
724    ///   validated against the target's declared formals; on any
725    ///   error the engine's bootstrap state stays untouched.
726    ///   Successful staging fires the named targets in slice order
727    ///   regardless of install-order — this is the re-bootstrap
728    ///   surface (no idempotence guard).
729    ///
730    /// The framework copies each input's borrowed `&[u8]` into
731    /// engine-owned storage (Principle 1a: caller slices may drop
732    /// the moment this call returns).
733    ///
734    /// On async suspension the host posts the matured completion via
735    /// the ingress and re-invokes `run_bootstrap(&[])` to drain the
736    /// resumed op.
737    ///
738    /// Body-phase ops do not fire during this call; the
739    /// `is_op_locked` gate keeps them parked until the bootstrap
740    /// body drains.
741    pub fn run_bootstrap(
742        &mut self,
743        targets: &[crate::engine::BootstrapInput<'_>],
744    ) -> Result<Vec<crate::engine::EngineStep>, crate::errors::BootstrapError> {
745        if targets.is_empty() {
746            // Arm + seed the install-order queue (no-op when no
747            // targets remain). On a fully drained Node return an
748            // empty vec immediately without polling.
749            let armed = self.engine.run_bootstrap(&[])?;
750            if !armed && !self.engine.bootstrap_pending() {
751                return Ok(Vec::new());
752            }
753        } else {
754            // Validate each target name against the install-order
755            // registry before any staging happens, so the batch
756            // rejects atomically on the first unknown target.
757            for req in targets {
758                if !self.engine.module_bootstrap_registered(req.target) {
759                    return Err(crate::errors::BootstrapError::UnknownTarget {
760                        target_name: req.target.to_string(),
761                        available: self.engine.module_bootstrap_target_names(),
762                    });
763                }
764            }
765            self.engine.run_bootstrap(targets)?;
766        }
767        Ok(self.drain_bootstrap())
768    }
769
770    /// Drive the engine's poll loop until every staged bootstrap
771    /// drains to `BootstrapComplete` or one suspends with
772    /// `WaitingOnBootstrap`. Forwards each step through the
773    /// telemetry tap (matching `Node::poll` semantics).
774    fn drain_bootstrap(&mut self) -> Vec<crate::engine::EngineStep> {
775        let mut steps = Vec::new();
776        loop {
777            let batch = self.engine.poll();
778            let waiting = matches!(
779                batch.last(),
780                Some(crate::engine::EngineStep::WaitingOnBootstrap)
781            );
782            if let Some(tap) = self.telemetry_tap.as_mut() {
783                for step in &batch {
784                    tap(step);
785                }
786            }
787            steps.extend(batch);
788            if !self.engine.bootstrap_pending() || waiting {
789                break;
790            }
791        }
792        steps
793    }
794
795    /// Snapshot of the engine's bootstrap lifecycle. Returns
796    /// `BootstrapStatus::Idle` when no bootstrap is queued or
797    /// in-flight, `Running` when a bootstrap is in-flight (body ops
798    /// gate), and `WaitingForInput` when the install-order queue
799    /// still has unseeded targets. Pure read — does not advance any
800    /// queue.
801    pub fn bootstrap_status(&self) -> crate::engine::bootstrap::BootstrapStatus {
802        self.engine.bootstrap_status()
803    }
804
805    /// Drive one poll cycle. Returns `Pending` when the engine
806    /// drains to quiescence (the ingress waker is registered with
807    /// `cx`); otherwise yields the steps the engine made progress
808    /// on. Construction-time validation lives in [`crate::node::Node`]'s
809    /// installation path (see `bytesandbrains::install`) and surfaces
810    /// `InstallError` before the first poll.
811    pub fn poll(
812        &mut self,
813        cx: &mut std::task::Context<'_>,
814    ) -> std::task::Poll<Vec<crate::engine::EngineStep>> {
815        let steps = self.engine.poll();
816        if let Some(tap) = self.telemetry_tap.as_mut() {
817            for step in &steps {
818                tap(step);
819            }
820        }
821        if steps.is_empty() {
822            self.engine.ingress.register_waker(cx.waker());
823            return std::task::Poll::Pending;
824        }
825        std::task::Poll::Ready(steps)
826    }
827
828    /// Decode and push inbound wire bytes onto the ingress queue.
829    /// Routes through `EnvelopeCodec::decode_capped` so malformed,
830    /// schema-mismatched, or oversize buffers fail with
831    /// `DeliveryError::InvalidEnvelope` BEFORE any prost allocation.
832    /// The transport layer supplies `src_peer` so the engine can
833    /// consult `PeerGovernor::check_inbound` before routing.
834    pub fn deliver_inbound(
835        &mut self,
836        src_peer: crate::ids::PeerId,
837        bytes: &[u8],
838    ) -> Result<(), crate::errors::delivery::DeliveryError> {
839        let envelope =
840            crate::envelope::EnvelopeCodec::decode_capped(bytes, &self.config.envelope_caps)
841                .map_err(|e| {
842                    crate::errors::delivery::DeliveryError::InvalidEnvelope(e.to_string())
843                })?;
844        self.engine
845            .ingress
846            .push(crate::ingress::IngressEvent::EnvelopeFrom {
847                src_peer,
848                envelope,
849                // The raw-bytes deliver path doesn't surface an
850                // observed address — host adapters that want
851                // reflexive-address discovery push via the typed
852                // ingress queue directly.
853                src_observed_address: None,
854            })
855            .map_err(|_| crate::errors::delivery::DeliveryError::IngressClosed)
856    }
857
858    /// Push an app-event onto the ingress queue.
859    ///
860    /// Per Principle 1a (`docs/internal/superpowers/specs/2026-06-24-engine-boundary-fallibility-and-backend-owned-tensors.md`)
861    /// the byte payload crosses the engine boundary as a BORROWED
862    /// slice: the framework caps `value_bytes.len()` against
863    /// `NodeConfig::max_app_event_bytes`, charges the length against
864    /// `NodeConfig::ingress_byte_budget`, fallibly reserves a fresh
865    /// framework-owned `Vec<u8>`, and copies the caller's bytes in.
866    /// The caller may free `value_bytes` the moment this call
867    /// returns. Cap / budget / alloc failures return a synchronous
868    /// `DeliveryError::*` AND publish a matching
869    /// `InfraEvent::AppIngressError` on the bus for observers.
870    pub fn deliver_event(
871        &mut self,
872        module: &str,
873        input: &str,
874        value_bytes: &[u8],
875    ) -> Result<(), crate::errors::delivery::DeliveryError> {
876        if !self.module_index.contains_key(module) {
877            return Err(DeliveryError::UnknownModule(module.to_string()));
878        }
879        let byte_count = value_bytes.len();
880        let cap = self.config.max_app_event_bytes;
881        let source = || crate::bus::AppIngressSource::AppEvent {
882            module: module.to_string(),
883            input: input.to_string(),
884        };
885        if byte_count > cap {
886            self.emit_app_ingress_error(
887                source(),
888                byte_count,
889                crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap },
890            );
891            return Err(DeliveryError::OversizePayload { byte_count, cap });
892        }
893        if let Err(reason) = self.engine.try_charge(byte_count) {
894            self.emit_app_ingress_error(
895                source(),
896                byte_count,
897                crate::bus::AppIngressErrorKind::BudgetExceeded {
898                    budget_remaining: reason.budget_remaining,
899                },
900            );
901            return Err(DeliveryError::BudgetExceeded {
902                byte_count,
903                budget_remaining: reason.budget_remaining,
904            });
905        }
906        let mut owned: Vec<u8> = Vec::new();
907        // Route through `crate::fallible::try_reserve_exact` so the
908        // thread-local fault seam (used by `tests/fallible_ingress.rs`)
909        // intercepts the boundary reservation.
910        if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
911            // Release the budget charge before surfacing the alloc
912            // failure — the bytes never made it past the boundary.
913            self.engine.release(byte_count);
914            self.emit_app_ingress_error(
915                source(),
916                byte_count,
917                crate::bus::AppIngressErrorKind::AllocationFailed {
918                    reason: crate::bus::AllocFailReason::HeapExhausted,
919                },
920            );
921            return Err(DeliveryError::AllocationFailed {
922                byte_count,
923                reason: crate::bus::AllocFailReason::HeapExhausted,
924            });
925        }
926        owned.extend_from_slice(value_bytes);
927        self.engine
928            .ingress
929            .push(crate::ingress::IngressEvent::AppEvent {
930                module_name: module.to_string(),
931                input_name: input.to_string(),
932                value_bytes: owned,
933            })
934            .map_err(|_| {
935                // Ingress queue closed — release the charge so the
936                // counter does not leak the rejected push.
937                self.engine.release(byte_count);
938                DeliveryError::IngressClosed
939            })
940    }
941
942    /// Invoke a Module with the given pre-encoded inputs.
943    ///
944    /// Inputs cross as borrowed `&[(&str, &[u8])]` per Principle 1a;
945    /// the framework caps count + cumulative bytes, charges against
946    /// `ingress_byte_budget`, and per-input fallibly reserves +
947    /// copies into framework-owned `Vec<u8>` storage. Any failure
948    /// releases prior charges and emits the matching
949    /// `InfraEvent::AppIngressError`.
950    pub fn invoke(
951        &mut self,
952        module: &str,
953        inputs: &[(&str, &[u8])],
954    ) -> Result<crate::ids::ExecId, crate::errors::delivery::DeliveryError> {
955        if !self.module_index.contains_key(module) {
956            return Err(DeliveryError::UnknownModule(module.to_string()));
957        }
958        let input_count = inputs.len();
959        let input_cap = self.config.max_invoke_inputs;
960        let source = || crate::bus::AppIngressSource::Invoke {
961            module: module.to_string(),
962            input_count,
963        };
964        if input_count > input_cap {
965            // `byte_count` reports the requested input count for the
966            // count-cap rejection — the byte cap will surface
967            // separately below for cumulative-byte rejections.
968            self.emit_app_ingress_error(
969                source(),
970                input_count,
971                crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap: input_cap },
972            );
973            return Err(DeliveryError::TooManyInputs {
974                count: input_count,
975                cap: input_cap,
976            });
977        }
978        let total_bytes: usize = inputs
979            .iter()
980            .fold(0usize, |acc, (_, b)| acc.saturating_add(b.len()));
981        let bytes_cap = self.config.max_invoke_bytes;
982        if total_bytes > bytes_cap {
983            self.emit_app_ingress_error(
984                source(),
985                total_bytes,
986                crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap: bytes_cap },
987            );
988            return Err(DeliveryError::OversizePayload {
989                byte_count: total_bytes,
990                cap: bytes_cap,
991            });
992        }
993        if let Err(reason) = self.engine.try_charge(total_bytes) {
994            self.emit_app_ingress_error(
995                source(),
996                total_bytes,
997                crate::bus::AppIngressErrorKind::BudgetExceeded {
998                    budget_remaining: reason.budget_remaining,
999                },
1000            );
1001            return Err(DeliveryError::BudgetExceeded {
1002                byte_count: total_bytes,
1003                budget_remaining: reason.budget_remaining,
1004            });
1005        }
1006        // Per-input fallible reservation. Track how many bytes we've
1007        // already copied so a mid-loop failure can release the full
1008        // `total_bytes` charge in one shot (the slot-table never
1009        // observed any of these payloads).
1010        let mut owned: Vec<(String, Vec<u8>)> = Vec::new();
1011        if crate::fallible::try_reserve_exact(&mut owned, input_count).is_err() {
1012            self.engine.release(total_bytes);
1013            self.emit_app_ingress_error(
1014                source(),
1015                total_bytes,
1016                crate::bus::AppIngressErrorKind::AllocationFailed {
1017                    reason: crate::bus::AllocFailReason::HeapExhausted,
1018                },
1019            );
1020            return Err(DeliveryError::AllocationFailed {
1021                byte_count: total_bytes,
1022                reason: crate::bus::AllocFailReason::HeapExhausted,
1023            });
1024        }
1025        for (name, bytes) in inputs.iter() {
1026            let mut buf: Vec<u8> = Vec::new();
1027            if crate::fallible::try_reserve_exact(&mut buf, bytes.len()).is_err() {
1028                self.engine.release(total_bytes);
1029                self.emit_app_ingress_error(
1030                    source(),
1031                    total_bytes,
1032                    crate::bus::AppIngressErrorKind::AllocationFailed {
1033                        reason: crate::bus::AllocFailReason::HeapExhausted,
1034                    },
1035                );
1036                return Err(DeliveryError::AllocationFailed {
1037                    byte_count: total_bytes,
1038                    reason: crate::bus::AllocFailReason::HeapExhausted,
1039                });
1040            }
1041            buf.extend_from_slice(bytes);
1042            owned.push(((*name).to_string(), buf));
1043        }
1044        let exec_id = self.engine.allocate_exec_id();
1045        self.engine
1046            .ingress
1047            .push(crate::ingress::IngressEvent::Invoke {
1048                module_name: module.to_string(),
1049                inputs: owned,
1050                exec_id,
1051            })
1052            .map_err(|_| {
1053                self.engine.release(total_bytes);
1054                DeliveryError::IngressClosed
1055            })?;
1056        Ok(exec_id)
1057    }
1058
1059    /// Publish a freshly-built [`crate::bus::InfraEvent::AppIngressError`]
1060    /// onto the in-Node bus. Internal helper used by the
1061    /// application-ingress entry points (`deliver_event`, `invoke`)
1062    /// to mirror their synchronous `DeliveryError` returns on the
1063    /// observer surface.
1064    fn emit_app_ingress_error(
1065        &mut self,
1066        source: crate::bus::AppIngressSource,
1067        byte_count: usize,
1068        kind: crate::bus::AppIngressErrorKind,
1069    ) {
1070        self.engine.bus.publish(crate::bus::NodeEvent::Infra(
1071            crate::bus::InfraEvent::AppIngressError {
1072                source,
1073                byte_count,
1074                kind,
1075            },
1076        ));
1077    }
1078}
1079
1080/// Snapshot the PeerGovernor's policy + health state.
1081fn capture_peer_governor(
1082    governor: &crate::framework::PeerGovernor,
1083) -> crate::snapshot::transient::PeerGovernorSnapshot {
1084    crate::snapshot::transient::PeerGovernorSnapshot {
1085        blocklist: governor.blocklist().iter().map(|p| p.to_bytes()).collect(),
1086        allowlist: governor
1087            .allowlist()
1088            .map(|s| s.iter().map(|p| p.to_bytes()).collect()),
1089        health: governor
1090            .iter_health()
1091            .map(|(p, h)| {
1092                (
1093                    p.to_bytes(),
1094                    h.consecutive_failures,
1095                    h.last_event_ns,
1096                    h.down,
1097                )
1098            })
1099            .collect(),
1100        failure_threshold: governor.failure_threshold(),
1101    }
1102}
1103
1104pub mod config;
1105pub mod derivation;
1106pub use config::{
1107    NodeConfig, DEFAULT_BUS_CAPACITY, DEFAULT_CYCLE_OP_BUDGET, DEFAULT_INGRESS_BYTE_BUDGET,
1108    DEFAULT_MAX_APP_EVENT_BYTES, DEFAULT_MAX_COMPLETION_RESULT_BYTES, DEFAULT_MAX_INVOKE_BYTES,
1109    DEFAULT_MAX_INVOKE_INPUTS, DEFAULT_MAX_OUTBOUND_QUEUE, DEFAULT_MAX_PENDING_ASYNC,
1110    EDGE_INGRESS_BYTE_BUDGET, EDGE_MAX_APP_EVENT_BYTES, EDGE_MAX_COMPLETION_RESULT_BYTES,
1111    EDGE_MAX_INVOKE_BYTES, EDGE_MAX_INVOKE_INPUTS,
1112};
1113
1114
1115#[cfg(test)]
1116#[path = "snapshot_fidelity_tests.rs"]
1117mod snapshot_fidelity_tests;
1118
1119#[cfg(test)]
1120#[path = "shared_model_tests.rs"]
1121mod shared_model_tests;