lasprs 0.4.1

Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)
Documentation
use super::*;
use crate::config::*;
use anyhow::{bail, Result};
use num::Complex;

#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Clone, Copy, Debug)]
/// # A biquad is a second order recursive filter structure.
///
/// This implementation only allows for normalized coefficients (a_0 = 1). It
///  performs the following relation of output to input:
///
///  ```math
///  y[n] = - a_1 * y[n-1] - a_2 * y[n-2]
///       + b_0 * x[n] + b_1 * x[n-1] + b_2 * x[n-2]
///  ```
///
///  The coefficients can be generated for typical standard type of biquad
///  filters, such as low pass, high pass, bandpass (first order), low shelf,
///  high shelf, peaking and notch filters.
///
///  The transfer function is:
///
/// ```math
///          b_0 + b_1 z^-1 + b_2 * z^-2
///  H[z] = -----------------------------
///          1   + a_1 z^-1 + a_2 * z^-2
///  ```
///
///  And the frequency response can be found by filling in in above equation z =
///  exp(i*omega/fs), where fs is the sampling frequency and omega is the radian
///  frequency at which the transfer function is evaluated.
///
pub struct Biquad {
    // State parameters
    w1: Flt,
    w2: Flt,
    // Filter coefficients - forward
    b0: Flt,
    b1: Flt,
    b2: Flt,
    // Filter coefficients - recursive
    // a0: Flt, // a0 is assumed one, not used
    a1: Flt,
    a2: Flt,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl Biquad {
    #[new]
    /// Create new biquad filter. See [Biquad::new()]
    ///
    pub fn new_py<'py>(coefs: PyArrayLike1<Flt>) -> PyResult<Self> {
        Ok(Biquad::new(coefs.as_slice()?)?)
    }
    #[pyo3(name = "unit")]
    #[staticmethod]
    /// See: [Biquad::unit()]
    pub fn unit_py() -> Biquad {
        Biquad::unit()
    }
    #[pyo3(name = "firstOrderHighPass")]
    #[staticmethod]
    /// See: [Biquad::firstOrderHighPass()]
    pub fn firstOrderHighPass_py(fs: Flt, fc: Flt) -> PyResult<Biquad> {
        Ok(Biquad::firstOrderHighPass(fs, fc)?)
    }

    /// See: [Biquad::firstOrderLowPass()]
    #[pyo3(name = "firstOrderLowPass")]
    #[staticmethod]
    pub fn firstOrderLowPass_py(fs: Flt, fc: Flt) -> PyResult<Biquad> {
        Ok(Biquad::firstOrderLowPass(fs, fc)?)
    }

    /// See: [Biquad::tf()]
    #[pyo3(name = "tf")]
    pub fn tf_py<'py>(
        &self,
        py: Python<'py>,
        fs: Flt,
        freq: PyArrayLike1<Flt>,
    ) -> PyResult<PyArr1Cflt<'py>> {
        let freq = freq.as_array();
        let res = PyArray1::from_array_bound(py, &self.tf(fs, freq));
        Ok(res)
    }

    /// See: [Biquad::filter()]
    #[pyo3(name = "filter")]
    pub fn filter_py<'py>(
        &mut self,
        py: Python<'py>,
        input: PyArrayLike1<Flt>,
    ) -> PyResult<PyArr1Flt<'py>> {
        Ok(PyArray1::from_vec_bound(py, self.filter(input.as_slice()?)))
    }
}
impl Biquad {
    /// Create new biquad filter from given filter coeficients
    ///
    /// # Args
    ///
    /// - coefs: Filter coefficients.
    ///
    pub fn new(coefs: &[Flt]) -> Result<Self> {
        match coefs {
            [b0, b1, b2, a0, a1, a2] => {
                if *a0 != 1.0  {
                    bail!("Coefficient a0 should be equal to 1.0")
                }
                Ok(Biquad { w1: 0., w2: 0., b0: *b0, b1: *b1, b2: *b2, a1: *a1, a2: *a2})
            },
            _ => bail!("Could not initialize biquad. Please make sure that the coefficients contain 6 terms")
        }
    }

    /// Construct a Biquad with 0 initial state from coefficients given as
    /// arguments.
    ///
    /// *CAREFUL*: No checks are don  on validity / stability of the created filter!
    fn fromCoefs(b0: Flt, b1: Flt, b2: Flt, a1: Flt, a2: Flt) -> Biquad {
        Biquad {
            w1: 0.,
            w2: 0.,
            b0,
            b1,
            b2,
            a1,
            a2,
        }
    }

    /// Create unit impulse response biquad filter. Input = output
    fn unit() -> Biquad {
        let filter_coefs = &[1., 0., 0., 1., 0., 0.];
        Biquad::new(filter_coefs).unwrap()
    }

