#![allow(dead_code)]
/// Preset sensitivity levels for onset detection.
///
/// Every preset maps to a factor through [`FluxSensitivity::threshold`]. The
/// detector in this file places its onset threshold at
/// `mean + factor * (peak - mean)`, so a smaller factor lets more frames
/// qualify as onsets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FluxSensitivity {
    /// Least sensitive preset (largest factor).
    Low,
    /// Balanced preset.
    Medium,
    /// Most sensitive preset (smallest factor).
    High,
    /// Marks a detector whose factor was supplied by the caller (see
    /// `SpectralFluxDetector::with_threshold`).
    Custom,
}

impl FluxSensitivity {
    /// Factor used by [`FluxSensitivity::Low`].
    const LOW_FACTOR: f32 = 0.6;
    /// Factor used by [`FluxSensitivity::Medium`], and the fallback for `Custom`.
    const MEDIUM_FACTOR: f32 = 0.35;
    /// Factor used by [`FluxSensitivity::High`].
    const HIGH_FACTOR: f32 = 0.15;

    /// Returns the threshold factor associated with this preset.
    ///
    /// `Custom` carries no value of its own, so it reports the same factor as
    /// `Medium`.
    #[must_use]
    pub fn threshold(&self) -> f32 {
        match *self {
            FluxSensitivity::High => Self::HIGH_FACTOR,
            FluxSensitivity::Low => Self::LOW_FACTOR,
            FluxSensitivity::Custom | FluxSensitivity::Medium => Self::MEDIUM_FACTOR,
        }
    }

    /// Returns the human-readable name of this preset.
    #[must_use]
    pub fn label(&self) -> &'static str {
        match *self {
            FluxSensitivity::Custom => "Custom",
            FluxSensitivity::High => "High",
            FluxSensitivity::Medium => "Medium",
            FluxSensitivity::Low => "Low",
        }
    }
}
/// Per-frame outcome of onset analysis.
///
/// The type consists of three plain scalar fields, so it is `Copy` and can be
/// compared directly with `==`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FluxFrame {
    /// Time of the frame in seconds.
    pub time_s: f32,
    /// Flux value measured for the frame.
    pub flux: f32,
    /// Whether the frame was classified as an onset.
    pub is_onset: bool,
}

