use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::VecDeque;
use web_time::{Duration, Instant};
use crate::buffer::{BufferReturnCode, PacketBuffer, SmartFlushConfig};
use crate::buffer_level_filter::BufferLevelFilter;
use crate::delay_manager::{DelayConfig, DelayManager};
use crate::expand::ExpandFactory;
use crate::expand::{Expand, ExpandPhase};
use crate::packet::AudioPacket;
use crate::statistics::{
LifetimeStatistics, NetworkStatistics, StatisticsCalculator, TimeStretchOperation,
};
use crate::time_stretch::{TimeStretchFactory, TimeStretcher};
use crate::{NetEqError, Result};
/// Offset in milliseconds that `NetEq::get_decision` subtracts from the target level
/// (after conversion to samples) when computing the lower decision limit.
const K_DECELERATION_TARGET_LEVEL_OFFSET_MS: u32 = 85;
/// Value written to the delay configuration's `base_minimum_delay_ms` by `NetEq::new`.
const K_BASE_MIN_DELAY_MS: u32 = 20;
/// Construction-time settings consumed by `NetEq::new`.
#[derive(Debug, Clone)]
pub struct NetEqConfig {
/// Sample rate in Hz used for output frames and buffer-size conversions; `NetEq::new` rejects `0`.
pub sample_rate: u32,
/// Number of audio channels; `NetEq::new` rejects `0`.
pub channels: u8,
/// Capacity handed to the packet buffer; `NetEq::new` also derives the delay manager's
/// base maximum delay from it.
pub max_packets_in_buffer: usize,
/// Maximum delay in milliseconds; only applied to the delay manager when greater than `0`.
pub max_delay_ms: u32,
/// Minimum delay in milliseconds; only applied to the delay manager when greater than `0`.
pub min_delay_ms: u32,
/// Copied into the delay configuration's `additional_delay_ms` by `NetEq::new`.
pub additional_delay_ms: u32,
/// When `true`, accelerate and preemptive-expand operations fall back to plain decoding.
pub for_test_no_time_stretching: bool,
/// When `true`, packets are decoded on insertion into a sample queue and `get_audio`
/// drains that queue instead of running the decision logic.
pub bypass_mode: bool,
/// Base delay-manager configuration; `NetEq::new` overrides its base minimum, base maximum
/// and additional delay fields before use.
pub delay_config: DelayConfig,
/// Flush configuration passed to the packet buffer.
pub smart_flush_config: SmartFlushConfig,
}
impl Default for NetEqConfig {
fn default() -> Self {
Self {
sample_rate: 16000,
channels: 1,
max_packets_in_buffer: 200,
max_delay_ms: 0,
min_delay_ms: 0,
additional_delay_ms: 0,
for_test_no_time_stretching: false,
bypass_mode: false,
delay_config: DelayConfig::default(),
smart_flush_config: SmartFlushConfig::default(),
}
}
}
/// The action chosen by `NetEq::get_decision` and executed by `NetEq::get_audio`
/// for one output frame.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Operation {
/// Decode buffered audio straight into the frame.
Normal,
/// Dispatched to `decode_merge`, which currently performs a plain decode.
Merge,
/// Run the expand processor in its `Expand` phase.
Expand,
/// Run the expand processor in its `ExpandStart` phase.
ExpandStart,
/// Run the expand processor in its `ExpandEnd` phase.
ExpandEnd,
/// Time-compress buffered audio (normal mode).
Accelerate,
/// Time-compress buffered audio (fast mode).
FastAccelerate,
/// Time-stretch buffered audio to lengthen playout.
PreemptiveExpand,
/// Output samples left over from a previous time-stretch operation.
TimeStretchBuffer,
/// Fill the frame with low-level generated noise.
ComfortNoise,
/// No dedicated handler in `get_audio`; handled by its fallback arm (silence).
Dtmf,
/// No dedicated handler in `get_audio`; handled by its fallback arm (silence).
Undefined,
}
/// Serializable snapshot produced by `NetEq::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetEqStats {
/// Copy of the statistics calculator's network statistics.
pub network: NetworkStatistics,
/// Copy of the statistics calculator's lifetime statistics.
pub lifetime: LifetimeStatistics,
/// Buffered audio in milliseconds at the time of the snapshot.
pub current_buffer_size_ms: u32,
/// Target delay reported by the delay manager, in milliseconds.
pub target_delay_ms: u32,
/// Number of packets currently held in the packet buffer.
pub packets_awaiting_decode: usize,
/// Packet count captured the last time the per-second counter rolled over;
/// defaults to `0` when absent from deserialized input.
#[serde(default)]
pub packets_per_sec: u32,
}
/// One block of output audio together with its format and classification.
#[derive(Debug, Clone)]
pub struct AudioFrame {
    /// Sample storage; `AudioFrame::new` sizes it to `samples_per_channel * channels`.
    pub samples: Vec<f32>,
    /// Sample rate in Hz.
    pub sample_rate: u32,
    /// Number of channels.
    pub channels: u8,
    /// Number of samples per channel.
    pub samples_per_channel: usize,
    /// How the frame content was produced.
    pub speech_type: SpeechType,
    /// Whether the frame is flagged as containing voice activity.
    pub vad_activity: bool,
}

/// Classification attached to an [`AudioFrame`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SpeechType {
    /// Regular decoded audio.
    Normal,
    /// Comfort noise.
    Cng,
    /// Concealment / silence produced when real audio was not available.
    Expand,
    /// Music.
    Music,
}

impl AudioFrame {
    /// Creates a zero-filled frame with `samples_per_channel * channels` samples,
    /// marked as `SpeechType::Normal` without voice activity.
    pub fn new(sample_rate: u32, channels: u8, samples_per_channel: usize) -> Self {
        Self {
            samples: vec![0.0; samples_per_channel * channels as usize],
            sample_rate,
            channels,
            samples_per_channel,
            speech_type: SpeechType::Normal,
            vad_activity: false,
        }
    }

