bb_runtime/framework/mod.rs
1//! 9 framework primitives the engine bundles into the
2//! `RuntimeResourceRef` for every `dispatch_atomic` call per
3//! `docs/ENGINE.md` §10 + `docs/internal/IMPLEMENTATION_PLAN.md` //! lines 770-779.
4//!
5//! ships real impls for Scheduler / PeerGate /
6//! RequestTracker + adds 3 new primitives (serialize_queue /
7//! hold_table / record_buffer). Other primitives stay minimal until
8pub mod address_book;
9pub mod backoff_table;
10pub mod backpressure_notice;
11pub mod backpressure_tracker;
12pub mod event_source;
13pub mod hold_table;
14pub mod inbound_dedup;
15pub mod outbound_queue;
16pub mod peer_gate;
17pub mod peer_governor;
18pub mod peer_state;
19pub mod record_buffer;
20pub mod request_tracker;
21pub mod rng;
22pub mod rtt_tracker;
23pub mod scheduler;
24pub mod serialize_queue;
25
26pub use address_book::{
27 Address, AddressBook, AddressBookError, AddressError, Multiaddress, Protocol,
28};
29pub use backoff_table::BackoffTable;
30pub use backpressure_notice::{
31 backoff_notice_type_hash, build_backoff_notice_envelope, BackoffCauseWire,
32 BackoffNoticePayload, BACKPRESSURE_DOMAIN,
33};
34pub use backpressure_tracker::{
35 BackoffCause, BackpressureEntry, BackpressureTracker, Decision as BackpressureDecision,
36 DEFAULT_HIGH_WATER_PCT, DEFAULT_K_BEFORE_SILENT, DEFAULT_MIN_NOTICE_INTERVAL_NS,
37};
38pub use event_source::EventSource;
39pub use hold_table::HoldTable;
40pub use inbound_dedup::InboundDedup;
41pub use outbound_queue::OutboundQueue;
42pub use peer_gate::PeerGate;
43pub use peer_governor::{
44 BlockReason, Decision, LifecycleTransition, PeerGovernor, PeerHealth, DEFAULT_FAILURE_THRESHOLD,
45};
46pub use peer_state::PeerState;
47pub use record_buffer::RecordBuffer;
48pub use request_tracker::RequestTracker;
49pub use rng::{CounterRng, GetrandomU64, RngU64Source};
50pub use scheduler::{Scheduler, TimerKind};
51pub use serialize_queue::SerializeQueue;
52
53/// Bundle of framework primitives held on the `Engine` per
54// Re-export AppEvent for FrameworkComponents.pending_app_events.
55use crate::bus::AppEvent;
56
57/// `docs/ENGINE.md` §3 bundle of framework primitives. Split-borrowed
58/// into each `dispatch_atomic` call's `RuntimeResourceRef`.
59pub struct FrameworkComponents {
60 /// Sorted timer heap. `Sleep`/`Interval`/`Pulse` syscalls
61 /// schedule entries here; Phase 4 of the poll cycle drains
62 /// matured timers and re-fires their consumer ops.
63 pub scheduler: Scheduler,
64 /// Consolidated per-peer state: named concurrency gate, policy +
65 /// health governor, and exponential backoff. Component authors
66 /// reach the three sub-primitives through `peer_state.{gate,
67 /// governor, backoff}`.
68 pub peer_state: PeerState,
69 /// In-flight wire-request → CommandId map + token minter.
70 pub request_tracker: RequestTracker,
71 /// Sliding-window seen-message tracker.
72 pub inbound_dedup: InboundDedup,
73 /// `PeerId → Address` mapping.
74 pub address_book: AddressBook,
75 /// Per-NodeSiteId adaptive RTT tracker driving deadline
76 /// derivation for every wire round-trip the engine observes. Fed
77 /// by `Engine::wire_send_tracked` on send + by the response path
78 /// on completion.
79 pub rtt_tracker: rtt_tracker::RttTracker,
80 /// FIFO of wire envelopes ready to ship.
81 pub outbound_queue: OutboundQueue,
82 /// Registered `EventKind → ComponentTag` subscriptions.
83 pub event_source: EventSource,
84 /// Named-FIFO map for Serialize.Enqueue / Dequeue.
85 pub serialize_queue: SerializeQueue,
86 /// Named-slot value buffer for Hold.Stash / Flush.
87 pub hold_table: HoldTable,
88 /// Per-name bounded ring buffer for Record.
89 pub record_buffer: RecordBuffer,
90 /// App events pending Phase 8 emission.
91 pub pending_app_events: Vec<AppEvent>,
92 /// Per-Node counters bumped by `IncrMetric` syscalls.
93 pub counters: std::collections::HashMap<String, u64>,
94 /// `u64` RNG source used by the `RngU64` syscall.
95 pub rng: Box<dyn RngU64Source>,
96 /// Per-`group` first-arrival latch for the `Any` syscall. Once a
97 /// group fires, subsequent arrivals are absorbed without
98 /// re-firing. Cleared on snapshot restore via the framework
99 /// reset.
100 pub any_fired_groups: std::collections::HashSet<String>,
101 /// Per-`(OpRef, ExecId)` latch for the `DeadlineMatch` syscall.
102 /// First invocation per execution determines the winner (`then`
103 /// if non-empty, otherwise `timeout`); subsequent invocations
104 /// inside the same execution are absorbed. New executions start
105 /// fresh — the latch is keyed by ExecId, not just OpRef, so a
106 /// DeadlineMatch op fires once per logical execution rather
107 /// than once per Node lifetime.
108 pub deadline_match_fired: std::collections::HashSet<(u64, u64)>,
109 /// Peer-resolution failures captured during `wire::Send`
110 /// dispatch when the destination `PeerId` either isn't in the
111 /// `AddressBook` or maps to an empty address list. The engine
112 /// drains this in Phase 8 and surfaces each entry as both a
113 /// `EngineStep::PeerResolveFailed` and a bus
114 /// `InfraEvent::PeerResolveFailure`. Per
115 /// `docs/ADDRESSING.md`.
116 pub pending_peer_resolve_failures: Vec<(Option<crate::ids::PeerId>, crate::ids::OpRef)>,
117 /// Per-`ExecId` inbound envelope context. Populated by
118 /// `Engine::route_envelope` when a wire envelope arrives; read by
119 /// RX gates (`PeerHealthGateRx`, `BackoffGateRx`) for src-peer
120 /// filtering and by `wire.Send` for in-chain correlation token
121 /// reuse + Dapper-style elapsed-time accounting.
122 pub inbound_contexts: std::collections::HashMap<crate::ids::ExecId, InboundContext>,
123}
124
125/// Per-`ExecId` context captured at inbound envelope delivery.
126/// Replaces the four parallel `envelope_*` HashMaps with one struct
127/// of optional fields. Components access this through
128/// `RuntimeResourceRef::inbound`.
129#[derive(Clone, Debug, Default)]
130pub struct InboundContext {
131 /// Source peer of the inbound envelope, if known. RX gates
132 /// (`PeerHealthGateRx`, `BackoffGateRx`) filter on this.
133 pub src_peer: Option<crate::ids::PeerId>,
134 /// Inbound wire-correlation token. `wire.Send` reuses this when
135 /// forwarding inside a chain instead of minting a fresh one.
136 /// `None` when the envelope was not part of a request/response
137 /// chain.
138 pub wire_req_id: Option<u64>,
139 /// Arrival timestamp (engine ns). `wire.Send` subtracts this from
140 /// `now_ns` for Dapper-style elapsed-time accounting.
141 pub arrival_ns: Option<u64>,
142 /// Remaining deadline propagated by the sender. `wire.Send` carries
143 /// this forward (minus elapsed) instead of re-estimating from RTT.
144 pub remaining_deadline_ns: Option<u64>,
145}
146
147impl FrameworkComponents {
148 /// Construct a fresh bundle.
149 pub fn new() -> Self {
150 Self {
151 scheduler: Scheduler::new(),
152 peer_state: PeerState::new(),
153 request_tracker: RequestTracker::new(),
154 inbound_dedup: InboundDedup::new(),
155 address_book: AddressBook::new(),
156 rtt_tracker: rtt_tracker::RttTracker::new(),
157 outbound_queue: OutboundQueue::new(),
158 event_source: EventSource::new(),
159 serialize_queue: SerializeQueue::new(),
160 hold_table: HoldTable::new(),
161 record_buffer: RecordBuffer::new(),
162 pending_app_events: Vec::new(),
163 counters: std::collections::HashMap::new(),
164 rng: Box::new(GetrandomU64::new()),
165 any_fired_groups: std::collections::HashSet::new(),
166 deadline_match_fired: std::collections::HashSet::new(),
167 pending_peer_resolve_failures: Vec::new(),
168 inbound_contexts: std::collections::HashMap::new(),
169 }
170 }
171}
172
173impl Default for FrameworkComponents {
174 fn default() -> Self {
175 Self::new()
176 }
177}