impl FluxFrame {
    /// Builds a frame record from its three components.
    ///
    /// * `time_s` - time of the frame in seconds.
    /// * `flux` - flux value measured for the frame.
    /// * `is_onset` - whether the frame counts as an onset.
    #[must_use]
    pub fn new(time_s: f32, flux: f32, is_onset: bool) -> Self {
        Self {
            time_s,
            flux,
            is_onset,
        }
    }
}
/// Complete output of one `SpectralFluxDetector::detect` run.
#[derive(Debug, Clone)]
pub struct FluxResult {
/// One entry per analysed frame, in chronological order.
pub frames: Vec<FluxFrame>,
/// Times (in seconds) of the frames that were flagged as onsets.
pub onsets: Vec<f32>,
/// Arithmetic mean of all flux values; `0.0` when no frame was analysed.
pub mean_flux: f32,
/// Largest flux value observed; never below `0.0`.
pub peak_flux: f32,
}
pub struct SpectralFluxDetector {
sample_rate: f32,
frame_size: usize,
hop_size: usize,
sensitivity: FluxSensitivity,
threshold: f32,
}
impl SpectralFluxDetector {
#[must_use]
pub fn new(
sample_rate: f32,
frame_size: usize,
hop_size: usize,
sensitivity: FluxSensitivity,
) -> Self {
Self {
sample_rate,
frame_size,
hop_size,
sensitivity,
threshold: sensitivity.threshold(),
}
}
#[must_use]
pub fn with_threshold(
sample_rate: f32,
frame_size: usize,
hop_size: usize,
threshold: f32,
) -> Self {
Self {
sample_rate,
frame_size,
hop_size,
sensitivity: FluxSensitivity::Custom,
threshold: threshold.clamp(0.0, 1.0),
}
}
#[must_use]
pub fn threshold(&self) -> f32 {
self.threshold
}
#[must_use]
pub fn sensitivity(&self) -> FluxSensitivity {
self.sensitivity
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn detect(&self, samples: &[f32]) -> FluxResult {
let magnitudes = self.compute_frame_magnitudes(samples);
let flux_values = self.compute_flux(&magnitudes);
let mean_flux = if flux_values.is_empty() {
0.0
} else {
flux_values.iter().sum::<f32>() / flux_values.len() as f32
};
let peak_flux = flux_values.iter().copied().fold(0.0_f32, f32::max);
let adaptive = mean_flux + self.threshold * (peak_flux - mean_flux);
let mut frames = Vec::with_capacity(flux_values.len());
let mut onsets = Vec::new();
for (i, &flux) in flux_values.iter().enumerate() {
let time_s = (i * self.hop_size) as f32 / self.sample_rate;
let is_onset = flux > adaptive;
if is_onset {
onsets.push(time_s);
}
frames.push(FluxFrame::new(time_s, flux, is_onset));
}
FluxResult {
frames,
onsets,
mean_flux,
peak_flux,
}
}
#[must_use]
pub fn count_onsets(&self, samples: &[f32]) -> usize {
self.detect(samples).onsets.len()
}
#[allow(clippy::cast_precision_loss)]
fn compute_frame_magnitudes(&self, samples: &[f32]) -> Vec<f32> {
let mut mags = Vec::new();
let mut pos = 0;
while pos + self.frame_size <= samples.len() {
let frame = &samples[pos..pos + self.frame_size];
let energy: f32 = frame.iter().map(|&x| x * x).sum::<f32>() / self.frame_size as f32;
mags.push(energy.sqrt());
pos += self.hop_size;
}
mags
}
#[allow(clippy::unused_self)]
fn compute_flux(&self, magnitudes: &[f32]) -> Vec<f32> {
if magnitudes.len() < 2 {
return vec![0.0; magnitudes.len()];
}
let mut flux = vec![0.0_f32];
for i in 1..magnitudes.len() {
let diff = (magnitudes[i] - magnitudes[i - 1]).max(0.0);
flux.push(diff);
}
flux
}
}
impl Default for SpectralFluxDetector {
fn default() -> Self {
Self::new(44100.0, 2048, 512, FluxSensitivity::Medium)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Presets must be ordered so that higher sensitivity means a lower factor.
    #[test]
    fn test_sensitivity_thresholds() {
        assert!(FluxSensitivity::Low.threshold() > FluxSensitivity::Medium.threshold());
        assert!(FluxSensitivity::Medium.threshold() > FluxSensitivity::High.threshold());
    }

    /// Every preset reports its own name as label.
    #[test]
    fn test_sensitivity_labels() {
        assert_eq!(FluxSensitivity::Low.label(), "Low");
        assert_eq!(FluxSensitivity::Medium.label(), "Medium");
        assert_eq!(FluxSensitivity::High.label(), "High");
        assert_eq!(FluxSensitivity::Custom.label(), "Custom");
    }

    /// The constructor stores its arguments unchanged.
    #[test]
    fn test_flux_frame_new() {
        let f = FluxFrame::new(1.5, 0.42, true);
        assert_eq!(f.time_s, 1.5);
        assert_eq!(f.flux, 0.42);
        assert!(f.is_onset);
    }

    /// The default detector uses 44.1 kHz, 2048/512 framing and medium sensitivity.
    #[test]
    fn test_default_detector() {
        let det = SpectralFluxDetector::default();
        assert_eq!(det.sample_rate, 44100.0);
        assert_eq!(det.frame_size, 2048);
        assert_eq!(det.hop_size, 512);
        assert_eq!(det.sensitivity(), FluxSensitivity::Medium);
    }

    /// Out-of-range custom thresholds are clamped into `[0, 1]`.
    #[test]
    fn test_custom_threshold_clamped() {
        let det = SpectralFluxDetector::with_threshold(44100.0, 2048, 512, 2.0);
        assert_eq!(det.threshold(), 1.0);
        let det2 = SpectralFluxDetector::with_threshold(44100.0, 2048, 512, -0.5);
        assert_eq!(det2.threshold(), 0.0);
    }

    /// Silence has no flux at all and therefore no onsets.
    #[test]
    fn test_detect_silence() {
        let det = SpectralFluxDetector::default();
        let silence = vec![0.0_f32; 44100];
        let result = det.detect(&silence);
        assert_eq!(result.mean_flux, 0.0);
        assert_eq!(result.peak_flux, 0.0);
        assert!(result.onsets.is_empty());
    }

    /// A constant level never rises between frames, so nothing is an onset.
    #[test]
    fn test_detect_constant_signal_no_onsets() {
        let det = SpectralFluxDetector::default();
        let constant = vec![0.5_f32; 44100];
        let result = det.detect(&constant);
        assert_eq!(result.peak_flux, 0.0);
        // The property named by this test: no frame may be flagged.
        assert!(result.onsets.is_empty());
    }

    /// A burst inside silence must be detected.
    #[test]
    fn test_detect_impulse_has_onset() {
        let det = SpectralFluxDetector::new(44100.0, 256, 128, FluxSensitivity::High);
        let mut samples = vec![0.0_f32; 44100];
        for s in samples[10000..10256].iter_mut() {
            *s = 0.9;
        }
        let result = det.detect(&samples);
        assert!(result.peak_flux > 0.0);
        assert!(!result.onsets.is_empty());
    }

    /// `count_onsets` agrees with the number of onsets reported by `detect`.
    #[test]
    fn test_count_onsets_matches() {
        let det = SpectralFluxDetector::new(44100.0, 256, 128, FluxSensitivity::High);
        let mut samples = vec![0.0_f32; 44100];
        for s in samples[5000..5256].iter_mut() {
            *s = 0.8;
        }
        let count = det.count_onsets(&samples);
        let result = det.detect(&samples);
        assert_eq!(count, result.onsets.len());
    }

    /// The number of frames follows from the framing parameters.
    #[test]
    fn test_flux_frames_length() {
        let det = SpectralFluxDetector::default();
        let samples = vec![0.1_f32; 44100];
        let result = det.detect(&samples);
        assert!(!result.frames.is_empty());
        // Complete frames: (44100 - 2048) / 512 + 1 = 83.
        assert_eq!(result.frames.len(), 83);
    }

    /// Input shorter than one frame yields an empty result instead of a panic.
    #[test]
    fn test_short_signal_no_panic() {
        let det = SpectralFluxDetector::default();
        let short = vec![0.1_f32; 100];
        let result = det.detect(&short);
        assert!(result.frames.is_empty());
        assert_eq!(result.mean_flux, 0.0);
    }

    /// Flux values are clipped at zero, so their mean cannot be negative.
    #[test]
    fn test_mean_flux_non_negative() {
        let det = SpectralFluxDetector::default();
        let samples: Vec<f32> = (0..44100).map(|i| (i as f32 * 0.01).sin() * 0.5).collect();
        let result = det.detect(&samples);
        assert!(result.mean_flux >= 0.0);
    }

    /// Onset times are reported in non-decreasing order.
    #[test]
    fn test_onset_times_monotonic() {
        let det = SpectralFluxDetector::new(44100.0, 256, 128, FluxSensitivity::High);
        let mut samples = vec![0.0_f32; 44100];
        for s in samples[4000..4256].iter_mut() {
            *s = 0.9;
        }
        for s in samples[20000..20256].iter_mut() {
            *s = 0.9;
        }
        let result = det.detect(&samples);
        for w in result.onsets.windows(2) {
            assert!(w[1] >= w[0]);
        }
    }
}