#![allow(dead_code)]
use std::collections::VecDeque;
use std::time::{Duration, Instant};
/// Tunable parameters that control how packets are paced.
#[derive(Debug, Clone)]
pub struct PacingConfig {
    /// Target output rate in bits per second.
    ///
    /// A non-positive (or NaN) value makes [`PacingConfig::ideal_gap`] fall
    /// back to `max_gap`.
    pub target_bitrate_bps: f64,
    /// Multiplier applied to the bitrate to size the burst budget
    /// (see [`PacingConfig::max_bucket_bits`]).
    pub burst_factor: f64,
    /// Maximum number of packets that may be queued at once.
    pub max_queue_depth: usize,
    /// Whether packets should be scheduled by priority.
    // NOTE(review): nothing in this type reads the flag — confirm how the
    // consumer is expected to honour it.
    pub priority_queue: bool,
    /// Lower bound for the gap returned by [`PacingConfig::ideal_gap`].
    pub min_gap: Duration,
    /// Upper bound for the gap returned by [`PacingConfig::ideal_gap`].
    pub max_gap: Duration,
}

impl Default for PacingConfig {
    /// 4 Mbit/s target, 1.5x burst, 512-packet queue, priority scheduling on,
    /// gaps bounded to 100 µs ..= 50 ms.
    fn default() -> Self {
        Self {
            target_bitrate_bps: 4_000_000.0,
            burst_factor: 1.5,
            max_queue_depth: 512,
            priority_queue: true,
            min_gap: Duration::from_micros(100),
            max_gap: Duration::from_millis(50),
        }
    }
}

impl PacingConfig {
    /// Creates a configuration populated with the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the configuration with `target_bitrate_bps` replaced by `bps`.
    ///
    /// The value is stored as given; no validation is performed here.
    #[must_use]
    pub fn with_bitrate(mut self, bps: f64) -> Self {
        self.target_bitrate_bps = bps;
        self
    }

    /// Returns the configuration with `burst_factor` set to `factor`,
    /// limited to the range `1.0..=10.0`.
    #[must_use]
    pub fn with_burst_factor(mut self, factor: f64) -> Self {
        self.burst_factor = factor.clamp(1.0, 10.0);
        self
    }

    /// Computes the time a packet of `packet_bytes` occupies at the target
    /// bitrate, bounded to `min_gap ..= max_gap`.
    ///
    /// This method never panics:
    /// * a non-positive or NaN bitrate yields `max_gap`;
    /// * a quotient that is non-finite or at least `max_gap` yields `max_gap`
    ///   without ever being converted to a `Duration`;
    /// * if `min_gap > max_gap` (the fields are public, so this can happen),
    ///   `max_gap` wins instead of tripping the assertion inside `clamp`.
    #[must_use]
    pub fn ideal_gap(&self, packet_bytes: usize) -> Duration {
        let rate = self.target_bitrate_bps;
        let upper = self.max_gap;
        if rate.is_nan() || rate <= 0.0 {
            return upper;
        }

        let bits = (packet_bytes as f64) * 8.0;
        let secs = bits / rate;
        // Anything at or above the cap (including +inf from a denormal rate)
        // is answered directly; `Duration::from_secs_f64` would panic on
        // values it cannot represent.
        if !secs.is_finite() || secs >= upper.as_secs_f64() {
            return upper;
        }

        Duration::from_secs_f64(secs).max(self.min_gap).min(upper)
    }

    /// Returns the burst budget in bits: `target_bitrate_bps * burst_factor`.
    #[must_use]
    pub fn max_bucket_bits(&self) -> f64 {
        self.target_bitrate_bps * self.burst_factor
    }
}
/// Scheduling class of a packet.
///
/// The explicit discriminants define the derived ordering, so
/// `Low < Normal < High < Control`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PacketPriority {
    /// Lowest priority.
    Low = 0,
    /// Default priority.
    Normal = 1,
    /// Elevated priority.
    High = 2,
    /// Highest priority.
    Control = 3,
}
/// A payload together with the metadata used to schedule its release.
#[derive(Debug)]
pub struct PacedPacket {
    /// Raw bytes to transmit.
    pub payload: Vec<u8>,
    /// Scheduling class; [`PacedPacket::new`] starts at `Normal`.
    pub priority: PacketPriority,
    /// Earliest instant at which the packet may be released.
    pub not_before: Instant,
    /// Reference instant for [`PacedPacket::queue_latency`]; captured when the
    /// packet is constructed.
    pub enqueued_at: Instant,
    /// Caller-supplied sequence number.
    pub seq: u32,
}

