Skip to main content

bb_runtime/framework/
peer_governor.rs

//! `PeerGovernor` - the single source of truth for peer policy
//! and per-peer health tracking.
3//!
4//! The framework owns this Component; it's consulted at both
5//! delivery boundaries:
6//!
//! - **Inbound**: Phase 1 of `Engine::poll` calls
8//!   [`PeerGovernor::check_inbound`] for every
9//!   `IngressEvent::EnvelopeFrom { src_peer, .. }`. Blocked /
10//!   non-allowlisted peers are dropped before any slot is written.
11//!
//! - **Outbound**: the compiler pass
13//!   `bb-compiler/src/insert_peer_health_gates.rs` inserts a
14//!   `PeerHealthGate` syscall op upstream of every `wire::Send`.
15//!   The gate's `dispatch_atomic` calls
16//!   [`PeerGovernor::check_outbound`].
17//!
18//! Health state (per-peer consecutive failures + last-seen) is
19//! updated by Send-completion callbacks so Component authors stop
20//! touching [`BackoffTable`] by hand - the compiler wires the
21//! tracking in for every wire send automatically.
22//!
23//! [`BackoffTable`]: crate::framework::BackoffTable
24
25use std::collections::{HashMap, HashSet};
26
27use crate::ids::PeerId;
28
/// Failure threshold used by [`PeerGovernor::new`] unless overridden.
///
/// A peer is marked down by the `wire::Send` failure that brings its
/// consecutive-failure streak up to this count.
/// [`PeerGovernor::record_failure`] reports that lifecycle transition,
/// and the engine's Phase 8 surfaces it as `EngineStep::PeerDown`.
pub const DEFAULT_FAILURE_THRESHOLD: u32 = 5;
34
/// Reason a peer is refused. Reported on both delivery paths: an
/// inbound envelope is dropped, an outbound send fails at the gate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockReason {
    /// The peer appears on the explicit blocklist.
    Blocklisted,
    /// An allowlist is in force and the peer is not a member of it.
    NotAllowlisted,
    /// Recent failures have put the peer into a cooldown window.
    /// Only the outbound check produces this variant.
    Cooldown {
        /// Timestamp on the `scheduler.now_ns()` clock after which
        /// the peer is eligible again.
        retry_ns: u64,
    },
}
50
/// Point-in-time health record for a single peer. Exposed through
/// `Node::peer_health()` so operators can inspect it.
///
/// The `Default` value (all zeros, `down == false`) is the starting
/// point for a peer that has no recorded events yet.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PeerHealth {
    /// Length of the current failure streak: how many times
    /// `record_failure` has run since the most recent
    /// `record_success`.
    pub consecutive_failures: u32,
    /// `scheduler.now_ns()` timestamp of the most recent
    /// `record_success` / `record_failure`; `0` when neither has
    /// happened.
    pub last_event_ns: u64,
    /// Set when `consecutive_failures` reaches the configured
    /// threshold with no success in between, and kept set until a
    /// success resets the streak.
    pub down: bool,
}
66
67/// Outcome of a `check_inbound` / `check_outbound` consultation.
68#[derive(Clone, Debug, PartialEq, Eq)]
69pub enum Decision {
70    /// Delivery / send is permitted.
71    Allow,
72    /// Delivery / send is denied; `reason` carries why.
73    Deny(BlockReason),
74}
75
/// Up/down state change produced by recording a success or a
/// failure. The engine translates these into `EngineStep` variants
/// in Phase 8.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleTransition {
    /// The peer's up/down state did not change.
    None,
    /// The peer's consecutive-failure streak just reached the
    /// failure threshold, so the peer is now marked down.
    WentDown,
    /// A success was just recorded for a peer that was marked down,
    /// so the peer is up again.
    CameUp,
}
87
88/// Per-peer policy + health state owner.
89///
90/// Snapshot/restore: full state is captured in
91/// `FrameworkSnapshot` (work). Pre-Stage-5, restore
92/// rebuilds the governor from scratch - blocklist + allowlist
93/// settings on `NodeConfig` are re-applied at construction time.
94pub struct PeerGovernor {
95    blocklist: HashSet<PeerId>,
96    allowlist: Option<HashSet<PeerId>>,
97    health: HashMap<PeerId, PeerHealth>,
98    failure_threshold: u32,
99}
100
101impl Default for PeerGovernor {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl PeerGovernor {
108    /// Construct a fresh governor with no blocklist, no
109    /// allowlist, and the default failure threshold.
110    pub fn new() -> Self {
111        Self {
112            blocklist: HashSet::new(),
113            allowlist: None,
114            health: HashMap::new(),
115            failure_threshold: DEFAULT_FAILURE_THRESHOLD,
116        }
117    }
118
119    /// Configure the failure threshold (consecutive failures
120    /// before a peer is marked down). Default
121    /// [`DEFAULT_FAILURE_THRESHOLD`].
122    pub fn with_failure_threshold(mut self, threshold: u32) -> Self {
123        self.failure_threshold = threshold.max(1);
124        self
125    }
126
127    /// Add a peer to the blocklist. Subsequent inbound envelopes
128    /// from this peer are dropped; subsequent outbound sends fail.
129    pub fn block(&mut self, peer: PeerId) {
130        self.blocklist.insert(peer);
131    }
132
133    /// Remove a peer from the blocklist.
134    pub fn unblock(&mut self, peer: PeerId) {
135        self.blocklist.remove(&peer);
136    }
137
138    /// Set (or clear) the allowlist. When `Some`, only peers in
139    /// the allowlist may communicate; everyone else is denied.
140    /// `None` means "open" (default).
141    pub fn set_allowlist(&mut self, allowlist: Option<HashSet<PeerId>>) {
142        self.allowlist = allowlist;
143    }
144
145    /// Per-peer health snapshot read; returns `None` if no
146    /// success/failure has ever been recorded for `peer`.
147    pub fn peer_health(&self, peer: PeerId) -> Option<PeerHealth> {
148        self.health.get(&peer).copied()
149    }
150
151    /// True when `peer` is currently marked down.
152    pub fn is_down(&self, peer: PeerId) -> bool {
153        self.health.get(&peer).is_some_and(|h| h.down)
154    }
155
156    /// Inbound consultation - called from the engine's Phase 1
157    /// envelope router before any slot is written.
158    pub fn check_inbound(&self, peer: PeerId) -> Decision {
159        if self.blocklist.contains(&peer) {
160            return Decision::Deny(BlockReason::Blocklisted);
161        }
162        if let Some(allow) = &self.allowlist {
163            if !allow.contains(&peer) {
164                return Decision::Deny(BlockReason::NotAllowlisted);
165            }
166        }
167        Decision::Allow
168    }
169
170    /// Outbound consultation - called from the compiler-inserted
171    /// `PeerHealthGate` syscall op upstream of every `wire::Send`.
172    /// Returns `Deny(Cooldown { retry_ns })` when the peer is in
173    /// failure cooldown, prompting the gate to reschedule.
174    pub fn check_outbound(
175        &self,
176        peer: PeerId,
177        backoff: &super::BackoffTable,
178        now_ns: u64,
179    ) -> Decision {
180        if self.blocklist.contains(&peer) {
181            return Decision::Deny(BlockReason::Blocklisted);
182        }
183        if let Some(allow) = &self.allowlist {
184            if !allow.contains(&peer) {
185                return Decision::Deny(BlockReason::NotAllowlisted);
186            }
187        }
188        if !backoff.should_retry(peer, now_ns) {
189            let retry_ns = backoff
190                .state(peer)
191                .map(|s| s.next_retry_ns)
192                .unwrap_or(now_ns);
193            return Decision::Deny(BlockReason::Cooldown { retry_ns });
194        }
195        Decision::Allow
196    }
197
198    /// Record a successful exchange with `peer` at `now_ns`.
199    /// Resets the consecutive-failure counter; clears `down`.
200    /// Returns the lifecycle transition the engine should
201    /// surface as an `EngineStep::PeerUp`.
202    pub fn record_success(&mut self, peer: PeerId, now_ns: u64) -> LifecycleTransition {
203        let was_down = self.health.get(&peer).map(|h| h.down).unwrap_or(false);
204        self.health.insert(
205            peer,
206            PeerHealth {
207                consecutive_failures: 0,
208                last_event_ns: now_ns,
209                down: false,
210            },
211        );
212        if was_down {
213            LifecycleTransition::CameUp
214        } else {
215            LifecycleTransition::None
216        }
217    }
218
219    /// Record a failure for `peer` at `now_ns`. Returns
220    /// `WentDown` when the failure pushes the streak across the
221    /// threshold.
222    pub fn record_failure(&mut self, peer: PeerId, now_ns: u64) -> LifecycleTransition {
223        let prev = self.health.get(&peer).copied().unwrap_or_default();
224        let consecutive_failures = prev.consecutive_failures.saturating_add(1);
225        let just_went_down = !prev.down && consecutive_failures >= self.failure_threshold;
226        self.health.insert(
227            peer,
228            PeerHealth {
229                consecutive_failures,
230                last_event_ns: now_ns,
231                down: prev.down || just_went_down,
232            },
233        );
234        if just_went_down {
235            LifecycleTransition::WentDown
236        } else {
237            LifecycleTransition::None
238        }
239    }
240
241    /// Number of distinct peers with health state recorded.
242    pub fn tracked_peers(&self) -> usize {
243        self.health.len()
244    }
245
246    /// Read-only view of the blocklist. Used by snapshot capture.
247    pub fn blocklist(&self) -> &HashSet<PeerId> {
248        &self.blocklist
249    }
250
251    /// Read-only view of the allowlist.
252    pub fn allowlist(&self) -> Option<&HashSet<PeerId>> {
253        self.allowlist.as_ref()
254    }
255
256    /// Iterate `(PeerId, PeerHealth)` entries for snapshot capture.
257    pub fn iter_health(&self) -> impl Iterator<Item = (PeerId, PeerHealth)> + '_ {
258        self.health.iter().map(|(p, h)| (*p, *h))
259    }
260
261    /// Current failure threshold (consecutive failures to mark a
262    /// peer down). Used by snapshot capture.
263    pub fn failure_threshold(&self) -> u32 {
264        self.failure_threshold
265    }
266
267    /// Replace a peer's health state directly. Used by
268    /// `Node::restore` to re-seed health from a snapshot entry.
269    pub fn restore_health(&mut self, peer: PeerId, health: PeerHealth) {
270        self.health.insert(peer, health);
271    }
272
273    /// Overwrite the failure threshold. Used by `Node::restore`.
274    pub fn set_failure_threshold(&mut self, threshold: u32) {
275        self.failure_threshold = threshold.max(1);
276    }
277}
278