use std::collections::HashMap;
use crate::ids::PeerId;
/// Default high-water mark, as a percentage of queue capacity, used by
/// `BackpressureTracker::new`.
pub const DEFAULT_HIGH_WATER_PCT: u8 = 75;
/// Default notice count at which a peer is switched to silent drop; used by
/// `BackpressureTracker::new`.
pub const DEFAULT_K_BEFORE_SILENT: u32 = 3;
/// Default lower bound, in nanoseconds, on the backoff carried by a notice
/// (1_000_000_000 ns = 1 s); used by `BackpressureTracker::new`.
pub const DEFAULT_MIN_NOTICE_INTERVAL_NS: u64 = 1_000_000_000;
/// Reason attached to an overload observation.
///
/// The tracker does not interpret the cause; `observe_overload` copies it
/// unchanged into `Decision::EmitNotice`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackoffCause {
// NOTE(review): presumably a receive/send queue hit its limit — confirm with callers.
QueueFull,
// NOTE(review): presumably raised by a phi-accrual failure detector — confirm with callers.
PhiAccrual,
// NOTE(review): presumably an explicit drop requested elsewhere — confirm with callers.
ExplicitDrop,
}
/// Per-peer backpressure bookkeeping, as written by
/// `BackpressureTracker::observe_overload` each time it emits a notice.
#[derive(Clone, Copy, Debug, Default)]
pub struct BackpressureEntry {
/// Number of notices emitted for the peer so far (saturating).
pub notices_sent: u32,
/// The `now_ns` value passed in when the most recent notice was emitted.
pub last_notice_at_ns: u64,
/// Backoff, in nanoseconds, carried by the most recent notice; added to
/// `last_notice_at_ns` it gives the end of the suppression window.
pub last_min_backoff_ns: u64,
/// Set once `notices_sent` has reached the tracker's notice threshold;
/// while set, further overloads yield `Decision::SilentDrop`.
pub silent_drop_active: bool,
}
/// Outcome of `BackpressureTracker::observe_overload` for one overload event.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Decision {
/// A notice was recorded for the peer.
EmitNotice {
/// Backoff to advertise, in nanoseconds: the larger of the requested
/// backoff and the tracker's minimum notice interval.
min_backoff_ns: u64,
/// The cause passed to `observe_overload`, unchanged.
cause: BackoffCause,
},
/// The previous notice's backoff window has not elapsed yet; nothing was
/// recorded.
Suppress,
/// The peer already reached the notice threshold; nothing was recorded.
SilentDrop,
}
/// Tracks, per peer, how many backpressure notices have been emitted and
/// decides whether the next overload should emit a notice, be suppressed, or
/// be silently dropped.
pub struct BackpressureTracker {
/// State for every peer that has had at least one notice emitted and has not
/// been cleared by `record_recovery`.
entries: HashMap<PeerId, BackpressureEntry>,
/// Queue fill percentage treated as "over high water"; clamped to `1..=100`
/// by `with_config`.
high_water_mark_pct: u8,
/// Notice count at which a peer switches to silent drop; at least 1.
notice_threshold_k: u32,
/// Lower bound, in nanoseconds, on the backoff carried by a notice; at least 1.
min_notice_interval_ns: u64,
}
impl Default for BackpressureTracker {
fn default() -> Self {
Self::new()
}
}
impl BackpressureTracker {
    /// Creates a tracker using the module defaults
    /// (`DEFAULT_HIGH_WATER_PCT`, `DEFAULT_K_BEFORE_SILENT`,
    /// `DEFAULT_MIN_NOTICE_INTERVAL_NS`).
    pub fn new() -> Self {
        Self::with_config(
            DEFAULT_HIGH_WATER_PCT,
            DEFAULT_K_BEFORE_SILENT,
            DEFAULT_MIN_NOTICE_INTERVAL_NS,
        )
    }

    /// Creates a tracker with explicit settings and no tracked peers.
    ///
    /// Out-of-range values are clamped rather than rejected:
    /// `high_water_mark_pct` to `1..=100`, `notice_threshold_k` to at least 1,
    /// and `min_notice_interval_ns` to at least 1.
    pub fn with_config(
        high_water_mark_pct: u8,
        notice_threshold_k: u32,
        min_notice_interval_ns: u64,
    ) -> Self {
        Self {
            entries: HashMap::new(),
            high_water_mark_pct: high_water_mark_pct.clamp(1, 100),
            notice_threshold_k: notice_threshold_k.max(1),
            min_notice_interval_ns: min_notice_interval_ns.max(1),
        }
    }

    /// Returns the configured high-water mark as a percentage (`1..=100`).
    pub fn high_water_mark_pct(&self) -> u8 {
        self.high_water_mark_pct
    }

