use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
/// Snapshot of the sampler's counters taken when a packet is sent.
///
/// `DeliveryRateSampler::on_send` returns one of these for every
/// transmission; passing it back to `DeliveryRateSampler::on_ack` lets the
/// sampler measure what was delivered between the send and its
/// acknowledgement.
#[derive(Clone, Copy, Debug)]
pub struct DeliveryRateToken {
    /// Cumulative delivered bytes at the moment the packet was sent.
    pub delivered: u64,
    /// Time of the send, in nanoseconds on the caller's clock.
    pub sent_time_nanos: u64,
    /// Cumulative bytes sent, including the packet this token describes.
    pub sent: u64,
    /// Value of the sampler's app-limited flag when the packet was sent.
    pub is_app_limited: bool,
    /// Cumulative delivered bytes at the start of the sampling window the
    /// packet belongs to.
    pub first_sent_delivered: u64,
    /// Start time of that sampling window, in nanoseconds.
    pub first_sent_time_nanos: u64,
}
/// One delivery-rate measurement, produced by `DeliveryRateSampler::on_ack`.
#[derive(Clone, Copy, Debug)]
pub(crate) struct DeliveryRateSample {
    /// Estimated rate in bytes per second (bytes scaled by 1e9 and divided by
    /// `interval_nanos`).
    pub delivery_rate: u64,
    /// App-limited flag copied from the token of the acknowledged packet.
    pub is_app_limited: bool,
    /// Bytes delivered since the start of the token's sampling window.
    pub delivered: u64,
    /// Length of the interval the rate was computed over, in nanoseconds.
    pub interval_nanos: u64,
    /// Time between sending the packet and receiving its acknowledgement.
    pub rtt: Duration,
    /// Lost bytes attributed to this sample; `on_ack` always stores 0 here.
    pub lost: u64,
}
/// Minimum gap since the previous send (500 ms, expressed in nanoseconds)
/// after which `on_send`, seeing nothing in flight, restarts the sampling
/// window.
const IDLE_RESET_THRESHOLD_NANOS: u64 = 500 * 1_000_000;
/// Tracks sent, delivered and lost byte counts and turns acknowledgements
/// into delivery-rate samples.
///
/// Every field is an atomic, which is why all methods take `&self`. Each
/// field is read and written on its own; updates that touch several fields
/// are not performed as a single atomic step.
pub(crate) struct DeliveryRateSampler {
    /// Cumulative bytes acknowledged through `on_ack`.
    delivered: AtomicU64,
    /// Cumulative bytes recorded through `on_send`.
    sent: AtomicU64,
    /// Cumulative bytes reported through `on_loss`.
    lost: AtomicU64,
    /// Start time of the current sampling window, in nanoseconds. Zero is
    /// also treated by `on_send` as "no window started yet".
    first_sent_time_nanos: AtomicU64,
    /// Value of `delivered` when the current sampling window started.
    first_sent_delivered: AtomicU64,
    /// Time of the most recent `on_send`, in nanoseconds.
    last_send_time_nanos: AtomicU64,
    /// Flag copied into every token handed out by `on_send`.
    app_limited: AtomicBool,
    /// Number of rounds counted so far by `on_ack`.
    round_count: AtomicU64,
    /// Value of `delivered` recorded when the latest round was counted.
    round_start_delivered: AtomicU64,
}
impl DeliveryRateSampler {
    /// Creates a sampler with every counter at zero and the app-limited flag
    /// cleared.
    pub(crate) fn new() -> Self {
        Self {
            delivered: AtomicU64::new(0),
            sent: AtomicU64::new(0),
            lost: AtomicU64::new(0),
            first_sent_time_nanos: AtomicU64::new(0),
            first_sent_delivered: AtomicU64::new(0),
            last_send_time_nanos: AtomicU64::new(0),
            app_limited: AtomicBool::new(false),
            round_count: AtomicU64::new(0),
            round_start_delivered: AtomicU64::new(0),
        }
    }

