use std::cmp::Ordering;
use ndarray::prelude::*;
use num_traits::Zero;
use ordered_float::OrderedFloat;
use statrs::distribution::Continuous;
use super::LogProb;
quick_error! {
#[derive(Debug, PartialEq)]
pub enum HMMError {
InvalidDimension(an0: usize, an1: usize, bn: usize, bm: usize, pin: usize) {
description("invalid dimensions on construction")
display(
"inferred from A: N_0={}, N_1={} (must be equal), from B: N={}, M={}, from \
pi: N={}", an0, an1, bn, bm, pin)
}
}
}
custom_derive! {
#[derive(
NewtypeFrom,
NewtypeDeref,
PartialEq,
Copy,
Clone,
Debug
)]
pub struct State(pub usize);
}
pub struct StateIter {
nxt: usize,
max: usize,
}
impl StateIter {
pub fn new(num_states: usize) -> Self {
Self {
nxt: 0,
max: num_states,
}
}
}
impl Iterator for StateIter {
type Item = State;
fn next(&mut self) -> Option<State> {
if self.nxt < self.max {
let cur = self.nxt;
self.nxt += 1;
Some(State(cur))
} else {
None
}
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct StateTransition {
pub src: State,
pub dst: State,
}
impl StateTransition {
pub fn new(src: State, dst: State) -> Self {
Self { src, dst }
}
}
pub struct StateTransitionIter {
nxt_a: usize,
nxt_b: usize,
max: usize,
}
impl StateTransitionIter {
pub fn new(num_states: usize) -> Self {
Self {
nxt_a: 0,
nxt_b: 0,
max: num_states,
}
}
}
impl Iterator for StateTransitionIter {
type Item = StateTransition;
fn next(&mut self) -> Option<StateTransition> {
let cur_b = self.nxt_b;
let cur_a = self.nxt_a;
if self.nxt_b < self.max {
self.nxt_b += 1;
Some(StateTransition::new(State(cur_a), State(cur_b)))
} else if self.nxt_a < self.max {
self.nxt_b = 0;
self.nxt_a += 1;
Some(StateTransition::new(State(cur_a), State(cur_b)))
} else {
None
}
}
}
pub trait Model<Observation> {
fn num_states(&self) -> usize;
fn states(&self) -> StateIter;
fn transitions(&self) -> StateTransitionIter;
fn transition_prob(&self, from: State, to: State) -> LogProb;
fn transition_prob_idx(&self, from: State, to: State, _to_idx: usize) -> LogProb {
self.transition_prob(from, to)
}
fn initial_prob(&self, state: State) -> LogProb;
fn observation_prob(&self, state: State, observation: &Observation) -> LogProb;
}
fn viterbi_matrices<O, M: Model<O>>(
hmm: &M,
observations: &[O],
) -> (Array2<LogProb>, Array2<usize>) {
let mut vals = Array2::<LogProb>::zeros((observations.len(), hmm.num_states()));
let mut from = Array2::<usize>::zeros((observations.len(), hmm.num_states()));
for (i, o) in observations.iter().enumerate() {
if i == 0 {
for s in hmm.states() {
vals[[0, *s]] = hmm.initial_prob(s) + hmm.observation_prob(s, o);
from[[0, *s]] = *s;
}
} else {
for j in hmm.states() {
let x = vals
.index_axis(Axis(0), i - 1)
.iter()
.enumerate()
.map(|(a, p)| (State(a), p))
.max_by(|(a, &x), (b, &y)| {
if x.is_zero() && y.is_zero() {
Ordering::Equal
} else if x.is_zero() {
Ordering::Less
} else if y.is_zero() {
Ordering::Greater
} else {
(x + hmm.transition_prob_idx(*a, j, i))
.partial_cmp(&(y + hmm.transition_prob_idx(*b, j, i)))
.unwrap()
}
})
.map(|(x, y)| (x, *y))
.unwrap();
vals[[i, *j]] =
x.1 + hmm.transition_prob_idx(x.0, j, i) + hmm.observation_prob(j, o);
from[[i, *j]] = *x.0;
}
}
}
(vals, from)
}
fn viterbi_traceback(vals: Array2<LogProb>, from: Array2<usize>) -> (Vec<State>, LogProb) {
let n = vals.len_of(Axis(0));
let mut result: Vec<State> = Vec::new();
let mut curr = 0;
let mut res_prob = LogProb::ln_zero();
for (i, col) in vals.axis_iter(Axis(0)).rev().enumerate() {
if i == 0 {
let tmp = col
.iter()
.enumerate()
.max_by_key(|&(_, item)| OrderedFloat(**item))
.unwrap();
curr = tmp.0;
res_prob = *tmp.1;
} else {
curr = from[[n - i, curr]];
}
result.push(State(curr));
}
result.reverse();
(result, res_prob)
}
pub fn viterbi<O, M: Model<O>>(hmm: &M, observations: &[O]) -> (Vec<State>, LogProb) {
let (vals, from) = viterbi_matrices(hmm, observations);
viterbi_traceback(vals, from)
}
pub fn forward<O, M: Model<O>>(hmm: &M, observations: &[O]) -> (Array2<LogProb>, LogProb) {
let mut vals = Array2::<LogProb>::zeros((observations.len(), hmm.num_states()));
for (i, o) in observations.iter().enumerate() {
if i == 0 {
for s in hmm.states() {
vals[[0, *s]] = hmm.initial_prob(s) + hmm.observation_prob(s, o);
}
} else {
for j in hmm.states() {
let xs = hmm
.states()
.map(|k| {
vals[[i - 1, *k]]
+ hmm.transition_prob_idx(k, j, i)
+ hmm.observation_prob(j, o)
})
.collect::<Vec<LogProb>>();
vals[[i, *j]] = LogProb::ln_sum_exp(&xs);
}
}
}
let prob = LogProb::ln_sum_exp(vals.row(observations.len() - 1).into_slice().unwrap());
(vals, prob)
}
pub fn backward<O, M: Model<O>>(hmm: &M, observations: &[O]) -> (Array2<LogProb>, LogProb) {
let mut vals = Array2::<LogProb>::zeros((observations.len(), hmm.num_states()));
let n = observations.len();
for (i, o) in observations.iter().rev().enumerate() {
if i == 0 {
for j in hmm.states() {
let maybe_initial = if i == observations.len() - 1 {
hmm.initial_prob(j)
} else {
LogProb::ln_one()
};
vals[[0, *j]] = LogProb::ln_one() + hmm.observation_prob(j, o) + maybe_initial;
}
} else {
for j in hmm.states() {
let maybe_initial = if i == observations.len() - 1 {
hmm.initial_prob(j)
} else {
LogProb::ln_one()
};
let xs = hmm
.states()
.map(|k| {
vals[[i - 1, *k]]
+ hmm.transition_prob_idx(j, k, n - i)
+ hmm.observation_prob(j, o)
+ maybe_initial
})
.collect::<Vec<LogProb>>();
vals[[i, *j]] = LogProb::ln_sum_exp(&xs);
}
}
}
let prob = LogProb::ln_sum_exp(vals.row(observations.len() - 1).into_slice().unwrap());
(vals, prob)
}
pub mod discrete_emission {
use super::super::{LogProb, Prob};
use super::*;
#[derive(Debug, PartialEq)]
pub struct Model {
transition: Array2<LogProb>,
observation: Array2<LogProb>,
initial: Array1<LogProb>,
}
impl Model {
pub fn new(
transition: Array2<LogProb>,
observation: Array2<LogProb>,
initial: Array1<LogProb>,
) -> Result<Self, HMMError> {
let (an0, an1) = transition.dim();
let (bn, bm) = observation.dim();
let pin = initial.dim();
if an0 != an1 || an0 != bn || an0 != pin {
Err(HMMError::InvalidDimension(an0, an1, bn, bm, pin))
} else {
Ok(Self {
transition,
observation,
initial,
})
}
}
pub fn with_prob(
transition: &Array2<Prob>,
observation: &Array2<Prob>,
initial: &Array1<Prob>,
) -> Result<Self, HMMError> {
Self::new(
transition.map(|x| LogProb::from(*x)),
observation.map(|x| LogProb::from(*x)),
initial.map(|x| LogProb::from(*x)),
)
}
pub fn with_float(
transition: &Array2<f64>,
observation: &Array2<f64>,
initial: &Array1<f64>,
) -> Result<Self, HMMError> {
Self::new(
transition.map(|x| LogProb::from(Prob(*x))),
observation.map(|x| LogProb::from(Prob(*x))),
initial.map(|x| LogProb::from(Prob(*x))),
)
}
}
impl super::Model<usize> for Model {
fn num_states(&self) -> usize {
self.transition.dim().0
}
fn states(&self) -> StateIter {
StateIter {
nxt: 0,
max: self.num_states(),
}
}
fn transitions(&self) -> StateTransitionIter {
StateTransitionIter {
nxt_a: 0,
nxt_b: 0,
max: self.num_states(),
}
}
fn transition_prob(&self, from: State, to: State) -> LogProb {
self.transition[[*from, *to]]
}
fn initial_prob(&self, state: State) -> LogProb {
self.initial[[*state]]
}
fn observation_prob(&self, state: State, observation: &usize) -> LogProb {
self.observation[[*state, *observation]]
}
}
}
pub mod univariate_continuous_emission {
use super::super::{LogProb, Prob};
use super::*;
use statrs;
pub struct Model<Dist: Continuous<f64, f64>> {
transition: Array2<LogProb>,
observation: Vec<Dist>,
initial: Array1<LogProb>,
}
impl<Dist: Continuous<f64, f64>> Model<Dist> {
pub fn new(
transition: Array2<LogProb>,
observation: Vec<Dist>,
initial: Array1<LogProb>,
) -> Result<Self, HMMError> {
let (an0, an1) = transition.dim();
let bn = observation.len();
let pin = initial.dim();
if an0 != an1 || an0 != bn || an0 != pin {
Err(HMMError::InvalidDimension(an0, an1, bn, bn, pin))
} else {
Ok(Self {
transition,
observation,
initial,
})
}
}
pub fn with_prob(
transition: &Array2<Prob>,
observation: Vec<Dist>,
initial: &Array1<Prob>,
) -> Result<Self, HMMError> {
Self::new(
transition.map(|x| LogProb::from(*x)),
observation,
initial.map(|x| LogProb::from(*x)),
)
}
pub fn with_float(
transition: &Array2<f64>,
observation: Vec<Dist>,
initial: &Array1<f64>,
) -> Result<Self, HMMError> {
Self::new(
transition.map(|x| LogProb::from(Prob(*x))),
observation,
initial.map(|x| LogProb::from(Prob(*x))),
)
}
}
impl<Dist: Continuous<f64, f64>> super::Model<f64> for Model<Dist> {
fn num_states(&self) -> usize {
self.transition.dim().0
}
fn states(&self) -> StateIter {
StateIter {
nxt: 0,
max: self.num_states(),
}
}
fn transitions(&self) -> StateTransitionIter {
StateTransitionIter {
nxt_a: 0,
nxt_b: 0,
max: self.num_states(),
}
}
fn transition_prob(&self, from: State, to: State) -> LogProb {
self.transition[[*from, *to]]
}
fn initial_prob(&self, state: State) -> LogProb {
self.initial[[*state]]
}
fn observation_prob(&self, state: State, observation: &f64) -> LogProb {
LogProb::from(Prob::from(self.observation[*state].pdf(*observation)))
}
}
pub type GaussianModel = Model<statrs::distribution::Normal>;
}
#[cfg(test)]
mod tests {
use super::super::Prob;
use ndarray::array;
use statrs::distribution::Normal;
use super::discrete_emission::Model as DiscreteEmissionHMM;
use super::univariate_continuous_emission::GaussianModel as GaussianHMM;
use super::*;
#[test]
fn test_discrete_viterbi_toy_example() {
let transition = array![[0.5, 0.5], [0.4, 0.6]];
let observation = array![[0.2, 0.3, 0.3, 0.2], [0.3, 0.2, 0.2, 0.3]];
let initial = array![0.5, 0.5];
let hmm = DiscreteEmissionHMM::with_float(&transition, &observation, &initial)
.expect("Dimensions should be consistent");
let (path, log_prob) = viterbi(&hmm, &vec![2, 2, 1, 0, 1, 3, 2, 0, 0]);
let prob = Prob::from(log_prob);
let expected = vec![0, 0, 0, 1, 1, 1, 1, 1, 1]
.iter()
.map(|i| State(*i))
.collect::<Vec<State>>();
assert_eq!(expected, path);
assert_relative_eq!(4.25e-8_f64, *prob, epsilon = 1e-9_f64);
}
#[test]
fn test_discrete_forward_toy_example() {
let transition = array![[0.5, 0.5], [0.4, 0.6]];
let observation = array![[0.2, 0.3, 0.3, 0.2], [0.3, 0.2, 0.2, 0.3]];
let initial = array![0.5, 0.5];
let hmm = DiscreteEmissionHMM::with_float(&transition, &observation, &initial)
.expect("Dimensions should be consistent");
let log_prob = forward(&hmm, &vec![2, 2, 1, 0]).1;
let prob = Prob::from(log_prob);
assert_relative_eq!(0.0038432_f64, *prob, epsilon = 0.0001);
}
#[test]
fn test_discrete_backward_toy_example() {
let transition = array![[0.5, 0.5], [0.4, 0.6]];
let observation = array![[0.2, 0.3, 0.3, 0.2], [0.3, 0.2, 0.2, 0.3]];
let initial = array![0.5, 0.5];
let hmm = DiscreteEmissionHMM::with_float(&transition, &observation, &initial)
.expect("Dimensions should be consistent");
let log_prob = backward(&hmm, &vec![2, 2, 1, 0]).1;
let prob = Prob::from(log_prob);
assert_relative_eq!(0.0038432_f64, *prob, epsilon = 0.0001);
}
#[test]
fn test_discrete_forward_equals_backward_toy_example() {
let transition = array![[0.5, 0.5], [0.4, 0.6]];
let observation = array![[0.2, 0.3, 0.3, 0.2], [0.3, 0.2, 0.2, 0.3]];
let initial = array![0.5, 0.5];
let hmm = DiscreteEmissionHMM::with_float(&transition, &observation, &initial)
.expect("Dimensions should be consistent");
for len in 1..10 {
let mut seq: Vec<usize> = vec![0; len];
while seq.iter().sum::<usize>() != len {
for i in 0..len {
if seq[i] == 0 {
seq[i] = 1;
break;
} else {
seq[i] = 0;
}
}
let prob_fwd = *Prob::from(forward(&hmm, &seq).1);
let prob_bck = *Prob::from(backward(&hmm, &seq).1);
assert_relative_eq!(prob_fwd, prob_bck, epsilon = 0.00001);
}
}
}
#[test]
fn test_gaussian_viterbi_simple_example() {
let transition = array![[0.5, 0.5], [0.4, 0.6]];
let observation = vec![
Normal::new(0.0, 1.0).unwrap(),
Normal::new(2.0, 1.0).unwrap(),
];
let initial = array![0.5, 0.5];
let hmm = GaussianHMM::with_float(&transition, observation, &initial)
.expect("Dimensions should be consistent");
let (path, log_prob) = viterbi(
&hmm,
&vec![-0.1, 0.1, -0.2, 0.5, 0.8, 1.1, 1.2, 1.5, 0.5, 0.2],
);
let prob = Prob::from(log_prob);
let expected = vec![0, 0, 0, 0, 0, 1, 1, 1, 0, 0]
.iter()
.map(|i| State(*i))
.collect::<Vec<State>>();
assert_eq!(expected, path);
assert_relative_eq!(2.64e-8_f64, *prob, epsilon = 1e-9_f64);
}
#[test]
fn test_gaussian_forward_simple_example() {
let transition = array![[0.5, 0.5], [0.4, 0.6]];
let observation = vec![
Normal::new(0.0, 1.0).unwrap(),
Normal::new(2.0, 1.0).unwrap(),
];
let initial = array![0.5, 0.5];
let hmm = GaussianHMM::with_float(&transition, observation, &initial)
.expect("Dimensions should be consistent");
let log_prob = forward(&hmm, &vec![0.1, 1.5, 1.8, 2.2, 0.5]).1;
let prob = Prob::from(log_prob);
assert_relative_eq!(7.820e-4_f64, *prob, epsilon = 1e-5_f64);
}
#[test]
fn test_gaussian_backward_simple_example() {
let transition = array![[0.5, 0.5], [0.4, 0.6]];
let observation = vec![
Normal::new(0.0, 1.0).unwrap(),
Normal::new(2.0, 1.0).unwrap(),
];
let initial = array![0.5, 0.5];
let hmm = GaussianHMM::with_float(&transition, observation, &initial)
.expect("Dimensions should be consistent");
let log_prob = backward(&hmm, &vec![0.1, 1.5, 1.8, 2.2, 0.5]).1;
let prob = Prob::from(log_prob);
assert_relative_eq!(7.820e-4_f64, *prob, epsilon = 1e-5_f64);
}
#[test]
fn test_gaussian_forward_equals_backward_simple_example() {
let transition = array![[0.5, 0.5], [0.4, 0.6]];
let observation = vec![
Normal::new(0.0, 1.0).unwrap(),
Normal::new(2.0, 1.0).unwrap(),
];
let initial = array![0.5, 0.5];
let hmm = GaussianHMM::with_float(&transition, observation, &initial)
.expect("Dimensions should be consistent");
let seqs = vec![vec![0.1, 0.5, 1.0, 1.5, 1.8, 2.1]];
for seq in &seqs {
let prob_fwd = *Prob::from(forward(&hmm, &seq).1);
let prob_bck = *Prob::from(backward(&hmm, &seq).1);
assert_relative_eq!(prob_fwd, prob_bck, epsilon = 0.00001);
}
}
}