    /// Returns the frame duration in whole milliseconds (truncating).
    ///
    /// The computation is carried out in 64 bits so that long frames do not
    /// overflow the intermediate `samples * 1000` product; results that do not
    /// fit in `u32` saturate at `u32::MAX`. A frame with `sample_rate == 0`
    /// has no meaningful duration and yields `0` instead of dividing by zero.
    pub fn duration_ms(&self) -> u32 {
        if self.sample_rate == 0 {
            return 0;
        }
        let scaled = (self.samples_per_channel as u64).saturating_mul(1000);
        let millis = scaled / u64::from(self.sample_rate);
        millis.min(u64::from(u32::MAX)) as u32
    }
}
/// Jitter buffer that accepts packets via `insert_packet` and produces fixed-size
/// audio frames via `get_audio`.
pub struct NetEq {
// Settings supplied at construction.
config: NetEqConfig,
// Storage for packets that have been inserted but not yet decoded.
packet_buffer: PacketBuffer,
// Supplies the target delay used by the decision logic.
delay_manager: DelayManager,
// Filtered view of the buffer level, compared against the decision limits.
buffer_level_filter: BufferLevelFilter,
// Collects network/lifetime statistics.
statistics: StatisticsCalculator,
// Time-stretch processor used for Accelerate / FastAccelerate.
accelerate: Box<dyn TimeStretcher + Send>,
// Time-stretch processor used for PreemptiveExpand.
preemptive_expand: Box<dyn TimeStretcher + Send>,
// Processor used for the Expand / ExpandStart / ExpandEnd operations.
expand: Box<Expand>,
// RTP timestamp of the most recent packet consumed by `decode_normal`.
last_decode_timestamp: Option<u32>,
// Total samples per output frame: (sample_rate / 100) * channels.
output_frame_size_samples: usize,
// Initialised to false; not read by the code visible in this file section.
_muted: bool,
// Operation chosen by the most recent `get_audio` call.
last_operation: Operation,
// Number of expand decisions taken in a row; zero when not expanding.
consecutive_expands: u32,
// Running counter advanced by each produced frame in `get_audio`.
frame_timestamp: u32,
// Decoded samples that did not fit into the previous frame.
leftover_samples: Vec<f32>,
// Registered decoders keyed by RTP payload type.
decoders: HashMap<u8, Box<dyn crate::codec::AudioDecoder + Send>>,
// Decoded samples queued for output when bypass mode is enabled.
bypass_audio_queue: VecDeque<f32>,
// Packets inserted since the per-second counter last rolled over.
packets_received_this_second: u32,
// Instant at which the per-second counter last rolled over.
last_packets_second_instant: Instant,
// Packet count captured at the last roll-over; exposed via statistics.
packets_per_sec_snapshot: u32,
// Time-stretched output that has not been delivered yet.
leftover_time_stretched_samples: Vec<f32>,
// Net samples added (positive) or removed (negative) by time stretching since the
// last decision; passed negated to the buffer level filter.
timestretch_added_samples: i32,
}
impl NetEq {
/// Builds a `NetEq` from `config`.
///
/// # Errors
///
/// Returns `NetEqError::InvalidSampleRate` when `config.sample_rate` is zero and
/// `NetEqError::InvalidChannelCount` when `config.channels` is zero (the sample
/// rate is checked first).
pub fn new(config: NetEqConfig) -> Result<Self> {
    match (config.sample_rate, config.channels) {
        (0, _) => return Err(NetEqError::InvalidSampleRate(config.sample_rate)),
        (_, 0) => return Err(NetEqError::InvalidChannelCount(config.channels)),
        _ => {}
    }

    let rate = config.sample_rate;
    let channel_count = config.channels;

    // One output frame covers 1/100 s of audio across all channels.
    let output_frame_size_samples = (rate / 100) as usize * channel_count as usize;

    let packet_buffer = PacketBuffer::with_config(
        config.max_packets_in_buffer,
        config.smart_flush_config.clone(),
    );

    // Start from the caller's delay configuration and override the derived fields.
    let mut delay_manager = {
        let mut delay_config = config.delay_config.clone();
        delay_config.base_minimum_delay_ms = K_BASE_MIN_DELAY_MS;
        delay_config.base_maximum_delay_ms =
            (config.max_packets_in_buffer * 20 * 3 / 4) as u32;
        delay_config.additional_delay_ms = config.additional_delay_ms;
        DelayManager::new(delay_config)
    };
    // Explicit bounds are only applied when the caller set them to a non-zero value.
    if config.min_delay_ms > 0 {
        delay_manager.set_minimum_delay(config.min_delay_ms);
    }
    if config.max_delay_ms > 0 {
        delay_manager.set_maximum_delay(config.max_delay_ms);
    }

    let statistics = StatisticsCalculator::new();
    let buffer_level_filter = BufferLevelFilter::new(rate);
    let accelerate: Box<dyn TimeStretcher + Send> =
        TimeStretchFactory::create_accelerate(rate, channel_count);
    let preemptive_expand: Box<dyn TimeStretcher + Send> =
        TimeStretchFactory::create_preemptive_expand(rate, channel_count);
    let expand: Box<Expand> = ExpandFactory::create_expand(rate, channel_count);

    Ok(Self {
        config,
        packet_buffer,
        delay_manager,
        buffer_level_filter,
        statistics,
        accelerate,
        preemptive_expand,
        expand,
        output_frame_size_samples,
        last_decode_timestamp: None,
        last_operation: Operation::Normal,
        _muted: false,
        consecutive_expands: 0,
        frame_timestamp: 0,
        timestretch_added_samples: 0,
        leftover_samples: Vec::new(),
        leftover_time_stretched_samples: Vec::new(),
        decoders: HashMap::new(),
        bypass_audio_queue: VecDeque::new(),
        packets_received_this_second: 0,
        packets_per_sec_snapshot: 0,
        last_packets_second_instant: Instant::now(),
    })
}
/// Accepts one incoming packet.
///
/// In bypass mode the packet is decoded immediately with the decoder registered
/// for its payload type and the samples are queued for `get_audio`; decode
/// failures and missing decoders are logged and otherwise ignored. Outside
/// bypass mode the delay manager is updated, the packet is stored in the packet
/// buffer and the buffer-size statistics are refreshed.
///
/// # Errors
///
/// Propagates errors from the delay manager update and from the packet buffer
/// insertion.
pub fn insert_packet(&mut self, packet: AudioPacket) -> Result<()> {
    self.packets_received_this_second = self.packets_received_this_second.saturating_add(1);
    self.maybe_roll_packet_rate();

    if self.config.bypass_mode {
        match self.decoders.get_mut(&packet.header.payload_type) {
            None => {
                log::warn!(
                    "No decoder registered for payload type {} in bypass mode",
                    packet.header.payload_type
                );
            }
            Some(decoder) => match decoder.decode(&packet.payload) {
                Err(e) => {
                    log::warn!("Bypass mode decode error: {e:?}");
                }
                Ok(pcm_samples) => {
                    let sample_count = pcm_samples.len();
                    self.bypass_audio_queue.extend(pcm_samples);
                    log::trace!("Bypass mode: decoded {sample_count} samples");
                }
            },
        }
        return Ok(());
    }

    self.delay_manager
        .update(packet.header.timestamp, packet.sample_rate, false)?;
    let target_delay = self.delay_manager.target_delay_ms();

    let outcome = self
        .packet_buffer
        .insert_packet(packet, &mut self.statistics, target_delay)?;

    let buffer_ms = self.current_buffer_size_ms() as u16;
    self.statistics
        .update_buffer_size(buffer_ms, target_delay as u16);

    // Only the flush outcomes are worth a log line.
    if let BufferReturnCode::Flushed = outcome {
        log::info!("Buffer flushed due to overflow");
    } else if let BufferReturnCode::PartialFlush = outcome {
        log::debug!("Partial buffer flush performed");
    }
    Ok(())
}
/// Produces the next output frame.
///
/// In bypass mode the frame is filled from the bypass queue and padded with
/// zeros when the queue runs short. The frame is reported as
/// `SpeechType::Normal` only when it was filled entirely with queued samples,
/// and `vad_activity` is set only when at least one queued sample was used.
/// (Previously both were derived from the fill counter *after* zero padding,
/// so every bypass frame looked like fully voiced audio.)
///
/// Otherwise the decision logic selects an [`Operation`], the matching handler
/// fills the frame, and statistics plus the running frame timestamp are updated.
///
/// # Errors
///
/// Propagates errors from the decision logic and from the selected handler.
pub fn get_audio(&mut self) -> Result<AudioFrame> {
    self.maybe_roll_packet_rate();
    let mut frame = AudioFrame::new(
        self.config.sample_rate,
        self.config.channels,
        self.output_frame_size_samples / self.config.channels as usize,
    );

    if self.config.bypass_mode {
        let samples_needed = self.output_frame_size_samples;
        // Number of real (decoded) samples available for this frame.
        let real_samples = samples_needed.min(self.bypass_audio_queue.len());
        for (slot, sample) in frame
            .samples
            .iter_mut()
            .zip(self.bypass_audio_queue.drain(..real_samples))
        {
            *slot = sample;
        }
        // Pad whatever could not be filled with silence.
        if let Some(padding) = frame.samples.get_mut(real_samples..) {
            padding.fill(0.0);
        }
        frame.speech_type = if real_samples == samples_needed {
            SpeechType::Normal
        } else {
            SpeechType::Expand
        };
        frame.vad_activity = real_samples > 0;
        return Ok(frame);
    }

    let pre_buffer_ms = self.current_buffer_size_ms();
    let pre_target_delay = self.delay_manager.target_delay_ms();
    let pre_packet_count = self.packet_buffer.len();
    log::debug!(
        "get_audio pre-decision: buffer={pre_buffer_ms}ms, target={pre_target_delay}ms, packets={pre_packet_count}"
    );

    let operation = self.get_decision()?;
    self.last_operation = operation;
    match operation {
        Operation::Normal => self.decode_normal(&mut frame)?,
        Operation::Accelerate => self.decode_accelerate(&mut frame, false)?,
        Operation::FastAccelerate => self.decode_accelerate(&mut frame, true)?,
        Operation::PreemptiveExpand => self.decode_preemptive_expand(&mut frame)?,
        Operation::TimeStretchBuffer => self.return_time_stretch_buffer(&mut frame)?,
        Operation::Expand => self.decode_expand(&mut frame, ExpandPhase::Expand)?,
        Operation::ExpandStart => self.decode_expand(&mut frame, ExpandPhase::ExpandStart)?,
        Operation::ExpandEnd => self.decode_expand(&mut frame, ExpandPhase::ExpandEnd)?,
        Operation::Merge => self.decode_merge(&mut frame)?,
        Operation::ComfortNoise => self.generate_comfort_noise(&mut frame)?,
        // Operations without a dedicated handler produce silence.
        _ => {
            frame.samples.fill(0.0);
            frame.speech_type = SpeechType::Expand;
        }
    }
    self.statistics.record_decode_operation(operation);

    let post_buffer_ms = self.current_buffer_size_ms();
    let post_packet_count = self.packet_buffer.len();
    log::trace!(
        "get_audio post-decision: operation={operation:?}, buffer_after={post_buffer_ms}ms, packets_after={post_packet_count}"
    );

    self.frame_timestamp = self
        .frame_timestamp
        .wrapping_add((frame.samples_per_channel * self.config.channels as usize) as u32);
    self.statistics
        .jitter_buffer_delay(frame.duration_ms() as u64, frame.samples_per_channel as u64);
    Ok(frame)
}
/// Once at least one second has passed since the last roll-over, publishes the
/// running packet count as the per-second snapshot and starts a new window.
fn maybe_roll_packet_rate(&mut self) {
    let now = Instant::now();
    let window_age = now.duration_since(self.last_packets_second_instant);
    if window_age < Duration::from_secs(1) {
        return;
    }
    let counted = self.packets_received_this_second;
    self.packets_per_sec_snapshot = counted;
    self.packets_received_this_second = 0;
    self.last_packets_second_instant = now;
}
/// Returns a snapshot of the current statistics, buffer size, target delay,
/// buffered packet count and the last per-second packet rate.
pub fn get_statistics(&self) -> NetEqStats {
    let network = self.statistics.network_statistics().clone();
    let lifetime = self.statistics.lifetime_statistics().clone();
    let current_buffer_size_ms = self.current_buffer_size_ms();
    let target_delay_ms = self.delay_manager.target_delay_ms();
    let packets_awaiting_decode = self.packet_buffer.len();
    NetEqStats {
        network,
        lifetime,
        current_buffer_size_ms,
        target_delay_ms,
        packets_awaiting_decode,
        packets_per_sec: self.packets_per_sec_snapshot,
    }
}
/// Reports whether the packet buffer currently holds no packets.
pub fn is_empty(&self) -> bool {
    let buffer = &self.packet_buffer;
    buffer.is_empty()
}
/// Returns the delay manager's current target delay in milliseconds.
pub fn target_delay_ms(&self) -> u32 {
    let manager = &self.delay_manager;
    manager.target_delay_ms()
}
/// Forwards `delay_ms` to the delay manager as the new minimum delay and
/// returns whatever the delay manager reports back.
pub fn set_minimum_delay(&mut self, delay_ms: u32) -> u32 {
    let manager = &mut self.delay_manager;
    manager.set_minimum_delay(delay_ms)
}
/// Forwards `delay_ms` to the delay manager as the new maximum delay and
/// returns whatever the delay manager reports back.
pub fn set_maximum_delay(&mut self, delay_ms: u32) -> u32 {
    let manager = &mut self.delay_manager;
    manager.set_maximum_delay(delay_ms)
}
/// Discards all buffered packets and undelivered decoded samples, then resets
/// the adaptive state.
pub fn flush(&mut self) {
    // Both buffers must be emptied before `reset`, which samples the buffer size.
    self.leftover_samples.clear();
    self.packet_buffer.flush(&mut self.statistics);
    self.reset();
}
/// Resets the delay manager, buffer level filter, expand counter, packet-rate
/// window and time-stretch bookkeeping.
///
/// Pending time-stretched output is discarded *before* the buffer size is
/// sampled, so the buffer level filter is seeded with the amount of audio that
/// actually remains buffered rather than with samples that are about to be
/// thrown away.
fn reset(&mut self) {
    self.delay_manager.reset();

    // Drop stale time-stretch state first; it must not count towards the level.
    self.leftover_time_stretched_samples.clear();
    self.timestretch_added_samples = 0;

    let buffered_samples = self.current_buffer_size_samples();
    self.buffer_level_filter.reset();
    self.buffer_level_filter
        .set_filtered_buffer_level(buffered_samples);

    self.last_decode_timestamp = None;
    self.consecutive_expands = 0;

    // Restart the packets-per-second window.
    self.packets_received_this_second = 0;
    self.packets_per_sec_snapshot = 0;
    self.last_packets_second_instant = Instant::now();
}
/// Chooses the operation for the next output frame.
///
/// Updates the buffer level filter with the current buffer size and the net effect of
/// the previous time-stretch, derives a low/high limit from the target delay, and then
/// walks an ordered list of checks; the first one that matches wins.
fn get_decision(&mut self) -> Result<Operation> {
let current_buffer_samples = self.current_buffer_size_samples();
let target_delay_ms: u32 = self.delay_manager.target_delay_ms();
self.buffer_level_filter
.set_target_buffer_level(target_delay_ms);
// The filter receives the negated time-stretch sample balance, which is then cleared.
self.buffer_level_filter
.update(current_buffer_samples, -self.timestretch_added_samples);
self.timestretch_added_samples = 0;
// Integer division: rates that are not a multiple of 1000 Hz are rounded down.
let samples_per_ms = self.config.sample_rate / 1000;
let target_level_samples = target_delay_ms * samples_per_ms;
// Lower limit: the larger of 3/4 of the target and (target - deceleration offset).
let low_limit = std::cmp::max(
target_level_samples * 3 / 4,
target_level_samples
.saturating_sub(K_DECELERATION_TARGET_LEVEL_OFFSET_MS * samples_per_ms),
);
// Upper limit: at least the target, and at least 20 ms above the lower limit.
let high_limit = std::cmp::max(target_level_samples, low_limit + 20 * samples_per_ms);
// 1. Undelivered time-stretched output always goes out first.
if !self.leftover_time_stretched_samples.is_empty() {
return Ok(Operation::TimeStretchBuffer);
}
// 2. Already expanding and still below the lower limit: keep expanding.
if self.consecutive_expands > 0 && current_buffer_samples < low_limit as usize {
self.consecutive_expands = self.consecutive_expands.saturating_add(1);
return Ok(Operation::Expand);
}
// 3. Not expanding yet and between half a frame and one and a half frames buffered:
//    begin an expand sequence.
if current_buffer_samples < self.output_frame_size_samples * 3 / 2
&& current_buffer_samples >= self.output_frame_size_samples / 2
&& self.consecutive_expands == 0
{
self.consecutive_expands = self.consecutive_expands.saturating_add(1);
return Ok(Operation::ExpandStart);
}
// 4. Less than one full frame buffered: expand.
if current_buffer_samples < self.output_frame_size_samples {
self.consecutive_expands = self.consecutive_expands.saturating_add(1);
return Ok(Operation::Expand);
}
// 5. Enough audio is back after an expand sequence: end it. After more than 600
//    consecutive expands the adaptive state is reset first.
if self.consecutive_expands > 0 {
if self.consecutive_expands > 600 {
self.reset();
}
self.consecutive_expands = 0;
return Ok(Operation::ExpandEnd);
}
// 6. Compare the filtered level against the limits.
let buffer_level_samples = self.buffer_level_filter.filtered_current_level();
// Four times the upper limit or more: fast accelerate.
if buffer_level_samples >= (high_limit << 2) as usize {
return Ok(Operation::FastAccelerate);
}
if buffer_level_samples >= high_limit as usize {
return Ok(Operation::Accelerate);
}
// Below the lower limit with at least three frames actually buffered: stretch.
if buffer_level_samples < low_limit as usize
&& current_buffer_samples >= self.output_frame_size_samples * 3
{
return Ok(Operation::PreemptiveExpand);
}
Ok(Operation::Normal)
}
/// Fills `frame.samples` with decoded audio.
///
/// Samples left over from an earlier call are used first; further packets are
/// then pulled from the packet buffer and decoded with the decoder registered
/// for their payload type (or interpreted as little-endian `f32` samples when
/// no decoder is registered). Decoded samples that do not fit are kept for the
/// next call. If the packet buffer runs dry, the remainder of the frame is
/// zeroed and the frame is marked as `SpeechType::Expand`.
fn decode_normal(&mut self, frame: &mut AudioFrame) -> Result<()> {
    log::trace!(
        "decode_normal: entering with buffer={}ms, packets={}",
        self.current_buffer_size_ms(),
        self.packet_buffer.len()
    );

    let wanted = frame.samples.len();
    let mut written = 0;

    // Start with whatever was left over from the previous call.
    if !self.leftover_samples.is_empty() {
        let to_copy = wanted.min(self.leftover_samples.len());
        let carried = self.leftover_samples.drain(..to_copy);
        for (slot, sample) in frame.samples[..to_copy].iter_mut().zip(carried) {
            *slot = sample;
        }
        written += to_copy;
        log::trace!("decode_normal: consumed {to_copy} leftover samples");
    }

    while written < wanted {
        let packet = match self.packet_buffer.get_next_packet() {
            Some(packet) => packet,
            None => {
                // Nothing left to decode: pad with silence.
                frame.samples[written..].fill(0.0);
                frame.speech_type = SpeechType::Expand;
                break;
            }
        };

        let decoded: Vec<f32> = match self.decoders.get_mut(&packet.header.payload_type) {
            Some(dec) => dec.decode(&packet.payload).unwrap_or_else(|e| {
                log::error!(
                    "decoder error for pt {}: {:?}",
                    packet.header.payload_type,
                    e
                );
                Vec::new()
            }),
            // No decoder registered: treat the payload as raw little-endian f32 samples.
            None => packet
                .payload
                .chunks_exact(4)
                .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
                .collect(),
        };

        let to_copy = (wanted - written).min(decoded.len());
        let (used, surplus) = decoded.split_at(to_copy);
        frame.samples[written..written + to_copy].copy_from_slice(used);
        written += to_copy;

        if !surplus.is_empty() {
            self.leftover_samples.extend_from_slice(surplus);
            log::trace!(
                "decode_normal: stored {} leftover samples",
                surplus.len()
            );
        }

        self.last_decode_timestamp = Some(packet.header.timestamp);
        frame.speech_type = SpeechType::Normal;
        frame.vad_activity = true;
    }

    log::trace!(
        "decode_normal: exiting with buffer={}ms, packets={}",
        self.current_buffer_size_ms(),
        self.packet_buffer.len()
    );
    Ok(())
}
/// Produces one frame by time-compressing buffered audio.
///
/// The largest output length of three, two or one frames is chosen whose input
/// requirement (2x the output in `fast_mode`, 1.5x otherwise) fits in the
/// buffered audio; one frame is used when none fits. The input is decoded, run
/// through the accelerate processor, unused input is put back at the front of
/// the leftover samples, the first frame of output is returned in `frame`, and
/// the rest is kept for subsequent `TimeStretchBuffer` operations.
///
/// When `for_test_no_time_stretching` is set this is a plain decode.
///
/// The number of removed samples reported to the statistics is clamped at zero:
/// casting a negative difference to `u64` would otherwise record an enormous
/// bogus value.
fn decode_accelerate(&mut self, frame: &mut AudioFrame, fast_mode: bool) -> Result<()> {
    if self.config.for_test_no_time_stretching {
        return self.decode_normal(frame);
    }

    let available_samples = self.current_buffer_size_samples();
    let mut output_len: usize = 0;
    let mut required_samples: usize = 0;
    for i in (1..=3).rev() {
        output_len = self.output_frame_size_samples * i;
        let input_ratio = if fast_mode { 2.0 } else { 1.5 };
        required_samples = (output_len as f32 * input_ratio).ceil() as usize;
        if required_samples <= available_samples {
            break;
        }
    }

    let mut extended_frame = AudioFrame::new(
        self.config.sample_rate,
        self.config.channels,
        required_samples,
    );
    self.decode_normal(&mut extended_frame)?;

    let mut output =
        AudioFrame::new(self.config.sample_rate, self.config.channels, output_len);
    let _result =
        self.accelerate
            .process(&extended_frame.samples, &mut output.samples, fast_mode);

    // Return input the processor did not consume to the front of the leftover queue.
    let used_input_samples = self.accelerate.get_used_input_samples();
    if extended_frame.samples.len() > used_input_samples {
        self.leftover_samples.splice(
            0..0,
            extended_frame.samples[used_input_samples..].iter().copied(),
        );
    }

    // First frame goes out now; the remainder is delivered by later calls.
    let frame_len = frame.samples.len();
    frame.samples.copy_from_slice(&output.samples[..frame_len]);
    self.leftover_time_stretched_samples
        .extend_from_slice(&output.samples[frame_len..]);

    let samples_removed = used_input_samples as i32 - output.samples.len() as i32;
    self.timestretch_added_samples -= samples_removed / self.config.channels as i32;
    self.statistics.time_stretch_operation(
        TimeStretchOperation::Accelerate,
        samples_removed.max(0) as u64,
    );

    frame.speech_type = SpeechType::Normal;
    frame.vad_activity = extended_frame.vad_activity;
    Ok(())
}
/// Produces one frame by time-stretching buffered audio.
///
/// Three frames' worth of input is decoded and run through the preemptive
/// expand processor into a three-frame output buffer. Unused input is put back
/// at the front of the leftover samples, the first frame of output is returned
/// in `frame`, and the rest is kept for subsequent `TimeStretchBuffer`
/// operations.
///
/// When `for_test_no_time_stretching` is set this is a plain decode.
///
/// Sizes are computed with integer arithmetic (no `f32` round-trip), and the
/// number of added samples reported to the statistics is clamped at zero so a
/// negative difference cannot be recorded as an enormous `u64`.
fn decode_preemptive_expand(&mut self, frame: &mut AudioFrame) -> Result<()> {
    if self.config.for_test_no_time_stretching {
        return self.decode_normal(frame);
    }

    let stretch_len = frame.samples_per_channel * 3;
    let mut extended_frame = AudioFrame::new(
        self.config.sample_rate,
        self.config.channels,
        stretch_len,
    );
    self.decode_normal(&mut extended_frame)?;

    let mut output = AudioFrame::new(
        self.config.sample_rate,
        self.config.channels,
        stretch_len,
    );
    let _result =
        self.preemptive_expand
            .process(&extended_frame.samples, &mut output.samples, false);

    // Return input the processor did not consume to the front of the leftover queue.
    let used_input_samples = self.preemptive_expand.get_used_input_samples();
    if extended_frame.samples.len() > used_input_samples {
        self.leftover_samples.splice(
            0..0,
            extended_frame.samples[used_input_samples..].iter().copied(),
        );
    }

    // First frame goes out now; the remainder is delivered by later calls.
    let frame_len = frame.samples.len();
    frame.samples.copy_from_slice(&output.samples[..frame_len]);
    self.leftover_time_stretched_samples
        .extend_from_slice(&output.samples[frame_len..]);

    let samples_added = output.samples.len() as i32 - used_input_samples as i32;
    self.timestretch_added_samples += samples_added / self.config.channels as i32;
    self.statistics.time_stretch_operation(
        TimeStretchOperation::PreemptiveExpand,
        samples_added.max(0) as u64,
    );

    frame.speech_type = SpeechType::Normal;
    frame.vad_activity = extended_frame.vad_activity;
    Ok(())
}
/// Delivers previously time-stretched samples that have not been output yet.
///
/// Leaves `frame` untouched when nothing is pending. If fewer samples than one
/// frame are pending, the available samples are copied and the rest of the
/// frame is zeroed instead of panicking on an out-of-range slice.
fn return_time_stretch_buffer(&mut self, frame: &mut AudioFrame) -> Result<()> {
    if self.leftover_time_stretched_samples.is_empty() {
        return Ok(());
    }

    let to_copy = frame
        .samples
        .len()
        .min(self.leftover_time_stretched_samples.len());
    let (filled, padding) = frame.samples.split_at_mut(to_copy);
    filled.copy_from_slice(&self.leftover_time_stretched_samples[..to_copy]);
    padding.fill(0.0);
    self.leftover_time_stretched_samples.drain(..to_copy);

    frame.speech_type = SpeechType::Normal;
    frame.vad_activity = true;
    Ok(())
}
/// Produces one frame with the expand processor for the given `phase`.
///
/// Decodes as many input samples as the processor asks for, lets it write the
/// output into `frame`, returns unconsumed input to the front of the leftover
/// samples and records the frame as a concealment / expand event. The frame is
/// marked `SpeechType::Expand` without voice activity.
fn decode_expand(&mut self, frame: &mut AudioFrame, phase: ExpandPhase) -> Result<()> {
    log::trace!(
        "decode_expand: buffer before expand={}ms, packets={} (consecutive_expands={})",
        self.current_buffer_size_ms(),
        self.packet_buffer.len(),
        self.consecutive_expands
    );

    let wanted_input = self.expand.samples_required(phase);
    let mut source = AudioFrame::new(
        self.config.sample_rate,
        self.config.channels,
        wanted_input,
    );
    self.decode_normal(&mut source)?;

    self.expand
        .process(&source.samples, &mut frame.samples, phase);

    // Anything the processor did not consume goes back to the front of the queue.
    let consumed = self.expand.get_used_input_samples();
    match source.samples.get(consumed..) {
        Some(unused) if !unused.is_empty() => {
            self.leftover_samples.splice(0..0, unused.iter().cloned());
        }
        _ => {}
    }

    let concealed = frame.samples_per_channel as u64;
    self.statistics.concealment_event(concealed, true);
    self.statistics
        .time_stretch_operation(TimeStretchOperation::Expand, concealed);

    frame.vad_activity = false;
    frame.speech_type = SpeechType::Expand;

    log::trace!(
        "decode_expand: buffer after expand={}ms, packets={} (consecutive_expands={})",
        self.current_buffer_size_ms(),
        self.packet_buffer.len(),
        self.consecutive_expands
    );
    Ok(())
}
/// Handles the merge operation; currently identical to a plain decode.
fn decode_merge(&mut self, frame: &mut AudioFrame) -> Result<()> {
    self.decode_normal(frame)?;
    Ok(())
}
/// Fills `frame` with very low amplitude pseudo-random noise and marks it as
/// comfort noise without voice activity.
fn generate_comfort_noise(&mut self, frame: &mut AudioFrame) -> Result<()> {
    frame
        .samples
        .iter_mut()
        .for_each(|slot| *slot = (simple_random() - 0.5) * 0.000005);
    frame.vad_activity = false;
    frame.speech_type = SpeechType::Cng;
    Ok(())
}
/// Returns the amount of buffered audio in whole milliseconds (truncating).
///
/// The conversion is done in 64 bits so that neither the sample count nor the
/// `samples * 1000` product can overflow `u32`; results that do not fit in
/// `u32` saturate at `u32::MAX`. `config.sample_rate` is validated to be
/// non-zero by `NetEq::new`.
pub fn current_buffer_size_ms(&self) -> u32 {
    let samples = self.current_buffer_size_samples() as u64;
    let millis = samples.saturating_mul(1000) / u64::from(self.config.sample_rate);
    millis.min(u64::from(u32::MAX)) as u32
}
/// Returns the total number of buffered samples: those still in the packet
/// buffer plus decoded leftovers plus undelivered time-stretched output.
pub fn current_buffer_size_samples(&self) -> usize {
    let in_packets = self.packet_buffer.num_samples_in_buffer();
    let decoded = self.leftover_samples.len();
    let stretched = self.leftover_time_stretched_samples.len();
    in_packets + decoded + stretched
}
/// Registers `decoder` for packets carrying `payload_type`, replacing any
/// decoder previously registered for that payload type.
pub fn register_decoder(
    &mut self,
    payload_type: u8,
    decoder: Box<dyn crate::codec::AudioDecoder + Send>,
) {
    // The previously registered decoder, if any, is dropped here.
    let _replaced = self.decoders.insert(payload_type, decoder);
}
}
use std::sync::atomic::{AtomicU64, Ordering};
/// Shared state of the xorshift generator behind [`simple_random`]; seeded with 1.
static RNG_STATE: AtomicU64 = AtomicU64::new(1);

