Skip to main content

bb_runtime/
bus.rs

1//! Typed in-Node event bus. Cross-Component signaling per
2//! `docs/ENGINE.md` §13.1.
3//!
4//! Components publish events via the bus; the engine's bus-event
5//! routing pass delivers them to subscribed Components.
6
7use std::collections::VecDeque;
8
9use crate::ids::{CommandId, ComponentRef, OpRef, PeerId};
10
11/// All events that may flow through the in-Node bus.
12#[derive(Clone, Debug)]
13pub enum NodeEvent {
14    /// Framework-emitted infrastructure events.
15    Infra(InfraEvent),
16    /// App-Op-emitted events.
17    App(AppEvent),
18}
19
20impl NodeEvent {
21    /// String discriminator used by [`crate::engine::Engine`]'s
22    /// bus-routing lookup against
23    /// `event_subscriptions: HashMap<String, Vec<NodeSiteId>>`.
24    ///
25    /// `Infra` events map to the variant name (e.g.
26    /// `"OpFailure"`); `App` events use the user-supplied topic so
27    /// `EventSource` subscribers can target a specific channel.
28    pub fn kind(&self) -> &str {
29        match self {
30            NodeEvent::Infra(InfraEvent::WireResponseLanded { .. }) => "WireResponseLanded",
31            NodeEvent::Infra(InfraEvent::OpFailure { .. }) => "OpFailure",
32            NodeEvent::Infra(InfraEvent::WireDecodeFailure { .. }) => "WireDecodeFailure",
33            NodeEvent::Infra(InfraEvent::WireReceiveError { .. }) => "WireReceiveError",
34            NodeEvent::Infra(InfraEvent::AppIngressError { .. }) => "AppIngressError",
35            NodeEvent::Infra(InfraEvent::BusOverflow { .. }) => "BusOverflow",
36            NodeEvent::Infra(InfraEvent::PeerResolveFailure { .. }) => "PeerResolveFailure",
37            NodeEvent::Infra(InfraEvent::PeerSuspect { .. }) => "PeerSuspect",
38            NodeEvent::Infra(InfraEvent::PeerDown { .. }) => "PeerDown",
39            NodeEvent::Infra(InfraEvent::PeerLive { .. }) => "PeerLive",
40            NodeEvent::Infra(InfraEvent::BackoffNoticeSent { .. }) => "BackoffNoticeSent",
41            NodeEvent::Infra(InfraEvent::SilentDropActive { .. }) => "SilentDropActive",
42            NodeEvent::App(AppEvent::Emit { name, .. }) => name.as_str(),
43            NodeEvent::App(AppEvent::Notify { name }) => name.as_str(),
44        }
45    }
46}
47
48/// Framework-emitted infrastructure events per
49/// `docs/ENGINE.md` §13.1.
50#[derive(Clone, Debug)]
51pub enum InfraEvent {
52    /// A previously-suspended wire-request's response landed.
53    WireResponseLanded {
54        /// The `CommandId` that the wire-request was suspended on.
55        cmd_id: CommandId,
56    },
57    /// An Op invocation failed.
58    OpFailure {
59        /// The Op that failed.
60        op_ref: OpRef,
61        /// The failure detail.
62        error: OpError,
63    },
64    /// An inbound wire envelope's payload could not be decoded -
65    /// the payload's wire-type hash didn't resolve, the bytes were
66    /// malformed, or the destination address parsing failed. The
67    /// engine drops the envelope's slot fill rather than writing
68    /// garbage into a slot; this event lets the host observe the
69    /// drop. Emitted by the inbound envelope router.
70    WireDecodeFailure {
71        /// Wire-type hash that the envelope advertised (0 if the
72        /// failure occurred before the hash could be read).
73        hash: u64,
74        /// Length of the offending payload, in bytes.
75        payload_size: usize,
76        /// Human-readable failure detail.
77        detail: String,
78    },
79    /// Per-fill failure on the wire-receive typed-decode path.
80    /// Distinct from [`InfraEvent::WireDecodeFailure`]
81    /// (envelope-level: malformed `dest_suffix` / header) -
82    /// `WireReceiveError` fires after the envelope has parsed and
83    /// an individual fill reached the decoder-registry lookup +
84    /// typed materialisation step. Other fills in the same
85    /// envelope continue to deliver (partial-delivery semantics).
86    WireReceiveError {
87        /// Sender of the failing envelope, if the wire layer was
88        /// able to identify them.
89        src_peer: Option<PeerId>,
90        /// Position of the failing fill within the envelope
91        /// (0-based). Other fills in the same envelope are still
92        /// delivered.
93        fill_index: u32,
94        /// The `type_hash` the sender stamped on the fill.
95        actual_hash: u64,
96        /// Bytes that did not deliver. Tracked for telemetry, NOT
97        /// for fallback decode - degrading to `BytesValue` is
98        /// exactly the silent type-loss path this surface closes.
99        payload_size: usize,
100        /// Which failure mode fired.
101        kind: WireReceiveErrorKind,
102    },
103    /// Application-side ingress failure - host pushed an
104    /// `AppEvent` / `Invoke` / async completion whose payload could
105    /// not enter engine state because allocation failed, the
106    /// engine-wide ingress byte budget was exhausted, or a per-item
107    /// cap rejected the request at the boundary. The offending
108    /// bytes are dropped; the engine continues processing other
109    /// ingress work. Audience: host / SDK author watching their
110    /// own push errors (distinct from wire-side
111    /// [`InfraEvent::WireReceiveError`]).
112    AppIngressError {
113        /// Which application-side entry point raised the failure
114        /// and the identity it carries (module/input name for
115        /// `AppEvent` / `Invoke`, `CommandId` for an async
116        /// completion).
117        source: AppIngressSource,
118        /// Bytes the boundary was asked to admit.
119        byte_count: usize,
120        /// Which failure mode fired.
121        kind: AppIngressErrorKind,
122    },
123    /// The typed bus dropped `count` oldest events to make room for
124    /// newer publishes when `NodeConfig.bus_capacity` was hit.
125    /// Emitted by the bus-routing pass if any drops accumulated
126    /// since the last poll.
127    BusOverflow {
128        /// Number of events FIFO-dropped since the last poll.
129        count: usize,
130    },
131    /// Routable telemetry mirror of
132    /// [`crate::engine::EngineStep::PeerResolveFailed`].
133    /// Surfaces via the bus to subscribers so dashboards can
134    /// monitor peer-resolution failures alongside `PeerBlocked` /
135    /// `PeerDown` / `PeerUp` from .
136    PeerResolveFailure {
137        /// The peer whose addresses could not be resolved. `None`
138        /// when the failing Send op had no parseable `peer` input.
139        peer: Option<PeerId>,
140        /// The Send op that failed to resolve.
141        op_ref: OpRef,
142    },
143    /// φ-accrual failure detector crossed the suspect threshold
144    /// for the named logical site. Components (gossip overlays,
145    /// peer-sampling services, deadline planners) subscribe to react.
146    PeerSuspect {
147        /// Suspect logical site.
148        site: crate::ids::NodeSiteId,
149        /// Current φ value (informational; subscribers can ignore).
150        phi: f64,
151    },
152    /// φ-accrual failure detector crossed the hard-down threshold
153    /// for the named logical site.
154    PeerDown {
155        /// Down logical site.
156        site: crate::ids::NodeSiteId,
157        /// Current φ value.
158        phi: f64,
159    },
160    /// φ collapsed back below the suspect threshold after a
161    /// `PeerSuspect` or `PeerDown` was emitted. Lets subscribers
162    /// reinstate the peer.
163    PeerLive {
164        /// Recovered logical site.
165        site: crate::ids::NodeSiteId,
166    },
167    /// A `BackoffNotice` envelope was emitted to `peer`. Surfaces
168    /// the local overload decision on the bus so ops dashboards +
169    /// Component authors who want to react to local overload can
170    /// subscribe.
171    BackoffNoticeSent {
172        /// The sender the receiver asked to slow down.
173        peer: PeerId,
174        /// Why the receiver requested back-off.
175        cause: crate::framework::BackoffCause,
176        /// `min_backoff_ns` quoted on the notice.
177        min_backoff_ns: u64,
178    },
179    /// `peer` crossed the K-notices-without-recovery threshold;
180    /// subsequent envelopes from that peer are dropped silently at
181    /// the inbound boundary until the peer recovers. Emitted once
182    /// per silent-drop transition; the recovery path reuses the
183    /// existing `PeerLive` event.
184    SilentDropActive {
185        /// The sender now in silent-drop mode.
186        peer: PeerId,
187    },
188}
189
190/// Sub-kind discriminator for [`InfraEvent::WireReceiveError`].
191/// One top-level variant + an enum sub-kind keeps the bus-topic
192/// count down and lets subscribers route on the variant while
193/// matching the sub-kind out of the variant fields - the
194/// `PeerSuspect`/`PeerDown`/`PeerLive` triple pattern is for
195/// distinct lifecycle events; these three share lifecycle (one
196/// fill, one decode step) and audience (wire-payload integrity).
197#[derive(Clone, Debug, PartialEq, Eq)]
198pub enum WireReceiveErrorKind {
199    /// No decoder is registered for `actual_hash`. The sender
200    /// shipped a value whose concrete type is unknown to this
201    /// Node's inventory - either a version skew (sender has a
202    /// carrier the receiver hasn't compiled in) or a malicious /
203    /// fuzzed envelope.
204    UnknownTypeHash,
205    /// Destination slot carries a compile-time wire-type
206    /// assertion (`expected_hash`) and the fill's `actual_hash`
207    /// does not match.
208    TypeMismatch {
209        /// The `type_hash` the destination slot's compile-time
210        /// metadata declared.
211        expected_hash: u64,
212    },
213    /// Decoder ran and returned `Err` - the bytes were not a
214    /// valid encoding of the advertised type.
215    DecodeFailed {
216        /// Human-readable underlying error from the registered
217        /// decoder (typically a `bincode::Error::to_string`).
218        error_summary: String,
219    },
220    /// The framework-owned scratch buffer could not be reserved
221    /// before decode - heap allocation failed or a per-item cap
222    /// rejected the request. Emitted by the
223    /// `Engine::decode_typed_fill` boundary on `Vec::try_reserve_exact`
224    /// failure or before the prost decode runs when the fill's
225    /// payload length exceeds `EnvelopeCaps::max_per_fill_bytes`.
226    AllocationFailed {
227        /// Number of bytes the boundary tried to reserve.
228        byte_count: usize,
229        /// Why the reservation failed.
230        reason: AllocFailReason,
231    },
232    /// Admitting this fill's payload would push the engine over
233    /// `NodeConfig::ingress_byte_budget`. The fill is dropped; the
234    /// envelope's other fills continue to deliver.
235    BudgetExceeded {
236        /// Bytes the fill would have added to the in-flight budget.
237        byte_count: usize,
238        /// Bytes still available under `NodeConfig::ingress_byte_budget`
239        /// at the time of the rejection.
240        budget_remaining: usize,
241    },
242    /// The destination slot is bound to a `Backend` role and the
243    /// backend's `materialize_from_wire` impl returned `Err`. The
244    /// engine drops the fill, releases the byte charge, and emits
245    /// this event so operators can see which backend rejected
246    /// inbound payloads. Distinct from
247    /// [`WireReceiveErrorKind::DecodeFailed`] (framework-side
248    /// registry decoder failure).
249    BackendMaterializeFailed {
250        /// `ComponentRef` of the destination slot's bound backend.
251        backend_ref: ComponentRef,
252        /// Short `Display` of the backend's typed error.
253        backend_error_summary: String,
254    },
255}
256
/// Reason a fallible-allocation boundary declined to admit bytes
/// into engine state. Used as the `reason` payload of
/// [`WireReceiveErrorKind::AllocationFailed`] and
/// [`AppIngressErrorKind::AllocationFailed`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AllocFailReason {
    /// `Vec::try_reserve_exact` (or an equivalent
    /// fallible-allocator call) returned `TryReserveError`: the
    /// host's allocator has no headroom for the request.
    HeapExhausted,
    /// A caller-side per-item cap (for example
    /// `EnvelopeCaps::max_per_fill_bytes`,
    /// `NodeConfig::max_app_event_bytes` or
    /// `NodeConfig::max_invoke_bytes`) rejected the request before
    /// any allocation was attempted.
    PerItemCapExceeded {
        /// Value of the cap that the boundary enforced.
        cap: usize,
    },
}
276
277/// Application-side entry point that raised an
278/// [`InfraEvent::AppIngressError`]. Carries the identity the host
279/// referenced when the failure occurred so subscribers can correlate
280/// the bus event with the original push call.
281#[derive(Clone, Debug, PartialEq, Eq)]
282pub enum AppIngressSource {
283    /// `Node::deliver_event(module, input, value_bytes)` failed at
284    /// the boundary.
285    AppEvent {
286        /// Target module name (the host's `module` argument).
287        module: String,
288        /// Target input slot name on that module.
289        input: String,
290    },
291    /// `Node::invoke(module, inputs)` failed at the boundary.
292    Invoke {
293        /// Target module name.
294        module: String,
295        /// Number of `(name, bytes)` inputs the host attempted to
296        /// admit. Lets a subscriber distinguish a cap-by-count
297        /// rejection (`max_invoke_inputs`) from a
298        /// cap-by-bytes rejection (`max_invoke_bytes`).
299        input_count: usize,
300    },
301    /// `CompletionSink::complete(cmd, ...)` or `::fail(cmd, ...)`
302    /// failed at the boundary. The `CommandId` identifies the
303    /// pending async operation the host was attempting to settle.
304    Completion {
305        /// The pending command the completion targeted.
306        command: CommandId,
307    },
308}
309
310/// Sub-kind discriminator for [`InfraEvent::AppIngressError`]. One
311/// top-level variant + sub-kind keeps the bus-topic count down -
312/// subscribers route on the variant name and match the sub-kind out
313/// of the event's fields when they need to distinguish allocation
314/// failure from budget exhaustion from a per-item cap.
315#[derive(Clone, Debug, PartialEq, Eq)]
316pub enum AppIngressErrorKind {
317    /// Fallible reservation of the engine-side ingress buffer
318    /// returned `TryReserveError` or hit a per-item cap before
319    /// allocation was attempted.
320    AllocationFailed {
321        /// Why the reservation failed.
322        reason: AllocFailReason,
323    },
324    /// Admitting this payload would push the engine over
325    /// `NodeConfig::ingress_byte_budget`.
326    BudgetExceeded {
327        /// Bytes still available under `NodeConfig::ingress_byte_budget`
328        /// at the time of the rejection.
329        budget_remaining: usize,
330    },
331    /// A per-item cap (`max_app_event_bytes`, `max_invoke_inputs`,
332    /// `max_invoke_bytes`, or `max_completion_result_bytes`) rejected
333    /// the payload before any allocation was attempted. The host's
334    /// synchronous `deliver_event` / `invoke` call returns
335    /// `DeliveryError::OversizePayload` in addition to this bus
336    /// emission.
337    PerItemCapExceeded {
338        /// The cap value the boundary enforced.
339        cap: usize,
340    },
341}
342
/// Events emitted by application Ops, surfaced as
/// `EngineStep::AppEvent`.
///
/// Application code is expected to build these through
/// [`AppEvent::emit`] / [`AppEvent::notify`]. Those constructors
/// reject the framework-reserved topic prefixes (`bb.`,
/// `ai.bytesandbrains.`) so that user publish calls cannot
/// impersonate the framework's own `InfraEvent` topic namespace
/// (the trust-boundary requirement described in §Trust boundary).
/// The struct variants remain directly constructible for
/// framework-internal forwarding paths that do not cross the
/// application boundary.
#[derive(Debug, Clone)]
pub enum AppEvent {
    /// `AppEmit(name, value_bytes)` - a full app event that carries
    /// bytes.
    Emit {
        /// Topic of the event.
        name: String,
        /// Encoded payload of the value.
        value_bytes: Vec<u8>,
    },
    /// `AppNotify(name)` - a marker-only notification without a
    /// payload.
    Notify {
        /// Topic of the event.
        name: String,
    },
}
368
/// First reserved topic prefix: topics of framework-emitted infra
/// events.
const RESERVED_PREFIX_BB: &'static str = "bb.";