    /// Converts `bytes` transferred over `interval_nanos` into bytes per
    /// second.
    ///
    /// The arithmetic is done in `u128`, where `u64::MAX * 1e9` cannot
    /// overflow. A quotient that does not fit in `u64` saturates at
    /// `u64::MAX` instead of being silently truncated. `interval_nanos` must
    /// be non-zero; `on_ack` guarantees that before calling.
    fn bytes_per_second(bytes: u64, interval_nanos: u64) -> u64 {
        const NANOS_PER_SEC: u128 = 1_000_000_000;
        let rate = u128::from(bytes) * NANOS_PER_SEC / u128::from(interval_nanos);
        // The `min` makes the narrowing cast lossless.
        rate.min(u128::from(u64::MAX)) as u64
    }

    /// Records the transmission of `bytes` bytes at time `now_nanos` and
    /// returns the token to hand back to `on_ack` once the packet is
    /// acknowledged.
    ///
    /// `inflight` is the amount of outstanding data as reported by the
    /// caller; only whether it is zero is examined.
    ///
    /// The sampling window (`first_sent_time_nanos` / `first_sent_delivered`)
    /// is restarted at this send when either
    /// * no window start is recorded (the stored start time is zero), or
    /// * nothing is in flight and more than `IDLE_RESET_THRESHOLD_NANOS` have
    ///   passed since the previous send.
    ///
    /// NOTE(review): because zero doubles as the "unset" marker, a window
    /// that genuinely started at timestamp 0 is restarted by the next send.
    pub(crate) fn on_send(
        &self,
        bytes: usize,
        now_nanos: u64,
        inflight: usize,
    ) -> DeliveryRateToken {
        let bytes = bytes as u64;
        let delivered = self.delivered.load(Ordering::Acquire);
        // `fetch_add` wraps on overflow, so the running total derived from
        // its return value has to wrap as well instead of panicking.
        let sent = self
            .sent
            .fetch_add(bytes, Ordering::AcqRel)
            .wrapping_add(bytes);

        let last_send = self.last_send_time_nanos.load(Ordering::Acquire);
        let idle_for = now_nanos.saturating_sub(last_send);
        let is_idle = inflight == 0 && idle_for > IDLE_RESET_THRESHOLD_NANOS;

        let window_start = self.first_sent_time_nanos.load(Ordering::Acquire);
        let (first_sent_delivered, first_sent_time_nanos) = if is_idle || window_start == 0 {
            // Start a fresh window anchored at this send.
            self.first_sent_delivered.store(delivered, Ordering::Release);
            self.first_sent_time_nanos.store(now_nanos, Ordering::Release);
            (delivered, now_nanos)
        } else {
            (
                self.first_sent_delivered.load(Ordering::Acquire),
                window_start,
            )
        };

        self.last_send_time_nanos.store(now_nanos, Ordering::Release);

        DeliveryRateToken {
            delivered,
            sent_time_nanos: now_nanos,
            sent,
            is_app_limited: self.app_limited.load(Ordering::Acquire),
            first_sent_delivered,
            first_sent_time_nanos,
        }
    }

