bb-runtime 0.3.4

Sans-IO engine for the bytesandbrains framework โ€” Node, Engine, Framework, Backends, roles, snapshot, runtime-side syscalls.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
//! Typed in-Node event bus. Cross-Component signaling per
//! `docs/ENGINE.md` ยง13.1.
//!
//! Components publish events via the bus; the engine's Phase 3
//! (`route_bus_events`) routes them to subscribed Components.
//! ships the publish + drain surface; 's syscall
//! opset + 's lifecycle wire up the subscription routing.

use std::collections::VecDeque;

use crate::ids::{CommandId, ComponentRef, OpRef, PeerId};

/// All events that may flow through the in-Node bus.
#[derive(Clone, Debug)]
pub enum NodeEvent {
    /// Framework-emitted infrastructure events.
    Infra(InfraEvent),
    /// App-Op-emitted events.
    App(AppEvent),
}

impl NodeEvent {
    /// String discriminator used by [`crate::engine::Engine`]'s
    /// bus-routing lookup against
    /// `event_subscriptions: HashMap<String, Vec<NodeSiteId>>`.
    ///
    /// `Infra` events map to the variant name (e.g.
    /// `"OpFailure"`); `App` events use the user-supplied topic so
    /// `EventSource` subscribers can target a specific channel.
    pub fn kind(&self) -> &str {
        match self {
            NodeEvent::Infra(InfraEvent::WireResponseLanded { .. }) => "WireResponseLanded",
            NodeEvent::Infra(InfraEvent::OpFailure { .. }) => "OpFailure",
            NodeEvent::Infra(InfraEvent::WireDecodeFailure { .. }) => "WireDecodeFailure",
            NodeEvent::Infra(InfraEvent::WireReceiveError { .. }) => "WireReceiveError",
            NodeEvent::Infra(InfraEvent::AppIngressError { .. }) => "AppIngressError",
            NodeEvent::Infra(InfraEvent::BusOverflow { .. }) => "BusOverflow",
            NodeEvent::Infra(InfraEvent::PeerResolveFailure { .. }) => "PeerResolveFailure",
            NodeEvent::Infra(InfraEvent::PeerSuspect { .. }) => "PeerSuspect",
            NodeEvent::Infra(InfraEvent::PeerDown { .. }) => "PeerDown",
            NodeEvent::Infra(InfraEvent::PeerLive { .. }) => "PeerLive",
            NodeEvent::Infra(InfraEvent::BackoffNoticeSent { .. }) => "BackoffNoticeSent",
            NodeEvent::Infra(InfraEvent::SilentDropActive { .. }) => "SilentDropActive",
            NodeEvent::App(AppEvent::Emit { name, .. }) => name.as_str(),
            NodeEvent::App(AppEvent::Notify { name }) => name.as_str(),
        }
    }
}

