extern crate alloc;
use alloc::vec::Vec;
use crate::matrix::vector::Vector;
use crate::traits::FloatScalar;
use crate::Matrix;
use super::{apply_var_floor, cholesky_with_jitter, EstimateError};
pub struct SrUkf<T: FloatScalar, const N: usize, const M: usize> {
pub x: Vector<T, N>,
pub s: Matrix<T, N, N>,
alpha: T,
beta: T,
kappa: T,
min_variance: T,
gamma: T,
}
impl<T: FloatScalar, const N: usize, const M: usize> SrUkf<T, N, M> {
pub fn new(x0: Vector<T, N>, s0: Matrix<T, N, N>) -> Self {
Self::with_params(x0, s0, T::one(), T::from(2.0).unwrap(), T::zero())
}
pub fn from_covariance(x0: Vector<T, N>, p0: Matrix<T, N, N>) -> Result<Self, EstimateError> {
let chol = cholesky_with_jitter(&p0)?;
Ok(Self::new(x0, chol.l_full()))
}
pub fn from_covariance_with_params(
x0: Vector<T, N>,
p0: Matrix<T, N, N>,
alpha: T,
beta: T,
kappa: T,
) -> Result<Self, EstimateError> {
let chol = cholesky_with_jitter(&p0)?;
Ok(Self::with_params(x0, chol.l_full(), alpha, beta, kappa))
}
pub fn with_params(x0: Vector<T, N>, s0: Matrix<T, N, N>, alpha: T, beta: T, kappa: T) -> Self {
Self {
x: x0,
s: s0,
alpha,
beta,
kappa,
min_variance: T::zero(),
gamma: T::one(),
}
}
pub fn with_min_variance(mut self, min_variance: T) -> Self {
self.min_variance = min_variance;
self
}
pub fn with_fading_memory(mut self, gamma: T) -> Self {
self.gamma = gamma;
self
}
#[inline]
pub fn state(&self) -> &Vector<T, N> {
&self.x
}
#[inline]
pub fn cholesky_factor(&self) -> &Matrix<T, N, N> {
&self.s
}
pub fn covariance(&self) -> Matrix<T, N, N> {
self.s * self.s.transpose()
}
fn weights(&self) -> (T, T, T) {
let n = T::from(N).unwrap();
let lambda = self.alpha * self.alpha * (n + self.kappa) - n;
let denom = n + lambda;
let wm_0 = lambda / denom;
let wc_0 = wm_0 + (T::one() - self.alpha * self.alpha + self.beta);
let w_i = T::one() / (T::from(2.0).unwrap() * denom);
(wm_0, wc_0, w_i)
}
fn scaled_cholesky(&self) -> Matrix<T, N, N> {
let n = T::from(N).unwrap();
let lambda = self.alpha * self.alpha * (n + self.kappa) - n;
self.s * (n + lambda).sqrt()
}
fn sigma_points(&self, scaled_s: &Matrix<T, N, N>) -> Vec<Vector<T, N>> {
let mut sigmas: Vec<Vector<T, N>> = Vec::with_capacity(2 * N + 1);
sigmas.push(self.x);
for i in 0..N {
let mut col = Vector::<T, N>::zeros();
for r in 0..N {
col[r] = scaled_s[(r, i)];
}
sigmas.push(self.x + col);
sigmas.push(self.x - col);
}
sigmas
}
pub fn predict(
&mut self,
f: impl Fn(&Vector<T, N>) -> Vector<T, N>,
q: Option<&Matrix<T, N, N>>,
) -> Result<(), EstimateError> {
let (wm_0, wc_0, w_i) = self.weights();
let scaled_s = self.scaled_cholesky();
let mut sigmas = self.sigma_points(&scaled_s);
for s in sigmas.iter_mut() {
*s = f(s);
}
let mut x_mean = Vector::<T, N>::zeros();
for r in 0..N {
x_mean[r] = wm_0 * sigmas[0][r];
}
for i in 0..N {
for r in 0..N {
x_mean[r] = x_mean[r] + w_i * (sigmas[2 * i + 1][r] + sigmas[2 * i + 2][r]);
}
}
let mut p_new = Matrix::<T, N, N>::zeros();
for i in 0..N {
let dp = sigmas[2 * i + 1] - x_mean;
let dm = sigmas[2 * i + 2] - x_mean;
for r in 0..N {
for c in 0..N {
p_new[(r, c)] = p_new[(r, c)] + w_i * (dp[r] * dp[c] + dm[r] * dm[c]);
}
}
}
let d0 = sigmas[0] - x_mean;
for r in 0..N {
for c in 0..N {
p_new[(r, c)] = p_new[(r, c)] + wc_0 * d0[r] * d0[c];
}
}
p_new *= self.gamma;
if let Some(q) = q {
p_new += *q;
}
let half = T::from(0.5).unwrap();
p_new = (p_new + p_new.transpose()) * half;
apply_var_floor(&mut p_new, self.min_variance);
let chol = cholesky_with_jitter(&p_new)?;
self.x = x_mean;
self.s = chol.l_full();
Ok(())
}
pub fn update(
&mut self,
z: &Vector<T, M>,
h: impl Fn(&Vector<T, N>) -> Vector<T, M>,
r: &Matrix<T, M, M>,
) -> Result<T, EstimateError> {
let (s_mat, s_inv, pxz, innovation, nis) = self.measurement_transform(z, &h, r)?;
self.apply_update(&s_mat, &s_inv, &pxz, &innovation)?;
Ok(nis)
}
#[allow(clippy::type_complexity)]
fn measurement_transform(
&self,
z: &Vector<T, M>,
h: &impl Fn(&Vector<T, N>) -> Vector<T, M>,
r: &Matrix<T, M, M>,
) -> Result<
(
Matrix<T, M, M>,
Matrix<T, M, M>,
Matrix<T, N, M>,
Vector<T, M>,
T,
),
EstimateError,
> {
let (wm_0, wc_0, w_i) = self.weights();
let scaled_s = self.scaled_cholesky();
let sigmas_x = self.sigma_points(&scaled_s);
let mut sigmas_z: Vec<Vector<T, M>> = Vec::with_capacity(2 * N + 1);
for sx in &sigmas_x {
sigmas_z.push(h(sx));
}
let mut z_mean = Vector::<T, M>::zeros();
for r in 0..M {
z_mean[r] = wm_0 * sigmas_z[0][r];
}
for i in 0..N {
for r in 0..M {
z_mean[r] = z_mean[r] + w_i * (sigmas_z[2 * i + 1][r] + sigmas_z[2 * i + 2][r]);
}
}
let mut pzz = Matrix::<T, M, M>::zeros();
let dz0 = sigmas_z[0] - z_mean;
for ri in 0..M {
for ci in 0..M {
pzz[(ri, ci)] = pzz[(ri, ci)] + wc_0 * dz0[ri] * dz0[ci];
}
}
for i in 0..N {
let dzp = sigmas_z[2 * i + 1] - z_mean;
let dzm = sigmas_z[2 * i + 2] - z_mean;
for ri in 0..M {
for ci in 0..M {
pzz[(ri, ci)] = pzz[(ri, ci)] + w_i * (dzp[ri] * dzp[ci] + dzm[ri] * dzm[ci]);
}
}
}
let s_mat = pzz + *r;
let mut pxz = Matrix::<T, N, M>::zeros();
let dx0 = sigmas_x[0] - self.x;
for ri in 0..N {
for ci in 0..M {
pxz[(ri, ci)] = pxz[(ri, ci)] + wc_0 * dx0[ri] * dz0[ci];
}
}
for i in 0..N {
let dxp = sigmas_x[2 * i + 1] - self.x;
let dxm = sigmas_x[2 * i + 2] - self.x;
let dzp = sigmas_z[2 * i + 1] - z_mean;
let dzm = sigmas_z[2 * i + 2] - z_mean;
for ri in 0..N {
for ci in 0..M {
pxz[(ri, ci)] = pxz[(ri, ci)] + w_i * (dxp[ri] * dzp[ci] + dxm[ri] * dzm[ci]);
}
}
}
let s_inv = cholesky_with_jitter(&s_mat)
.map_err(|_| EstimateError::SingularInnovation)?
.inverse();
let innovation = *z - z_mean;
let nis = (innovation.transpose() * s_inv * innovation)[(0, 0)];
Ok((s_mat, s_inv, pxz, innovation, nis))
}
fn apply_update(
&mut self,
s_mat: &Matrix<T, M, M>,
s_inv: &Matrix<T, M, M>,
pxz: &Matrix<T, N, M>,
innovation: &Vector<T, M>,
) -> Result<(), EstimateError> {
let k = *pxz * *s_inv;
self.x += k * *innovation;
let mut p_new = self.covariance() - k * *s_mat * k.transpose();
let half = T::from(0.5).unwrap();
p_new = (p_new + p_new.transpose()) * half;
apply_var_floor(&mut p_new, self.min_variance);
let chol = cholesky_with_jitter(&p_new)?;
self.s = chol.l_full();
Ok(())
}
pub fn update_gated(
&mut self,
z: &Vector<T, M>,
h: impl Fn(&Vector<T, N>) -> Vector<T, M>,
r: &Matrix<T, M, M>,
gate: T,
) -> Result<Option<T>, EstimateError> {
let (s_mat, s_inv, pxz, innovation, nis) = self.measurement_transform(z, &h, r)?;
if nis > gate {
return Ok(None);
}
self.apply_update(&s_mat, &s_inv, &pxz, &innovation)?;
Ok(Some(nis))
}
}