/// Second reserved topic prefix: framework-namespaced metadata keys.
const RESERVED_PREFIX_FRAMEWORK: &'static str = "ai.bytesandbrains.";
374
/// Ways in which building an event through [`AppEvent::emit`] or
/// [`AppEvent::notify`] can fail.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AppEventError {
    /// The topic starts with a framework-reserved prefix (`bb.` or
    /// `ai.bytesandbrains.`). Only the framework's own `InfraEvent`
    /// may publish under those prefixes.
    ReservedPrefix {
        /// Topic the application attempted to publish.
        topic: String,
        /// The reserved prefix that the topic collided with.
        prefix: &'static str,
    },
}

impl std::fmt::Display for AppEventError {
    /// Renders a one-line message naming the rejected topic and the
    /// reserved prefix it matched.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (topic, prefix) = match self {
            Self::ReservedPrefix { topic, prefix } => (topic, prefix),
        };
        write!(
            f,
            "AppEvent topic `{topic}` collides with the framework-reserved prefix `{prefix}`",
        )
    }
}

impl std::error::Error for AppEventError {}
402
403impl AppEvent {
404    /// Construct an [`AppEvent::Emit`], rejecting framework-reserved
405    /// topic prefixes. The trust boundary `AppEvent::new` referenced
406    /// in `docs-plan/CORRECTED_ARCHITECTURE.md` §Trust boundary.
407    pub fn emit(name: impl Into<String>, value_bytes: Vec<u8>) -> Result<Self, AppEventError> {
408        let name = name.into();
409        check_reserved(&name)?;
410        Ok(Self::Emit { name, value_bytes })
411    }
412
413    /// Construct an [`AppEvent::Notify`], rejecting framework-
414    /// reserved topic prefixes. See [`Self::emit`].
415    pub fn notify(name: impl Into<String>) -> Result<Self, AppEventError> {
416        let name = name.into();
417        check_reserved(&name)?;
418        Ok(Self::Notify { name })
419    }
420}
421
422fn check_reserved(topic: &str) -> Result<(), AppEventError> {
423    for prefix in [RESERVED_PREFIX_BB, RESERVED_PREFIX_FRAMEWORK] {
424        if topic.starts_with(prefix) {
425            return Err(AppEventError::ReservedPrefix {
426                topic: topic.to_string(),
427                prefix,
428            });
429        }
430    }
431    Ok(())
432}
433
/// Kind of an Op invocation failure: a stable categorical label
/// that consumers match on for retry / report / drop policy
/// decisions, instead of parsing the freeform `OpError::detail`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpErrorKind {
    /// The value in an input slot did not have the expected concrete
    /// type (a typed downcast failed in a dispatch arm).
    TypeMismatch,
    /// A required slot was missing when the op was dispatched.
    MissingSlot,
    /// No handler is registered for the dispatched op.
    NotRegistered,
    /// The handler ran, but the work it performs failed (numeric,
    /// IO, inventory, etc.). Specifics are in the detail string.
    ExecutionFailed,
    /// An off-thread completion handle reported an error (produced
    /// by the user's Contract method's `Error` type via
    /// `ContractResponse::Now(Err)` or `CompletionHandle::fail`).
    RemoteFailed,
    /// The deadline of the op passed before its completion landed.
    Timeout,
    /// A peer supplied adversarial / malformed input.
    BadInput,
    /// The op was held by the peer-health / backoff gate (a
    /// transient, retry-eligible failure).
    Cooldown,
    /// Catch-all for failures that fit no more specific kind. This
    /// is the default for call sites that only carry a `detail`
    /// string.
    Other,
}

