use super::pitch::{PITCH_GRID_MAX, PITCH_GRID_MIN};
pub const PYIN_WINDOW: usize = 160;
pub const PYIN_GRID_LEN: usize =
(PITCH_GRID_MAX as usize) - (PITCH_GRID_MIN as usize) + 1;
pub const PYIN_NUM_THRESHOLDS: usize = 100;
const PYIN_BETA_ALPHA: f64 = 2.0;
const PYIN_BETA_BETA: f64 = 18.0;
const D_FLOOR: f64 = 1e-12;
const E_OF_P_FLOOR: f64 = 0.0;
const E_OF_P_CEIL: f64 = 1.0;
const HMM_LOG_PITCH_SIGMA: f64 = 0.346_573_590_279_973;
const EMISSION_FLOOR: f64 = 1.0e-6;
const HMM_LEAK_ALPHA: f64 = 0.05;
fn beta_weights() -> [f64; PYIN_NUM_THRESHOLDS] {
let mut w = [0.0f64; PYIN_NUM_THRESHOLDS];
let mut total = 0.0f64;
for (i, slot) in w.iter_mut().enumerate() {
let s = (i as f64 + 0.5) / PYIN_NUM_THRESHOLDS as f64;
let pdf = s.powf(PYIN_BETA_ALPHA - 1.0) * (1.0 - s).powf(PYIN_BETA_BETA - 1.0);
*slot = pdf;
total += pdf;
}
if total > 0.0 {
for v in &mut w {
*v /= total;
}
}
w
}
fn difference_function(x: &[f64], tau_min: usize, tau_max: usize, w: usize) -> Vec<f64> {
let n = tau_max - tau_min + 1;
let mut d = vec![0.0f64; n];
for (idx, tau) in (tau_min..=tau_max).enumerate() {
let mut acc = 0.0f64;
for j in 0..w {
let diff = x[j] - x[j + tau];
acc += diff * diff;
}
d[idx] = acc;
}
d
}
fn cmndf(d_full: &[f64], tau_min: usize, tau_max: usize) -> Vec<f64> {
let n = tau_max - tau_min + 1;
let mut out = vec![1.0f64; n];
let mut running_sum = 0.0f64;
for tau in 1..=tau_max {
running_sum += d_full[tau];
if tau >= tau_min {
let mean = running_sum / tau as f64;
out[tau - tau_min] = if mean <= D_FLOOR {
1.0
} else {
d_full[tau] / mean
};
}
}
out
}
fn first_local_min_below(d_prime: &[f64], threshold: f64) -> Option<usize> {
if d_prime.is_empty() {
return None;
}
if d_prime[0] < threshold && (d_prime.len() == 1 || d_prime[0] < d_prime[1]) {
return Some(0);
}
for i in 1..d_prime.len() - 1 {
if d_prime[i] < threshold
&& d_prime[i] < d_prime[i - 1]
&& d_prime[i] <= d_prime[i + 1]
{
return Some(i);
}
}
if d_prime.len() >= 2
&& d_prime[d_prime.len() - 1] < threshold
&& d_prime[d_prime.len() - 1] < d_prime[d_prime.len() - 2]
{
return Some(d_prime.len() - 1);
}
None
}
fn parabolic_offset(grid: &[f64], idx: usize) -> f64 {
if idx == 0 || idx + 1 >= grid.len() {
return 0.0;
}
let y_m1 = grid[idx - 1];
let y_0 = grid[idx];
let y_p1 = grid[idx + 1];
let denom = y_m1 - 2.0 * y_0 + y_p1;
if denom.abs() < 1e-18 {
0.0
} else {
let off = 0.5 * (y_m1 - y_p1) / denom;
off.clamp(-0.5, 0.5)
}
}
#[inline]
fn grid_index_to_period(i: f64) -> f64 {
PITCH_GRID_MIN + i
}
struct PyinFrame {
prob: [f64; PYIN_GRID_LEN],
d_prime: Vec<f64>,
}
fn pyin_frame(x: &[f64]) -> PyinFrame {
let tau_min = PITCH_GRID_MIN as usize; let tau_max = PITCH_GRID_MAX as usize; let w = PYIN_WINDOW;
debug_assert!(x.len() >= w + tau_max,
"pyin input must cover W + τ_max samples ({})", w + tau_max);
let mut d_full = vec![0.0f64; tau_max + 1];
for tau in 1..=tau_max {
let mut acc = 0.0f64;
for j in 0..w {
let diff = x[j] - x[j + tau];
acc += diff * diff;
}
d_full[tau] = acc;
}
let _ = difference_function;
let d_prime = cmndf(&d_full, tau_min, tau_max);
debug_assert_eq!(d_prime.len(), PYIN_GRID_LEN);
let weights = beta_weights();
let mut prob = [0.0f64; PYIN_GRID_LEN];
let argmin_idx = {
let mut best = 0usize;
let mut best_v = d_prime[0];
for (i, &v) in d_prime.iter().enumerate().skip(1) {
if v < best_v {
best_v = v;
best = i;
}
}
best
};
for k in 0..PYIN_NUM_THRESHOLDS {
let s = (k as f64 + 0.5) / PYIN_NUM_THRESHOLDS as f64;
let idx = first_local_min_below(&d_prime, s).unwrap_or(argmin_idx);
prob[idx] += weights[k];
}
PyinFrame { prob, d_prime }
}
fn frame_to_pitch(frame: &PyinFrame, best: usize) -> (f64, f64) {
let off = parabolic_offset(&frame.d_prime, best);
let p_hat_i = grid_index_to_period(best as f64 + off)
.clamp(PITCH_GRID_MIN, PITCH_GRID_MAX);
let e_p_hat_i = frame.d_prime[best].clamp(E_OF_P_FLOOR, E_OF_P_CEIL);
(p_hat_i, e_p_hat_i)
}
pub fn run_pyin(x: &[f64]) -> (f64, f64) {
let frame = pyin_frame(x);
let best = {
let mut best = 0usize;
let mut best_p = frame.prob[0];
for (i, &p) in frame.prob.iter().enumerate().skip(1) {
if p > best_p {
best_p = p;
best = i;
}
}
best
};
frame_to_pitch(&frame, best)
}
#[derive(Clone, Debug)]
pub struct PyinHmmState {
log_alpha: [f64; PYIN_GRID_LEN],
primed: bool,
}
impl PyinHmmState {
pub const fn new() -> Self {
Self {
log_alpha: [0.0; PYIN_GRID_LEN],
primed: false,
}
}
pub fn reset(&mut self) {
self.log_alpha = [0.0; PYIN_GRID_LEN];
self.primed = false;
}
}
impl Default for PyinHmmState {
#[inline]
fn default() -> Self {
Self::new()
}
}
fn log_trans_table() -> &'static [[f64; PYIN_GRID_LEN]; PYIN_GRID_LEN] {
use std::sync::OnceLock;
static TABLE: OnceLock<Box<[[f64; PYIN_GRID_LEN]; PYIN_GRID_LEN]>> = OnceLock::new();
TABLE.get_or_init(|| {
let mut t = Box::new([[0.0f64; PYIN_GRID_LEN]; PYIN_GRID_LEN]);
let two_sigma_sq = 2.0 * HMM_LOG_PITCH_SIGMA * HMM_LOG_PITCH_SIGMA;
for i in 0..PYIN_GRID_LEN {
let tau_i = PITCH_GRID_MIN + i as f64;
let log_i = tau_i.ln();
for j in 0..PYIN_GRID_LEN {
let tau_j = PITCH_GRID_MIN + j as f64;
let dlog = log_i - tau_j.ln();
t[i][j] = -(dlog * dlog) / two_sigma_sq;
}
}
t
})
}
pub fn run_pyin_smoothed(x: &[f64], state: &mut PyinHmmState) -> (f64, f64) {
let frame = pyin_frame(x);
let mut log_emit = [0.0f64; PYIN_GRID_LEN];
let floor = EMISSION_FLOOR / PYIN_GRID_LEN as f64;
for i in 0..PYIN_GRID_LEN {
log_emit[i] = (frame.prob[i] + floor).ln();
}
if !state.primed {
state.log_alpha = log_emit;
state.primed = true;
let best = argmax_idx(&log_emit);
return frame_to_pitch(&frame, best);
}
let trans = log_trans_table();
let mut new_alpha = [0.0f64; PYIN_GRID_LEN];
let prior = &state.log_alpha;
let mut leaked = [0.0f64; PYIN_GRID_LEN];
for (dst, src) in leaked.iter_mut().zip(prior.iter()) {
*dst = src * (1.0 - HMM_LEAK_ALPHA);
}
for i in 0..PYIN_GRID_LEN {
let row = &trans[i];
let mut best = f64::NEG_INFINITY;
for j in 0..PYIN_GRID_LEN {
let cand = leaked[j] + row[j];
if cand > best {
best = cand;
}
}
new_alpha[i] = best + log_emit[i];
}
let max_val = new_alpha
.iter()
.copied()
.fold(f64::NEG_INFINITY, f64::max);
if max_val.is_finite() {
for v in &mut new_alpha {
*v -= max_val;
}
}
state.log_alpha = new_alpha;
let best = argmax_idx(&new_alpha);
frame_to_pitch(&frame, best)
}
#[inline]
fn argmax_idx(v: &[f64; PYIN_GRID_LEN]) -> usize {
let mut best = 0usize;
let mut best_v = v[0];
for (i, &x) in v.iter().enumerate().skip(1) {
if x > best_v {
best_v = x;
best = i;
}
}
best
}
#[cfg(test)]
mod tests {
use super::*;
use std::f64::consts::PI;
#[test]
fn pyin_locks_pure_cosine_200hz() {
let n = PYIN_WINDOW + PITCH_GRID_MAX as usize + 10;
let mut x = vec![0.0f64; n];
for (i, slot) in x.iter_mut().enumerate() {
*slot = (2.0 * PI * 200.0 * (i as f64) / 8000.0).cos();
}
let (p, e) = run_pyin(&x);
assert!((p - 40.0).abs() <= 0.5, "expected period ≈ 40, got {p}");
assert!(e < 0.05, "expected confident e_p, got {e}");
}
#[test]
fn pyin_locks_pure_cosine_300hz() {
let n = PYIN_WINDOW + PITCH_GRID_MAX as usize + 10;
let mut x = vec![0.0f64; n];
for (i, slot) in x.iter_mut().enumerate() {
*slot = (2.0 * PI * 300.0 * (i as f64) / 8000.0).cos();
}
let (p, e) = run_pyin(&x);
assert!((p - 26.67).abs() <= 0.6, "expected period ≈ 26.67, got {p}");
assert!(e < 0.05, "expected confident e_p, got {e}");
}
#[test]
fn pyin_silence_returns_high_e_p() {
let x = vec![0.0f64; PYIN_WINDOW + PITCH_GRID_MAX as usize + 10];
let (_p, e) = run_pyin(&x);
assert!(e >= 0.5, "silence should report high e_p; got {e}");
}
#[test]
fn pyin_grid_covers_pitch_range() {
assert_eq!(PYIN_GRID_LEN, 102); }
#[test]
fn pyin_smoothed_locks_stationary_cosine() {
let total = 8 * (PYIN_WINDOW + PITCH_GRID_MAX as usize);
let mut x = vec![0.0f64; total];
for (i, slot) in x.iter_mut().enumerate() {
*slot = (2.0 * PI * 200.0 * (i as f64) / 8000.0).cos();
}
let mut state = PyinHmmState::new();
let win = PYIN_WINDOW + PITCH_GRID_MAX as usize;
let hop = 160; let mut last_p = 0.0;
let mut frames = 0usize;
let mut start = 0usize;
while start + win <= x.len() {
let (p, e) = run_pyin_smoothed(&x[start..start + win], &mut state);
assert!((p - 40.0).abs() <= 0.5, "frame {frames}: p={p}");
assert!(e < 0.05, "frame {frames}: e={e}");
last_p = p;
start += hop;
frames += 1;
}
assert!(frames >= 2, "need at least 2 frames to exercise the HMM step");
assert!((last_p - 40.0).abs() <= 0.5);
}
#[test]
fn pyin_smoothed_silence_to_voice_onset() {
let win = PYIN_WINDOW + PITCH_GRID_MAX as usize;
let n_silent = 3;
let mut state = PyinHmmState::new();
let silent = vec![0.0f64; win];
for _ in 0..n_silent {
let _ = run_pyin_smoothed(&silent, &mut state);
}
let mut x = vec![0.0f64; win];
for (i, slot) in x.iter_mut().enumerate() {
*slot = (2.0 * PI * 200.0 * (i as f64) / 8000.0).cos();
}
let (p, e) = run_pyin_smoothed(&x, &mut state);
assert!((p - 40.0).abs() <= 2.0, "post-silence p={p}");
assert!(e < 0.1, "post-silence e={e}");
}
}