bb_runtime/framework/backpressure_tracker.rs
1//! `BackpressureTracker` - per-peer receiver-side state for the
2//! typed-overload-signal protocol per
3//! `docs/internal/superpowers/specs/2026-06-23-backpressure-runtime.md`.
4//!
5//! The framework owns this primitive; the engine consults it at
6//! Phase 1 of the poll cycle when ingress depth crosses the
7//! high-water mark or when `RttTracker::scan_phi` flips a sender to
//! `Suspect`. Each consultation yields one of three decisions:
//! [`Decision::EmitNotice`] (the receiver will emit a typed
//! `BackoffNotice` envelope back to the sender),
//! [`Decision::Suppress`] (the duplicate-suppression window opened
//! by the previous notice is still open), or
//! [`Decision::SilentDrop`] (the peer is in silent-drop mode; the
//! envelope is dropped without a notice).
14//!
15//! ## Composition
16//!
17//! Per `peer_state.rs`, the tracker joins the existing per-peer
18//! state cluster (`gate`, `governor`, `backoff`) as a sibling
19//! field. Component authors and engine sites reach it through
20//! `ctx.peers.backpressure` / `framework.peer_state.backpressure`,
21//! matching the existing access pattern.
22//!
23//! The tracker is receiver-side state. The matching sender-side
24//! state lives in the existing `BackoffTable` - on receipt of a
25//! `BackoffNotice`, the sender updates `backoff` so the existing
26//! `BackoffGateTx` consultation already gates the next outbound
27//! send. No new sender-side primitive is required.
28//!
29//! ## K-then-silent semantics
30//!
31//! Each emitted notice bumps `notices_sent` for the peer. The
32//! per-peer counter resets when the sender's `RttTracker::scan_phi`
33//! flips back to `Live` (matching the existing recovery surface).
//! Once `notices_sent` reaches `notice_threshold_k` without
//! recovery (i.e. after the K-th notice has gone out),
//! [`Decision::EmitNotice`] is no longer returned;
//! [`Decision::SilentDrop`] is returned instead. The engine's Phase
37//! 1 envelope router drops envelopes from a silent-drop peer
38//! without further notice emission. The first silent-drop
39//! transition surfaces as `InfraEvent::SilentDropActive` on the
40//! bus; subsequent envelopes from the same peer surface no further
41//! event until the peer recovers.
42
43use std::collections::HashMap;
44
45use crate::ids::PeerId;
46
/// Spec default (§6) for the ingress high-water mark, in percent of
/// queue capacity. A queue at or above this fill level warrants a
/// `BackoffCause::QueueFull` notice.
pub const DEFAULT_HIGH_WATER_PCT: u8 = 75;

/// Spec default for K: how many notices a peer may receive without
/// slowing down before the receiver switches to silent-drop. Chosen
/// to agree with the three-sample "evidence sufficient to act"
/// threshold used by `RttEma::is_warm` in
/// `bb-runtime/src/framework/rtt_tracker.rs`.
pub const DEFAULT_K_BEFORE_SILENT: u32 = 3;

