#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "DifferenceMethod")]
#[derive(Clone)]
pub struct PyDifferenceMethod {
pub(crate) value: jacobian::DifferenceMethod,
}
#[pymethods]
impl PyDifferenceMethod {
#[classattr]
#[allow(non_snake_case)]
fn FORWARD() -> Self {
PyDifferenceMethod {
value: jacobian::DifferenceMethod::Forward,
}
}
#[classattr]
#[allow(non_snake_case)]
fn CENTRAL() -> Self {
PyDifferenceMethod {
value: jacobian::DifferenceMethod::Central,
}
}
#[classattr]
#[allow(non_snake_case)]
fn BACKWARD() -> Self {
PyDifferenceMethod {
value: jacobian::DifferenceMethod::Backward,
}
}
fn __str__(&self) -> String {
format!("{:?}", self.value)
}
fn __repr__(&self) -> String {
format!("DifferenceMethod.{:?}", self.value)
}
fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<bool> {
match op {
CompareOp::Eq => Ok(self.value == other.value),
CompareOp::Ne => Ok(self.value != other.value),
_ => Err(pyo3::exceptions::PyNotImplementedError::new_err(
"Only == and != are supported for DifferenceMethod",
)),
}
}
}
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "PerturbationStrategy")]
#[derive(Clone)]
pub struct PyPerturbationStrategy {
pub(crate) value: jacobian::PerturbationStrategy,
}
#[pymethods]
impl PyPerturbationStrategy {
#[classmethod]
#[pyo3(signature = (scale_factor=1.0, min_value=1.0))]
fn adaptive(
_cls: &Bound<'_, pyo3::types::PyType>,
scale_factor: f64,
min_value: f64,
) -> Self {
PyPerturbationStrategy {
value: jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
},
}
}
#[classmethod]
fn fixed(_cls: &Bound<'_, pyo3::types::PyType>, offset: f64) -> Self {
PyPerturbationStrategy {
value: jacobian::PerturbationStrategy::Fixed(offset),
}
}
#[classmethod]
fn percentage(_cls: &Bound<'_, pyo3::types::PyType>, percentage: f64) -> Self {
PyPerturbationStrategy {
value: jacobian::PerturbationStrategy::Percentage(percentage),
}
}
fn __str__(&self) -> String {
match self.value {
jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => format!("Adaptive({}, {})", scale_factor, min_value),
jacobian::PerturbationStrategy::Fixed(offset) => format!("Fixed({})", offset),
jacobian::PerturbationStrategy::Percentage(pct) => format!("Percentage({})", pct),
}
}
fn __repr__(&self) -> String {
format!("PerturbationStrategy.{}", self.__str__())
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "NumericalJacobian")]
pub struct PyDNumericalJacobian {
dynamics_fn: Py<PyAny>,
method: jacobian::DifferenceMethod,
perturbation: jacobian::PerturbationStrategy,
}
#[pymethods]
impl PyDNumericalJacobian {
#[new]
pub fn new(dynamics_fn: Py<PyAny>) -> Self {
Self {
dynamics_fn,
method: jacobian::DifferenceMethod::Central,
perturbation: jacobian::PerturbationStrategy::Adaptive {
scale_factor: 1.0,
min_value: 1.0,
},
}
}
#[classmethod]
pub fn forward(_cls: &Bound<'_, pyo3::types::PyType>, dynamics_fn: Py<PyAny>) -> Self {
Self {
dynamics_fn,
method: jacobian::DifferenceMethod::Forward,
perturbation: jacobian::PerturbationStrategy::Adaptive {
scale_factor: 1.0,
min_value: 1.0,
},
}
}
#[classmethod]
pub fn central(_cls: &Bound<'_, pyo3::types::PyType>, dynamics_fn: Py<PyAny>) -> Self {
Self::new(dynamics_fn)
}
#[classmethod]
pub fn backward(_cls: &Bound<'_, pyo3::types::PyType>, dynamics_fn: Py<PyAny>) -> Self {
Self {
dynamics_fn,
method: jacobian::DifferenceMethod::Backward,
perturbation: jacobian::PerturbationStrategy::Adaptive {
scale_factor: 1.0,
min_value: 1.0,
},
}
}
pub fn with_fixed_offset(mut slf: PyRefMut<'_, Self>, offset: f64) -> PyRefMut<'_, Self> {
slf.perturbation = jacobian::PerturbationStrategy::Fixed(offset);
slf
}
pub fn with_percentage(mut slf: PyRefMut<'_, Self>, percentage: f64) -> PyRefMut<'_, Self> {
slf.perturbation = jacobian::PerturbationStrategy::Percentage(percentage);
slf
}
pub fn with_adaptive(
mut slf: PyRefMut<'_, Self>,
scale_factor: f64,
min_value: f64,
) -> PyRefMut<'_, Self> {
slf.perturbation = jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
};
slf
}
pub fn with_method(
mut slf: PyRefMut<'_, Self>,
method: PyDifferenceMethod,
) -> PyRefMut<'_, Self> {
slf.method = method.value;
slf
}
#[allow(deprecated)]
pub fn compute<'py>(
&self,
py: Python<'py>,
t: f64,
state: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyArray<f64, numpy::Ix2>>> {
let state_vec = pyany_to_f64_array1(state, None)?;
let dimension = state_vec.len();
let state_dvec = DVector::from_vec(state_vec);
let dynamics_closure = {
let dynamics_fn_clone = self.dynamics_fn.clone_ref(py);
move |t: f64, state: &DVector<f64>, _params: Option<&DVector<f64>>| -> DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = dynamics_fn_clone
.call1(py, (t, state_py))
.expect("Failed to call dynamics function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Dynamics function returned invalid array");
DVector::from_vec(result_vec)
})
}
};
let mut provider = match self.method {
jacobian::DifferenceMethod::Forward => {
jacobian::DNumericalJacobian::forward(Box::new(dynamics_closure))
}
jacobian::DifferenceMethod::Central => {
jacobian::DNumericalJacobian::central(Box::new(dynamics_closure))
}
jacobian::DifferenceMethod::Backward => {
jacobian::DNumericalJacobian::backward(Box::new(dynamics_closure))
}
};
provider = match self.perturbation {
jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_adaptive(scale_factor, min_value),
jacobian::PerturbationStrategy::Fixed(offset) => provider.with_fixed_offset(offset),
jacobian::PerturbationStrategy::Percentage(pct) => provider.with_percentage(pct),
};
let jac_matrix = provider.compute(t, &state_dvec, None);
let rows = jac_matrix.nrows();
let cols = jac_matrix.ncols();
let mut flat_vec = Vec::with_capacity(rows * cols);
for i in 0..rows {
for j in 0..cols {
flat_vec.push(jac_matrix[(i, j)]);
}
}
flat_vec.into_pyarray(py).reshape([rows, cols])
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AnalyticJacobian")]
pub struct PyDAnalyticJacobian {
jacobian_fn: Py<PyAny>,
}
#[pymethods]
impl PyDAnalyticJacobian {
#[new]
pub fn new(jacobian_fn: Py<PyAny>) -> Self {
Self { jacobian_fn }
}
pub fn compute<'py>(
&self,
py: Python<'py>,
t: f64,
state: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyArray<f64, numpy::Ix2>>> {
let state_vec = pyany_to_f64_array1(state, None)?;
let dimension = state_vec.len();
let state_py = state_vec.into_pyarray(py);
let result = self.jacobian_fn.call1(py, (t, state_py))?;
let jac_matrix_vec = pyany_to_f64_array2(result.bind(py), Some((dimension, dimension)))?;
let flat_vec: Vec<f64> = jac_matrix_vec.into_iter().flatten().collect();
flat_vec.into_pyarray(py).reshape([dimension, dimension])
}
}
use crate::math::sensitivity;
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "NumericalSensitivity")]
pub struct PyDNumericalSensitivity {
dynamics_fn: Py<PyAny>,
method: jacobian::DifferenceMethod,
perturbation: jacobian::PerturbationStrategy,
}
#[pymethods]
impl PyDNumericalSensitivity {
#[new]
pub fn new(dynamics_fn: Py<PyAny>) -> Self {
Self {
dynamics_fn,
method: jacobian::DifferenceMethod::Central,
perturbation: jacobian::PerturbationStrategy::Adaptive {
scale_factor: 1.0,
min_value: 1.0,
},
}
}
#[classmethod]
pub fn forward(_cls: &Bound<'_, pyo3::types::PyType>, dynamics_fn: Py<PyAny>) -> Self {
Self {
dynamics_fn,
method: jacobian::DifferenceMethod::Forward,
perturbation: jacobian::PerturbationStrategy::Adaptive {
scale_factor: 1.0,
min_value: 1.0,
},
}
}
#[classmethod]
pub fn central(_cls: &Bound<'_, pyo3::types::PyType>, dynamics_fn: Py<PyAny>) -> Self {
Self::new(dynamics_fn)
}
#[classmethod]
pub fn backward(_cls: &Bound<'_, pyo3::types::PyType>, dynamics_fn: Py<PyAny>) -> Self {
Self {
dynamics_fn,
method: jacobian::DifferenceMethod::Backward,
perturbation: jacobian::PerturbationStrategy::Adaptive {
scale_factor: 1.0,
min_value: 1.0,
},
}
}
pub fn with_fixed_offset(mut slf: PyRefMut<'_, Self>, offset: f64) -> PyRefMut<'_, Self> {
slf.perturbation = jacobian::PerturbationStrategy::Fixed(offset);
slf
}
pub fn with_percentage(mut slf: PyRefMut<'_, Self>, percentage: f64) -> PyRefMut<'_, Self> {
slf.perturbation = jacobian::PerturbationStrategy::Percentage(percentage);
slf
}
pub fn with_adaptive(
mut slf: PyRefMut<'_, Self>,
scale_factor: f64,
min_value: f64,
) -> PyRefMut<'_, Self> {
slf.perturbation = jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
};
slf
}
pub fn with_method(
mut slf: PyRefMut<'_, Self>,
method: PyDifferenceMethod,
) -> PyRefMut<'_, Self> {
slf.method = method.value;
slf
}
#[allow(deprecated)]
pub fn compute<'py>(
&self,
py: Python<'py>,
t: f64,
state: &Bound<'py, PyAny>,
params: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyArray<f64, numpy::Ix2>>> {
let state_vec = pyany_to_f64_array1(state, None)?;
let params_vec = pyany_to_f64_array1(params, None)?;
let state_dim = state_vec.len();
let param_dim = params_vec.len();
let state_dvec = DVector::from_vec(state_vec);
let params_dvec = DVector::from_vec(params_vec);
let dynamics_closure = {
let dynamics_fn_clone = self.dynamics_fn.clone_ref(py);
move |t: f64, state: &DVector<f64>, params: &DVector<f64>| -> DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = dynamics_fn_clone
.call1(py, (t, state_py, params_py))
.expect("Failed to call dynamics function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(state.len()))
.expect("Dynamics function returned invalid array");
DVector::from_vec(result_vec)
})
}
};
let mut provider = match self.method {
jacobian::DifferenceMethod::Forward => {
sensitivity::DNumericalSensitivity::forward(Box::new(dynamics_closure))
}
jacobian::DifferenceMethod::Central => {
sensitivity::DNumericalSensitivity::central(Box::new(dynamics_closure))
}
jacobian::DifferenceMethod::Backward => {
sensitivity::DNumericalSensitivity::backward(Box::new(dynamics_closure))
}
};
provider = match self.perturbation {
jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_strategy(jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
}),
jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_strategy(jacobian::PerturbationStrategy::Fixed(offset))
}
jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_strategy(jacobian::PerturbationStrategy::Percentage(pct))
}
};
let sens_matrix = provider.compute(t, &state_dvec, ¶ms_dvec);
let rows = sens_matrix.nrows();
let cols = sens_matrix.ncols();
let mut flat_vec = Vec::with_capacity(rows * cols);
for i in 0..rows {
for j in 0..cols {
flat_vec.push(sens_matrix[(i, j)]);
}
}
flat_vec.into_pyarray(py).reshape([state_dim, param_dim])
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AnalyticSensitivity")]
pub struct PyDAnalyticSensitivity {
sensitivity_fn: Py<PyAny>,
}
#[pymethods]
impl PyDAnalyticSensitivity {
#[new]
pub fn new(sensitivity_fn: Py<PyAny>) -> Self {
Self { sensitivity_fn }
}
pub fn compute<'py>(
&self,
py: Python<'py>,
t: f64,
state: &Bound<'py, PyAny>,
params: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyArray<f64, numpy::Ix2>>> {
let state_vec = pyany_to_f64_array1(state, None)?;
let params_vec = pyany_to_f64_array1(params, None)?;
let state_dim = state_vec.len();
let param_dim = params_vec.len();
let state_py = state_vec.into_pyarray(py);
let params_py = params_vec.into_pyarray(py);
let result = self.sensitivity_fn.call1(py, (t, state_py, params_py))?;
let sens_matrix_vec = pyany_to_f64_array2(result.bind(py), Some((state_dim, param_dim)))?;
let flat_vec: Vec<f64> = sens_matrix_vec.into_iter().flatten().collect();
flat_vec.into_pyarray(py).reshape([state_dim, param_dim])
}
}