pyin 1.2.0

pYIN pitch detection algorithm written in Rust
Documentation
use std::cmp::Ord;
use std::fmt::Debug;
use std::ops::Add;

use approx::{abs_diff_eq, AbsDiffEq};
use ndarray::prelude::*;
use ndarray_stats::{MaybeNan, QuantileExt};
use realfft::num_traits::Float;

/// Viterbi decoding from observation likelihoods.
/// Given a sequence of observation likelihoods `prob[s, t]`,
/// indicating the conditional likelihood of seeing the observation
/// at time `t` from state `s`, and a transition matrix
/// `transition[[i, j]]` which encodes the conditional probability of
/// moving from state `i` to state `j`, the Viterbi algorithm [#]_ computes
/// the most likely sequence of states from the observations.
///
/// .. [#] Viterbi, Andrew. "Error bounds for convolutional codes and an
///     asymptotically optimum decoding algorithm."
///     IEEE transactions on Information Theory 13.2 (1967): 260-269.
///
/// # Arguments
///
/// * `prob` - shape=[n_states, n_steps], non-negative. `prob[[s, t]]` is the probability of observation at time `t` being generated by state `s`.
/// * `transition` - shape=[n_states, n_states], non-negative. `transition[[i, j]]` is the probability of a transition from `i`->`j`. Each row must sum to 1.
/// * `p_init` - If Some(arr), arr (shape=[n_states]) means initial state distribution. If None, a uniform distribution is assumed.
///
/// # Returns
///
/// returns (states, logp)
/// * `states` - shape=[n_steps]. The most likely state sequence.
/// * `logp` - the log probability of `states` given the observations.
///
/// # Examples
/// Example from https://en.wikipedia.org/wiki/Viterbi_algorithm#Example
///
/// In this example, we have two states ``healthy`` and ``fever``, with
/// initial probabilities 60% and 40%.
///
///
/// We have three observation possibilities: `normal`, `cold`, and
/// `dizzy`, whose probabilities given each state are:
///
/// `healthy => {normal: 50%, cold: 40%, dizzy: 10%}` and
/// `fever => {normal: 10%, cold: 30%, dizzy: 60%}`
///
/// Finally, we have transition probabilities:
///
/// `healthy => healthy (70%)` and
/// `fever => fever (60%)`.
///
/// Over three days, we observe the sequence `[normal, cold, dizzy]`,
/// and wish to know the maximum likelihood assignment of states for the
/// corresponding days, which we compute with the Viterbi algorithm below.
///
/// ```ignore
/// use ndarray::prelude::*;
/// let p_init = Array1::from_vec(vec![0.6f64, 0.4]);
/// let p_emit = Array2::from_shape_vec((2, 3), vec![0.5f64, 0.4, 0.1, 0.1, 0.3, 0.6]).unwrap();
/// let p_trans = Array2::from_shape_vec((2, 2), vec![0.7f64, 0.3, 0.4, 0.6]).unwrap();
/// let (path, logp) = viterbi(p_emit.view(), p_trans.view(), Some(p_init.into()));
/// assert!(path.into_iter().zip([0, 0, 1].into_iter()).all(|(x, y)| x == y));
/// assert!((logp - (-4.19173690823075)).abs() < 1e-14);
/// ```
pub(crate) fn viterbi<A>(
    prob: ArrayView2<A>,
    transition: ArrayView2<A>,
    p_init: Option<CowArray<A, Ix1>>,
) -> (Array1<usize>, A)
where
    A: Float + AbsDiffEq<Epsilon = A> + Add + MaybeNan + Debug,
    <A as MaybeNan>::NotNan: Ord,
{
    assert_eq!(prob.raw_dim()[0], transition.raw_dim()[0]);
    assert_eq!(transition.raw_dim()[0], transition.raw_dim()[1]);
    assert!(
        transition.iter().all(|&x| x >= A::zero())
            && transition.sum_axis(Axis(1)).abs_diff_eq(
                &Array::from_elem(transition.shape()[0], A::one()),
                A::from(1e-6).unwrap(),
            ),
        "Invalid transition matrix: must be non-negative and sum to 1 on each row.",
    );
    assert!(
        prob.iter().all(|&x| x >= A::zero() && x <= A::one()),
        "Invalid probability values: must be between 0 and 1."
    );

    let n_states = prob.shape()[0];
    let n_steps = prob.shape()[1];

    let mut states = Array1::<usize>::zeros(n_steps);
    let mut values = Array2::<A>::zeros((n_steps, n_states));
    let mut ptr = Array2::<usize>::zeros((n_steps, n_states));

    // Compute log-likelihoods while avoiding log-underflow
    let epsilon = A::min_positive_value();
    let log_trans = transition.mapv(|x| (x + epsilon).ln());
    let log_prob = prob.t().mapv(|x| (x + epsilon).ln());

    let p_init = match p_init {
        Some(p_init) => {
            assert!(
                p_init.raw_dim() == Dim(n_states)
                    && p_init.iter().all(|&x| x >= A::zero())
                    && abs_diff_eq!(p_init.sum(), A::one(), epsilon = A::from(1e-6).unwrap()),
                "Invalid initial state distribution: p_init={:?}",
                p_init
            );
            p_init
        }
        None => Array1::from_elem(n_states, A::one() / A::from(n_states).unwrap()).into(),
    };
    let log_p_init = p_init.mapv(|x| (x + epsilon).ln());

    // _viterbi
    // factor in initial state distribution
    values
        .slice_mut(s![0, ..])
        .assign(&(log_p_init + log_prob.slice(s![0, ..])));

    for t in 1..n_steps {
        /* Want V[t, j] <- p[t, j] * max_k V[t-1, k] * A[k, j]
        assume at time t-1 we were in state k
        transition k -> j */

        /* Broadcast over rows:
        Tout[k, j] = V[t-1, k] * A[k, j]
        then take the max over columns
        We'll do this in log-space for stability */
        let trans_out = &values.slice(s![t - 1..t, ..]) + &log_trans.t();
        values
            .slice_mut(s![t, ..])
            .iter_mut()
            .enumerate()
            .for_each(|(j, x)| {
                ptr[[t, j]] = trans_out.slice(s![j, ..]).argmax_skipnan().unwrap();
                *x = log_prob[[t, j]] + trans_out[[j, ptr[[t, j]]]];
            })
    }
    // Now roll backward

    // Get the last state
    *states.last_mut().unwrap() = values.slice(s![-1, ..]).argmax_skipnan().unwrap();

    for t in (0..n_steps - 1).rev() {
        states[t] = ptr[[t + 1, states[t + 1]]];
    }

    let (i, j) = (values.shape()[0] - 1, states[states.shape()[0] - 1]);
    (states, values[[i, j]])
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_viterbi() {
        let p_init = Array1::from_vec(vec![0.6f64, 0.4]);
        let p_emit = Array2::from_shape_vec((2, 3), vec![0.5f64, 0.4, 0.1, 0.1, 0.3, 0.6]).unwrap();
        let p_trans = Array2::from_shape_vec((2, 2), vec![0.7f64, 0.3, 0.4, 0.6]).unwrap();
        let (path, logp) = viterbi(p_emit.view(), p_trans.view(), Some(p_init.into()));
        assert!(path
            .into_iter()
            .zip([0, 0, 1].into_iter())
            .all(|(x, y)| x == y));
        assert!((logp - (-4.19173690823075)).abs() < 1e-14);
    }
}