/// Spec default floor (one second, in nanoseconds) on the gap between
/// two notices to the same peer. Even when the caller supplies no
/// tighter per-cause `min_backoff_ns`, a burst of inbound envelopes
/// from one peer yields at most one notice per second.
pub const DEFAULT_MIN_NOTICE_INTERVAL_NS: u64 = 1_000 * 1_000_000; // 1000 ms x 1e6 ns/ms
64
/// Reason a receiver asks a sender to back off.
///
/// This is the framework-side twin of the wire-protocol
/// `BackoffCause` payload field, so the engine, bus events, and the
/// wire op all speak the same enum.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum BackoffCause {
    /// Ingress queue depth reached the high-water mark.
    QueueFull,
    /// Phi-accrual failure detection flagged the sender as `Suspect`,
    /// i.e. it is outpacing this receiver's processing rate.
    PhiAccrual,
    /// A Component rejected the input with a typed reject (for example
    /// a role-level rate limit such as an aggregator whose per-round
    /// window is already full).
    ExplicitDrop,
}
81
/// Receiver-side back-pressure bookkeeping for a single peer.
///
/// Every field is zero / `false` in a [`Default`] entry.
#[derive(Debug, Default, Clone, Copy)]
pub struct BackpressureEntry {
    /// Notices emitted to this peer since tracking began or since the
    /// last recovery (recovery discards the whole entry, so counting
    /// restarts from zero).
    pub notices_sent: u32,
    /// Clock reading (`now_ns`) at which the latest notice was emitted.
    /// Zero in a default entry. Zero is also a legitimate clock
    /// reading, so consult `notices_sent` to learn whether any notice
    /// has actually been sent.
    pub last_notice_at_ns: u64,
    /// Back-off, in nanoseconds, quoted on the latest notice. Together
    /// with `last_notice_at_ns` it bounds the duplicate-suppression
    /// window `[last_notice_at_ns, last_notice_at_ns + last_min_backoff_ns)`.
    pub last_min_backoff_ns: u64,
    /// Set once `notices_sent` reaches the tracker's K threshold with
    /// no intervening recovery. While set, the tracker answers every
    /// observation for this peer with `Decision::SilentDrop`.
    pub silent_drop_active: bool,
}
103
104/// Decision returned by [`BackpressureTracker::observe_overload`].
105#[derive(Clone, Copy, Debug, PartialEq, Eq)]
106pub enum Decision {
107 /// Receiver should emit a `BackoffNotice` envelope back to the
108 /// sender. Carries the `min_backoff_ns` the receiver chose for
109 /// the notice (either propagated from the caller or sized
110 /// proportional to the cause).
111 EmitNotice {
112 /// Minimum back-off the notice will quote.
113 min_backoff_ns: u64,
114 /// Why the notice is being emitted.
115 cause: BackoffCause,
116 },
117 /// Receiver should not emit (duplicate-suppression window is
118 /// still open, or the cause was already covered by a recent
119 /// notice).
120 Suppress,
121 /// Receiver should drop the inbound envelope without emitting a
122 /// notice. Returned once the K-without-recovery threshold has
123 /// been crossed.
124 SilentDrop,
125}
126
127/// Per-peer receiver-side back-pressure state.
128///
129/// Sibling field on `PeerState` per
130/// `bb-runtime/src/framework/peer_state.rs`. The tracker is
131/// receiver-state-only; sender-side back-off lives in the existing
132/// `BackoffTable`.
133pub struct BackpressureTracker {
134 entries: HashMap<PeerId, BackpressureEntry>,
135 high_water_mark_pct: u8,
136 notice_threshold_k: u32,
137 min_notice_interval_ns: u64,
138}
139
140impl Default for BackpressureTracker {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146impl BackpressureTracker {
147 /// Construct a fresh tracker with the spec's defaults
148 /// (high-water = 75%, K = 3, min-notice-interval = 1 second).
149 pub fn new() -> Self {
150 Self::with_config(
151 DEFAULT_HIGH_WATER_PCT,
152 DEFAULT_K_BEFORE_SILENT,
153 DEFAULT_MIN_NOTICE_INTERVAL_NS,
154 )
155 }
156
157 /// Construct a tracker with custom config values. `high_water_mark_pct`
158 /// is clamped to `1..=100`; `notice_threshold_k` is clamped to
159 /// at least 1; `min_notice_interval_ns` is clamped to at least 1.
160 pub fn with_config(
161 high_water_mark_pct: u8,
162 notice_threshold_k: u32,
163 min_notice_interval_ns: u64,
164 ) -> Self {
165 Self {
166 entries: HashMap::new(),
167 high_water_mark_pct: high_water_mark_pct.clamp(1, 100),
168 notice_threshold_k: notice_threshold_k.max(1),
169 min_notice_interval_ns: min_notice_interval_ns.max(1),
170 }
171 }
172
173 /// High-water mark threshold as a percentage.
174 pub fn high_water_mark_pct(&self) -> u8 {
175 self.high_water_mark_pct
176 }
177
178 /// Whether the supplied queue depth (`len`) crosses the
179 /// configured high-water mark for the supplied capacity.
180 pub fn is_over_high_water(&self, len: usize, capacity: usize) -> bool {
181 if capacity == 0 {
182 return false;
183 }
184 // Use 128-bit arithmetic to avoid overflow on
185 // pathologically large capacities; cast back is safe
186 // because `100 * usize::MAX` fits in u128.
187 let lhs = (len as u128).saturating_mul(100);
188 let rhs = (capacity as u128).saturating_mul(self.high_water_mark_pct as u128);
189 lhs >= rhs
190 }
191
192 /// K threshold (notices-without-recovery before silent-drop).
193 pub fn notice_threshold_k(&self) -> u32 {
194 self.notice_threshold_k
195 }
196
197 /// Minimum interval enforced between successive notices to the
198 /// same peer.
199 pub fn min_notice_interval_ns(&self) -> u64 {
200 self.min_notice_interval_ns
201 }
202
203 /// Whether the peer is currently in silent-drop mode.
204 pub fn is_silent_drop_active(&self, peer: PeerId) -> bool {
205 self.entries
206 .get(&peer)
207 .is_some_and(|e| e.silent_drop_active)
208 }
209
210 /// Inspect the recorded entry for `peer`. Returns `None` when
211 /// no overload event has been observed for this peer yet.
212 pub fn entry(&self, peer: PeerId) -> Option<BackpressureEntry> {
213 self.entries.get(&peer).copied()
214 }
215
216 /// Iterate `(PeerId, BackpressureEntry)` for snapshot capture.
217 pub fn iter(&self) -> impl Iterator<Item = (PeerId, BackpressureEntry)> + '_ {
218 self.entries.iter().map(|(p, e)| (*p, *e))
219 }
220
221 /// Number of peers currently tracked.
222 pub fn len(&self) -> usize {
223 self.entries.len()
224 }
225
226 /// Whether any peer has been tracked.
227 pub fn is_empty(&self) -> bool {
228 self.entries.is_empty()
229 }
230
231 /// Observe an overload condition for `peer` at `now_ns`.
232 ///
233 /// Returns:
234 /// - `Decision::SilentDrop` if the peer is already in
235 /// silent-drop mode. The caller drops the envelope; no
236 /// notice is emitted.
237 /// - `Decision::Suppress` if a recent notice's quoted back-off
238 /// window has not yet elapsed (duplicate suppression).
239 /// - `Decision::EmitNotice` if the caller should emit a notice.
240 /// The tracker increments `notices_sent` + records the
241 /// emission timestamp + back-off. If this push crosses the
242 /// K threshold, the entry transitions to `silent_drop_active`
243 /// in the *next* observation - the current decision still
244 /// emits the K-th notice so the sender gets the final
245 /// warning before silent drop kicks in.
246 ///
247 /// `min_backoff_ns` is the back-off duration the caller intends
248 /// to quote on the notice. The tracker uses it for the
249 /// duplicate-suppression window. A 0 value collapses to the
250 /// configured `min_notice_interval_ns` floor.
251 pub fn observe_overload(
252 &mut self,
253 peer: PeerId,
254 cause: BackoffCause,
255 min_backoff_ns: u64,
256 now_ns: u64,
257 ) -> Decision {
258 // Capture the prior emit-state so we can decide between
259 // Suppress (early-return without mutation) and EmitNotice
260 // (which mutates) before the per-entry borrow extends.
261 let prior = self.entries.get(&peer).copied().unwrap_or_default();
262
263 if prior.silent_drop_active {
264 return Decision::SilentDrop;
265 }
266
267 let effective_min = min_backoff_ns.max(self.min_notice_interval_ns);
268
269 // Duplicate suppression: skip emission while inside the
270 // previously-quoted back-off window.
271 if prior.last_notice_at_ns != 0
272 && now_ns
273 < prior
274 .last_notice_at_ns
275 .saturating_add(prior.last_min_backoff_ns)
276 {
277 return Decision::Suppress;
278 }
279
280 // Emit. Bump the counter + record the emission window.
281 let new_notices = prior.notices_sent.saturating_add(1);
282 let silent_drop_active = new_notices >= self.notice_threshold_k;
283 self.entries.insert(
284 peer,
285 BackpressureEntry {
286 notices_sent: new_notices,
287 last_notice_at_ns: now_ns,
288 last_min_backoff_ns: effective_min,
289 silent_drop_active,
290 },
291 );
292 Decision::EmitNotice {
293 min_backoff_ns: effective_min,
294 cause,
295 }
296 }
297
298 /// Record that the sender has recovered (e.g., φ-accrual
299 /// transitioned back to `Live`). Resets the per-peer counter
300 /// and clears `silent_drop_active`. The next `observe_overload`
301 /// for the peer starts fresh.
302 pub fn record_recovery(&mut self, peer: PeerId) {
303 self.entries.remove(&peer);
304 }
305
306 /// Whether `peer` is currently inside its duplicate-suppression
307 /// window. Used by tests + introspection.
308 pub fn in_suppression_window(&self, peer: PeerId, now_ns: u64) -> bool {
309 let Some(entry) = self.entries.get(&peer) else {
310 return false;
311 };
312 if entry.last_notice_at_ns == 0 {
313 return false;
314 }
315 now_ns
316 < entry
317 .last_notice_at_ns
318 .saturating_add(entry.last_min_backoff_ns)
319 }
320}
321