use crate::{FFTError, FFTResult};
use scirs2_core::numeric::{Complex64, NumCast};
use std::f64::consts::PI;
use std::fmt::Debug;
pub fn analytic_signal<T>(x: &[T]) -> FFTResult<Vec<Complex64>>
where
T: NumCast + Copy + Debug + 'static,
{
let n = x.len();
if n == 0 {
return Err(FFTError::ValueError(
"Input signal cannot be empty".to_string(),
));
}
let signal: Vec<f64> = x
.iter()
.map(|&val| {
NumCast::from(val).ok_or_else(|| {
FFTError::ValueError(format!("Could not convert {val:?} to numeric type"))
})
})
.collect::<FFTResult<Vec<_>>>()?;
let spectrum = crate::fft::fft(&signal, None)?;
let mut h = vec![Complex64::new(0.0, 0.0); n];
h[0] = Complex64::new(1.0, 0.0);
if n % 2 == 0 {
for k in 1..n / 2 {
h[k] = Complex64::new(2.0, 0.0);
}
h[n / 2] = Complex64::new(1.0, 0.0);
} else {
for k in 1..=(n - 1) / 2 {
h[k] = Complex64::new(2.0, 0.0);
}
}
let filtered: Vec<Complex64> = spectrum
.iter()
.zip(h.iter())
.map(|(&s, &hk)| s * hk)
.collect();
let result = crate::fft::ifft(&filtered, None)?;
Ok(result)
}
pub fn envelope<T>(x: &[T]) -> FFTResult<Vec<f64>>
where
T: NumCast + Copy + Debug + 'static,
{
let xa = analytic_signal(x)?;
Ok(xa.iter().map(|c| c.norm()).collect())
}
pub fn instantaneous_phase<T>(x: &[T]) -> FFTResult<Vec<f64>>
where
T: NumCast + Copy + Debug + 'static,
{
let xa = analytic_signal(x)?;
Ok(xa.iter().map(|c| c.im.atan2(c.re)).collect())
}
pub fn instantaneous_phase_unwrapped<T>(x: &[T]) -> FFTResult<Vec<f64>>
where
T: NumCast + Copy + Debug + 'static,
{
let wrapped_phase = instantaneous_phase(x)?;
Ok(unwrap_phase(&wrapped_phase))
}
pub fn instantaneous_frequency<T>(x: &[T], fs: f64) -> FFTResult<Vec<f64>>
where
T: NumCast + Copy + Debug + 'static,
{
if x.len() < 2 {
return Err(FFTError::ValueError(
"Signal must have at least 2 samples for instantaneous frequency".to_string(),
));
}
if fs <= 0.0 {
return Err(FFTError::ValueError(
"Sampling frequency must be positive".to_string(),
));
}
let phase = instantaneous_phase_unwrapped(x)?;
let dt = 1.0 / fs;
let mut freq = Vec::with_capacity(phase.len() - 1);
for i in 0..phase.len() - 1 {
let dphi = phase[i + 1] - phase[i];
freq.push(dphi / (2.0 * PI * dt));
}
Ok(freq)
}
pub fn instantaneous_frequency_central<T>(x: &[T], fs: f64) -> FFTResult<Vec<f64>>
where
T: NumCast + Copy + Debug + 'static,
{
if x.len() < 3 {
return Err(FFTError::ValueError(
"Signal must have at least 3 samples for central difference frequency".to_string(),
));
}
if fs <= 0.0 {
return Err(FFTError::ValueError(
"Sampling frequency must be positive".to_string(),
));
}
let phase = instantaneous_phase_unwrapped(x)?;
let dt = 1.0 / fs;
let mut freq = Vec::with_capacity(phase.len() - 2);
for i in 1..phase.len() - 1 {
let dphi = phase[i + 1] - phase[i - 1];
freq.push(dphi / (4.0 * PI * dt));
}
Ok(freq)
}
fn unwrap_phase(phase: &[f64]) -> Vec<f64> {
if phase.is_empty() {
return Vec::new();
}
let mut unwrapped = Vec::with_capacity(phase.len());
unwrapped.push(phase[0]);
let mut cumulative_correction = 0.0;
for i in 1..phase.len() {
let mut diff = phase[i] - phase[i - 1];
while diff > PI {
diff -= 2.0 * PI;
cumulative_correction -= 2.0 * PI;
}
while diff < -PI {
diff += 2.0 * PI;
cumulative_correction += 2.0 * PI;
}
unwrapped.push(phase[i] + cumulative_correction);
}
unwrapped
}
#[cfg(test)]
mod tests {
use super::*;
use approx::assert_relative_eq;
#[test]
fn test_analytic_signal_cosine() {
let n = 256;
let freq = 10.0;
let dt = 1.0 / 256.0;
let signal: Vec<f64> = (0..n)
.map(|i| (2.0 * PI * freq * i as f64 * dt).cos())
.collect();
let xa = analytic_signal(&signal).expect("Analytic signal should succeed");
assert_eq!(xa.len(), n);
for i in n / 4..3 * n / 4 {
let mag = xa[i].norm();
assert_relative_eq!(mag, 1.0, epsilon = 0.05);
}
}
#[test]
fn test_analytic_signal_real_part_preserved() {
let signal = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
let xa = analytic_signal(&signal).expect("Analytic signal should succeed");
for (i, &val) in signal.iter().enumerate() {
assert_relative_eq!(xa[i].re, val, epsilon = 1e-10);
}
}
#[test]
fn test_envelope_am_signal() {
let n = 512;
let fs = 1000.0;
let carrier_freq = 100.0;
let mod_freq = 10.0;
let mod_depth = 0.5;
let signal: Vec<f64> = (0..n)
.map(|i| {
let t = i as f64 / fs;
let modulation = 1.0 + mod_depth * (2.0 * PI * mod_freq * t).cos();
modulation * (2.0 * PI * carrier_freq * t).cos()
})
.collect();
let env = envelope(&signal).expect("Envelope should succeed");
assert_eq!(env.len(), n);
for &val in &env {
assert!(val >= -1e-10, "Envelope should be non-negative");
}
let env_slice = &env[n / 4..3 * n / 4];
let max_env = env_slice.iter().copied().fold(f64::MIN, f64::max);
let min_env = env_slice.iter().copied().fold(f64::MAX, f64::min);
assert!(max_env > 1.0, "Max envelope should be > 1");
assert!(min_env < 1.0, "Min envelope should be < 1");
}
#[test]
fn test_instantaneous_phase_linear() {
let n = 256;
let freq = 5.0;
let dt = 1.0 / 256.0;
let signal: Vec<f64> = (0..n)
.map(|i| (2.0 * PI * freq * i as f64 * dt).cos())
.collect();
let phase = instantaneous_phase_unwrapped(&signal).expect("Unwrapped phase should succeed");
assert_eq!(phase.len(), n);
let start = n / 4;
let end = 3 * n / 4;
let expected_rate = 2.0 * PI * freq * dt;
let mut rates = Vec::new();
for i in start..end - 1 {
rates.push(phase[i + 1] - phase[i]);
}
let avg_rate: f64 = rates.iter().sum::<f64>() / rates.len() as f64;
assert_relative_eq!(avg_rate, expected_rate, epsilon = 0.1);
}
#[test]
fn test_instantaneous_frequency_constant() {
let n = 512;
let fs = 1000.0;
let freq = 50.0;
let signal: Vec<f64> = (0..n)
.map(|i| {
let t = i as f64 / fs;
(2.0 * PI * freq * t).cos()
})
.collect();
let inst_freq = instantaneous_frequency(&signal, fs).expect("Inst freq should succeed");
assert_eq!(inst_freq.len(), n - 1);
let mid_start = n / 4;
let mid_end = 3 * n / 4;
for &f in &inst_freq[mid_start..mid_end] {
assert!(
(f - freq).abs() < 3.0,
"Frequency should be near {freq} Hz, got {f}"
);
}
}
#[test]
fn test_instantaneous_frequency_central_accuracy() {
let n = 512;
let fs = 1000.0;
let freq = 50.0;
let signal: Vec<f64> = (0..n)
.map(|i| {
let t = i as f64 / fs;
(2.0 * PI * freq * t).cos()
})
.collect();
let freq_forward =
instantaneous_frequency(&signal, fs).expect("Forward inst freq should succeed");
let freq_central =
instantaneous_frequency_central(&signal, fs).expect("Central inst freq should succeed");
assert_eq!(freq_central.len(), n - 2);
let mid_start = n / 4;
let mid_end = 3 * n / 4;
let forward_errors: Vec<f64> = freq_forward[mid_start..mid_end]
.iter()
.map(|&f| (f - freq).abs())
.collect();
let central_errors: Vec<f64> = freq_central[mid_start - 1..mid_end - 1]
.iter()
.map(|&f| (f - freq).abs())
.collect();
let avg_forward_error: f64 =
forward_errors.iter().sum::<f64>() / forward_errors.len() as f64;
let avg_central_error: f64 =
central_errors.iter().sum::<f64>() / central_errors.len() as f64;
assert!(
avg_central_error <= avg_forward_error + 0.5,
"Central ({avg_central_error:.4}) should be at least as accurate as forward ({avg_forward_error:.4})"
);
}
#[test]
fn test_chirp_instantaneous_frequency() {
let n = 1024;
let fs = 1000.0;
let f0 = 10.0; let f1 = 100.0; let duration = n as f64 / fs;
let signal: Vec<f64> = (0..n)
.map(|i| {
let t = i as f64 / fs;
let freq_at_t = f0 + (f1 - f0) * t / duration;
let phase = 2.0 * PI * (f0 * t + (f1 - f0) * t * t / (2.0 * duration));
phase.cos()
})
.collect();
let inst_freq =
instantaneous_frequency(&signal, fs).expect("Chirp inst freq should succeed");
let mid_start = n / 4;
let mid_end = 3 * n / 4;
let first_quarter_avg: f64 =
inst_freq[mid_start..n / 2].iter().sum::<f64>() / (n / 2 - mid_start) as f64;
let second_quarter_avg: f64 =
inst_freq[n / 2..mid_end].iter().sum::<f64>() / (mid_end - n / 2) as f64;
assert!(
second_quarter_avg > first_quarter_avg,
"Frequency should increase: second half ({second_quarter_avg:.1}) > first half ({first_quarter_avg:.1})"
);
}
#[test]
fn test_empty_signal_error() {
let empty: Vec<f64> = vec![];
assert!(analytic_signal(&empty).is_err());
assert!(envelope(&empty).is_err());
assert!(instantaneous_phase(&empty).is_err());
}
#[test]
fn test_short_signal_frequency_error() {
let short = vec![1.0];
assert!(instantaneous_frequency(&short, 100.0).is_err());
let very_short = vec![1.0, 2.0];
assert!(instantaneous_frequency_central(&very_short, 100.0).is_err());
}
#[test]
fn test_negative_fs_error() {
let signal = vec![1.0, 2.0, 3.0, 4.0];
assert!(instantaneous_frequency(&signal, -100.0).is_err());
assert!(instantaneous_frequency(&signal, 0.0).is_err());
}
#[test]
fn test_unwrap_phase() {
let wrapped = vec![
0.0, 1.0, 2.0, 3.0, -2.8, -1.8, -0.8, 0.2, 1.2, 2.2, 3.2, -2.6,
];
let unwrapped = unwrap_phase(&wrapped);
for i in 1..unwrapped.len() {
let diff = unwrapped[i] - unwrapped[i - 1];
assert!(
diff > -0.5 && diff < 2.0,
"Unwrapped difference should be smooth, got {diff} at index {i}"
);
}
}
#[test]
fn test_dc_signal_envelope() {
let signal = vec![3.0; 64];
let env = envelope(&signal).expect("DC envelope should succeed");
for &val in &env[8..56] {
assert_relative_eq!(val, 3.0, epsilon = 0.5);
}
}
}