    /// Records an acknowledgement and produces a delivery-rate sample.
    ///
    /// `token` is the value `on_send` returned for the acknowledged packet,
    /// `bytes_acked` the number of newly acknowledged bytes and `now_nanos`
    /// the current time in nanoseconds.
    ///
    /// The delivered counter is always advanced. `None` is returned, and
    /// round tracking is left untouched, when no time has elapsed since both
    /// the token's window start and its send time: a rate cannot be computed
    /// over an empty interval.
    ///
    /// Otherwise the returned sample contains:
    /// * `delivered`: bytes delivered since the token's window start;
    /// * `interval_nanos`: the longer of "time since the window start" and
    ///   "time since the packet was sent";
    /// * `delivery_rate`: `delivered` over that interval in bytes per second,
    ///   capped by the send rate over the same interval when at least half as
    ///   many bytes were sent after the token as were delivered;
    /// * `rtt`: time since the packet was sent;
    /// * `is_app_limited`: copied from the token;
    /// * `lost`: always 0, since loss reported through `on_loss` is not
    ///   attributed to individual samples.
    ///
    /// A round is counted when the acknowledged packet was sent at or after
    /// the point where the previous round was counted (its `delivered`
    /// snapshot is not older than `round_start_delivered`) and the
    /// acknowledgement delivered bytes. Acknowledgements for packets that
    /// were already in flight at that point do not count, so `round_count`
    /// advances once per flight of data rather than once per acknowledgement.
    pub(crate) fn on_ack(
        &self,
        token: DeliveryRateToken,
        bytes_acked: usize,
        now_nanos: u64,
    ) -> Option<DeliveryRateSample> {
        let acked = bytes_acked as u64;
        // Wrap like `fetch_add` does, so the local total matches the counter.
        let delivered_now = self
            .delivered
            .fetch_add(acked, Ordering::AcqRel)
            .wrapping_add(acked);

        let send_elapsed = now_nanos.saturating_sub(token.first_sent_time_nanos);
        let ack_elapsed = now_nanos.saturating_sub(token.sent_time_nanos);
        let interval_nanos = send_elapsed.max(ack_elapsed);
        if interval_nanos == 0 {
            return None;
        }

        let delivered_bytes = delivered_now.saturating_sub(token.first_sent_delivered);
        let mut delivery_rate = Self::bytes_per_second(delivered_bytes, interval_nanos);

        // Never report more than was put on the wire over the same interval,
        // provided enough was sent after this packet for that bound to apply.
        let bytes_sent_since_token = self
            .sent
            .load(Ordering::Acquire)
            .saturating_sub(token.sent);
        if bytes_sent_since_token >= delivered_bytes / 2 {
            let send_rate = Self::bytes_per_second(bytes_sent_since_token, interval_nanos);
            delivery_rate = delivery_rate.min(send_rate);
        }

        // Only a packet sent after the current round started can end it;
        // the second condition keeps zero-byte acknowledgements from counting.
        let round_start = self.round_start_delivered.load(Ordering::Acquire);
        if token.delivered >= round_start && delivered_now > round_start {
            self.round_count.fetch_add(1, Ordering::AcqRel);
            self.round_start_delivered
                .store(delivered_now, Ordering::Release);
        }

        Some(DeliveryRateSample {
            delivery_rate,
            is_app_limited: token.is_app_limited,
            delivered: delivered_bytes,
            interval_nanos,
            rtt: Duration::from_nanos(ack_elapsed),
            lost: 0,
        })
    }

    /// Adds `bytes_lost` to the cumulative loss counter.
    pub(crate) fn on_loss(&self, bytes_lost: usize) {
        self.lost.fetch_add(bytes_lost as u64, Ordering::AcqRel);
    }

    /// Sets the flag that `on_send` copies into every subsequent token.
    pub(crate) fn set_app_limited(&self, app_limited: bool) {
        self.app_limited.store(app_limited, Ordering::Release);
    }

    /// Returns the current app-limited flag.
    pub(crate) fn is_app_limited(&self) -> bool {
        self.app_limited.load(Ordering::Acquire)
    }

    /// Returns the cumulative number of bytes acknowledged so far.
    pub(crate) fn delivered(&self) -> u64 {
        self.delivered.load(Ordering::Acquire)
    }

    /// Returns the cumulative number of bytes reported lost so far.
    pub(crate) fn lost(&self) -> u64 {
        self.lost.load(Ordering::Acquire)
    }

    /// Returns the number of rounds counted so far.
    pub(crate) fn round_count(&self) -> u64 {
        self.round_count.load(Ordering::Acquire)
    }

