use naia_socket_shared::Instant;
use crate::connection::bandwidth::BandwidthConfig;
/// Byte budget that is refilled from a configured rate and drained by sends.
///
/// `accumulate` credits `target_bytes_per_sec * elapsed_seconds` and starts a
/// new "tick"; `spend` debits the budget; `can_spend` decides whether a send of
/// a given estimated size is allowed right now. The remaining fields are
/// per-tick bookkeeping that `accumulate` rolls over.
pub(crate) struct BandwidthAccumulator {
/// Bytes currently available. `spend` subtracts without clamping, so this
/// value can become negative.
budget_bytes: f64,
/// Refill rate in bytes per second, copied from
/// `BandwidthConfig::target_bytes_per_sec` in `new`.
target_bytes_per_sec: f64,
/// The `Instant` handed to the most recent `accumulate` call; `None` until
/// `accumulate` has been called once.
last_accumulate: Option<Instant>,
/// `true` once `spend` has been called since the last `accumulate`.
sent_this_tick: bool,
/// Saturating sum of the byte counts passed to `spend` since the last
/// `accumulate`.
bytes_sent_this_tick: u64,
/// Value `bytes_sent_this_tick` held when `accumulate` was last called.
bytes_sent_last_tick: u64,
/// Saturating count of `record_deferred` calls since the last `accumulate`.
/// Only present with the `bench_instrumentation` feature.
#[cfg(feature = "bench_instrumentation")]
packets_deferred_this_tick: u32,
/// Value `packets_deferred_this_tick` held when `accumulate` was last called.
/// Only present with the `bench_instrumentation` feature.
#[cfg(feature = "bench_instrumentation")]
packets_deferred_last_tick: u32,
}
impl BandwidthAccumulator {
    /// Creates an accumulator with a zero budget and no baseline timestamp.
    ///
    /// The refill rate is taken from `config.target_bytes_per_sec`.
    pub(crate) fn new(config: &BandwidthConfig) -> Self {
        let target_bytes_per_sec = config.target_bytes_per_sec as f64;
        Self {
            target_bytes_per_sec,
            budget_bytes: 0.0,
            last_accumulate: None,
            sent_this_tick: false,
            bytes_sent_this_tick: 0,
            bytes_sent_last_tick: 0,
            #[cfg(feature = "bench_instrumentation")]
            packets_deferred_this_tick: 0,
            #[cfg(feature = "bench_instrumentation")]
            packets_deferred_last_tick: 0,
        }
    }

    /// Starts a new tick at `now`.
    ///
    /// The first call only records `now` as the baseline. Every later call
    /// credits `target_bytes_per_sec * seconds_since_previous_call` to the
    /// budget. In all cases the "sent this tick" flag is cleared and the
    /// per-tick counters are moved into their "last tick" snapshots.
    pub(crate) fn accumulate(&mut self, now: &Instant) {
        if let Some(previous) = self.last_accumulate.as_ref() {
            let elapsed_secs = previous.elapsed(now).as_secs_f64();
            self.budget_bytes += self.target_bytes_per_sec * elapsed_secs;
        }
        self.last_accumulate = Some(now.clone());

        // Roll the per-tick state over: snapshot the counter, then zero it.
        self.sent_this_tick = false;
        self.bytes_sent_last_tick = std::mem::take(&mut self.bytes_sent_this_tick);

        #[cfg(feature = "bench_instrumentation")]
        {
            self.packets_deferred_last_tick =
                std::mem::take(&mut self.packets_deferred_this_tick);
        }
    }

    /// Reports whether a send of roughly `estimated_bytes` is allowed now.
    ///
    /// Returns `true` when the budget covers the estimate, or when nothing has
    /// been spent since the last `accumulate` and the budget is strictly
    /// positive (so one send per tick may overshoot the budget).
    pub(crate) fn can_spend(&self, estimated_bytes: u32) -> bool {
        let fully_covered = self.budget_bytes >= f64::from(estimated_bytes);
        let first_send_with_credit = !self.sent_this_tick && self.budget_bytes > 0.0;
        fully_covered || first_send_with_credit
    }

    /// Records that `actual_bytes` were sent.
    ///
    /// The budget is reduced without clamping (it may go negative), the tick
    /// is marked as having sent, and the per-tick byte counter is increased
    /// with saturation.
    pub(crate) fn spend(&mut self, actual_bytes: u32) {
        self.sent_this_tick = true;
        self.bytes_sent_this_tick = self
            .bytes_sent_this_tick
            .saturating_add(u64::from(actual_bytes));
        self.budget_bytes -= f64::from(actual_bytes);
    }

    /// Current budget in bytes; negative after an overshoot.
    // `dead_code` is allowed because no non-test caller is visible in this file.
    #[allow(dead_code)]
    pub(crate) fn remaining(&self) -> f64 {
        self.budget_bytes
    }

    /// Total bytes passed to `spend` during the tick that ended at the most
    /// recent `accumulate` call.
    // `dead_code` is allowed because no non-test caller is visible in this file.
    #[allow(dead_code)]
    pub(crate) fn bytes_sent_last_tick(&self) -> u64 {
        self.bytes_sent_last_tick
    }

