use commonware_utils::{time::NANOS_PER_SEC, BigRationalExt, DurationExt};
use num_rational::BigRational;
use num_traits::Zero;
use std::{cmp::Ordering, collections::BTreeMap, time::Duration};
/// A single transfer between two peers, as seen by the bandwidth planner.
#[derive(Clone, Debug)]
pub struct Flow<P> {
    /// Caller-chosen identifier; the allocation result is keyed by this value.
    pub id: u64,
    /// Sending peer. Its egress capacity, when limited, constrains the flow.
    pub origin: P,
    /// Receiving peer. Its ingress capacity is only consulted when `delivered` is `true`.
    pub recipient: P,
    /// Whether the flow counts against the recipient's ingress capacity.
    pub delivered: bool,
}
/// Throughput assigned to a flow.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Rate {
    /// No capacity limit applies to the flow.
    Unlimited,
    /// An exact finite rate. `duration` and `transfer` treat it as units per second.
    Finite(BigRational),
}
/// A capacity-limited side of a peer (its egress or its ingress) shared by one or more flows.
#[derive(Debug)]
struct Resource {
    /// Capacity that has not yet been handed out to member flows.
    remaining: BigRational,
    /// Indices (positions in the planner's flow slice) of every flow attached to this resource.
    members: Vec<usize>,
    /// Number of attached flows whose rate has not been frozen yet.
    active: usize,
}
impl Resource {
    /// Creates a resource holding `limit` units of capacity with no flows attached.
    fn new(limit: u128) -> Self {
        let remaining = BigRational::from_u128(limit);
        Self {
            remaining,
            members: Vec::new(),
            active: 0,
        }
    }
}
/// Key identifying a limited resource: one peer's outbound or inbound side.
///
/// The ordering derives exist so the key can be stored in a `BTreeMap`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Constraint<P> {
    /// Outbound capacity of the given peer.
    Egress(P),
    /// Inbound capacity of the given peer.
    Ingress(P),
}
/// Per-flow bookkeeping used while rates are being computed.
struct State {
    /// Indices of the limited resources this flow draws from.
    resources: Vec<usize>,
    /// `true` once the flow has been attached to at least one limited resource.
    limited: bool,
    /// `true` while the flow's rate is still being raised (it has not been frozen).
    active: bool,
}
impl State {
    /// Returns the bookkeeping for a flow that is not attached to any resource:
    /// no resources, not limited, not active.
    const fn new() -> Self {
        Self {
            active: false,
            limited: false,
            resources: Vec::new(),
        }
    }
}
/// Working state for computing per-flow rates.
///
/// All active flows share one common rate level (`fill`) that is raised step by step;
/// a flow's rate is fixed at the current level once one of its resources is exhausted.
struct Planner<'a, P> {
    /// Flows being planned; positions in this slice are the flow indices used everywhere else.
    flows: &'a [Flow<P>],
    /// Every limited resource discovered while registering flows.
    resources: Vec<Resource>,
    /// Maps a constraint key to its slot in `resources`.
    indices: BTreeMap<Constraint<P>, usize>,
    /// Per-flow bookkeeping, parallel to `flows`.
    states: Vec<State>,
    /// Final rate per flow, parallel to `flows`. `None` until the flow is frozen; a flow
    /// that is never frozen is reported as unlimited.
    rates: Vec<Option<BigRational>>,
    /// Number of flows whose rate is still being raised.
    active: usize,
    /// Rate level currently granted to every still-active flow.
    fill: BigRational,
}
impl<'a, P: Clone + Ord> Planner<'a, P> {
fn new<E, I>(flows: &'a [Flow<P>], egress_cap: &mut E, ingress_cap: &mut I) -> Self
where
E: FnMut(&P) -> Option<u128>,
I: FnMut(&P) -> Option<u128>,
{
let mut planner = Self {
flows,
resources: Vec::new(),
indices: BTreeMap::new(),
states: Vec::with_capacity(flows.len()),
rates: vec![None; flows.len()],
active: 0,
fill: BigRational::zero(),
};
planner.register(egress_cap, ingress_cap);
planner
}
fn constrain(&mut self, constraint: Constraint<P>, limit: Option<u128>) -> Option<usize> {
if let Some(index) = self.indices.get(&constraint) {
return Some(*index);
}
let limit = limit?;
let idx = self.resources.len();
self.resources.push(Resource::new(limit));
self.indices.insert(constraint, idx);
Some(idx)
}
fn attach(&mut self, resource_idx: usize, flow_idx: usize, state: &mut State) {
let resource = &mut self.resources[resource_idx];
resource.members.push(flow_idx);
resource.active += 1;
state.resources.push(resource_idx);
state.limited = true;
}
fn register<E, I>(&mut self, egress_cap: &mut E, ingress_cap: &mut I)
where
E: FnMut(&P) -> Option<u128>,
I: FnMut(&P) -> Option<u128>,
{
for (flow_idx, flow) in self.flows.iter().enumerate() {
let mut state = State::new();
if let Some(resource_idx) = self.constrain(
Constraint::Egress(flow.origin.clone()),
egress_cap(&flow.origin),
) {
self.attach(resource_idx, flow_idx, &mut state);
}
if flow.delivered {
if let Some(resource_idx) = self.constrain(
Constraint::Ingress(flow.recipient.clone()),
ingress_cap(&flow.recipient),
) {
self.attach(resource_idx, flow_idx, &mut state);
}
}
if state.limited {
state.active = true;
self.active += 1;
}
self.states.push(state);
}
}
fn freeze(&mut self, res_idx: usize) {
let members = self.resources[res_idx].members.clone();
for flow_idx in members {
let state = &mut self.states[flow_idx];
if !state.active {
continue;
}
self.rates[flow_idx] = Some(self.fill.clone());
state.active = false;
self.active -= 1;
for &other_res_idx in &state.resources {
let resource = &mut self.resources[other_res_idx];
if resource.active > 0 {
resource.active -= 1;
}
}
}
}
fn fill(&mut self) {
while self.active > 0 {
let mut limiting = Vec::new();
let mut min_delta: Option<BigRational> = None;
for (res_idx, resource) in self.resources.iter().enumerate() {
if resource.active == 0 {
continue;
}
if resource.remaining.is_zero() {
limiting.clear();
limiting.push(res_idx);
min_delta = Some(BigRational::zero());
break;
}
let share = &resource.remaining / BigRational::from_usize(resource.active);
match &min_delta {
None => {
min_delta = Some(share);
limiting.clear();
limiting.push(res_idx);
}
Some(current) => match share.cmp(current) {
Ordering::Less => {
min_delta = Some(share);
limiting.clear();
limiting.push(res_idx);
}
Ordering::Equal => limiting.push(res_idx),
Ordering::Greater => {
}
},
}
}
let delta = match min_delta {
Some(delta) => delta,
None => {
assert_eq!(self.active, 0, "active flows without constraints");
break;
}
};
if delta.is_zero() {
for &res_idx in &limiting {
self.freeze(res_idx);
}
continue;
}
self.fill += δ
let mut saturated = Vec::new();
for (res_idx, resource) in self.resources.iter_mut().enumerate() {
if resource.active == 0 {
continue;
}
let usage = &delta * BigRational::from_usize(resource.active);
if usage.is_zero() {
continue;
}
resource.remaining -= usage;
if resource.remaining.is_zero() {
saturated.push(res_idx);
}
}
saturated.extend(limiting);
if saturated.is_empty() {
continue;
}
saturated.sort();
saturated.dedup();
for res_idx in saturated {
self.freeze(res_idx);
}
}
}
fn solve(mut self) -> BTreeMap<u64, Rate> {
self.fill();
let mut result = BTreeMap::new();
for (idx, flow) in self.flows.iter().enumerate() {
let rate = self.rates[idx]
.as_ref()
.map_or(Rate::Unlimited, |ratio| Rate::Finite(ratio.clone()));
result.insert(flow.id, rate);
}
result
}
#[cfg(test)]
fn resources(&self) -> &[Resource] {
&self.resources
}
#[cfg(test)]
fn states(&self) -> &[State] {
&self.states
}
#[cfg(test)]
const fn active(&self) -> usize {
self.active
}
#[cfg(test)]
fn rates(&self) -> &[Option<BigRational>] {
&self.rates
}
}
/// Computes a rate for every flow in `flows`, keyed by `Flow::id`.
///
/// `egress_cap` and `ingress_cap` report the outbound and inbound capacity of a peer;
/// `None` means that side of the peer is not limited. An empty `flows` slice yields an
/// empty map without invoking either callback.
pub fn allocate<P, E, I>(
    flows: &[Flow<P>],
    mut egress_cap: E,
    mut ingress_cap: I,
) -> BTreeMap<u64, Rate>
where
    P: Clone + Ord,
    E: FnMut(&P) -> Option<u128>,
    I: FnMut(&P) -> Option<u128>,
{
    match flows {
        [] => BTreeMap::new(),
        _ => Planner::new(flows, &mut egress_cap, &mut ingress_cap).solve(),
    }
}
/// Computes how long it takes to move `remaining` units at `rate`.
///
/// * `Rate::Unlimited` yields `Some(Duration::ZERO)`.
/// * A finite rate of zero yields `None`.
/// * Otherwise `remaining / rate` seconds is scaled to nanoseconds, passed through
///   `ceil_to_u128` (whose `None` is propagated) and converted with
///   `Duration::from_nanos_saturating`.
pub fn duration(rate: &Rate, remaining: &BigRational) -> Option<Duration> {
    let per_second = match rate {
        Rate::Unlimited => return Some(Duration::ZERO),
        Rate::Finite(per_second) if per_second.is_zero() => return None,
        Rate::Finite(per_second) => per_second,
    };
    let nanos = (remaining / per_second) * BigRational::from_u128(NANOS_PER_SEC);
    let whole_nanos = nanos.ceil_to_u128()?;
    Some(Duration::from_nanos_saturating(whole_nanos))
}
/// Returns what is left of `remaining` after transferring at `rate` for `elapsed`.
///
/// * Nothing remaining: returned unchanged.
/// * `Rate::Unlimited`: everything is transferred, so zero is returned.
/// * A zero rate or zero elapsed time: `remaining` is returned unchanged.
/// * Otherwise `rate * elapsed` (elapsed expressed in seconds) is subtracted, and the
///   result is clamped at zero.
pub fn transfer(rate: &Rate, elapsed: Duration, mut remaining: BigRational) -> BigRational {
    if remaining.is_zero() {
        return remaining;
    }
    let Rate::Finite(ratio) = rate else {
        return BigRational::zero();
    };
    let elapsed_ns = elapsed.as_nanos();
    if ratio.is_zero() || elapsed_ns == 0 {
        return remaining;
    }
    let elapsed_secs = BigRational::from_frac_u128(elapsed_ns, NANOS_PER_SEC);
    let consumed = ratio * &elapsed_secs;
    if consumed < remaining {
        remaining -= consumed;
        remaining
    } else {
        BigRational::zero()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use num_rational::BigRational;
    use std::collections::BTreeMap;

    /// Capacity callback reporting the same finite `limit` for every peer.
    fn constant(limit: u128) -> impl FnMut(&u8) -> Option<u128> {
        move |_| Some(limit)
    }

    /// Capacity callback reporting every peer as unlimited.
    fn unlimited() -> impl FnMut(&u8) -> Option<u128> {
        move |_| None
    }

    /// Asserts that `r` equals the fraction `num / den`.
    fn assert_rational_eq(r: &BigRational, num: u64, den: u64) {
        assert_eq!(r, &BigRational::from_frac_u64(num, den));
    }

    /// Returns the finite rate assigned to flow `id`; panics if it is missing or unlimited.
    fn rate_of(map: &BTreeMap<u64, Rate>, id: u64) -> BigRational {
        match map.get(&id).expect("missing flow") {
            Rate::Finite(ratio) => ratio.clone(),
            Rate::Unlimited => panic!("unexpected unlimited rate"),
        }
    }

    /// Builds a flow that counts against the recipient's ingress capacity.
    fn delivered_flow<P>(id: u64, origin: P, recipient: P) -> Flow<P> {
        Flow {
            id,
            origin,
            recipient,
            delivered: true,
        }
    }

    /// Two flows leaving the same limited origin split its egress evenly.
    #[test]
    fn equal_share_on_single_egress() {
        let flows = vec![delivered_flow(1, 1u8, 10), delivered_flow(2, 1, 11)];
        let allocations = allocate(&flows, constant(1_000), unlimited());
        assert_eq!(allocations.len(), 2);
        for rate in allocations.values() {
            let Rate::Finite(ratio) = rate else {
                panic!("expected finite rate");
            };
            assert_rational_eq(ratio, 500, 1);
        }
    }

    /// A delivered flow with unlimited egress is capped by the recipient's ingress.
    #[test]
    fn ingress_cap_enforced() {
        let flows = vec![delivered_flow(1, 1u8, 5)];
        let allocations = allocate(&flows, unlimited(), constant(2_000));
        let rate = allocations.get(&1).expect("missing flow");
        let Rate::Finite(ratio) = rate else {
            panic!("expected finite rate");
        };
        assert_rational_eq(ratio, 2_000, 1);
    }

    /// A flow with no applicable limit is reported as unlimited.
    #[test]
    fn unlimited_flow_finishes_immediately() {
        let flows = vec![Flow {
            id: 7,
            origin: 1u8,
            recipient: 2,
            delivered: false,
        }];
        let allocations = allocate(&flows, unlimited(), unlimited());
        assert_eq!(allocations[&7], Rate::Unlimited);
    }

    /// Fractional progress is kept exactly across consecutive `transfer` calls.
    #[test]
    fn transfer_accumulates_carry() {
        let ratio = BigRational::from_frac_u64(1, 2);
        let rate = Rate::Finite(ratio);
        let initial = BigRational::from_u128(10);
        let after_short = transfer(&rate, Duration::from_millis(500), initial);
        assert_eq!(after_short, BigRational::from_frac_u64(39, 4));
        let after_long = transfer(&rate, Duration::from_millis(1500), after_short);
        assert_eq!(after_long, BigRational::from_u128(9));
    }

    /// The time to finish shrinks by exactly the time already spent transferring.
    #[test]
    fn finish_duration_accounts_for_fractional_progress() {
        let rate = Rate::Finite(BigRational::from_frac_u64(1, 2));
        let initial = BigRational::from_u128(1);
        let partial = transfer(&rate, Duration::from_millis(500), initial.clone());
        assert_eq!(partial, BigRational::from_frac_u64(3, 4));
        let duration_full = duration(&rate, &initial).expect("finite duration");
        assert_eq!(duration_full, Duration::from_secs(2));
        let finish = duration(&rate, &partial).expect("finish duration");
        assert_eq!(finish, Duration::from_millis(1500));
        assert!(finish < duration_full);
    }

    /// 1000 units at 500 units per second take two seconds.
    #[test]
    fn bandwidth_duration() {
        let ratio = BigRational::from_u128(500);
        let rate = Rate::Finite(ratio);
        let time = duration(&rate, &BigRational::from_u128(1_000)).expect("finite time");
        assert_eq!(time.as_secs(), 2);
    }

    /// B's ingress is the tightest resource: its three flows are frozen first, and the
    /// remaining flows then share C's ingress.
    #[test]
    fn three_peer_fair_sharing() {
        let flows = vec![
            delivered_flow(1, 'A', 'B'),
            delivered_flow(2, 'A', 'B'),
            delivered_flow(3, 'B', 'C'),
            delivered_flow(4, 'A', 'C'),
            delivered_flow(5, 'C', 'B'),
        ];
        let allocations = allocate(
            &flows,
            |origin: &char| match origin {
                'A' => Some(1_000_000),
                'B' => Some(750_000),
                'C' => Some(100_000),
                _ => None,
            },
            |recipient: &char| match recipient {
                'A' => Some(500_000),
                'B' => Some(250_000),
                'C' => Some(1_000_000),
                _ => None,
            },
        );
        let rate_ab1 = rate_of(&allocations, 1);
        assert_rational_eq(&rate_ab1, 250_000, 3);
        let rate_ab2 = rate_of(&allocations, 2);
        assert_rational_eq(&rate_ab2, 250_000, 3);
        let rate_ac = rate_of(&allocations, 4);
        assert_rational_eq(&rate_ac, 500_000, 1);
        let rate_bc = rate_of(&allocations, 3);
        assert_rational_eq(&rate_bc, 500_000, 1);
        let rate_cb = rate_of(&allocations, 5);
        assert_rational_eq(&rate_cb, 250_000, 3);
    }

    /// A flow throttled by a slow recipient does not hold back its sibling from the
    /// same origin.
    #[test]
    fn upstream_bottleneck_propagates() {
        let flows = vec![delivered_flow(1, 'A', 'B'), delivered_flow(2, 'A', 'C')];
        let allocations = allocate(
            &flows,
            |origin: &char| match origin {
                'A' => Some(1_000_000),
                'B' => Some(1_000_000),
                'C' => Some(1_000_000),
                _ => None,
            },
            |recipient: &char| match recipient {
                'A' => Some(500_000),
                'B' => Some(1_000),
                'C' => Some(500_000),
                _ => None,
            },
        );
        let rate_ab = rate_of(&allocations, 1);
        assert_rational_eq(&rate_ab, 1_000, 1);
        let rate_ac = rate_of(&allocations, 2);
        assert_rational_eq(&rate_ac, 500_000, 1);
    }

    /// Capacity a throttled flow cannot use is handed to the other flow on the same egress.
    #[test]
    fn limited_capacity_still_guarantees_fair_share() {
        let flows = vec![delivered_flow(1, 'A', 'B'), delivered_flow(2, 'A', 'C')];
        let allocations = allocate(
            &flows,
            |origin: &char| match origin {
                'A' => Some(50_000),
                'B' => Some(1_000_000),
                'C' => Some(1_000_000),
                _ => None,
            },
            |recipient: &char| match recipient {
                'A' => Some(500_000),
                'B' => Some(1_000),
                'C' => Some(500_000),
                _ => None,
            },
        );
        let rate_ab = rate_of(&allocations, 1);
        assert_rational_eq(&rate_ab, 1_000, 1);
        let rate_ac = rate_of(&allocations, 2);
        assert_rational_eq(&rate_ac, 49_000, 1);
    }

    /// Unlimited peers create no resources and leave the flow inactive.
    #[test]
    fn planner_skips_unlimited_resources() {
        let flows = vec![delivered_flow(99, 1u8, 2u8)];
        let mut egress = unlimited();
        let mut ingress = unlimited();
        let planner = Planner::new(&flows, &mut egress, &mut ingress);
        assert_eq!(planner.resources().len(), 0);
        assert!(planner.states().iter().all(|state| !state.limited));
        assert_eq!(planner.active(), 0);
    }

    /// Flows from the same origin are attached to a single shared egress resource.
    #[test]
    fn planner_tracks_shared_resource_membership() {
        let flows = vec![delivered_flow(1, 1u8, 10u8), delivered_flow(2, 1u8, 11u8)];
        let mut egress = constant(1_000);
        let mut ingress = unlimited();
        let planner = Planner::new(&flows, &mut egress, &mut ingress);
        let resources = planner.resources();
        assert_eq!(resources.len(), 1);
        let resource = &resources[0];
        assert_eq!(resource.members, vec![0, 1]);
        assert_eq!(resource.active, 2);
        assert!(planner.states().iter().all(|state| state.active));
    }

    /// Freezing a resource deactivates all of its flows and pins them at the current
    /// fill level, which is still zero here.
    #[test]
    fn planner_freeze_clears_active_counts() {
        let flows = vec![delivered_flow(1, 1u8, 2u8), delivered_flow(2, 1u8, 3u8)];
        let mut egress = constant(1_000);
        let mut ingress = unlimited();
        let mut planner = Planner::new(&flows, &mut egress, &mut ingress);
        assert_eq!(planner.active(), 2);
        planner.freeze(0);
        let resources = planner.resources();
        assert_eq!(resources[0].active, 0);
        assert_eq!(planner.active(), 0);
        // Every flow must have been assigned a rate; a missing rate is a failure too.
        assert!(planner
            .rates()
            .iter()
            .all(|rate| matches!(rate, Some(ratio) if ratio.is_zero())));
    }
}