lasprs 0.6.7

Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)
Documentation
//! (Colored) noise Source implementations. Uses bilinear transform for a fixe and
//! pre-calculated set of analogue poles and zeros.
//!
use super::source::SourceImpl;
use crate::{config::*, PoleOrZero, SeriesBiquad, TransferFunction, ZPKModel};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use rand_distr::StandardNormal;

/// The number of poles in the filter that turns white noise into pink noise.
const PINKNOISE_ANALOG_ORDER: usize = 10;

/// White noise source. Can be colored by applying a color filter to the source
#[derive(Clone, Debug)]
pub struct WhiteNoise {
    // SmallRng is a cheap random number generator
    rng: SmallRng,
    // Interruption state (whether to output, number of samples, number of samples after which a switch need to be performed)
    interrupt_state: Option<InterruptState>,
}
impl WhiteNoise {
    pub fn new(fs: Flt, interrupt_period: Option<Flt>) -> Self {
        let interrupt_state = if let Some(period) = interrupt_period {
            if period > 0. {
                Some(InterruptState {
                    period,
                    cur_idx: 0,
                    max_idx: (period * fs) as usize,
                    silence: false,
                })
            } else {
                None
            }
        } else {
            None
        };
        WhiteNoise {
            rng: SmallRng::from_entropy(),
            interrupt_state,
        }
    }
}
impl SourceImpl for WhiteNoise {
    fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
        let mut output = true;

        // Look at whether we should do interruption of the played noise.
        if let Some(InterruptState {
            cur_idx,
            max_idx,
            silence,
            ..
        }) = &mut self.interrupt_state
        {
            if cur_idx > max_idx {
                // Swap flag
                *cur_idx = 0;
                *silence = !*silence;
            }
            output = !*silence;
            *cur_idx += sig.len();
        }
        // If output is true, send new random noise. Otherwise, just silence
        if output {
            sig.for_each(|s| {
                *s = self.rng.sample(StandardNormal);
            });
        } else {
            sig.for_each(|s| {
                *s = 0.;
            });
        }
    }

    fn reset(&mut self, fs: Flt) {
        if let Some(state) = &mut self.interrupt_state {
            // Restore to first start with output
            state.silence = false;
            state.cur_idx = 0;
            state.max_idx = (state.period * fs) as usize;
            // state.period = untouched
        }
    }
    fn clone_dyn(&self) -> Box<dyn SourceImpl> {
        Box::new(self.clone())
    }
}

#[derive(Debug, Clone)]
pub struct ColoredNoise {
    // White noise generator
    wn: WhiteNoise,
    // Temporary storage for the generated signal. Needs to be able to slice,
    // which is not guaranteed by the input iterator.
    tmp: Vec<Flt>,
    // Analog filter used to generate the digital filter below
    analogue_blueprint: ZPKModel,
    // The digital filter that colors the white noise
    filter: SeriesBiquad,
}
impl ColoredNoise {
    /// Generate a colored noise signal source that outputs pink noise (-3 dB /
    /// octave ) from 20 Hz to 20 kHz.
    pub fn newPinkNoise(fs: Flt, interrupt_period: Option<Flt>) -> Self {
        let twopi = 2. * pi;
        let fl = 10.;
        let fu = 20e3;
        // Exponentially spread poles
        let fpoles_real: Vec<Flt> = (0..PINKNOISE_ANALOG_ORDER)
            .map(|i| fl * (fu / fl).powf((i as Flt) / (PINKNOISE_ANALOG_ORDER as Flt - 1.)))
            .collect();

        // Zeros as geometric means inbetween
        let fzeros_real: Vec<Flt> = (0..PINKNOISE_ANALOG_ORDER - 1)
            .map(|i| (fpoles_real[i] * fpoles_real[i + 1]).sqrt())
            .collect();

        let poles: Vec<PoleOrZero> = fpoles_real
            .into_iter()
            .map(|f| PoleOrZero::Real1(-twopi * f))
            .collect();
        let zeros: Vec<PoleOrZero> = fzeros_real
            .into_iter()
            .map(|f| PoleOrZero::Real1(-twopi * f))
            .collect();
        let gain = 1.;

        let analogue_blueprint = ZPKModel::new(zeros, poles, gain);
        let filter = analogue_blueprint.bilinear(480000.);
        Self {
            wn: WhiteNoise::new(fs, interrupt_period),
            tmp: vec![],
            analogue_blueprint,
            filter,
        }
    }
}

impl SourceImpl for ColoredNoise {
    fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
        // Make temporary array of same size as output iterator
        self.tmp.resize(sig.len(), 0.);

        // Generate white noise signal in place
        self.wn.genSignal_unscaled(&mut self.tmp.iter_mut());

        // Then, filter it with noise shaping filter
        self.filter.filter_inout(&mut self.tmp);

        // And copy over to output iterator
        sig.zip(&self.tmp).for_each(|(sig, src)| {
            *sig = *src;
        });
    }
    fn reset(&mut self, fs: Flt) {
        // Pre-computed analog poles
        self.filter = self.analogue_blueprint.bilinear(fs);
        let fnyq = fs / 2.;

        // We set the gain such that the signal power is the same as that for
        // the white noise that enters it. We do this by calculating the sum of
        // the power in each bin up to the Nyquist frequency, and calculating a
        // gain that is the inverse of this value.

        // Choose a number of points. As long as its fine enough over the whole
        // range (no peaks / dips are missing) At 48 kHz this means we sample
        // the tf with a resolution of 10 Hz. Fine enough for slowly varying
        // filter gains.
        let N = 2400;
        let freq = Array1::linspace(20., fnyq, 2000);
        let tf_power_sum = self
            .filter
            .tf(fs, freq.view())
            .mapv(|t| t.abs().powi(2))
            .sum();
        // The above is sum of squares. The white noise signal is unit
        // variance, meaning also unit linear level. So to compute gain, we
        // have to take the square root of this ratio.
        let gain = ((N as Flt) / tf_power_sum).sqrt();
        // dbg!(gain);
        self.filter.setGain(gain);
    }
    fn clone_dyn(&self) -> Box<dyn SourceImpl> {
        Box::new(self.clone())
    }
}

#[derive(Clone, Copy, Debug)]
struct InterruptState {
    period: Flt,
    cur_idx: usize,
    max_idx: usize,
    silence: bool,
}