use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Identifier returned by [`BandwidthSharingModel::start_transfer`] and later
/// reported by [`BandwidthSharingModel::advance_to`] when the transfer finishes.
pub type TransferId = u64;

/// Simulation model of a single link whose bandwidth is split equally among all
/// transfers that are active at the same time (processor sharing).
///
/// Time is expressed in milliseconds as `f64`. The model keeps an internal clock
/// (`last_update_ms`) that only moves forward.
///
/// Cloning the model clones the list of active transfers but shares the same
/// id counter, because the counter is held behind an `Arc`.
#[derive(Debug, Clone)]
pub struct BandwidthSharingModel {
    /// Link capacity in bytes per millisecond; `f64::INFINITY` makes every
    /// transfer complete instantly.
    bandwidth_bytes_per_ms: f64,
    /// Transfers that were started and have not yet been reported as completed.
    active: Vec<ActiveTransfer>,
    /// Simulation time (ms) at which the `remaining_bytes` values are current.
    last_update_ms: f64,
    /// Source of transfer ids; may be shared between several models.
    id_counter: Arc<AtomicU64>,
}

/// Book-keeping for one in-flight transfer.
#[derive(Debug, Clone)]
struct ActiveTransfer {
    /// Id handed back to the caller when the transfer was started.
    id: TransferId,
    /// Bytes still to be sent as of the owning model's `last_update_ms`.
    remaining_bytes: f64,
}

impl BandwidthSharingModel {
    /// Slack (in ms) allowed when deciding whether a completion falls inside the
    /// window passed to `advance_to`; absorbs floating-point rounding.
    const COMPLETION_EPSILON_MS: f64 = 1e-9;

    /// Creates an idle model.
    ///
    /// `gbps` is converted with `gbps * 1e6` bytes per millisecond, so `1.0`
    /// moves 1,000,000 bytes in one millisecond. Any value that is not strictly
    /// positive (zero, negative, NaN) selects infinite bandwidth.
    ///
    /// `id_counter` supplies transfer ids; pass the same counter to several
    /// models to keep ids unique across them.
    pub fn new(gbps: f64, id_counter: Arc<AtomicU64>) -> Self {
        let bandwidth_bytes_per_ms = if gbps > 0.0 {
            gbps * 1e6
        } else {
            f64::INFINITY
        };
        Self {
            bandwidth_bytes_per_ms,
            active: Vec::new(),
            last_update_ms: 0.0,
            id_counter,
        }
    }

    /// Registers a new transfer of `bytes` bytes starting at `now_ms` and returns
    /// its id.
    ///
    /// `now_ms` is clamped so the model's clock never moves backwards (a NaN
    /// `now_ms` is treated as "now"). If the clock moves forward, transfers that
    /// are already in flight are first credited with the progress they made in
    /// the meantime. A transfer that finished during that gap stays in the model
    /// with zero bytes remaining and is reported by the next `advance_to` call.
    ///
    /// With infinite bandwidth the transfer is recorded with zero bytes remaining.
    pub fn start_transfer(&mut self, now_ms: f64, bytes: usize) -> TransferId {
        let now_ms = now_ms.max(self.last_update_ms);
        self.credit_unobserved_progress(now_ms);
        // Only uniqueness of the id matters, so relaxed ordering is sufficient.
        let id = self.id_counter.fetch_add(1, Ordering::Relaxed);
        let remaining_bytes = if self.bandwidth_bytes_per_ms.is_finite() {
            // Lossy above 2^53 bytes; the model works in f64 throughout.
            bytes as f64
        } else {
            0.0
        };
        self.active.push(ActiveTransfer {
            id,
            remaining_bytes,
        });
        id
    }

    /// Advances the clock to `now_ms` and returns the ids of all transfers that
    /// completed on the way, in completion order.
    ///
    /// A `now_ms` that is NaN or earlier than the model's clock is ignored: the
    /// call returns an empty vector and leaves the model untouched.
    pub fn advance_to(&mut self, now_ms: f64) -> Vec<TransferId> {
        let mut completed = Vec::new();
        // NaN compares false against everything; without this guard it would
        // complete every transfer and poison the clock.
        if now_ms.is_nan() || now_ms < self.last_update_ms {
            return completed;
        }
        loop {
            let (min_idx, min_remaining) = match self.smallest_remaining() {
                Some(found) => found,
                None => {
                    self.last_update_ms = now_ms;
                    break;
                }
            };
            let rate_per = self.fair_share_rate();
            let time_to_next_completion = Self::time_to_drain(min_remaining, rate_per);
            let next_event_ms = self.last_update_ms + time_to_next_completion;

            if next_event_ms > now_ms + Self::COMPLETION_EPSILON_MS {
                // Nothing else finishes inside the window: apply partial progress.
                let elapsed = now_ms - self.last_update_ms;
                if rate_per.is_finite() && elapsed > 0.0 {
                    self.drain_all(elapsed * rate_per);
                }
                self.last_update_ms = now_ms;
                break;
            }

            // Jump to the completion event, then re-evaluate with one fewer sharer.
            if rate_per.is_finite() && time_to_next_completion > 0.0 {
                self.drain_all(time_to_next_completion * rate_per);
            }
            self.last_update_ms = next_event_ms;
            completed.push(self.active.swap_remove(min_idx).id);
        }
        completed
    }

