use std::net::SocketAddr;
pub(crate) type PeerRate = (SocketAddr, u64, bool, u32);
const DEPTH_FLOOR: u32 = 4;
const DEPTH_CEILING: u32 = 512;
const DEPTH_SLACK: u32 = 4;
const SHRINK_CONFIRM_TICKS: u8 = 3;
#[allow(clippy::cast_possible_truncation)] pub(crate) const LEGACY_DEPTH: u32 = crate::peer_shared::INITIAL_QUEUE_DEPTH as u32;
pub(crate) const CHURN_SUSPEND_FLIPS_PER_MIN: f64 = 30.0;
#[allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)] pub(crate) fn bdp_cap(
ewma_rate: u64,
rtt_secs: Option<f64>,
prev_cap: u32,
shrink_streak: u8,
) -> (u32, u8) {
let Some(rtt) = rtt_secs else {
return (LEGACY_DEPTH, 0);
};
let blocks = (ewma_rate as f64 * rtt / 16384.0).ceil();
let computed = (blocks as u32)
.saturating_add(DEPTH_SLACK)
.clamp(DEPTH_FLOOR, DEPTH_CEILING);
if computed >= prev_cap {
let probe = prev_cap.saturating_add((prev_cap / 8).max(1));
return (computed.max(probe).min(DEPTH_CEILING), 0);
}
let shrink_line = (prev_cap - prev_cap / 3).max(DEPTH_FLOOR + 1);
if computed >= shrink_line {
return (prev_cap, 0);
}
let streak = shrink_streak.saturating_add(1);
if streak >= SHRINK_CONFIRM_TICKS {
(computed, 0)
} else {
(prev_cap, streak)
}
}
pub(crate) fn compute_quotas(
budget: u32,
floor: u32,
peers: &[PeerRate],
) -> Vec<(SocketAddr, u32)> {
if budget == 0 {
return peers
.iter()
.map(|&(a, _, _, _)| (a, LEGACY_DEPTH))
.collect();
}
let floor = floor.clamp(1, DEPTH_CEILING);
let unchoked: Vec<&PeerRate> = peers.iter().filter(|&&(_, _, u, _)| u).collect();
let n = u32::try_from(unchoked.len()).unwrap_or(u32::MAX);
let mut out: Vec<(SocketAddr, u32)> = Vec::with_capacity(peers.len());
for &(a, _, u, _) in peers {
if !u {
out.push((a, floor));
}
}
if n == 0 {
return out;
}
let floors_total = floor.saturating_mul(n);
let surplus = budget.saturating_sub(floors_total);
let total_rate: u128 = unchoked.iter().map(|&&(_, r, _, _)| u128::from(r)).sum();
let mut quotas: Vec<(SocketAddr, u32, u64, u32)> = unchoked
.iter()
.map(|&&(a, r, _, c)| {
let cap = c.max(floor);
let share = match (u128::from(surplus) * u128::from(r)).checked_div(total_rate) {
Some(s) => u32::try_from(s).unwrap_or(u32::MAX),
None => surplus / n,
};
(a, floor.saturating_add(share).min(cap), r, cap)
})
.collect();
let assigned: u32 = quotas.iter().map(|&(_, q, _, _)| q).sum();
let mut spill = budget.saturating_sub(assigned);
if spill > 0 {
quotas.sort_by_key(|q| std::cmp::Reverse(q.2));
for q in &mut quotas {
if spill == 0 {
break;
}
let room = q.3 - q.1;
let take = room.min(spill);
q.1 += take;
spill -= take;
}
}
out.extend(quotas.into_iter().map(|(a, q, _, _)| (a, q)));
out
}
#[cfg(test)]
mod tests {
use super::*;
fn addr(n: u8) -> SocketAddr {
format!("10.0.0.{n}:6881").parse().unwrap()
}
#[test]
fn budget_zero_means_legacy_cap_for_everyone() {
let q = compute_quotas(
0,
8,
&[(addr(1), 1000, true, 128), (addr(2), 0, false, 128)],
);
assert!(q.iter().all(|&(_, d)| d == LEGACY_DEPTH));
}
#[test]
fn choked_peers_get_the_floor() {
let q = compute_quotas(512, 8, &[(addr(1), 9999, false, 128)]);
assert_eq!(q, vec![(addr(1), 8)]);
}
#[test]
fn proportional_split_caps_at_128_and_respects_budget() {
let q = compute_quotas(
136,
8,
&[(addr(1), 10_000, true, 128), (addr(2), 0, true, 128)],
);
let total: u32 = q.iter().map(|&(_, d)| d).sum();
assert!(total <= 136, "sum {total} must stay within budget");
let fast = q.iter().find(|&&(a, _)| a == addr(1)).unwrap().1;
let slow = q.iter().find(|&&(a, _)| a == addr(2)).unwrap().1;
assert_eq!(slow, 8, "zero-rate peer holds the floor");
assert_eq!(fast, 128, "floor 8 + full surplus 120, capped at 128");
}
#[test]
fn all_zero_rates_split_equally() {
let q = compute_quotas(64, 8, &[(addr(1), 0, true, 128), (addr(2), 0, true, 128)]);
assert_eq!(q.iter().map(|&(_, d)| d).sum::<u32>(), 64);
assert!(q.iter().all(|&(_, d)| d == 32));
}
#[test]
fn floors_always_granted_even_when_budget_oversubscribed() {
let peers: Vec<PeerRate> = (1..=20).map(|n| (addr(n), 0, true, 128)).collect();
let q = compute_quotas(64, 8, &peers);
assert!(q.iter().all(|&(_, d)| d == 8));
}
#[test]
fn extreme_rates_do_not_overflow_the_sum() {
let q = compute_quotas(
256,
8,
&[
(addr(1), u64::MAX, true, 128),
(addr(2), u64::MAX, true, 128),
],
);
assert_eq!(q.iter().map(|&(_, d)| d).sum::<u32>(), 256);
assert!(q.iter().all(|&(_, d)| d == 128));
}
#[test]
fn surplus_distributes_by_rate_share() {
let q = compute_quotas(
256,
8,
&[(addr(1), 3000, true, 128), (addr(2), 1000, true, 128)],
);
let a = q.iter().find(|&&(p, _)| p == addr(1)).unwrap().1;
let b = q.iter().find(|&&(p, _)| p == addr(2)).unwrap().1;
assert_eq!(a + b, 256);
assert_eq!(a, 128, "8 + 180 capped at 128");
assert_eq!(b, 128, "68 from pass 1 + 60 cap spill");
}
#[test]
fn bdp_cap_no_rtt_stays_at_legacy_init() {
let (cap, streak) = bdp_cap(0, None, 128, 2);
assert_eq!((cap, streak), (128, 0));
}
#[test]
fn bdp_cap_formula_exact_high_bdp() {
let (cap, _) = bdp_cap(16_000_000, Some(0.160), 128, 0);
assert_eq!(cap, 161);
}
#[test]
fn bdp_cap_grow_is_immediate_and_resets_streak() {
let (cap, streak) = bdp_cap(16_000_000, Some(0.160), 128, 2);
assert_eq!((cap, streak), (161, 0));
}
#[test]
fn bdp_cap_grow_probes_past_measured_bdp() {
let (cap, streak) = bdp_cap(12_697_600, Some(0.160), 128, 0);
assert_eq!((cap, streak), (144, 0));
}
#[test]
fn bdp_cap_shrink_needs_three_consecutive_confirms() {
let (c1, s1) = bdp_cap(2_000_000, Some(0.030), 128, 0);
assert_eq!((c1, s1), (128, 1), "first shrink signal holds");
let (c2, s2) = bdp_cap(2_000_000, Some(0.030), c1, s1);
assert_eq!((c2, s2), (128, 2), "second shrink signal holds");
let (c3, s3) = bdp_cap(2_000_000, Some(0.030), c2, s2);
assert_eq!((c3, s3), (8, 0), "third confirm lands the shrink");
}
#[test]
fn bdp_cap_near_miss_holds_without_streak() {
let (cap, streak) = bdp_cap(10_000_000, Some(0.160), 128, 2);
assert_eq!((cap, streak), (128, 0));
}
#[test]
fn bdp_cap_depth_limited_ramp_tick3_resets_streak() {
let (cap, streak) = bdp_cap(8_611_000, Some(0.160), 128, 2);
assert_eq!((cap, streak), (128, 0));
}
#[test]
fn bdp_cap_floor_edge_shrinks_from_five_to_four() {
let (c1, s1) = bdp_cap(0, Some(0.030), 5, 0);
assert_eq!((c1, s1), (5, 1), "first floor-level signal holds");
let (c2, s2) = bdp_cap(0, Some(0.030), c1, s1);
assert_eq!((c2, s2), (5, 2), "second holds");
let (c3, s3) = bdp_cap(0, Some(0.030), c2, s2);
assert_eq!((c3, s3), (4, 0), "third lands the floor");
}
#[test]
fn bdp_cap_cold_ramp_never_shrinks_below_init() {
let mut cap = 128u32;
let mut streak = 0u8;
let mut max_seen = 0u32;
for frac in [0.30f64, 0.51, 0.657, 0.76, 0.83, 0.88, 0.92, 0.96, 0.99] {
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
let rate = (13_107_200.0 * frac) as u64;
let (c, s) = bdp_cap(rate, Some(0.160), cap, streak);
assert!(c >= 128, "cap dropped below init during ramp: {c}");
cap = c;
streak = s;
max_seen = max_seen.max(c);
}
assert!(max_seen > 128, "cap never unbound during ramp");
}
#[test]
fn bdp_cap_clamps_to_floor_and_ceiling() {
let (lo, _) = bdp_cap(100_000, Some(0.010), 4, 2);
assert_eq!(lo, 5);
let (hi, _) = bdp_cap(u64::MAX, Some(10.0), 512, 0);
assert_eq!(hi, 512);
}
#[test]
fn per_peer_caps_bound_quota_and_spill() {
let q = compute_quotas(
512,
8,
&[(addr(1), 1000, true, 161), (addr(2), 1000, true, 8)],
);
let a = q.iter().find(|&&(p, _)| p == addr(1)).unwrap().1;
let b = q.iter().find(|&&(p, _)| p == addr(2)).unwrap().1;
assert_eq!(a, 161, "fast peer fills to its BDP cap");
assert_eq!(b, 8, "slow peer pinned at its BDP cap");
}
#[test]
fn cap_below_floor_loses_to_floor() {
let q = compute_quotas(64, 8, &[(addr(1), 0, true, 4)]);
assert_eq!(q, vec![(addr(1), 8)]);
}
#[test]
fn budget_zero_ignores_caps_entirely() {
let q = compute_quotas(0, 8, &[(addr(1), 1000, true, 8), (addr(2), 0, false, 4)]);
assert!(q.iter().all(|&(_, d)| d == 128));
}
}