use super::butter::butter_lowpass_roots;
use super::{Biquad, SeriesBiquad, TransferFunction};
use crate::{config::*, ps::FreqWeighting};
use itertools::{EitherOrBoth, Itertools};
use num::{zero, Complex};
use std::cmp::{max, min};
pub const BUTTER_MAX_ORDER: u32 = 40;
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum FilterSpec {
Bandpass {
fl: Flt,
fu: Flt,
order: u32,
},
Lowpass {
fc: Flt,
order: u32,
},
Highpass {
fc: Flt,
order: u32,
},
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct ZPKModel {
z: Vec<PoleOrZero>,
p: Vec<PoleOrZero>,
k: Flt,
fwarp: Option<Flt>,
}
impl Default for ZPKModel {
fn default() -> Self {
ZPKModel {
z: vec![],
p: vec![],
k: 1.0,
fwarp: None,
}
}
}
impl<'a, T: AsArray<'a, Flt>> TransferFunction<'a, T> for ZPKModel {
fn tf(&self, _fs: Flt, freq: T) -> Ccol {
let freq = freq.into();
freq.mapv(|freq| {
let s = 2. * I * pi * freq;
let mut res = Cflt::ONE;
use PoleOrZero::*;
self.z.iter().for_each(|z| match z {
Complex(z) => {
res *= (s - z) * (s - z.conj());
}
Real1(z) => {
res *= s - z;
}
Real2(z1, z2) => {
res *= (s - z1) * (s - z2);
}
});
self.p.iter().for_each(|p| match p {
Complex(p) => {
res *= 1. / ((s - p) * (s - p.conj()));
}
Real1(p) => {
res *= 1. / (s - p);
}
Real2(p1, p2) => {
res *= 1. / ((s - p1) * (s - p2));
}
});
res *= self.k;
res
})
}
}
use std::ops::Mul;
impl Mul for ZPKModel {
type Output = Self;
fn mul(self, rhs: ZPKModel) -> Self {
let (mut z, mut p, mut k) = (self.z, self.p, self.k);
k *= rhs.k;
z.extend(rhs.z);
p.extend(rhs.p);
ZPKModel {
z,
p,
k,
..Default::default()
}
}
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl ZPKModel {
#[pyo3(name = "butter")]
#[staticmethod]
fn butter_py(spec: FilterSpec) -> ZPKModel {
ZPKModel::butter(spec)
}
#[pyo3(name = "freqWeightingFilter")]
#[staticmethod]
fn freqWeightingFilter_py(fw: FreqWeighting) ->ZPKModel {
Self::freqWeightingFilter(fw)
}
fn __repr__(&self) -> String {
format!("{self:?}")
}
#[pyo3(name = "tf")]
fn tf_py<'py>(
&self,
py: Python<'py>,
fs: Flt,
freq: PyArrayLike1<Flt>,
) -> PyResult<PyArr1Cflt<'py>> {
let freq = freq.as_array();
let res = PyArray1::from_array(py, &self.tf(fs, freq));
Ok(res)
}
#[pyo3(name = "bilinear")]
fn bilinear_py(&self, fs: Flt) -> SeriesBiquad {
self.bilinear(fs)
}
}
impl ZPKModel {
pub fn new<T, U>(zeros: T, poles: U, k: Flt) -> ZPKModel
where
T: Into<Vec<PoleOrZero>>,
U: Into<Vec<PoleOrZero>>,
{
let z = zeros.into();
let p = poles.into();
ZPKModel {
z,
p,
k,
..Default::default()
}
.compactize()
}
fn combine_reals(v: Vec<PoleOrZero>) -> Vec<PoleOrZero> {
let mut real1: Option<PoleOrZero> = None;
let mut v: Vec<PoleOrZero> = v
.iter()
.filter_map(|z| match z {
PoleOrZero::Complex(z) => Some(PoleOrZero::Complex(*z)),
PoleOrZero::Real2(z1, z2) => Some(PoleOrZero::Real2(*z1, *z2)),
PoleOrZero::Real1(z) => {
if let Some(real1) = real1.take() {
if let PoleOrZero::Real1(z2) = real1 {
Some(PoleOrZero::Real2(z2, *z))
} else {
unreachable!()
}
} else {
real1 = Some(PoleOrZero::Real1(*z));
None
}
}
})
.collect();
if let Some(real1) = real1 {
if let PoleOrZero::Real1(z) = real1 {
v.push(PoleOrZero::Real1(z));
} else {
unreachable!()
}
}
v
}
fn compactize(self) -> ZPKModel {
let (z, p, k, fwarp) = (self.z, self.p, self.k, self.fwarp);
let z = Self::combine_reals(z);
let p = Self::combine_reals(p);
ZPKModel { z, p, k, fwarp }
}
pub fn setWarpFreq(mut self, fcrit: Flt) -> ZPKModel {
self.fwarp = Some(fcrit);
self
}
pub fn setGainAt(mut self, freq: Flt, required_gain: Flt) -> ZPKModel {
assert!(required_gain > 0.);
let freq = [freq];
let cur_gain_at_freq = self.tf(-1.0, &freq)[0].abs();
let gain_fac = required_gain / cur_gain_at_freq;
self.k *= gain_fac;
self
}
fn replace_poles_lp2bp(
pzlp: PoleOrZero,
fc: Flt,
Bw_Hz: Flt,
) -> (Vec<PoleOrZero>, Vec<PoleOrZero>) {
let omgc = 2. * pi * fc;
let mut new_poles = Vec::with_capacity(2);
let mut extra_zeros = Vec::with_capacity(2);
let omgcsq = omgc.powi(2);
match pzlp {
PoleOrZero::Real1(pz) => {
let pz_lp = pz * Bw_Hz / fc / 2.;
let sq = pz_lp.powi(2) - omgcsq;
if sq >= 0. {
let sqrt = sq.sqrt();
extra_zeros.push(PoleOrZero::Real1(0.));
new_poles.push(PoleOrZero::Real2(
pz_lp + sqrt, pz_lp - sqrt, ));
} else {
let sqrt = (sq + 0. * I).sqrt();
extra_zeros.push(PoleOrZero::Real1(0.));
new_poles.push(PoleOrZero::Complex(pz_lp + sqrt));
}
}
PoleOrZero::Real2(z1, z2) => {
for z in [z1, z2] {
let (np, ez) = Self::replace_poles_lp2bp(PoleOrZero::Real1(z), fc, Bw_Hz);
new_poles.extend(np);
extra_zeros.extend(ez);
}
}
PoleOrZero::Complex(pz) => {
let pz_lp = pz * Bw_Hz / fc / 2.;
let sqrt = (pz_lp.powi(2) - omgcsq).sqrt();
extra_zeros.push(PoleOrZero::Real2(0., 0.));
new_poles.push(PoleOrZero::Complex(pz_lp + sqrt));
new_poles.push(PoleOrZero::Complex(pz_lp - sqrt));
}
}
(new_poles, extra_zeros)
}
fn lowpass_to_bandpass(self, fc: Flt, Bw_Hz: Flt) -> ZPKModel {
let (mut z, p, k, fwarp) = (self.z, self.p, self.k, self.fwarp);
assert!(z.is_empty());
let mut new_poles = Vec::with_capacity(2 * p.len());
for p in p {
let (new_poles_current, extra_zeros_current) = Self::replace_poles_lp2bp(p, fc, Bw_Hz);
new_poles.extend(new_poles_current);
z.extend(extra_zeros_current);
}
ZPKModel {
z,
p: new_poles,
k,
fwarp,
}
}
fn check_spec(spec: FilterSpec) {
let check_fc = |fc| assert!(fc > 0., "Cut-off frequency should be > 0");
let check_order = |order| {
assert!(
order > 0 && order <= BUTTER_MAX_ORDER,
"Invalid filter order"
)
};
match spec {
FilterSpec::Lowpass { fc, order } => {
check_fc(fc);
check_order(order);
}
FilterSpec::Highpass { fc, order } => {
check_fc(fc);
check_order(order);
}
FilterSpec::Bandpass { fl, fu, order } => {
assert!(
fl <= fu && fl > 0.,
"Invalid cut-on and cut-off frequency specified"
);
check_order(order);
}
}
}
pub fn butter(spec: FilterSpec) -> ZPKModel {
Self::check_spec(spec);
match spec {
FilterSpec::Lowpass { fc, order } => {
let p = butter_lowpass_roots(fc, order).collect();
let z = vec![];
ZPKModel {
z,
p,
k: 1.0,
..Default::default()
}
.compactize()
.setGainAt(fc, (0.5).sqrt())
.setWarpFreq(fc)
}
FilterSpec::Highpass { fc, order } => {
let p = butter_lowpass_roots(fc, order).collect();
let z = vec![PoleOrZero::Real1(0.); order as usize];
ZPKModel {
z,
p,
k: 1.0,
..Default::default()
}
.compactize()
.setGainAt(fc, (0.5).sqrt())
.setWarpFreq(fc)
}
FilterSpec::Bandpass { fl, fu, order } => {
let fmid = (fl * fu).sqrt();
let Bw_Hz = fu - fl;
let lp = Self::butter(FilterSpec::Lowpass { fc: fmid, order });
Self::lowpass_to_bandpass(lp, fmid, Bw_Hz)
.compactize()
.setGainAt(fmid, 1.0)
.setWarpFreq(fmid)
}
}
}
pub fn bilinear(&self, fs: Flt) -> SeriesBiquad {
let mut biqs = vec![];
let max_len = max(self.z.len(), self.p.len());
if max_len == 0 {
return SeriesBiquad::new(&[self.k, 0., 0., 1., 0., 0.]).unwrap();
}
let max_len = max_len as Flt;
let k_fac = self.k.powf(1. / max_len);
for case in self.z.iter().zip_longest(&self.p) {
match case {
EitherOrBoth::Both(z, p) => {
biqs.push(Biquad::bilinear_zpk(
fs,
Some(*z),
Some(*p),
Some(k_fac),
self.fwarp,
));
}
EitherOrBoth::Left(z) => {
biqs.push(Biquad::bilinear_zpk(
fs,
Some(*z),
None,
Some(k_fac),
self.fwarp,
));
}
EitherOrBoth::Right(p) => {
biqs.push(Biquad::bilinear_zpk(
fs,
None,
Some(*p),
Some(k_fac),
self.fwarp,
));
}
}
}
SeriesBiquad::newFromBiqs(biqs)
}
pub fn freqWeightingFilter(wt: FreqWeighting) -> ZPKModel {
if let FreqWeighting::Z = wt {
return ZPKModel {
z: vec![],
p: vec![],
k: 1.0,
fwarp: None,
};
}
let fr: Flt = 1000.;
let fL: Flt = num::Float::powf(10., 1.5);
let fH: Flt = num::Float::powf(10., 3.9);
let sq5: Flt = num::Float::powf(5., 0.5);
let fLsq = fL.powi(2);
let fHsq: Flt = fH.powi(2);
let frsq: Flt = fr.powi(2);
let fA = num::Float::powf(10., 2.45);
let D = Flt::sqrt(0.5);
let b = (1. / (1. - D)) * (frsq + fLsq * fHsq / frsq - D * (fLsq + fHsq));
let c = fLsq * fHsq;
let f2 = (3. - sq5) / 2. * fA;
let f3 = (3. + sq5) / 2. * fA;
let sqrtfac = (b.powi(2) - 4. * c).sqrt();
let f1 = ((-b - sqrtfac) / 2.).sqrt();
let f4 = ((-b + sqrtfac) / 2.).sqrt();
let p1 = 2. * pi * f1;
let p2 = 2. * pi * f2;
let p3 = 2. * pi * f3;
let p4 = 2. * pi * f4;
let (zeros, poles) = match wt {
FreqWeighting::Z => {
unreachable!()
}
FreqWeighting::C => {
let zeros = vec![PoleOrZero::Real2(0., 0.)];
let poles = vec![PoleOrZero::Real2(-p1, -p1), PoleOrZero::Real2(-p4, -p4)];
(zeros, poles)
}
FreqWeighting::A => {
let poles = vec![
PoleOrZero::Real2(-p1, -p1),
PoleOrZero::Real2(-p2, -p3),
PoleOrZero::Real2(-p4, -p4),
];
let zeros = vec![PoleOrZero::Real2(0., 0.), PoleOrZero::Real2(0., 0.)];
(zeros, poles)
}
};
ZPKModel::new(zeros, poles, 1.0).setGainAt(1000., 1.0)
}
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PoleOrZero {
Complex(Cflt),
Real2(Flt, Flt),
Real1(Flt),
}
#[cfg(test)]
mod test{
use approx::assert_abs_diff_eq;
use num::complex::ComplexFloat;
use crate::TransferFunction;
use super::ZPKModel;
#[test]
fn test_A() {
let Aw = ZPKModel::freqWeightingFilter(crate::FreqWeighting::A);
assert_abs_diff_eq!(Aw.tf(0., &[1000.])[0].abs(), 1.0);
}
#[test]
fn test_C() {
let Cw = ZPKModel::freqWeightingFilter(crate::FreqWeighting::C);
assert_abs_diff_eq!(Cw.tf(0., &[1000.])[0].abs(), 1.0);
}
}