/// Framework-emitted infrastructure events per
/// `docs/ENGINE.md` ยง13.1.
#[derive(Clone, Debug)]
pub enum InfraEvent {
    /// A previously-suspended wire-request's response landed.
    WireResponseLanded {
        /// The `CommandId` that the wire-request was suspended on.
        cmd_id: CommandId,
    },
    /// An Op invocation failed.
    OpFailure {
        /// The Op that failed.
        op_ref: OpRef,
        /// The failure detail.
        error: OpError,
    },
    /// An inbound wire envelope's payload could not be decoded -
    /// the payload's wire-type hash didn't resolve, the bytes were
    /// malformed, or the destination address parsing failed. The
    /// engine drops the envelope's slot fill rather than writing
    /// garbage into a slot; this event lets the host observe the
    /// drop. Emitted by the engine's Phase 1 envelope router.
    WireDecodeFailure {
        /// Wire-type hash that the envelope advertised (0 if the
        /// failure occurred before the hash could be read).
        hash: u64,
        /// Length of the offending payload, in bytes.
        payload_size: usize,
        /// Human-readable failure detail.
        detail: String,
    },
    /// Per-fill failure on the wire-receive typed-decode path.
    /// Distinct from [`InfraEvent::WireDecodeFailure`]
    /// (envelope-level: malformed `dest_suffix` / header) -
    /// `WireReceiveError` fires after the envelope has parsed and
    /// an individual fill reached the decoder-registry lookup +
    /// typed materialisation step. Other fills in the same
    /// envelope continue to deliver (partial-delivery semantics).
    WireReceiveError {
        /// Sender of the failing envelope, if the wire layer was
        /// able to identify them.
        src_peer: Option<PeerId>,
        /// Position of the failing fill within the envelope
        /// (0-based). Other fills in the same envelope are still
        /// delivered.
        fill_index: u32,
        /// The `type_hash` the sender stamped on the fill.
        actual_hash: u64,
        /// Bytes that did not deliver. Tracked for telemetry, NOT
        /// for fallback decode - degrading to `BytesValue` is
        /// exactly the silent type-loss path this surface closes.
        payload_size: usize,
        /// Which failure mode fired.
        kind: WireReceiveErrorKind,
    },
    /// Application-side ingress failure - host pushed an
    /// `AppEvent` / `Invoke` / async completion whose payload could
    /// not enter engine state because allocation failed, the
    /// engine-wide ingress byte budget was exhausted, or a per-item
    /// cap rejected the request at the boundary. The offending
    /// bytes are dropped; the engine continues processing other
    /// ingress work. Audience: host / SDK author watching their
    /// own push errors (distinct from wire-side
    /// [`InfraEvent::WireReceiveError`]).
    AppIngressError {
        /// Which application-side entry point raised the failure
        /// and the identity it carries (module/input name for
        /// `AppEvent` / `Invoke`, `CommandId` for an async
        /// completion).
        source: AppIngressSource,
        /// Bytes the boundary was asked to admit.
        byte_count: usize,
        /// Which failure mode fired.
        kind: AppIngressErrorKind,
    },
    /// The typed bus dropped `count` oldest events to make room for
    /// newer publishes when `NodeConfig.bus_capacity` was hit.
    /// Emitted by the engine's Phase 3 if any drops accumulated
    /// since the last poll.
    BusOverflow {
        /// Number of events FIFO-dropped since the last poll.
        count: usize,
    },
    /// Routable telemetry mirror of
    /// [`crate::engine::EngineStep::PeerResolveFailed`].
    /// Surfaces via the bus to subscribers so dashboards can
    /// monitor peer-resolution failures alongside `PeerBlocked` /
    /// `PeerDown` / `PeerUp` from .
    PeerResolveFailure {
        /// The peer whose addresses could not be resolved. `None`
        /// when the failing Send op had no parseable `peer` input.
        peer: Option<PeerId>,
        /// The Send op that failed to resolve.
        op_ref: OpRef,
    },
    /// -v - ฯ†-accrual failure
    /// detector crossed the suspect threshold for the named
    /// logical site. Components (gossip overlays, peer-sampling
    /// services, deadline planners) subscribe to react.
    PeerSuspect {
        /// Suspect logical site.
        site: crate::ids::NodeSiteId,
        /// Current ฯ† value (informational; subscribers can ignore).
        phi: f64,
    },
    /// -v - ฯ†-accrual failure
    /// detector crossed the hard-down threshold for the named
    /// logical site.
    PeerDown {
        /// Down logical site.
        site: crate::ids::NodeSiteId,
        /// Current ฯ† value.
        phi: f64,
    },
    /// -v - ฯ† collapsed back below
    /// the suspect threshold after a `PeerSuspect` or `PeerDown`
    /// was emitted. Lets subscribers reinstate the peer.
    PeerLive {
        /// Recovered logical site.
        site: crate::ids::NodeSiteId,
    },
    /// A `BackoffNotice` envelope was emitted to `peer` per the
    /// backpressure protocol at
    /// `docs/internal/superpowers/specs/2026-06-23-backpressure-runtime.md`
    /// ยง7. Surfaces the local overload decision on the bus so ops
    /// dashboards + Component authors who want to react to local
    /// overload can subscribe.
    BackoffNoticeSent {
        /// The sender the receiver asked to slow down.
        peer: PeerId,
        /// Why the receiver requested back-off.
        cause: crate::framework::BackoffCause,
        /// `min_backoff_ns` quoted on the notice.
        min_backoff_ns: u64,
    },
    /// `peer` crossed the K-notices-without-recovery threshold;
    /// subsequent envelopes from that peer are dropped silently at
    /// Phase 1 of `Engine::poll` until the peer recovers per
    /// `docs/internal/superpowers/specs/2026-06-23-backpressure-runtime.md`
    /// ยง3. Emitted once per silent-drop transition; the recovery
    /// path reuses the existing `PeerLive` event.
    SilentDropActive {
        /// The sender now in silent-drop mode.
        peer: PeerId,
    },
}

