Skip to main content

bb_runtime/framework/
mod.rs

1//! 9 framework primitives the engine bundles into the
2//! `RuntimeResourceRef` for every `dispatch_atomic` call per
3//! `docs/ENGINE.md` §10 + `docs/internal/IMPLEMENTATION_PLAN.md` //! lines 770-779.
4//!
5//! ships real impls for Scheduler / PeerGate /
6//! RequestTracker + adds 3 new primitives (serialize_queue /
7//! hold_table / record_buffer). Other primitives stay minimal until
8pub mod address_book;
9pub mod backoff_table;
10pub mod backpressure_notice;
11pub mod backpressure_tracker;
12pub mod event_source;
13pub mod hold_table;
14pub mod inbound_dedup;
15pub mod outbound_queue;
16pub mod peer_gate;
17pub mod peer_governor;
18pub mod peer_state;
19pub mod record_buffer;
20pub mod request_tracker;
21pub mod rng;
22pub mod rtt_tracker;
23pub mod scheduler;
24pub mod serialize_queue;
25
26pub use address_book::{
27    Address, AddressBook, AddressBookError, AddressError, Multiaddress, Protocol,
28};
29pub use backoff_table::BackoffTable;
30pub use backpressure_notice::{
31    backoff_notice_type_hash, build_backoff_notice_envelope, BackoffCauseWire,
32    BackoffNoticePayload, BACKPRESSURE_DOMAIN,
33};
34pub use backpressure_tracker::{
35    BackoffCause, BackpressureEntry, BackpressureTracker, Decision as BackpressureDecision,
36    DEFAULT_HIGH_WATER_PCT, DEFAULT_K_BEFORE_SILENT, DEFAULT_MIN_NOTICE_INTERVAL_NS,
37};
38pub use event_source::EventSource;
39pub use hold_table::HoldTable;
40pub use inbound_dedup::InboundDedup;
41pub use outbound_queue::OutboundQueue;
42pub use peer_gate::PeerGate;
43pub use peer_governor::{
44    BlockReason, Decision, LifecycleTransition, PeerGovernor, PeerHealth, DEFAULT_FAILURE_THRESHOLD,
45};
46pub use peer_state::PeerState;
47pub use record_buffer::RecordBuffer;
48pub use request_tracker::RequestTracker;
49pub use rng::{CounterRng, GetrandomU64, RngU64Source};
50pub use scheduler::{Scheduler, TimerKind};
51pub use serialize_queue::SerializeQueue;
52
53/// Bundle of framework primitives held on the `Engine` per
54// Re-export AppEvent for FrameworkComponents.pending_app_events.
55use crate::bus::AppEvent;
56
57/// `docs/ENGINE.md` §3 bundle of framework primitives. Split-borrowed
58/// into each `dispatch_atomic` call's `RuntimeResourceRef`.
59pub struct FrameworkComponents {
60    /// Sorted timer heap. `Sleep`/`Interval`/`Pulse` syscalls
61    /// schedule entries here; Phase 4 of the poll cycle drains
62    /// matured timers and re-fires their consumer ops.
63    pub scheduler: Scheduler,
64    /// Consolidated per-peer state: named concurrency gate, policy +
65    /// health governor, and exponential backoff. Component authors
66    /// reach the three sub-primitives through `peer_state.{gate,
67    /// governor, backoff}`.
68    pub peer_state: PeerState,
69    /// In-flight wire-request → CommandId map + token minter.
70    pub request_tracker: RequestTracker,
71    /// Sliding-window seen-message tracker.
72    pub inbound_dedup: InboundDedup,
73    /// `PeerId → Address` mapping.
74    pub address_book: AddressBook,
75    /// Per-NodeSiteId adaptive RTT tracker driving deadline
76    /// derivation for every wire round-trip the engine observes. Fed
77    /// by `Engine::wire_send_tracked` on send + by the response path
78    /// on completion.
79    pub rtt_tracker: rtt_tracker::RttTracker,
80    /// FIFO of wire envelopes ready to ship.
81    pub outbound_queue: OutboundQueue,
82    /// Registered `EventKind → ComponentTag` subscriptions.
83    pub event_source: EventSource,
84    /// Named-FIFO map for Serialize.Enqueue / Dequeue.
85    pub serialize_queue: SerializeQueue,
86    /// Named-slot value buffer for Hold.Stash / Flush.
87    pub hold_table: HoldTable,
88    /// Per-name bounded ring buffer for Record.
89    pub record_buffer: RecordBuffer,
90    /// App events pending Phase 8 emission.
91    pub pending_app_events: Vec<AppEvent>,
92    /// Per-Node counters bumped by `IncrMetric` syscalls.
93    pub counters: std::collections::HashMap<String, u64>,
94    /// `u64` RNG source used by the `RngU64` syscall.
95    pub rng: Box<dyn RngU64Source>,
96    /// Per-`group` first-arrival latch for the `Any` syscall. Once a
97    /// group fires, subsequent arrivals are absorbed without
98    /// re-firing. Cleared on snapshot restore via the framework
99    /// reset.
100    pub any_fired_groups: std::collections::HashSet<String>,
101    /// Per-`(OpRef, ExecId)` latch for the `DeadlineMatch` syscall.
102    /// First invocation per execution determines the winner (`then`
103    /// if non-empty, otherwise `timeout`); subsequent invocations
104    /// inside the same execution are absorbed. New executions start
105    /// fresh — the latch is keyed by ExecId, not just OpRef, so a
106    /// DeadlineMatch op fires once per logical execution rather
107    /// than once per Node lifetime.
108    pub deadline_match_fired: std::collections::HashSet<(u64, u64)>,
109    /// Peer-resolution failures captured during `wire::Send`
110    /// dispatch when the destination `PeerId` either isn't in the
111    /// `AddressBook` or maps to an empty address list. The engine
112    /// drains this in Phase 8 and surfaces each entry as both a
113    /// `EngineStep::PeerResolveFailed` and a bus
114    /// `InfraEvent::PeerResolveFailure`. Per
115    /// `docs/ADDRESSING.md`.
116    pub pending_peer_resolve_failures: Vec<(Option<crate::ids::PeerId>, crate::ids::OpRef)>,
117    /// Per-`ExecId` inbound envelope context. Populated by
118    /// `Engine::route_envelope` when a wire envelope arrives; read by
119    /// RX gates (`PeerHealthGateRx`, `BackoffGateRx`) for src-peer
120    /// filtering and by `wire.Send` for in-chain correlation token
121    /// reuse + Dapper-style elapsed-time accounting.
122    pub inbound_contexts: std::collections::HashMap<crate::ids::ExecId, InboundContext>,
123}
124
125/// Per-`ExecId` context captured at inbound envelope delivery.
126/// Replaces the four parallel `envelope_*` HashMaps with one struct
127/// of optional fields. Components access this through
128/// `RuntimeResourceRef::inbound`.
129#[derive(Clone, Debug, Default)]
130pub struct InboundContext {
131    /// Source peer of the inbound envelope, if known. RX gates
132    /// (`PeerHealthGateRx`, `BackoffGateRx`) filter on this.
133    pub src_peer: Option<crate::ids::PeerId>,
134    /// Inbound wire-correlation token. `wire.Send` reuses this when
135    /// forwarding inside a chain instead of minting a fresh one.
136    /// `None` when the envelope was not part of a request/response
137    /// chain.
138    pub wire_req_id: Option<u64>,
139    /// Arrival timestamp (engine ns). `wire.Send` subtracts this from
140    /// `now_ns` for Dapper-style elapsed-time accounting.
141    pub arrival_ns: Option<u64>,
142    /// Remaining deadline propagated by the sender. `wire.Send` carries
143    /// this forward (minus elapsed) instead of re-estimating from RTT.
144    pub remaining_deadline_ns: Option<u64>,
145}
146
147impl FrameworkComponents {
148    /// Construct a fresh bundle.
149    pub fn new() -> Self {
150        Self {
151            scheduler: Scheduler::new(),
152            peer_state: PeerState::new(),
153            request_tracker: RequestTracker::new(),
154            inbound_dedup: InboundDedup::new(),
155            address_book: AddressBook::new(),
156            rtt_tracker: rtt_tracker::RttTracker::new(),
157            outbound_queue: OutboundQueue::new(),
158            event_source: EventSource::new(),
159            serialize_queue: SerializeQueue::new(),
160            hold_table: HoldTable::new(),
161            record_buffer: RecordBuffer::new(),
162            pending_app_events: Vec::new(),
163            counters: std::collections::HashMap::new(),
164            rng: Box::new(GetrandomU64::new()),
165            any_fired_groups: std::collections::HashSet::new(),
166            deadline_match_fired: std::collections::HashSet::new(),
167            pending_peer_resolve_failures: Vec::new(),
168            inbound_contexts: std::collections::HashMap::new(),
169        }
170    }
171}
172
173impl Default for FrameworkComponents {
174    fn default() -> Self {
175        Self::new()
176    }
177}