    /// Counts one deferred packet for the current tick.
    ///
    /// Without the `bench_instrumentation` feature this does nothing.
    #[inline]
    pub(crate) fn record_deferred(&mut self) {
        #[cfg(feature = "bench_instrumentation")]
        {
            let bumped = self.packets_deferred_this_tick.saturating_add(1);
            self.packets_deferred_this_tick = bumped;
        }
    }

    /// Number of `record_deferred` calls during the tick that ended at the
    /// most recent `accumulate` call.
    ///
    /// Always `0` without the `bench_instrumentation` feature.
    // `dead_code` is allowed because no caller is visible in this file.
    #[allow(dead_code)]
    pub(crate) fn packets_deferred_last_tick(&self) -> u32 {
        #[cfg(feature = "bench_instrumentation")]
        let deferred = self.packets_deferred_last_tick;
        #[cfg(not(feature = "bench_instrumentation"))]
        let deferred = 0;
        deferred
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use naia_socket_shared::Instant;

    /// Initialises the test clock when the `test_time` feature is enabled;
    /// otherwise does nothing.
    fn init_clock() {
        #[cfg(feature = "test_time")]
        naia_socket_shared::TestClock::init(0);
    }

    /// Returns a copy of `t` moved forward by `ms` milliseconds.
    fn advance(t: &Instant, ms: u32) -> Instant {
        let mut later = t.clone();
        later.add_millis(ms);
        later
    }

    /// Configuration used by every test: 64 000 bytes per second.
    fn config() -> BandwidthConfig {
        BandwidthConfig {
            target_bytes_per_sec: 64_000,
        }
    }

    /// Accumulator that has seen exactly one `accumulate` call, together with
    /// the instant that call used.
    fn baseline() -> (BandwidthAccumulator, Instant) {
        init_clock();
        let mut acc = BandwidthAccumulator::new(&config());
        let t0 = Instant::now();
        acc.accumulate(&t0);
        (acc, t0)
    }

    /// Accumulator that has been baselined and then accumulated again `ms`
    /// milliseconds later, together with the instant of that second call.
    fn after(ms: u32) -> (BandwidthAccumulator, Instant) {
        let (mut acc, t0) = baseline();
        let t1 = advance(&t0, ms);
        acc.accumulate(&t1);
        (acc, t1)
    }

    #[test]
    fn initial_budget_is_zero() {
        init_clock();
        let acc = BandwidthAccumulator::new(&config());
        assert_eq!(acc.remaining(), 0.0);
    }

    #[test]
    fn first_accumulate_just_sets_baseline() {
        let (acc, _t0) = baseline();
        assert_eq!(acc.remaining(), 0.0);
    }

    #[test]
    fn subsequent_accumulate_adds_rate_times_dt() {
        let (acc, _t1) = after(1000);
        assert!((acc.remaining() - 64_000.0).abs() < 1.0);
    }

    #[test]
    fn spend_subtracts_from_budget() {
        let (mut acc, _t1) = after(1000);
        acc.spend(1000);
        assert!((acc.remaining() - 63_000.0).abs() < 1.0);
    }

    #[test]
    fn can_spend_true_when_budget_covers_estimate() {
        let (acc, _t1) = after(1000);
        assert!(acc.can_spend(1000));
        assert!(acc.can_spend(64_000));
    }

    #[test]
    fn one_packet_overshoot_when_budget_positive_but_short() {
        // 10 ms at 64 000 B/s is about 640 bytes: positive, but short of 1200.
        let (mut acc, _t1) = after(10);
        assert!(acc.can_spend(1200));
        acc.spend(1200);
        assert!(!acc.can_spend(1200));
    }

    #[test]
    fn overshoot_resets_on_next_accumulate() {
        let (mut acc, t1) = after(10);
        acc.spend(1200);
        // 20 ms more brings the budget back above zero and starts a new tick.
        let t2 = advance(&t1, 20);
        acc.accumulate(&t2);
        assert!(acc.can_spend(1200));
    }

    #[test]
    fn telemetry_bytes_sent_snapshots_per_tick() {
        let (mut acc, t0) = baseline();
        assert_eq!(acc.bytes_sent_last_tick(), 0);

        let t1 = advance(&t0, 1000);
        acc.accumulate(&t1);
        acc.spend(500);
        acc.spend(300);
        // Bytes spent in the current tick are not visible until it ends.
        assert_eq!(acc.bytes_sent_last_tick(), 0);

        let t2 = advance(&t1, 1000);
        acc.accumulate(&t2);
        assert_eq!(acc.bytes_sent_last_tick(), 800);

        acc.spend(100);
        let t3 = advance(&t2, 1000);
        acc.accumulate(&t3);
        assert_eq!(acc.bytes_sent_last_tick(), 100);
    }

    #[test]
    fn can_spend_false_when_budget_nonpositive_and_no_slack() {
        let (mut acc, _t1) = after(10);
        acc.spend(1200);
        assert!(!acc.can_spend(1));
    }
}