use std::time::Duration;
use rstest::rstest;
use crate::simulation::VirtualTime;
use super::config::BbrConfig;
use super::controller::BbrController;
use super::state::{BbrState, ProbeBwPhase};
/// Parameters describing a simulated network path for the BBR test harness.
///
/// `rtt`, `loss_rate` and `jitter` are consumed by
/// `BbrTestHarness::transfer_bytes`; `bandwidth` is carried along but is not
/// read by the harness code visible in this file.
#[derive(Debug, Clone, Copy)]
pub struct NetworkCondition {
/// Base round-trip time applied to every simulated packet before jitter.
pub rtt: Duration,
/// Nominal link bandwidth (presumably bytes per second — TODO confirm; the
/// harness in this file never reads it).
pub bandwidth: u64,
/// Probability in `[0, 1]` that a packet is reported lost rather than acknowledged.
pub loss_rate: f64,
/// Maximum relative RTT deviation: each packet's RTT is scaled by a factor
/// drawn from `[1 - jitter, 1 + jitter)`.
pub jitter: f64,
}
/// Preset network profiles, ordered from lowest to highest round-trip time.
impl NetworkCondition {
/// 1 ms RTT, no loss, 5% jitter.
pub const LAN: Self = Self {
rtt: Duration::from_millis(1),
bandwidth: 100_000_000,
loss_rate: 0.0,
jitter: 0.05,
};
/// 10 ms RTT, 0.1% loss, 10% jitter.
pub const DATACENTER: Self = Self {
rtt: Duration::from_millis(10),
bandwidth: 10_000_000,
loss_rate: 0.001,
jitter: 0.1,
};
/// 50 ms RTT, 0.5% loss, 15% jitter.
pub const CONTINENTAL: Self = Self {
rtt: Duration::from_millis(50),
bandwidth: 5_000_000,
loss_rate: 0.005,
jitter: 0.15,
};
/// 135 ms RTT, 1% loss, 20% jitter.
pub const INTERCONTINENTAL: Self = Self {
rtt: Duration::from_millis(135),
bandwidth: 2_000_000,
loss_rate: 0.01,
jitter: 0.2,
};
/// 250 ms RTT, 2% loss, 25% jitter.
pub const HIGH_LATENCY: Self = Self {
rtt: Duration::from_millis(250),
bandwidth: 1_000_000,
loss_rate: 0.02,
jitter: 0.25,
};
}
/// Deterministic simulation harness that drives a `BbrController` on a
/// virtual clock under a fixed `NetworkCondition`.
pub struct BbrTestHarness {
/// Virtual clock; the controller is constructed with a clone of it.
pub time: VirtualTime,
/// Controller under test.
pub controller: BbrController<VirtualTime>,
/// Network profile supplying RTT, loss rate and jitter for each packet.
pub condition: NetworkCondition,
// Seed the harness was created with; stored but not read in the code visible here.
seed: u64,
// Current state of the linear congruential generator behind `random()`.
rng_state: u64,
}
impl BbrTestHarness {
    /// Creates a harness whose controller shares the harness's virtual clock.
    ///
    /// `seed` initialises the pseudo-random generator used for loss and jitter
    /// decisions, so the harness's own random choices are reproducible.
    pub fn new(config: BbrConfig, condition: NetworkCondition, seed: u64) -> Self {
        let time = VirtualTime::new();
        Self {
            controller: BbrController::new_with_time_source(config, time.clone()),
            time,
            condition,
            seed,
            rng_state: seed,
        }
    }

