#[cfg(feature = "alloc")]
extern crate alloc;
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::marker::PhantomData;
use nalgebra::{RealField, SMatrix, SVector};
use num_traits::Float;
use crate::models::{NonlinearObservationModel, NonlinearTransitionModel, TransitionModel};
use crate::types::spaces::{Measurement, StateCovariance, StateVector};
pub use super::kalman::KalmanState as UkfState;
/// Tuning parameters of the scaled unscented transform.
///
/// `alpha` controls the spread of the sigma points around the mean, `beta`
/// folds in prior knowledge about the state distribution (2 is the usual
/// choice for Gaussians), and `kappa` is a secondary scaling parameter
/// (commonly 0).
#[derive(Debug, Clone, Copy)]
pub struct UkfParams<T: RealField> {
    // Spread parameter; must be strictly positive (enforced by `new`).
    pub alpha: T,
    // Distribution-shape parameter; enters only the zeroth covariance weight.
    pub beta: T,
    // Secondary scaling parameter; enters lambda = alpha^2 (n + kappa) - n.
    pub kappa: T,
}
/// Conventional UKF defaults: a small spread (`alpha = 1e-3`), the
/// Gaussian-optimal `beta = 2`, and `kappa = 0`.
impl<T: RealField + Float> Default for UkfParams<T> {
    fn default() -> Self {
        let alpha = T::from_f64(1e-3).unwrap();
        let beta = T::from_f64(2.0).unwrap();
        Self {
            alpha,
            beta,
            kappa: T::zero(),
        }
    }
}
impl<T: RealField + Float + Copy> UkfParams<T> {
    /// Creates a parameter set from explicit values.
    ///
    /// # Panics
    /// Panics if `alpha` is not strictly positive.
    pub fn new(alpha: T, beta: T, kappa: T) -> Self {
        assert!(alpha > T::zero(), "Alpha must be positive");
        Self { alpha, beta, kappa }
    }
    /// Composite scaling term `lambda = alpha^2 (n + kappa) - n` for an
    /// `n`-dimensional state.
    #[inline]
    fn lambda(&self, n: usize) -> T {
        let dim = T::from_usize(n).unwrap();
        self.alpha * self.alpha * (dim + self.kappa) - dim
    }
    /// Sigma-point spread factor `sqrt(n + lambda)`.
    #[inline]
    fn gamma(&self, n: usize) -> T {
        Float::sqrt(T::from_usize(n).unwrap() + self.lambda(n))
    }
    /// Mean-recovery weight of the central sigma point:
    /// `lambda / (n + lambda)`.
    #[inline]
    fn weight_mean_0(&self, n: usize) -> T {
        let lambda = self.lambda(n);
        lambda / (T::from_usize(n).unwrap() + lambda)
    }
    /// Covariance-recovery weight of the central sigma point: the mean
    /// weight plus the distribution correction `1 - alpha^2 + beta`.
    #[inline]
    fn weight_cov_0(&self, n: usize) -> T {
        self.weight_mean_0(n) + (T::one() - self.alpha * self.alpha + self.beta)
    }
    /// Common weight of every non-central sigma point:
    /// `1 / (2 (n + lambda))`.
    #[inline]
    fn weight_i(&self, n: usize) -> T {
        let two = T::from_f64(2.0).unwrap();
        T::one() / (two * (T::from_usize(n).unwrap() + self.lambda(n)))
    }
}
/// The `2N + 1` sigma points of the scaled unscented transform, together
/// with the weights used to recover means and covariances from them.
#[cfg(feature = "alloc")]
#[derive(Debug, Clone)]
pub struct SigmaPoints<T: RealField, const N: usize> {
    // points[0] is the central point (the state mean); the remaining 2N
    // points come in +/- pairs along the scaled Cholesky columns.
    pub points: Vec<StateVector<T, N>>,
    // Weight of the central point when recovering a mean.
    pub weight_mean_0: T,
    // Weight of the central point when recovering a covariance.
    pub weight_cov_0: T,
    // Shared weight of every non-central point (both mean and covariance).
    pub weight_i: T,
}
#[cfg(feature = "alloc")]
impl<T: RealField + Float + Copy, const N: usize> SigmaPoints<T, N> {
    /// Generates the sigma-point set for `state` under `params`.
    ///
    /// Returns `None` when the state covariance has no Cholesky factor
    /// (i.e. is not positive definite).
    pub fn generate(state: &UkfState<T, N>, params: &UkfParams<T>) -> Option<Self> {
        let gamma = params.gamma(N);
        let scaled_sqrt = state.covariance.cholesky()?.scale(gamma);
        let mut points = Vec::with_capacity(2 * N + 1);
        points.push(state.mean);
        for i in 0..N {
            // One +/- pair per column of the scaled matrix square root.
            let offset = StateVector::from_svector(scaled_sqrt.column(i).into_owned());
            points.push(StateVector::from_svector(
                state.mean.as_svector() + offset.as_svector(),
            ));
            points.push(StateVector::from_svector(
                state.mean.as_svector() - offset.as_svector(),
            ));
        }
        Some(Self {
            points,
            weight_mean_0: params.weight_mean_0(N),
            weight_cov_0: params.weight_cov_0(N),
            weight_i: params.weight_i(N),
        })
    }
    /// Weighted mean of the sigma points pushed through `transform`.
    pub fn recover_mean<const D: usize, F>(&self, transform: F) -> SVector<T, D>
    where
        F: Fn(&StateVector<T, N>) -> SVector<T, D>,
    {
        let mut acc = transform(&self.points[0]).scale(self.weight_mean_0);
        for p in &self.points[1..] {
            acc += transform(p).scale(self.weight_i);
        }
        acc
    }
    /// Weighted mean and covariance of the sigma points pushed through
    /// `transform`, with `additive_noise` (if any) folded into the
    /// covariance.
    pub fn recover_mean_cov<const D: usize, F>(
        &self,
        transform: F,
        additive_noise: Option<&SMatrix<T, D, D>>,
    ) -> (SVector<T, D>, SMatrix<T, D, D>)
    where
        F: Fn(&StateVector<T, N>) -> SVector<T, D>,
    {
        // Evaluate the transform once per point; the images are needed for
        // both the mean and the covariance pass.
        let images: Vec<SVector<T, D>> = self.points.iter().map(transform).collect();
        let mut mean = images[0].scale(self.weight_mean_0);
        for y in &images[1..] {
            mean += y.scale(self.weight_i);
        }
        let d0 = images[0] - mean;
        let mut cov = (d0 * d0.transpose()).scale(self.weight_cov_0);
        for y in &images[1..] {
            let d = y - mean;
            cov += (d * d.transpose()).scale(self.weight_i);
        }
        if let Some(noise) = additive_noise {
            cov += noise;
        }
        (mean, cov)
    }
    /// Weighted cross-covariance between the state-space sigma points
    /// (around `state_mean`) and their images under `transform` (around
    /// `transformed_mean`).
    pub fn cross_covariance<const D: usize, F>(
        &self,
        state_mean: &SVector<T, N>,
        transform: F,
        transformed_mean: &SVector<T, D>,
    ) -> SMatrix<T, N, D>
    where
        F: Fn(&StateVector<T, N>) -> SVector<T, D>,
    {
        let dx0 = self.points[0].as_svector() - state_mean;
        let dz0 = transform(&self.points[0]) - transformed_mean;
        let mut acc = (dx0 * dz0.transpose()).scale(self.weight_cov_0);
        for p in &self.points[1..] {
            let dx = p.as_svector() - state_mean;
            let dz = transform(p) - transformed_mean;
            acc += (dx * dz.transpose()).scale(self.weight_i);
        }
        acc
    }
}
/// Unscented Kalman filter with nonlinear transition and observation models.
///
/// `N` is the state dimension and `M` the measurement dimension.
#[cfg(feature = "alloc")]
#[derive(Debug, Clone)]
pub struct UnscentedKalmanFilter<T, Trans, Obs, const N: usize, const M: usize>
where
    T: RealField,
    Trans: NonlinearTransitionModel<T, N>,
    Obs: NonlinearObservationModel<T, N, M>,
{
    // Nonlinear state-transition model (used in `predict`).
    pub transition: Trans,
    // Nonlinear observation model (used in `update` and gating helpers).
    pub observation: Obs,
    // Sigma-point tuning parameters shared by all operations.
    pub params: UkfParams<T>,
    // NOTE(review): `T` already appears in `params`, so this PhantomData
    // looks redundant — kept to preserve the struct definition.
    _marker: PhantomData<T>,
}
#[cfg(feature = "alloc")]
impl<T, Trans, Obs, const N: usize, const M: usize> UnscentedKalmanFilter<T, Trans, Obs, N, M>
where
    T: RealField + Float + Copy,
    Trans: NonlinearTransitionModel<T, N>,
    Obs: NonlinearObservationModel<T, N, M>,
{
    /// Creates a filter from a transition model, an observation model, and
    /// explicit sigma-point tuning parameters.
    #[inline]
    pub fn new(transition: Trans, observation: Obs, params: UkfParams<T>) -> Self {
        Self {
            transition,
            observation,
            params,
            _marker: PhantomData,
        }
    }
    /// Creates a filter with [`UkfParams::default`] tuning.
    #[inline]
    pub fn with_default_params(transition: Trans, observation: Obs) -> Self {
        Self::new(transition, observation, UkfParams::default())
    }
    /// Computes the predicted measurement distribution for `state`.
    ///
    /// Generates sigma points, pushes them through the observation model,
    /// and returns `(sigma_points, predicted_measurement_mean,
    /// innovation_covariance)`, with the measurement noise already folded
    /// into the covariance. Returns `None` when the state covariance has no
    /// Cholesky factor.
    ///
    /// Shared by `update`, `measurement_likelihood`, and
    /// `mahalanobis_distance_squared`, which previously triplicated this
    /// pipeline.
    fn predicted_measurement(
        &self,
        state: &UkfState<T, N>,
    ) -> Option<(SigmaPoints<T, N>, SVector<T, M>, SMatrix<T, M, M>)> {
        let sigma_points = SigmaPoints::generate(state, &self.params)?;
        let r = self.observation.measurement_noise();
        let (z_mean, z_cov) = sigma_points.recover_mean_cov(
            |x| self.observation.observe(x).into_svector(),
            Some(r.as_matrix()),
        );
        Some((sigma_points, z_mean, z_cov))
    }
    /// Time-update: propagates `state` through the nonlinear transition over
    /// `dt` and adds process noise.
    ///
    /// Returns `None` when sigma-point generation fails (state covariance
    /// not positive definite).
    pub fn predict(&self, state: &UkfState<T, N>, dt: T) -> Option<UkfState<T, N>> {
        let sigma_points = SigmaPoints::generate(state, &self.params)?;
        let q = self.transition.process_noise(dt);
        let (mean, cov) = sigma_points.recover_mean_cov(
            |x| self.transition.predict_nonlinear(x, dt).into_svector(),
            Some(q.as_matrix()),
        );
        Some(UkfState {
            mean: StateVector::from_svector(mean),
            covariance: StateCovariance::from_matrix(cov),
        })
    }
    /// Like [`Self::predict`], but panics instead of returning `None`.
    ///
    /// # Panics
    /// Panics when the state covariance is not positive definite.
    pub fn predict_unchecked(&self, state: &UkfState<T, N>, dt: T) -> UkfState<T, N> {
        self.predict(state, dt)
            .expect("UKF prediction failed: covariance not positive definite")
    }
    /// Measurement-update: corrects `state` with `measurement`.
    ///
    /// Returns `None` when sigma-point generation fails or the innovation
    /// covariance is singular.
    pub fn update(
        &self,
        state: &UkfState<T, N>,
        measurement: &Measurement<T, M>,
    ) -> Option<UkfState<T, N>> {
        let (sigma_points, z_mean, z_cov) = self.predicted_measurement(state)?;
        let cross_cov = sigma_points.cross_covariance(
            state.mean.as_svector(),
            |x| self.observation.observe(x).into_svector(),
            &z_mean,
        );
        let z_cov_inv = z_cov.try_inverse()?;
        let kalman_gain = cross_cov * z_cov_inv;
        let innovation = measurement.as_svector() - z_mean;
        let updated_mean = state.mean.as_svector() + kalman_gain * innovation;
        // Standard (non-Joseph) covariance update: P - K S K^T.
        let updated_cov =
            state.covariance.as_matrix() - kalman_gain * z_cov * kalman_gain.transpose();
        Some(UkfState {
            mean: StateVector::from_svector(updated_mean),
            covariance: StateCovariance::from_matrix(updated_cov),
        })
    }
    /// Convenience wrapper: `predict` over `dt`, then `update` with
    /// `measurement`.
    pub fn step(
        &self,
        state: &UkfState<T, N>,
        dt: T,
        measurement: &Measurement<T, M>,
    ) -> Option<UkfState<T, N>> {
        let predicted = self.predict(state, dt)?;
        self.update(&predicted, measurement)
    }
    /// Gaussian likelihood of `measurement` under the predicted measurement
    /// distribution of `state`.
    pub fn measurement_likelihood(
        &self,
        state: &UkfState<T, N>,
        measurement: &Measurement<T, M>,
    ) -> Option<T> {
        let (_, z_mean, z_cov) = self.predicted_measurement(state)?;
        let innovation = measurement.as_svector() - z_mean;
        let innovation_typed =
            crate::types::spaces::Innovation::from_svector(innovation.clone_owned());
        let cov_typed = crate::types::spaces::Covariance::from_matrix(z_cov);
        crate::types::gaussian::gaussian_likelihood(&innovation_typed, &cov_typed)
    }
    /// Squared Mahalanobis distance of `measurement` from the predicted
    /// measurement mean (useful for gating).
    ///
    /// Returns `None` when sigma-point generation fails or the innovation
    /// covariance is singular.
    pub fn mahalanobis_distance_squared(
        &self,
        state: &UkfState<T, N>,
        measurement: &Measurement<T, M>,
    ) -> Option<T> {
        let (_, z_mean, z_cov) = self.predicted_measurement(state)?;
        let z_cov_inv = z_cov.try_inverse()?;
        let innovation = measurement.as_svector() - z_mean;
        // 1x1 result of v^T S^-1 v, extracted as a scalar.
        let d_sq = (innovation.transpose() * z_cov_inv * innovation)[(0, 0)];
        Some(d_sq)
    }
}
/// Hybrid filter: linear Kalman time-update with an unscented
/// measurement-update for a nonlinear observation model.
///
/// `N` is the state dimension and `M` the measurement dimension.
#[cfg(feature = "alloc")]
#[derive(Debug, Clone)]
pub struct UkfLinearTransition<T, Trans, Obs, const N: usize, const M: usize>
where
    T: RealField,
    Trans: TransitionModel<T, N>,
    Obs: NonlinearObservationModel<T, N, M>,
{
    // Linear state-transition model (matrix-based predict).
    pub transition: Trans,
    // Nonlinear observation model (sigma-point update).
    pub observation: Obs,
    // Sigma-point tuning parameters used by the measurement-update.
    pub params: UkfParams<T>,
    // NOTE(review): `T` already appears in `params`, so this PhantomData
    // looks redundant — kept to preserve the struct definition.
    _marker: PhantomData<T>,
}
#[cfg(feature = "alloc")]
impl<T, Trans, Obs, const N: usize, const M: usize> UkfLinearTransition<T, Trans, Obs, N, M>
where
    T: RealField + Float + Copy,
    Trans: TransitionModel<T, N>,
    Obs: NonlinearObservationModel<T, N, M>,
{
    /// Creates a hybrid filter from a linear transition model, a nonlinear
    /// observation model, and sigma-point parameters.
    #[inline]
    pub fn new(transition: Trans, observation: Obs, params: UkfParams<T>) -> Self {
        Self {
            transition,
            observation,
            params,
            _marker: PhantomData,
        }
    }
    /// Linear Kalman time-update: `x' = F x`, `P' = F P F^T + Q`.
    pub fn predict(&self, state: &UkfState<T, N>, dt: T) -> UkfState<T, N> {
        let f = self.transition.transition_matrix(dt);
        let q = self.transition.process_noise(dt);
        UkfState {
            mean: f.apply_state(&state.mean),
            covariance: f.propagate_covariance(&state.covariance).add(&q),
        }
    }
    /// Unscented measurement-update: corrects `state` with `measurement`.
    ///
    /// Returns `None` when sigma-point generation fails or the innovation
    /// covariance is singular.
    pub fn update(
        &self,
        state: &UkfState<T, N>,
        measurement: &Measurement<T, M>,
    ) -> Option<UkfState<T, N>> {
        let sigmas = SigmaPoints::generate(state, &self.params)?;
        let noise = self.observation.measurement_noise();
        // Predicted measurement mean and innovation covariance (R included).
        let (z_hat, s) = sigmas.recover_mean_cov(
            |x| self.observation.observe(x).into_svector(),
            Some(noise.as_matrix()),
        );
        // State/measurement cross-covariance for the gain computation.
        let p_xz = sigmas.cross_covariance(
            state.mean.as_svector(),
            |x| self.observation.observe(x).into_svector(),
            &z_hat,
        );
        let s_inv = s.try_inverse()?;
        let gain = p_xz * s_inv;
        let residual = measurement.as_svector() - z_hat;
        let corrected_mean = state.mean.as_svector() + gain * residual;
        // Standard covariance update: P - K S K^T.
        let corrected_cov = state.covariance.as_matrix() - gain * s * gain.transpose();
        Some(UkfState {
            mean: StateVector::from_svector(corrected_mean),
            covariance: StateCovariance::from_matrix(corrected_cov),
        })
    }
    /// Convenience wrapper: `predict` over `dt`, then `update` with
    /// `measurement`.
    pub fn step(
        &self,
        state: &UkfState<T, N>,
        dt: T,
        measurement: &Measurement<T, M>,
    ) -> Option<UkfState<T, N>> {
        self.update(&self.predict(state, dt), measurement)
    }
}
/// Applies the scaled unscented transform to the Gaussian `(mean, cov)`
/// under the nonlinear map `transform`, optionally folding `additive_noise`
/// into the resulting covariance.
///
/// Returns `None` when `cov` has no Cholesky factor (not positive definite).
#[cfg(feature = "alloc")]
pub fn unscented_transform<T, const N: usize, const D: usize, F>(
    mean: &StateVector<T, N>,
    cov: &StateCovariance<T, N>,
    params: &UkfParams<T>,
    transform: F,
    additive_noise: Option<&SMatrix<T, D, D>>,
) -> Option<(SVector<T, D>, SMatrix<T, D, D>)>
where
    T: RealField + Float + Copy,
    F: Fn(&StateVector<T, N>) -> SVector<T, D>,
{
    let sigmas = SigmaPoints::generate(&UkfState::new(*mean, *cov), params)?;
    Some(sigmas.recover_mean_cov(transform, additive_noise))
}
#[cfg(all(test, feature = "alloc"))]
mod tests {
    use super::*;
    use crate::models::{ConstantVelocity2D, CoordinatedTurn2D, RangeBearingSensor};
    // Fix: five call sites below previously read `¶ms` — a mis-encoded
    // `&params` (the `&para` prefix was collapsed into the `¶` glyph) —
    // which does not compile. Restored to `&params`.

