use std::num::NonZeroUsize;
use crate::{
Cqt, Decibels, Gammatone, LinearHz, LogHz, Magnitude, Mel, Power, Spectrogram, chromagram, fft,
irfft, istft, magnitude_spectrum, mfcc, power_spectrum, rfft,
};
use numpy::{
Complex64, PyArray1, PyArray2, PyArrayMethods, PyReadonlyArray1, PyUntypedArrayMethods,
};
use pyo3::prelude::*;
use super::params::{
PyChromaParams, PyCqtParams, PyErbParams, PyLogHzParams, PyLogParams, PyMelParams,
PyMfccParams, PySpectrogramParams, PyStftParams, PyWindowType,
};
use super::spectrogram::PySpectrogram;
use non_empty_slice::NonEmptySlice;
macro_rules! impl_py_compute_fns {
(
$(
(
freq_ty = $freq_scale:ty,
amp_ty = $amp_scale:ty,
variant = $variant:ident,
fn_name = $fn_name:ident,
freq_desc = $freq_desc:expr,
amp_desc = $amp_desc:expr
)
),+ $(,)?
) => {
$(
#[doc = concat!(
"Compute a ", $freq_desc, " ", $amp_desc, " spectrogram.\n\n",
"Parameters\n",
"----------\n",
"samples : numpy.typing.NDArray[numpy.float64]\n",
" Audio samples as a 1D NumPy array\n",
"params : SpectrogramParams\n",
" Spectrogram parameters\n\n",
"Returns\n",
"-------\n",
"Spectrogram\n",
" Spectrogram with ", $freq_desc, " frequency scale and ",
$amp_desc, " amplitude scale"
)]
#[pyfunction]
#[pyo3(signature = (samples: "numpy.typing.NDArray[numpy.float64]",
params: "SpectrogramParams", db_params: "Optional[LogParams]"=None), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], params: SpectrogramParams, db_params: Option[PyLogParams]=None)")]
fn $fn_name(
py: Python,
samples: &Bound<'_, PyAny>,
params: PySpectrogramParams,
db_params: Option<PyLogParams>,
) -> PyResult<PySpectrogram> {
let np = py.import("numpy")?;
let array_any = np.call_method1(
"ascontiguousarray",
(samples, "float64"),
)?;
let array = array_any.cast::<PyArray1<f64>>()?;
let samples = array.readonly();
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let spec = py.detach(|| {
Spectrogram::<$freq_scale, $amp_scale>::compute(
samples_slice,
¶ms.inner,
db_params.as_ref().map(|p| &p.inner),
)
})?;
Ok(PySpectrogram::from_spectrogram(py, spec))
}
)+
};
}
impl_py_compute_fns! {
(
freq_ty = LinearHz,
amp_ty = Power,
variant = LinearPower,
fn_name = compute_linear_power_spectrogram,
freq_desc = "linear",
amp_desc = "power"
),
(
freq_ty = LinearHz,
amp_ty = Magnitude,
variant = LinearMagnitude,
fn_name = compute_linear_magnitude_spectrogram,
freq_desc = "linear",
amp_desc = "magnitude"
),
(
freq_ty = LinearHz,
amp_ty = Decibels,
variant = LinearDb,
fn_name = compute_linear_db_spectrogram,
freq_desc = "linear",
amp_desc = "decibel"
),
}
macro_rules! impl_filterbank_compute_fns {
(
$(
(
freq_ty = $freq_scale:ty,
amp_ty = $amp_scale:ty,
filter_ty = $filter_ty:ty,
py_filter_ty = $py_filter_ty:ty,
variant = $variant:ident,
fn_name = $fn_name:ident,
freq_desc = $freq_desc:expr,
amp_desc = $amp_desc:expr
)
),+ $(,)?
) => {
$(
#[doc = concat!(
"Compute a ", $freq_desc, " ", $amp_desc, " spectrogram.\n\n",
"Parameters\n",
"----------\n",
"samples : numpy.typing.NDArray[numpy.float64]\n",
" Audio samples as a 1D array\n",
"params : SpectrogramParams\n",
" Spectrogram parameters\n",
"filter_params : ", stringify!($py_filter_ty), "\n",
" Filterbank parameters\n",
"db : typing.Optional[LogParams], optional\n",
" Optional decibel scaling parameters\n\n",
"Returns\n",
"-------\n",
"Spectrogram\n",
" Spectrogram with ", $freq_desc, " frequency scale and ", $amp_desc, " amplitude scale"
)]
#[pyfunction]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
params: "SpectrogramParams",
filter_params,
db: "typing.Optional[LogParams]" = None
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], params: SpectrogramParams, filter_params: FilterParams, db: typing.Optional[LogParams] = None)")]
fn $fn_name(
py: Python,
samples: PyReadonlyArray1<f64>,
params: &PySpectrogramParams,
filter_params: &$py_filter_ty,
db: Option<&PyLogParams>,
) -> PyResult<PySpectrogram> {
let samples = samples.as_slice()?;
let samples = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let spec = py.detach(|| {
Spectrogram::<$freq_scale, $amp_scale>::compute(
samples,
¶ms.inner,
&filter_params.inner,
db.map(|d| &d.inner),
)
})?;
Ok(PySpectrogram::from_spectrogram(py, spec))
}
)+
};
}
impl_filterbank_compute_fns! {
(
freq_ty = Mel,
amp_ty = Power,
filter_ty = crate::MelParams,
py_filter_ty = PyMelParams,
variant = MelPower,
fn_name = compute_mel_power_spectrogram,
freq_desc = "mel",
amp_desc = "power"
),
(
freq_ty = Mel,
amp_ty = Magnitude,
filter_ty = crate::MelParams,
py_filter_ty = PyMelParams,
variant = MelMagnitude,
fn_name = compute_mel_magnitude_spectrogram,
freq_desc = "mel",
amp_desc = "magnitude"
),
(
freq_ty = Mel,
amp_ty = Decibels,
filter_ty = crate::MelParams,
py_filter_ty = PyMelParams,
variant = MelDb,
fn_name = compute_mel_db_spectrogram,
freq_desc = "mel",
amp_desc = "decibel"
),
(
freq_ty = Gammatone,
amp_ty = Power,
filter_ty = crate::ErbParams,
py_filter_ty = PyErbParams,
variant = GammatonePower,
fn_name = compute_erb_power_spectrogram,
freq_desc = "ERB/gammatone",
amp_desc = "power"
),
(
freq_ty = Gammatone,
amp_ty = Magnitude,
filter_ty = crate::ErbParams,
py_filter_ty = PyErbParams,
variant = GammatoneMagnitude,
fn_name = compute_erb_magnitude_spectrogram,
freq_desc = "ERB/gammatone",
amp_desc = "magnitude"
),
(
freq_ty = Gammatone,
amp_ty = Decibels,
filter_ty = crate::ErbParams,
py_filter_ty = PyErbParams,
variant = GammatoneDb,
fn_name = compute_erb_db_spectrogram,
freq_desc = "ERB/gammatone",
amp_desc = "decibel"
),
(
freq_ty = LogHz,
amp_ty = Power,
filter_ty = crate::LogHzParams,
py_filter_ty = PyLogHzParams,
variant = LogHzPower,
fn_name = compute_loghz_power_spectrogram,
freq_desc = "logarithmic Hz",
amp_desc = "power"
),
(
freq_ty = LogHz,
amp_ty = Magnitude,
filter_ty = crate::LogHzParams,
py_filter_ty = PyLogHzParams,
variant = LogHzMagnitude,
fn_name = compute_loghz_magnitude_spectrogram,
freq_desc = "logarithmic Hz",
amp_desc = "magnitude"
),
(
freq_ty = LogHz,
amp_ty = Decibels,
filter_ty = crate::LogHzParams,
py_filter_ty = PyLogHzParams,
variant = LogHzDb,
fn_name = compute_loghz_db_spectrogram,
freq_desc = "logarithmic Hz",
amp_desc = "decibel"
),
}
#[pyfunction]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
params: "SpectrogramParams",
cqt: "CqtParams",
db: "typing.Optional[LogParams]" = None
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], params: SpectrogramParams, cqt: CqtParams, db: typing.Optional[LogParams] = None)")]
pub fn compute_cqt_power_spectrogram(
py: Python,
samples: PyReadonlyArray1<f64>,
params: &PySpectrogramParams,
cqt: &PyCqtParams,
db: Option<&PyLogParams>,
) -> PyResult<PySpectrogram> {
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let spec = py.detach(|| {
Spectrogram::<Cqt, Power>::compute(
samples_slice,
¶ms.inner,
&cqt.inner,
db.map(|d| &d.inner),
)
})?;
Ok(PySpectrogram::from_spectrogram(py, spec))
}
#[pyfunction]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
params: "SpectrogramParams",
cqt: "CqtParams",
db: "typing.Optional[LogParams]" = None
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], params: SpectrogramParams, cqt: CqtParams, db: typing.Optional[LogParams] = None)")]
pub fn compute_cqt_magnitude_spectrogram(
py: Python,
samples: PyReadonlyArray1<f64>,
params: &PySpectrogramParams,
cqt: &PyCqtParams,
db: Option<&PyLogParams>,
) -> PyResult<PySpectrogram> {
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let spec = py.detach(|| {
Spectrogram::<Cqt, Magnitude>::compute(
samples_slice,
¶ms.inner,
&cqt.inner,
db.map(|d| &d.inner),
)
})?;
Ok(PySpectrogram::from_spectrogram(py, spec))
}
#[pyfunction]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
params: "SpectrogramParams",
cqt: "CqtParams",
db: "typing.Optional[LogParams]" = None
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], params: SpectrogramParams, cqt: CqtParams, db: typing.Optional[LogParams] = None)")]
pub fn compute_cqt_db_spectrogram(
py: Python,
samples: PyReadonlyArray1<f64>,
params: &PySpectrogramParams,
cqt: &PyCqtParams,
db: Option<&PyLogParams>,
) -> PyResult<PySpectrogram> {
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let spec = py.detach(|| {
Spectrogram::<Cqt, Decibels>::compute(
samples_slice,
¶ms.inner,
&cqt.inner,
db.map(|d| &d.inner),
)
})?;
Ok(PySpectrogram::from_spectrogram(py, spec))
}
#[pyfunction]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
stft_params: "StftParams",
sample_rate: "float",
chroma_params: "ChromaParams"
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], stft_params: StftParams, sample_rate: float, chroma_params: ChromaParams)")]
pub fn compute_chromagram(
py: Python,
samples: PyReadonlyArray1<f64>,
stft_params: &PyStftParams,
sample_rate: f64,
chroma_params: &PyChromaParams,
) -> PyResult<Py<PyArray2<f64>>> {
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let result = py.detach(|| {
chromagram(
samples_slice,
&stft_params.inner,
sample_rate,
&chroma_params.inner,
)
})?;
Ok(PyArray2::from_owned_array(py, result.data).unbind())
}
#[pyfunction]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
stft_params: "StftParams",
sample_rate: "float",
n_mels: "int",
mfcc_params: "MfccParams"
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], stft_params: StftParams, sample_rate: float, n_mels: int, mfcc_params: MfccParams)")]
pub fn compute_mfcc(
py: Python,
samples: PyReadonlyArray1<f64>,
stft_params: &PyStftParams,
sample_rate: f64,
n_mels: usize,
mfcc_params: &PyMfccParams,
) -> PyResult<Py<PyArray2<f64>>> {
let n_mels = NonZeroUsize::new(n_mels).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("n_mels must be a positive integer")
})?;
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let result = py.detach(|| {
mfcc(
samples_slice,
&stft_params.inner,
sample_rate,
n_mels,
&mfcc_params.inner,
)
})?;
Ok(PyArray2::from_owned_array(py, result.data).unbind())
}
#[pyfunction]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
params: "SpectrogramParams"
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], params: SpectrogramParams)")]
pub fn compute_stft(
py: Python,
samples: PyReadonlyArray1<f64>,
params: &PySpectrogramParams,
) -> PyResult<Py<PyArray2<num_complex::Complex64>>> {
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let planner = crate::SpectrogramPlanner::new();
let result = py.detach(|| planner.compute_stft(samples_slice, ¶ms.inner))?;
Ok(PyArray2::from_owned_array(py, result.data).unbind())
}
#[pyfunction]
#[inline]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
n_fft: "Optional[int]" = None,
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], n_fft: Optional[int]=None)")]
pub fn compute_fft(
py: Python,
samples: PyReadonlyArray1<f64>,
n_fft: Option<usize>,
) -> PyResult<Py<PyArray1<Complex64>>> {
let n_fft = n_fft.unwrap_or_else(|| samples.len());
let n_fft = NonZeroUsize::new(n_fft).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("n_fft must be non-zero positive integer")
})?;
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let result = py.detach(|| fft(samples_slice, n_fft))?;
Ok(numpy::PyArray1::from_owned_array(py, result).unbind())
}
#[pyfunction]
#[inline]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
n_fft: "int"
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], n_fft: int) -> numpy.typing.NDArray[numpy.float64]")]
pub fn compute_rfft(
py: Python,
samples: PyReadonlyArray1<f64>,
n_fft: usize,
) -> PyResult<Py<PyArray1<f64>>> {
let n_fft = NonZeroUsize::new(n_fft).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("n_fft must be non-zero positive integer")
})?;
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let result = py.detach(|| rfft(samples_slice, n_fft))?;
Ok(PyArray1::from_owned_array(py, result).unbind())
}
#[pyfunction]
#[inline]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
n_fft: "int",
window: "typing.Optional[WindowType]" = None
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], n_fft: int, window: typing.Optional[WindowType] = None)")]
pub fn compute_power_spectrum(
py: Python,
samples: PyReadonlyArray1<f64>,
n_fft: usize,
window: Option<PyWindowType>,
) -> PyResult<Py<PyArray1<f64>>> {
let n_fft = NonZeroUsize::new(n_fft).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("n_fft must be non-zero positive integer")
})?;
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let window_type = window.map(|w| w.inner);
let result = py.detach(|| power_spectrum(samples_slice, n_fft, window_type))?;
Ok(PyArray1::from_vec(py, result.to_vec()).unbind())
}
#[pyfunction]
#[inline]
#[pyo3(signature = (
samples: "numpy.typing.NDArray[numpy.float64]",
n_fft: "int",
window: "typing.Optional[WindowType]" = None
), text_signature = "(samples: numpy.typing.NDArray[numpy.float64], n_fft: int, window: typing.Optional[WindowType] = None)")]
pub fn compute_magnitude_spectrum(
py: Python,
samples: PyReadonlyArray1<f64>,
n_fft: usize,
window: Option<PyWindowType>,
) -> PyResult<Py<PyArray1<f64>>> {
let n_fft = NonZeroUsize::new(n_fft).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("n_fft must be non-zero positive integer")
})?;
let samples = samples.as_slice()?;
let samples_slice = NonEmptySlice::new(samples).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("samples array must not be empty")
})?;
let window_type = window.map(|w| w.inner);
let result = py.detach(|| magnitude_spectrum(samples_slice, n_fft, window_type))?;
Ok(PyArray1::from_vec(py, result.to_vec()).unbind())
}
#[pyfunction]
#[inline]
#[pyo3(signature = (
spectrum: "numpy.typing.NDArray[numpy.complex128]",
n_fft: "int"
))]
pub fn compute_irfft(
py: Python,
spectrum: numpy::PyReadonlyArray1<num_complex::Complex64>,
n_fft: usize,
) -> PyResult<Py<PyArray1<f64>>> {
let n_fft = NonZeroUsize::new(n_fft).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("n_fft must be non-zero positive integer")
})?;
let spectrum = spectrum.as_slice()?;
let spectrum_slice = NonEmptySlice::new(spectrum).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("spectrum array must not be empty")
})?;
let result = py.detach(|| irfft(spectrum_slice, n_fft))?;
Ok(PyArray1::from_vec(py, result.to_vec()).unbind())
}
#[pyfunction]
#[inline]
#[pyo3(signature = (
stft_matrix: "numpy.typing.NDArray[numpy.complex64]",
n_fft: "int",
hop_size: "int",
window: "WindowType",
center: "bool" = true
), text_signature = "(stft_matrix: numpy.typing.NDArray[numpy.complex64], n_fft: int, hop_size: int, window: WindowType, center: bool = True)")]
pub fn compute_istft(
py: Python,
stft_matrix: numpy::PyReadonlyArray2<num_complex::Complex64>,
n_fft: usize,
hop_size: usize,
window: PyWindowType,
center: bool,
) -> PyResult<Py<PyArray1<f64>>> {
let n_fft = NonZeroUsize::new(n_fft).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("n_fft must be non-zero positive integer")
})?;
let hop_size = NonZeroUsize::new(hop_size).ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("hop_size must be non-zero positive integer")
})?;
let stft_array = stft_matrix.as_array().to_owned();
let result = py.detach(|| istft(&stft_array, n_fft, hop_size, window.inner, center))?;
Ok(PyArray1::from_vec(py, result.to_vec()).unbind())
}
pub fn register(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(compute_linear_power_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_linear_magnitude_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_linear_db_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_mel_power_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_mel_magnitude_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_mel_db_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_erb_power_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_erb_magnitude_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_erb_db_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_loghz_power_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_loghz_magnitude_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_loghz_db_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_cqt_power_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_cqt_magnitude_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_cqt_db_spectrogram, m)?)?;
m.add_function(wrap_pyfunction!(compute_chromagram, m)?)?;
m.add_function(wrap_pyfunction!(compute_mfcc, m)?)?;
m.add_function(wrap_pyfunction!(compute_stft, m)?)?;
m.add_function(wrap_pyfunction!(compute_fft, m)?)?;
m.add_function(wrap_pyfunction!(compute_rfft, m)?)?;
m.add_function(wrap_pyfunction!(compute_irfft, m)?)?;
m.add_function(wrap_pyfunction!(compute_power_spectrum, m)?)?;
m.add_function(wrap_pyfunction!(compute_magnitude_spectrum, m)?)?;
m.add_function(wrap_pyfunction!(compute_istft, m)?)?;
Ok(())
}