    /// Steps the linear congruential generator and returns a value in `[0, 1)`.
    fn random(&mut self) -> f64 {
        let next = self
            .rng_state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1);
        self.rng_state = next;
        // The top 31 bits of the state, scaled by 2^31.
        (next >> 33) as f64 / (1u64 << 31) as f64
    }

    /// Resolves one in-flight packet: advances the clock by the packet's RTT,
    /// then reports it to the controller as lost (with probability
    /// `condition.loss_rate`) or acknowledged.
    ///
    /// Returns the number of bytes acknowledged (`0` when the packet was lost).
    fn settle_packet(
        &mut self,
        token: super::delivery_rate::DeliveryRateToken,
        size: usize,
        rtt: Duration,
    ) -> usize {
        self.time.advance(rtt);
        if self.random() < self.condition.loss_rate {
            self.controller.on_loss(size);
            0
        } else {
            self.controller.on_ack_with_token(rtt, size, Some(token));
            size
        }
    }

    /// Simulates sending `bytes` through the controller in packets of at most
    /// 1400 bytes and returns how many bytes were acknowledged.
    ///
    /// While the congestion window has room, packets are sent and queued with a
    /// jittered RTT. When the window is full, one queued packet is resolved to
    /// make room; if nothing is queued, sending stops early. Packets still
    /// queued at the end are resolved in the order they were sent.
    pub fn transfer_bytes(&mut self, bytes: usize) -> usize {
        const PACKET_SIZE: usize = 1400;

        let mut acked_bytes = 0;
        let mut in_flight: Vec<(super::delivery_rate::DeliveryRateToken, usize, Duration)> =
            Vec::new();
        let mut remaining = bytes;

        while remaining > 0 {
            let chunk = remaining.min(PACKET_SIZE);
            let window_full =
                self.controller.flightsize() + chunk > self.controller.current_cwnd();

            if window_full {
                // NOTE(review): `pop` resolves the most recently sent packet
                // first (LIFO) — confirm this ordering is intended.
                match in_flight.pop() {
                    Some((token, size, rtt)) => {
                        acked_bytes += self.settle_packet(token, size, rtt);
                    }
                    // Window is full and nothing can be resolved: give up.
                    None => break,
                }
                continue;
            }

            let token = self.controller.on_send(chunk);
            remaining -= chunk;

            // Scale the base RTT by a factor in [1 - jitter, 1 + jitter).
            let jitter_factor = 1.0 + (self.random() - 0.5) * 2.0 * self.condition.jitter;
            let base_nanos = self.condition.rtt.as_nanos() as f64;
            let rtt = Duration::from_nanos((base_nanos * jitter_factor) as u64);
            in_flight.push((token, chunk, rtt));
        }

        for (token, size, rtt) in in_flight {
            acked_bytes += self.settle_packet(token, size, rtt);
        }
        acked_bytes
    }

    /// Runs `count` rounds of `transfer_bytes(bytes_per_rtt)` and returns the
    /// controller snapshot taken after each round.
    pub fn run_rtts(&mut self, count: usize, bytes_per_rtt: usize) -> Vec<BbrSnapshot> {
        (0..count)
            .map(|_| {
                self.transfer_bytes(bytes_per_rtt);
                self.snapshot()
            })
            .collect()
    }

    /// Captures the controller's current observable state.
    pub fn snapshot(&self) -> BbrSnapshot {
        let controller = &self.controller;
        BbrSnapshot {
            state: controller.state(),
            probe_bw_phase: controller.stats().probe_bw_phase,
            cwnd: controller.current_cwnd(),
            flightsize: controller.flightsize(),
            max_bw: controller.max_bw(),
            min_rtt: controller.min_rtt(),
            bdp: controller.bdp(),
            pacing_rate: controller.pacing_rate(),
        }
    }

    /// Signals a retransmission timeout to the controller.
    pub fn inject_timeout(&mut self) {
        self.controller.on_timeout();
    }
}
/// Point-in-time copy of the controller values read by `BbrTestHarness::snapshot`.
#[derive(Debug, Clone)]
pub struct BbrSnapshot {
/// Value of `controller.state()`.
pub state: BbrState,
/// Value of `controller.stats().probe_bw_phase`.
pub probe_bw_phase: ProbeBwPhase,
/// Value of `controller.current_cwnd()`.
pub cwnd: usize,
/// Value of `controller.flightsize()`.
pub flightsize: usize,
/// Value of `controller.max_bw()`.
pub max_bw: u64,
/// Value of `controller.min_rtt()`; `None` presumably means no RTT sample yet — TODO confirm.
pub min_rtt: Option<Duration>,
/// Value of `controller.bdp()`.
pub bdp: usize,
/// Value of `controller.pacing_rate()`.
pub pacing_rate: u64,
}
/// A freshly constructed controller reports the `Startup` state.
#[test]
fn test_startup_initial_state() {
    let config = BbrConfig::default();
    let harness = BbrTestHarness::new(config, NetworkCondition::CONTINENTAL, 12345);

    let state = harness.controller.state();

    assert_eq!(state, BbrState::Startup);
}
/// After 30 rounds of 1 MB on the LAN profile the controller must have left
/// `Startup` and be in `Drain` or `ProbeBW`.
#[test]
fn test_startup_to_drain_transition() {
    let mut harness = BbrTestHarness::new(BbrConfig::default(), NetworkCondition::LAN, 12345);

    let history = harness.run_rtts(30, 1_000_000);
    let last = history.last().unwrap();

    let left_startup = matches!(last.state, BbrState::Drain | BbrState::ProbeBW);
    assert!(
        left_startup,
        "Expected Drain or ProbeBW, got {:?}. max_bw: {} bytes/sec",
        last.state,
        last.max_bw
    );
}
/// Drives up to 50 transfers on the DATACENTER profile, stopping once
/// `ProbeBW` is reached, and checks the controller ends in one of
/// `Startup`, `Drain` or `ProbeBW`.
#[test]
fn test_drain_to_probe_bw_transition() {
    let mut harness =
        BbrTestHarness::new(BbrConfig::default(), NetworkCondition::DATACENTER, 12345);

    // `any` stops at the first round after which ProbeBW has been reached.
    let _reached_probe_bw = (0..50).any(|_| {
        harness.transfer_bytes(50_000);
        harness.controller.state() == BbrState::ProbeBW
    });

    let state = harness.controller.state();
    let acceptable = matches!(
        state,
        BbrState::Startup | BbrState::Drain | BbrState::ProbeBW
    );
    assert!(acceptable, "Unexpected state: {:?}", state);
}
/// On the LAN profile, any measured minimum RTT must stay below 10 ms.
#[test]
fn test_lan_conditions() {
    let mut harness = BbrTestHarness::new(BbrConfig::default(), NetworkCondition::LAN, 12345);

    let min_rtt = harness.run_rtts(10, 100_000).last().unwrap().min_rtt;

    // Nothing is checked when the controller has no RTT estimate.
    if let Some(rtt) = min_rtt {
        assert!(rtt < Duration::from_millis(10));
    }
}
/// On the INTERCONTINENTAL profile, any measured minimum RTT must exceed 50 ms.
#[test]
fn test_intercontinental_conditions() {
    let condition = NetworkCondition::INTERCONTINENTAL;
    let mut harness = BbrTestHarness::new(BbrConfig::default(), condition, 12345);

    let min_rtt = harness.run_rtts(20, 50_000).last().unwrap().min_rtt;

    // Nothing is checked when the controller has no RTT estimate.
    if let Some(rtt) = min_rtt {
        assert!(rtt > Duration::from_millis(50));
    }
}
/// Over 50 rounds on the HIGH_LATENCY profile the congestion window must never
/// be observed below the configured `min_cwnd`.
#[test]
fn test_high_latency_no_death_spiral() {
    let floor = BbrConfig::default().min_cwnd;
    let mut harness =
        BbrTestHarness::new(BbrConfig::default(), NetworkCondition::HIGH_LATENCY, 12345);

    let lowest_cwnd = harness
        .run_rtts(50, 20_000)
        .into_iter()
        .map(|snapshot| snapshot.cwnd)
        .min()
        .unwrap();

    assert!(
        lowest_cwnd >= floor,
        "cwnd collapsed to {} (min_cwnd={})",
        lowest_cwnd,
        floor
    );
}
/// A timeout returns the controller to `Startup` with `initial_cwnd`, and 20
/// further rounds leave the window at or above `min_cwnd`.
#[test]
fn test_timeout_recovery() {
    let defaults = BbrConfig::default();
    let mut harness =
        BbrTestHarness::new(BbrConfig::default(), NetworkCondition::CONTINENTAL, 12345);

    // Warm up, then hit the controller with a timeout.
    harness.run_rtts(10, 50_000);
    harness.inject_timeout();

    assert_eq!(harness.controller.state(), BbrState::Startup);
    assert_eq!(harness.controller.current_cwnd(), defaults.initial_cwnd);

    // Recovery phase.
    let history = harness.run_rtts(20, 50_000);
    let last = history.last().unwrap();
    let (final_state, final_cwnd) = (last.state, last.cwnd);

    assert!(
        final_cwnd >= defaults.min_cwnd,
        "cwnd collapsed below min_cwnd after recovery: {}",
        final_cwnd
    );
    // NOTE(review): this condition is implied by the assertion above, so it
    // cannot fail on its own — consider tightening it.
    assert!(
        final_state != BbrState::Startup || final_cwnd >= defaults.min_cwnd,
        "Should have recovered to a stable state"
    );
}
/// Reporting a loss after ten sends must be reflected in the `lost` statistic.
#[test]
fn test_loss_response_reduces_inflight_hi() {
    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());

    (0..10).for_each(|_| {
        controller.on_send(1000);
    });
    controller.on_loss(1000);

    let stats = controller.stats();
    assert!(stats.lost > 0);
}
/// After 20 send/ack exchanges at 50 ms, the reported BDP must be within a
/// factor of two of `max_bw * min_rtt`.
#[test]
fn test_bdp_scales_with_bandwidth() {
    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let rtt_sample = Duration::from_millis(50);

    for _ in 0..20 {
        let token = controller.on_send(10000);
        clock.advance(rtt_sample);
        controller.on_ack_with_token(rtt_sample, 10000, Some(token));
    }

    let bdp = controller.bdp();
    let min_rtt = controller.min_rtt();
    let max_bw = controller.max_bw();

    // Without both estimates there is nothing to compare against.
    let rtt = match min_rtt {
        Some(rtt) if max_bw > 0 => rtt,
        _ => return,
    };

    let expected_bdp = (max_bw as u128 * rtt.as_nanos() / 1_000_000_000) as usize;
    assert!(
        bdp >= expected_bdp / 2 && bdp <= expected_bdp * 2,
        "BDP {} not close to expected {}",
        bdp,
        expected_bdp
    );
}
/// Ten rounds of "five acknowledged packets, then a timeout": after every
/// timeout the congestion window must be at least `initial_cwnd`.
#[test]
fn test_timeout_storm_prevents_recovery() {
    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let initial_cwnd = BbrConfig::default().initial_cwnd;
    println!("Initial cwnd: {}", initial_cwnd);

    let mut peak_cwnd = initial_cwnd;
    println!("\nSimulating timeout storm with recovery attempts (minimal traffic):");

    for round in 1..=10 {
        // Minimal traffic between timeouts: the clock advances 50 ms per packet
        // while each ack reports a 100 ms RTT.
        for _ in 0..5 {
            let token = controller.on_send(1400);
            clock.advance(Duration::from_millis(50));
            controller.on_ack_with_token(Duration::from_millis(100), 1400, Some(token));
        }

        let before = controller.current_cwnd();
        peak_cwnd = peak_cwnd.max(before);

        controller.on_timeout();
        let after = controller.current_cwnd();

        println!(
            " Round {}: cwnd {} -> {} (max_bdp_seen={})",
            round,
            before,
            after,
            controller.max_bdp_seen()
        );
        assert!(
            after >= initial_cwnd,
            "BBR cwnd ({}) should not drop below initial_cwnd ({}) after timeout",
            after,
            initial_cwnd
        );
    }

    let final_cwnd = controller.current_cwnd();
    println!(
        "\nFinal cwnd: {} (initial was {}, max_bdp_seen={})",
        final_cwnd,
        initial_cwnd,
        controller.max_bdp_seen()
    );
    println!("Max cwnd achieved during recovery attempts: {}", peak_cwnd);
}
/// After a warm-up, a timeout must leave cwnd at or above `max_bdp_seen / 4`
/// when that exceeds `initial_cwnd`, and exactly at `initial_cwnd` otherwise.
#[test]
fn test_bbr_adaptive_timeout_floor_with_high_bdp() {
    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let initial_cwnd = BbrConfig::default().initial_cwnd;
    println!("Initial cwnd: {}", initial_cwnd);

    println!("\nBuilding up high BDP measurements...");
    let round_rtt = Duration::from_millis(20);
    for _ in 0..100 {
        // First burst: sent but never acknowledged.
        (0..50).for_each(|_| {
            controller.on_send(1400);
        });
        clock.advance(round_rtt);
        // Second burst: each packet is acknowledged right after it is sent.
        (0..50).for_each(|_| {
            let token = controller.on_send(1400);
            controller.on_ack_with_token(round_rtt, 1400, Some(token));
        });
    }

    let max_bdp = controller.max_bdp_seen();
    let cwnd_before = controller.current_cwnd();
    println!(
        "After warmup: cwnd={}, max_bdp_seen={}",
        cwnd_before, max_bdp
    );

    controller.on_timeout();
    let cwnd_after = controller.current_cwnd();
    let adaptive_floor = max_bdp / 4;
    let threshold = 4 * initial_cwnd;
    println!(
        "After timeout: cwnd={} (initial={}, adaptive_floor=max_bdp/4={})",
        cwnd_after, initial_cwnd, adaptive_floor
    );

    if max_bdp <= threshold {
        assert_eq!(
            cwnd_after, initial_cwnd,
            "BBR should use initial_cwnd when max_bdp < 4*initial"
        );
        println!(
            "Note: max_bdp ({}) < 4*initial_cwnd ({}), using initial_cwnd",
            max_bdp, threshold
        );
        return;
    }

    assert!(
        cwnd_after >= adaptive_floor,
        "BBR cwnd ({}) should use adaptive floor ({}) when max_bdp ({}) > 4*initial ({})",
        cwnd_after,
        adaptive_floor,
        max_bdp,
        threshold
    );
    println!(
        "SUCCESS: Adaptive floor kicked in - cwnd {} >= floor {}",
        cwnd_after, adaptive_floor
    );
}
/// After a warm-up at the given RTT, a single timeout must keep both the
/// bandwidth and RTT estimates, retaining at least 25% of the bandwidth.
#[rstest]
#[case::intercontinental_minimal(125, 30, 10)]
#[case::intercontinental_standard(135, 50, 20)]
#[case::high_latency_standard(250, 50, 20)]
#[case::high_latency_extended(250, 100, 30)]
#[case::satellite_link(500, 50, 20)]
fn test_issue_2697_timeout_preserves_bandwidth_and_rtt(
    #[case] rtt_ms: u64,
    #[case] warmup_rounds: usize,
    #[case] packets_per_round: usize,
) {
    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let rtt = Duration::from_millis(rtt_ms);
    let packet_size: usize = 1400;

    // Warm-up: each round sends a burst, waits one RTT, then acks the burst.
    for _ in 0..warmup_rounds {
        let burst: Vec<_> = (0..packets_per_round)
            .map(|_| controller.on_send(packet_size))
            .collect();
        clock.advance(rtt);
        for token in burst {
            controller.on_ack_with_token(rtt, packet_size, Some(token));
        }
    }

    let bw_before = controller.max_bw();
    let rtt_before = controller.min_rtt();
    assert!(
        bw_before > 0,
        "[{}ms RTT] Should have bandwidth measurement before timeout",
        rtt_ms
    );
    assert!(
        rtt_before.is_some(),
        "[{}ms RTT] Should have RTT measurement before timeout",
        rtt_ms
    );

    controller.on_timeout();

    let bw_after = controller.max_bw();
    let rtt_after = controller.min_rtt();
    assert!(
        bw_after > 0,
        "[{}ms RTT] Bandwidth estimate must be preserved across timeout (was {}, now {})",
        rtt_ms,
        bw_before,
        bw_after
    );
    assert!(
        rtt_after.is_some(),
        "[{}ms RTT] RTT estimate must be preserved across timeout (was {:?}, now {:?})",
        rtt_ms,
        rtt_before,
        rtt_after
    );

    let retained = bw_after as f64 / bw_before as f64;
    assert!(
        retained >= 0.25,
        "[{}ms RTT] Bandwidth should retain at least 25% after timeout (retained {:.1}%)",
        rtt_ms,
        retained * 100.0
    );
}
/// A series of timeouts, each followed by two small rounds of traffic, must
/// never leave the bandwidth estimate at zero.
#[rstest]
#[case::intercontinental_3_timeouts(135, 3)]
#[case::high_latency_5_timeouts(250, 5)]
#[case::high_latency_10_timeouts(250, 10)]
#[case::satellite_5_timeouts(500, 5)]
fn test_issue_2697_repeated_timeouts_preserve_bandwidth(
    #[case] rtt_ms: u64,
    #[case] timeout_count: usize,
) {
    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let rtt = Duration::from_millis(rtt_ms);
    let packet_size: usize = 1400;

    // One round: send `packets` packets, wait one RTT, acknowledge them all.
    let exchange_round = |packets: usize| {
        let burst: Vec<_> = (0..packets)
            .map(|_| controller.on_send(packet_size))
            .collect();
        clock.advance(rtt);
        for token in burst {
            controller.on_ack_with_token(rtt, packet_size, Some(token));
        }
    };

    // Warm-up: 50 rounds of 20 packets.
    (0..50).for_each(|_| exchange_round(20));

    let healthy_max_bw = controller.max_bw();
    assert!(
        healthy_max_bw > 0,
        "[{}ms RTT] Should have healthy bandwidth before storm",
        rtt_ms
    );

    for timeout_number in 1..=timeout_count {
        controller.on_timeout();
        let bw_now = controller.max_bw();
        assert!(
            bw_now > 0,
            "[{}ms RTT] Timeout #{}: Bandwidth must NOT reset to 0 (was {}, now {})",
            rtt_ms,
            timeout_number,
            healthy_max_bw,
            bw_now
        );

        // Brief recovery traffic before the next timeout: 2 rounds of 5 packets.
        (0..2).for_each(|_| exchange_round(5));
    }
}
/// Transfers `transfer_kb` KiB in bursts of 20 packets, injecting a timeout at
/// each 500 KiB threshold; bandwidth must never drop to zero and must end at
/// 10 KB/s or more.
#[rstest]
#[case::river_ui_intercontinental(135, 2900)]
#[case::river_ui_high_latency(250, 2900)]
#[case::large_contract_satellite(500, 1024)]
fn test_issue_2697_large_transfer_with_timeouts(#[case] rtt_ms: u64, #[case] transfer_kb: usize) {
    const BURST_PACKETS: usize = 20;
    const TIMEOUT_INTERVAL: usize = 500 * 1024;

    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let rtt = Duration::from_millis(rtt_ms);
    let packet_size: usize = 1400;
    let transfer_bytes = transfer_kb * 1024;

    // Sends one burst, waits one RTT, acks it; returns the bytes acknowledged.
    let exchange_burst = || -> usize {
        let burst: Vec<_> = (0..BURST_PACKETS)
            .map(|_| controller.on_send(packet_size))
            .collect();
        clock.advance(rtt);
        let mut acked = 0usize;
        for token in burst {
            controller.on_ack_with_token(rtt, packet_size, Some(token));
            acked += packet_size;
        }
        acked
    };

    // Warm-up: 30 bursts to establish a bandwidth estimate.
    for _ in 0..30 {
        exchange_burst();
    }
    let healthy_max_bw = controller.max_bw();
    assert!(
        healthy_max_bw > 0,
        "Should establish bandwidth before transfer"
    );

    let mut bytes_transferred = 0usize;
    let mut next_timeout_at = TIMEOUT_INTERVAL;
    while bytes_transferred < transfer_bytes {
        bytes_transferred += exchange_burst();

        if bytes_transferred < next_timeout_at {
            continue;
        }

        controller.on_timeout();
        let post_timeout_bw = controller.max_bw();
        assert!(
            post_timeout_bw > 0,
            "[{}ms RTT, {}KB transfer] Bandwidth collapsed to 0 at {}KB transferred",
            rtt_ms,
            transfer_kb,
            bytes_transferred / 1024
        );
        next_timeout_at += TIMEOUT_INTERVAL;
    }

    let final_bw_kbps = controller.max_bw() as f64 / 1024.0;
    assert!(
        final_bw_kbps >= 10.0,
        "[{}ms RTT, {}KB transfer] Final bandwidth {:.1} KB/s is too low (bug caused ~1 KB/s)",
        rtt_ms,
        transfer_kb,
        final_bw_kbps
    );
}
/// Drives `controller` through a transfer of at least `transfer_bytes`, one
/// RTT-long round at a time, and returns `(bytes_sent, elapsed_virtual_time)`.
///
/// Each round's byte budget is the smallest of the pacing allowance for one
/// RTT, the free congestion window and the bytes still to send, but never less
/// than one packet. Every packet is counted as a full `packet_size`, so
/// `bytes_sent` can exceed `transfer_bytes`.
fn simulate_paced_transfer(
    controller: &BbrController<VirtualTime>,
    time: &VirtualTime,
    rtt: Duration,
    transfer_bytes: usize,
    packet_size: usize,
) -> (usize, Duration) {
    use crate::simulation::TimeSource;

    let started_at = time.now_nanos();
    let mut bytes_sent = 0usize;

    while bytes_sent < transfer_bytes {
        let pacing_budget = (controller.pacing_rate() as f64 * rtt.as_secs_f64()) as usize;
        let cwnd = controller.current_cwnd();
        let window_budget = cwnd.saturating_sub(controller.flightsize());
        let outstanding = transfer_bytes - bytes_sent;

        // Always send at least one packet so the loop makes progress.
        let round_budget = pacing_budget
            .min(window_budget)
            .min(outstanding)
            .max(packet_size);
        let packet_count = (round_budget / packet_size).max(1);

        let burst: Vec<_> = (0..packet_count)
            .map(|_| controller.on_send(packet_size))
            .collect();

        time.advance(rtt);

        for token in burst {
            controller.on_ack_with_token(rtt, packet_size, Some(token));
            bytes_sent += packet_size;
        }
    }

    let elapsed_nanos = time.now_nanos() - started_at;
    (bytes_sent, Duration::from_nanos(elapsed_nanos))
}
/// A brand-new connection transferring 500 KiB must reach at least
/// `min_expected_throughput` bytes per second of virtual time.
#[rstest]
#[case::domestic_broadband(50, 1_000_000)]
#[case::intercontinental(135, 500_000)]
#[case::high_latency(250, 250_000)]
fn test_fresh_connection_ramps_up_throughput(
    #[case] rtt_ms: u64,
    #[case] min_expected_throughput: usize,
) {
    // No `TimeSource` import here: this test never reads the clock directly;
    // elapsed time comes back from `simulate_paced_transfer`.
    let time = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), time.clone());
    let rtt = Duration::from_millis(rtt_ms);
    let packet_size = 1400;
    let transfer_size = 500 * 1024;

    // Prints the pacing rate and cwnd; used for both the initial and final report.
    let print_pacing_and_cwnd = || {
        println!(
            " pacing_rate: {} B/s ({:.1} KB/s)",
            controller.pacing_rate(),
            controller.pacing_rate() as f64 / 1024.0
        );
        println!(" cwnd: {} bytes", controller.current_cwnd());
    };

    println!("\n[{}ms RTT] Initial state:", rtt_ms);
    print_pacing_and_cwnd();

    let (bytes_sent, elapsed) =
        simulate_paced_transfer(&controller, &time, rtt, transfer_size, packet_size);
    let throughput = bytes_sent as f64 / elapsed.as_secs_f64();
    let throughput_kbps = throughput / 1024.0;

    println!("[{}ms RTT] Final state:", rtt_ms);
    print_pacing_and_cwnd();
    println!(
        " max_bw: {} B/s ({:.1} KB/s)",
        controller.max_bw(),
        controller.max_bw() as f64 / 1024.0
    );
    println!(" throughput: {:.1} KB/s", throughput_kbps);

    assert!(
        throughput >= min_expected_throughput as f64,
        "[{}ms RTT] Fresh connection throughput {:.1} KB/s is below expected {} KB/s - BBR bootstrap problem",
        rtt_ms,
        throughput_kbps,
        min_expected_throughput / 1024
    );
}
/// A timeout after only five acknowledged packets must not prevent a following
/// 500 KiB transfer from reaching `min_expected_throughput` bytes per second.
#[rstest]
#[case::early_timeout_domestic(50, 500_000)]
#[case::early_timeout_intercontinental(135, 250_000)]
#[case::early_timeout_high_latency(250, 125_000)]
fn test_early_timeout_recovers_throughput(
    #[case] rtt_ms: u64,
    #[case] min_expected_throughput: usize,
) {
    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let rtt = Duration::from_millis(rtt_ms);
    let packet_size = 1400;

    println!("\n[{}ms RTT] Testing early timeout recovery", rtt_ms);
    println!(" Initial pacing_rate: {} B/s", controller.pacing_rate());
    println!(" Initial cwnd: {} bytes", controller.current_cwnd());

    // Five one-packet round trips, then a timeout.
    (0..5).for_each(|_| {
        let token = controller.on_send(packet_size);
        clock.advance(rtt);
        controller.on_ack_with_token(rtt, packet_size, Some(token));
    });

    println!(" After 5 packets:");
    println!(" pacing_rate: {} B/s", controller.pacing_rate());
    println!(" cwnd: {} bytes", controller.current_cwnd());
    println!(" max_bw: {} B/s", controller.max_bw());

    controller.on_timeout();

    let post_timeout_cwnd = controller.current_cwnd();
    let post_timeout_pacing = controller.pacing_rate();
    println!(" After early timeout:");
    println!(" pacing_rate: {} B/s", post_timeout_pacing);
    println!(" cwnd: {} bytes", post_timeout_cwnd);
    println!(" max_bw: {} B/s", controller.max_bw());

    let transfer_size = 500 * 1024;
    let (bytes_sent, elapsed) =
        simulate_paced_transfer(&controller, &clock, rtt, transfer_size, packet_size);
    let throughput = bytes_sent as f64 / elapsed.as_secs_f64();
    let throughput_kbps = throughput / 1024.0;
    println!(" Final throughput: {:.1} KB/s", throughput_kbps);

    assert!(
        throughput >= min_expected_throughput as f64,
        "[{}ms RTT] Post-early-timeout throughput {:.1} KB/s is below {} KB/s. \
         Post-timeout state: cwnd={}B, pacing={}B/s",
        rtt_ms,
        throughput_kbps,
        min_expected_throughput / 1024,
        post_timeout_cwnd,
        post_timeout_pacing
    );
}
/// A default-configured controller must start with a pacing rate of at least
/// 1,000,000 B/s.
#[test]
fn test_initial_pacing_rate_is_reasonable() {
    const MIN_INITIAL_PACING: u64 = 1_000_000;

    let initial_pacing = BbrController::new(BbrConfig::default()).pacing_rate();

    assert!(
        initial_pacing >= MIN_INITIAL_PACING,
        "Initial pacing rate {} B/s is below 1 MB/s minimum",
        initial_pacing
    );
}
/// Twenty loss-free rounds that each fill the congestion window must raise the
/// pacing rate to more than ten times its initial value.
#[test]
fn test_pacing_rate_grows_with_successful_transfers() {
    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let rtt = Duration::from_millis(50);
    let packet_size: usize = 1400;

    let initial_pacing = controller.pacing_rate();

    for _ in 0..20 {
        // Send a full window's worth of packets (at least one).
        let window_packets = (controller.current_cwnd() / packet_size).max(1);
        let burst: Vec<_> = (0..window_packets)
            .map(|_| controller.on_send(packet_size))
            .collect();

        clock.advance(rtt);

        burst.into_iter().for_each(|token| {
            controller.on_ack_with_token(rtt, packet_size, Some(token));
        });
    }

    let final_pacing = controller.pacing_rate();
    assert!(
        final_pacing > initial_pacing * 10,
        "Pacing rate didn't grow enough: {} -> {} (expected 10x+ increase)",
        initial_pacing,
        final_pacing
    );
}
/// Transfers 5 MiB in five equal stages. The last stage must sustain at least
/// half of `min_expected_throughput`, and the whole transfer must average at
/// least `min_expected_throughput` bytes per second.
#[rstest]
#[case::domestic_50ms(50, 5_000_000)]
#[case::intercontinental_135ms(135, 2_000_000)]
fn test_longer_transfer_achieves_higher_throughput(
    #[case] rtt_ms: u64,
    #[case] min_expected_throughput: usize,
) {
    use crate::simulation::TimeSource;

    const STAGES: usize = 5;

    let clock = VirtualTime::new();
    let controller = BbrController::new_with_time_source(BbrConfig::default(), clock.clone());
    let rtt = Duration::from_millis(rtt_ms);
    let packet_size = 1400;
    let total_transfer = 5 * 1024 * 1024;

    println!(
        "\n[{}ms RTT] Testing 5MB transfer throughput ramp-up",
        rtt_ms
    );
    println!(
        " Initial pacing_rate: {} B/s ({:.1} KB/s)",
        controller.pacing_rate(),
        controller.pacing_rate() as f64 / 1024.0
    );
    println!(" Initial cwnd: {} bytes", controller.current_cwnd());

    let stage_bytes = total_transfer / STAGES;
    let mut total_bytes_sent = 0usize;
    let mut stage_throughputs = Vec::new();
    let transfer_started = clock.now_nanos();

    for stage_number in 1..=STAGES {
        println!(
            " [Before Stage {}] cwnd: {}KB, pacing: {:.1} KB/s, max_bw: {:.1} KB/s, state: {:?}",
            stage_number,
            controller.current_cwnd() / 1024,
            controller.pacing_rate() as f64 / 1024.0,
            controller.max_bw() as f64 / 1024.0,
            controller.state()
        );

        let stage_started = clock.now_nanos();
        let (stage_sent, _) =
            simulate_paced_transfer(&controller, &clock, rtt, stage_bytes, packet_size);
        let stage_elapsed = Duration::from_nanos(clock.now_nanos() - stage_started);
        let stage_throughput = stage_sent as f64 / stage_elapsed.as_secs_f64();

        total_bytes_sent += stage_sent;
        stage_throughputs.push(stage_throughput);

        println!(
            " [After Stage {}] {:.1} KB/s (cwnd: {}KB, pacing: {:.1} KB/s, max_bw: {:.1} KB/s)",
            stage_number,
            stage_throughput / 1024.0,
            controller.current_cwnd() / 1024,
            controller.pacing_rate() as f64 / 1024.0,
            controller.max_bw() as f64 / 1024.0
        );
    }

    let total_elapsed = Duration::from_nanos(clock.now_nanos() - transfer_started);
    let overall_throughput = total_bytes_sent as f64 / total_elapsed.as_secs_f64();
    println!(
        " Overall throughput: {:.1} KB/s ({:.2} MB/s)",
        overall_throughput / 1024.0,
        overall_throughput / (1024.0 * 1024.0)
    );
    println!(
        " Final state: {:?}, max_bw: {:.1} KB/s",
        controller.state(),
        controller.max_bw() as f64 / 1024.0
    );

    // The final stage must hold at least half of the expected throughput.
    let last_stage = stage_throughputs[STAGES - 1];
    let min_sustained = min_expected_throughput as f64 * 0.5;
    assert!(
        last_stage >= min_sustained,
        "[{}ms RTT] Throughput collapsed: last stage {:.1} KB/s is below {:.1} KB/s",
        rtt_ms,
        last_stage / 1024.0,
        min_sustained / 1024.0
    );
    assert!(
        overall_throughput >= min_expected_throughput as f64,
        "[{}ms RTT] Overall throughput {:.1} KB/s ({:.2} MB/s) is below expected {} KB/s",
        rtt_ms,
        overall_throughput / 1024.0,
        overall_throughput / (1024.0 * 1024.0),
        min_expected_throughput / 1024
    );
}