#![allow(dead_code)]
#[derive(Debug, Clone)]
pub struct LookAhead {
pub lookahead_ms: f32,
pub sample_rate: f32,
delay_buffer: Vec<f32>,
write_pos: usize,
}
impl LookAhead {
#[allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
#[must_use]
pub fn new(lookahead_ms: f32, sample_rate: f32) -> Self {
let delay_samples = ((lookahead_ms / 1000.0) * sample_rate) as usize;
let size = delay_samples.max(1);
Self {
lookahead_ms,
sample_rate,
delay_buffer: vec![0.0; size],
write_pos: 0,
}
}
#[must_use]
pub fn has_look_ahead(&self) -> bool {
self.lookahead_ms > 0.0
}
#[allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
#[must_use]
pub fn delay_samples(&self) -> usize {
((self.lookahead_ms / 1000.0) * self.sample_rate) as usize
}
pub fn push(&mut self, input: f32) -> f32 {
let out = self.delay_buffer[self.write_pos];
self.delay_buffer[self.write_pos] = input;
self.write_pos = (self.write_pos + 1) % self.delay_buffer.len();
out
}
pub fn reset(&mut self) {
self.delay_buffer.fill(0.0);
self.write_pos = 0;
}
}
#[derive(Debug, Clone)]
pub struct CompressorParams {
pub threshold_db: f32,
pub ratio: f32,
pub attack_ms: f32,
pub release_ms: f32,
pub knee_db: f32,
pub makeup_db: f32,
}
impl Default for CompressorParams {
fn default() -> Self {
Self {
threshold_db: -18.0,
ratio: 4.0,
attack_ms: 5.0,
release_ms: 50.0,
knee_db: 3.0,
makeup_db: 0.0,
}
}
}
pub struct CompressorLook {
pub params: CompressorParams,
lookahead: LookAhead,
envelope_db: f32,
sample_rate: f32,
attack_coeff: f32,
release_coeff: f32,
}
impl CompressorLook {
#[allow(clippy::cast_precision_loss)]
#[must_use]
pub fn new(params: CompressorParams, lookahead_ms: f32, sample_rate: f32) -> Self {
let attack_coeff = Self::time_to_coeff(params.attack_ms, sample_rate);
let release_coeff = Self::time_to_coeff(params.release_ms, sample_rate);
Self {
params,
lookahead: LookAhead::new(lookahead_ms, sample_rate),
envelope_db: 0.0,
sample_rate,
attack_coeff,
release_coeff,
}
}
#[allow(clippy::cast_precision_loss)]
#[must_use]
fn time_to_coeff(time_ms: f32, sample_rate: f32) -> f32 {
if time_ms <= 0.0 {
return 0.0;
}
(-1.0 / (time_ms * 0.001 * sample_rate)).exp()
}
#[must_use]
fn lin_to_db(lin: f32) -> f32 {
if lin <= 1e-10 {
-200.0
} else {
20.0 * lin.log10()
}
}
#[must_use]
pub fn compute_gain_reduction(&mut self, input_sample: f32) -> f32 {
let level_db = Self::lin_to_db(input_sample.abs());
let threshold = self.params.threshold_db;
let ratio = self.params.ratio;
let knee = self.params.knee_db;
let target_gr = if level_db < threshold - knee / 2.0 {
0.0
} else if level_db > threshold + knee / 2.0 {
(threshold - level_db) * (1.0 - 1.0 / ratio)
} else {
let x = level_db - threshold + knee / 2.0;
(1.0 - 1.0 / ratio) * x * x / (2.0 * knee)
};
let coeff = if target_gr < self.envelope_db {
self.attack_coeff
} else {
self.release_coeff
};
self.envelope_db = coeff * self.envelope_db + (1.0 - coeff) * target_gr;
self.envelope_db
}
#[allow(clippy::cast_precision_loss)]
pub fn process(&mut self, input: f32) -> f32 {
let gain_db = self.compute_gain_reduction(input);
let delayed = self.lookahead.push(input);
let gain_lin = 10.0_f32.powf((gain_db + self.params.makeup_db) / 20.0);
delayed * gain_lin
}
#[must_use]
pub fn release_gain(&mut self) -> f32 {
self.envelope_db *= self.release_coeff;
self.envelope_db
}
#[must_use]
pub fn envelope_db(&self) -> f32 {
self.envelope_db
}
#[must_use]
pub fn has_look_ahead(&self) -> bool {
self.lookahead.has_look_ahead()
}
pub fn reset(&mut self) {
self.envelope_db = 0.0;
self.lookahead.reset();
}
}
#[derive(Debug, Clone, Default)]
pub struct CompressorStats {
sample_count: u64,
gain_reduction_sum_db: f64,
peak_gain_reduction_db: f32,
active_samples: u64,
}
impl CompressorStats {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn record(&mut self, gain_reduction_db: f32) {
self.sample_count += 1;
self.gain_reduction_sum_db += f64::from(gain_reduction_db);
if gain_reduction_db < self.peak_gain_reduction_db {
self.peak_gain_reduction_db = gain_reduction_db;
}
if gain_reduction_db < -0.1 {
self.active_samples += 1;
}
}
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
#[must_use]
pub fn avg_gain_reduction_db(&self) -> f32 {
if self.sample_count == 0 {
return 0.0;
}
(self.gain_reduction_sum_db / self.sample_count as f64) as f32
}
#[must_use]
pub fn peak_gain_reduction_db(&self) -> f32 {
self.peak_gain_reduction_db
}
#[allow(clippy::cast_precision_loss)]
#[must_use]
pub fn activity_ratio(&self) -> f32 {
if self.sample_count == 0 {
return 0.0;
}
self.active_samples as f32 / self.sample_count as f32
}
#[must_use]
pub fn sample_count(&self) -> u64 {
self.sample_count
}
pub fn reset(&mut self) {
*self = Self::default();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lookahead_has_look_ahead_true() {
let la = LookAhead::new(5.0, 48000.0);
assert!(la.has_look_ahead());
}
#[test]
fn test_lookahead_has_look_ahead_false() {
let la = LookAhead::new(0.0, 48000.0);
assert!(!la.has_look_ahead());
}
#[test]
fn test_lookahead_delay_samples() {
let la = LookAhead::new(1.0, 48000.0);
assert_eq!(la.delay_samples(), 48);
}
#[test]
fn test_lookahead_push_returns_delayed() {
let mut la = LookAhead::new(0.0, 48000.0);
la.push(0.5); let out = la.push(0.9); assert!((out - 0.5).abs() < 1e-6, "got {out}");
}
#[test]
fn test_lookahead_reset() {
let mut la = LookAhead::new(1.0, 48000.0);
for _ in 0..10 {
la.push(1.0);
}
la.reset();
let out = la.push(0.0);
assert_eq!(out, 0.0);
}
#[test]
fn test_compressor_params_default() {
let p = CompressorParams::default();
assert!(p.threshold_db < 0.0);
assert!(p.ratio > 1.0);
}
#[test]
fn test_compressor_look_no_gain_reduction_below_threshold() {
let params = CompressorParams {
threshold_db: 0.0,
ratio: 4.0,
..Default::default()
};
let mut comp = CompressorLook::new(params, 0.0, 48000.0);
let gr = comp.compute_gain_reduction(0.001);
assert!(gr <= 0.0);
assert!(gr > -1.0, "expected near 0 GR, got {gr}");
}
#[test]
fn test_compressor_look_gain_reduction_above_threshold() {
let params = CompressorParams {
threshold_db: -20.0,
ratio: 10.0,
attack_ms: 0.0,
..Default::default()
};
let mut comp = CompressorLook::new(params, 0.0, 48000.0);
for _ in 0..200 {
let _ = comp.compute_gain_reduction(1.0);
}
let gr = comp.envelope_db();
assert!(gr < -5.0, "expected significant GR, got {gr}");
}
#[test]
fn test_compressor_look_has_look_ahead() {
let comp = CompressorLook::new(Default::default(), 5.0, 48000.0);
assert!(comp.has_look_ahead());
}
#[test]
fn test_compressor_look_reset() {
let mut comp = CompressorLook::new(Default::default(), 1.0, 48000.0);
for _ in 0..100 {
comp.process(0.9);
}
comp.reset();
assert_eq!(comp.envelope_db(), 0.0);
}
#[test]
fn test_release_gain_decays_toward_zero() {
let params = CompressorParams {
threshold_db: -20.0,
ratio: 10.0,
..Default::default()
};
let mut comp = CompressorLook::new(params, 0.0, 48000.0);
for _ in 0..200 {
let _ = comp.compute_gain_reduction(1.0);
}
let initial_gr = comp.envelope_db();
for _ in 0..100 {
let _ = comp.release_gain();
}
let after_gr = comp.envelope_db();
assert!(
after_gr > initial_gr,
"envelope should recover toward 0: initial={initial_gr}, after={after_gr}"
);
}
#[test]
fn test_stats_record_and_avg() {
let mut stats = CompressorStats::new();
stats.record(-3.0);
stats.record(-6.0);
let avg = stats.avg_gain_reduction_db();
assert!((avg - (-4.5)).abs() < 0.01, "got avg={avg}");
}
#[test]
fn test_stats_peak_gain_reduction() {
let mut stats = CompressorStats::new();
stats.record(-2.0);
stats.record(-8.0);
stats.record(-1.0);
assert!((stats.peak_gain_reduction_db() - (-8.0)).abs() < 0.01);
}
#[test]
fn test_stats_activity_ratio() {
let mut stats = CompressorStats::new();
stats.record(0.0); stats.record(-3.0); stats.record(-5.0); let ratio = stats.activity_ratio();
assert!((ratio - (2.0 / 3.0)).abs() < 0.01, "got {ratio}");
}
#[test]
fn test_stats_reset() {
let mut stats = CompressorStats::new();
stats.record(-6.0);
stats.reset();
assert_eq!(stats.sample_count(), 0);
assert_eq!(stats.avg_gain_reduction_db(), 0.0);
}
}