bb_runtime/bus.rs
1//! Typed in-Node event bus. Cross-Component signaling per
2//! `docs/ENGINE.md` §13.1.
3//!
4//! Components publish events via the bus; the engine's bus-event
5//! routing pass delivers them to subscribed Components.
6
7use std::collections::VecDeque;
8
9use crate::ids::{CommandId, ComponentRef, OpRef, PeerId};
10
11/// All events that may flow through the in-Node bus.
12#[derive(Clone, Debug)]
13pub enum NodeEvent {
14 /// Framework-emitted infrastructure events.
15 Infra(InfraEvent),
16 /// App-Op-emitted events.
17 App(AppEvent),
18}
19
20impl NodeEvent {
21 /// String discriminator used by [`crate::engine::Engine`]'s
22 /// bus-routing lookup against
23 /// `event_subscriptions: HashMap<String, Vec<NodeSiteId>>`.
24 ///
25 /// `Infra` events map to the variant name (e.g.
26 /// `"OpFailure"`); `App` events use the user-supplied topic so
27 /// `EventSource` subscribers can target a specific channel.
28 pub fn kind(&self) -> &str {
29 match self {
30 NodeEvent::Infra(InfraEvent::WireResponseLanded { .. }) => "WireResponseLanded",
31 NodeEvent::Infra(InfraEvent::OpFailure { .. }) => "OpFailure",
32 NodeEvent::Infra(InfraEvent::WireDecodeFailure { .. }) => "WireDecodeFailure",
33 NodeEvent::Infra(InfraEvent::WireReceiveError { .. }) => "WireReceiveError",
34 NodeEvent::Infra(InfraEvent::AppIngressError { .. }) => "AppIngressError",
35 NodeEvent::Infra(InfraEvent::BusOverflow { .. }) => "BusOverflow",
36 NodeEvent::Infra(InfraEvent::PeerResolveFailure { .. }) => "PeerResolveFailure",
37 NodeEvent::Infra(InfraEvent::PeerSuspect { .. }) => "PeerSuspect",
38 NodeEvent::Infra(InfraEvent::PeerDown { .. }) => "PeerDown",
39 NodeEvent::Infra(InfraEvent::PeerLive { .. }) => "PeerLive",
40 NodeEvent::Infra(InfraEvent::BackoffNoticeSent { .. }) => "BackoffNoticeSent",
41 NodeEvent::Infra(InfraEvent::SilentDropActive { .. }) => "SilentDropActive",
42 NodeEvent::App(AppEvent::Emit { name, .. }) => name.as_str(),
43 NodeEvent::App(AppEvent::Notify { name }) => name.as_str(),
44 }
45 }
46}
47
48/// Framework-emitted infrastructure events per
49/// `docs/ENGINE.md` §13.1.
50#[derive(Clone, Debug)]
51pub enum InfraEvent {
52 /// A previously-suspended wire-request's response landed.
53 WireResponseLanded {
54 /// The `CommandId` that the wire-request was suspended on.
55 cmd_id: CommandId,
56 },
57 /// An Op invocation failed.
58 OpFailure {
59 /// The Op that failed.
60 op_ref: OpRef,
61 /// The failure detail.
62 error: OpError,
63 },
64 /// An inbound wire envelope's payload could not be decoded -
65 /// the payload's wire-type hash didn't resolve, the bytes were
66 /// malformed, or the destination address parsing failed. The
67 /// engine drops the envelope's slot fill rather than writing
68 /// garbage into a slot; this event lets the host observe the
69 /// drop. Emitted by the inbound envelope router.
70 WireDecodeFailure {
71 /// Wire-type hash that the envelope advertised (0 if the
72 /// failure occurred before the hash could be read).
73 hash: u64,
74 /// Length of the offending payload, in bytes.
75 payload_size: usize,
76 /// Human-readable failure detail.
77 detail: String,
78 },
79 /// Per-fill failure on the wire-receive typed-decode path.
80 /// Distinct from [`InfraEvent::WireDecodeFailure`]
81 /// (envelope-level: malformed `dest_suffix` / header) -
82 /// `WireReceiveError` fires after the envelope has parsed and
83 /// an individual fill reached the decoder-registry lookup +
84 /// typed materialisation step. Other fills in the same
85 /// envelope continue to deliver (partial-delivery semantics).
86 WireReceiveError {
87 /// Sender of the failing envelope, if the wire layer was
88 /// able to identify them.
89 src_peer: Option<PeerId>,
90 /// Position of the failing fill within the envelope
91 /// (0-based). Other fills in the same envelope are still
92 /// delivered.
93 fill_index: u32,
94 /// The `type_hash` the sender stamped on the fill.
95 actual_hash: u64,
96 /// Bytes that did not deliver. Tracked for telemetry, NOT
97 /// for fallback decode - degrading to `BytesValue` is
98 /// exactly the silent type-loss path this surface closes.
99 payload_size: usize,
100 /// Which failure mode fired.
101 kind: WireReceiveErrorKind,
102 },
103 /// Application-side ingress failure - host pushed an
104 /// `AppEvent` / `Invoke` / async completion whose payload could
105 /// not enter engine state because allocation failed, the
106 /// engine-wide ingress byte budget was exhausted, or a per-item
107 /// cap rejected the request at the boundary. The offending
108 /// bytes are dropped; the engine continues processing other
109 /// ingress work. Audience: host / SDK author watching their
110 /// own push errors (distinct from wire-side
111 /// [`InfraEvent::WireReceiveError`]).
112 AppIngressError {
113 /// Which application-side entry point raised the failure
114 /// and the identity it carries (module/input name for
115 /// `AppEvent` / `Invoke`, `CommandId` for an async
116 /// completion).
117 source: AppIngressSource,
118 /// Bytes the boundary was asked to admit.
119 byte_count: usize,
120 /// Which failure mode fired.
121 kind: AppIngressErrorKind,
122 },
123 /// The typed bus dropped `count` oldest events to make room for
124 /// newer publishes when `NodeConfig.bus_capacity` was hit.
125 /// Emitted by the bus-routing pass if any drops accumulated
126 /// since the last poll.
127 BusOverflow {
128 /// Number of events FIFO-dropped since the last poll.
129 count: usize,
130 },
131 /// Routable telemetry mirror of
132 /// [`crate::engine::EngineStep::PeerResolveFailed`].
133 /// Surfaces via the bus to subscribers so dashboards can
134 /// monitor peer-resolution failures alongside `PeerBlocked` /
135 /// `PeerDown` / `PeerUp` from .
136 PeerResolveFailure {
137 /// The peer whose addresses could not be resolved. `None`
138 /// when the failing Send op had no parseable `peer` input.
139 peer: Option<PeerId>,
140 /// The Send op that failed to resolve.
141 op_ref: OpRef,
142 },
143 /// φ-accrual failure detector crossed the suspect threshold
144 /// for the named logical site. Components (gossip overlays,
145 /// peer-sampling services, deadline planners) subscribe to react.
146 PeerSuspect {
147 /// Suspect logical site.
148 site: crate::ids::NodeSiteId,
149 /// Current φ value (informational; subscribers can ignore).
150 phi: f64,
151 },
152 /// φ-accrual failure detector crossed the hard-down threshold
153 /// for the named logical site.
154 PeerDown {
155 /// Down logical site.
156 site: crate::ids::NodeSiteId,
157 /// Current φ value.
158 phi: f64,
159 },
160 /// φ collapsed back below the suspect threshold after a
161 /// `PeerSuspect` or `PeerDown` was emitted. Lets subscribers
162 /// reinstate the peer.
163 PeerLive {
164 /// Recovered logical site.
165 site: crate::ids::NodeSiteId,
166 },
167 /// A `BackoffNotice` envelope was emitted to `peer`. Surfaces
168 /// the local overload decision on the bus so ops dashboards +
169 /// Component authors who want to react to local overload can
170 /// subscribe.
171 BackoffNoticeSent {
172 /// The sender the receiver asked to slow down.
173 peer: PeerId,
174 /// Why the receiver requested back-off.
175 cause: crate::framework::BackoffCause,
176 /// `min_backoff_ns` quoted on the notice.
177 min_backoff_ns: u64,
178 },
179 /// `peer` crossed the K-notices-without-recovery threshold;
180 /// subsequent envelopes from that peer are dropped silently at
181 /// the inbound boundary until the peer recovers. Emitted once
182 /// per silent-drop transition; the recovery path reuses the
183 /// existing `PeerLive` event.
184 SilentDropActive {
185 /// The sender now in silent-drop mode.
186 peer: PeerId,
187 },
188}
189
190/// Sub-kind discriminator for [`InfraEvent::WireReceiveError`].
191/// One top-level variant + an enum sub-kind keeps the bus-topic
192/// count down and lets subscribers route on the variant while
193/// matching the sub-kind out of the variant fields - the
194/// `PeerSuspect`/`PeerDown`/`PeerLive` triple pattern is for
195/// distinct lifecycle events; these three share lifecycle (one
196/// fill, one decode step) and audience (wire-payload integrity).
197#[derive(Clone, Debug, PartialEq, Eq)]
198pub enum WireReceiveErrorKind {
199 /// No decoder is registered for `actual_hash`. The sender
200 /// shipped a value whose concrete type is unknown to this
201 /// Node's inventory - either a version skew (sender has a
202 /// carrier the receiver hasn't compiled in) or a malicious /
203 /// fuzzed envelope.
204 UnknownTypeHash,
205 /// Destination slot carries a compile-time wire-type
206 /// assertion (`expected_hash`) and the fill's `actual_hash`
207 /// does not match.
208 TypeMismatch {
209 /// The `type_hash` the destination slot's compile-time
210 /// metadata declared.
211 expected_hash: u64,
212 },
213 /// Decoder ran and returned `Err` - the bytes were not a
214 /// valid encoding of the advertised type.
215 DecodeFailed {
216 /// Human-readable underlying error from the registered
217 /// decoder (typically a `bincode::Error::to_string`).
218 error_summary: String,
219 },
220 /// The framework-owned scratch buffer could not be reserved
221 /// before decode - heap allocation failed or a per-item cap
222 /// rejected the request. Emitted by the
223 /// `Engine::decode_typed_fill` boundary on `Vec::try_reserve_exact`
224 /// failure or before the prost decode runs when the fill's
225 /// payload length exceeds `EnvelopeCaps::max_per_fill_bytes`.
226 AllocationFailed {
227 /// Number of bytes the boundary tried to reserve.
228 byte_count: usize,
229 /// Why the reservation failed.
230 reason: AllocFailReason,
231 },
232 /// Admitting this fill's payload would push the engine over
233 /// `NodeConfig::ingress_byte_budget`. The fill is dropped; the
234 /// envelope's other fills continue to deliver.
235 BudgetExceeded {
236 /// Bytes the fill would have added to the in-flight budget.
237 byte_count: usize,
238 /// Bytes still available under `NodeConfig::ingress_byte_budget`
239 /// at the time of the rejection.
240 budget_remaining: usize,
241 },
242 /// The destination slot is bound to a `Backend` role and the
243 /// backend's `materialize_from_wire` impl returned `Err`. The
244 /// engine drops the fill, releases the byte charge, and emits
245 /// this event so operators can see which backend rejected
246 /// inbound payloads. Distinct from
247 /// [`WireReceiveErrorKind::DecodeFailed`] (framework-side
248 /// registry decoder failure).
249 BackendMaterializeFailed {
250 /// `ComponentRef` of the destination slot's bound backend.
251 backend_ref: ComponentRef,
252 /// Short `Display` of the backend's typed error.
253 backend_error_summary: String,
254 },
255}
256
/// Reason a fallible-allocation boundary declined to admit bytes into
/// engine state. Appears inside
/// [`WireReceiveErrorKind::AllocationFailed`] and
/// [`AppIngressErrorKind::AllocationFailed`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AllocFailReason {
    /// `Vec::try_reserve_exact` (or an equivalent fallible-allocator
    /// call) returned `TryReserveError`: the host's allocator has no
    /// headroom for the request.
    HeapExhausted,
    /// A caller-side per-item cap turned the request away before any
    /// allocation was attempted. Examples of such caps are
    /// `EnvelopeCaps::max_per_fill_bytes`,
    /// `NodeConfig::max_app_event_bytes` and
    /// `NodeConfig::max_invoke_bytes`.
    PerItemCapExceeded {
        /// Value of the cap that the boundary enforced.
        cap: usize,
    },
}
276
277/// Application-side entry point that raised an
278/// [`InfraEvent::AppIngressError`]. Carries the identity the host
279/// referenced when the failure occurred so subscribers can correlate
280/// the bus event with the original push call.
281#[derive(Clone, Debug, PartialEq, Eq)]
282pub enum AppIngressSource {
283 /// `Node::deliver_event(module, input, value_bytes)` failed at
284 /// the boundary.
285 AppEvent {
286 /// Target module name (the host's `module` argument).
287 module: String,
288 /// Target input slot name on that module.
289 input: String,
290 },
291 /// `Node::invoke(module, inputs)` failed at the boundary.
292 Invoke {
293 /// Target module name.
294 module: String,
295 /// Number of `(name, bytes)` inputs the host attempted to
296 /// admit. Lets a subscriber distinguish a cap-by-count
297 /// rejection (`max_invoke_inputs`) from a
298 /// cap-by-bytes rejection (`max_invoke_bytes`).
299 input_count: usize,
300 },
301 /// `CompletionSink::complete(cmd, ...)` or `::fail(cmd, ...)`
302 /// failed at the boundary. The `CommandId` identifies the
303 /// pending async operation the host was attempting to settle.
304 Completion {
305 /// The pending command the completion targeted.
306 command: CommandId,
307 },
308}
309
310/// Sub-kind discriminator for [`InfraEvent::AppIngressError`]. One
311/// top-level variant + sub-kind keeps the bus-topic count down -
312/// subscribers route on the variant name and match the sub-kind out
313/// of the event's fields when they need to distinguish allocation
314/// failure from budget exhaustion from a per-item cap.
315#[derive(Clone, Debug, PartialEq, Eq)]
316pub enum AppIngressErrorKind {
317 /// Fallible reservation of the engine-side ingress buffer
318 /// returned `TryReserveError` or hit a per-item cap before
319 /// allocation was attempted.
320 AllocationFailed {
321 /// Why the reservation failed.
322 reason: AllocFailReason,
323 },
324 /// Admitting this payload would push the engine over
325 /// `NodeConfig::ingress_byte_budget`.
326 BudgetExceeded {
327 /// Bytes still available under `NodeConfig::ingress_byte_budget`
328 /// at the time of the rejection.
329 budget_remaining: usize,
330 },
331 /// A per-item cap (`max_app_event_bytes`, `max_invoke_inputs`,
332 /// `max_invoke_bytes`, or `max_completion_result_bytes`) rejected
333 /// the payload before any allocation was attempted. The host's
334 /// synchronous `deliver_event` / `invoke` call returns
335 /// `DeliveryError::OversizePayload` in addition to this bus
336 /// emission.
337 PerItemCapExceeded {
338 /// The cap value the boundary enforced.
339 cap: usize,
340 },
341}
342
/// Events emitted by App Ops and surfaced as `EngineStep::AppEvent`.
///
/// Application code is expected to build these through
/// [`AppEvent::emit`] / [`AppEvent::notify`]. Both constructors
/// reject the framework-reserved topic prefixes (`bb.`,
/// `ai.bytesandbrains.`) so that user publish calls cannot use the
/// topic namespace reserved for the framework's own `InfraEvent`s;
/// this is the trust-boundary requirement described under
/// "§Trust boundary". The struct variants remain directly
/// constructible for framework-internal forwarding paths that do not
/// cross the application boundary.
#[derive(Debug, Clone)]
pub enum AppEvent {
    /// `AppEmit(name, value_bytes)`: a full app event that carries a
    /// byte payload.
    Emit {
        /// Topic of the event.
        name: String,
        /// The encoded value payload.
        value_bytes: Vec<u8>,
    },
    /// `AppNotify(name)`: a marker-only notification with no payload.
    Notify {
        /// Topic of the event.
        name: String,
    },
}
368
/// First reserved topic prefix: topics of framework-emitted infra
/// events.
const RESERVED_PREFIX_BB: &str = "bb.";