/// Sub-kind discriminator for [`InfraEvent::WireReceiveError`].
/// One top-level variant + an enum sub-kind keeps the bus-topic
/// count down and lets subscribers route on the variant while
/// matching the sub-kind out of the variant fields - the
/// `PeerSuspect`/`PeerDown`/`PeerLive` triple pattern is for
/// distinct lifecycle events; these three share lifecycle (one
/// fill, one decode step) and audience (wire-payload integrity).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WireReceiveErrorKind {
    /// No decoder is registered for `actual_hash`. The sender
    /// shipped a value whose concrete type is unknown to this
    /// Node's inventory - either a version skew (sender has a
    /// carrier the receiver hasn't compiled in) or a malicious /
    /// fuzzed envelope.
    UnknownTypeHash,
    /// Destination slot carries a compile-time wire-type
    /// assertion (`expected_hash`) and the fill's `actual_hash`
    /// does not match.
    TypeMismatch {
        /// The `type_hash` the destination slot's compile-time
        /// metadata declared.
        expected_hash: u64,
    },
    /// Decoder ran and returned `Err` - the bytes were not a
    /// valid encoding of the advertised type.
    DecodeFailed {
        /// Human-readable underlying error from the registered
        /// decoder (typically a `bincode::Error::to_string`).
        error_summary: String,
    },
    /// The framework-owned scratch buffer could not be reserved
    /// before decode - heap allocation failed or a per-item cap
    /// rejected the request. Emitted by the
    /// `Engine::decode_typed_fill` boundary on `Vec::try_reserve_exact`
    /// failure or before the prost decode runs when the fill's
    /// payload length exceeds `EnvelopeCaps::max_per_fill_bytes`.
    AllocationFailed {
        /// Number of bytes the boundary tried to reserve.
        byte_count: usize,
        /// Why the reservation failed.
        reason: AllocFailReason,
    },
    /// Admitting this fill's payload would push the engine over
    /// `NodeConfig::ingress_byte_budget`. The fill is dropped; the
    /// envelope's other fills continue to deliver.
    BudgetExceeded {
        /// Bytes the fill would have added to the in-flight budget.
        byte_count: usize,
        /// Bytes still available under `NodeConfig::ingress_byte_budget`
        /// at the time of the rejection.
        budget_remaining: usize,
    },
    /// The destination slot is bound to a `Backend` role and the
    /// backend's `materialize_from_wire` impl returned `Err`. The
    /// engine drops the fill, releases the byte charge, and emits
    /// this event so operators can see which backend rejected
    /// inbound payloads. Distinct from
    /// [`WireReceiveErrorKind::DecodeFailed`] (framework-side
    /// registry decoder failure).
    BackendMaterializeFailed {
        /// `ComponentRef` of the destination slot's bound backend.
        backend_ref: ComponentRef,
        /// Short `Display` of the backend's typed error.
        backend_error_summary: String,
    },
}

/// Why a fallible-allocation boundary refused to admit bytes into
/// engine state. Carried by [`WireReceiveErrorKind::AllocationFailed`]
/// and [`AppIngressErrorKind::AllocationFailed`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllocFailReason {
    /// `Vec::try_reserve_exact` (or equivalent fallible-allocator
    /// call) returned `TryReserveError`. The host's allocator has
    /// no headroom for the request.
    HeapExhausted,
    /// A caller-side per-item cap (e.g.
    /// `EnvelopeCaps::max_per_fill_bytes`,
    /// `NodeConfig::max_app_event_bytes`,
    /// `NodeConfig::max_invoke_bytes`) rejected the request before
    /// any allocation was attempted.
    PerItemCapExceeded {
        /// The cap value the boundary enforced.
        cap: usize,
    },
}

/// Application-side entry point that raised an
/// [`InfraEvent::AppIngressError`]. Carries the identity the host
/// referenced when the failure occurred so subscribers can correlate
/// the bus event with the original push call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AppIngressSource {
    /// `Node::deliver_event(module, input, value_bytes)` failed at
    /// the boundary.
    AppEvent {
        /// Target module name (the host's `module` argument).
        module: String,
        /// Target input slot name on that module.
        input: String,
    },
    /// `Node::invoke(module, inputs)` failed at the boundary.
    Invoke {
        /// Target module name.
        module: String,
        /// Number of `(name, bytes)` inputs the host attempted to
        /// admit. Lets a subscriber distinguish a cap-by-count
        /// rejection (`max_invoke_inputs`) from a
        /// cap-by-bytes rejection (`max_invoke_bytes`).
        input_count: usize,
    },
    /// `CompletionSink::complete(cmd, ...)` or `::fail(cmd, ...)`
    /// failed at the boundary. The `CommandId` identifies the
    /// pending async operation the host was attempting to settle.
    Completion {
        /// The pending command the completion targeted.
        command: CommandId,
    },
}

