Skip to main content

bb_runtime/framework/
backpressure_tracker.rs

1//! `BackpressureTracker` - per-peer receiver-side state for the
2//! typed-overload-signal protocol per
3//! `docs/internal/superpowers/specs/2026-06-23-backpressure-runtime.md`.
4//!
5//! The framework owns this primitive; the engine consults it at
6//! Phase 1 of the poll cycle when ingress depth crosses the
7//! high-water mark or when `RttTracker::scan_phi` flips a sender to
8//! `Suspect`. Each consultation either yields a
9//! [`Decision::EmitNotice`] (the receiver will emit a typed
10//! `BackoffNotice` envelope back to the sender) or a
11//! [`Decision::Suppress`] (duplicate-suppression window still
12//! open, silent-drop mode active, or another reason the receiver
13//! should not act).
14//!
15//! ## Composition
16//!
17//! Per `peer_state.rs`, the tracker joins the existing per-peer
18//! state cluster (`gate`, `governor`, `backoff`) as a sibling
19//! field. Component authors and engine sites reach it through
20//! `ctx.peers.backpressure` / `framework.peer_state.backpressure`,
21//! matching the existing access pattern.
22//!
23//! The tracker is receiver-side state. The matching sender-side
24//! state lives in the existing `BackoffTable` - on receipt of a
25//! `BackoffNotice`, the sender updates `backoff` so the existing
26//! `BackoffGateTx` consultation already gates the next outbound
27//! send. No new sender-side primitive is required.
28//!
29//! ## K-then-silent semantics
30//!
//! Each emitted notice bumps `notices_sent` for the peer. The
//! per-peer state resets when the sender's `RttTracker::scan_phi`
//! flips back to `Live` and that recovery is recorded via
//! [`BackpressureTracker::record_recovery`] (matching the existing
//! recovery surface). Once `notices_sent` reaches `notice_threshold_k`
//! without recovery - the K-th notice itself is still emitted as a
//! final warning - [`Decision::EmitNotice`] is no longer returned;
//! [`Decision::SilentDrop`] is returned instead. The engine's Phase
37//! 1 envelope router drops envelopes from a silent-drop peer
38//! without further notice emission. The first silent-drop
39//! transition surfaces as `InfraEvent::SilentDropActive` on the
40//! bus; subsequent envelopes from the same peer surface no further
41//! event until the peer recovers.
42
43use std::collections::HashMap;
44
45use crate::ids::PeerId;
46
/// Default high-water mark, as a percentage of ingress capacity.
///
/// Per spec §6, an ingress queue whose depth is at or above 75% of its
/// capacity warrants a `BackoffCause::QueueFull` notice.
pub const DEFAULT_HIGH_WATER_PCT: u8 = 75;

/// Default K: how many notices a peer may receive without recovering
/// before the receiver switches it to silent-drop.
///
/// Chosen to line up with the 3-sample "evidence sufficient to act"
/// threshold used by `RttEma::is_warm`
/// (`bb-runtime/src/framework/rtt_tracker.rs:126-128`).
pub const DEFAULT_K_BEFORE_SILENT: u32 = 3;