/// Second reserved topic prefix: framework-namespaced metadata keys.
const RESERVED_PREFIX_FRAMEWORK: &str = "ai.bytesandbrains.";
374
/// Why building an event through [`AppEvent::emit`] or
/// [`AppEvent::notify`] was refused.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AppEventError {
    /// The topic starts with one of the framework-reserved prefixes
    /// (`bb.` or `ai.bytesandbrains.`). Only the framework's own
    /// `InfraEvent` may publish under those prefixes.
    ReservedPrefix {
        /// Topic the application attempted to publish.
        topic: String,
        /// The reserved prefix that topic collided with.
        prefix: &'static str,
    },
}

impl std::fmt::Display for AppEventError {
    /// Writes a one-line message naming the offending topic and the
    /// reserved prefix it collided with.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The enum has a single variant, so an irrefutable `let` is
        // enough; adding a variant later makes this line fail to
        // compile, just as a non-exhaustive `match` would.
        let Self::ReservedPrefix { topic, prefix } = self;
        write!(
            f,
            "AppEvent topic `{topic}` collides with the framework-reserved prefix `{prefix}`",
        )
    }
}

// Marker impl only: `Debug` and `Display` above supply everything
// `std::error::Error` requires, and there is no underlying source.
impl std::error::Error for AppEventError {}
402
403impl AppEvent {
404 /// Construct an [`AppEvent::Emit`], rejecting framework-reserved
405 /// topic prefixes. The trust boundary `AppEvent::new` referenced
406 /// in `docs-plan/CORRECTED_ARCHITECTURE.md` §Trust boundary.
407 pub fn emit(name: impl Into<String>, value_bytes: Vec<u8>) -> Result<Self, AppEventError> {
408 let name = name.into();
409 check_reserved(&name)?;
410 Ok(Self::Emit { name, value_bytes })
411 }
412
413 /// Construct an [`AppEvent::Notify`], rejecting framework-
414 /// reserved topic prefixes. See [`Self::emit`].
415 pub fn notify(name: impl Into<String>) -> Result<Self, AppEventError> {
416 let name = name.into();
417 check_reserved(&name)?;
418 Ok(Self::Notify { name })
419 }
420}
421
422fn check_reserved(topic: &str) -> Result<(), AppEventError> {
423 for prefix in [RESERVED_PREFIX_BB, RESERVED_PREFIX_FRAMEWORK] {
424 if topic.starts_with(prefix) {
425 return Err(AppEventError::ReservedPrefix {
426 topic: topic.to_string(),
427 prefix,
428 });
429 }
430 }
431 Ok(())
432}
433
/// Stable categorical label for an Op invocation failure. Consumers
/// match on it to choose a retry / report / drop policy without
/// having to parse the free-form `OpError::detail` text.
///
/// [`Default`] is derived and yields [`OpErrorKind::Other`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum OpErrorKind {
    /// An input slot value did not have the expected concrete type
    /// (a typed downcast failed in a dispatch arm).
    TypeMismatch,
    /// A required slot was absent at dispatch time.
    MissingSlot,
    /// The dispatched op has no registered handler.
    NotRegistered,
    /// The op handler ran but its work failed (numeric, IO,
    /// inventory, etc.). The detail string carries the specifics.
    ExecutionFailed,
    /// An off-thread completion handle returned an error (the user's
    /// Contract method's `Error` type produced it via
    /// `ContractResponse::Now(Err)` or `CompletionHandle::fail`).
    RemoteFailed,
    /// The op's deadline elapsed before its completion landed.
    Timeout,
    /// Adversarial / malformed input from a peer.
    BadInput,
    /// A peer-health / backoff gate held the op (a transient,
    /// retry-eligible failure).
    Cooldown,
    /// Catch-all for failures that fit no more specific kind. This is
    /// the default, used by call sites that only carry a `detail`
    /// string.
    #[default]
    Other,
}
470
471/// Op invocation failure detail. Surfaced by the engine when
472/// `dispatch_atomic` returns `Err` or the dispatch table lookup
473/// misses. Three-field shape: `kind` is a stable categorical label,
474/// `reason` is a `&'static str` (e.g. `"blocklisted"`, `"cooldown"`)
475/// callers can match on, `detail` is a free-form human-readable
476/// description.
477#[derive(Clone, Debug, Default)]
478pub struct OpError {
479 /// Categorical kind. Default `Other`.
480 pub kind: OpErrorKind,
481 /// Stable diagnostic label (e.g. `"blocklisted"`, `"cooldown"`,
482 /// `"nak"`) consumers match on. Default `""`.
483 pub reason: &'static str,
484 /// Human-readable failure detail.
485 pub detail: String,
486}
487
488impl std::fmt::Display for OpError {
489 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
490 if self.reason.is_empty() {
491 write!(f, "op error[{:?}]: {}", self.kind, self.detail)
492 } else {
493 write!(
494 f,
495 "op error[{:?} reason={}]: {}",
496 self.kind, self.reason, self.detail,
497 )
498 }
499 }
500}
501
502impl std::error::Error for OpError {}
503
504/// Typed in-Node event bus. Carries published events from one
505/// poll cycle to the next; the engine's bus-routing pass drains
506/// the queue and routes to subscribed `NodeSiteId`s.
507///
508/// Bounded by `NodeConfig.bus_capacity` (default 1024). When a
509/// publish would exceed the cap, the oldest event is FIFO-dropped
510/// and a counter increments. The routing pass reads the counter
511/// and emits `InfraEvent::BusOverflow { count }` so the host sees
512/// the loss.
513#[derive(Default)]
514pub struct TypedBus {
515 queue: VecDeque<NodeEvent>,
516 cap: Option<usize>,
517 dropped_since_last_drain: usize,
518}
519
520impl TypedBus {
521 /// Construct a fresh empty bus with no cap.
522 pub fn new() -> Self {
523 Self::default()
524 }
525
526 /// Construct a fresh empty bus with the given FIFO-drop cap.
527 pub fn with_cap(cap: Option<usize>) -> Self {
528 Self {
529 queue: VecDeque::new(),
530 cap,
531 dropped_since_last_drain: 0,
532 }
533 }
534
535 /// Set the FIFO-drop cap. `None` removes the cap.
536 pub fn set_cap(&mut self, cap: Option<usize>) {
537 self.cap = cap;
538 }
539
540 /// Publish an event. The engine's bus-routing pass delivers
541 /// published events to subscribed Components in the next poll
542 /// cycle.
543 pub fn publish(&mut self, event: NodeEvent) {
544 if let Some(cap) = self.cap {
545 while self.queue.len() >= cap {
546 self.queue.pop_front();
547 self.dropped_since_last_drain += 1;
548 }
549 }
550 self.queue.push_back(event);
551 }
552
553 /// Drain all queued events. Called by the engine's bus-routing pass.
554 pub fn drain(&mut self) -> Vec<NodeEvent> {
555 self.queue.drain(..).collect()
556 }
557
558 /// Read + reset the count of FIFO-dropped events since the last
559 /// call. Returns 0 when no drops occurred.
560 pub fn take_dropped_count(&mut self) -> usize {
561 std::mem::take(&mut self.dropped_since_last_drain)
562 }
563
564 /// `true` when the bus has no queued events.
565 pub fn is_empty(&self) -> bool {
566 self.queue.is_empty()
567 }
568
569 /// Number of queued events.
570 pub fn len(&self) -> usize {
571 self.queue.len()
572 }
573}
574