/// Advances a xorshift64 state by one step (shift constants 13, 7, 17).
fn xorshift64_step(mut x: u64) -> u64 {
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    x
}

/// Returns a pseudo-random value in `[0, 1)` with 16 bits of resolution.
///
/// The shared state is advanced with a single atomic read-modify-write, so
/// concurrent callers each consume a distinct state instead of racing between
/// a separate load and store. The single-threaded sequence is unchanged.
fn simple_random() -> f32 {
    let previous = RNG_STATE
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |state| {
            Some(xorshift64_step(state))
        })
        // The closure never returns `None`, so both arms carry the previous state.
        .unwrap_or_else(|state| state);
    let x = xorshift64_step(previous);
    ((x as u32) >> 16) as f32 / 65536.0
}
#[cfg(test)]
mod tests {
use std::{thread::sleep, time::Duration};
use super::*;
use crate::packet::RtpHeader;
/// Builds a test packet (payload type 96, SSRC 12345, 16 kHz mono) whose payload
/// is always 160 little-endian `f32` samples, independent of `duration_ms`.
fn create_test_packet(seq: u16, ts: u32, duration_ms: u32) -> AudioPacket {
    let header = RtpHeader::new(seq, ts, 12345, 96, false);
    let payload: Vec<u8> = (0..160)
        .flat_map(|i| {
            let sample = (i as f32 / 160.0 * 2.0 * std::f32::consts::PI * 440.0).sin() * 0.1;
            sample.to_le_bytes()
        })
        .collect();
    AudioPacket::new(header, payload, 16000, 1, duration_ms)
}
/// A freshly constructed instance has no packets and reports an 80 ms target delay.
#[test]
fn test_neteq_creation() {
    let neteq = NetEq::new(NetEqConfig::default()).unwrap();
    assert!(neteq.is_empty());
    assert_eq!(neteq.target_delay_ms(), 80);
}
/// Inserting a single packet makes the buffer non-empty.
#[test]
fn test_packet_insertion() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();
    neteq.insert_packet(create_test_packet(1, 0, 10)).unwrap();
    assert!(!neteq.is_empty());
}
/// After five consecutive packets, `get_audio` yields a non-empty 16 kHz mono frame.
#[test]
fn test_audio_generation() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();
    (0u16..5).for_each(|i| {
        neteq
            .insert_packet(create_test_packet(i, u32::from(i) * 160, 10))
            .unwrap();
    });
    let frame = neteq.get_audio().unwrap();
    assert_eq!(frame.sample_rate, 16000);
    assert_eq!(frame.channels, 1);
    assert!(!frame.samples.is_empty());
}
/// Pulling audio from an empty buffer yields an expand frame without voice activity.
#[test]
fn test_empty_buffer_expansion() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();
    let frame = neteq.get_audio().unwrap();
    assert!(!frame.vad_activity);
    assert_eq!(frame.speech_type, SpeechType::Expand);
}
/// Inserts five consecutive 10 ms packets (checking the buffer grows by 10 ms each
/// time), pulls one frame, then inserts two packets with timestamp jumps and
/// prints the statistics at each stage.
#[test]
fn test_packet_buffer_with_out_of_order_jitter() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();

    let packets: Vec<AudioPacket> = (0u16..=4)
        .map(|i| create_test_packet(i, i as u32 * 160, 10))
        .collect();
    for (index, p) in packets.into_iter().enumerate() {
        neteq.insert_packet(p).unwrap();
        let expected_ms = (index as u32 + 1) * 10;
        assert_eq!(neteq.current_buffer_size_ms(), expected_ms);
    }
    assert_eq!(neteq.current_buffer_size_ms(), 50);

    let frame = neteq.get_audio().unwrap();
    assert!(frame.vad_activity, "Frame should have VAD activity");
    println!("before stats: {:?}\n", neteq.get_statistics());

    // A moderate timestamp jump followed by a large one.
    neteq.insert_packet(create_test_packet(5, 1000, 10)).unwrap();
    println!("before jitter: {:?}\n", neteq.get_statistics());

    neteq.insert_packet(create_test_packet(6, 10000, 10)).unwrap();
    println!("after jitter: {:?}", neteq.get_statistics());
}
/// Inserts five packets with growing wall-clock gaps between them, then pulls
/// more frames than packets were inserted and expects at least one expand frame.
#[test]
fn test_escalating_packet_delays() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();
    let delays_ms = [0u64, 10, 30, 70, 120];

    let mut seq: u16 = 0;
    let mut ts: u32 = 0;
    for &delay in delays_ms.iter() {
        if delay > 0 {
            sleep(Duration::from_millis(delay));
        }
        neteq.insert_packet(create_test_packet(seq, ts, 10)).unwrap();
        seq = seq.wrapping_add(1);
        ts = ts.wrapping_add(160);
    }

    let pulls = delays_ms.len() + 3;
    let expand_frames = (0..pulls)
        .filter(|_| neteq.get_audio().unwrap().speech_type == SpeechType::Expand)
        .count();
    assert!(expand_frames > 0);
}
// Scenario: five packets with consecutive timestamps, then eight more after a timestamp
// gap of ten packet durations. Checks that the buffer stays within fixed bounds, that at
// least one pull is classified as acceleration, and that the final buffer is bounded.
#[test]
fn test_late_joining_peer_no_excessive_buffering() {
let config = NetEqConfig::default();
let mut neteq = NetEq::new(config).unwrap();
let mut seq: u16 = 0;
let mut timestamp: u32 = 0;
const PACKET_DURATION_SAMPLES: u32 = 320;
// Initial burst: five packets declared as 20 ms each.
for _ in 0..5 {
let packet = create_test_packet(seq, timestamp, 20);
neteq.insert_packet(packet).unwrap();
seq += 1;
timestamp += PACKET_DURATION_SAMPLES;
}
let initial_buffer_ms = neteq.current_buffer_size_ms();
assert!(
initial_buffer_ms <= 100,
"Initial buffer should be reasonable, got {initial_buffer_ms}ms"
);
// Skip ahead by ten packet durations before the second burst.
let late_join_timestamp = timestamp + (10 * PACKET_DURATION_SAMPLES);
let mut late_timestamp = late_join_timestamp;
for _ in 0..8 {
let packet = create_test_packet(seq, late_timestamp, 20);
neteq.insert_packet(packet).unwrap();
seq += 1;
late_timestamp += PACKET_DURATION_SAMPLES;
}
let buffer_after_late_join = neteq.current_buffer_size_ms();
assert!(buffer_after_late_join <= 500,
"Buffer should not accumulate excessively after late peer join. Got {buffer_after_late_join}ms, expected ≤500ms");
assert!(buffer_after_late_join <= 600,
"REGRESSION DETECTION: Buffer accumulated {buffer_after_late_join}ms, this exceeds acceptable threshold and may indicate the old 60-packet bug has returned");
// Pull 20 frames and classify each by speech type and buffer change.
let mut total_operations = std::collections::HashMap::new();
for _ in 0..20 {
let pre_buffer = neteq.current_buffer_size_ms();
let frame = neteq.get_audio().unwrap();
let post_buffer = neteq.current_buffer_size_ms();
let operation = match frame.speech_type {
SpeechType::Normal => {
// A Normal frame that drained more than 25 ms is counted as acceleration.
if pre_buffer > post_buffer + 25 {
"Accelerate"
} else {
"Normal"
}
}
SpeechType::Expand => "Expand",
_ => "Other",
};
*total_operations.entry(operation).or_insert(0) += 1;
assert!(
post_buffer <= 500,
"Buffer should never exceed 500ms during processing, got {post_buffer}ms"
);
}
let accelerate_count = total_operations.get("Accelerate").copied().unwrap_or(0);
assert!(accelerate_count > 0,
"NetEQ should have used acceleration to handle late-joining peer scenario. Operations: {total_operations:?}");
let final_buffer = neteq.current_buffer_size_ms();
assert!(
final_buffer <= 400,
"Final buffer should be reasonable, got {final_buffer}ms"
);
println!("✅ Late-joining peer test passed:");
println!(" Initial buffer: {initial_buffer_ms}ms");
println!(" Peak buffer: {buffer_after_late_join}ms");
println!(" Final buffer: {final_buffer}ms");
println!(" Operations used: {total_operations:?}");
}
// Scenario: three initial packets, a timestamp jump of fifteen packet durations, then 40
// pull cycles in which two packets are inserted on every third cycle. Records the buffer
// size after each pull and checks bounds on acceleration count, steady-state average and peak.
#[test]
fn test_continuous_streaming_buffer_convergence() {
let config = NetEqConfig::default();
let mut neteq = NetEq::new(config).unwrap();
let mut seq: u16 = 0;
let mut timestamp: u32 = 0;
const PACKET_DURATION_SAMPLES: u32 = 320;
for _ in 0..3 {
let packet = create_test_packet(seq, timestamp, 20);
neteq.insert_packet(packet).unwrap();
seq += 1;
timestamp += PACKET_DURATION_SAMPLES;
}
// Timestamp jump before the streaming phase.
timestamp += 15 * PACKET_DURATION_SAMPLES;
let mut buffer_measurements = Vec::new();
let mut acceleration_count = 0;
for cycle in 0..40 {
// Two packets arrive on every third cycle.
if cycle % 3 == 0 {
for _ in 0..2 {
let packet = create_test_packet(seq, timestamp, 20);
neteq.insert_packet(packet).unwrap();
seq += 1;
timestamp += PACKET_DURATION_SAMPLES;
}
}
let pre_buffer = neteq.current_buffer_size_ms();
let frame = neteq.get_audio().unwrap();
let post_buffer = neteq.current_buffer_size_ms();
buffer_measurements.push(post_buffer);
// A Normal frame that drained more than 25 ms is counted as acceleration.
if matches!(frame.speech_type, SpeechType::Normal) && pre_buffer > post_buffer + 25 {
acceleration_count += 1;
}
if cycle % 5 == 0 {
println!("Cycle {cycle}: Buffer {pre_buffer}ms -> {post_buffer}ms");
}
}
let initial_buffer = buffer_measurements[0];
let final_buffer = buffer_measurements[buffer_measurements.len() - 1];
let max_buffer = buffer_measurements.iter().max().copied().unwrap_or(0);
// Steady state is taken as the last ten measurements.
let steady_state_samples = &buffer_measurements[buffer_measurements.len() - 10..];
let steady_state_avg =
steady_state_samples.iter().sum::<u32>() / steady_state_samples.len() as u32;
println!("📊 Buffer Analysis:");
println!(" Initial: {initial_buffer}ms");
println!(" Peak: {max_buffer}ms");
println!(" Final: {final_buffer}ms");
println!(" Steady-state avg: {steady_state_avg}ms");
println!(" Acceleration operations: {acceleration_count}");
assert!(
acceleration_count <= 6,
"Expected minimal acceleration for small buffers, got {acceleration_count} (buffers were only 20-40ms above target)"
);
assert!(
steady_state_avg <= 300,
"Steady-state buffer should be small, got {steady_state_avg}ms (expected ≤300ms)"
);
assert!(
final_buffer <= max_buffer,
"Buffer should not exceed peak during steady-state, but final={final_buffer}ms > max={max_buffer}ms"
);
assert!(
max_buffer <= 600,
"REGRESSION: Max buffer {max_buffer}ms indicates old excessive buffering bug may have returned"
);
assert!(
steady_state_avg < max_buffer / 2,
"Steady-state ({steady_state_avg}ms) should be much smaller than peak ({max_buffer}ms)"
);
}
// Inserts 30 packets that all carry the same timestamp and checks that the reported
// buffer size equals the packet buffer's total content duration while its span duration is zero.
#[test]
fn test_buffer_duration_calculation_with_identical_timestamps() {
let config = NetEqConfig::default();
let mut neteq = NetEq::new(config).unwrap();
let base_timestamp = 1000u32;
let packet_duration_ms = 20u32;
let num_packets = 30u32;
// Every packet reuses `base_timestamp`; only the sequence number changes.
for i in 0..num_packets {
let packet = create_test_packet(i as u16, base_timestamp, packet_duration_ms);
neteq.insert_packet(packet).unwrap();
}
let span_duration = neteq.packet_buffer.get_span_duration_ms();
let content_duration = neteq.packet_buffer.get_total_content_duration_ms();
let buffer_size_ms = neteq.current_buffer_size_ms();
println!("Buffer calculation test:");
println!(" Packets: {}", neteq.packet_buffer.len());
println!(" Span duration: {span_duration}ms");
println!(" Content duration: {content_duration}ms");
println!(" Buffer size: {buffer_size_ms}ms");
assert_eq!(
span_duration, 0,
"Span duration should be 0ms for identical timestamps"
);
assert_eq!(
content_duration,
num_packets * packet_duration_ms,
"Content duration should be sum of all packet durations: {} packets × {}ms = {}ms",
num_packets,
packet_duration_ms,
num_packets * packet_duration_ms
);
assert_eq!(
buffer_size_ms, content_duration,
"NetEQ should use content duration, not span duration"
);
// Pulling audio must keep producing non-empty frames.
for _ in 0..5 {
let pre_buffer = neteq.current_buffer_size_ms();
let frame = neteq.get_audio().unwrap();
let post_buffer = neteq.current_buffer_size_ms();
assert!(
!frame.samples.is_empty(),
"Frame should contain audio samples"
);
println!(
" Buffer {pre_buffer}ms -> {post_buffer}ms, speech_type: {:?}",
frame.speech_type
);
}
println!("✅ Buffer duration calculation test passed - content duration used correctly");
}
// Expands to a stateful closure `(neteq, duration_ms)` that inserts `duration_ms / 10`
// consecutive 10 ms packets (16 kHz, mono, payload type 96). The closure owns the running
// sequence number, and each packet's timestamp is `seq * 160`.
macro_rules! make_insert_audio {
() => {{
let mut seq: u16 = 0;
let sample_rate = 16000;
move |neteq: &mut NetEq, duration_ms: u32| {
for _ in 0..(duration_ms / 10) {
let header = RtpHeader::new(seq, seq as u32 * 160, 12345, 96, false);
seq += 1;
let mut payload = Vec::new();
// One packet carries sample_rate / 100 samples encoded as little-endian f32.
let sample_length = sample_rate / 100;
for i in 0..sample_length {
let sample = (i as f32 / 160.0 * 2.0 * std::f32::consts::PI).sin() * 0.1;
payload.extend_from_slice(&sample.to_le_bytes());
}
let packet = AudioPacket::new(header, payload, sample_rate, 1, 10);
neteq.insert_packet(packet).unwrap();
}
}
}};
}
// Expands to a closure that sets the buffer level filter's filtered level to the
// current buffer size in samples.
macro_rules! make_reset_filtered_level {
() => {{
move |neteq: &mut NetEq| {
neteq
.buffer_level_filter
.set_filtered_buffer_level(neteq.current_buffer_size_samples());
}
}};
}
// Walks the decision logic through Normal -> ExpandStart -> Expand -> ExpandEnd -> Normal
// with the base minimum and maximum delay both set to 50 ms.
#[test]
fn test_get_decision_expand() {
let config = NetEqConfig::default();
let mut neteq = NetEq::new(config).unwrap();
let mut insert_audio = make_insert_audio!();
let reset_filtered_level = make_reset_filtered_level!();
neteq.delay_manager.set_base_minimum_delay(50);
neteq.delay_manager.set_base_maximum_delay(50);
insert_audio(&mut neteq, 50);
reset_filtered_level(&mut neteq);
// The first four pulls are expected to decode normally.
for _ in 0..4 {
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::Normal);
}
// The fifth pull is expected to start an expand sequence.
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::ExpandStart);
for _ in 0..100 {
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::Expand);
}
// After adding 20 ms of audio the test still expects Expand on every pull.
insert_audio(&mut neteq, 20);
for _ in 0..100 {
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::Expand);
}
// Another 20 ms plus a filter reset is expected to end the expand sequence.
insert_audio(&mut neteq, 20);
reset_filtered_level(&mut neteq);
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::ExpandEnd);
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::Normal);
}
// Checks that the decision logic picks Accelerate, and later FastAccelerate, once extra
// audio is added, and that each is followed by two TimeStretchBuffer pulls.
#[test]
fn test_get_decision_accelerate() {
let config = NetEqConfig::default();
let mut neteq = NetEq::new(config).unwrap();
let mut insert_audio = make_insert_audio!();
let reset_filtered_level = make_reset_filtered_level!();
neteq.delay_manager.set_base_minimum_delay(50);
neteq.delay_manager.set_base_maximum_delay(50);
insert_audio(&mut neteq, 50);
reset_filtered_level(&mut neteq);
// Steady state: one 10 ms packet in per frame out.
for _ in 0..20 {
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::Normal);
insert_audio(&mut neteq, 10);
}
// One extra packet plus a filter reset is expected to trigger Accelerate.
insert_audio(&mut neteq, 10);
reset_filtered_level(&mut neteq);
for _ in 0..3 {
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::Accelerate);
insert_audio(&mut neteq, 10);
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::TimeStretchBuffer);
insert_audio(&mut neteq, 10);
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::TimeStretchBuffer);
insert_audio(&mut neteq, 10);
}
// A further 300 ms of audio is expected to trigger FastAccelerate.
insert_audio(&mut neteq, 300);
reset_filtered_level(&mut neteq);
for _ in 0..3 {
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::FastAccelerate);
insert_audio(&mut neteq, 10);
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::TimeStretchBuffer);
insert_audio(&mut neteq, 10);
let _ = neteq.get_audio().unwrap();
assert_eq!(neteq.last_operation, Operation::TimeStretchBuffer);
insert_audio(&mut neteq, 10);
}
}
/// Checks that raising the base minimum/maximum delay from 50 to 500 makes the
/// decision logic report PreemptiveExpand followed by two TimeStretchBuffer pulls.
#[test]
fn test_get_decision_preemptive_expand() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();
    let mut insert_audio = make_insert_audio!();
    let reset_filtered_level = make_reset_filtered_level!();

    // Pulls one frame and asserts the operation reported for it.
    let pull_expecting = |neteq: &mut NetEq, expected: Operation| {
        let _ = neteq.get_audio().unwrap();
        assert_eq!(neteq.last_operation, expected);
    };

    // Start with both delay bounds pinned to 50.
    neteq.delay_manager.set_base_minimum_delay(50);
    neteq.delay_manager.set_base_maximum_delay(50);
    insert_audio(&mut neteq, 50);
    reset_filtered_level(&mut neteq);

    // Steady state: one pull followed by one insert of 10 stays Normal.
    for _ in 0..20 {
        pull_expecting(&mut neteq, Operation::Normal);
        insert_audio(&mut neteq, 10);
    }

    // Move both delay bounds up to 500; the filtered level is not reset here.
    neteq.delay_manager.set_base_minimum_delay(500);
    neteq.delay_manager.set_base_maximum_delay(500);
    for _ in 0..3 {
        // Each cycle: PreemptiveExpand, then two TimeStretchBuffer pulls, with
        // an insert of 10 after every pull.
        for expected in [
            Operation::PreemptiveExpand,
            Operation::TimeStretchBuffer,
            Operation::TimeStretchBuffer,
        ] {
            pull_expecting(&mut neteq, expected);
            insert_audio(&mut neteq, 10);
        }
    }
}
/// Runs 700 consecutive Expand pulls, checks that `consecutive_expands` has
/// passed 600, and verifies the counter is back to 0 after the ExpandEnd pull.
#[test]
fn test_expand_safety_valve_triggers_after_600_frames() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();
    let mut insert_audio = make_insert_audio!();
    let reset_filtered_level = make_reset_filtered_level!();

    // Pulls `count` frames, asserting the operation reported after each pull.
    let pull_expecting = |neteq: &mut NetEq, count: usize, expected: Operation| {
        for _ in 0..count {
            let _ = neteq.get_audio().unwrap();
            assert_eq!(neteq.last_operation, expected);
        }
    };

    // Pin the base minimum and maximum delay to the same value.
    neteq.delay_manager.set_base_minimum_delay(50);
    neteq.delay_manager.set_base_maximum_delay(50);
    insert_audio(&mut neteq, 50);
    reset_filtered_level(&mut neteq);

    // Four normal pulls, then the fifth pull starts expanding.
    pull_expecting(&mut neteq, 4, Operation::Normal);
    pull_expecting(&mut neteq, 1, Operation::ExpandStart);

    // Keep pulling with nothing inserted: all 700 pulls must report Expand.
    pull_expecting(&mut neteq, 700, Operation::Expand);
    assert!(neteq.consecutive_expands > 600);

    // New audio plus a filter reset ends the expand and clears the counter.
    insert_audio(&mut neteq, 50);
    reset_filtered_level(&mut neteq);
    pull_expecting(&mut neteq, 1, Operation::ExpandEnd);
    assert_eq!(neteq.consecutive_expands, 0);
}
/// After 650 consecutive Expand pulls and an ExpandEnd, feeds audio steadily
/// and checks that playout never reports Expand/ExpandStart again and reports
/// Normal at least once within 20 pulls.
#[test]
fn test_recovery_after_safety_valve_reset() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();
    let mut insert_audio = make_insert_audio!();
    let reset_filtered_level = make_reset_filtered_level!();

    // Pulls one frame and asserts the operation reported for it.
    let pull_expecting = |neteq: &mut NetEq, expected: Operation| {
        let _ = neteq.get_audio().unwrap();
        assert_eq!(neteq.last_operation, expected);
    };

    // Pin the base minimum and maximum delay to the same value.
    neteq.delay_manager.set_base_minimum_delay(50);
    neteq.delay_manager.set_base_maximum_delay(50);
    insert_audio(&mut neteq, 50);
    reset_filtered_level(&mut neteq);

    // The first four pulls are not checked; only the fifth is.
    for _ in 0..4 {
        let _ = neteq.get_audio().unwrap();
    }
    pull_expecting(&mut neteq, Operation::ExpandStart);

    // A long run of Expand pulls with nothing inserted.
    for _ in 0..650 {
        pull_expecting(&mut neteq, Operation::Expand);
    }

    // New audio plus a filter reset must end the expand.
    insert_audio(&mut neteq, 50);
    reset_filtered_level(&mut neteq);
    pull_expecting(&mut neteq, Operation::ExpandEnd);

    // Recovery phase: insert 10 before every pull and watch the operations.
    let mut saw_normal = false;
    for _ in 0..20 {
        insert_audio(&mut neteq, 10);
        let _ = neteq.get_audio().unwrap();
        assert!(
            !matches!(
                neteq.last_operation,
                Operation::Expand | Operation::ExpandStart
            ),
            "System re-entered expand after safety valve reset: {:?}",
            neteq.last_operation
        );
        saw_normal |= neteq.last_operation == Operation::Normal;
    }
    assert!(
        saw_normal,
        "System never reached Normal after safety valve reset"
    );
}
/// Stops inserting audio after the initial fill and checks that the sixth pull
/// reports Expand or ExpandStart, the next five report Expand, and playout
/// returns to ExpandEnd then Normal once audio is inserted again.
#[test]
fn test_sudden_disconnect_skips_expand_start() {
    let mut neteq = NetEq::new(NetEqConfig::default()).unwrap();
    let mut insert_audio = make_insert_audio!();
    let reset_filtered_level = make_reset_filtered_level!();

    // Pulls one frame and asserts the operation reported for it.
    let pull_expecting = |neteq: &mut NetEq, expected: Operation| {
        let _ = neteq.get_audio().unwrap();
        assert_eq!(neteq.last_operation, expected);
    };

    // Pin the base minimum and maximum delay to the same value.
    neteq.delay_manager.set_base_minimum_delay(50);
    neteq.delay_manager.set_base_maximum_delay(50);
    insert_audio(&mut neteq, 50);
    reset_filtered_level(&mut neteq);

    // Five unchecked pulls with no further inserts.
    for _ in 0..5 {
        let _ = neteq.get_audio().unwrap();
    }

    // The sixth pull may report either expand variant.
    let _ = neteq.get_audio().unwrap();
    assert!(
        matches!(
            neteq.last_operation,
            Operation::Expand | Operation::ExpandStart
        ),
        "Expected Expand or ExpandStart after sudden disconnect, got {:?}",
        neteq.last_operation
    );

    // Every following pull must report plain Expand.
    for _ in 0..5 {
        pull_expecting(&mut neteq, Operation::Expand);
    }

    // New audio plus a filter reset: ExpandEnd first, then Normal.
    insert_audio(&mut neteq, 50);
    reset_filtered_level(&mut neteq);
    pull_expecting(&mut neteq, Operation::ExpandEnd);
    pull_expecting(&mut neteq, Operation::Normal);
}
}