impl PacedPacket {
    /// Wraps `payload` as a `Normal`-priority packet that must not be released
    /// before `not_before`. `enqueued_at` is stamped with the current time.
    #[must_use]
    pub fn new(payload: Vec<u8>, seq: u32, not_before: Instant) -> Self {
        let enqueued_at = Instant::now();
        Self {
            seq,
            payload,
            not_before,
            enqueued_at,
            priority: PacketPriority::Normal,
        }
    }

    /// Returns the packet with its scheduling class replaced by `priority`.
    #[must_use]
    pub const fn with_priority(mut self, priority: PacketPriority) -> Self {
        self.priority = priority;
        self
    }

    /// Payload length in bytes.
    #[must_use]
    pub fn size_bytes(&self) -> usize {
        let bytes: &[u8] = &self.payload;
        bytes.len()
    }

    /// Time elapsed since `enqueued_at`.
    #[must_use]
    pub fn queue_latency(&self) -> Duration {
        Instant::now().duration_since(self.enqueued_at)
    }
}
/// Token bucket measured in bits.
///
/// Tokens accumulate at `fill_rate_bps` up to `max_tokens`; a request is
/// granted only when the bucket currently holds at least that many tokens.
/// Because the level is capped at `max_tokens`, a request larger than the
/// capacity can never be granted.
struct TokenBucket {
    /// Bits currently available to spend.
    tokens: f64,
    /// Capacity; refills never raise `tokens` above this.
    max_tokens: f64,
    /// Refill speed in bits per second (never negative).
    fill_rate_bps: f64,
    /// Instant up to which elapsed time has already been credited.
    last_fill: Instant,
}

impl TokenBucket {
    /// Creates a full bucket.
    ///
    /// Negative or NaN `fill_rate_bps` / `max_tokens` are treated as `0.0` so
    /// the bucket can never drain by itself or hold a negative balance.
    fn new(fill_rate_bps: f64, max_tokens: f64) -> Self {
        let capacity = Self::non_negative(max_tokens);
        Self {
            tokens: capacity,
            max_tokens: capacity,
            fill_rate_bps: Self::non_negative(fill_rate_bps),
            last_fill: Instant::now(),
        }
    }

    /// Maps negative and NaN inputs to `0.0` (`f64::max` returns the non-NaN
    /// operand).
    fn non_negative(value: f64) -> f64 {
        value.max(0.0)
    }

    /// Credits the tokens earned since the previous refill.
    fn refill(&mut self) {
        // Read the clock once: using one reading for both the elapsed time and
        // the new `last_fill` means no interval is ever left uncredited.
        let now = Instant::now();
        let elapsed = now.saturating_duration_since(self.last_fill).as_secs_f64();
        let earned = (elapsed * self.fill_rate_bps).max(0.0);
        self.tokens = (self.tokens + earned).min(self.max_tokens);
        self.last_fill = now;
    }

    /// Refills, then spends `bits` tokens if that many are available.
    ///
    /// Returns `true` when the tokens were deducted and `false` (leaving the
    /// balance untouched) otherwise.
    fn try_consume(&mut self, bits: f64) -> bool {
        self.refill();
        if self.tokens >= bits {
            self.tokens -= bits;
            true
        } else {
            false
        }
    }

    /// Current level as a fraction of capacity in `0.0..=1.0`; `0.0` when the
    /// capacity is zero.
    fn fill_fraction(&self) -> f64 {
        if self.max_tokens <= 0.0 {
            return 0.0;
        }
        (self.tokens / self.max_tokens).clamp(0.0, 1.0)
    }