    /// Returns the time (ms) at which the next transfer would complete if no
    /// further transfers are started, or `None` when the model is idle.
    pub fn earliest_finish(&self) -> Option<f64> {
        let (_, min_remaining) = self.smallest_remaining()?;
        let delta = Self::time_to_drain(min_remaining, self.fair_share_rate());
        Some(self.last_update_ms + delta)
    }

    /// Number of transfers currently tracked by the model.
    pub fn active_count(&self) -> usize {
        self.active.len()
    }

    /// `true` when no transfer is currently tracked by the model.
    pub fn is_idle(&self) -> bool {
        self.active.is_empty()
    }

    /// Per-transfer rate (bytes/ms) when the link is split among all tracked
    /// transfers. Only meaningful while `active` is non-empty.
    fn fair_share_rate(&self) -> f64 {
        self.bandwidth_bytes_per_ms / self.active.len() as f64
    }

    /// Index and remaining bytes of the transfer closest to completion (the
    /// first one on ties), or `None` when idle.
    fn smallest_remaining(&self) -> Option<(usize, f64)> {
        self.active
            .iter()
            .enumerate()
            .map(|(i, t)| (i, t.remaining_bytes))
            .min_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
    }

    /// Time (ms) needed to send `bytes` at `rate_per` bytes/ms; zero when the
    /// rate is infinite or not positive.
    fn time_to_drain(bytes: f64, rate_per: f64) -> f64 {
        if rate_per.is_finite() && rate_per > 0.0 {
            bytes / rate_per
        } else {
            0.0
        }
    }

    /// Subtracts `bytes_each` from every tracked transfer, clamping at zero.
    fn drain_all(&mut self, bytes_each: f64) {
        for t in &mut self.active {
            t.remaining_bytes = (t.remaining_bytes - bytes_each).max(0.0);
        }
    }

