use nalgebra::RealField;
use num_traits::Float;
use crate::types::spaces::{Measurement, MeasurementCovariance, StateVector};
use crate::types::transforms::ObservationMatrix;
pub trait ObservationModel<T: RealField, const N: usize, const M: usize> {
fn observation_matrix(&self) -> ObservationMatrix<T, M, N>;
fn measurement_noise(&self) -> MeasurementCovariance<T, M>;
fn detection_probability(&self, state: &StateVector<T, N>) -> T;
}
pub trait NonlinearObservationModel<T: RealField, const N: usize, const M: usize> {
fn observe(&self, state: &StateVector<T, N>) -> Measurement<T, M>;
fn jacobian_at(&self, state: &StateVector<T, N>) -> Option<ObservationMatrix<T, M, N>>;
fn measurement_noise(&self) -> MeasurementCovariance<T, M>;
fn detection_probability(&self, state: &StateVector<T, N>) -> T;
}
#[derive(Debug, Clone)]
pub struct PositionSensor2D<T: RealField> {
pub sigma_pos: T,
pub p_detection: T,
}
impl<T: RealField + Float + Copy> PositionSensor2D<T> {
pub fn new(sigma_pos: T, p_detection: T) -> Self {
assert!(
sigma_pos > T::zero(),
"Measurement noise sigma_pos must be positive"
);
assert!(
p_detection >= T::zero() && p_detection <= T::one(),
"Detection probability must be in [0, 1]"
);
Self {
sigma_pos,
p_detection,
}
}
pub fn with_noise(sigma_x: T, sigma_y: T, p_detection: T) -> PositionSensor2DAsym<T> {
assert!(
sigma_x > T::zero(),
"Measurement noise sigma_x must be positive"
);
assert!(
sigma_y > T::zero(),
"Measurement noise sigma_y must be positive"
);
assert!(
p_detection >= T::zero() && p_detection <= T::one(),
"Detection probability must be in [0, 1]"
);
PositionSensor2DAsym {
sigma_x,
sigma_y,
p_detection,
}
}
}
impl<T: RealField + Float + Copy> ObservationModel<T, 4, 2> for PositionSensor2D<T> {
fn observation_matrix(&self) -> ObservationMatrix<T, 2, 4> {
let one = T::one();
let zero = T::zero();
ObservationMatrix::from_matrix(nalgebra::matrix![
one, zero, zero, zero;
zero, one, zero, zero
])
}
fn measurement_noise(&self) -> MeasurementCovariance<T, 2> {
let sigma_sq = self.sigma_pos * self.sigma_pos;
let zero = T::zero();
MeasurementCovariance::from_matrix(nalgebra::matrix![
sigma_sq, zero;
zero, sigma_sq
])
}
fn detection_probability(&self, _state: &StateVector<T, 4>) -> T {
self.p_detection
}
}
#[derive(Debug, Clone)]
pub struct PositionSensor2DAsym<T: RealField> {
pub sigma_x: T,
pub sigma_y: T,
pub p_detection: T,
}
impl<T: RealField + Float + Copy> ObservationModel<T, 4, 2> for PositionSensor2DAsym<T> {
fn observation_matrix(&self) -> ObservationMatrix<T, 2, 4> {
let one = T::one();
let zero = T::zero();
ObservationMatrix::from_matrix(nalgebra::matrix![
one, zero, zero, zero;
zero, one, zero, zero
])
}
fn measurement_noise(&self) -> MeasurementCovariance<T, 2> {
let zero = T::zero();
let sigma_x_sq = self.sigma_x * self.sigma_x;
let sigma_y_sq = self.sigma_y * self.sigma_y;
MeasurementCovariance::from_matrix(nalgebra::matrix![
sigma_x_sq, zero;
zero, sigma_y_sq
])
}
fn detection_probability(&self, _state: &StateVector<T, 4>) -> T {
self.p_detection
}
}
#[derive(Debug, Clone)]
pub struct PositionSensor3D<T: RealField> {
pub sigma_pos: T,
pub p_detection: T,
}
impl<T: RealField + Float + Copy> PositionSensor3D<T> {
pub fn new(sigma_pos: T, p_detection: T) -> Self {
assert!(
sigma_pos > T::zero(),
"Measurement noise sigma_pos must be positive"
);
assert!(
p_detection >= T::zero() && p_detection <= T::one(),
"Detection probability must be in [0, 1]"
);
Self {
sigma_pos,
p_detection,
}
}
}
impl<T: RealField + Float + Copy> ObservationModel<T, 6, 3> for PositionSensor3D<T> {
fn observation_matrix(&self) -> ObservationMatrix<T, 3, 6> {
let one = T::one();
let zero = T::zero();
ObservationMatrix::from_matrix(nalgebra::matrix![
one, zero, zero, zero, zero, zero;
zero, one, zero, zero, zero, zero;
zero, zero, one, zero, zero, zero
])
}
fn measurement_noise(&self) -> MeasurementCovariance<T, 3> {
let sigma_sq = self.sigma_pos * self.sigma_pos;
let zero = T::zero();
MeasurementCovariance::from_matrix(nalgebra::matrix![
sigma_sq, zero, zero;
zero, sigma_sq, zero;
zero, zero, sigma_sq
])
}
fn detection_probability(&self, _state: &StateVector<T, 6>) -> T {
self.p_detection
}
}
#[derive(Debug, Clone)]
pub struct RangeBearingSensor<T: RealField> {
pub sigma_range: T,
pub sigma_bearing: T,
pub p_detection: T,
pub sensor_x: T,
pub sensor_y: T,
}
impl<T: RealField + Float + Copy> RangeBearingSensor<T> {
pub fn new(sigma_range: T, sigma_bearing: T, p_detection: T) -> Self {
assert!(
sigma_range > T::zero(),
"Range noise sigma_range must be positive"
);
assert!(
sigma_bearing > T::zero(),
"Bearing noise sigma_bearing must be positive"
);
assert!(
p_detection >= T::zero() && p_detection <= T::one(),
"Detection probability must be in [0, 1]"
);
Self {
sigma_range,
sigma_bearing,
p_detection,
sensor_x: T::zero(),
sensor_y: T::zero(),
}
}
pub fn at_position(
sigma_range: T,
sigma_bearing: T,
p_detection: T,
sensor_x: T,
sensor_y: T,
) -> Self {
assert!(
sigma_range > T::zero(),
"Range noise sigma_range must be positive"
);
assert!(
sigma_bearing > T::zero(),
"Bearing noise sigma_bearing must be positive"
);
assert!(
p_detection >= T::zero() && p_detection <= T::one(),
"Detection probability must be in [0, 1]"
);
Self {
sigma_range,
sigma_bearing,
p_detection,
sensor_x,
sensor_y,
}
}
pub fn detection_probability(&self, _state: &StateVector<T, 4>) -> T {
self.p_detection
}
pub fn measurement_noise(&self) -> MeasurementCovariance<T, 2> {
let zero = T::zero();
let sigma_r_sq = self.sigma_range * self.sigma_range;
let sigma_b_sq = self.sigma_bearing * self.sigma_bearing;
MeasurementCovariance::from_matrix(nalgebra::matrix![
sigma_r_sq, zero;
zero, sigma_b_sq
])
}
pub fn observe_nonlinear(&self, state: &StateVector<T, 4>) -> (T, T) {
let dx = *state.index(0) - self.sensor_x;
let dy = *state.index(1) - self.sensor_y;
let range = num_traits::Float::sqrt(dx * dx + dy * dy);
let bearing = num_traits::Float::atan2(dy, dx);
(range, bearing)
}
pub fn jacobian_at(&self, state: &StateVector<T, 4>) -> Option<ObservationMatrix<T, 2, 4>> {
let dx = *state.index(0) - self.sensor_x;
let dy = *state.index(1) - self.sensor_y;
let r_sq = dx * dx + dy * dy;
let r = num_traits::Float::sqrt(r_sq);
let zero = T::zero();
if r < T::from_f64(1e-10).unwrap() {
return None;
}
Some(ObservationMatrix::from_matrix(nalgebra::matrix![
dx / r, dy / r, zero, zero;
-dy / r_sq, dx / r_sq, zero, zero
]))
}
#[deprecated(
since = "0.2.0",
note = "Use jacobian_at() instead which returns Option"
)]
pub fn jacobian(&self, state: &StateVector<T, 4>) -> ObservationMatrix<T, 2, 4> {
self.jacobian_at(state)
.unwrap_or_else(ObservationMatrix::zeros)
}
#[deprecated(since = "0.2.0", note = "Use observe_nonlinear() instead")]
pub fn measure(&self, state: &StateVector<T, 4>) -> (T, T) {
self.observe_nonlinear(state)
}
}
impl<T: RealField + Float + Copy> NonlinearObservationModel<T, 4, 2> for RangeBearingSensor<T> {
fn observe(&self, state: &StateVector<T, 4>) -> Measurement<T, 2> {
let (range, bearing) = self.observe_nonlinear(state);
Measurement::from_array([range, bearing])
}
fn jacobian_at(&self, state: &StateVector<T, 4>) -> Option<ObservationMatrix<T, 2, 4>> {
RangeBearingSensor::jacobian_at(self, state)
}
fn measurement_noise(&self) -> MeasurementCovariance<T, 2> {
RangeBearingSensor::measurement_noise(self)
}
fn detection_probability(&self, state: &StateVector<T, 4>) -> T {
RangeBearingSensor::detection_probability(self, state)
}
}
#[derive(Debug, Clone)]
pub struct RangeBearingSensor5D<T: RealField> {
inner: RangeBearingSensor<T>,
}
impl<T: RealField + Float + Copy> RangeBearingSensor5D<T> {
pub fn new(sigma_range: T, sigma_bearing: T, p_detection: T) -> Self {
Self {
inner: RangeBearingSensor::new(sigma_range, sigma_bearing, p_detection),
}
}
pub fn at_position(
sigma_range: T,
sigma_bearing: T,
p_detection: T,
sensor_x: T,
sensor_y: T,
) -> Self {
Self {
inner: RangeBearingSensor::at_position(
sigma_range,
sigma_bearing,
p_detection,
sensor_x,
sensor_y,
),
}
}
pub fn observe_nonlinear(&self, state: &StateVector<T, 5>) -> (T, T) {
let dx = *state.index(0) - self.inner.sensor_x;
let dy = *state.index(1) - self.inner.sensor_y;
let range = num_traits::Float::sqrt(dx * dx + dy * dy);
let bearing = num_traits::Float::atan2(dy, dx);
(range, bearing)
}
pub fn measurement_noise(&self) -> MeasurementCovariance<T, 2> {
self.inner.measurement_noise()
}
pub fn detection_probability(&self, _state: &StateVector<T, 5>) -> T {
self.inner.p_detection
}
pub fn jacobian_at(&self, state: &StateVector<T, 5>) -> Option<ObservationMatrix<T, 2, 5>> {
let dx = *state.index(0) - self.inner.sensor_x;
let dy = *state.index(1) - self.inner.sensor_y;
let r_sq = dx * dx + dy * dy;
let r = num_traits::Float::sqrt(r_sq);
let zero = T::zero();
if r < T::from_f64(1e-10).unwrap() {
return None;
}
Some(ObservationMatrix::from_matrix(nalgebra::matrix![
dx / r, dy / r, zero, zero, zero;
-dy / r_sq, dx / r_sq, zero, zero, zero
]))
}
}
impl<T: RealField + Float + Copy> NonlinearObservationModel<T, 5, 2> for RangeBearingSensor5D<T> {
fn observe(&self, state: &StateVector<T, 5>) -> Measurement<T, 2> {
let (range, bearing) = self.observe_nonlinear(state);
Measurement::from_array([range, bearing])
}
fn jacobian_at(&self, state: &StateVector<T, 5>) -> Option<ObservationMatrix<T, 2, 5>> {
RangeBearingSensor5D::jacobian_at(self, state)
}
fn measurement_noise(&self) -> MeasurementCovariance<T, 2> {
RangeBearingSensor5D::measurement_noise(self)
}
fn detection_probability(&self, state: &StateVector<T, 5>) -> T {
RangeBearingSensor5D::detection_probability(self, state)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_position_sensor_2d() {
let sensor = PositionSensor2D::new(1.0_f64, 0.9);
let state = StateVector::from_array([10.0, 20.0, 1.0, 2.0]);
let h = sensor.observation_matrix();
let z = h.observe(&state);
assert!((z.index(0) - 10.0).abs() < 1e-10);
assert!((z.index(1) - 20.0).abs() < 1e-10);
}
#[test]
fn test_detection_probability() {
let sensor = PositionSensor2D::new(1.0_f64, 0.95);
let state = StateVector::from_array([0.0, 0.0, 0.0, 0.0]);
assert!((sensor.detection_probability(&state) - 0.95).abs() < 1e-10);
}
#[test]
fn test_range_bearing_sensor() {
let sensor = RangeBearingSensor::new(1.0_f64, 0.01, 0.9);
let state = StateVector::from_array([10.0, 0.0, 0.0, 0.0]);
let (range, bearing) = sensor.observe_nonlinear(&state);
assert!((range - 10.0).abs() < 1e-10);
assert!(bearing.abs() < 1e-10); }
#[test]
fn test_range_bearing_jacobian() {
let sensor = RangeBearingSensor::new(1.0_f64, 0.01, 0.9);
let state = StateVector::from_array([10.0, 0.0, 0.0, 0.0]);
let jacobian = sensor.jacobian_at(&state).unwrap();
assert!((jacobian.as_matrix()[(0, 0)] - 1.0).abs() < 1e-10);
assert!((jacobian.as_matrix()[(0, 1)] - 0.0).abs() < 1e-10);
assert!((jacobian.as_matrix()[(1, 0)] - 0.0).abs() < 1e-10);
assert!((jacobian.as_matrix()[(1, 1)] - 0.1).abs() < 1e-10);
}
#[test]
fn test_range_bearing_at_sensor_position() {
let sensor = RangeBearingSensor::new(1.0_f64, 0.01, 0.9);
let state = StateVector::from_array([0.0, 0.0, 0.0, 0.0]);
assert!(sensor.jacobian_at(&state).is_none());
}
}