bb_runtime/framework/peer_governor.rs
//! `PeerGovernor` - the single source of truth for peer policy
//! and per-peer health tracking.
//!
//! The framework owns this Component; it's consulted at both
//! delivery boundaries:
//!
//! - **Inbound**: Phase 1 of `Engine::poll` calls
//!   [`PeerGovernor::check_inbound`] for every
//!   `IngressEvent::EnvelopeFrom { src_peer, .. }`. Blocked /
//!   non-allowlisted peers are dropped before any slot is written.
//!
//! - **Outbound**: the compiler pass
//!   `bb-compiler/src/insert_peer_health_gates.rs` inserts a
//!   `PeerHealthGate` syscall op upstream of every `wire::Send`.
//!   The gate's `dispatch_atomic` calls
//!   [`PeerGovernor::check_outbound`].
//!
//! Health state (per-peer consecutive failures + last-seen) is
//! updated by Send-completion callbacks, so Component authors no
//! longer touch [`BackoffTable`] by hand - the compiler wires the
//! tracking in for every wire send automatically.
//!
//! [`BackoffTable`]: crate::framework::BackoffTable
24
25use std::collections::{HashMap, HashSet};
26
27use crate::ids::PeerId;
28
/// Default count of consecutive `wire::Send` failures at which a peer
/// is marked down.
///
/// When a failure streak reaches this count,
/// `PeerGovernor::record_failure` reports the lifecycle transition;
/// the engine's Phase 8 surfaces it as `EngineStep::PeerDown`.
pub const DEFAULT_FAILURE_THRESHOLD: u32 = 5;
34
/// Reason a peer is refused. Reported inbound (the envelope is
/// dropped) and outbound (the send gate fails).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BlockReason {
    /// The peer is on the explicit blocklist.
    Blocklisted,
    /// An allowlist is in effect and the peer is not on it.
    NotAllowlisted,
    /// The peer is inside a failure-driven cooldown window.
    Cooldown {
        /// `scheduler.now_ns()` value after which the peer becomes
        /// eligible again.
        retry_ns: u64,
    },
}
50
/// Point-in-time health of a single peer, read by
/// `Node::peer_health()` for operator introspection.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PeerHealth {
    /// How many `record_failure` calls have happened since the most
    /// recent `record_success`.
    pub consecutive_failures: u32,
    /// `scheduler.now_ns()` at the most recent `record_success` or
    /// `record_failure`; `0` when neither has been recorded.
    pub last_event_ns: u64,
    /// Set once `consecutive_failures` reaches the configured
    /// threshold with no success in between; stays set until a
    /// success clears the streak.
    pub down: bool,
}
66
67/// Outcome of a `check_inbound` / `check_outbound` consultation.
68#[derive(Clone, Debug, PartialEq, Eq)]
69pub enum Decision {
70 /// Delivery / send is permitted.
71 Allow,
72 /// Delivery / send is denied; `reason` carries why.
73 Deny(BlockReason),
74}
75
/// Observable side effect of recording a failure or success. The
/// engine translates these into `EngineStep` variants in Phase 8.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LifecycleTransition {
    /// No observable state change.
    None,
    /// The peer's consecutive-failure streak just reached the failure
    /// threshold, so the peer is now marked down. This fires once per
    /// streak, not on every further failure.
    WentDown,
    /// A success was recorded for a peer that was marked down, ending
    /// its failure streak.
    CameUp,
}
87
88/// Per-peer policy + health state owner.
89///
90/// Snapshot/restore: full state is captured in
91/// `FrameworkSnapshot` (work). Pre-Stage-5, restore
92/// rebuilds the governor from scratch - blocklist + allowlist
93/// settings on `NodeConfig` are re-applied at construction time.
94pub struct PeerGovernor {
95 blocklist: HashSet<PeerId>,
96 allowlist: Option<HashSet<PeerId>>,
97 health: HashMap<PeerId, PeerHealth>,
98 failure_threshold: u32,
99}
100
101impl Default for PeerGovernor {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl PeerGovernor {
108 /// Construct a fresh governor with no blocklist, no
109 /// allowlist, and the default failure threshold.
110 pub fn new() -> Self {
111 Self {
112 blocklist: HashSet::new(),
113 allowlist: None,
114 health: HashMap::new(),
115 failure_threshold: DEFAULT_FAILURE_THRESHOLD,
116 }
117 }
118
119 /// Configure the failure threshold (consecutive failures
120 /// before a peer is marked down). Default
121 /// [`DEFAULT_FAILURE_THRESHOLD`].
122 pub fn with_failure_threshold(mut self, threshold: u32) -> Self {
123 self.failure_threshold = threshold.max(1);
124 self
125 }
126
127 /// Add a peer to the blocklist. Subsequent inbound envelopes
128 /// from this peer are dropped; subsequent outbound sends fail.
129 pub fn block(&mut self, peer: PeerId) {
130 self.blocklist.insert(peer);
131 }
132
133 /// Remove a peer from the blocklist.
134 pub fn unblock(&mut self, peer: PeerId) {
135 self.blocklist.remove(&peer);
136 }
137
138 /// Set (or clear) the allowlist. When `Some`, only peers in
139 /// the allowlist may communicate; everyone else is denied.
140 /// `None` means "open" (default).
141 pub fn set_allowlist(&mut self, allowlist: Option<HashSet<PeerId>>) {
142 self.allowlist = allowlist;
143 }
144
145 /// Per-peer health snapshot read; returns `None` if no
146 /// success/failure has ever been recorded for `peer`.
147 pub fn peer_health(&self, peer: PeerId) -> Option<PeerHealth> {
148 self.health.get(&peer).copied()
149 }
150
151 /// True when `peer` is currently marked down.
152 pub fn is_down(&self, peer: PeerId) -> bool {
153 self.health.get(&peer).is_some_and(|h| h.down)
154 }
155
156 /// Inbound consultation - called from the engine's Phase 1
157 /// envelope router before any slot is written.
158 pub fn check_inbound(&self, peer: PeerId) -> Decision {
159 if self.blocklist.contains(&peer) {
160 return Decision::Deny(BlockReason::Blocklisted);
161 }
162 if let Some(allow) = &self.allowlist {
163 if !allow.contains(&peer) {
164 return Decision::Deny(BlockReason::NotAllowlisted);
165 }
166 }
167 Decision::Allow
168 }
169
170 /// Outbound consultation - called from the compiler-inserted
171 /// `PeerHealthGate` syscall op upstream of every `wire::Send`.
172 /// Returns `Deny(Cooldown { retry_ns })` when the peer is in
173 /// failure cooldown, prompting the gate to reschedule.
174 pub fn check_outbound(
175 &self,
176 peer: PeerId,
177 backoff: &super::BackoffTable,
178 now_ns: u64,
179 ) -> Decision {
180 if self.blocklist.contains(&peer) {
181 return Decision::Deny(BlockReason::Blocklisted);
182 }
183 if let Some(allow) = &self.allowlist {
184 if !allow.contains(&peer) {
185 return Decision::Deny(BlockReason::NotAllowlisted);
186 }
187 }
188 if !backoff.should_retry(peer, now_ns) {
189 let retry_ns = backoff
190 .state(peer)
191 .map(|s| s.next_retry_ns)
192 .unwrap_or(now_ns);
193 return Decision::Deny(BlockReason::Cooldown { retry_ns });
194 }
195 Decision::Allow
196 }
197
198 /// Record a successful exchange with `peer` at `now_ns`.
199 /// Resets the consecutive-failure counter; clears `down`.
200 /// Returns the lifecycle transition the engine should
201 /// surface as an `EngineStep::PeerUp`.
202 pub fn record_success(&mut self, peer: PeerId, now_ns: u64) -> LifecycleTransition {
203 let was_down = self.health.get(&peer).map(|h| h.down).unwrap_or(false);
204 self.health.insert(
205 peer,
206 PeerHealth {
207 consecutive_failures: 0,
208 last_event_ns: now_ns,
209 down: false,
210 },
211 );
212 if was_down {
213 LifecycleTransition::CameUp
214 } else {
215 LifecycleTransition::None
216 }
217 }
218
219 /// Record a failure for `peer` at `now_ns`. Returns
220 /// `WentDown` when the failure pushes the streak across the
221 /// threshold.
222 pub fn record_failure(&mut self, peer: PeerId, now_ns: u64) -> LifecycleTransition {
223 let prev = self.health.get(&peer).copied().unwrap_or_default();
224 let consecutive_failures = prev.consecutive_failures.saturating_add(1);
225 let just_went_down = !prev.down && consecutive_failures >= self.failure_threshold;
226 self.health.insert(
227 peer,
228 PeerHealth {
229 consecutive_failures,
230 last_event_ns: now_ns,
231 down: prev.down || just_went_down,
232 },
233 );
234 if just_went_down {
235 LifecycleTransition::WentDown
236 } else {
237 LifecycleTransition::None
238 }
239 }
240
241 /// Number of distinct peers with health state recorded.
242 pub fn tracked_peers(&self) -> usize {
243 self.health.len()
244 }
245
246 /// Read-only view of the blocklist. Used by snapshot capture.
247 pub fn blocklist(&self) -> &HashSet<PeerId> {
248 &self.blocklist
249 }
250
251 /// Read-only view of the allowlist.
252 pub fn allowlist(&self) -> Option<&HashSet<PeerId>> {
253 self.allowlist.as_ref()
254 }
255
256 /// Iterate `(PeerId, PeerHealth)` entries for snapshot capture.
257 pub fn iter_health(&self) -> impl Iterator<Item = (PeerId, PeerHealth)> + '_ {
258 self.health.iter().map(|(p, h)| (*p, *h))
259 }
260
261 /// Current failure threshold (consecutive failures to mark a
262 /// peer down). Used by snapshot capture.
263 pub fn failure_threshold(&self) -> u32 {
264 self.failure_threshold
265 }
266
267 /// Replace a peer's health state directly. Used by
268 /// `Node::restore` to re-seed health from a snapshot entry.
269 pub fn restore_health(&mut self, peer: PeerId, health: PeerHealth) {
270 self.health.insert(peer, health);
271 }
272
273 /// Overwrite the failure threshold. Used by `Node::restore`.
274 pub fn set_failure_threshold(&mut self, threshold: u32) {
275 self.failure_threshold = threshold.max(1);
276 }
277}
278