#[derive(Debug, Clone)]
pub struct RtmpSessionStats {
pub total_bytes: u64,
pub chunks_sent: u64,
pub avg_chunk_size: f64,
pub dropped_frames: u32,
}
impl Default for RtmpSessionStats {
fn default() -> Self {
Self::new()
}
}
impl RtmpSessionStats {
#[must_use]
pub fn new() -> Self {
Self {
total_bytes: 0,
chunks_sent: 0,
avg_chunk_size: 0.0,
dropped_frames: 0,
}
}
pub fn record_chunk(&mut self, chunk_size: u32) {
self.total_bytes += u64::from(chunk_size);
self.chunks_sent += 1;
self.avg_chunk_size = self.total_bytes as f64 / self.chunks_sent as f64;
}
pub fn drop_frame(&mut self) {
self.dropped_frames += 1;
}
pub fn reset(&mut self) {
self.total_bytes = 0;
self.chunks_sent = 0;
self.avg_chunk_size = 0.0;
self.dropped_frames = 0;
}
#[must_use]
pub fn throughput_bps(&self, elapsed_secs: f64) -> f64 {
if elapsed_secs <= 0.0 {
return 0.0;
}
(self.total_bytes as f64 * 8.0) / elapsed_secs
}
}
pub const MIN_CHUNK_SIZE: u32 = 128;
pub const MAX_CHUNK_SIZE: u32 = 65536;
pub const DEFAULT_CHUNK_SIZE: u32 = 4096;
#[derive(Debug)]
pub struct RtmpChunkOptimizer {
min_chunk_size: u32,
max_chunk_size: u32,
current_chunk_size: u32,
stats: RtmpSessionStats,
}
impl Default for RtmpChunkOptimizer {
fn default() -> Self {
Self::new()
}
}
impl RtmpChunkOptimizer {
#[must_use]
pub fn new() -> Self {
Self {
min_chunk_size: MIN_CHUNK_SIZE,
max_chunk_size: MAX_CHUNK_SIZE,
current_chunk_size: DEFAULT_CHUNK_SIZE,
stats: RtmpSessionStats::new(),
}
}
#[must_use]
pub fn with_bounds(min: u32, max: u32) -> Self {
let clamped_min = min.clamp(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE);
let clamped_max = max.clamp(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE);
let (effective_min, effective_max) = if clamped_min <= clamped_max {
(clamped_min, clamped_max)
} else {
(clamped_max, clamped_max)
};
Self {
min_chunk_size: effective_min,
max_chunk_size: effective_max,
current_chunk_size: DEFAULT_CHUNK_SIZE.clamp(effective_min, effective_max),
stats: RtmpSessionStats::new(),
}
}
pub fn optimize_chunk_size(&mut self, bandwidth_bps: u64) -> u32 {
let raw = ((bandwidth_bps as f64 / 8.0) * 0.1).sqrt();
let rounded = round_to_multiple_of_128(raw as u64);
let clamped = rounded.clamp(self.min_chunk_size, self.max_chunk_size);
self.current_chunk_size = clamped;
clamped
}
#[must_use]
pub fn current_size(&self) -> u32 {
self.current_chunk_size
}
#[must_use]
pub fn stats(&self) -> &RtmpSessionStats {
&self.stats
}
pub fn stats_mut(&mut self) -> &mut RtmpSessionStats {
&mut self.stats
}
pub fn record_sent(&mut self, bytes: u32) {
self.stats.record_chunk(bytes);
}
pub fn adaptive_update(&mut self, bandwidth_bps: u64, packet_loss_pct: f64) -> u32 {
if packet_loss_pct > 5.0 {
let reduced = (self.current_chunk_size as f64 * 0.75) as u32;
self.current_chunk_size = reduced.clamp(self.min_chunk_size, self.max_chunk_size);
} else if packet_loss_pct < 1.0 {
self.optimize_chunk_size(bandwidth_bps);
}
self.current_chunk_size
}
#[must_use]
pub fn suggested_chunk_count(&self, total_bytes: u64) -> u64 {
if total_bytes == 0 || self.current_chunk_size == 0 {
return 0;
}
let cs = u64::from(self.current_chunk_size);
(total_bytes + cs - 1) / cs
}
}
fn round_to_multiple_of_128(value: u64) -> u32 {
if value == 0 {
return 128;
}
let rem = value % 128;
let rounded = if rem == 0 { value } else { value + (128 - rem) };
rounded.min(u32::MAX as u64) as u32
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_optimize_chunk_size_low_bandwidth_clamped_to_min() {
let mut opt = RtmpChunkOptimizer::new();
let size = opt.optimize_chunk_size(64_000);
assert_eq!(size, MIN_CHUNK_SIZE, "Expected MIN_CHUNK_SIZE at 64 kbps");
}
#[test]
fn test_optimize_chunk_size_medium_bandwidth() {
let mut opt = RtmpChunkOptimizer::new();
let size = opt.optimize_chunk_size(10_000_000);
assert!(size >= MIN_CHUNK_SIZE, "Must be >= MIN");
assert!(size <= MAX_CHUNK_SIZE, "Must be <= MAX");
assert_eq!(size % 128, 0, "Must be a multiple of 128");
assert_eq!(size, 384, "10 Mbps should yield chunk size 384");
}
#[test]
fn test_optimize_chunk_size_high_bandwidth_clamped_to_max() {
let mut opt = RtmpChunkOptimizer::new();
let size = opt.optimize_chunk_size(u64::MAX);
assert_eq!(
size, MAX_CHUNK_SIZE,
"Expected MAX_CHUNK_SIZE for extreme bandwidth"
);
}
#[test]
fn test_optimize_chunk_size_zero_bandwidth_clamped_to_min() {
let mut opt = RtmpChunkOptimizer::new();
let size = opt.optimize_chunk_size(0);
assert_eq!(size, MIN_CHUNK_SIZE);
}
#[test]
fn test_optimize_chunk_size_stored() {
let mut opt = RtmpChunkOptimizer::new();
let size = opt.optimize_chunk_size(10_000_000);
assert_eq!(opt.current_size(), size);
}
#[test]
fn test_with_bounds_clamps() {
let opt = RtmpChunkOptimizer::with_bounds(0, 1_000_000);
assert_eq!(opt.min_chunk_size, MIN_CHUNK_SIZE);
assert_eq!(opt.max_chunk_size, MAX_CHUNK_SIZE);
}
#[test]
fn test_with_bounds_custom() {
let opt = RtmpChunkOptimizer::with_bounds(256, 8192);
assert_eq!(opt.min_chunk_size, 256);
assert_eq!(opt.max_chunk_size, 8192);
}
#[test]
fn test_rtmp_session_stats_record_chunk() {
let mut s = RtmpSessionStats::new();
s.record_chunk(1024);
assert_eq!(s.total_bytes, 1024);
assert_eq!(s.chunks_sent, 1);
}
#[test]
fn test_rtmp_session_stats_avg_chunk_size() {
let mut s = RtmpSessionStats::new();
s.record_chunk(1000);
s.record_chunk(3000);
assert!((s.avg_chunk_size - 2000.0).abs() < 1e-6);
}
#[test]
fn test_rtmp_session_stats_drop_frame() {
let mut s = RtmpSessionStats::new();
s.drop_frame();
s.drop_frame();
assert_eq!(s.dropped_frames, 2);
}
#[test]
fn test_rtmp_session_stats_reset() {
let mut s = RtmpSessionStats::new();
s.record_chunk(512);
s.drop_frame();
s.reset();
assert_eq!(s.total_bytes, 0);
assert_eq!(s.chunks_sent, 0);
assert!((s.avg_chunk_size).abs() < 1e-12);
assert_eq!(s.dropped_frames, 0);
}
#[test]
fn test_rtmp_session_stats_throughput_bps() {
let mut s = RtmpSessionStats::new();
s.record_chunk(1_000_000); let bps = s.throughput_bps(1.0);
assert!(
(bps - 8_000_000.0).abs() < 1.0,
"Expected 8 Mbps for 1 MB in 1 s"
);
}
#[test]
fn test_rtmp_session_stats_throughput_zero_elapsed() {
let mut s = RtmpSessionStats::new();
s.record_chunk(100);
assert_eq!(s.throughput_bps(0.0), 0.0);
assert_eq!(s.throughput_bps(-1.0), 0.0);
}
#[test]
fn test_adaptive_update_high_loss_reduces_size() {
let mut opt = RtmpChunkOptimizer::new();
opt.current_chunk_size = 4096;
let new_size = opt.adaptive_update(10_000_000, 6.0);
assert_eq!(new_size, 3072);
assert_eq!(opt.current_size(), 3072);
}
#[test]
fn test_adaptive_update_low_loss_optimizes() {
let mut opt = RtmpChunkOptimizer::new();
let new_size = opt.adaptive_update(10_000_000, 0.5);
assert_eq!(new_size, 384);
}
#[test]
fn test_adaptive_update_medium_loss_holds() {
let mut opt = RtmpChunkOptimizer::new();
opt.current_chunk_size = 2048;
let new_size = opt.adaptive_update(10_000_000, 3.0);
assert_eq!(new_size, 2048, "Medium loss should hold current size");
}
#[test]
fn test_suggested_chunk_count_exact() {
let mut opt = RtmpChunkOptimizer::new();
opt.current_chunk_size = 1024;
assert_eq!(opt.suggested_chunk_count(4096), 4);
}
#[test]
fn test_suggested_chunk_count_ceiling() {
let mut opt = RtmpChunkOptimizer::new();
opt.current_chunk_size = 1000;
assert_eq!(opt.suggested_chunk_count(2500), 3);
}
#[test]
fn test_suggested_chunk_count_zero_bytes() {
let opt = RtmpChunkOptimizer::new();
assert_eq!(opt.suggested_chunk_count(0), 0);
}
#[test]
fn test_record_sent_delegates() {
let mut opt = RtmpChunkOptimizer::new();
opt.record_sent(512);
opt.record_sent(512);
assert_eq!(opt.stats().total_bytes, 1024);
assert_eq!(opt.stats().chunks_sent, 2);
}
}