    /// Initialize biquad as first order high pass filter.
    ///
    ///
    /// * fs: Sampling frequency in \[Hz\]
    /// * cuton_Hz: -3 dB cut-on frequency in \[Hz\]
    ///
    pub fn firstOrderHighPass(fs: Flt, cuton_Hz: Flt) -> Result<Biquad> {
        if fs <= 0. {
            bail!("Invalid sampling frequency: {} [Hz]", fs);
        }
        if cuton_Hz <= 0. {
            bail!("Invalid cuton frequency: {} [Hz]", cuton_Hz);
        }
        if cuton_Hz >= 0.98 * fs / 2. {
            bail!(
                "Invalid cuton frequency. We limit this to 0.98* fs / 2. Given value {} [Hz]",
                cuton_Hz
            );
        }

        let tau: Flt = 1. / (2. * pi * cuton_Hz);
        let facnum = 2. * fs * tau / (1. + 2. * fs * tau);
        let facden = (1. - 2. * fs * tau) / (1. + 2. * fs * tau);

        Ok(Biquad::fromCoefs(
            facnum,  // b0
            -facnum, // b1
            0.,      // b2,
            facden,  // a1
            0.,      // a2
        ))
    }

    /// First order low pass filter (one pole in the real axis). No pre-warping
    /// correction done.
    /// 
    /// * `fs` - Sampling frequency [Hz]
    /// * `fc` - Cut-off frequency (-3 dB point) [Hz]
    pub fn firstOrderLowPass(fs: Flt, fc: Flt) -> Result<Biquad> {
        if fc <= 0. {
            bail!("Cuton frequency, given: should be > 0")
        }
        if fc >= fs / 2. {
            bail!("Cuton frequency should be smaller than Nyquist frequency")
        }
        let b0 = pi*fc/(pi*fc+fs);
        let b1 = b0;
        let a1 = (pi*fc-fs)/(pi*fc+fs);

        Ok(Biquad::fromCoefs(b0, b1, 0., a1, 0.))
    }

    /// Filter input signal, output by overwriting input slice.
    #[inline]
    pub fn filter_inout(&mut self, inout: &mut [Flt]) {
        for sample in inout.iter_mut() {
            let w0 = *sample - self.a1 * self.w1 - self.a2 * self.w2;
            let yn = self.b0 * w0 + self.b1 * self.w1 + self.b2 * self.w2;
            self.w2 = self.w1;
            self.w1 = w0;
            *sample = yn;
        }
        // println!("{:?}", inout);
    }
}
impl Default for Biquad {
    /// Unit impulse (does not transform signal whatsoever)
    fn default() -> Self {
        Biquad::unit()
    }
}

impl Filter for Biquad {
    fn filter(&mut self, input: &[Flt]) -> Vec<Flt> {
        let mut out = input.to_vec();
        self.filter_inout(&mut out);
        // println!("{:?}", out);
        out
    }
    fn reset(&mut self) {
        self.w1 = 0.;
        self.w2 = 0.;
    }
    fn clone_dyn(&self) -> Box<dyn Filter> {
        Box::new(*self)
    }
}
impl<'a, T: AsArray<'a, Flt>> TransferFunction<'a, T> for Biquad {
    fn tf(&self, fs: Flt, freq: T) -> Ccol {
        let freq = freq.into();
        let res = freq.mapv(|f| {
            let z = Complex::exp(I * 2. * pi * f / fs);
            let num = self.b0 + self.b1 / z + self.b2 / z / z;
            let den = 1. + self.a1 / z + self.a2 / z / z;
            num / den
        });
        res
    }
}

#[cfg(test)]
mod test {
    use approx::assert_abs_diff_eq;
    use num::complex::ComplexFloat;

    use super::*;

    #[test]
    fn test_biquad1() {
        let mut ser = Biquad::unit();
        let inp = vec![1., 0., 0., 0., 0., 0.];
        let filtered = ser.filter(&inp);
        assert_eq!(&filtered, &inp);
    }

    #[test]
    fn test_firstOrderLowpass() {
        let fs = 1e5;
        let fc = 10.;
        let b = Biquad::firstOrderLowPass(fs, fc).unwrap();
        let mut freq = Dcol::from_elem(5, 0.);
        freq[1] = fc;
        freq[2] = fs / 2.;
        let tf = b.tf(fs, freq.view());
        // println!("{:?}", tf);
        assert_abs_diff_eq!(tf[0].re, 1., epsilon = 1e-6);
        assert_abs_diff_eq!(tf[0].im, 0.);
        assert_abs_diff_eq!(tf[1].abs(), 1. / Flt::sqrt(2.), epsilon = 1e-6);
    }
}