#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "IntegratorConfig")]
#[derive(Clone)]
pub struct PyIntegratorConfig {
pub(crate) inner: IntegratorConfig,
}
#[pymethods]
impl PyIntegratorConfig {
#[new]
#[pyo3(signature = (
abs_tol=1e-6,
rel_tol=1e-3,
initial_step=None,
min_step=Some(1e-12),
max_step=Some(900.0),
step_safety_factor=Some(0.9),
min_step_scale_factor=Some(0.2),
max_step_scale_factor=Some(10.0),
max_step_attempts=10,
fixed_step_size=None
))]
#[allow(clippy::too_many_arguments)]
pub fn new(
abs_tol: f64,
rel_tol: f64,
initial_step: Option<f64>,
min_step: Option<f64>,
max_step: Option<f64>,
step_safety_factor: Option<f64>,
min_step_scale_factor: Option<f64>,
max_step_scale_factor: Option<f64>,
max_step_attempts: usize,
fixed_step_size: Option<f64>,
) -> Self {
PyIntegratorConfig {
inner: IntegratorConfig {
abs_tol,
rel_tol,
initial_step,
min_step,
max_step,
step_safety_factor,
min_step_scale_factor,
max_step_scale_factor,
max_step_attempts,
fixed_step_size,
},
}
}
#[classmethod]
fn fixed_step(_cls: &Bound<'_, PyType>, step_size: f64) -> Self {
PyIntegratorConfig {
inner: IntegratorConfig::fixed_step(step_size),
}
}
#[classmethod]
fn adaptive(_cls: &Bound<'_, PyType>, abs_tol: f64, rel_tol: f64) -> Self {
PyIntegratorConfig {
inner: IntegratorConfig::adaptive(abs_tol, rel_tol),
}
}
#[getter]
fn abs_tol(&self) -> f64 {
self.inner.abs_tol
}
#[getter]
fn rel_tol(&self) -> f64 {
self.inner.rel_tol
}
#[getter]
fn initial_step(&self) -> Option<f64> {
self.inner.initial_step
}
#[getter]
fn min_step(&self) -> Option<f64> {
self.inner.min_step
}
#[getter]
fn max_step(&self) -> Option<f64> {
self.inner.max_step
}
#[getter]
fn step_safety_factor(&self) -> Option<f64> {
self.inner.step_safety_factor
}
#[getter]
fn min_step_scale_factor(&self) -> Option<f64> {
self.inner.min_step_scale_factor
}
#[getter]
fn max_step_scale_factor(&self) -> Option<f64> {
self.inner.max_step_scale_factor
}
#[getter]
fn max_step_attempts(&self) -> usize {
self.inner.max_step_attempts
}
fn __repr__(&self) -> String {
format!(
"IntegratorConfig(abs_tol={}, rel_tol={}, max_step={:?})",
self.inner.abs_tol, self.inner.rel_tol, self.inner.max_step
)
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AdaptiveStepResult")]
pub struct PyAdaptiveStepDResult {
inner: DIntegratorStepResult,
}
#[pymethods]
impl PyAdaptiveStepDResult {
#[getter]
fn state<'py>(&self, py: Python<'py>) -> Bound<'py, PyArray<f64, numpy::Ix1>> {
self.inner.state.as_slice().to_pyarray(py)
}
#[getter]
fn dt_used(&self) -> f64 {
self.inner.dt_used
}
#[getter]
fn error_estimate(&self) -> Option<f64> {
self.inner.error_estimate
}
#[getter]
fn dt_next(&self) -> f64 {
self.inner.dt_next
}
fn __repr__(&self) -> String {
let error_str = match self.inner.error_estimate {
Some(err) => format!("{}", err),
None => "None".to_string(),
};
format!(
"AdaptiveStepResult(dt_used={}, error={}, dt_next={})",
self.inner.dt_used, error_str, self.inner.dt_next
)
}
}
#[pyclass(module = "brahe._brahe", unsendable)]
#[pyo3(name = "RK4Integrator")]
pub struct PyRK4DIntegrator {
inner: RK4DIntegrator,
dimension: usize,
}
#[pymethods]
impl PyRK4DIntegrator {
#[allow(deprecated)]
#[new]
#[pyo3(signature = (dimension, dynamics_fn, jacobian=None, sensitivity=None, control_fn=None, config=None))]
pub fn new(
py: Python<'_>,
dimension: usize,
dynamics_fn: Py<PyAny>,
jacobian: Option<Py<PyAny>>,
sensitivity: Option<Py<PyAny>>,
control_fn: Option<Py<PyAny>>,
config: Option<PyIntegratorConfig>,
) -> PyResult<Self> {
let dynamics_closure = {
let dynamics_fn_arc = Arc::new(dynamics_fn.clone_ref(py));
move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = dynamics_fn_arc
.call1(py, (t, state_py))
.expect("Failed to call dynamics function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Dynamics function returned invalid array");
na::DVector::from_vec(result_vec)
})
}
};
let control: crate::integrators::traits::DControlInput = if let Some(ctrl_fn) = control_fn {
let ctrl_fn_arc = Arc::new(ctrl_fn.clone_ref(py));
Some(Box::new(move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = ctrl_fn_arc
.call1(py, (t, state_py))
.expect("Failed to call control function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Control function returned invalid array");
na::DVector::from_vec(result_vec)
})
}))
} else {
None
};
let varmat: Option<Box<dyn DJacobianProvider>> = if let Some(jac_obj) = jacobian {
Python::attach(|py| {
let jac = jac_obj.bind(py);
if let Ok(num_jac) = jac.extract::<PyRef<PyDNumericalJacobian>>() {
let jac_arc = Arc::new(num_jac.dynamics_fn.clone_ref(py));
let jac_method = num_jac.method;
let jac_pert = num_jac.perturbation;
let jac_closure = move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = jac_arc
.call1(py, (t, state_py))
.expect("Failed to call Jacobian dynamics");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Jacobian dynamics returned invalid array");
na::DVector::from_vec(result_vec)
})
};
let mut provider = match jac_method {
crate::math::jacobian::DifferenceMethod::Forward => {
crate::math::jacobian::DNumericalJacobian::forward(Box::new(jac_closure))
}
crate::math::jacobian::DifferenceMethod::Central => {
crate::math::jacobian::DNumericalJacobian::central(Box::new(jac_closure))
}
crate::math::jacobian::DifferenceMethod::Backward => {
crate::math::jacobian::DNumericalJacobian::backward(Box::new(jac_closure))
}
};
provider = match jac_pert {
crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_adaptive(scale_factor, min_value),
crate::math::jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_fixed_offset(offset)
}
crate::math::jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_percentage(pct)
}
};
Ok(Some(Box::new(provider) as Box<dyn DJacobianProvider>))
} else if let Ok(ana_jac) = jac.extract::<PyRef<PyDAnalyticJacobian>>() {
let jac_arc = Arc::new(ana_jac.jacobian_fn.clone_ref(py));
let jac_closure = move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DMatrix<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = jac_arc
.call1(py, (t, state_py))
.expect("Failed to call Jacobian function");
let jac_matrix_vec =
pyany_to_f64_array2(result.bind(py), Some((dimension, dimension)))
.expect("Jacobian function returned invalid array");
let mut jac_matrix = na::DMatrix::<f64>::zeros(dimension, dimension);
for i in 0..dimension {
for j in 0..dimension {
jac_matrix[(i, j)] = jac_matrix_vec[i][j];
}
}
jac_matrix
})
};
let provider = crate::math::jacobian::DAnalyticJacobian::new(Box::new(jac_closure));
Ok(Some(Box::new(provider) as Box<dyn DJacobianProvider>))
} else {
Err(exceptions::PyTypeError::new_err(
"jacobian must be DNumericalJacobian or DAnalyticJacobian",
))
}
})?
} else {
None
};
let sensmat: Option<Box<dyn DSensitivityProvider>> = if let Some(sens_obj) = sensitivity {
Python::attach(|py| {
let sens = sens_obj.bind(py);
if let Ok(num_sens) = sens.extract::<PyRef<PyDNumericalSensitivity>>() {
let sens_arc = Arc::new(num_sens.dynamics_fn.clone_ref(py));
let sens_method = num_sens.method;
let sens_pert = num_sens.perturbation;
let sens_closure = move |t: f64, state: &na::DVector<f64>, params: &na::DVector<f64>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = sens_arc
.call1(py, (t, state_py, params_py))
.expect("Failed to call sensitivity dynamics");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(state.len()))
.expect("Sensitivity dynamics returned invalid array");
na::DVector::from_vec(result_vec)
})
};
let mut provider = match sens_method {
crate::math::jacobian::DifferenceMethod::Forward => {
crate::math::sensitivity::DNumericalSensitivity::forward(Box::new(sens_closure))
}
crate::math::jacobian::DifferenceMethod::Central => {
crate::math::sensitivity::DNumericalSensitivity::central(Box::new(sens_closure))
}
crate::math::jacobian::DifferenceMethod::Backward => {
crate::math::sensitivity::DNumericalSensitivity::central(Box::new(sens_closure))
}
};
provider = match sens_pert {
crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
}),
crate::math::jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Fixed(offset))
}
crate::math::jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Percentage(pct))
}
};
Ok(Some(Box::new(provider) as Box<dyn DSensitivityProvider>))
} else if let Ok(ana_sens) = sens.extract::<PyRef<PyDAnalyticSensitivity>>() {
let sens_arc = Arc::new(ana_sens.sensitivity_fn.clone_ref(py));
let sens_closure = move |t: f64, state: &na::DVector<f64>, params: &na::DVector<f64>| -> na::DMatrix<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = sens_arc
.call1(py, (t, state_py, params_py))
.expect("Failed to call sensitivity function");
let state_dim = state.len();
let param_dim = params.len();
let sens_matrix_vec =
pyany_to_f64_array2(result.bind(py), Some((state_dim, param_dim)))
.expect("Sensitivity function returned invalid array");
let mut sens_matrix = na::DMatrix::<f64>::zeros(state_dim, param_dim);
for i in 0..state_dim {
for j in 0..param_dim {
sens_matrix[(i, j)] = sens_matrix_vec[i][j];
}
}
sens_matrix
})
};
let provider = crate::math::sensitivity::DAnalyticSensitivity::new(Box::new(sens_closure));
Ok(Some(Box::new(provider) as Box<dyn DSensitivityProvider>))
} else {
Err(exceptions::PyTypeError::new_err(
"sensitivity must be NumericalSensitivity or AnalyticSensitivity",
))
}
})?
} else {
None
};
let inner = if let Some(cfg) = config {
RK4DIntegrator::with_config(dimension, Box::new(dynamics_closure), varmat, sensmat, control, cfg.inner)
} else {
RK4DIntegrator::new(dimension, Box::new(dynamics_closure), varmat, sensmat, control)
};
Ok(PyRK4DIntegrator { inner, dimension })
}
#[pyo3(signature = (t, state, dt=None))]
pub fn step<'py>(
&self,
py: Python<'py>,
t: f64,
state: &Bound<'py, PyAny>,
dt: Option<f64>,
) -> PyResult<Bound<'py, PyArray<f64, numpy::Ix1>>> {
let state_vec = pyany_to_f64_array1(state, Some(self.dimension))?;
let state_dvec = na::DVector::from_vec(state_vec);
let result = self.inner.step(t, state_dvec, None, dt);
Ok(result.state.as_slice().to_pyarray(py))
}
#[pyo3(signature = (t, state, phi, dt=None))]
#[allow(clippy::type_complexity)]
pub fn step_with_varmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: &Bound<'py, PyAny>,
phi: &Bound<'py, PyAny>,
dt: Option<f64>,
) -> PyResult<(Bound<'py, PyArray<f64, numpy::Ix1>>, Bound<'py, PyArray<f64, numpy::Ix2>>)> {
let state_vec = pyany_to_f64_array1(state, Some(self.dimension))?;
let state_dvec = na::DVector::from_vec(state_vec);
let phi_vec = pyany_to_f64_array2(phi, Some((self.dimension, self.dimension)))?;
let mut phi_dmat = na::DMatrix::<f64>::zeros(self.dimension, self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_dmat[(i, j)] = phi_vec[i][j];
}
}
let result = self.inner.step_with_varmat(t, state_dvec, None, phi_dmat, dt);
let state_np = result.state.as_slice().to_pyarray(py);
let new_phi = result.phi.expect("Variational matrix should be present");
let mut phi_flat = Vec::with_capacity(self.dimension * self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_flat.push(new_phi[(i, j)]);
}
}
let phi_np = phi_flat
.into_pyarray(py)
.reshape([self.dimension, self.dimension])?;
Ok((state_np, phi_np))
}
#[pyo3(signature = (t, state, sens, params, dt=None))]
#[allow(clippy::type_complexity)]
pub fn step_with_sensmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
sens: PyReadonlyArray2<f64>,
params: PyReadonlyArray1<f64>,
dt: Option<f64>,
) -> PyResult<(Bound<'py, PyArray<f64, Ix1>>, Bound<'py, PyArray<f64, Ix2>>)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let sens_array = sens.as_array();
let sens_shape = sens_array.shape();
let num_params = sens_shape[1];
let mut sens_dmat = na::DMatrix::<f64>::zeros(self.dimension, num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_dmat[(i, j)] = sens_array[[i, j]];
}
}
let params_vec = params.as_slice()?;
let params_dvec = na::DVector::from_vec(params_vec.to_vec());
let result = self.inner.step_with_sensmat(
t, state_dvec, sens_dmat, ¶ms_dvec, dt
);
let state_np = result.state.as_slice().to_pyarray(py);
let new_sens = result.sens.expect("Sensitivity matrix should be present");
let mut sens_flat = Vec::with_capacity(self.dimension * num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_flat.push(new_sens[(i, j)]);
}
}
let sens_np = sens_flat
.into_pyarray(py)
.reshape([self.dimension, num_params])?;
Ok((state_np, sens_np))
}
#[pyo3(signature = (t, state, phi, sens, params, dt=None))]
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
pub fn step_with_varmat_sensmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
phi: PyReadonlyArray2<f64>,
sens: PyReadonlyArray2<f64>,
params: PyReadonlyArray1<f64>,
dt: Option<f64>,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
Bound<'py, PyArray<f64, Ix2>>,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let phi_array = phi.as_array();
let mut phi_dmat = na::DMatrix::<f64>::zeros(self.dimension, self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_dmat[(i, j)] = phi_array[[i, j]];
}
}
let sens_array = sens.as_array();
let sens_shape = sens_array.shape();
let num_params = sens_shape[1];
let mut sens_dmat = na::DMatrix::<f64>::zeros(self.dimension, num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_dmat[(i, j)] = sens_array[[i, j]];
}
}
let params_vec = params.as_slice()?;
let params_dvec = na::DVector::from_vec(params_vec.to_vec());
let result = self.inner.step_with_varmat_sensmat(
t, state_dvec, phi_dmat, sens_dmat, ¶ms_dvec, dt
);
let state_np = result.state.as_slice().to_pyarray(py);
let new_phi = result.phi.expect("Variational matrix should be present");
let mut phi_flat = Vec::with_capacity(self.dimension * self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_flat.push(new_phi[(i, j)]);
}
}
let phi_np = phi_flat
.into_pyarray(py)
.reshape([self.dimension, self.dimension])?;
let new_sens = result.sens.expect("Sensitivity matrix should be present");
let mut sens_flat = Vec::with_capacity(self.dimension * num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_flat.push(new_sens[(i, j)]);
}
}
let sens_np = sens_flat
.into_pyarray(py)
.reshape([self.dimension, num_params])?;
Ok((state_np, phi_np, sens_np))
}
#[getter]
fn dimension(&self) -> usize {
self.dimension
}
fn __repr__(&self) -> String {
format!("RK4Integrator(dimension={})", self.dimension)
}
}
#[pyclass(module = "brahe._brahe", unsendable)]
#[pyo3(name = "RKF45Integrator")]
pub struct PyRKF45DIntegrator {
inner: RKF45DIntegrator,
dimension: usize,
}
#[pymethods]
impl PyRKF45DIntegrator {
#[allow(deprecated)]
#[new]
#[pyo3(signature = (dimension, dynamics_fn, jacobian=None, sensitivity=None, control_fn=None, config=None))]
pub fn new(
py: Python<'_>,
dimension: usize,
dynamics_fn: Py<PyAny>,
jacobian: Option<Py<PyAny>>,
sensitivity: Option<Py<PyAny>>,
control_fn: Option<Py<PyAny>>,
config: Option<PyIntegratorConfig>,
) -> PyResult<Self> {
let dynamics_closure = {
let dynamics_fn_arc = Arc::new(dynamics_fn.clone_ref(py));
move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = dynamics_fn_arc
.call1(py, (t, state_py))
.expect("Failed to call dynamics function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Dynamics function returned invalid array");
na::DVector::from_vec(result_vec)
})
}
};
let control: crate::integrators::traits::DControlInput = if let Some(ctrl_fn) = control_fn {
let ctrl_fn_arc = Arc::new(ctrl_fn.clone_ref(py));
Some(Box::new(move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = ctrl_fn_arc
.call1(py, (t, state_py))
.expect("Failed to call control function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Control function returned invalid array");
na::DVector::from_vec(result_vec)
})
}))
} else {
None
};
let varmat: Option<Box<dyn DJacobianProvider>> = if let Some(jac_obj) = jacobian {
Python::attach(|py| {
let jac = jac_obj.bind(py);
if let Ok(num_jac) = jac.extract::<PyRef<PyDNumericalJacobian>>() {
let jac_arc = Arc::new(num_jac.dynamics_fn.clone_ref(py));
let jac_method = num_jac.method;
let jac_pert = num_jac.perturbation;
let jac_closure = move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = jac_arc
.call1(py, (t, state_py))
.expect("Failed to call Jacobian dynamics");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Jacobian dynamics returned invalid array");
na::DVector::from_vec(result_vec)
})
};
let mut provider = match jac_method {
crate::math::jacobian::DifferenceMethod::Forward => {
crate::math::jacobian::DNumericalJacobian::forward(Box::new(jac_closure))
}
crate::math::jacobian::DifferenceMethod::Central => {
crate::math::jacobian::DNumericalJacobian::central(Box::new(jac_closure))
}
crate::math::jacobian::DifferenceMethod::Backward => {
crate::math::jacobian::DNumericalJacobian::backward(Box::new(jac_closure))
}
};
provider = match jac_pert {
crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_adaptive(scale_factor, min_value),
crate::math::jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_fixed_offset(offset)
}
crate::math::jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_percentage(pct)
}
};
Ok(Some(Box::new(provider) as Box<dyn DJacobianProvider>))
} else if let Ok(ana_jac) = jac.extract::<PyRef<PyDAnalyticJacobian>>() {
let jac_arc = Arc::new(ana_jac.jacobian_fn.clone_ref(py));
let jac_closure = move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DMatrix<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = jac_arc
.call1(py, (t, state_py))
.expect("Failed to call Jacobian function");
let jac_matrix_vec =
pyany_to_f64_array2(result.bind(py), Some((dimension, dimension)))
.expect("Jacobian function returned invalid array");
let mut jac_matrix = na::DMatrix::<f64>::zeros(dimension, dimension);
for i in 0..dimension {
for j in 0..dimension {
jac_matrix[(i, j)] = jac_matrix_vec[i][j];
}
}
jac_matrix
})
};
let provider = crate::math::jacobian::DAnalyticJacobian::new(Box::new(jac_closure));
Ok(Some(Box::new(provider) as Box<dyn DJacobianProvider>))
} else {
Err(exceptions::PyTypeError::new_err(
"jacobian must be DNumericalJacobian or DAnalyticJacobian",
))
}
})?
} else {
None
};
let sensmat: Option<Box<dyn DSensitivityProvider>> = if let Some(sens_obj) = sensitivity {
Python::attach(|py| {
let sens = sens_obj.bind(py);
if let Ok(num_sens) = sens.extract::<PyRef<PyDNumericalSensitivity>>() {
let sens_arc = Arc::new(num_sens.dynamics_fn.clone_ref(py));
let sens_method = num_sens.method;
let sens_pert = num_sens.perturbation;
let sens_closure = move |t: f64, state: &na::DVector<f64>, params: &na::DVector<f64>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = sens_arc
.call1(py, (t, state_py, params_py))
.expect("Failed to call sensitivity dynamics");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(state.len()))
.expect("Sensitivity dynamics returned invalid array");
na::DVector::from_vec(result_vec)
})
};
let mut provider = match sens_method {
crate::math::jacobian::DifferenceMethod::Forward => {
crate::math::sensitivity::DNumericalSensitivity::forward(Box::new(sens_closure))
}
crate::math::jacobian::DifferenceMethod::Central => {
crate::math::sensitivity::DNumericalSensitivity::central(Box::new(sens_closure))
}
crate::math::jacobian::DifferenceMethod::Backward => {
crate::math::sensitivity::DNumericalSensitivity::central(Box::new(sens_closure))
}
};
provider = match sens_pert {
crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
}),
crate::math::jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Fixed(offset))
}
crate::math::jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Percentage(pct))
}
};
Ok(Some(Box::new(provider) as Box<dyn DSensitivityProvider>))
} else if let Ok(ana_sens) = sens.extract::<PyRef<PyDAnalyticSensitivity>>() {
let sens_arc = Arc::new(ana_sens.sensitivity_fn.clone_ref(py));
let sens_closure = move |t: f64, state: &na::DVector<f64>, params: &na::DVector<f64>| -> na::DMatrix<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = sens_arc
.call1(py, (t, state_py, params_py))
.expect("Failed to call sensitivity function");
let state_dim = state.len();
let param_dim = params.len();
let sens_matrix_vec =
pyany_to_f64_array2(result.bind(py), Some((state_dim, param_dim)))
.expect("Sensitivity function returned invalid array");
let mut sens_matrix = na::DMatrix::<f64>::zeros(state_dim, param_dim);
for i in 0..state_dim {
for j in 0..param_dim {
sens_matrix[(i, j)] = sens_matrix_vec[i][j];
}
}
sens_matrix
})
};
let provider = crate::math::sensitivity::DAnalyticSensitivity::new(Box::new(sens_closure));
Ok(Some(Box::new(provider) as Box<dyn DSensitivityProvider>))
} else {
Err(exceptions::PyTypeError::new_err(
"sensitivity must be NumericalSensitivity or AnalyticSensitivity",
))
}
})?
} else {
None
};
let inner = if let Some(cfg) = config {
RKF45DIntegrator::with_config(dimension, Box::new(dynamics_closure), varmat, sensmat, control, cfg.inner)
} else {
RKF45DIntegrator::new(dimension, Box::new(dynamics_closure), varmat, sensmat, control)
};
Ok(PyRKF45DIntegrator { inner, dimension })
}
pub fn step<'py>(
&self,
_py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<PyAdaptiveStepDResult> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let result = self.inner.step(t, state_dvec, None, Some(dt));
Ok(PyAdaptiveStepDResult { inner: result })
}
#[allow(clippy::type_complexity)]
pub fn step_with_varmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
phi: PyReadonlyArray2<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let phi_array = phi.as_array();
let mut phi_dmat = na::DMatrix::<f64>::zeros(self.dimension, self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_dmat[(i, j)] = phi_array[[i, j]];
}
}
let result = self.inner.step_with_varmat(t, state_dvec, None, phi_dmat, Some(dt));
let state_np = result.state.as_slice().to_pyarray(py);
let new_phi = result.phi.expect("Variational matrix should be present");
let mut phi_flat = Vec::with_capacity(self.dimension * self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_flat.push(new_phi[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let phi_np = phi_flat
.into_pyarray(py)
.reshape([self.dimension, self.dimension])?;
Ok((state_np, phi_np, dt_used, error_est, dt_next))
}
#[allow(clippy::type_complexity)]
pub fn step_with_sensmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
sens: PyReadonlyArray2<f64>,
params: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let sens_array = sens.as_array();
let sens_shape = sens_array.shape();
let num_params = sens_shape[1];
let mut sens_dmat = na::DMatrix::<f64>::zeros(self.dimension, num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_dmat[(i, j)] = sens_array[[i, j]];
}
}
let params_vec = params.as_slice()?;
let params_dvec = na::DVector::from_vec(params_vec.to_vec());
let result = self.inner.step_with_sensmat(
t, state_dvec, sens_dmat, ¶ms_dvec, Some(dt)
);
let state_np = result.state.as_slice().to_pyarray(py);
let new_sens = result.sens.expect("Sensitivity matrix should be present");
let mut sens_flat = Vec::with_capacity(self.dimension * num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_flat.push(new_sens[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let sens_np = sens_flat
.into_pyarray(py)
.reshape([self.dimension, num_params])?;
Ok((state_np, sens_np, dt_used, error_est, dt_next))
}
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
pub fn step_with_varmat_sensmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
phi: PyReadonlyArray2<f64>,
sens: PyReadonlyArray2<f64>,
params: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let phi_array = phi.as_array();
let mut phi_dmat = na::DMatrix::<f64>::zeros(self.dimension, self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_dmat[(i, j)] = phi_array[[i, j]];
}
}
let sens_array = sens.as_array();
let sens_shape = sens_array.shape();
let num_params = sens_shape[1];
let mut sens_dmat = na::DMatrix::<f64>::zeros(self.dimension, num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_dmat[(i, j)] = sens_array[[i, j]];
}
}
let params_vec = params.as_slice()?;
let params_dvec = na::DVector::from_vec(params_vec.to_vec());
let result = self.inner.step_with_varmat_sensmat(
t, state_dvec, phi_dmat, sens_dmat, ¶ms_dvec, Some(dt)
);
let state_np = result.state.as_slice().to_pyarray(py);
let new_phi = result.phi.expect("Variational matrix should be present");
let mut phi_flat = Vec::with_capacity(self.dimension * self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_flat.push(new_phi[(i, j)]);
}
}
let phi_np = phi_flat
.into_pyarray(py)
.reshape([self.dimension, self.dimension])?;
let new_sens = result.sens.expect("Sensitivity matrix should be present");
let mut sens_flat = Vec::with_capacity(self.dimension * num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_flat.push(new_sens[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let sens_np = sens_flat
.into_pyarray(py)
.reshape([self.dimension, num_params])?;
Ok((state_np, phi_np, sens_np, dt_used, error_est, dt_next))
}
#[getter]
fn dimension(&self) -> usize {
self.dimension
}
fn __repr__(&self) -> String {
format!("RKF45Integrator(dimension={})", self.dimension)
}
}
#[pyclass(module = "brahe._brahe", unsendable)]
#[pyo3(name = "DP54Integrator")]
pub struct PyDP54DIntegrator {
inner: DormandPrince54DIntegrator,
dimension: usize,
}
#[pymethods]
impl PyDP54DIntegrator {
#[allow(deprecated)]
#[new]
#[pyo3(signature = (dimension, dynamics_fn, jacobian=None, sensitivity=None, control_fn=None, config=None))]
pub fn new(
py: Python<'_>,
dimension: usize,
dynamics_fn: Py<PyAny>,
jacobian: Option<Py<PyAny>>,
sensitivity: Option<Py<PyAny>>,
control_fn: Option<Py<PyAny>>,
config: Option<PyIntegratorConfig>,
) -> PyResult<Self> {
let dynamics_closure = {
let dynamics_fn_arc = Arc::new(dynamics_fn.clone_ref(py));
move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = dynamics_fn_arc
.call1(py, (t, state_py))
.expect("Failed to call dynamics function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Dynamics function returned invalid array");
na::DVector::from_vec(result_vec)
})
}
};
let control: crate::integrators::traits::DControlInput = if let Some(ctrl_fn) = control_fn {
let ctrl_fn_arc = Arc::new(ctrl_fn.clone_ref(py));
Some(Box::new(move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = ctrl_fn_arc
.call1(py, (t, state_py))
.expect("Failed to call control function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Control function returned invalid array");
na::DVector::from_vec(result_vec)
})
}))
} else {
None
};
let varmat: Option<Box<dyn DJacobianProvider>> = if let Some(jac_obj) = jacobian {
Python::attach(|py| {
let jac = jac_obj.bind(py);
if let Ok(num_jac) = jac.extract::<PyRef<PyDNumericalJacobian>>() {
let jac_arc = Arc::new(num_jac.dynamics_fn.clone_ref(py));
let jac_method = num_jac.method;
let jac_pert = num_jac.perturbation;
let jac_closure = move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = jac_arc
.call1(py, (t, state_py))
.expect("Failed to call Jacobian dynamics");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Jacobian dynamics returned invalid array");
na::DVector::from_vec(result_vec)
})
};
let mut provider = match jac_method {
crate::math::jacobian::DifferenceMethod::Forward => {
crate::math::jacobian::DNumericalJacobian::forward(Box::new(jac_closure))
}
crate::math::jacobian::DifferenceMethod::Central => {
crate::math::jacobian::DNumericalJacobian::central(Box::new(jac_closure))
}
crate::math::jacobian::DifferenceMethod::Backward => {
crate::math::jacobian::DNumericalJacobian::backward(Box::new(jac_closure))
}
};
provider = match jac_pert {
crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_adaptive(scale_factor, min_value),
crate::math::jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_fixed_offset(offset)
}
crate::math::jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_percentage(pct)
}
};
Ok(Some(Box::new(provider) as Box<dyn DJacobianProvider>))
} else if let Ok(ana_jac) = jac.extract::<PyRef<PyDAnalyticJacobian>>() {
let jac_arc = Arc::new(ana_jac.jacobian_fn.clone_ref(py));
let jac_closure = move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DMatrix<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = jac_arc
.call1(py, (t, state_py))
.expect("Failed to call Jacobian function");
let jac_matrix_vec =
pyany_to_f64_array2(result.bind(py), Some((dimension, dimension)))
.expect("Jacobian function returned invalid array");
let mut jac_matrix = na::DMatrix::<f64>::zeros(dimension, dimension);
for i in 0..dimension {
for j in 0..dimension {
jac_matrix[(i, j)] = jac_matrix_vec[i][j];
}
}
jac_matrix
})
};
let provider = crate::math::jacobian::DAnalyticJacobian::new(Box::new(jac_closure));
Ok(Some(Box::new(provider) as Box<dyn DJacobianProvider>))
} else {
Err(exceptions::PyTypeError::new_err(
"jacobian must be DNumericalJacobian or DAnalyticJacobian",
))
}
})?
} else {
None
};
let sensmat: Option<Box<dyn DSensitivityProvider>> = if let Some(sens_obj) = sensitivity {
Python::attach(|py| {
let sens = sens_obj.bind(py);
if let Ok(num_sens) = sens.extract::<PyRef<PyDNumericalSensitivity>>() {
let sens_arc = Arc::new(num_sens.dynamics_fn.clone_ref(py));
let sens_method = num_sens.method;
let sens_pert = num_sens.perturbation;
let sens_closure = move |t: f64, state: &na::DVector<f64>, params: &na::DVector<f64>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = sens_arc
.call1(py, (t, state_py, params_py))
.expect("Failed to call sensitivity dynamics");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(state.len()))
.expect("Sensitivity dynamics returned invalid array");
na::DVector::from_vec(result_vec)
})
};
let mut provider = match sens_method {
crate::math::jacobian::DifferenceMethod::Forward => {
crate::math::sensitivity::DNumericalSensitivity::forward(Box::new(sens_closure))
}
crate::math::jacobian::DifferenceMethod::Central => {
crate::math::sensitivity::DNumericalSensitivity::central(Box::new(sens_closure))
}
crate::math::jacobian::DifferenceMethod::Backward => {
crate::math::sensitivity::DNumericalSensitivity::central(Box::new(sens_closure))
}
};
provider = match sens_pert {
crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
}),
crate::math::jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Fixed(offset))
}
crate::math::jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Percentage(pct))
}
};
Ok(Some(Box::new(provider) as Box<dyn DSensitivityProvider>))
} else if let Ok(ana_sens) = sens.extract::<PyRef<PyDAnalyticSensitivity>>() {
let sens_arc = Arc::new(ana_sens.sensitivity_fn.clone_ref(py));
let sens_closure = move |t: f64, state: &na::DVector<f64>, params: &na::DVector<f64>| -> na::DMatrix<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = sens_arc
.call1(py, (t, state_py, params_py))
.expect("Failed to call sensitivity function");
let state_dim = state.len();
let param_dim = params.len();
let sens_matrix_vec =
pyany_to_f64_array2(result.bind(py), Some((state_dim, param_dim)))
.expect("Sensitivity function returned invalid array");
let mut sens_matrix = na::DMatrix::<f64>::zeros(state_dim, param_dim);
for i in 0..state_dim {
for j in 0..param_dim {
sens_matrix[(i, j)] = sens_matrix_vec[i][j];
}
}
sens_matrix
})
};
let provider = crate::math::sensitivity::DAnalyticSensitivity::new(Box::new(sens_closure));
Ok(Some(Box::new(provider) as Box<dyn DSensitivityProvider>))
} else {
Err(exceptions::PyTypeError::new_err(
"sensitivity must be NumericalSensitivity or AnalyticSensitivity",
))
}
})?
} else {
None
};
let inner = if let Some(cfg) = config {
DormandPrince54DIntegrator::with_config(dimension, Box::new(dynamics_closure), varmat, sensmat, control, cfg.inner)
} else {
DormandPrince54DIntegrator::new(dimension, Box::new(dynamics_closure), varmat, sensmat, control)
};
Ok(PyDP54DIntegrator { inner, dimension })
}
pub fn step<'py>(
&self,
_py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<PyAdaptiveStepDResult> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let result = self.inner.step(t, state_dvec, None, Some(dt));
Ok(PyAdaptiveStepDResult { inner: result })
}
#[allow(clippy::type_complexity)]
pub fn step_with_varmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
phi: PyReadonlyArray2<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let phi_array = phi.as_array();
let mut phi_dmat = na::DMatrix::<f64>::zeros(self.dimension, self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_dmat[(i, j)] = phi_array[[i, j]];
}
}
let result = self.inner.step_with_varmat(t, state_dvec, None, phi_dmat, Some(dt));
let state_np = result.state.as_slice().to_pyarray(py);
let new_phi = result.phi.expect("Variational matrix should be present");
let mut phi_flat = Vec::with_capacity(self.dimension * self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_flat.push(new_phi[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let phi_np = phi_flat
.into_pyarray(py)
.reshape([self.dimension, self.dimension])?;
Ok((state_np, phi_np, dt_used, error_est, dt_next))
}
#[allow(clippy::type_complexity)]
pub fn step_with_sensmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
sens: PyReadonlyArray2<f64>,
params: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let sens_array = sens.as_array();
let sens_shape = sens_array.shape();
let num_params = sens_shape[1];
let mut sens_dmat = na::DMatrix::<f64>::zeros(self.dimension, num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_dmat[(i, j)] = sens_array[[i, j]];
}
}
let params_vec = params.as_slice()?;
let params_dvec = na::DVector::from_vec(params_vec.to_vec());
let result = self.inner.step_with_sensmat(
t, state_dvec, sens_dmat, ¶ms_dvec, Some(dt)
);
let state_np = result.state.as_slice().to_pyarray(py);
let new_sens = result.sens.expect("Sensitivity matrix should be present");
let mut sens_flat = Vec::with_capacity(self.dimension * num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_flat.push(new_sens[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let sens_np = sens_flat
.into_pyarray(py)
.reshape([self.dimension, num_params])?;
Ok((state_np, sens_np, dt_used, error_est, dt_next))
}
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
pub fn step_with_varmat_sensmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
phi: PyReadonlyArray2<f64>,
sens: PyReadonlyArray2<f64>,
params: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let phi_array = phi.as_array();
let mut phi_dmat = na::DMatrix::<f64>::zeros(self.dimension, self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_dmat[(i, j)] = phi_array[[i, j]];
}
}
let sens_array = sens.as_array();
let sens_shape = sens_array.shape();
let num_params = sens_shape[1];
let mut sens_dmat = na::DMatrix::<f64>::zeros(self.dimension, num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_dmat[(i, j)] = sens_array[[i, j]];
}
}
let params_vec = params.as_slice()?;
let params_dvec = na::DVector::from_vec(params_vec.to_vec());
let result = self.inner.step_with_varmat_sensmat(
t, state_dvec, phi_dmat, sens_dmat, ¶ms_dvec, Some(dt)
);
let state_np = result.state.as_slice().to_pyarray(py);
let new_phi = result.phi.expect("Variational matrix should be present");
let mut phi_flat = Vec::with_capacity(self.dimension * self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_flat.push(new_phi[(i, j)]);
}
}
let phi_np = phi_flat
.into_pyarray(py)
.reshape([self.dimension, self.dimension])?;
let new_sens = result.sens.expect("Sensitivity matrix should be present");
let mut sens_flat = Vec::with_capacity(self.dimension * num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_flat.push(new_sens[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let sens_np = sens_flat
.into_pyarray(py)
.reshape([self.dimension, num_params])?;
Ok((state_np, phi_np, sens_np, dt_used, error_est, dt_next))
}
#[getter]
fn dimension(&self) -> usize {
self.dimension
}
fn __repr__(&self) -> String {
format!("DP54Integrator(dimension={})", self.dimension)
}
}
#[pyclass(module = "brahe._brahe", unsendable)]
#[pyo3(name = "RKN1210Integrator")]
pub struct PyRKN1210DIntegrator {
inner: RKN1210DIntegrator,
dimension: usize,
}
#[pymethods]
impl PyRKN1210DIntegrator {
#[allow(deprecated)]
#[new]
#[pyo3(signature = (dimension, dynamics_fn, jacobian=None, sensitivity=None, config=None))]
pub fn new(
py: Python<'_>,
dimension: usize,
dynamics_fn: Py<PyAny>,
jacobian: Option<Py<PyAny>>,
sensitivity: Option<Py<PyAny>>,
config: Option<PyIntegratorConfig>,
) -> PyResult<Self> {
if !dimension.is_multiple_of(2) {
return Err(exceptions::PyValueError::new_err(
"RKN1210 requires even dimension (position + velocity)",
));
}
let dynamics_closure = {
let dynamics_fn_arc = Arc::new(dynamics_fn.clone_ref(py));
move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = dynamics_fn_arc
.call1(py, (t, state_py))
.expect("Failed to call dynamics function");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Dynamics function returned invalid array");
na::DVector::from_vec(result_vec)
})
}
};
let varmat: Option<Box<dyn DJacobianProvider>> = if let Some(jac_obj) = jacobian {
Python::attach(|py| {
let jac = jac_obj.bind(py);
if let Ok(num_jac) = jac.extract::<PyRef<PyDNumericalJacobian>>() {
let jac_arc = Arc::new(num_jac.dynamics_fn.clone_ref(py));
let jac_method = num_jac.method;
let jac_pert = num_jac.perturbation;
let jac_closure = move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = jac_arc
.call1(py, (t, state_py))
.expect("Failed to call Jacobian dynamics");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(dimension))
.expect("Jacobian dynamics returned invalid array");
na::DVector::from_vec(result_vec)
})
};
let mut provider = match jac_method {
crate::math::jacobian::DifferenceMethod::Forward => {
crate::math::jacobian::DNumericalJacobian::forward(Box::new(jac_closure))
}
crate::math::jacobian::DifferenceMethod::Central => {
crate::math::jacobian::DNumericalJacobian::central(Box::new(jac_closure))
}
crate::math::jacobian::DifferenceMethod::Backward => {
crate::math::jacobian::DNumericalJacobian::backward(Box::new(jac_closure))
}
};
provider = match jac_pert {
crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_adaptive(scale_factor, min_value),
crate::math::jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_fixed_offset(offset)
}
crate::math::jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_percentage(pct)
}
};
Ok(Some(Box::new(provider) as Box<dyn DJacobianProvider>))
} else if let Ok(ana_jac) = jac.extract::<PyRef<PyDAnalyticJacobian>>() {
let jac_arc = Arc::new(ana_jac.jacobian_fn.clone_ref(py));
let jac_closure = move |t: f64, state: &na::DVector<f64>, _params: Option<&na::DVector<f64>>| -> na::DMatrix<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let result = jac_arc
.call1(py, (t, state_py))
.expect("Failed to call Jacobian function");
let jac_matrix_vec =
pyany_to_f64_array2(result.bind(py), Some((dimension, dimension)))
.expect("Jacobian function returned invalid array");
let mut jac_matrix = na::DMatrix::<f64>::zeros(dimension, dimension);
for i in 0..dimension {
for j in 0..dimension {
jac_matrix[(i, j)] = jac_matrix_vec[i][j];
}
}
jac_matrix
})
};
let provider = crate::math::jacobian::DAnalyticJacobian::new(Box::new(jac_closure));
Ok(Some(Box::new(provider) as Box<dyn DJacobianProvider>))
} else {
Err(exceptions::PyTypeError::new_err(
"jacobian must be DNumericalJacobian or DAnalyticJacobian",
))
}
})?
} else {
None
};
let sensmat: Option<Box<dyn DSensitivityProvider>> = if let Some(sens_obj) = sensitivity {
Python::attach(|py| {
let sens = sens_obj.bind(py);
if let Ok(num_sens) = sens.extract::<PyRef<PyDNumericalSensitivity>>() {
let sens_arc = Arc::new(num_sens.dynamics_fn.clone_ref(py));
let sens_method = num_sens.method;
let sens_pert = num_sens.perturbation;
let sens_closure = move |t: f64, state: &na::DVector<f64>, params: &na::DVector<f64>| -> na::DVector<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = sens_arc
.call1(py, (t, state_py, params_py))
.expect("Failed to call sensitivity dynamics");
let result_vec = pyany_to_f64_array1(result.bind(py), Some(state.len()))
.expect("Sensitivity dynamics returned invalid array");
na::DVector::from_vec(result_vec)
})
};
let mut provider = match sens_method {
crate::math::jacobian::DifferenceMethod::Forward => {
crate::math::sensitivity::DNumericalSensitivity::forward(Box::new(sens_closure))
}
crate::math::jacobian::DifferenceMethod::Central => {
crate::math::sensitivity::DNumericalSensitivity::central(Box::new(sens_closure))
}
crate::math::jacobian::DifferenceMethod::Backward => {
crate::math::sensitivity::DNumericalSensitivity::central(Box::new(sens_closure))
}
};
provider = match sens_pert {
crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
} => provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Adaptive {
scale_factor,
min_value,
}),
crate::math::jacobian::PerturbationStrategy::Fixed(offset) => {
provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Fixed(offset))
}
crate::math::jacobian::PerturbationStrategy::Percentage(pct) => {
provider.with_strategy(crate::math::jacobian::PerturbationStrategy::Percentage(pct))
}
};
Ok(Some(Box::new(provider) as Box<dyn DSensitivityProvider>))
} else if let Ok(ana_sens) = sens.extract::<PyRef<PyDAnalyticSensitivity>>() {
let sens_arc = Arc::new(ana_sens.sensitivity_fn.clone_ref(py));
let sens_closure = move |t: f64, state: &na::DVector<f64>, params: &na::DVector<f64>| -> na::DMatrix<f64> {
Python::attach(|py| {
let state_py = state.as_slice().to_pyarray(py);
let params_py = params.as_slice().to_pyarray(py);
let result = sens_arc
.call1(py, (t, state_py, params_py))
.expect("Failed to call sensitivity function");
let state_dim = state.len();
let param_dim = params.len();
let sens_matrix_vec =
pyany_to_f64_array2(result.bind(py), Some((state_dim, param_dim)))
.expect("Sensitivity function returned invalid array");
let mut sens_matrix = na::DMatrix::<f64>::zeros(state_dim, param_dim);
for i in 0..state_dim {
for j in 0..param_dim {
sens_matrix[(i, j)] = sens_matrix_vec[i][j];
}
}
sens_matrix
})
};
let provider = crate::math::sensitivity::DAnalyticSensitivity::new(Box::new(sens_closure));
Ok(Some(Box::new(provider) as Box<dyn DSensitivityProvider>))
} else {
Err(exceptions::PyTypeError::new_err(
"sensitivity must be NumericalSensitivity or AnalyticSensitivity",
))
}
})?
} else {
None
};
let inner = if let Some(cfg) = config {
RKN1210DIntegrator::with_config(dimension, Box::new(dynamics_closure), varmat, sensmat, None, cfg.inner)
} else {
RKN1210DIntegrator::new(dimension, Box::new(dynamics_closure), varmat, sensmat, None)
};
Ok(PyRKN1210DIntegrator { inner, dimension })
}
pub fn step<'py>(
&self,
_py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<PyAdaptiveStepDResult> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let result = self.inner.step(t, state_dvec, None, Some(dt));
Ok(PyAdaptiveStepDResult { inner: result })
}
#[allow(clippy::type_complexity)]
pub fn step_with_varmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
phi: PyReadonlyArray2<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let phi_array = phi.as_array();
let mut phi_dmat = na::DMatrix::<f64>::zeros(self.dimension, self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_dmat[(i, j)] = phi_array[[i, j]];
}
}
let result = self.inner.step_with_varmat(t, state_dvec, None, phi_dmat, Some(dt));
let state_np = result.state.as_slice().to_pyarray(py);
let new_phi = result.phi.expect("Variational matrix should be present");
let mut phi_flat = Vec::with_capacity(self.dimension * self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_flat.push(new_phi[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let phi_np = phi_flat
.into_pyarray(py)
.reshape([self.dimension, self.dimension])?;
Ok((state_np, phi_np, dt_used, error_est, dt_next))
}
#[allow(clippy::type_complexity)]
pub fn step_with_sensmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
sens: PyReadonlyArray2<f64>,
params: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let sens_array = sens.as_array();
let sens_shape = sens_array.shape();
let num_params = sens_shape[1];
let mut sens_dmat = na::DMatrix::<f64>::zeros(self.dimension, num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_dmat[(i, j)] = sens_array[[i, j]];
}
}
let params_vec = params.as_slice()?;
let params_dvec = na::DVector::from_vec(params_vec.to_vec());
let result = self.inner.step_with_sensmat(
t, state_dvec, sens_dmat, ¶ms_dvec, Some(dt)
);
let state_np = result.state.as_slice().to_pyarray(py);
let new_sens = result.sens.expect("Sensitivity matrix should be present");
let mut sens_flat = Vec::with_capacity(self.dimension * num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_flat.push(new_sens[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let sens_np = sens_flat
.into_pyarray(py)
.reshape([self.dimension, num_params])?;
Ok((state_np, sens_np, dt_used, error_est, dt_next))
}
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
pub fn step_with_varmat_sensmat<'py>(
&self,
py: Python<'py>,
t: f64,
state: PyReadonlyArray1<f64>,
phi: PyReadonlyArray2<f64>,
sens: PyReadonlyArray2<f64>,
params: PyReadonlyArray1<f64>,
dt: f64,
) -> PyResult<(
Bound<'py, PyArray<f64, Ix1>>,
Bound<'py, PyArray<f64, Ix2>>,
Bound<'py, PyArray<f64, Ix2>>,
f64,
f64,
f64,
)> {
let state_vec = state.as_slice()?;
let state_dvec = na::DVector::from_vec(state_vec.to_vec());
let phi_array = phi.as_array();
let mut phi_dmat = na::DMatrix::<f64>::zeros(self.dimension, self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_dmat[(i, j)] = phi_array[[i, j]];
}
}
let sens_array = sens.as_array();
let sens_shape = sens_array.shape();
let num_params = sens_shape[1];
let mut sens_dmat = na::DMatrix::<f64>::zeros(self.dimension, num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_dmat[(i, j)] = sens_array[[i, j]];
}
}
let params_vec = params.as_slice()?;
let params_dvec = na::DVector::from_vec(params_vec.to_vec());
let result = self.inner.step_with_varmat_sensmat(
t, state_dvec, phi_dmat, sens_dmat, ¶ms_dvec, Some(dt)
);
let state_np = result.state.as_slice().to_pyarray(py);
let new_phi = result.phi.expect("Variational matrix should be present");
let mut phi_flat = Vec::with_capacity(self.dimension * self.dimension);
for i in 0..self.dimension {
for j in 0..self.dimension {
phi_flat.push(new_phi[(i, j)]);
}
}
let phi_np = phi_flat
.into_pyarray(py)
.reshape([self.dimension, self.dimension])?;
let new_sens = result.sens.expect("Sensitivity matrix should be present");
let mut sens_flat = Vec::with_capacity(self.dimension * num_params);
for i in 0..self.dimension {
for j in 0..num_params {
sens_flat.push(new_sens[(i, j)]);
}
}
let dt_used = result.dt_used;
let error_est = result.error_estimate.expect("Error estimate should be present for adaptive integrator");
let dt_next = result.dt_next;
let sens_np = sens_flat
.into_pyarray(py)
.reshape([self.dimension, num_params])?;
Ok((state_np, phi_np, sens_np, dt_used, error_est, dt_next))
}
#[getter]
fn dimension(&self) -> usize {
self.dimension
}
fn __repr__(&self) -> String {
format!("RKN1210Integrator(dimension={})", self.dimension)
}
}