/// Sub-kind discriminator for [`InfraEvent::AppIngressError`]. One
/// top-level variant + sub-kind keeps the bus-topic count down -
/// subscribers route on the variant name and match the sub-kind out
/// of the event's fields when they need to distinguish allocation
/// failure from budget exhaustion from a per-item cap.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AppIngressErrorKind {
    /// Fallible reservation of the engine-side ingress buffer
    /// returned `TryReserveError` or hit a per-item cap before
    /// allocation was attempted.
    AllocationFailed {
        /// Why the reservation failed.
        reason: AllocFailReason,
    },
    /// Admitting this payload would push the engine over
    /// `NodeConfig::ingress_byte_budget`.
    BudgetExceeded {
        /// Bytes still available under `NodeConfig::ingress_byte_budget`
        /// at the time of the rejection.
        budget_remaining: usize,
    },
    /// A per-item cap (`max_app_event_bytes`, `max_invoke_inputs`,
    /// `max_invoke_bytes`, or `max_completion_result_bytes`) rejected
    /// the payload before any allocation was attempted. The host's
    /// synchronous `deliver_event` / `invoke` call returns
    /// `DeliveryError::OversizePayload` in addition to this bus
    /// emission.
    PerItemCapExceeded {
        /// The cap value the boundary enforced.
        cap: usize,
    },
}

/// App-Op-emitted events surfaced as `EngineStep::AppEvent`.
///
/// Application code constructs these via [`AppEvent::emit`] /
/// [`AppEvent::notify`] which reject framework-reserved topic
/// prefixes (`bb.`, `ai.bytesandbrains.`) so the framework's own
/// `InfraEvent` topic namespace can never be impersonated by user
/// publish calls (closes the trust-boundary requirement
/// in ยงTrust boundary). The struct variants stay constructible
/// directly for framework-internal forwarding paths that are not
/// crossing the application boundary.
#[derive(Clone, Debug)]
pub enum AppEvent {
    /// `AppEmit(name, value_bytes)` - full app event carrying bytes.
    Emit {
        /// Event topic.
        name: String,
        /// Encoded value payload.
        value_bytes: Vec<u8>,
    },
    /// `AppNotify(name)` - marker-only notification.
    Notify {
        /// Event topic.
        name: String,
    },
}

/// Reserved topic prefix #1 โ€” framework-emitted infra topics.
const RESERVED_PREFIX_BB: &str = "bb.";

/// Reserved topic prefix #2 โ€” framework-namespaced metadata keys.
const RESERVED_PREFIX_FRAMEWORK: &str = "ai.bytesandbrains.";

/// Reasons an [`AppEvent::emit`] / [`AppEvent::notify`] construction
/// can fail.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AppEventError {
    /// Topic begins with one of the framework-reserved prefixes
    /// (`bb.` or `ai.bytesandbrains.`) โ€” only the framework's own
    /// `InfraEvent` may publish under those prefixes.
    ReservedPrefix {
        /// The offending topic the application tried to publish.
        topic: String,
        /// Which prefix it collided with.
        prefix: &'static str,
    },
}

impl std::fmt::Display for AppEventError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ReservedPrefix { topic, prefix } => write!(
                f,
                "AppEvent topic `{topic}` collides with the framework-reserved prefix `{prefix}`",
            ),
        }
    }
}

impl std::error::Error for AppEventError {}

impl AppEvent {
    /// Construct an [`AppEvent::Emit`], rejecting framework-reserved
    /// topic prefixes. The trust boundary `AppEvent::new` referenced
    /// in `docs-plan/CORRECTED_ARCHITECTURE.md` ยงTrust boundary.
    pub fn emit(name: impl Into<String>, value_bytes: Vec<u8>) -> Result<Self, AppEventError> {
        let name = name.into();
        check_reserved(&name)?;
        Ok(Self::Emit { name, value_bytes })
    }

    /// Construct an [`AppEvent::Notify`], rejecting framework-
    /// reserved topic prefixes. See [`Self::emit`].
    pub fn notify(name: impl Into<String>) -> Result<Self, AppEventError> {
        let name = name.into();
        check_reserved(&name)?;
        Ok(Self::Notify { name })
    }
}

fn check_reserved(topic: &str) -> Result<(), AppEventError> {
    for prefix in [RESERVED_PREFIX_BB, RESERVED_PREFIX_FRAMEWORK] {
        if topic.starts_with(prefix) {
            return Err(AppEventError::ReservedPrefix {
                topic: topic.to_string(),
                prefix,
            });
        }
    }
    Ok(())
}