    /// Returns `true` when `len / capacity` is at or above the high-water
    /// percentage.
    ///
    /// A `capacity` of zero is never considered over high water.
    pub fn is_over_high_water(&self, len: usize, capacity: usize) -> bool {
        if capacity == 0 {
            return false;
        }
        // Cross-multiply instead of dividing to avoid rounding; the operands
        // are widened to u128 so the products cannot overflow.
        let scaled_len = (len as u128).saturating_mul(100);
        let scaled_limit =
            (capacity as u128).saturating_mul(u128::from(self.high_water_mark_pct));
        scaled_len >= scaled_limit
    }

    /// Returns the notice count at which a peer switches to silent drop.
    pub fn notice_threshold_k(&self) -> u32 {
        self.notice_threshold_k
    }

    /// Returns the lower bound, in nanoseconds, applied to every notice's
    /// backoff.
    pub fn min_notice_interval_ns(&self) -> u64 {
        self.min_notice_interval_ns
    }

    /// Returns `true` if `peer` is tracked and has reached the notice
    /// threshold.
    pub fn is_silent_drop_active(&self, peer: PeerId) -> bool {
        self.entries
            .get(&peer)
            .is_some_and(|e| e.silent_drop_active)
    }

    /// Returns a copy of the bookkeeping for `peer`, or `None` if the peer is
    /// not tracked.
    pub fn entry(&self, peer: PeerId) -> Option<BackpressureEntry> {
        self.entries.get(&peer).copied()
    }

    /// Iterates over copies of every tracked `(peer, entry)` pair, in the
    /// underlying map's (unspecified) order.
    pub fn iter(&self) -> impl Iterator<Item = (PeerId, BackpressureEntry)> + '_ {
        self.entries.iter().map(|(p, e)| (*p, *e))
    }

    /// Returns the number of tracked peers.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no peer is tracked.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Records an overload observed for `peer` at `now_ns` and decides how to
    /// react.
    ///
    /// Checks are applied in this order:
    /// 1. If the peer already reached the notice threshold, returns
    ///    [`Decision::SilentDrop`] without changing any state.
    /// 2. If the previous notice's backoff window is still open, returns
    ///    [`Decision::Suppress`] without changing any state.
    /// 3. Otherwise records a new notice and returns
    ///    [`Decision::EmitNotice`] carrying
    ///    `max(min_backoff_ns, self.min_notice_interval_ns())` and `cause`.
    ///    The notice that brings the count up to the threshold is still
    ///    emitted; only later overloads are silently dropped.
    pub fn observe_overload(
        &mut self,
        peer: PeerId,
        cause: BackoffCause,
        min_backoff_ns: u64,
        now_ns: u64,
    ) -> Decision {
        let prior = self.entries.get(&peer).copied().unwrap_or_default();
        if prior.silent_drop_active {
            return Decision::SilentDrop;
        }
        if Self::notice_window_is_open(prior, now_ns) {
            return Decision::Suppress;
        }

        let effective_min = min_backoff_ns.max(self.min_notice_interval_ns);
        let notices_sent = prior.notices_sent.saturating_add(1);
        self.entries.insert(
            peer,
            BackpressureEntry {
                notices_sent,
                last_notice_at_ns: now_ns,
                last_min_backoff_ns: effective_min,
                silent_drop_active: notices_sent >= self.notice_threshold_k,
            },
        );
        Decision::EmitNotice {
            min_backoff_ns: effective_min,
            cause,
        }
    }

    /// Forgets everything recorded for `peer`, so its next overload starts
    /// from a clean slate.
    pub fn record_recovery(&mut self, peer: PeerId) {
        self.entries.remove(&peer);
    }

    /// Returns `true` if `peer` is tracked and `now_ns` falls before the end
    /// of the backoff window opened by its most recent notice.
    pub fn in_suppression_window(&self, peer: PeerId, now_ns: u64) -> bool {
        self.entries
            .get(&peer)
            .is_some_and(|entry| Self::notice_window_is_open(*entry, now_ns))
    }

    /// Shared window test used by `observe_overload` and
    /// `in_suppression_window`.
    ///
    /// "Has a notice been sent" is decided by `notices_sent`, not by treating
    /// `last_notice_at_ns == 0` as a sentinel: a notice legitimately emitted
    /// at timestamp 0 must still open its suppression window.
    fn notice_window_is_open(entry: BackpressureEntry, now_ns: u64) -> bool {
        entry.notices_sent != 0
            && now_ns
                < entry
                    .last_notice_at_ns
                    .saturating_add(entry.last_min_backoff_ns)
    }
}