    /// Defaults match the conventional UKF values (1e-3, 2, 0).
    #[test]
    fn test_ukf_params_default() {
        let params: UkfParams<f64> = UkfParams::default();
        assert!((params.alpha - 1e-3).abs() < 1e-10);
        assert!((params.beta - 2.0).abs() < 1e-10);
        assert!(params.kappa.abs() < 1e-10);
    }
    /// Mean weights must sum to one: w0 + 2n * wi == 1.
    #[test]
    fn test_ukf_params_weights() {
        let params: UkfParams<f64> = UkfParams::default();
        let n = 5;
        let w0_mean = params.weight_mean_0(n);
        let wi = params.weight_i(n);
        let sum_mean = w0_mean + 2.0 * n as f64 * wi;
        assert!(
            (sum_mean - 1.0).abs() < 1e-6,
            "Mean weights sum: {}",
            sum_mean
        );
    }
    /// The central sigma point equals the state mean and 2N+1 points exist.
    #[test]
    fn test_sigma_point_generation() {
        let mean: StateVector<f64, 4> = StateVector::from_array([1.0, 2.0, 3.0, 4.0]);
        let cov: StateCovariance<f64, 4> = StateCovariance::identity();
        let state = UkfState::new(mean, cov);
        let params = UkfParams::default();
        let sigma = SigmaPoints::generate(&state, &params).unwrap();
        for i in 0..4 {
            assert!(
                (sigma.points[0].index(i) - state.mean.index(i)).abs() < 1e-10,
                "Central point mismatch at {}",
                i
            );
        }
        assert_eq!(sigma.points.len(), 9);
    }
    /// Identity transform recovers the original mean and covariance.
    #[test]
    fn test_sigma_points_recover_identity() {
        let mean: StateVector<f64, 4> = StateVector::from_array([1.0, 2.0, 3.0, 4.0]);
        let cov: StateCovariance<f64, 4> = StateCovariance::identity();
        let state = UkfState::new(mean, cov);
        let params = UkfParams::default();
        let sigma = SigmaPoints::generate(&state, &params).unwrap();
        let (recovered_mean, recovered_cov) =
            sigma.recover_mean_cov(|x| x.as_svector().clone_owned(), None);
        for i in 0..4 {
            assert!(
                (recovered_mean[i] - *state.mean.index(i)).abs() < 1e-6,
                "Mean mismatch at {}: {} vs {}",
                i,
                recovered_mean[i],
                state.mean.index(i)
            );
        }
        for i in 0..4 {
            for j in 0..4 {
                let expected = if i == j { 1.0 } else { 0.0 };
                assert!(
                    (recovered_cov[(i, j)] - expected).abs() < 1e-4,
                    "Cov mismatch at ({}, {}): {} vs {}",
                    i,
                    j,
                    recovered_cov[(i, j)],
                    expected
                );
            }
        }
    }
    /// Linear predict moves a constant-velocity state by v * dt.
    #[test]
    fn test_ukf_linear_transition_predict() {
        let transition = ConstantVelocity2D::new(0.1_f64, 0.99);
        let sensor = RangeBearingSensor::new(10.0, 0.01, 0.95);
        let filter = UkfLinearTransition::new(transition, sensor, UkfParams::default());
        let state = UkfState::new(
            StateVector::from_array([100.0, 0.0, 10.0, 0.0]),
            StateCovariance::identity(),
        );
        let predicted = filter.predict(&state, 1.0);
        assert!(
            (predicted.mean.index(0) - 110.0).abs() < 1e-6,
            "x: {}",
            predicted.mean.index(0)
        );
        assert!(
            (predicted.mean.index(1) - 0.0).abs() < 1e-6,
            "y: {}",
            predicted.mean.index(1)
        );
    }
    /// Update with a measurement at the predicted position keeps the
    /// estimate near it.
    #[test]
    fn test_ukf_linear_transition_update() {
        let transition = ConstantVelocity2D::new(0.1_f64, 0.99);
        let sensor = RangeBearingSensor::new(10.0, 0.01, 0.95);
        let filter = UkfLinearTransition::new(transition, sensor, UkfParams::default());
        let state = UkfState::new(
            StateVector::from_array([100.0, 0.0, 10.0, 0.0]),
            StateCovariance::identity(),
        );
        let predicted = filter.predict(&state, 1.0);
        assert!((predicted.mean.index(0) - 110.0).abs() < 1e-6);
        let measurement = Measurement::from_array([110.0, 0.0]);
        let updated = filter.update(&predicted, &measurement).unwrap();
        assert!((updated.mean.index(0) - 110.0).abs() < 5.0);
    }
    /// step() == predict() followed by update().
    #[test]
    fn test_ukf_step_linear_transition() {
        let transition = ConstantVelocity2D::new(0.1_f64, 0.99);
        let sensor = RangeBearingSensor::new(10.0, 0.01, 0.95);
        let filter = UkfLinearTransition::new(transition, sensor, UkfParams::default());
        let state = UkfState::new(
            StateVector::from_array([100.0, 0.0, 10.0, 0.0]),
            StateCovariance::identity(),
        );
        let measurement = Measurement::from_array([110.0, 0.0]);
        let updated = filter.step(&state, 1.0, &measurement).unwrap();
        assert!((updated.mean.index(0) - 110.0).abs() < 5.0);
    }
    /// The free-function transform with an identity map is a no-op on the
    /// mean.
    #[test]
    fn test_unscented_transform() {
        let mean: StateVector<f64, 4> = StateVector::from_array([1.0, 2.0, 3.0, 4.0]);
        let cov: StateCovariance<f64, 4> = StateCovariance::identity();
        let params = UkfParams::default();
        let (transformed_mean, _) =
            unscented_transform(&mean, &cov, &params, |x| x.as_svector().clone_owned(), None)
                .unwrap();
        for i in 0..4 {
            assert!(
                (transformed_mean[i] - *mean.index(i)).abs() < 1e-6,
                "Mean mismatch at {}: {} vs {}",
                i,
                transformed_mean[i],
                mean.index(i)
            );
        }
    }
    /// Zero turn rate: coordinated-turn prediction reduces to straight-line
    /// motion.
    #[test]
    fn test_coordinated_turn_prediction_via_unscented_transform() {
        let transition = CoordinatedTurn2D::new(0.1_f64, 0.01, 0.99);
        let mean: StateVector<f64, 5> = StateVector::from_array([100.0, 0.0, 10.0, 0.0, 0.0]);
        let cov: StateCovariance<f64, 5> = StateCovariance::identity();
        let params = UkfParams::default();
        let dt = 1.0;
        let q = transition.process_noise(dt);
        let (pred_mean, _pred_cov) = unscented_transform(
            &mean,
            &cov,
            &params,
            |x| transition.predict_nonlinear(x, dt).into_svector(),
            Some(q.as_matrix()),
        )
        .unwrap();
        assert!(
            (pred_mean[0] - 110.0).abs() < 2.0,
            "x: {} vs 110.0",
            pred_mean[0]
        );
        assert!(
            (pred_mean[1] - 0.0).abs() < 2.0,
            "y: {} vs 0.0",
            pred_mean[1]
        );
    }
    /// Quarter-turn at rate pi/2: endpoint lands near (r, r) for turn
    /// radius r = v / omega.
    #[test]
    fn test_coordinated_turn_with_turn() {
        use std::f64::consts::FRAC_PI_2;
        let transition = CoordinatedTurn2D::new(0.1_f64, 0.01, 0.99);
        let mean: StateVector<f64, 5> = StateVector::from_array([0.0, 0.0, 10.0, 0.0, FRAC_PI_2]);
        let cov: StateCovariance<f64, 5> = StateCovariance::identity();
        let params = UkfParams::default();
        let dt = 1.0;
        let q = transition.process_noise(dt);
        let (pred_mean, _) = unscented_transform(
            &mean,
            &cov,
            &params,
            |x| transition.predict_nonlinear(x, dt).into_svector(),
            Some(q.as_matrix()),
        )
        .unwrap();
        let r = 10.0 / FRAC_PI_2;
        assert!(
            (pred_mean[0] - r).abs() < 1.5,
            "x: {} vs {}",
            pred_mean[0],
            r
        );
        assert!(
            (pred_mean[1] - r).abs() < 1.5,
            "y: {} vs {}",
            pred_mean[1],
            r
        );
    }
}