    /// Puts every field back to the state produced by `new`.
    ///
    /// The fields are cleared one store at a time, not as a single atomic
    /// step.
    pub(crate) fn reset(&self) {
        self.delivered.store(0, Ordering::Release);
        self.sent.store(0, Ordering::Release);
        self.lost.store(0, Ordering::Release);
        self.first_sent_time_nanos.store(0, Ordering::Release);
        self.first_sent_delivered.store(0, Ordering::Release);
        self.last_send_time_nanos.store(0, Ordering::Release);
        self.app_limited.store(false, Ordering::Release);
        self.round_count.store(0, Ordering::Release);
        self.round_start_delivered.store(0, Ordering::Release);
    }
}
impl Default for DeliveryRateSampler {
fn default() -> Self {
Self::new()
}
}
/// Prints the main counters (`delivered`, `sent`, `lost`, `app_limited`,
/// `round_count`); the sampling-window and last-send fields are left out.
impl std::fmt::Debug for DeliveryRateSampler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each value is loaded on its own with relaxed ordering, so the
        // printed fields are not guaranteed to form one consistent snapshot.
        let delivered = self.delivered.load(Ordering::Relaxed);
        let sent = self.sent.load(Ordering::Relaxed);
        let lost = self.lost.load(Ordering::Relaxed);
        let app_limited = self.app_limited.load(Ordering::Relaxed);
        let round_count = self.round_count.load(Ordering::Relaxed);

        let mut out = f.debug_struct("DeliveryRateSampler");
        out.field("delivered", &delivered);
        out.field("sent", &sent);
        out.field("lost", &lost);
        out.field("app_limited", &app_limited);
        out.field("round_count", &round_count);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A single 1000-byte packet acknowledged after 100 ms gives
    /// 1000 B / 0.1 s = 10 000 B/s and an RTT of 100 ms.
    #[test]
    fn test_delivery_rate_basic() {
        let sampler = DeliveryRateSampler::new();

        let sent = sampler.on_send(1000, 0, 0);
        assert_eq!(sent.delivered, 0);
        assert_eq!(sent.sent_time_nanos, 0);

        let measured = sampler.on_ack(sent, 1000, 100_000_000).unwrap();
        assert_eq!(measured.delivery_rate, 10000);
        assert_eq!(measured.rtt, Duration::from_millis(100));
    }

    /// Two packets sent 10 ms apart and acknowledged 10 ms apart both see a
    /// 100 ms RTT, and the later sample covers more delivered bytes.
    #[test]
    fn test_delivery_rate_multiple_packets() {
        let sampler = DeliveryRateSampler::new();

        let first = sampler.on_send(1000, 0, 0);
        let second = sampler.on_send(1000, 10_000_000, 1000);

        let first_sample = sampler.on_ack(first, 1000, 100_000_000).unwrap();
        assert_eq!(first_sample.rtt, Duration::from_millis(100));

        let second_sample = sampler.on_ack(second, 1000, 110_000_000).unwrap();
        assert_eq!(second_sample.rtt, Duration::from_millis(100));
        assert!(second_sample.delivered > first_sample.delivered);
    }

    /// Tokens carry whatever the app-limited flag was at send time.
    #[test]
    fn test_app_limited_flag() {
        let sampler = DeliveryRateSampler::new();
        assert!(!sampler.is_app_limited());

        sampler.set_app_limited(true);
        let limited = sampler.on_send(1000, 0, 0);
        assert!(limited.is_app_limited);

        sampler.set_app_limited(false);
        let unlimited = sampler.on_send(1000, 10_000_000, 1000);
        assert!(!unlimited.is_app_limited);
    }

    /// Each send-then-ack exchange with nothing else in flight is one round.
    #[test]
    fn test_round_counting() {
        let sampler = DeliveryRateSampler::new();
        assert_eq!(sampler.round_count(), 0);

        let first = sampler.on_send(1000, 0, 0);
        let _ = sampler.on_ack(first, 1000, 100_000_000);
        assert_eq!(sampler.round_count(), 1);

        let second = sampler.on_send(1000, 100_000_000, 0);
        let _ = sampler.on_ack(second, 1000, 200_000_000);
        assert_eq!(sampler.round_count(), 2);
    }

    /// Reported losses accumulate.
    #[test]
    fn test_loss_tracking() {
        let sampler = DeliveryRateSampler::new();
        assert_eq!(sampler.lost(), 0);

        sampler.on_loss(1000);
        assert_eq!(sampler.lost(), 1000);

        sampler.on_loss(500);
        assert_eq!(sampler.lost(), 1500);
    }

    /// `reset` clears delivered, lost, the app-limited flag and the round
    /// counter.
    #[test]
    fn test_reset() {
        let sampler = DeliveryRateSampler::new();

        // Dirty every piece of state the assertions below look at.
        let sent = sampler.on_send(1000, 0, 0);
        let _ = sampler.on_ack(sent, 1000, 100_000_000);
        sampler.on_loss(500);
        sampler.set_app_limited(true);
        assert!(sampler.delivered() > 0);
        assert!(sampler.lost() > 0);
        assert!(sampler.is_app_limited());

        sampler.reset();

        assert_eq!(sampler.delivered(), 0);
        assert_eq!(sampler.lost(), 0);
        assert!(!sampler.is_app_limited());
        assert_eq!(sampler.round_count(), 0);
    }
}