/// Default floor on the spacing between two notices to the same peer,
/// in nanoseconds (one second).
///
/// The quoted back-off is never allowed to drop below this floor. Even a
/// flood of inbound envelopes from a single peer therefore yields at most
/// one notice per second when the caller asks for a smaller (or zero)
/// per-cause `min_backoff_ns`.
pub const DEFAULT_MIN_NOTICE_INTERVAL_NS: u64 = 1_000 * 1_000 * 1_000;
64
/// The reason a receiver gives when asking a sender to back off.
///
/// This is the framework-side twin of the wire-protocol `BackoffCause`
/// payload field. The engine, the bus events and the wire op therefore
/// all share a single enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffCause {
    /// Depth of the `IngressQueue` reached the high-water mark.
    QueueFull,
    /// `PhiAccrualState` flagged the sender as `Suspect`: it is sending
    /// faster than this receiver can process.
    PhiAccrual,
    /// A Component answered with a typed reject, such as a role
    /// rate-limit (e.g. an aggregator whose per-round window is already
    /// full).
    ExplicitDrop,
}
81
/// Receiver-side bookkeeping kept for one peer.
#[derive(Debug, Default, Clone, Copy)]
pub struct BackpressureEntry {
    /// How many notices this peer has been sent since its state was last
    /// reset. The reset happens when the peer's φ drops back below
    /// `Suspect`.
    pub notices_sent: u32,
    /// The `now_ns` at which the latest notice was emitted, or `0` before
    /// any emission. Together with `last_min_backoff_ns`, it bounds the
    /// window in which further notices are suppressed as redundant.
    pub last_notice_at_ns: u64,
    /// The `min_backoff_ns` quoted by the latest notice (`0` before any
    /// emission). No new notice is emitted while
    /// `now_ns < last_notice_at_ns + last_min_backoff_ns`.
    pub last_min_backoff_ns: u64,
    /// Set once `notices_sent` reaches K with no recovery observed.
    /// While it is set, Phase 1 of `Engine::poll` drops envelopes from
    /// this peer.
    pub silent_drop_active: bool,
}
103
104/// Decision returned by [`BackpressureTracker::observe_overload`].
105#[derive(Clone, Copy, Debug, PartialEq, Eq)]
106pub enum Decision {
107    /// Receiver should emit a `BackoffNotice` envelope back to the
108    /// sender. Carries the `min_backoff_ns` the receiver chose for
109    /// the notice (either propagated from the caller or sized
110    /// proportional to the cause).
111    EmitNotice {
112        /// Minimum back-off the notice will quote.
113        min_backoff_ns: u64,
114        /// Why the notice is being emitted.
115        cause: BackoffCause,
116    },
117    /// Receiver should not emit (duplicate-suppression window is
118    /// still open, or the cause was already covered by a recent
119    /// notice).
120    Suppress,
121    /// Receiver should drop the inbound envelope without emitting a
122    /// notice. Returned once the K-without-recovery threshold has
123    /// been crossed.
124    SilentDrop,
125}
126
127/// Per-peer receiver-side back-pressure state.
128///
129/// Sibling field on `PeerState` per
130/// `bb-runtime/src/framework/peer_state.rs`. The tracker is
131/// receiver-state-only; sender-side back-off lives in the existing
132/// `BackoffTable`.
133pub struct BackpressureTracker {
134    entries: HashMap<PeerId, BackpressureEntry>,
135    high_water_mark_pct: u8,
136    notice_threshold_k: u32,
137    min_notice_interval_ns: u64,
138}
139
140impl Default for BackpressureTracker {
141    fn default() -> Self {
142        Self::new()
143    }
144}
145
146impl BackpressureTracker {
147    /// Construct a fresh tracker with the spec's defaults
148    /// (high-water = 75%, K = 3, min-notice-interval = 1 second).
149    pub fn new() -> Self {
150        Self::with_config(
151            DEFAULT_HIGH_WATER_PCT,
152            DEFAULT_K_BEFORE_SILENT,
153            DEFAULT_MIN_NOTICE_INTERVAL_NS,
154        )
155    }
156
157    /// Construct a tracker with custom config values. `high_water_mark_pct`
158    /// is clamped to `1..=100`; `notice_threshold_k` is clamped to
159    /// at least 1; `min_notice_interval_ns` is clamped to at least 1.
160    pub fn with_config(
161        high_water_mark_pct: u8,
162        notice_threshold_k: u32,
163        min_notice_interval_ns: u64,
164    ) -> Self {
165        Self {
166            entries: HashMap::new(),
167            high_water_mark_pct: high_water_mark_pct.clamp(1, 100),
168            notice_threshold_k: notice_threshold_k.max(1),
169            min_notice_interval_ns: min_notice_interval_ns.max(1),
170        }
171    }
172
173    /// High-water mark threshold as a percentage.
174    pub fn high_water_mark_pct(&self) -> u8 {
175        self.high_water_mark_pct
176    }
177
178    /// Whether the supplied queue depth (`len`) crosses the
179    /// configured high-water mark for the supplied capacity.
180    pub fn is_over_high_water(&self, len: usize, capacity: usize) -> bool {
181        if capacity == 0 {
182            return false;
183        }
184        // Use 128-bit arithmetic to avoid overflow on
185        // pathologically large capacities; cast back is safe
186        // because `100 * usize::MAX` fits in u128.
187        let lhs = (len as u128).saturating_mul(100);
188        let rhs = (capacity as u128).saturating_mul(self.high_water_mark_pct as u128);
189        lhs >= rhs
190    }
191
192    /// K threshold (notices-without-recovery before silent-drop).
193    pub fn notice_threshold_k(&self) -> u32 {
194        self.notice_threshold_k
195    }
196
197    /// Minimum interval enforced between successive notices to the
198    /// same peer.
199    pub fn min_notice_interval_ns(&self) -> u64 {
200        self.min_notice_interval_ns
201    }
202
203    /// Whether the peer is currently in silent-drop mode.
204    pub fn is_silent_drop_active(&self, peer: PeerId) -> bool {
205        self.entries
206            .get(&peer)
207            .is_some_and(|e| e.silent_drop_active)
208    }
209
210    /// Inspect the recorded entry for `peer`. Returns `None` when
211    /// no overload event has been observed for this peer yet.
212    pub fn entry(&self, peer: PeerId) -> Option<BackpressureEntry> {
213        self.entries.get(&peer).copied()
214    }
215
216    /// Iterate `(PeerId, BackpressureEntry)` for snapshot capture.
217    pub fn iter(&self) -> impl Iterator<Item = (PeerId, BackpressureEntry)> + '_ {
218        self.entries.iter().map(|(p, e)| (*p, *e))
219    }
220
221    /// Number of peers currently tracked.
222    pub fn len(&self) -> usize {
223        self.entries.len()
224    }
225
226    /// Whether any peer has been tracked.
227    pub fn is_empty(&self) -> bool {
228        self.entries.is_empty()
229    }
230
231    /// Observe an overload condition for `peer` at `now_ns`.
232    ///
233    /// Returns:
234    /// - `Decision::SilentDrop` if the peer is already in
235    ///   silent-drop mode. The caller drops the envelope; no
236    ///   notice is emitted.
237    /// - `Decision::Suppress` if a recent notice's quoted back-off
238    ///   window has not yet elapsed (duplicate suppression).
239    /// - `Decision::EmitNotice` if the caller should emit a notice.
240    ///   The tracker increments `notices_sent` + records the
241    ///   emission timestamp + back-off. If this push crosses the
242    ///   K threshold, the entry transitions to `silent_drop_active`
243    ///   in the *next* observation - the current decision still
244    ///   emits the K-th notice so the sender gets the final
245    ///   warning before silent drop kicks in.
246    ///
247    /// `min_backoff_ns` is the back-off duration the caller intends
248    /// to quote on the notice. The tracker uses it for the
249    /// duplicate-suppression window. A 0 value collapses to the
250    /// configured `min_notice_interval_ns` floor.
251    pub fn observe_overload(
252        &mut self,
253        peer: PeerId,
254        cause: BackoffCause,
255        min_backoff_ns: u64,
256        now_ns: u64,
257    ) -> Decision {
258        // Capture the prior emit-state so we can decide between
259        // Suppress (early-return without mutation) and EmitNotice
260        // (which mutates) before the per-entry borrow extends.
261        let prior = self.entries.get(&peer).copied().unwrap_or_default();
262
263        if prior.silent_drop_active {
264            return Decision::SilentDrop;
265        }
266
267        let effective_min = min_backoff_ns.max(self.min_notice_interval_ns);
268
269        // Duplicate suppression: skip emission while inside the
270        // previously-quoted back-off window.
271        if prior.last_notice_at_ns != 0
272            && now_ns
273                < prior
274                    .last_notice_at_ns
275                    .saturating_add(prior.last_min_backoff_ns)
276        {
277            return Decision::Suppress;
278        }
279
280        // Emit. Bump the counter + record the emission window.
281        let new_notices = prior.notices_sent.saturating_add(1);
282        let silent_drop_active = new_notices >= self.notice_threshold_k;
283        self.entries.insert(
284            peer,
285            BackpressureEntry {
286                notices_sent: new_notices,
287                last_notice_at_ns: now_ns,
288                last_min_backoff_ns: effective_min,
289                silent_drop_active,
290            },
291        );
292        Decision::EmitNotice {
293            min_backoff_ns: effective_min,
294            cause,
295        }
296    }
297
298    /// Record that the sender has recovered (e.g., φ-accrual
299    /// transitioned back to `Live`). Resets the per-peer counter
300    /// and clears `silent_drop_active`. The next `observe_overload`
301    /// for the peer starts fresh.
302    pub fn record_recovery(&mut self, peer: PeerId) {
303        self.entries.remove(&peer);
304    }
305
306    /// Whether `peer` is currently inside its duplicate-suppression
307    /// window. Used by tests + introspection.
308    pub fn in_suppression_window(&self, peer: PeerId, now_ns: u64) -> bool {
309        let Some(entry) = self.entries.get(&peer) else {
310            return false;
311        };
312        if entry.last_notice_at_ns == 0 {
313            return false;
314        }
315        now_ns
316            < entry
317                .last_notice_at_ns
318                .saturating_add(entry.last_min_backoff_ns)
319    }
320}
321