impl Default for OpErrorKind {
    /// The default kind is [`OpErrorKind::Other`].
    fn default() -> Self {
        OpErrorKind::Other
    }
}
470
471/// Op invocation failure detail. Surfaced by the engine when
472/// `dispatch_atomic` returns `Err` or the dispatch table lookup
473/// misses. Three-field shape: `kind` is a stable categorical label,
474/// `reason` is a `&'static str` (e.g. `"blocklisted"`, `"cooldown"`)
475/// callers can match on, `detail` is a free-form human-readable
476/// description.
477#[derive(Clone, Debug, Default)]
478pub struct OpError {
479    /// Categorical kind. Default `Other`.
480    pub kind: OpErrorKind,
481    /// Stable diagnostic label (e.g. `"blocklisted"`, `"cooldown"`,
482    /// `"nak"`) consumers match on. Default `""`.
483    pub reason: &'static str,
484    /// Human-readable failure detail.
485    pub detail: String,
486}
487
488impl std::fmt::Display for OpError {
489    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
490        if self.reason.is_empty() {
491            write!(f, "op error[{:?}]: {}", self.kind, self.detail)
492        } else {
493            write!(
494                f,
495                "op error[{:?} reason={}]: {}",
496                self.kind, self.reason, self.detail,
497            )
498        }
499    }
500}
501
502impl std::error::Error for OpError {}
503
504/// Typed in-Node event bus. Carries published events from one
505/// poll cycle to the next; the engine's bus-routing pass drains
506/// the queue and routes to subscribed `NodeSiteId`s.
507///
508/// Bounded by `NodeConfig.bus_capacity` (default 1024). When a
509/// publish would exceed the cap, the oldest event is FIFO-dropped
510/// and a counter increments. The routing pass reads the counter
511/// and emits `InfraEvent::BusOverflow { count }` so the host sees
512/// the loss.
513#[derive(Default)]
514pub struct TypedBus {
515    queue: VecDeque<NodeEvent>,
516    cap: Option<usize>,
517    dropped_since_last_drain: usize,
518}
519
520impl TypedBus {
521    /// Construct a fresh empty bus with no cap.
522    pub fn new() -> Self {
523        Self::default()
524    }
525
526    /// Construct a fresh empty bus with the given FIFO-drop cap.
527    pub fn with_cap(cap: Option<usize>) -> Self {
528        Self {
529            queue: VecDeque::new(),
530            cap,
531            dropped_since_last_drain: 0,
532        }
533    }
534
535    /// Set the FIFO-drop cap. `None` removes the cap.
536    pub fn set_cap(&mut self, cap: Option<usize>) {
537        self.cap = cap;
538    }
539
540    /// Publish an event. The engine's bus-routing pass delivers
541    /// published events to subscribed Components in the next poll
542    /// cycle.
543    pub fn publish(&mut self, event: NodeEvent) {
544        if let Some(cap) = self.cap {
545            while self.queue.len() >= cap {
546                self.queue.pop_front();
547                self.dropped_since_last_drain += 1;
548            }
549        }
550        self.queue.push_back(event);
551    }
552
553    /// Drain all queued events. Called by the engine's bus-routing pass.
554    pub fn drain(&mut self) -> Vec<NodeEvent> {
555        self.queue.drain(..).collect()
556    }
557
558    /// Read + reset the count of FIFO-dropped events since the last
559    /// call. Returns 0 when no drops occurred.
560    pub fn take_dropped_count(&mut self) -> usize {
561        std::mem::take(&mut self.dropped_since_last_drain)
562    }
563
564    /// `true` when the bus has no queued events.
565    pub fn is_empty(&self) -> bool {
566        self.queue.is_empty()
567    }
568
569    /// Number of queued events.
570    pub fn len(&self) -> usize {
571        self.queue.len()
572    }
573}
574