#[derive(Debug)]
pub struct HiddenMarkovModel {
pub num_states: usize,
pub num_observations: usize,
pub initial_probabilities: Vec<f64>,
pub transition_probabilities: Vec<Vec<f64>>,
pub emission_probabilities: Vec<Vec<f64>>,
}
impl HiddenMarkovModel {
pub fn new(
num_states: usize,
num_observations: usize,
initial_probabilities: Vec<f64>,
transition_probabilities: Vec<Vec<f64>>,
emission_probabilities: Vec<Vec<f64>>,
) -> Self {
assert_eq!(initial_probabilities.len(), num_states);
assert_eq!(transition_probabilities.len(), num_states);
for row in &transition_probabilities {
assert_eq!(row.len(), num_states);
}
assert_eq!(emission_probabilities.len(), num_states);
for row in &emission_probabilities {
assert_eq!(row.len(), num_observations);
}
Self {
num_states,
num_observations,
initial_probabilities,
transition_probabilities,
emission_probabilities,
}
}
}
pub fn viterbi(hmm: &HiddenMarkovModel, observations: &[usize]) -> Vec<usize> {
if observations.is_empty() {
return Vec::new();
}
for &obs in observations {
assert!(
obs < hmm.num_observations,
"Observation {} is out of range (max = {})",
obs,
hmm.num_observations - 1
);
}
let t = observations.len();
let n = hmm.num_states;
let mut delta = vec![vec![f64::NEG_INFINITY; n]; t];
let mut psi = vec![vec![0_usize; n]; t];
for s in 0..n {
let init_log = hmm.initial_probabilities[s].ln();
let emission_log = hmm.emission_probabilities[s][observations[0]].ln();
delta[0][s] = init_log + emission_log;
psi[0][s] = 0; }
for time in 1..t {
let obs = observations[time];
for s in 0..n {
let emit_log = hmm.emission_probabilities[s][obs].ln();
let mut best_val = f64::NEG_INFINITY;
let mut best_prev = 0_usize;
for s_prev in 0..n {
let candidate =
delta[time - 1][s_prev] + hmm.transition_probabilities[s_prev][s].ln();
if candidate > best_val {
best_val = candidate;
best_prev = s_prev;
}
}
delta[time][s] = best_val + emit_log;
psi[time][s] = best_prev;
}
}
let mut best_final_score = f64::NEG_INFINITY;
let mut best_final_state = 0_usize;
for s in 0..n {
if delta[t - 1][s] > best_final_score {
best_final_score = delta[t - 1][s];
best_final_state = s;
}
}
let mut path = vec![0_usize; t];
path[t - 1] = best_final_state;
for time in (1..t).rev() {
path[time - 1] = psi[time][path[time]];
}
path
}
#[cfg(test)]
mod tests {
use super::*;
fn approx_eq(a: f64, b: f64, eps: f64) -> bool {
(a - b).abs() < eps
}
#[test]
fn test_viterbi_simple() {
let hmm = HiddenMarkovModel::new(
2,
3,
vec![0.6, 0.4],
vec![vec![0.7, 0.3], vec![0.4, 0.6]],
vec![vec![0.5, 0.4, 0.1], vec![0.1, 0.3, 0.6]],
);
let observations = vec![0_usize, 1, 2];
let path = viterbi(&hmm, &observations);
assert_eq!(path.len(), 3);
for &st in &path {
assert!(st < 2);
}
}
#[test]
fn test_viterbi_empty() {
let hmm = HiddenMarkovModel::new(
2,
3,
vec![1.0, 0.0],
vec![vec![1.0, 0.0], vec![0.0, 1.0]],
vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
);
let empty_obs: Vec<usize> = vec![];
let path = viterbi(&hmm, &empty_obs);
assert_eq!(path.len(), 0);
}
#[test]
fn test_probabilities_sum_one() {
let hmm = HiddenMarkovModel::new(
2,
3,
vec![0.6, 0.4],
vec![vec![0.7, 0.3], vec![0.4, 0.6]],
vec![vec![0.5, 0.4, 0.1], vec![0.1, 0.3, 0.6]],
);
for row in &hmm.transition_probabilities {
let sum: f64 = row.iter().sum();
assert!(approx_eq(sum, 1.0, 1e-9));
}
for row in &hmm.emission_probabilities {
let sum: f64 = row.iter().sum();
assert!(approx_eq(sum, 1.0, 1e-9));
}
let initial_sum: f64 = hmm.initial_probabilities.iter().sum();
assert!(approx_eq(initial_sum, 1.0, 1e-9));
}
}