use core::marker::PhantomData;
use nalgebra::RealField;
use num_traits::Float;
use crate::models::{NonlinearObservationModel, NonlinearTransitionModel, TransitionModel};
use crate::types::spaces::{
ComputeInnovation, Measurement, MeasurementCovariance, StateCovariance, StateVector,
};
use crate::types::transforms::{compute_innovation_covariance, compute_kalman_gain, joseph_update};
pub use super::kalman::KalmanState as EkfState;
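/// Extended Kalman filter over a nonlinear transition model and a nonlinear
/// observation model.
///
/// Prediction propagates the mean through the nonlinear transition and the
/// covariance through the transition Jacobian; the update linearizes the
/// observation at the predicted mean and applies a Joseph-form covariance
/// update.
///
/// A minimal usage sketch mirroring the tests in this module (marked
/// `ignore` only because the crate-level import paths are not spelled out
/// here):
///
/// ```ignore
/// let transition = CoordinatedTurn2D::new(0.1_f64, 0.01, 0.99);
/// let sensor = RangeBearingSensor5D::new(10.0, 0.01, 0.95);
/// let filter = ExtendedKalmanFilter::new(transition, sensor);
/// let state = EkfState::new(
///     StateVector::from_array([100.0, 0.0, 10.0, 0.0, 0.0]),
///     StateCovariance::identity(),
/// );
/// // Predict one second ahead, then fuse a range/bearing measurement.
/// let measurement = Measurement::from_array([110.0, 0.0]);
/// let updated = filter.step(&state, 1.0, &measurement);
/// ```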
#[derive(Debug, Clone)]
pub struct ExtendedKalmanFilter<T, Trans, Obs, const N: usize, const M: usize>
where
T: RealField,
Trans: NonlinearTransitionModel<T, N>,
Obs: NonlinearObservationModel<T, N, M>,
{
pub transition: Trans,
pub observation: Obs,
_marker: PhantomData<T>,
}
impl<T, Trans, Obs, const N: usize, const M: usize> ExtendedKalmanFilter<T, Trans, Obs, N, M>
where
T: RealField + Float + Copy,
Trans: NonlinearTransitionModel<T, N>,
Obs: NonlinearObservationModel<T, N, M>,
{
#[inline]
pub fn new(transition: Trans, observation: Obs) -> Self {
Self {
transition,
observation,
_marker: PhantomData,
}
}
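    /// Propagates the state estimate forward by `dt`.
    ///
    /// The mean is passed through the nonlinear transition; the covariance
    /// is propagated with the transition Jacobian `F` evaluated at the
    /// current mean, `P' = F P Fᵀ + Q`.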
pub fn predict(&self, state: &EkfState<T, N>, dt: T) -> EkfState<T, N> {
let predicted_mean = self.transition.predict_nonlinear(&state.mean, dt);
let f = self.transition.jacobian_at(&state.mean, dt);
let q = self.transition.process_noise(dt);
let predicted_cov = f.propagate_covariance(&state.covariance).add(&q);
EkfState {
mean: predicted_mean,
covariance: predicted_cov,
}
}
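    /// Fuses a measurement into the state estimate.
    ///
    /// The observation model is linearized at the current mean: the
    /// innovation is `y = z - h(x)`, its covariance `S = H P Hᵀ + R`, and
    /// the gain `K = P Hᵀ S⁻¹`. The covariance is updated in Joseph form,
    /// which preserves symmetry and positive semi-definiteness.
    ///
    /// Returns `None` if the observation Jacobian is undefined at the mean
    /// or the gain computation fails (typically a singular `S`).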
pub fn update(
&self,
state: &EkfState<T, N>,
measurement: &Measurement<T, M>,
) -> Option<EkfState<T, N>> {
let h = self.observation.jacobian_at(&state.mean)?;
let predicted_meas = self.observation.observe(&state.mean);
let r = self.observation.measurement_noise();
let innovation = measurement.innovation(predicted_meas);
let innovation_cov = compute_innovation_covariance(&state.covariance, &h, &r);
let kalman_gain = compute_kalman_gain(&state.covariance, &h, &innovation_cov)?;
let correction = kalman_gain.correct(&innovation);
let updated_mean =
StateVector::from_svector(state.mean.as_svector() + correction.as_svector());
let updated_cov = joseph_update(&state.covariance, &kalman_gain, &h, &r);
Some(EkfState {
mean: updated_mean,
covariance: updated_cov,
})
}
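    /// Convenience wrapper: predict forward by `dt`, then update with
    /// `measurement`.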
pub fn step(
&self,
state: &EkfState<T, N>,
dt: T,
measurement: &Measurement<T, M>,
) -> Option<EkfState<T, N>> {
let predicted = self.predict(state, dt);
self.update(&predicted, measurement)
}
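    /// Gaussian likelihood of `measurement` under the innovation
    /// distribution `N(0, S)` with `S = H P Hᵀ + R`, linearized at the
    /// current mean. Returns `None` if the Jacobian is undefined or the
    /// density cannot be evaluated.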
pub fn measurement_likelihood(
&self,
state: &EkfState<T, N>,
measurement: &Measurement<T, M>,
) -> Option<T> {
let h = self.observation.jacobian_at(&state.mean)?;
let r = self.observation.measurement_noise();
let predicted_meas = self.observation.observe(&state.mean);
let innovation = measurement.innovation(predicted_meas);
let innovation_cov = compute_innovation_covariance(&state.covariance, &h, &r);
        // `gaussian_likelihood` expects a plain `Covariance`, so re-wrap the
        // innovation covariance before evaluating the density.
        let innovation_cov_typed =
            crate::types::spaces::Covariance::from_matrix(*innovation_cov.as_matrix());
        crate::types::gaussian::gaussian_likelihood(&innovation, &innovation_cov_typed)
}
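    /// Squared Mahalanobis distance `d² = yᵀ S⁻¹ y` between the measurement
    /// and its prediction, commonly used for gating. Returns `None` if the
    /// Jacobian is undefined or `S` is singular.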
pub fn mahalanobis_distance_squared(
&self,
state: &EkfState<T, N>,
measurement: &Measurement<T, M>,
) -> Option<T> {
let h = self.observation.jacobian_at(&state.mean)?;
let r = self.observation.measurement_noise();
let predicted_meas = self.observation.observe(&state.mean);
let innovation = measurement.innovation(predicted_meas);
let innovation_cov = compute_innovation_covariance(&state.covariance, &h, &r);
        // d² = yᵀ S⁻¹ y; `try_inverse` returns `None` when S is singular.
        let s_inv = innovation_cov.as_matrix().try_inverse()?;
        let y = innovation.as_svector();
        let d_sq = (y.transpose() * s_inv * y)[(0, 0)];
        Some(d_sq)
}
}
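/// Extended Kalman filter variant pairing a *linear* transition model with a
/// nonlinear observation model: prediction is an ordinary linear Kalman
/// predict, while the update linearizes the observation exactly as
/// [`ExtendedKalmanFilter`] does.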
#[derive(Debug, Clone)]
pub struct EkfLinearTransition<T, Trans, Obs, const N: usize, const M: usize>
where
T: RealField,
Trans: TransitionModel<T, N>,
Obs: NonlinearObservationModel<T, N, M>,
{
pub transition: Trans,
pub observation: Obs,
_marker: PhantomData<T>,
}
impl<T, Trans, Obs, const N: usize, const M: usize> EkfLinearTransition<T, Trans, Obs, N, M>
where
T: RealField + Float + Copy,
Trans: TransitionModel<T, N>,
Obs: NonlinearObservationModel<T, N, M>,
{
#[inline]
pub fn new(transition: Trans, observation: Obs) -> Self {
Self {
transition,
observation,
_marker: PhantomData,
}
}
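    /// Linear Kalman prediction: `x' = F x`, `P' = F P Fᵀ + Q`.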
pub fn predict(&self, state: &EkfState<T, N>, dt: T) -> EkfState<T, N> {
let f = self.transition.transition_matrix(dt);
let q = self.transition.process_noise(dt);
let predicted_mean = f.apply_state(&state.mean);
let predicted_cov = f.propagate_covariance(&state.covariance).add(&q);
EkfState {
mean: predicted_mean,
covariance: predicted_cov,
}
}
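    /// Measurement update with a linearized observation model; identical to
    /// [`ExtendedKalmanFilter::update`].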
pub fn update(
&self,
state: &EkfState<T, N>,
measurement: &Measurement<T, M>,
) -> Option<EkfState<T, N>> {
let h = self.observation.jacobian_at(&state.mean)?;
let predicted_meas = self.observation.observe(&state.mean);
let r = self.observation.measurement_noise();
let innovation = measurement.innovation(predicted_meas);
let innovation_cov = compute_innovation_covariance(&state.covariance, &h, &r);
let kalman_gain = compute_kalman_gain(&state.covariance, &h, &innovation_cov)?;
let correction = kalman_gain.correct(&innovation);
let updated_mean =
StateVector::from_svector(state.mean.as_svector() + correction.as_svector());
let updated_cov = joseph_update(&state.covariance, &kalman_gain, &h, &r);
Some(EkfState {
mean: updated_mean,
covariance: updated_cov,
})
}
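    /// Convenience wrapper: predict forward by `dt`, then update with
    /// `measurement`.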
pub fn step(
&self,
state: &EkfState<T, N>,
dt: T,
measurement: &Measurement<T, M>,
) -> Option<EkfState<T, N>> {
let predicted = self.predict(state, dt);
self.update(&predicted, measurement)
}
}
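/// Standalone EKF prediction for callers that evaluate the nonlinear
/// transition and its Jacobian themselves: the supplied `predicted_mean` is
/// kept as-is and the covariance is propagated as `P' = F P Fᵀ + Q`.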
pub fn predict_ekf<T: RealField + Copy, const N: usize>(
state: &EkfState<T, N>,
predicted_mean: StateVector<T, N>,
jacobian: &crate::types::transforms::TransitionMatrix<T, N>,
process_noise: &StateCovariance<T, N>,
) -> EkfState<T, N> {
let predicted_cov = jacobian
.propagate_covariance(&state.covariance)
.add(process_noise);
EkfState {
mean: predicted_mean,
covariance: predicted_cov,
}
}
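/// Standalone EKF update for callers that evaluate the observation model and
/// its Jacobian themselves; the equations match
/// [`ExtendedKalmanFilter::update`]. Returns `None` if the gain computation
/// fails.
///
/// A usage sketch mirroring `test_standalone_ekf_functions` below (marked
/// `ignore` because import paths are elided here; `predicted` stands for a
/// previously predicted `EkfState`):
///
/// ```ignore
/// let sensor = RangeBearingSensor::new(10.0, 0.01, 0.95);
/// let h = sensor.jacobian_at(&predicted.mean).unwrap();
/// let z_pred = sensor.observe(&predicted.mean);
/// let r = sensor.measurement_noise();
/// let measurement = Measurement::from_array([110.0, 0.0]);
/// let updated = update_ekf(&predicted, &measurement, z_pred, &h, &r);
/// ```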
pub fn update_ekf<T: RealField + Copy, const N: usize, const M: usize>(
state: &EkfState<T, N>,
measurement: &Measurement<T, M>,
predicted_measurement: Measurement<T, M>,
jacobian: &crate::types::transforms::ObservationMatrix<T, M, N>,
meas_noise: &MeasurementCovariance<T, M>,
) -> Option<EkfState<T, N>> {
let innovation = measurement.innovation(predicted_measurement);
let innovation_cov = compute_innovation_covariance(&state.covariance, jacobian, meas_noise);
let kalman_gain = compute_kalman_gain(&state.covariance, jacobian, &innovation_cov)?;
let correction = kalman_gain.correct(&innovation);
let updated_mean = StateVector::from_svector(state.mean.as_svector() + correction.as_svector());
let updated_cov = joseph_update(&state.covariance, &kalman_gain, jacobian, meas_noise);
Some(EkfState {
mean: updated_mean,
covariance: updated_cov,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::{
ConstantVelocity2D, CoordinatedTurn2D, RangeBearingSensor, RangeBearingSensor5D,
};
#[test]
fn test_ekf_state_creation() {
let mean: StateVector<f64, 5> = StateVector::from_array([0.0, 0.0, 1.0, 0.0, 0.1]);
let cov: StateCovariance<f64, 5> = StateCovariance::identity();
let state = EkfState::new(mean, cov);
assert!((state.mean.index(2) - 1.0).abs() < 1e-10);
assert!((state.uncertainty() - 5.0).abs() < 1e-10);
}
#[test]
fn test_ekf_predict_coordinated_turn() {
let transition = CoordinatedTurn2D::new(0.1_f64, 0.01, 0.99);
let sensor = RangeBearingSensor5D::new(10.0, 0.01, 0.95);
let filter = ExtendedKalmanFilter::new(transition, sensor);
        // Zero turn rate, so the coordinated-turn model reduces to
        // straight-line motion: [x, y, vx, vy, ω] = [100, 0, 0, 10, 0].
        let state = EkfState::new(
            StateVector::from_array([100.0, 0.0, 0.0, 10.0, 0.0]),
            StateCovariance::identity(),
        );
let predicted = filter.predict(&state, 1.0);
assert!((predicted.mean.index(0) - 100.0).abs() < 1e-6);
assert!((predicted.mean.index(1) - 10.0).abs() < 1e-6);
assert!(predicted.uncertainty() > state.uncertainty());
}
#[test]
fn test_ekf_predict_with_turn() {
use std::f64::consts::FRAC_PI_2;
let transition = CoordinatedTurn2D::new(0.1_f64, 0.01, 0.99);
let sensor = RangeBearingSensor5D::new(10.0, 0.01, 0.95);
let filter = ExtendedKalmanFilter::new(transition, sensor);
let state = EkfState::new(
StateVector::from_array([0.0, 0.0, 10.0, 0.0, FRAC_PI_2]),
StateCovariance::identity(),
);
        let predicted = filter.predict(&state, 1.0);
        // After one second at ω = π/2 rad/s the velocity has rotated a
        // quarter turn, and the position follows a circle of radius
        // r = v/ω = 10 / (π/2), ending at (r, r) with velocity (0, 10).
        let r = 10.0 / FRAC_PI_2;
assert!(
(predicted.mean.index(0) - r).abs() < 0.1,
"x: {} vs {}",
predicted.mean.index(0),
r
);
assert!(
(predicted.mean.index(1) - r).abs() < 0.1,
"y: {} vs {}",
predicted.mean.index(1),
r
);
assert!(
(predicted.mean.index(2) - 0.0).abs() < 0.1,
"vx: {}",
predicted.mean.index(2)
);
assert!(
(predicted.mean.index(3) - 10.0).abs() < 0.1,
"vy: {}",
predicted.mean.index(3)
);
}
#[test]
fn test_ekf_update_range_bearing() {
let transition = CoordinatedTurn2D::new(0.1_f64, 0.01, 0.99);
let sensor = RangeBearingSensor5D::new(5.0, 0.01, 0.95);
let filter = ExtendedKalmanFilter::new(transition, sensor);
let state = EkfState::new(
StateVector::from_array([100.0, 0.0, 0.0, 0.0, 0.0]),
StateCovariance::from_matrix(nalgebra::SMatrix::<f64, 5, 5>::identity().scale(1000.0)),
);
let measurement = Measurement::from_array([100.0, 0.0]);
let updated = filter.update(&state, &measurement).unwrap();
assert!((updated.mean.index(0) - 100.0).abs() < 10.0);
assert!(updated.mean.index(1).abs() < 10.0);
assert!(updated.uncertainty() < state.uncertainty());
}
#[test]
fn test_ekf_linear_transition() {
let transition = ConstantVelocity2D::new(0.1_f64, 0.99);
let sensor = RangeBearingSensor::new(10.0, 0.01, 0.95);
let filter = EkfLinearTransition::new(transition, sensor);
let state = EkfState::new(
StateVector::from_array([100.0, 0.0, 10.0, 0.0]),
StateCovariance::identity(),
);
let predicted = filter.predict(&state, 1.0);
assert!((predicted.mean.index(0) - 110.0).abs() < 1e-6);
assert!((predicted.mean.index(1) - 0.0).abs() < 1e-6);
let measurement = Measurement::from_array([110.0, 0.0]);
let updated = filter.update(&predicted, &measurement).unwrap();
assert!((updated.mean.index(0) - 110.0).abs() < 5.0);
}
#[test]
fn test_ekf_step() {
let transition = CoordinatedTurn2D::new(0.1_f64, 0.01, 0.99);
let sensor = RangeBearingSensor5D::new(10.0, 0.01, 0.95);
let filter = ExtendedKalmanFilter::new(transition, sensor);
let state = EkfState::new(
StateVector::from_array([100.0, 0.0, 10.0, 0.0, 0.0]),
StateCovariance::identity(),
);
let measurement = Measurement::from_array([110.0, 0.0]);
let updated = filter.step(&state, 1.0, &measurement).unwrap();
assert!((updated.mean.index(0) - 110.0).abs() < 5.0);
}
#[test]
fn test_ekf_mahalanobis_distance() {
let transition = CoordinatedTurn2D::new(0.1_f64, 0.01, 0.99);
let sensor = RangeBearingSensor5D::new(10.0, 0.01, 0.95);
let filter = ExtendedKalmanFilter::new(transition, sensor);
let state = EkfState::new(
StateVector::from_array([100.0, 0.0, 0.0, 0.0, 0.0]),
StateCovariance::identity(),
);
let close_meas = Measurement::from_array([100.0, 0.0]);
let close_dist = filter
.mahalanobis_distance_squared(&state, &close_meas)
.unwrap();
let far_meas = Measurement::from_array([200.0, 1.0]);
let far_dist = filter
.mahalanobis_distance_squared(&state, &far_meas)
.unwrap();
assert!(close_dist < far_dist);
}
#[test]
fn test_standalone_ekf_functions() {
let mean = StateVector::from_array([100.0_f64, 0.0, 10.0, 0.0]);
let cov = StateCovariance::identity();
let state = EkfState::new(mean, cov);
let dt = 1.0;
let predicted_mean = StateVector::from_array([110.0, 0.0, 10.0, 0.0]);
let jacobian = crate::types::transforms::TransitionMatrix::from_matrix(nalgebra::matrix![
1.0, 0.0, dt, 0.0_f64;
0.0, 1.0, 0.0, dt;
0.0, 0.0, 1.0, 0.0;
0.0, 0.0, 0.0, 1.0
]);
let process_noise =
StateCovariance::from_matrix(nalgebra::SMatrix::<f64, 4, 4>::identity().scale(0.1));
let predicted = predict_ekf(&state, predicted_mean, &jacobian, &process_noise);
assert!((predicted.mean.index(0) - 110.0).abs() < 1e-10);
let sensor = RangeBearingSensor::new(10.0, 0.01, 0.95);
let h = sensor.jacobian_at(&predicted.mean).unwrap();
let z_pred = sensor.observe(&predicted.mean);
let r = sensor.measurement_noise();
let measurement = Measurement::from_array([110.0, 0.0]);
let updated = update_ekf(&predicted, &measurement, z_pred, &h, &r).unwrap();
assert!((updated.mean.index(0) - 110.0).abs() < 5.0);
}
}