use std::time::Duration;
use std::num::NonZeroU32;
use audio_samples::{
AudioSamples, OpusBandwidth, OpusCodec, OpusConfig, OpusMode, OpusStereoCodec,
codecs::{decode, encode},
detect_opus_mode, sample_rate,
utils::comparison::snr,
utils::generation::{sine_wave, white_noise},
};
use non_empty_slice::NonEmptyVec;
use spectrograms::WindowType;
const SIGNAL_DURATION_MS: u64 = 200;
fn silk_codec(bit_budget: u32) -> OpusCodec {
let config = OpusConfig::with_mode(OpusMode::Silk, bit_budget);
OpusCodec::new(config, WindowType::Hanning)
}
fn celt_codec(bit_budget: u32) -> OpusCodec {
let config = OpusConfig::with_mode(OpusMode::Celt, bit_budget);
OpusCodec::new(config, WindowType::Hanning)
}
fn auto_codec(bit_budget: u32) -> OpusCodec {
let config = OpusConfig::new(bit_budget);
OpusCodec::new(config, WindowType::Hanning)
}
fn signal_sine(freq_hz: f64) -> audio_samples::AudioSamples<'static, f32> {
sine_wave::<f32>(
freq_hz,
Duration::from_millis(SIGNAL_DURATION_MS),
sample_rate!(44100),
0.5,
)
}
fn signal_white_noise() -> audio_samples::AudioSamples<'static, f32> {
white_noise::<f32>(
Duration::from_millis(SIGNAL_DURATION_MS),
sample_rate!(44100),
0.5,
Some(42),
)
}
fn round_trip_snr(signal: &audio_samples::AudioSamples<'static, f32>, codec: OpusCodec) -> f64 {
let encoded = encode(signal, codec).expect("encode failed");
let reconstructed = decode::<OpusCodec, f32>(encoded).expect("decode failed");
let orig = signal.as_slice().expect("original contiguous");
let recon = reconstructed.as_slice().expect("reconstructed contiguous");
let len = orig.len().min(recon.len());
let error: Vec<f32> = orig[..len]
.iter()
.zip(recon[..len].iter())
.map(|(a, b)| a - b)
.collect();
debug_assert!(!error.is_empty(), "error vector must be non-empty");
let nev = unsafe { NonEmptyVec::new_unchecked(error) };
let error_sig =
audio_samples::AudioSamples::<'static, f32>::from_mono_vec(nev, signal.sample_rate());
let orig_trimmed = if len == orig.len() {
signal.clone()
} else {
let trimmed: Vec<f32> = orig[..len].to_vec();
debug_assert!(!trimmed.is_empty(), "trimmed vector must be non-empty");
let nev2 = unsafe { NonEmptyVec::new_unchecked(trimmed) };
audio_samples::AudioSamples::<'static, f32>::from_mono_vec(nev2, signal.sample_rate())
};
snr(&orig_trimmed, &error_sig).expect("snr failed")
}
#[test]
fn silk_sine_440() {
let snr_db = round_trip_snr(&signal_sine(440.0), silk_codec(64_000));
eprintln!("SILK sine_440 @ 64k: {snr_db:.2} dB");
assert!(
snr_db > 20.0,
"SILK SNR {snr_db:.2} dB below floor of 20 dB"
);
}
#[test]
fn silk_sine_1k() {
let snr_db = round_trip_snr(&signal_sine(1000.0), silk_codec(64_000));
eprintln!("SILK sine_1k @ 64k: {snr_db:.2} dB");
assert!(
snr_db > 20.0,
"SILK SNR {snr_db:.2} dB below floor of 20 dB"
);
}
#[test]
fn silk_white_noise() {
let snr_db = round_trip_snr(&signal_white_noise(), silk_codec(64_000));
eprintln!("SILK white_noise @ 64k: {snr_db:.2} dB");
assert!(
snr_db > 20.0,
"SILK SNR {snr_db:.2} dB below floor of 20 dB"
);
}
#[test]
fn celt_sine_440_64k() {
let snr_db = round_trip_snr(&signal_sine(440.0), celt_codec(64_000));
eprintln!("CELT sine_440 @ 64k: {snr_db:.2} dB");
assert!(snr_db > 1.0, "CELT SNR {snr_db:.2} dB below floor of 1 dB");
}
#[test]
fn celt_sine_440_128k() {
let snr_db = round_trip_snr(&signal_sine(440.0), celt_codec(128_000));
eprintln!("CELT sine_440 @ 128k: {snr_db:.2} dB");
assert!(snr_db > 1.0, "CELT SNR {snr_db:.2} dB below floor of 1 dB");
}
#[test]
fn celt_sine_1k_64k() {
let snr_db = round_trip_snr(&signal_sine(1000.0), celt_codec(64_000));
eprintln!("CELT sine_1k @ 64k: {snr_db:.2} dB");
assert!(snr_db > 1.0, "CELT SNR {snr_db:.2} dB below floor of 1 dB");
}
#[test]
fn celt_white_noise_64k() {
let snr_db = round_trip_snr(&signal_white_noise(), celt_codec(64_000));
eprintln!("CELT white_noise @ 64k: {snr_db:.2} dB");
assert!(
snr_db > -1.0,
"CELT SNR {snr_db:.2} dB below floor of -1 dB"
);
}
#[test]
fn auto_mode_sine_440() {
let snr_db = round_trip_snr(&signal_sine(440.0), auto_codec(128_000));
eprintln!("Auto sine_440 @ 128k: {snr_db:.2} dB");
assert!(
snr_db > 1.0,
"auto-mode SNR {snr_db:.2} dB below floor of 1 dB"
);
}
#[test]
fn auto_mode_white_noise() {
let snr_db = round_trip_snr(&signal_white_noise(), auto_codec(128_000));
eprintln!("Auto white_noise @ 128k: {snr_db:.2} dB");
assert!(
snr_db > -1.0,
"auto-mode SNR {snr_db:.2} dB below floor of -1 dB"
);
}
#[test]
fn celt_band_layout_sanity() {
use audio_samples::BandLayout;
use std::num::NonZeroUsize;
let n_bins = NonZeroUsize::new(441).unwrap();
let layout = BandLayout::celt(44100.0, n_bins);
assert!(layout.len().get() >= 1);
assert!(layout.len().get() <= 21);
for band in layout.as_slice().iter() {
assert!(
band.end_bin > band.start_bin,
"band [{}, {}) is empty",
band.start_bin,
band.end_bin,
);
assert!(band.end_bin <= n_bins.get(), "band end_bin out of range");
}
}
#[test]
fn celt_band_layout_low_sample_rate() {
use audio_samples::BandLayout;
use std::num::NonZeroUsize;
let n_bins = NonZeroUsize::new(80).unwrap();
let layout = BandLayout::celt(8000.0, n_bins);
assert!(layout.len().get() >= 1, "must have at least one band");
for band in layout.as_slice().iter() {
assert!(band.end_bin > band.start_bin);
assert!(band.end_bin <= n_bins.get());
}
}
#[test]
fn silk_lpc_round_trip() {
use audio_samples::{silk_decode_frame, silk_encode_frame};
let samples: Vec<f32> = (0..128)
.map(|i| (2.0 * std::f32::consts::PI * 440.0 * i as f32 / 44100.0).sin() * 0.5)
.collect();
let frame = silk_encode_frame(&samples).expect("encode");
let recovered = silk_decode_frame(&frame);
assert_eq!(recovered.len(), samples.len());
let signal_power: f32 = samples.iter().map(|&x| x * x).sum::<f32>() / samples.len() as f32;
let error_power: f32 = samples
.iter()
.zip(recovered.iter())
.map(|(&a, &b)| (a - b).powi(2))
.sum::<f32>()
/ samples.len() as f32;
let snr_db = 10.0 * (signal_power / error_power.max(1e-15)).log10();
assert!(
snr_db > 30.0,
"SILK LPC round-trip SNR {snr_db:.2} dB below floor of 30 dB"
);
}
#[test]
fn detect_mode_fullband_forces_celt() {
let signal = signal_sine(440.0);
let samples = signal.as_slice().expect("mono contiguous");
let mode = detect_opus_mode(samples, 44100, OpusBandwidth::FullBand);
assert_eq!(
mode,
OpusMode::Celt,
"FullBand must always select CELT (SILK not valid above 8 kHz)"
);
}
#[test]
fn detect_mode_selects_celt_for_sine() {
let signal = signal_sine(440.0);
let samples = signal.as_slice().expect("mono contiguous");
let mode = detect_opus_mode(samples, 44100, OpusBandwidth::WideBand);
assert_eq!(
mode,
OpusMode::Celt,
"sustained 440 Hz sine (flat time-energy) should map to CELT"
);
}
#[test]
fn detect_mode_selects_silk_for_burst() {
let n = 882usize;
let mut burst = vec![0.0f32; n];
let burst_len = n / 8;
for i in 0..burst_len {
burst[i] = (2.0 * std::f32::consts::PI * 440.0 * i as f32 / 44100.0).sin() * 0.5;
}
let mode = detect_opus_mode(&burst, 44100, OpusBandwidth::WideBand);
assert_eq!(
mode,
OpusMode::Silk,
"burst-then-silence (energy in one sub-window) should map to SILK"
);
}
#[test]
fn stereo_silk_round_trip() {
let left = signal_sine(440.0);
let right = signal_sine(880.0);
let n = left.samples_per_channel().get();
let left_s = left.as_slice().expect("mono contiguous");
let right_s = right.as_slice().expect("mono contiguous");
let interleaved: Vec<f32> = left_s
.iter()
.zip(right_s)
.flat_map(|(&l, &r)| [l, r])
.collect();
let ne = NonEmptyVec::new(interleaved).expect("non-empty");
let stereo: AudioSamples<'static, f32> = AudioSamples::from_interleaved_vec(
ne,
NonZeroU32::new(2).expect("2 > 0"),
sample_rate!(44100),
)
.expect("valid stereo");
let codec = OpusStereoCodec::new(
OpusConfig::with_mode(OpusMode::Silk, 64_000),
OpusConfig::with_mode(OpusMode::Silk, 32_000),
spectrograms::WindowType::Hanning,
);
let encoded = encode(&stereo, codec).expect("stereo encode");
let recovered: AudioSamples<'static, f32> =
decode::<OpusStereoCodec, f32>(encoded).expect("stereo decode");
assert_eq!(recovered.num_channels().get(), 2, "decoded must be stereo");
assert_eq!(
recovered.samples_per_channel().get(),
n,
"length must be preserved"
);
let mut recon_channels = recovered.channels();
let recon_left = recon_channels.next().expect("left channel");
let recon_left_s = recon_left.as_slice().expect("channel contiguous");
let len = left_s.len().min(recon_left_s.len());
let signal_power: f64 = left_s[..len]
.iter()
.map(|&x| (x as f64).powi(2))
.sum::<f64>()
/ len as f64;
let error_power: f64 = left_s[..len]
.iter()
.zip(recon_left_s[..len].iter())
.map(|(a, b)| ((a - b) as f64).powi(2))
.sum::<f64>()
/ len as f64;
let snr_db = 10.0 * (signal_power / error_power.max(1e-15)).log10();
eprintln!("Stereo SILK left-channel SNR: {snr_db:.2} dB");
assert!(
snr_db > 20.0,
"Stereo SILK left-channel SNR {snr_db:.2} dB below floor of 20 dB"
);
}
fn snr_db(original: &[f32], recovered: &[f32]) -> f64 {
let n = original.len().min(recovered.len());
let signal: f64 = original[..n]
.iter()
.map(|&x| (x as f64).powi(2))
.sum::<f64>()
/ n as f64;
let error: f64 = original[..n]
.iter()
.zip(recovered[..n].iter())
.map(|(a, b)| ((*a - *b) as f64).powi(2))
.sum::<f64>()
/ n as f64;
10.0 * (signal / error.max(1e-15)).log10()
}
#[test]
fn stateful_silk_cross_frame() {
use audio_samples::{SilkState, silk_decode_frame_stateful, silk_encode_frame_stateful};
let sample_rate = 44100_u32;
let frame_len = 882; let all_samples: Vec<f32> = (0..frame_len * 3)
.map(|i| (2.0 * std::f32::consts::PI * 440.0 * i as f32 / sample_rate as f32).sin() * 0.5)
.collect();
let mut enc = SilkState::default();
let f0 = silk_encode_frame_stateful(&all_samples[..frame_len], sample_rate, &mut enc).unwrap();
let f1 = silk_encode_frame_stateful(
&all_samples[frame_len..frame_len * 2],
sample_rate,
&mut enc,
)
.unwrap();
let f2 =
silk_encode_frame_stateful(&all_samples[frame_len * 2..], sample_rate, &mut enc).unwrap();
let mut dec = SilkState::default();
let mut recovered = Vec::new();
recovered.extend(silk_decode_frame_stateful(&f0, &mut dec));
recovered.extend(silk_decode_frame_stateful(&f1, &mut dec));
recovered.extend(silk_decode_frame_stateful(&f2, &mut dec));
assert_eq!(recovered.len(), all_samples.len());
let db = snr_db(&all_samples, &recovered);
eprintln!("Stateful SILK 3-frame SNR: {db:.1} dB");
assert!(db > 20.0, "stateful SILK SNR {db:.1} dB too low");
}
#[test]
fn silk_ltp_detects_pitch() {
use audio_samples::estimate_pitch;
let sample_rate = 44100_u32;
let samples: Vec<f32> = (0..882)
.map(|i| (2.0 * std::f32::consts::PI * 220.0 * i as f32 / sample_rate as f32).sin())
.collect();
let result = estimate_pitch(&samples, sample_rate);
assert!(result.is_some(), "pitch not detected for 220 Hz sine");
let (lag, gain) = result.unwrap();
let expected = (sample_rate as f32 / 220.0).round() as usize;
assert!(
lag.abs_diff(expected) <= 3,
"lag {lag} expected ~{expected}"
);
assert!(gain > 0.5, "LTP gain {gain:.2} too low for pure sine");
}
#[test]
fn hybrid_sine_round_trip() {
use audio_samples::codecs::{decode, encode};
use audio_samples::{AudioSamples, OpusCodec, OpusConfig, OpusMode, sample_rate};
use spectrograms::WindowType;
let sr = sample_rate!(44100);
let samples: Vec<f32> = (0..882 * 2)
.map(|i| (2.0 * std::f32::consts::PI * 440.0 * i as f32 / 44100.0).sin() * 0.5)
.collect();
let ne = non_empty_slice::NonEmptyVec::new(samples.clone()).unwrap();
let audio = AudioSamples::<'static, f32>::from_mono_vec(ne, sr);
let config = OpusConfig::with_mode(OpusMode::Hybrid, 128_000);
let codec = OpusCodec::new(config, WindowType::Hanning);
let encoded = encode(&audio, codec).expect("hybrid encode");
assert!(
encoded
.frames
.as_non_empty_slice()
.iter()
.all(|f| f.mode == OpusMode::Hybrid),
"not all frames encoded as Hybrid"
);
let recovered: AudioSamples<'static, f32> =
decode::<OpusCodec, f32>(encoded).expect("hybrid decode");
let orig_ch = audio.as_slice().expect("mono contiguous");
let rec_ch = recovered.as_slice().expect("mono contiguous");
let db = snr_db(orig_ch, rec_ch);
eprintln!("Hybrid sine round-trip SNR: {db:.1} dB");
assert!(db > 15.0, "hybrid SNR {db:.1} dB too low");
}
#[test]
fn hybrid_white_noise_round_trip() {
use audio_samples::codecs::{decode, encode};
use audio_samples::{AudioSamples, OpusCodec, OpusConfig, OpusMode, sample_rate, white_noise};
use spectrograms::WindowType;
let sr = sample_rate!(44100);
let noise = white_noise::<f32>(std::time::Duration::from_millis(40), sr, 0.5, None);
let config = OpusConfig::with_mode(OpusMode::Hybrid, 128_000);
let codec = OpusCodec::new(config, WindowType::Hanning);
let encoded = encode(&noise, codec).expect("hybrid noise encode");
let recovered: AudioSamples<'static, f32> =
decode::<OpusCodec, f32>(encoded).expect("hybrid noise decode");
let orig = noise.as_slice().expect("mono contiguous");
let rec = recovered.as_slice().expect("mono contiguous");
let db = snr_db(orig, rec);
eprintln!("Hybrid white-noise round-trip SNR: {db:.1} dB");
assert!(db > 5.0, "hybrid noise SNR {db:.1} dB too low");
}