    /// Switches to a new refill rate and capacity.
    ///
    /// Time that elapsed under the old rate is credited at the old rate first,
    /// so a rate change never retroactively re-prices that interval. The
    /// balance is then capped at the new capacity. Negative or NaN arguments
    /// are treated as `0.0`.
    fn set_rate(&mut self, fill_rate_bps: f64, max_tokens: f64) {
        self.refill();
        self.fill_rate_bps = Self::non_negative(fill_rate_bps);
        self.max_tokens = Self::non_negative(max_tokens);
        self.tokens = self.tokens.min(self.max_tokens);
    }
}
/// Snapshot of pacer counters and gauges.
///
/// Every field starts at its zero value via `Default`.
#[derive(Clone, Debug, Default)]
pub struct PacingStats {
    /// Number of packets currently queued.
    pub queue_depth: usize,
    /// Total packets released.
    pub packets_sent: u64,
    /// Total packets rejected.
    pub packets_dropped: u64,
    /// Total payload bytes released.
    pub bytes_sent: u64,
    /// Average time packets spent waiting; how (and whether) it is maintained
    /// is up to the code that fills in these stats.
    pub avg_queue_latency: Duration,
    /// Measured output rate in bits per second.
    pub output_bitrate_bps: f64,
    /// Token-bucket level as a fraction of its capacity.
    pub bucket_fill: f64,
}
pub struct PacketPacer {
config: PacingConfig,
high_queue: VecDeque<PacedPacket>,
normal_queue: VecDeque<PacedPacket>,
low_queue: VecDeque<PacedPacket>,
bucket: TokenBucket,
next_send: Instant,
stats: PacingStats,
throughput_ema: f64,
last_throughput_update: Instant,
window_bytes: u64,
window_start: Instant,
}
impl PacketPacer {
#[must_use]
pub fn new(config: PacingConfig) -> Self {
let max_bits = config.max_bucket_bits();
let rate = config.target_bitrate_bps;
let bucket = TokenBucket::new(rate, max_bits);
let now = Instant::now();
let mut pacer = Self {
config,
high_queue: VecDeque::new(),
normal_queue: VecDeque::new(),
low_queue: VecDeque::new(),
bucket,
next_send: now,
stats: PacingStats::default(),
throughput_ema: 0.0,
last_throughput_update: now,
window_bytes: 0,
window_start: now,
};
pacer.update_stats();
pacer
}
pub fn enqueue(&mut self, packet: PacedPacket) -> bool {
let total_depth = self.high_queue.len() + self.normal_queue.len() + self.low_queue.len();
if total_depth >= self.config.max_queue_depth {
self.stats.packets_dropped += 1;
return false;
}
match packet.priority {
PacketPriority::Control | PacketPriority::High => {
self.high_queue.push_back(packet);
}
PacketPriority::Normal => {
self.normal_queue.push_back(packet);
}
PacketPriority::Low => {
self.low_queue.push_back(packet);
}
}
self.update_stats();
true
}
pub fn dequeue_ready(&mut self) -> Vec<PacedPacket> {
let mut out = Vec::new();
loop {
let now = Instant::now();
let next = if let Some(p) = self.high_queue.front() {
if p.not_before <= now {
self.high_queue.pop_front()
} else {
None
}
} else if let Some(p) = self.normal_queue.front() {
if p.not_before <= now {
self.normal_queue.pop_front()
} else {
None
}
} else if let Some(p) = self.low_queue.front() {
if p.not_before <= now {
self.low_queue.pop_front()
} else {
None
}
} else {
break;
};
let packet = match next {
Some(p) => p,
None => break,
};
if now < self.next_send {
match packet.priority {
PacketPriority::Control | PacketPriority::High => {
self.high_queue.push_front(packet);
}
PacketPriority::Normal => {
self.normal_queue.push_front(packet);
}
PacketPriority::Low => {
self.low_queue.push_front(packet);
}
}
break;
}
let bits = packet.size_bytes() as f64 * 8.0;
if !self.bucket.try_consume(bits) {
match packet.priority {
PacketPriority::Control | PacketPriority::High => {
self.high_queue.push_front(packet);
}
PacketPriority::Normal => {
self.normal_queue.push_front(packet);
}
PacketPriority::Low => {
self.low_queue.push_front(packet);
}
}
break;
}
let raw_gap = if self.config.target_bitrate_bps > 0.0 {
let bits_f = (packet.size_bytes() as f64) * 8.0;
Duration::from_secs_f64(bits_f / self.config.target_bitrate_bps)
} else {
self.config.max_gap
};
let gap = if raw_gap >= self.config.min_gap {
raw_gap.min(self.config.max_gap)
} else {
Duration::ZERO
};
self.next_send = now + gap;
self.window_bytes += packet.size_bytes() as u64;
self.stats.bytes_sent += packet.size_bytes() as u64;
self.stats.packets_sent += 1;
out.push(packet);
}
self.update_throughput();
self.update_stats();
out
}
pub fn set_bitrate(&mut self, bps: f64) {
self.config.target_bitrate_bps = bps;
let max_bits = self.config.max_bucket_bits();
self.bucket.set_rate(bps, max_bits);
}
#[must_use]
pub const fn config(&self) -> &PacingConfig {
&self.config
}
#[must_use]
pub fn stats(&self) -> &PacingStats {
&self.stats
}
#[must_use]
pub fn queue_depth(&self) -> usize {
self.high_queue.len() + self.normal_queue.len() + self.low_queue.len()
}
#[must_use]
pub fn high_priority_depth(&self) -> usize {
self.high_queue.len()
}
fn update_stats(&mut self) {
self.stats.queue_depth = self.queue_depth();
self.stats.bucket_fill = self.bucket.fill_fraction();
}
fn update_throughput(&mut self) {
let elapsed = self.window_start.elapsed();
if elapsed >= Duration::from_millis(100) {
let bps = (self.window_bytes as f64 * 8.0) / elapsed.as_secs_f64();
self.throughput_ema = 0.8 * self.throughput_ema + 0.2 * bps;
self.stats.output_bitrate_bps = self.throughput_ema;
self.window_bytes = 0;
self.window_start = Instant::now();
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Bitrate used by tests that expect small packets to be released at once.
    const FAST_BPS: f64 = 1_000_000_000.0;
    /// Bitrate used by tests that only inspect queue bookkeeping.
    const SLOW_BPS: f64 = 1.0;

    /// Builds a `Normal` packet of `size` zero bytes that is due immediately.
    fn make_packet(seq: u32, size: usize) -> PacedPacket {
        let due = Instant::now();
        let payload = vec![0u8; size];
        PacedPacket::new(payload, seq, due)
    }

    /// Same as `make_packet`, but tagged `Control`.
    fn make_high_packet(seq: u32, size: usize) -> PacedPacket {
        make_packet(seq, size).with_priority(PacketPriority::Control)
    }

    /// Pacer with default settings except for the target bitrate.
    fn pacer_at(bps: f64) -> PacketPacer {
        PacketPacer::new(PacingConfig::new().with_bitrate(bps))
    }

    #[test]
    fn test_pacing_config_default() {
        let PacingConfig {
            target_bitrate_bps,
            burst_factor,
            max_queue_depth,
            ..
        } = PacingConfig::default();
        assert!(target_bitrate_bps > 0.0);
        assert!(burst_factor >= 1.0);
        assert!(max_queue_depth > 0);
    }

    #[test]
    fn test_pacing_config_builder() {
        let built = PacingConfig::new()
            .with_bitrate(10_000_000.0)
            .with_burst_factor(2.0);
        assert!((built.target_bitrate_bps - 10_000_000.0).abs() < 1.0);
        assert!((built.burst_factor - 2.0).abs() < 1e-9);
    }

    #[test]
    fn test_ideal_gap() {
        let gap = PacingConfig::new()
            .with_bitrate(1_000_000.0)
            .ideal_gap(1250);
        let expected = Duration::from_millis(9)..=Duration::from_millis(11);
        assert!(expected.contains(&gap));
    }

    #[test]
    fn test_max_bucket_bits() {
        let built = PacingConfig::new()
            .with_bitrate(1_000_000.0)
            .with_burst_factor(2.0);
        assert!((built.max_bucket_bits() - 2_000_000.0).abs() < 1.0);
    }

    #[test]
    fn test_paced_packet_size() {
        assert_eq!(make_packet(0, 188).size_bytes(), 188);
    }

    #[test]
    fn test_paced_packet_priority() {
        let tagged = make_packet(0, 100).with_priority(PacketPriority::High);
        assert_eq!(tagged.priority, PacketPriority::High);
    }

    #[test]
    fn test_pacer_enqueue_dequeue() {
        let mut pacer = pacer_at(FAST_BPS);
        assert!(pacer.enqueue(make_packet(0, 188)));
        let released = pacer.dequeue_ready();
        assert_eq!(released.len(), 1);
        assert_eq!(released[0].seq, 0);
    }

    #[test]
    fn test_pacer_queue_depth() {
        let mut pacer = pacer_at(SLOW_BPS);
        for seq in 0..2u32 {
            pacer.enqueue(make_packet(seq, 1));
        }
        assert_eq!(pacer.queue_depth(), 2);
    }

    #[test]
    fn test_pacer_queue_overflow() {
        let cfg = PacingConfig {
            max_queue_depth: 2,
            target_bitrate_bps: 1.0,
            ..PacingConfig::new()
        };
        let mut pacer = PacketPacer::new(cfg);
        pacer.enqueue(make_packet(0, 1));
        pacer.enqueue(make_packet(1, 1));
        // The queue is full, so the third packet must be rejected.
        let accepted = pacer.enqueue(make_packet(2, 1));
        assert!(!accepted);
        assert_eq!(pacer.stats().packets_dropped, 1);
    }

    #[test]
    fn test_pacer_priority_order() {
        let mut pacer = pacer_at(FAST_BPS);
        pacer.enqueue(make_packet(1, 10).with_priority(PacketPriority::Low));
        pacer.enqueue(make_high_packet(2, 10));
        let released = pacer.dequeue_ready();
        assert_eq!(released.len(), 2);
        assert_eq!(released[0].priority, PacketPriority::Control);
    }

    #[test]
    fn test_pacer_set_bitrate() {
        let mut pacer = PacketPacer::new(PacingConfig::default());
        pacer.set_bitrate(10_000_000.0);
        let configured = pacer.config().target_bitrate_bps;
        assert!((configured - 10_000_000.0).abs() < 1.0);
    }

    #[test]
    fn test_pacer_stats_after_dequeue() {
        let mut pacer = pacer_at(FAST_BPS);
        pacer.enqueue(make_packet(0, 200));
        let _ = pacer.dequeue_ready();
        let stats = pacer.stats();
        assert_eq!(stats.packets_sent, 1);
        assert_eq!(stats.bytes_sent, 200);
    }

    #[test]
    fn test_pacer_high_priority_depth() {
        let mut pacer = pacer_at(SLOW_BPS);
        for seq in 0..2u32 {
            pacer.enqueue(make_high_packet(seq, 1));
        }
        assert_eq!(pacer.high_priority_depth(), 2);
    }

    #[test]
    fn test_paced_packet_queue_latency() {
        let waited = make_packet(0, 100).queue_latency();
        assert!(waited >= Duration::ZERO);
    }

    #[test]
    fn test_packet_priority_ordering() {
        assert!(PacketPriority::Control > PacketPriority::High);
        assert!(PacketPriority::High > PacketPriority::Normal);
        assert!(PacketPriority::Normal > PacketPriority::Low);
    }

    #[test]
    fn test_token_bucket_starts_full() {
        let pacer = pacer_at(1_000_000.0);
        assert!((pacer.stats().bucket_fill - 1.0).abs() < 1e-3);
    }

    #[test]
    fn test_pacer_zero_length_payload() {
        let mut pacer = pacer_at(FAST_BPS);
        pacer.enqueue(make_packet(0, 0));
        assert_eq!(pacer.dequeue_ready().len(), 1);
    }

    #[test]
    fn test_pacer_multiple_packets() {
        let mut pacer = pacer_at(FAST_BPS);
        for seq in 0..10u32 {
            pacer.enqueue(make_packet(seq, 100));
        }
        assert_eq!(pacer.dequeue_ready().len(), 10);
    }

    #[test]
    fn test_ideal_gap_zero_bitrate() {
        let cfg = PacingConfig::new().with_bitrate(0.0);
        assert_eq!(cfg.ideal_gap(1000), cfg.max_gap);
    }

    #[test]
    fn test_pacer_all_priority_queues() {
        let mut pacer = pacer_at(SLOW_BPS);
        let priorities = [
            PacketPriority::Low,
            PacketPriority::Normal,
            PacketPriority::High,
            PacketPriority::Control,
        ];
        for (seq, priority) in (0u32..).zip(priorities.iter().copied()) {
            pacer.enqueue(make_packet(seq, 1).with_priority(priority));
        }
        assert_eq!(pacer.queue_depth(), 4);
        // `High` and `Control` share one lane.
        assert_eq!(pacer.high_priority_depth(), 2);
    }
}