    /// Moves the clock to `now_ms`, crediting in-flight transfers with their
    /// equal-share progress but never removing any of them.
    ///
    /// Transfers that reach zero stop consuming bandwidth and stay in `active`
    /// so that a later `advance_to` can report them to the caller.
    fn credit_unobserved_progress(&mut self, now_ms: f64) {
        loop {
            let window = now_ms - self.last_update_ms;
            if window.is_nan() || window <= 0.0 {
                break;
            }
            let mut in_flight = 0usize;
            let mut smallest = f64::INFINITY;
            for t in &self.active {
                if t.remaining_bytes > 0.0 {
                    in_flight += 1;
                    smallest = smallest.min(t.remaining_bytes);
                }
            }
            if in_flight == 0 {
                break;
            }
            let rate_per = self.bandwidth_bytes_per_ms / in_flight as f64;
            let until_first_done = smallest / rate_per;
            if until_first_done > window {
                let credit = window * rate_per;
                for t in self.active.iter_mut().filter(|t| t.remaining_bytes > 0.0) {
                    t.remaining_bytes = (t.remaining_bytes - credit).max(0.0);
                }
                break;
            }
            // Subtracting `smallest` itself guarantees at least one transfer
            // reaches exactly zero, so the loop always terminates.
            for t in self.active.iter_mut().filter(|t| t.remaining_bytes > 0.0) {
                t.remaining_bytes = (t.remaining_bytes - smallest).max(0.0);
            }
            self.last_update_ms += until_first_done;
        }
        self.last_update_ms = now_ms;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing simulated timestamps.
    const EPS: f64 = 1e-6;

    /// Builds a model backed by its own id counter starting at zero.
    fn make_model(gbps: f64) -> BandwidthSharingModel {
        let counter = Arc::new(AtomicU64::new(0));
        BandwidthSharingModel::new(gbps, counter)
    }

    /// Asserts that `a` and `b` differ by less than `EPS`.
    fn approx_eq(a: f64, b: f64) {
        let diff = (a - b).abs();
        assert!(diff < EPS, "expected {a} ≈ {b} (diff {})", diff);
    }

    /// A lone transfer gets the whole link and finishes after size / bandwidth.
    #[test]
    fn single_transfer_finishes_at_duration() {
        let mut link = make_model(1.0);
        let transfer = link.start_transfer(0.0, 1_000_000);
        approx_eq(link.earliest_finish().unwrap(), 1.0);

        // Half-way through nothing has finished and the estimate is unchanged.
        let halfway = link.advance_to(0.5);
        assert!(halfway.is_empty());
        approx_eq(link.earliest_finish().unwrap(), 1.0);

        let finished = link.advance_to(1.0);
        assert_eq!(finished, vec![transfer]);
        assert!(link.is_idle());
    }

    /// Two equal transfers started together each get half the link.
    #[test]
    fn two_concurrent_share_bandwidth_under_ps() {
        let mut link = make_model(1.0);
        let first = link.start_transfer(0.0, 1_000_000);
        let second = link.start_transfer(0.0, 1_000_000);
        approx_eq(link.earliest_finish().unwrap(), 2.0);

        let just_before = link.advance_to(1.99);
        assert!(just_before.is_empty());

        let finished = link.advance_to(2.0);
        assert_eq!(finished.len(), 2);
        assert!(finished.contains(&first));
        assert!(finished.contains(&second));
        assert!(link.is_idle());
    }

    /// A transfer arriving mid-way slows the first one down, then speeds up
    /// once the first one leaves.
    #[test]
    fn staggered_arrivals_follow_ps_semantics() {
        let mut link = make_model(1.0);
        let early = link.start_transfer(0.0, 1_000_000);
        assert!(link.advance_to(0.5).is_empty());
        let late = link.start_transfer(0.5, 1_000_000);

        let at_one_and_half = link.advance_to(1.5);
        assert_eq!(at_one_and_half, vec![early]);
        assert_eq!(link.active_count(), 1);

        let at_two = link.advance_to(2.0);
        assert_eq!(at_two, vec![late]);
        assert!(link.is_idle());
    }

    /// After an idle period a new transfer gets the full link again.
    #[test]
    fn staggered_later_arrival_inherits_remaining_bandwidth() {
        let mut link = make_model(1.0);
        let _early = link.start_transfer(0.0, 1_000_000);
        let first_batch = link.advance_to(1.0);
        assert_eq!(first_batch.len(), 1);
        assert!(link.is_idle());

        let while_idle = link.advance_to(5.0);
        assert!(while_idle.is_empty());

        let late = link.start_transfer(5.0, 1_000_000);
        approx_eq(link.earliest_finish().unwrap(), 6.0);
        let second_batch = link.advance_to(6.0);
        assert_eq!(second_batch, vec![late]);
    }

    /// A configured bandwidth of zero means transfers complete instantly.
    #[test]
    fn zero_bandwidth_is_infinite_throughput() {
        let mut link = make_model(0.0);
        let transfer = link.start_transfer(10.0, 1_000_000_000);
        approx_eq(link.earliest_finish().unwrap(), 10.0);
        let finished = link.advance_to(10.0);
        assert_eq!(finished, vec![transfer]);
    }

    /// An idle model has no upcoming completion.
    #[test]
    fn earliest_finish_is_none_when_idle() {
        let link = make_model(1.0);
        assert!(link.is_idle());
        assert!(link.earliest_finish().is_none());
    }

    /// Models that share one counter never hand out the same id twice.
    #[test]
    fn shared_counter_yields_globally_unique_ids_across_models() {
        let shared = Arc::new(AtomicU64::new(0));
        let mut left = BandwidthSharingModel::new(1.0, shared.clone());
        let mut right = BandwidthSharingModel::new(1.0, shared);

        let mut seen = Vec::new();
        for _ in 0..5 {
            seen.push(left.start_transfer(0.0, 1));
            seen.push(right.start_transfer(0.0, 1));
        }
        seen.sort_unstable();
        seen.dedup();
        assert_eq!(seen.len(), 10, "shared counter must yield 10 distinct ids");
    }

    /// Ten equal transfers started together all finish at ten times the solo
    /// duration.
    #[test]
    fn many_simultaneous_arrivals_all_complete_at_nx() {
        let mut link = make_model(1.0);
        for _ in 0..10 {
            let _transfer = link.start_transfer(0.0, 1_000_000);
        }
        approx_eq(link.earliest_finish().unwrap(), 10.0);

        let finished = link.advance_to(10.0);
        assert_eq!(finished.len(), 10);
        assert!(link.is_idle());
    }
}