/// Op invocation failure kind. a stable categorical
/// label consumers match on for retry / report / drop policy
/// decisions without parsing the freeform `OpError::detail`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpErrorKind {
    /// Input slot value didn't match the expected concrete type
    /// (typed downcast failed in a dispatch arm).
    TypeMismatch,
    /// A required slot was absent at dispatch time.
    MissingSlot,
    /// The dispatched op has no registered handler.
    NotRegistered,
    /// The op handler ran but its work failed (numeric, IO,
    /// inventory, etc.). The detail string carries specifics.
    ExecutionFailed,
    /// An off-thread completion handle returned an error (the
    /// user's Contract method's `Error` type produced this via
    /// `ContractResponse::Now(Err)` or `CompletionHandle::fail`).
    RemoteFailed,
    /// The op's deadline elapsed before completion landed.
    Timeout,
    /// Adversarial / malformed input from a peer.
    BadInput,
    /// Peer-health / backoff gate held the op (transient retry-
    /// eligible failure).
    Cooldown,
    /// Catch-all for failures that don't fit a more specific kind.
    /// Default for legacy call sites that only carry a `detail`
    /// string.
    Other,
}

impl Default for OpErrorKind {
    fn default() -> Self {
        Self::Other
    }
}

/// Op invocation failure detail. Surfaced by the engine when
/// `dispatch_atomic` returns `Err` or the dispatch table lookup
/// misses.
///
/// three-field shape `{ kind, reason, detail }`:
/// `kind` is a stable categorical label; `reason` is a `&'static
/// str` short label (e.g. `"blocklisted"`, `"cooldown"`) that
/// callers can match on; `detail` is a free-form description for
/// human-readable diagnostics. Existing call sites that built
/// `OpError { detail }` keep working via the `Default` impl โ€”
/// they implicitly get `kind: Other` and `reason: ""`.
#[derive(Clone, Debug, Default)]
pub struct OpError {
    /// Categorical kind (default `Other` for legacy call sites).
    pub kind: OpErrorKind,
    /// Stable diagnostic label (e.g. `"blocklisted"`, `"cooldown"`,
    /// `"nak"`) consumers match on. Default `""`.
    pub reason: &'static str,
    /// Human-readable failure detail.
    pub detail: String,
}

impl std::fmt::Display for OpError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.reason.is_empty() {
            write!(f, "op error[{:?}]: {}", self.kind, self.detail)
        } else {
            write!(
                f,
                "op error[{:?} reason={}]: {}",
                self.kind, self.reason, self.detail,
            )
        }
    }
}

impl std::error::Error for OpError {}

/// Typed in-Node event bus. Carries published events from one
/// poll cycle to the next; Phase 3 of the engine's poll drains the
/// queue and routes to subscribed `NodeSiteId`s.
///
/// Bounded by `NodeConfig.bus_capacity` (default 1024). When a
/// publish would exceed the cap, the oldest event is FIFO-dropped
/// and a counter increments. Phase 3 reads the counter and emits
/// `InfraEvent::BusOverflow { count }` so the host sees the loss.
#[derive(Default)]
pub struct TypedBus {
    queue: VecDeque<NodeEvent>,
    cap: Option<usize>,
    dropped_since_last_drain: usize,
}

impl TypedBus {
    /// Construct a fresh empty bus with no cap.
    pub fn new() -> Self {
        Self::default()
    }

    /// Construct a fresh empty bus with the given FIFO-drop cap.
    pub fn with_cap(cap: Option<usize>) -> Self {
        Self {
            queue: VecDeque::new(),
            cap,
            dropped_since_last_drain: 0,
        }
    }

    /// Set the FIFO-drop cap. `None` removes the cap.
    pub fn set_cap(&mut self, cap: Option<usize>) {
        self.cap = cap;
    }

    /// Publish an event. The engine's Phase 3 routes published events
    /// to subscribed Components in the next poll cycle.
    pub fn publish(&mut self, event: NodeEvent) {
        if let Some(cap) = self.cap {
            while self.queue.len() >= cap {
                self.queue.pop_front();
                self.dropped_since_last_drain += 1;
            }
        }
        self.queue.push_back(event);
    }

    /// Drain all queued events. Called by Phase 3 of the poll cycle.
    pub fn drain(&mut self) -> Vec<NodeEvent> {
        self.queue.drain(..).collect()
    }

    /// Read + reset the count of FIFO-dropped events since the last
    /// call. Returns 0 when no drops occurred.
    pub fn take_dropped_count(&mut self) -> usize {
        std::mem::take(&mut self.dropped_since_last_drain)
    }

    /// `true` when the bus has no queued events.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Number of queued events.
    pub fn len(&self) -> usize {
        self.queue.len()
    }
}