/// Python-facing wrapper, exposed as `SGPPropagator` in `brahe._brahe`, around
/// the native `propagators::SGPPropagator`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "SGPPropagator")]
pub struct PySGPPropagator {
/// Wrapped native propagator that the Python methods delegate to.
pub propagator: propagators::SGPPropagator,
}
#[pymethods]
impl PySGPPropagator {
/// Create a propagator from the two lines of a TLE.
///
/// Args:
///     line1 (str): First TLE line.
///     line2 (str): Second TLE line.
///     step_size (float): Propagation step size; 60.0 when omitted or `None`.
///
/// Raises:
///     RuntimeError: If the native constructor rejects the input; the message
///         is the native error rendered as text.
#[classmethod]
#[pyo3(signature = (line1, line2, step_size=60.0))]
pub fn from_tle(_cls: &Bound<'_, PyType>, line1: String, line2: String, step_size: Option<f64>) -> PyResult<Self> {
    propagators::SGPPropagator::from_tle(&line1, &line2, step_size.unwrap_or(60.0))
        .map(|propagator| PySGPPropagator { propagator })
        .map_err(|err| exceptions::PyRuntimeError::new_err(err.to_string()))
}
/// Create a propagator from a three-line element set (name plus two TLE lines).
///
/// Args:
///     name (str): Satellite name line.
///     line1 (str): First TLE line.
///     line2 (str): Second TLE line.
///     step_size (float): Propagation step size; 60.0 when omitted or `None`.
///
/// Raises:
///     RuntimeError: If the native constructor rejects the input.
#[classmethod]
#[pyo3(signature = (name, line1, line2, step_size=60.0))]
pub fn from_3le(_cls: &Bound<'_, PyType>, name: String, line1: String, line2: String, step_size: Option<f64>) -> PyResult<Self> {
    let dt = step_size.unwrap_or(60.0);
    propagators::SGPPropagator::from_3le(Some(&name), &line1, &line2, dt)
        .map(|propagator| PySGPPropagator { propagator })
        .map_err(|err| exceptions::PyRuntimeError::new_err(err.to_string()))
}
/// Create a propagator directly from OMM-style mean elements.
///
/// The first eight arguments are required; `step_size` falls back to 60.0 and
/// every remaining argument is optional and forwarded unchanged to the native
/// constructor.
///
/// Raises:
///     RuntimeError: If the native constructor reports an error; the message
///         is the native error rendered as text.
#[classmethod]
#[pyo3(signature = (
    epoch,
    mean_motion,
    eccentricity,
    inclination,
    raan,
    arg_of_pericenter,
    mean_anomaly,
    norad_id,
    step_size=60.0,
    object_name=None,
    object_id=None,
    classification=None,
    bstar=None,
    mean_motion_dot=None,
    mean_motion_ddot=None,
    ephemeris_type=None,
    element_set_no=None,
    rev_at_epoch=None
))]
#[allow(clippy::too_many_arguments)]
pub fn from_omm_elements(
    _cls: &Bound<'_, PyType>,
    epoch: &str,
    mean_motion: f64,
    eccentricity: f64,
    inclination: f64,
    raan: f64,
    arg_of_pericenter: f64,
    mean_anomaly: f64,
    norad_id: u64,
    step_size: Option<f64>,
    object_name: Option<String>,
    object_id: Option<String>,
    classification: Option<char>,
    bstar: Option<f64>,
    mean_motion_dot: Option<f64>,
    mean_motion_ddot: Option<f64>,
    ephemeris_type: Option<u8>,
    element_set_no: Option<u64>,
    rev_at_epoch: Option<u64>,
) -> PyResult<Self> {
    let built = propagators::SGPPropagator::from_omm_elements(
        epoch,
        mean_motion,
        eccentricity,
        inclination,
        raan,
        arg_of_pericenter,
        mean_anomaly,
        norad_id,
        step_size.unwrap_or(60.0),
        object_name.as_deref(),
        object_id.as_deref(),
        classification,
        bstar,
        mean_motion_dot,
        mean_motion_ddot,
        ephemeris_type,
        element_set_no,
        rev_at_epoch,
    );
    built
        .map(|propagator| PySGPPropagator { propagator })
        .map_err(|err| exceptions::PyRuntimeError::new_err(err.to_string()))
}
/// NORAD catalog identifier stored on the wrapped propagator.
#[getter]
pub fn norad_id(&self) -> u32 {
self.propagator.norad_id
}
/// Satellite name stored on the wrapped propagator, or `None` when it has none.
#[getter]
pub fn satellite_name(&self) -> Option<String> {
self.propagator.satellite_name.clone()
}
/// Copy of the `line1` string held by the wrapped propagator.
#[getter]
pub fn line1(&self) -> String {
self.propagator.line1.clone()
}
/// Copy of the `line2` string held by the wrapped propagator.
#[getter]
pub fn line2(&self) -> String {
self.propagator.line2.clone()
}
/// The propagator's `epoch` field wrapped as a Python `Epoch`.
#[getter]
pub fn epoch(&self) -> PyEpoch {
PyEpoch { obj: self.propagator.epoch }
}
/// Return the epoch reported by the native `current_epoch()` as a Python `Epoch`.
#[pyo3(text_signature = "()")]
pub fn current_epoch(&self) -> PyEpoch {
PyEpoch { obj: self.propagator.current_epoch() }
}
/// Current step size of the wrapped propagator.
#[getter]
pub fn step_size(&self) -> f64 {
self.propagator.step_size()
}
/// Property setter for `step_size`; forwards the value to the native propagator.
#[setter]
pub fn set_step_size(&mut self, step_size: f64) {
self.propagator.set_step_size(step_size);
}
/// Method form of the step-size setter, exposed to Python as `set_step_size`.
///
/// Args:
///     new_step_size (float): Value forwarded to the native `set_step_size`.
#[pyo3(name = "set_step_size", text_signature = "(new_step_size)")]
pub fn set_step_size_explicit(&mut self, new_step_size: f64) {
self.propagator.set_step_size(new_step_size);
}
/// Change the frame, representation and angle format used for output states.
///
/// Args:
///     frame (OrbitFrame): Output reference frame.
///     representation (OrbitRepresentation): Output state representation.
///     angle_format (AngleFormat or None): Angle format, or `None` to pass no
///         angle format to the native propagator.
///
/// Event detectors that were registered on this propagator are kept.
#[pyo3(text_signature = "(frame, representation, angle_format)")]
pub fn set_output_format(
    &mut self,
    frame: PyRef<PyOrbitFrame>,
    representation: PyRef<PyOrbitRepresentation>,
    angle_format: Option<PyRef<PyAngleFormat>>,
) {
    let angle_fmt = angle_format.map(|af| af.value);
    // `with_output_format` consumes its receiver, so it has to run on a clone.
    let mut reconfigured = self.propagator.clone().with_output_format(
        frame.frame,
        representation.representation,
        angle_fmt,
    );
    // NOTE(review): cloning does not appear to carry the boxed event detectors
    // (`par_propagate_to` in this file moves them across clones by hand), so
    // move them over explicitly instead of silently dropping them.
    reconfigured.set_event_detectors(self.propagator.take_event_detectors());
    self.propagator = reconfigured;
}
/// Return the propagator's current state as a 1-D NumPy `float64` array.
#[pyo3(text_signature = "()")]
pub fn current_state<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    self.propagator
        .current_state()
        .as_slice()
        .to_pyarray(py)
        .to_owned()
}
/// Return the propagator's initial state as a 1-D NumPy `float64` array.
#[pyo3(text_signature = "()")]
pub fn initial_state<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    self.propagator
        .initial_state()
        .as_slice()
        .to_pyarray(py)
        .to_owned()
}
/// Compute the state at `epoch` through `SStateProvider::state`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(SStateProvider::state(&self.propagator, epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through the native `state_pef`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_pef<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(self
        .propagator
        .state_pef(epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through `SOrbitStateProvider::state_eci`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_eci<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(SOrbitStateProvider::state_eci(&self.propagator, epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through `SOrbitStateProvider::state_ecef`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_ecef<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(SOrbitStateProvider::state_ecef(&self.propagator, epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through `SOrbitStateProvider::state_gcrf`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_gcrf<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(SOrbitStateProvider::state_gcrf(&self.propagator, epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through `SOrbitStateProvider::state_eme2000`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_eme2000<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(SOrbitStateProvider::state_eme2000(&self.propagator, epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through `SOrbitStateProvider::state_itrf`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_itrf<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(SOrbitStateProvider::state_itrf(&self.propagator, epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute states at several epochs through `SStateProvider::states`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order;
/// a native error is propagated to Python.
#[pyo3(text_signature = "(epochs)")]
pub fn states<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = SStateProvider::states(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute states at several epochs through `SOrbitStateProvider::states_eci`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs)")]
pub fn states_eci<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = SOrbitStateProvider::states_eci(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute states at several epochs through `SOrbitStateProvider::states_gcrf`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs)")]
pub fn states_gcrf<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = SOrbitStateProvider::states_gcrf(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute states at several epochs through `SOrbitStateProvider::states_itrf`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs)")]
pub fn states_itrf<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = SOrbitStateProvider::states_itrf(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Forward to the native propagator's `step()`.
#[pyo3(text_signature = "()")]
pub fn step(&mut self) {
self.propagator.step();
}
/// Forward to the native propagator's `step_by(step_size)`.
///
/// Args:
///     step_size (float): Step passed through unchanged.
#[pyo3(text_signature = "(step_size)")]
pub fn step_by(&mut self, step_size: f64) {
self.propagator.step_by(step_size);
}
/// Forward to the native propagator's `step_past` with the given epoch.
///
/// Args:
///     target_epoch (Epoch): Epoch passed through unchanged.
#[pyo3(text_signature = "(target_epoch)")]
pub fn step_past(&mut self, target_epoch: PyRef<PyEpoch>) {
self.propagator.step_past(target_epoch.obj);
}
/// Forward to the native propagator's `propagate_steps(num_steps)`.
///
/// Args:
///     num_steps (int): Step count passed through unchanged.
#[pyo3(text_signature = "(num_steps)")]
pub fn propagate_steps(&mut self, num_steps: usize) {
self.propagator.propagate_steps(num_steps);
}
/// Forward to the native propagator's `propagate_to` with the given epoch.
///
/// Args:
///     target_epoch (Epoch): Epoch passed through unchanged.
#[pyo3(text_signature = "(target_epoch)")]
pub fn propagate_to(&mut self, target_epoch: PyRef<PyEpoch>) {
self.propagator.propagate_to(target_epoch.obj);
}
/// Forward to the native propagator's `reset()`.
#[pyo3(text_signature = "()")]
pub fn reset(&mut self) {
self.propagator.reset();
}
/// Set the trajectory mode of the wrapped propagator.
///
/// Args:
///     mode (TrajectoryMode): Mode whose inner value is forwarded.
#[pyo3(text_signature = "(mode)")]
pub fn set_trajectory_mode(&mut self, mode: &PyTrajectoryMode) {
self.propagator.set_trajectory_mode(mode.mode);
}
/// Trajectory mode of the wrapped propagator, wrapped for Python.
#[getter]
pub fn trajectory_mode(&self) -> PyTrajectoryMode {
PyTrajectoryMode { mode: self.propagator.trajectory_mode() }
}
/// Forward `max_size` to the native `set_eviction_policy_max_size`.
///
/// Raises:
///     RuntimeError: If the native call fails; the message is the native
///         error rendered as text.
#[pyo3(text_signature = "(max_size)")]
pub fn set_eviction_policy_max_size(&mut self, max_size: usize) -> PyResult<()> {
    if let Err(err) = self.propagator.set_eviction_policy_max_size(max_size) {
        return Err(exceptions::PyRuntimeError::new_err(err.to_string()));
    }
    Ok(())
}
/// Forward `max_age` to the native `set_eviction_policy_max_age`.
///
/// Raises:
///     RuntimeError: If the native call fails; the message is the native
///         error rendered as text.
#[pyo3(text_signature = "(max_age)")]
pub fn set_eviction_policy_max_age(&mut self, max_age: f64) -> PyResult<()> {
    if let Err(err) = self.propagator.set_eviction_policy_max_age(max_age) {
        return Err(exceptions::PyRuntimeError::new_err(err.to_string()));
    }
    Ok(())
}
/// Return a copy of the propagator's trajectory wrapped for Python.
///
/// The trajectory is cloned, so changes made to the returned object do not
/// reach the propagator.
#[getter]
pub fn trajectory(&self) -> PyOrbitalTrajectory {
PyOrbitalTrajectory {
trajectory: self.propagator.trajectory.clone(),
}
}
/// Return the elements reported by the native `get_elements` as a 1-D NumPy array.
///
/// Args:
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Raises:
///     RuntimeError: If the native call fails.
#[pyo3(text_signature = "(angle_format)")]
pub fn get_elements<'a>(&self, py: Python<'a>, angle_format: &PyAngleFormat) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    self.propagator
        .get_elements(angle_format.value)
        .map(|elements| elements.as_slice().to_pyarray(py).to_owned())
        .map_err(|err| exceptions::PyRuntimeError::new_err(err.to_string()))
}
/// Semi-major axis as reported by the wrapped propagator.
#[getter]
pub fn semi_major_axis(&self) -> f64 {
self.propagator.semi_major_axis()
}
/// Eccentricity as reported by the wrapped propagator.
#[getter]
pub fn eccentricity(&self) -> f64 {
self.propagator.eccentricity()
}
/// Inclination as reported by the wrapped propagator.
#[getter]
pub fn inclination(&self) -> f64 {
self.propagator.inclination()
}
/// Right ascension as reported by the wrapped propagator.
#[getter]
pub fn right_ascension(&self) -> f64 {
self.propagator.right_ascension()
}
/// Argument of perigee as reported by the wrapped propagator.
#[getter]
pub fn arg_perigee(&self) -> f64 {
self.propagator.arg_perigee()
}
/// Mean anomaly as reported by the wrapped propagator.
#[getter]
pub fn mean_anomaly(&self) -> f64 {
self.propagator.mean_anomaly()
}
/// Ephemeris age as reported by the wrapped propagator.
#[getter]
pub fn ephemeris_age(&self) -> f64 {
self.propagator.ephemeris_age()
}
/// Set the propagator's name and return the same Python object for chaining.
///
/// Args:
///     name (str): Name handed to the native `with_name`.
///
/// Event detectors that were registered on this propagator are kept.
pub fn with_name(mut slf: PyRefMut<'_, Self>, name: String) -> PyRefMut<'_, Self> {
    // `with_name` consumes its receiver, so it has to run on a clone.
    let mut renamed = slf.propagator.clone().with_name(&name);
    // NOTE(review): cloning does not appear to carry the boxed event detectors
    // (`par_propagate_to` in this file moves them across clones by hand), so
    // move them over explicitly instead of silently dropping them.
    renamed.set_event_detectors(slf.propagator.take_event_detectors());
    slf.propagator = renamed;
    slf
}
/// Set the propagator's UUID from its string form and return the same Python
/// object for chaining.
///
/// Args:
///     uuid_str (str): Text parsed with `uuid::Uuid::parse_str`.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID.
///
/// Event detectors that were registered on this propagator are kept.
pub fn with_uuid(mut slf: PyRefMut<'_, Self>, uuid_str: String) -> PyResult<PyRefMut<'_, Self>> {
    let uuid = uuid::Uuid::parse_str(&uuid_str)
        .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
    // `with_uuid` consumes its receiver, so it has to run on a clone.
    let mut tagged = slf.propagator.clone().with_uuid(uuid);
    // NOTE(review): cloning does not appear to carry the boxed event detectors
    // (`par_propagate_to` in this file moves them across clones by hand), so
    // move them over explicitly instead of silently dropping them.
    tagged.set_event_detectors(slf.propagator.take_event_detectors());
    slf.propagator = tagged;
    Ok(slf)
}
/// Apply the native `with_new_uuid` and return the same Python object for
/// chaining.
///
/// Event detectors that were registered on this propagator are kept.
pub fn with_new_uuid(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
    // `with_new_uuid` consumes its receiver, so it has to run on a clone.
    let mut tagged = slf.propagator.clone().with_new_uuid();
    // NOTE(review): cloning does not appear to carry the boxed event detectors
    // (`par_propagate_to` in this file moves them across clones by hand), so
    // move them over explicitly instead of silently dropping them.
    tagged.set_event_detectors(slf.propagator.take_event_detectors());
    slf.propagator = tagged;
    slf
}
/// Set the propagator's numeric id and return the same Python object for
/// chaining.
///
/// Args:
///     id (int): Identifier handed to the native `with_id`.
///
/// Event detectors that were registered on this propagator are kept.
pub fn with_id(mut slf: PyRefMut<'_, Self>, id: u64) -> PyRefMut<'_, Self> {
    // `with_id` consumes its receiver, so it has to run on a clone.
    let mut tagged = slf.propagator.clone().with_id(id);
    // NOTE(review): cloning does not appear to carry the boxed event detectors
    // (`par_propagate_to` in this file moves them across clones by hand), so
    // move them over explicitly instead of silently dropping them.
    tagged.set_event_detectors(slf.propagator.take_event_detectors());
    slf.propagator = tagged;
    slf
}
/// Set name, UUID and id in one call and return the same Python object for
/// chaining.
///
/// Args:
///     name (str or None): Name forwarded to the native `with_identity`.
///     uuid_str (str or None): UUID text; parsed when present.
///     id (int or None): Numeric identifier.
///
/// Raises:
///     ValueError: If `uuid_str` is given but is not a valid UUID.
///
/// Event detectors that were registered on this propagator are kept.
pub fn with_identity(mut slf: PyRefMut<'_, Self>, name: Option<String>, uuid_str: Option<String>, id: Option<u64>) -> PyResult<PyRefMut<'_, Self>> {
    let uuid = uuid_str
        .map(|s| uuid::Uuid::parse_str(&s))
        .transpose()
        .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
    // `with_identity` consumes its receiver, so it has to run on a clone.
    let mut tagged = slf.propagator.clone().with_identity(name.as_deref(), uuid, id);
    // NOTE(review): cloning does not appear to carry the boxed event detectors
    // (`par_propagate_to` in this file moves them across clones by hand), so
    // move them over explicitly instead of silently dropping them.
    tagged.set_event_detectors(slf.propagator.take_event_detectors());
    slf.propagator = tagged;
    Ok(slf)
}
/// Set name, UUID and id on the wrapped propagator in place.
///
/// Args:
///     name (str or None): Name forwarded to the native `set_identity`.
///     uuid_str (str or None): UUID text; parsed when present.
///     id (int or None): Numeric identifier.
///
/// Raises:
///     ValueError: If `uuid_str` is given but is not a valid UUID.
pub fn set_identity(&mut self, name: Option<String>, uuid_str: Option<String>, id: Option<u64>) -> PyResult<()> {
    let parsed = uuid_str
        .map(|text| uuid::Uuid::parse_str(&text))
        .transpose()
        .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
    self.propagator.set_identity(name.as_deref(), parsed, id);
    Ok(())
}
/// Forward `id` (which may be `None`) to the native `set_id`.
pub fn set_id(&mut self, id: Option<u64>) {
self.propagator.set_id(id)
}
/// Forward `name` (which may be `None`) to the native `set_name`.
pub fn set_name(&mut self, name: Option<String>) {
self.propagator.set_name(name.as_deref());
}
/// Forward to the native `generate_uuid()`.
pub fn generate_uuid(&mut self) {
self.propagator.generate_uuid()
}
/// Numeric id of the wrapped propagator, or `None` when unset.
pub fn get_id(&self) -> Option<u64> {
self.propagator.get_id()
}
/// Name of the wrapped propagator as an owned string, or `None` when unset.
pub fn get_name(&self) -> Option<String> {
self.propagator.get_name().map(|s| s.to_string())
}
/// UUID of the wrapped propagator rendered as a string, or `None` when unset.
pub fn get_uuid(&self) -> Option<String> {
self.propagator.get_uuid().map(|u| u.to_string())
}
/// Compute the state at `epoch` through `SOrbitStateProvider::state_koe_osc`.
///
/// Args:
///     epoch (Epoch): Epoch to evaluate.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch, angle_format)")]
pub fn state_koe_osc<'a>(
    &self,
    py: Python<'a>,
    epoch: PyRef<PyEpoch>,
    angle_format: &PyAngleFormat,
) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(
        SOrbitStateProvider::state_koe_osc(&self.propagator, epoch.obj, angle_format.value)?
            .as_slice()
            .to_pyarray(py)
            .to_owned(),
    )
}
/// Compute states at several epochs through `SOrbitStateProvider::states_koe_osc`.
///
/// Args:
///     epochs (list[Epoch]): Epochs to evaluate.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs, angle_format)")]
pub fn states_koe_osc<'a>(
    &self,
    py: Python<'a>,
    epochs: Vec<PyRef<PyEpoch>>,
    angle_format: &PyAngleFormat,
) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated =
        SOrbitStateProvider::states_koe_osc(&self.propagator, &instants, angle_format.value)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute the state at `epoch` through `SOrbitStateProvider::state_koe_mean`.
///
/// Args:
///     epoch (Epoch): Epoch to evaluate.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch, angle_format)")]
pub fn state_koe_mean<'a>(
    &self,
    py: Python<'a>,
    epoch: PyRef<PyEpoch>,
    angle_format: &PyAngleFormat,
) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(
        SOrbitStateProvider::state_koe_mean(&self.propagator, epoch.obj, angle_format.value)?
            .as_slice()
            .to_pyarray(py)
            .to_owned(),
    )
}
/// Compute states at several epochs through `SOrbitStateProvider::states_koe_mean`.
///
/// Args:
///     epochs (list[Epoch]): Epochs to evaluate.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs, angle_format)")]
pub fn states_koe_mean<'a>(
    &self,
    py: Python<'a>,
    epochs: Vec<PyRef<PyEpoch>>,
    angle_format: &PyAngleFormat,
) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated =
        SOrbitStateProvider::states_koe_mean(&self.propagator, &instants, angle_format.value)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
#[pyo3(text_signature = "(event)")]
pub fn add_event_detector(&mut self, event: &Bound<'_, PyAny>) -> PyResult<()> {
if let Ok(mut time_event) = event.extract::<PyRefMut<PyTimeEvent>>() {
if let Some(d_event) = time_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("TimeEvent already consumed"));
}
if let Ok(mut value_event) = event.extract::<PyRefMut<PyValueEvent>>() {
if let Some(d_event) = value_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("ValueEvent already consumed"));
}
if let Ok(mut binary_event) = event.extract::<PyRefMut<PyBinaryEvent>>() {
if let Some(d_event) = binary_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("BinaryEvent already consumed"));
}
if let Ok(mut alt_event) = event.extract::<PyRefMut<PyAltitudeEvent>>() {
if let Some(d_event) = alt_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("AltitudeEvent already consumed"));
}
if let Ok(mut asc_event) = event.extract::<PyRefMut<PyAscendingNodeEvent>>() {
if let Some(d_event) = asc_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("AscendingNodeEvent already consumed"));
}
if let Ok(mut desc_event) = event.extract::<PyRefMut<PyDescendingNodeEvent>>() {
if let Some(d_event) = desc_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("DescendingNodeEvent already consumed"));
}
if let Ok(mut ta_event) = event.extract::<PyRefMut<PyTrueAnomalyEvent>>() {
if let Some(d_event) = ta_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("TrueAnomalyEvent already consumed"));
}
if let Ok(mut ma_event) = event.extract::<PyRefMut<PyMeanAnomalyEvent>>() {
if let Some(d_event) = ma_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("MeanAnomalyEvent already consumed"));
}
if let Ok(mut ea_event) = event.extract::<PyRefMut<PyEccentricAnomalyEvent>>() {
if let Some(d_event) = ea_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("EccentricAnomalyEvent already consumed"));
}
if let Ok(mut aol_event) = event.extract::<PyRefMut<PyArgumentOfLatitudeEvent>>() {
if let Some(d_event) = aol_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("ArgumentOfLatitudeEvent already consumed"));
}
if let Ok(mut aoi_entry_event) = event.extract::<PyRefMut<PyAOIEntryEvent>>() {
if let Some(d_event) = aoi_entry_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("AOIEntryEvent already consumed"));
}
if let Ok(mut aoi_exit_event) = event.extract::<PyRefMut<PyAOIExitEvent>>() {
if let Some(d_event) = aoi_exit_event.take_d_event() {
self.propagator.add_event_detector(Box::new(d_event));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("AOIExitEvent already consumed"));
}
Err(exceptions::PyTypeError::new_err(
"Unsupported event type. SGPPropagator supports: TimeEvent, ValueEvent, BinaryEvent, \
AscendingNodeEvent, DescendingNodeEvent, AltitudeEvent, TrueAnomalyEvent, MeanAnomalyEvent, \
EccentricAnomalyEvent, ArgumentOfLatitudeEvent, AOIEntryEvent, AOIExitEvent."
))
}
/// Return a copy of every entry in the propagator's event log, each wrapped
/// as a Python `DetectedEvent`.
#[pyo3(text_signature = "()")]
pub fn event_log(&self) -> Vec<PyDetectedEvent> {
self.propagator
.event_log()
.iter()
.map(|e| PyDetectedEvent { event: e.clone() })
.collect()
}
/// Return copies of the events the native `events_by_name(name)` yields,
/// each wrapped as a Python `DetectedEvent`.
///
/// Args:
///     name (str): Name forwarded to the native lookup.
#[pyo3(text_signature = "(name)")]
pub fn events_by_name(&self, name: &str) -> Vec<PyDetectedEvent> {
self.propagator
.events_by_name(name)
.iter()
.map(|e| PyDetectedEvent { event: (*e).clone() })
.collect()
}
/// Return a copy of the event reported by the native `latest_event()`, or
/// `None` when it reports none.
#[pyo3(text_signature = "()")]
pub fn latest_event(&self) -> Option<PyDetectedEvent> {
self.propagator
.latest_event()
.map(|e| PyDetectedEvent { event: e.clone() })
}
/// Return copies of the events the native `events_in_range(start, end)`
/// yields, each wrapped as a Python `DetectedEvent`.
///
/// Args:
///     start (Epoch): First bound forwarded to the native lookup.
///     end (Epoch): Second bound forwarded to the native lookup.
#[pyo3(text_signature = "(start, end)")]
pub fn events_in_range(&self, start: PyRef<PyEpoch>, end: PyRef<PyEpoch>) -> Vec<PyDetectedEvent> {
self.propagator
.events_in_range(start.obj, end.obj)
.iter()
.map(|e| PyDetectedEvent { event: (*e).clone() })
.collect()
}
/// Whether the wrapped propagator reports itself as terminated.
#[getter]
pub fn terminated(&self) -> bool {
self.propagator.is_terminated()
}
/// Termination error of the wrapped propagator rendered as text, or `None`
/// when there is none.
#[getter]
pub fn termination_error(&self) -> Option<String> {
self.propagator.termination_error().map(|e| e.to_string())
}
/// Forward to the native `reset_termination()`.
#[pyo3(text_signature = "()")]
pub fn reset_termination(&mut self) {
self.propagator.reset_termination();
}
/// Forward to the native `clear_events()`.
#[pyo3(text_signature = "()")]
pub fn clear_events(&mut self) {
self.propagator.clear_events();
}
/// Debug-style text showing the NORAD id, satellite name and epoch fields.
fn __repr__(&self) -> String {
format!("SGPPropagator(norad_id={}, name={:?}, epoch={:?})",
self.propagator.norad_id,
self.propagator.satellite_name,
self.propagator.epoch)
}
/// Same text as `__repr__`.
fn __str__(&self) -> String {
self.__repr__()
}
}
/// Python-facing wrapper, exposed as `KeplerianPropagator` in `brahe._brahe`,
/// around the native `propagators::KeplerianPropagator`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "KeplerianPropagator")]
pub struct PyKeplerianPropagator {
/// Wrapped native propagator that the Python methods delegate to.
pub propagator: propagators::KeplerianPropagator,
}
#[pymethods]
impl PyKeplerianPropagator {
/// Create a Keplerian propagator from an epoch and a 6-element state.
///
/// Args:
///     epoch (Epoch): Initial epoch.
///     state (numpy.ndarray): 1-D `float64` array with exactly 6 elements.
///         Non-contiguous (strided) views are accepted.
///     frame (OrbitFrame): Frame of `state`.
///     representation (OrbitRepresentation): Representation of `state`.
///     angle_format (AngleFormat): Angle format forwarded to the native constructor.
///     step_size (float): Propagation step size.
///
/// Raises:
///     ValueError: If `state` does not have exactly 6 elements.
#[new]
#[pyo3(signature = (epoch, state, frame, representation, angle_format, step_size))]
pub fn new(
    epoch: PyRef<PyEpoch>,
    state: PyReadonlyArray1<f64>,
    frame: PyRef<PyOrbitFrame>,
    representation: PyRef<PyOrbitRepresentation>,
    angle_format: PyRef<PyAngleFormat>,
    step_size: f64,
) -> PyResult<Self> {
    let state_array = state.as_array();
    if state_array.len() != 6 {
        return Err(exceptions::PyValueError::new_err(
            "State vector must have exactly 6 elements"
        ));
    }
    // Copy element by element instead of going through `as_slice()`: a strided
    // NumPy view has no contiguous slice, and unwrapping it used to panic.
    let state_vec = na::Vector6::from_iterator(state_array.iter().copied());
    let propagator = propagators::KeplerianPropagator::new(
        epoch.obj,
        state_vec,
        frame.frame,
        representation.representation,
        Some(angle_format.value),
        step_size,
    );
    Ok(PyKeplerianPropagator { propagator })
}
/// Create a Keplerian propagator from a 6-element set of Keplerian elements.
///
/// Args:
///     epoch (Epoch): Initial epoch.
///     elements (numpy.ndarray): 1-D `float64` array with exactly 6 elements.
///         Non-contiguous (strided) views are accepted.
///     angle_format (AngleFormat): Angle format of the elements.
///     step_size (float): Propagation step size.
///
/// Raises:
///     ValueError: If `elements` does not have exactly 6 elements.
#[classmethod]
#[pyo3(signature = (epoch, elements, angle_format, step_size))]
pub fn from_keplerian(
    _cls: &Bound<'_, PyType>,
    epoch: PyRef<PyEpoch>,
    elements: PyReadonlyArray1<f64>,
    angle_format: PyRef<PyAngleFormat>,
    step_size: f64,
) -> PyResult<Self> {
    let elements_array = elements.as_array();
    if elements_array.len() != 6 {
        return Err(exceptions::PyValueError::new_err(
            "Elements vector must have exactly 6 elements"
        ));
    }
    // Copy element by element instead of going through `as_slice()`: a strided
    // NumPy view has no contiguous slice, and unwrapping it used to panic.
    let elements_vec = na::Vector6::from_iterator(elements_array.iter().copied());
    let propagator = propagators::KeplerianPropagator::from_keplerian(
        epoch.obj,
        elements_vec,
        angle_format.value,
        step_size,
    );
    Ok(PyKeplerianPropagator { propagator })
}
/// Create a Keplerian propagator through the native `from_eci` constructor.
///
/// Args:
///     epoch (Epoch): Initial epoch.
///     state (numpy.ndarray): 1-D `float64` array with exactly 6 elements.
///         Non-contiguous (strided) views are accepted.
///     step_size (float): Propagation step size.
///
/// Raises:
///     ValueError: If `state` does not have exactly 6 elements.
#[classmethod]
#[pyo3(signature = (epoch, state, step_size))]
pub fn from_eci(
    _cls: &Bound<'_, PyType>,
    epoch: PyRef<PyEpoch>,
    state: PyReadonlyArray1<f64>,
    step_size: f64,
) -> PyResult<Self> {
    let state_array = state.as_array();
    if state_array.len() != 6 {
        return Err(exceptions::PyValueError::new_err(
            "State vector must have exactly 6 elements"
        ));
    }
    // Copy element by element instead of going through `as_slice()`: a strided
    // NumPy view has no contiguous slice, and unwrapping it used to panic.
    let state_vec = na::Vector6::from_iterator(state_array.iter().copied());
    let propagator = propagators::KeplerianPropagator::from_eci(
        epoch.obj,
        state_vec,
        step_size,
    );
    Ok(PyKeplerianPropagator { propagator })
}
/// Create a Keplerian propagator through the native `from_ecef` constructor.
///
/// Args:
///     epoch (Epoch): Initial epoch.
///     state (numpy.ndarray): 1-D `float64` array with exactly 6 elements.
///         Non-contiguous (strided) views are accepted.
///     step_size (float): Propagation step size.
///
/// Raises:
///     ValueError: If `state` does not have exactly 6 elements.
#[classmethod]
#[pyo3(signature = (epoch, state, step_size))]
pub fn from_ecef(
    _cls: &Bound<'_, PyType>,
    epoch: PyRef<PyEpoch>,
    state: PyReadonlyArray1<f64>,
    step_size: f64,
) -> PyResult<Self> {
    let state_array = state.as_array();
    if state_array.len() != 6 {
        return Err(exceptions::PyValueError::new_err(
            "State vector must have exactly 6 elements"
        ));
    }
    // Copy element by element instead of going through `as_slice()`: a strided
    // NumPy view has no contiguous slice, and unwrapping it used to panic.
    let state_vec = na::Vector6::from_iterator(state_array.iter().copied());
    let propagator = propagators::KeplerianPropagator::from_ecef(
        epoch.obj,
        state_vec,
        step_size,
    );
    Ok(PyKeplerianPropagator { propagator })
}
/// Return the epoch reported by the native `current_epoch()` as a Python `Epoch`.
#[pyo3(text_signature = "()")]
pub fn current_epoch(&self) -> PyEpoch {
PyEpoch { obj: self.propagator.current_epoch() }
}
/// Epoch reported by the native `initial_epoch()`, wrapped as a Python `Epoch`.
#[getter]
pub fn initial_epoch(&self) -> PyEpoch {
PyEpoch { obj: self.propagator.initial_epoch() }
}
/// Return the propagator's initial state as a 1-D NumPy `float64` array.
#[pyo3(text_signature = "()")]
pub fn initial_state<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    self.propagator
        .initial_state()
        .as_slice()
        .to_pyarray(py)
        .to_owned()
}
/// Current step size of the wrapped propagator.
#[getter]
pub fn step_size(&self) -> f64 {
self.propagator.step_size()
}
/// Property setter for `step_size`; forwards the value to the native propagator.
#[setter]
pub fn set_step_size(&mut self, step_size: f64) {
self.propagator.set_step_size(step_size);
}
/// Method form of the step-size setter, exposed to Python as `set_step_size`.
///
/// Args:
///     new_step_size (float): Value forwarded to the native `set_step_size`.
#[pyo3(name = "set_step_size", text_signature = "(new_step_size)")]
pub fn set_step_size_explicit(&mut self, new_step_size: f64) {
self.propagator.set_step_size(new_step_size);
}
/// Return the propagator's current state as a 1-D NumPy `float64` array.
#[pyo3(text_signature = "()")]
pub fn current_state<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    self.propagator
        .current_state()
        .as_slice()
        .to_pyarray(py)
        .to_owned()
}
/// Forward to the native propagator's `step()`.
#[pyo3(text_signature = "()")]
pub fn step(&mut self) {
self.propagator.step();
}
/// Forward to the native propagator's `step_by(step_size)`.
///
/// Args:
///     step_size (float): Step passed through unchanged.
#[pyo3(text_signature = "(step_size)")]
pub fn step_by(&mut self, step_size: f64) {
self.propagator.step_by(step_size);
}
/// Forward to the native propagator's `step_past` with the given epoch.
///
/// Args:
///     target_epoch (Epoch): Epoch passed through unchanged.
#[pyo3(text_signature = "(target_epoch)")]
pub fn step_past(&mut self, target_epoch: PyRef<PyEpoch>) {
self.propagator.step_past(target_epoch.obj);
}
/// Forward to the native propagator's `propagate_steps(num_steps)`.
///
/// Args:
///     num_steps (int): Step count passed through unchanged.
#[pyo3(text_signature = "(num_steps)")]
pub fn propagate_steps(&mut self, num_steps: usize) {
self.propagator.propagate_steps(num_steps);
}
/// Forward to the native propagator's `propagate_to` with the given epoch.
///
/// Args:
///     target_epoch (Epoch): Epoch passed through unchanged.
#[pyo3(text_signature = "(target_epoch)")]
pub fn propagate_to(&mut self, target_epoch: PyRef<PyEpoch>) {
self.propagator.propagate_to(target_epoch.obj);
}
/// Forward to the native propagator's `reset()`.
#[pyo3(text_signature = "()")]
pub fn reset(&mut self) {
self.propagator.reset();
}
/// Replace the propagator's initial conditions.
///
/// Args:
///     epoch (Epoch): New initial epoch.
///     state (numpy.ndarray): 1-D `float64` array with exactly 6 elements.
///         Non-contiguous (strided) views are accepted.
///     frame (OrbitFrame): Frame of `state`.
///     representation (OrbitRepresentation): Representation of `state`.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Raises:
///     ValueError: If `state` does not have exactly 6 elements.
#[pyo3(text_signature = "(epoch, state, frame, representation, angle_format)")]
pub fn set_initial_conditions(
    &mut self,
    epoch: PyRef<PyEpoch>,
    state: PyReadonlyArray1<f64>,
    frame: PyRef<PyOrbitFrame>,
    representation: PyRef<PyOrbitRepresentation>,
    angle_format: PyRef<PyAngleFormat>,
) -> PyResult<()> {
    let state_array = state.as_array();
    if state_array.len() != 6 {
        return Err(exceptions::PyValueError::new_err(
            "State vector must have exactly 6 elements"
        ));
    }
    // Copy element by element instead of going through `as_slice()`: a strided
    // NumPy view has no contiguous slice, and unwrapping it used to panic.
    let state_vec = na::Vector6::from_iterator(state_array.iter().copied());
    self.propagator.set_initial_conditions(
        epoch.obj,
        state_vec,
        frame.frame,
        representation.representation,
        Some(angle_format.value),
    );
    Ok(())
}
/// Forward `max_size` to the native `set_eviction_policy_max_size`.
///
/// Raises:
///     RuntimeError: If the native call fails; the message is the native
///         error rendered as text.
#[pyo3(text_signature = "(max_size)")]
pub fn set_eviction_policy_max_size(&mut self, max_size: usize) -> PyResult<()> {
    self.propagator
        .set_eviction_policy_max_size(max_size)
        .map(|_| ())
        .map_err(|err| exceptions::PyRuntimeError::new_err(err.to_string()))
}
/// Forward `max_age` to the native `set_eviction_policy_max_age`.
///
/// Raises:
///     RuntimeError: If the native call fails; the message is the native
///         error rendered as text.
#[pyo3(text_signature = "(max_age)")]
pub fn set_eviction_policy_max_age(&mut self, max_age: f64) -> PyResult<()> {
    self.propagator
        .set_eviction_policy_max_age(max_age)
        .map(|_| ())
        .map_err(|err| exceptions::PyRuntimeError::new_err(err.to_string()))
}
/// Compute the state at `epoch` through the native `state`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state<'a>(&self, py: Python<'a>, epoch: PyRef<PyEpoch>) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(self
        .propagator
        .state(epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through the native `state_eci`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_eci<'a>(&self, py: Python<'a>, epoch: PyRef<PyEpoch>) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(self
        .propagator
        .state_eci(epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through the native `state_ecef`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_ecef<'a>(&self, py: Python<'a>, epoch: PyRef<PyEpoch>) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(self
        .propagator
        .state_ecef(epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through the native `state_gcrf`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_gcrf<'a>(&self, py: Python<'a>, epoch: PyRef<PyEpoch>) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(self
        .propagator
        .state_gcrf(epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through the native `state_eme2000`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_eme2000<'a>(&self, py: Python<'a>, epoch: PyRef<PyEpoch>) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(self
        .propagator
        .state_eme2000(epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through the native `state_itrf`.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch)")]
pub fn state_itrf<'a>(&self, py: Python<'a>, epoch: PyRef<PyEpoch>) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(self
        .propagator
        .state_itrf(epoch.obj)?
        .as_slice()
        .to_pyarray(py)
        .to_owned())
}
/// Compute the state at `epoch` through `DOrbitStateProvider::state_koe_osc`.
///
/// Args:
///     epoch (Epoch): Epoch to evaluate.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch, angle_format)")]
pub fn state_koe_osc<'a>(
    &self,
    py: Python<'a>,
    epoch: PyRef<PyEpoch>,
    angle_format: &PyAngleFormat,
) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(
        DOrbitStateProvider::state_koe_osc(&self.propagator, epoch.obj, angle_format.value)?
            .as_slice()
            .to_pyarray(py)
            .to_owned(),
    )
}
/// Compute states at several epochs through `DStateProvider::states`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order;
/// a native error is propagated to Python.
#[pyo3(text_signature = "(epochs)")]
pub fn states<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = DStateProvider::states(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vector| vector.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute states at several epochs through `DOrbitStateProvider::states_eci`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs)")]
pub fn states_eci<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = DOrbitStateProvider::states_eci(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute states at several epochs through `DOrbitStateProvider::states_ecef`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs)")]
pub fn states_ecef<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = DOrbitStateProvider::states_ecef(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute states at several epochs through `DOrbitStateProvider::states_gcrf`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs)")]
pub fn states_gcrf<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = DOrbitStateProvider::states_gcrf(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute states at several epochs through `DOrbitStateProvider::states_itrf`.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs)")]
pub fn states_itrf<'a>(&self, py: Python<'a>, epochs: Vec<PyRef<PyEpoch>>) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated = DOrbitStateProvider::states_itrf(&self.propagator, &instants)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute states at several epochs through `DOrbitStateProvider::states_koe_osc`.
///
/// Args:
///     epochs (list[Epoch]): Epochs to evaluate.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs, angle_format)")]
pub fn states_koe_osc<'a>(
    &self,
    py: Python<'a>,
    epochs: Vec<PyRef<PyEpoch>>,
    angle_format: &PyAngleFormat,
) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated =
        DOrbitStateProvider::states_koe_osc(&self.propagator, &instants, angle_format.value)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Compute the state at `epoch` through `DOrbitStateProvider::state_koe_mean`.
///
/// Args:
///     epoch (Epoch): Epoch to evaluate.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Returns a 1-D NumPy `float64` array; a native error is propagated to Python.
#[pyo3(text_signature = "(epoch, angle_format)")]
pub fn state_koe_mean<'a>(
    &self,
    py: Python<'a>,
    epoch: PyRef<PyEpoch>,
    angle_format: &PyAngleFormat,
) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    Ok(
        DOrbitStateProvider::state_koe_mean(&self.propagator, epoch.obj, angle_format.value)?
            .as_slice()
            .to_pyarray(py)
            .to_owned(),
    )
}
/// Compute states at several epochs through `DOrbitStateProvider::states_koe_mean`.
///
/// Args:
///     epochs (list[Epoch]): Epochs to evaluate.
///     angle_format (AngleFormat): Angle format forwarded to the native call.
///
/// Returns one 1-D NumPy `float64` array per input epoch, in input order.
#[pyo3(text_signature = "(epochs, angle_format)")]
pub fn states_koe_mean<'a>(
    &self,
    py: Python<'a>,
    epochs: Vec<PyRef<PyEpoch>>,
    angle_format: &PyAngleFormat,
) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    let instants: Vec<_> = epochs.iter().map(|ep| ep.obj).collect();
    let propagated =
        DOrbitStateProvider::states_koe_mean(&self.propagator, &instants, angle_format.value)?;
    let arrays = propagated
        .iter()
        .map(|vec6: &Vector6<f64>| vec6.as_slice().to_pyarray(py).to_owned())
        .collect();
    Ok(arrays)
}
/// Return the propagator's trajectory converted to a `DOrbitTrajectory` and
/// wrapped for Python.
///
/// The result is a freshly built copy: epochs, states, optional covariances,
/// frame, representation, angle format and identity are all copied across,
/// so changes made to the returned object do not reach the propagator.
#[getter]
pub fn trajectory(&self) -> PyOrbitalTrajectory {
let traj = &self.propagator.trajectory;
// Re-pack every stored state into a dynamically sized vector.
let states: Vec<DVector<f64>> = traj.states.iter()
.map(|s| DVector::from_column_slice(s.as_slice()))
.collect();
// Covariances are optional; when present each is rebuilt as a 6x6 dynamic matrix.
// NOTE(review): the source slice is read with `from_row_slice`; if the stored
// matrices are column-major this transposes them, which only matters for
// non-symmetric data — confirm.
let covariances = traj.covariances.as_ref().map(|covs| {
covs.iter()
.map(|c| DMatrix::from_row_slice(6, 6, c.as_slice()))
.collect()
});
let mut d_traj = trajectories::DOrbitTrajectory::from_orbital_data(
traj.epochs.clone(),
states,
traj.frame,
traj.representation,
traj.angle_format,
covariances,
);
// Carry name, UUID and id over from the source trajectory.
d_traj.set_identity(
traj.get_name(),
traj.get_uuid(),
traj.get_id()
);
PyOrbitalTrajectory { trajectory: d_traj }
}
/// Set the propagator's name and return the same Python object for chaining.
///
/// Args:
///     name (str): Name handed to the native `with_name`.
pub fn with_name(mut slf: PyRefMut<'_, Self>, name: String) -> PyRefMut<'_, Self> {
    // `with_name` consumes its receiver, so it runs on a clone that then
    // replaces the stored propagator.
    let renamed = slf.propagator.clone().with_name(&name);
    slf.propagator = renamed;
    slf
}
/// Set the propagator's UUID from its string form and return the same Python
/// object for chaining.
///
/// Args:
///     uuid_str (str): Text parsed with `uuid::Uuid::parse_str`.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID.
pub fn with_uuid(mut slf: PyRefMut<'_, Self>, uuid_str: String) -> PyResult<PyRefMut<'_, Self>> {
    let parsed = match uuid::Uuid::parse_str(&uuid_str) {
        Ok(value) => value,
        Err(e) => {
            return Err(exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)));
        }
    };
    let tagged = slf.propagator.clone().with_uuid(parsed);
    slf.propagator = tagged;
    Ok(slf)
}
/// Apply the native `with_new_uuid` and return the same Python object for
/// chaining.
pub fn with_new_uuid(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
    // `with_new_uuid` consumes its receiver, so it runs on a clone that then
    // replaces the stored propagator.
    let tagged = slf.propagator.clone().with_new_uuid();
    slf.propagator = tagged;
    slf
}
/// Set the propagator's numeric id and return the same Python object for
/// chaining.
///
/// Args:
///     id (int): Identifier handed to the native `with_id`.
pub fn with_id(mut slf: PyRefMut<'_, Self>, id: u64) -> PyRefMut<'_, Self> {
    // `with_id` consumes its receiver, so it runs on a clone that then
    // replaces the stored propagator.
    let tagged = slf.propagator.clone().with_id(id);
    slf.propagator = tagged;
    slf
}
/// Set name, UUID and id in one call and return the same Python object for
/// chaining.
///
/// Args:
///     name (str or None): Name forwarded to the native `with_identity`.
///     uuid_str (str or None): UUID text; parsed when present.
///     id (int or None): Numeric identifier.
///
/// Raises:
///     ValueError: If `uuid_str` is given but is not a valid UUID.
pub fn with_identity(mut slf: PyRefMut<'_, Self>, name: Option<String>, uuid_str: Option<String>, id: Option<u64>) -> PyResult<PyRefMut<'_, Self>> {
    let parsed = uuid_str
        .map(|text| uuid::Uuid::parse_str(&text))
        .transpose()
        .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
    let tagged = slf.propagator.clone().with_identity(name.as_deref(), parsed, id);
    slf.propagator = tagged;
    Ok(slf)
}
/// Set name, UUID and id on the wrapped propagator in place.
///
/// Args:
///     name (str or None): Name forwarded to the native `set_identity`.
///     uuid_str (str or None): UUID text; parsed when present.
///     id (int or None): Numeric identifier.
///
/// Raises:
///     ValueError: If `uuid_str` is given but is not a valid UUID.
pub fn set_identity(&mut self, name: Option<String>, uuid_str: Option<String>, id: Option<u64>) -> PyResult<()> {
    let parsed = uuid_str
        .map(|text| uuid::Uuid::parse_str(&text))
        .transpose()
        .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
    self.propagator.set_identity(name.as_deref(), parsed, id);
    Ok(())
}
/// Forward `id` (which may be `None`) to the native `set_id`.
pub fn set_id(&mut self, id: Option<u64>) {
self.propagator.set_id(id) }
/// Forward `name` (which may be `None`) to the native `set_name`.
pub fn set_name(&mut self, name: Option<String>) {
self.propagator.set_name(name.as_deref()) }
/// Forward to the native `generate_uuid()`.
pub fn generate_uuid(&mut self) {
self.propagator.generate_uuid() }
/// Numeric id of the wrapped propagator, or `None` when unset.
pub fn get_id(&self) -> Option<u64> {
self.propagator.get_id()
}
/// Name of the wrapped propagator as an owned string, or `None` when unset.
pub fn get_name(&self) -> Option<String> {
self.propagator.get_name().map(|s| s.to_string())
}
/// UUID of the wrapped propagator rendered as a string, or `None` when unset.
pub fn get_uuid(&self) -> Option<String> {
self.propagator.get_uuid().map(|u| u.to_string())
}
/// Python `repr()`: shows the current epoch (Debug formatting) and the step size.
fn __repr__(&self) -> String {
    let epoch = self.propagator.current_epoch();
    let step = self.propagator.step_size();
    format!("KeplerianPropagator(epoch={:?}, step_size={})", epoch, step)
}
fn __str__(&self) -> String {
self.__repr__()
}
}
/// Propagate a homogeneous Python list of propagators to `target_epoch` in parallel.
///
/// The class of the first list element selects the code path; every other
/// element must be of that same class:
///
/// * `KeplerianPropagator` - each wrapped propagator is cloned, the clones are
///   passed to `propagators::par_propagate_to_s`, and the results are written
///   back into the Python objects.
/// * `SGPPropagator` - as above, but event detectors are moved onto the clones
///   before propagation, and detectors, event log, termination flag and
///   termination error are moved back afterwards.
/// * `NumericalOrbitPropagator` - propagated in place on the crate thread pool
///   while every Python object is held mutably borrowed.
///
/// An empty list is a no-op.
///
/// # Errors
/// `TypeError` if `propagators` is not a list, contains an unsupported or
/// mixed propagator type, or contains `NumericalPropagator`. A PyO3 borrow
/// error is returned (instead of a panic) if one of the objects is already
/// borrowed; on the numerical path this includes the same object appearing
/// twice in the list.
#[pyfunction(name = "par_propagate_to")]
fn py_par_propagate_to(
    propagators: &Bound<'_, PyAny>,
    target_epoch: &PyEpoch,
) -> PyResult<()> {
    if !propagators.is_instance_of::<PyList>() {
        return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
            "propagators must be a list of KeplerianPropagator, SGPPropagator, or NumericalOrbitPropagator"
        ));
    }
    let prop_list = propagators.cast::<PyList>()?;
    if prop_list.is_empty() {
        return Ok(());
    }
    let first = prop_list.get_item(0)?;
    let count = prop_list.len();

    if first.is_instance_of::<PyKeplerianPropagator>() {
        // Work on clones so the Python objects are only touched before and after the parallel run.
        let mut props: Vec<propagators::KeplerianPropagator> = Vec::with_capacity(count);
        for item in prop_list.iter() {
            let py_prop = item.cast::<PyKeplerianPropagator>()?;
            props.push(py_prop.try_borrow()?.propagator.clone());
        }
        propagators::par_propagate_to_s(&mut props, target_epoch.obj);
        for (item, prop) in prop_list.iter().zip(props.iter()) {
            let mut py_prop = item.cast::<PyKeplerianPropagator>()?.try_borrow_mut()?;
            py_prop.propagator = prop.clone();
        }
        Ok(())
    } else if first.is_instance_of::<PySGPPropagator>() {
        // Validate every element BEFORE mutating anything. The extraction loop
        // below moves event detectors out of the Python objects; if it bailed
        // out half-way (mixed list, object already borrowed) the propagators
        // processed so far would silently lose their detectors.
        for item in prop_list.iter() {
            let _probe = item.cast::<PySGPPropagator>()?.try_borrow_mut()?;
        }

        let mut props: Vec<propagators::SGPPropagator> = Vec::with_capacity(count);
        let mut extracted_detectors: Vec<Vec<Box<dyn events::DEventDetector>>> =
            Vec::with_capacity(count);
        for item in prop_list.iter() {
            let mut py_prop = item.cast::<PySGPPropagator>()?.try_borrow_mut()?;
            extracted_detectors.push(py_prop.propagator.take_event_detectors());
            props.push(py_prop.propagator.clone());
        }
        // Re-attach each propagator's own detectors to its working clone.
        for (prop, detectors) in props.iter_mut().zip(extracted_detectors) {
            prop.set_event_detectors(detectors);
        }

        propagators::par_propagate_to_s(&mut props, target_epoch.obj);

        for (item, prop) in prop_list.iter().zip(props.iter_mut()) {
            let mut py_prop = item.cast::<PySGPPropagator>()?.try_borrow_mut()?;
            // Pull the event-related state out of the propagated clone first,
            // then install a clone of it and restore that state explicitly.
            let detectors = prop.take_event_detectors();
            let event_log = prop.take_event_log();
            let terminated = prop.is_terminated();
            let termination_error = prop.take_termination_error();
            py_prop.propagator = prop.clone();
            py_prop.propagator.set_event_detectors(detectors);
            py_prop.propagator.set_event_log(event_log);
            py_prop.propagator.set_terminated(terminated);
            py_prop.propagator.set_termination_error(termination_error);
        }
        Ok(())
    } else if first.is_instance_of::<PyNumericalOrbitPropagator>() {
        // Raw-pointer wrapper so the propagators can be mutated from worker
        // threads without cloning them.
        struct SendPtr(*mut propagators::DNumericalOrbitPropagator);
        unsafe impl Send for SendPtr {}
        unsafe impl Sync for SendPtr {}

        // Hold an exclusive borrow on every Python object for the whole
        // parallel section. `try_borrow_mut` turns a conflict (for example
        // the same object listed twice) into a Python exception.
        let mut borrow_guards: Vec<PyRefMut<'_, PyNumericalOrbitPropagator>> =
            Vec::with_capacity(count);
        for item in prop_list.iter() {
            borrow_guards.push(item.cast::<PyNumericalOrbitPropagator>()?.try_borrow_mut()?);
        }
        let prop_ptrs: Vec<SendPtr> = borrow_guards
            .iter_mut()
            .map(|guard| SendPtr(&mut guard.propagator as *mut _))
            .collect();
        let target = target_epoch.obj;
        // NOTE(review): the calling thread stays attached to the interpreter
        // while the pool runs. Propagators built with Python callbacks attach
        // from worker threads and may block here - confirm whether this
        // section should run detached.
        crate::utils::threading::get_thread_pool().install(|| {
            prop_ptrs.par_iter().for_each(|SendPtr(ptr)| {
                // SAFETY: each pointer was taken from a distinct `PyRefMut`
                // guard in `borrow_guards`, which stays alive until this
                // function returns, so every pointee is valid and exclusively
                // borrowed; each pointer is dereferenced by exactly one task.
                unsafe {
                    (*(*ptr)).propagate_to(target);
                }
            });
        });
        Ok(())
    } else if first.is_instance_of::<PyNumericalPropagator>() {
        Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
            "NumericalPropagator cannot be used with par_propagate_to because its Python \
             dynamics callback cannot safely execute in parallel due to the GIL. \
             Use NumericalOrbitPropagator for parallel propagation of orbital dynamics, \
             or call propagate_to() sequentially on each NumericalPropagator."
        ))
    } else {
        Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
            "propagators must be a list of KeplerianPropagator, SGPPropagator, or NumericalOrbitPropagator"
        ))
    }
}
/// Python-facing wrapper around `propagators::IntegratorMethod`.
///
/// The available methods are exposed as class attributes
/// (`IntegrationMethod.RK4`, `IntegrationMethod.DP54`, ...).
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "IntegrationMethod")]
#[derive(Clone)]
pub struct PyIntegrationMethod {
    /// Wrapped core enum value.
    pub method: propagators::IntegratorMethod,
}

#[pymethods]
#[allow(non_snake_case)]
impl PyIntegrationMethod {
    /// Class attribute wrapping `IntegratorMethod::RK4`.
    #[classattr]
    fn RK4() -> Self {
        let method = propagators::IntegratorMethod::RK4;
        Self { method }
    }

    /// Class attribute wrapping `IntegratorMethod::RKF45`.
    #[classattr]
    fn RKF45() -> Self {
        let method = propagators::IntegratorMethod::RKF45;
        Self { method }
    }

    /// Class attribute wrapping `IntegratorMethod::DP54`.
    #[classattr]
    fn DP54() -> Self {
        let method = propagators::IntegratorMethod::DP54;
        Self { method }
    }

    /// Class attribute wrapping `IntegratorMethod::RKN1210`.
    #[classattr]
    fn RKN1210() -> Self {
        let method = propagators::IntegratorMethod::RKN1210;
        Self { method }
    }

    /// Report the result of the core `is_adaptive` query for this method.
    pub fn is_adaptive(&self) -> bool {
        let method = &self.method;
        method.is_adaptive()
    }

    /// Python `repr()`: `IntegrationMethod.<Debug name of the variant>`.
    fn __repr__(&self) -> String {
        let variant = &self.method;
        format!("IntegrationMethod.{:?}", variant)
    }
}
/// Python-facing wrapper around `propagators::VariationalConfig`.
///
/// Exposes four boolean switches as read/write properties; any other
/// fields of the core struct keep their `Default` values.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "VariationalConfig")]
#[derive(Clone)]
pub struct PyVariationalConfig {
    /// Wrapped core configuration.
    pub config: propagators::VariationalConfig,
}

#[pymethods]
impl PyVariationalConfig {
    /// Create a configuration; every switch defaults to `false`.
    #[new]
    #[pyo3(signature = (enable_stm=false, enable_sensitivity=false, store_stm_history=false, store_sensitivity_history=false))]
    fn new(
        enable_stm: bool,
        enable_sensitivity: bool,
        store_stm_history: bool,
        store_sensitivity_history: bool,
    ) -> Self {
        let config = propagators::VariationalConfig {
            enable_stm,
            enable_sensitivity,
            store_stm_history,
            store_sensitivity_history,
            // Fields not exposed through this constructor take their defaults.
            ..Default::default()
        };
        Self { config }
    }

    /// Current value of the `enable_stm` switch.
    #[getter]
    fn get_enable_stm(&self) -> bool {
        let cfg = &self.config;
        cfg.enable_stm
    }

    /// Overwrite the `enable_stm` switch.
    #[setter]
    fn set_enable_stm(&mut self, value: bool) {
        let cfg = &mut self.config;
        cfg.enable_stm = value;
    }

    /// Current value of the `enable_sensitivity` switch.
    #[getter]
    fn get_enable_sensitivity(&self) -> bool {
        let cfg = &self.config;
        cfg.enable_sensitivity
    }

    /// Overwrite the `enable_sensitivity` switch.
    #[setter]
    fn set_enable_sensitivity(&mut self, value: bool) {
        let cfg = &mut self.config;
        cfg.enable_sensitivity = value;
    }

    /// Current value of the `store_stm_history` switch.
    #[getter]
    fn get_store_stm_history(&self) -> bool {
        let cfg = &self.config;
        cfg.store_stm_history
    }

    /// Overwrite the `store_stm_history` switch.
    #[setter]
    fn set_store_stm_history(&mut self, value: bool) {
        let cfg = &mut self.config;
        cfg.store_stm_history = value;
    }

    /// Current value of the `store_sensitivity_history` switch.
    #[getter]
    fn get_store_sensitivity_history(&self) -> bool {
        let cfg = &self.config;
        cfg.store_sensitivity_history
    }

    /// Overwrite the `store_sensitivity_history` switch.
    #[setter]
    fn set_store_sensitivity_history(&mut self, value: bool) {
        let cfg = &mut self.config;
        cfg.store_sensitivity_history = value;
    }

    /// Python `repr()` listing the four switches.
    fn __repr__(&self) -> String {
        let cfg = &self.config;
        format!(
            "VariationalConfig(enable_stm={}, enable_sensitivity={}, store_stm_history={}, store_sensitivity_history={})",
            cfg.enable_stm,
            cfg.enable_sensitivity,
            cfg.store_stm_history,
            cfg.store_sensitivity_history
        )
    }
}
/// Python-facing wrapper around `propagators::AtmosphericModel`.
///
/// Parameter-free models are exposed as class attributes; the exponential
/// model is built through the `exponential` classmethod.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "AtmosphericModel")]
#[derive(Clone)]
pub struct PyAtmosphericModel {
    /// Wrapped core model selection.
    pub model: propagators::AtmosphericModel,
}

#[pymethods]
#[allow(non_snake_case)]
impl PyAtmosphericModel {
    /// Class attribute wrapping `AtmosphericModel::HarrisPriester`.
    #[classattr]
    fn HARRIS_PRIESTER() -> Self {
        let model = propagators::AtmosphericModel::HarrisPriester;
        Self { model }
    }

    /// Class attribute wrapping `AtmosphericModel::NRLMSISE00`.
    #[classattr]
    fn NRLMSISE00() -> Self {
        let model = propagators::AtmosphericModel::NRLMSISE00;
        Self { model }
    }

    /// Build `AtmosphericModel::Exponential` from its three parameters,
    /// which are stored unchanged.
    #[classmethod]
    fn exponential(_cls: &Bound<'_, PyType>, scale_height: f64, rho0: f64, h0: f64) -> Self {
        let model = propagators::AtmosphericModel::Exponential { scale_height, rho0, h0 };
        Self { model }
    }

    /// Python `repr()`: the Debug rendering of the wrapped model.
    fn __repr__(&self) -> String {
        let inner = &self.model;
        format!("AtmosphericModel({:?})", inner)
    }
}
/// Python-facing wrapper around `propagators::EclipseModel`, with each
/// variant exposed as a class attribute.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "EclipseModel")]
#[derive(Clone)]
pub struct PyEclipseModel {
    /// Wrapped core enum value.
    pub model: propagators::EclipseModel,
}

#[pymethods]
#[allow(non_snake_case)]
impl PyEclipseModel {
    /// Class attribute wrapping `EclipseModel::Conical`.
    #[classattr]
    fn CONICAL() -> Self {
        let model = propagators::EclipseModel::Conical;
        Self { model }
    }

    /// Class attribute wrapping `EclipseModel::Cylindrical`.
    #[classattr]
    fn CYLINDRICAL() -> Self {
        let model = propagators::EclipseModel::Cylindrical;
        Self { model }
    }

    /// Class attribute wrapping `EclipseModel::None`.
    #[classattr]
    fn NONE() -> Self {
        let model = propagators::EclipseModel::None;
        Self { model }
    }

    /// Python `repr()`: `EclipseModel.<Debug name of the variant>`.
    fn __repr__(&self) -> String {
        let variant = &self.model;
        format!("EclipseModel.{:?}", variant)
    }
}
/// Python-facing wrapper around `propagators::ParameterSource`, which is
/// either a literal `Value(f64)` or a `ParameterIndex(usize)`.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "ParameterSource")]
#[derive(Clone)]
pub struct PyParameterSource {
    /// Wrapped core enum value.
    pub source: propagators::ParameterSource,
}

#[pymethods]
impl PyParameterSource {
    /// Build a source holding the literal `value`.
    #[classmethod]
    fn value(_cls: &Bound<'_, PyType>, value: f64) -> Self {
        let source = propagators::ParameterSource::Value(value);
        Self { source }
    }

    /// Build a source holding the parameter index `index`.
    #[classmethod]
    fn parameter_index(_cls: &Bound<'_, PyType>, index: usize) -> Self {
        let source = propagators::ParameterSource::ParameterIndex(index);
        Self { source }
    }

    /// `True` when this source is the `Value` variant.
    fn is_value(&self) -> bool {
        match self.source {
            propagators::ParameterSource::Value(_) => true,
            propagators::ParameterSource::ParameterIndex(_) => false,
        }
    }

    /// `True` when this source is the `ParameterIndex` variant.
    fn is_parameter_index(&self) -> bool {
        match self.source {
            propagators::ParameterSource::ParameterIndex(_) => true,
            propagators::ParameterSource::Value(_) => false,
        }
    }

    /// The literal value, or `None` for the `ParameterIndex` variant.
    fn get_value(&self) -> Option<f64> {
        if let propagators::ParameterSource::Value(literal) = self.source {
            Some(literal)
        } else {
            None
        }
    }

    /// The parameter index, or `None` for the `Value` variant.
    fn get_index(&self) -> Option<usize> {
        if let propagators::ParameterSource::ParameterIndex(position) = self.source {
            Some(position)
        } else {
            None
        }
    }

    /// Python `repr()`: mirrors the classmethod call that would recreate the source.
    fn __repr__(&self) -> String {
        match &self.source {
            propagators::ParameterSource::ParameterIndex(position) => {
                format!("ParameterSource.parameter_index({})", position)
            }
            propagators::ParameterSource::Value(literal) => {
                format!("ParameterSource.value({})", literal)
            }
        }
    }
}
/// Python-facing wrapper around `propagators::GravityConfiguration`
/// (either `PointMass` or `SphericalHarmonic`).
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "GravityConfiguration")]
#[derive(Clone)]
pub struct PyGravityConfiguration {
    /// Wrapped core configuration.
    pub config: propagators::GravityConfiguration,
}

impl PyGravityConfiguration {
    /// Pick the gravity model source: the global model when `use_global` is
    /// set (`model_type` is then ignored), otherwise `model_type`, falling
    /// back to `EGM2008_360` when none is given.
    fn model_source(
        model_type: Option<&PyGravityModelType>,
        use_global: bool,
    ) -> propagators::GravityModelSource {
        if use_global {
            return propagators::GravityModelSource::Global;
        }
        let model = model_type
            .map(|mt| mt.model.clone())
            .unwrap_or(crate::orbit_dynamics::gravity::GravityModelType::EGM2008_360);
        propagators::GravityModelSource::ModelType(model)
    }

    /// Wrap a `SphericalHarmonic` configuration of the given degree and order.
    fn harmonic(
        degree: usize,
        order: usize,
        model_type: Option<&PyGravityModelType>,
        use_global: bool,
    ) -> Self {
        let source = Self::model_source(model_type, use_global);
        Self {
            config: propagators::GravityConfiguration::SphericalHarmonic { source, degree, order },
        }
    }
}

#[pymethods]
impl PyGravityConfiguration {
    /// Create a gravity configuration.
    ///
    /// A spherical-harmonic configuration is produced only when BOTH
    /// `degree` and `order` are supplied; in every other case (including
    /// when just one of them is given) the result is point-mass gravity and
    /// `model_type` / `use_global` are ignored.
    #[new]
    #[pyo3(signature = (degree=None, order=None, model_type=None, use_global=false))]
    fn new(degree: Option<usize>, order: Option<usize>, model_type: Option<&PyGravityModelType>, use_global: bool) -> Self {
        if let (Some(d), Some(o)) = (degree, order) {
            Self::harmonic(d, o, model_type, use_global)
        } else {
            Self {
                config: propagators::GravityConfiguration::PointMass,
            }
        }
    }

    /// Point-mass gravity.
    #[classmethod]
    fn point_mass(_cls: &Bound<'_, PyType>) -> Self {
        let config = propagators::GravityConfiguration::PointMass;
        Self { config }
    }

    /// Spherical-harmonic gravity of the given `degree` and `order`.
    ///
    /// With `use_global` set the global model source is used; otherwise
    /// `model_type` is used, defaulting to `EGM2008_360`.
    #[classmethod]
    #[pyo3(signature = (degree, order, model_type=None, use_global=false))]
    fn spherical_harmonic(_cls: &Bound<'_, PyType>, degree: usize, order: usize, model_type: Option<&PyGravityModelType>, use_global: bool) -> Self {
        Self::harmonic(degree, order, model_type, use_global)
    }

    /// `True` for the `PointMass` variant.
    fn is_point_mass(&self) -> bool {
        let cfg = &self.config;
        matches!(cfg, propagators::GravityConfiguration::PointMass)
    }

    /// `True` for the `SphericalHarmonic` variant.
    fn is_spherical_harmonic(&self) -> bool {
        let cfg = &self.config;
        matches!(cfg, propagators::GravityConfiguration::SphericalHarmonic { .. })
    }

    /// Degree of the spherical-harmonic expansion, or `None` for point mass.
    fn get_degree(&self) -> Option<usize> {
        if let propagators::GravityConfiguration::SphericalHarmonic { degree, .. } = &self.config {
            Some(*degree)
        } else {
            None
        }
    }

    /// Order of the spherical-harmonic expansion, or `None` for point mass.
    fn get_order(&self) -> Option<usize> {
        if let propagators::GravityConfiguration::SphericalHarmonic { order, .. } = &self.config {
            Some(*order)
        } else {
            None
        }
    }

    /// Python `repr()`; the model source is not included.
    fn __repr__(&self) -> String {
        match &self.config {
            propagators::GravityConfiguration::SphericalHarmonic { degree, order, .. } => {
                format!("GravityConfiguration.spherical_harmonic(degree={}, order={})", degree, order)
            }
            propagators::GravityConfiguration::PointMass => {
                "GravityConfiguration.point_mass()".to_string()
            }
        }
    }
}
/// Python-facing wrapper around `propagators::DragConfiguration`: an
/// atmospheric model plus parameter sources for `area` and `cd`.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "DragConfiguration")]
#[derive(Clone)]
pub struct PyDragConfiguration {
    /// Wrapped core configuration.
    pub config: propagators::DragConfiguration,
}

#[pymethods]
impl PyDragConfiguration {
    /// Create a drag configuration from copies of the given model and
    /// parameter sources.
    #[new]
    #[pyo3(signature = (model, area, cd))]
    fn new(model: &PyAtmosphericModel, area: &PyParameterSource, cd: &PyParameterSource) -> Self {
        let config = propagators::DragConfiguration {
            model: model.model.clone(),
            area: area.source.clone(),
            cd: cd.source.clone(),
        };
        Self { config }
    }

    /// Copy of the configured atmospheric model.
    #[getter]
    fn model(&self) -> PyAtmosphericModel {
        let model = self.config.model.clone();
        PyAtmosphericModel { model }
    }

    /// Replace the atmospheric model with a copy of `model`.
    #[setter]
    fn set_model(&mut self, model: &PyAtmosphericModel) {
        let replacement = model.model.clone();
        self.config.model = replacement;
    }

    /// Copy of the `area` parameter source.
    #[getter]
    fn area(&self) -> PyParameterSource {
        let source = self.config.area.clone();
        PyParameterSource { source }
    }

    /// Replace the `area` parameter source with a copy of `area`.
    #[setter]
    fn set_area(&mut self, area: &PyParameterSource) {
        let replacement = area.source.clone();
        self.config.area = replacement;
    }

    /// Copy of the `cd` parameter source.
    #[getter]
    fn cd(&self) -> PyParameterSource {
        let source = self.config.cd.clone();
        PyParameterSource { source }
    }

    /// Replace the `cd` parameter source with a copy of `cd`.
    #[setter]
    fn set_cd(&mut self, cd: &PyParameterSource) {
        let replacement = cd.source.clone();
        self.config.cd = replacement;
    }

    /// Python `repr()` using the Debug rendering of each field.
    fn __repr__(&self) -> String {
        let cfg = &self.config;
        format!(
            "DragConfiguration(model={:?}, area={:?}, cd={:?})",
            cfg.model, cfg.area, cfg.cd
        )
    }
}
/// Python-facing wrapper around `propagators::SolarRadiationPressureConfiguration`:
/// parameter sources for `area` and `cr` plus an eclipse model.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "SolarRadiationPressureConfiguration")]
#[derive(Clone)]
pub struct PySolarRadiationPressureConfiguration {
    /// Wrapped core configuration.
    pub config: propagators::SolarRadiationPressureConfiguration,
}

#[pymethods]
impl PySolarRadiationPressureConfiguration {
    /// Create a configuration from copies of the given parameter sources
    /// and eclipse model.
    #[new]
    #[pyo3(signature = (area, cr, eclipse_model))]
    fn new(area: &PyParameterSource, cr: &PyParameterSource, eclipse_model: &PyEclipseModel) -> Self {
        let config = propagators::SolarRadiationPressureConfiguration {
            area: area.source.clone(),
            cr: cr.source.clone(),
            eclipse_model: eclipse_model.model.clone(),
        };
        Self { config }
    }

    /// Copy of the `area` parameter source.
    #[getter]
    fn area(&self) -> PyParameterSource {
        let source = self.config.area.clone();
        PyParameterSource { source }
    }

    /// Replace the `area` parameter source with a copy of `area`.
    #[setter]
    fn set_area(&mut self, area: &PyParameterSource) {
        let replacement = area.source.clone();
        self.config.area = replacement;
    }

    /// Copy of the `cr` parameter source.
    #[getter]
    fn cr(&self) -> PyParameterSource {
        let source = self.config.cr.clone();
        PyParameterSource { source }
    }

    /// Replace the `cr` parameter source with a copy of `cr`.
    #[setter]
    fn set_cr(&mut self, cr: &PyParameterSource) {
        let replacement = cr.source.clone();
        self.config.cr = replacement;
    }

    /// Copy of the configured eclipse model.
    #[getter]
    fn eclipse_model(&self) -> PyEclipseModel {
        let model = self.config.eclipse_model.clone();
        PyEclipseModel { model }
    }

    /// Replace the eclipse model with a copy of `eclipse_model`.
    #[setter]
    fn set_eclipse_model(&mut self, eclipse_model: &PyEclipseModel) {
        let replacement = eclipse_model.model.clone();
        self.config.eclipse_model = replacement;
    }

    /// Python `repr()` using the Debug rendering of each field.
    fn __repr__(&self) -> String {
        let cfg = &self.config;
        format!(
            "SolarRadiationPressureConfiguration(area={:?}, cr={:?}, eclipse_model={:?})",
            cfg.area, cfg.cr, cfg.eclipse_model
        )
    }
}
/// Python-facing enum of third bodies; each variant maps one-to-one onto a
/// variant of `propagators::ThirdBody`.
#[pyclass(module = "brahe._brahe", eq, eq_int, from_py_object)]
#[pyo3(name = "ThirdBody")]
#[derive(Clone, PartialEq, Eq)]
#[allow(clippy::upper_case_acronyms)]
pub enum PyThirdBody {
    SUN,
    MOON,
    MERCURY,
    VENUS,
    MARS,
    JUPITER,
    SATURN,
    URANUS,
    NEPTUNE,
}

/// Python enum -> core enum.
impl From<PyThirdBody> for propagators::ThirdBody {
    fn from(body: PyThirdBody) -> Self {
        match body {
            PyThirdBody::SUN => Self::Sun,
            PyThirdBody::MOON => Self::Moon,
            PyThirdBody::MERCURY => Self::Mercury,
            PyThirdBody::VENUS => Self::Venus,
            PyThirdBody::MARS => Self::Mars,
            PyThirdBody::JUPITER => Self::Jupiter,
            PyThirdBody::SATURN => Self::Saturn,
            PyThirdBody::URANUS => Self::Uranus,
            PyThirdBody::NEPTUNE => Self::Neptune,
        }
    }
}

/// Core enum -> Python enum.
impl From<propagators::ThirdBody> for PyThirdBody {
    fn from(body: propagators::ThirdBody) -> Self {
        match body {
            propagators::ThirdBody::Sun => Self::SUN,
            propagators::ThirdBody::Moon => Self::MOON,
            propagators::ThirdBody::Mercury => Self::MERCURY,
            propagators::ThirdBody::Venus => Self::VENUS,
            propagators::ThirdBody::Mars => Self::MARS,
            propagators::ThirdBody::Jupiter => Self::JUPITER,
            propagators::ThirdBody::Saturn => Self::SATURN,
            propagators::ThirdBody::Uranus => Self::URANUS,
            propagators::ThirdBody::Neptune => Self::NEPTUNE,
        }
    }
}
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "ThirdBodyConfiguration")]
#[derive(Clone)]
pub struct PyThirdBodyConfiguration {
pub config: propagators::ThirdBodyConfiguration,
}
#[pymethods]
impl PyThirdBodyConfiguration {
#[new]
#[pyo3(signature = (ephemeris_source, bodies))]
fn new(ephemeris_source: PyEphemerisSource, bodies: Vec<PyThirdBody>) -> Self {
PyThirdBodyConfiguration {
config: propagators::ThirdBodyConfiguration {
ephemeris_source: ephemeris_source.into(),
bodies: bodies.into_iter().map(|b| b.into()).collect(),
},
}
}
#[getter]
fn ephemeris_source(&self) -> PyEphemerisSource {
match self.config.ephemeris_source {
propagators::EphemerisSource::LowPrecision => PyEphemerisSource::LowPrecision,
propagators::EphemerisSource::DE440s => PyEphemerisSource::DE440s,
propagators::EphemerisSource::DE440 => PyEphemerisSource::DE440,
propagators::EphemerisSource::SPK(spice::SPKKernel::DE440s) => PyEphemerisSource::DE440s,
propagators::EphemerisSource::SPK(spice::SPKKernel::DE440) => PyEphemerisSource::DE440,
}
}
#[setter]
fn set_ephemeris_source(&mut self, source: PyEphemerisSource) {
self.config.ephemeris_source = source.into();
}
#[getter]
fn bodies(&self) -> Vec<PyThirdBody> {
self.config.bodies.iter().map(|b| b.clone().into()).collect()
}
#[setter]
fn set_bodies(&mut self, bodies: Vec<PyThirdBody>) {
self.config.bodies = bodies.into_iter().map(|b| b.into()).collect();
}
fn __repr__(&self) -> String {
format!("ThirdBodyConfiguration(ephemeris_source={:?}, bodies={:?})",
self.config.ephemeris_source, self.config.bodies)
}
}
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "NumericalPropagationConfig")]
#[derive(Clone)]
pub struct PyNumericalPropagationConfig {
pub config: propagators::NumericalPropagationConfig,
}
#[pymethods]
impl PyNumericalPropagationConfig {
#[classmethod]
fn default(_cls: &Bound<'_, PyType>) -> Self {
PyNumericalPropagationConfig {
config: propagators::NumericalPropagationConfig::default(),
}
}
#[classmethod]
fn with_method(_cls: &Bound<'_, PyType>, method: &PyIntegrationMethod) -> Self {
PyNumericalPropagationConfig {
config: propagators::NumericalPropagationConfig::with_method(method.method),
}
}
#[classmethod]
fn high_precision(_cls: &Bound<'_, PyType>) -> Self {
PyNumericalPropagationConfig {
config: propagators::NumericalPropagationConfig::high_precision(),
}
}
#[new]
#[pyo3(signature = (method, integrator, variational))]
fn new(
method: &PyIntegrationMethod,
integrator: &PyIntegratorConfig,
variational: &PyVariationalConfig,
) -> Self {
PyNumericalPropagationConfig {
config: propagators::NumericalPropagationConfig::new(
method.method,
integrator.inner.clone(),
variational.config.clone(),
),
}
}
fn with_abs_tol(mut slf: PyRefMut<'_, Self>, abs_tol: f64) -> PyRefMut<'_, Self> {
slf.config.integrator.abs_tol = abs_tol;
slf
}
fn with_rel_tol(mut slf: PyRefMut<'_, Self>, rel_tol: f64) -> PyRefMut<'_, Self> {
slf.config.integrator.rel_tol = rel_tol;
slf
}
fn with_initial_step(mut slf: PyRefMut<'_, Self>, step: f64) -> PyRefMut<'_, Self> {
slf.config.integrator.initial_step = Some(step);
slf
}
fn with_max_step(mut slf: PyRefMut<'_, Self>, step: f64) -> PyRefMut<'_, Self> {
slf.config.integrator.max_step = Some(step);
slf
}
fn with_stm(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
slf.config.variational.enable_stm = true;
slf
}
fn with_sensitivity(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
slf.config.variational.enable_sensitivity = true;
slf
}
fn with_stm_history(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
slf.config.variational.store_stm_history = true;
slf
}
fn with_sensitivity_history(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
slf.config.variational.store_sensitivity_history = true;
slf
}
fn with_store_accelerations(mut slf: PyRefMut<'_, Self>, store: bool) -> PyRefMut<'_, Self> {
slf.config.store_accelerations = store;
slf
}
fn with_interpolation_method(
mut slf: PyRefMut<'_, Self>,
method: PyInterpolationMethod,
) -> PyRefMut<'_, Self> {
slf.config.interpolation_method = method.method;
slf
}
#[getter]
fn method(&self) -> PyIntegrationMethod {
PyIntegrationMethod { method: self.config.method }
}
#[getter]
fn abs_tol(&self) -> f64 {
self.config.integrator.abs_tol
}
#[getter]
fn rel_tol(&self) -> f64 {
self.config.integrator.rel_tol
}
#[getter]
fn variational(&self) -> PyVariationalConfig {
PyVariationalConfig { config: self.config.variational.clone() }
}
#[setter]
fn set_variational(&mut self, value: PyVariationalConfig) {
self.config.variational = value.config;
}
#[getter]
fn store_accelerations(&self) -> bool {
self.config.store_accelerations
}
#[setter]
fn set_store_accelerations(&mut self, value: bool) {
self.config.store_accelerations = value;
}
#[getter]
fn interpolation_method(&self) -> PyInterpolationMethod {
PyInterpolationMethod { method: self.config.interpolation_method }
}
#[setter]
fn set_interpolation_method(&mut self, value: PyInterpolationMethod) {
self.config.interpolation_method = value.method;
}
fn __repr__(&self) -> String {
format!("NumericalPropagationConfig(method={:?}, abs_tol={}, rel_tol={})",
self.config.method, self.config.integrator.abs_tol, self.config.integrator.rel_tol)
}
}
/// Python-facing wrapper around `propagators::ForceModelConfig`: gravity,
/// optional drag / SRP / third-body configurations, a relativity flag and
/// an optional mass parameter source.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "ForceModelConfig")]
#[derive(Clone)]
pub struct PyForceModelConfig {
    /// Wrapped core configuration.
    pub config: propagators::ForceModelConfig,
}

#[pymethods]
impl PyForceModelConfig {
    /// Create a force model configuration.
    ///
    /// Omitted `gravity` falls back to point-mass gravity; omitted `drag`,
    /// `srp`, `third_body` and `mass` are stored as `None`. All supplied
    /// configurations are copied.
    #[new]
    #[pyo3(signature = (gravity=None, drag=None, srp=None, third_body=None, relativity=false, mass=None))]
    fn new(
        gravity: Option<&PyGravityConfiguration>,
        drag: Option<&PyDragConfiguration>,
        srp: Option<&PySolarRadiationPressureConfiguration>,
        third_body: Option<&PyThirdBodyConfiguration>,
        relativity: bool,
        mass: Option<&PyParameterSource>,
    ) -> Self {
        let gravity_cfg = match gravity {
            Some(wrapper) => wrapper.config.clone(),
            None => propagators::GravityConfiguration::PointMass,
        };
        let config = propagators::ForceModelConfig {
            gravity: gravity_cfg,
            drag: drag.map(|wrapper| wrapper.config.clone()),
            srp: srp.map(|wrapper| wrapper.config.clone()),
            third_body: third_body.map(|wrapper| wrapper.config.clone()),
            relativity,
            mass: mass.map(|wrapper| wrapper.source.clone()),
        };
        Self { config }
    }

    /// Configuration produced by the core `default()` constructor.
    #[classmethod]
    fn default(_cls: &Bound<'_, PyType>) -> Self {
        let config = propagators::ForceModelConfig::default();
        Self { config }
    }

    /// Configuration produced by the core `high_fidelity()` constructor.
    #[classmethod]
    fn high_fidelity(_cls: &Bound<'_, PyType>) -> Self {
        let config = propagators::ForceModelConfig::high_fidelity();
        Self { config }
    }

    /// Configuration produced by the core `earth_gravity()` constructor.
    #[classmethod]
    fn earth_gravity(_cls: &Bound<'_, PyType>) -> Self {
        let config = propagators::ForceModelConfig::earth_gravity();
        Self { config }
    }

    /// Configuration produced by the core `two_body_gravity()` constructor.
    #[classmethod]
    fn two_body(_cls: &Bound<'_, PyType>) -> Self {
        let config = propagators::ForceModelConfig::two_body_gravity();
        Self { config }
    }

    /// Configuration produced by the core `conservative_forces()` constructor.
    #[classmethod]
    fn conservative_forces(_cls: &Bound<'_, PyType>) -> Self {
        let config = propagators::ForceModelConfig::conservative_forces();
        Self { config }
    }

    /// Configuration produced by the core `leo_default()` constructor.
    #[classmethod]
    fn leo_default(_cls: &Bound<'_, PyType>) -> Self {
        let config = propagators::ForceModelConfig::leo_default();
        Self { config }
    }

    /// Configuration produced by the core `geo_default()` constructor.
    #[classmethod]
    fn geo_default(_cls: &Bound<'_, PyType>) -> Self {
        let config = propagators::ForceModelConfig::geo_default();
        Self { config }
    }

    /// Result of the core `requires_params()` query for this configuration.
    pub fn requires_params(&self) -> bool {
        let cfg = &self.config;
        cfg.requires_params()
    }

    /// Copy of the gravity configuration.
    #[getter]
    fn gravity(&self) -> PyGravityConfiguration {
        let config = self.config.gravity.clone();
        PyGravityConfiguration { config }
    }

    /// Replace the gravity configuration with a copy of `gravity`.
    #[setter]
    fn set_gravity(&mut self, gravity: &PyGravityConfiguration) {
        let replacement = gravity.config.clone();
        self.config.gravity = replacement;
    }

    /// Copy of the drag configuration, or `None` when drag is not configured.
    #[getter]
    fn drag(&self) -> Option<PyDragConfiguration> {
        let inner = self.config.drag.as_ref()?;
        Some(PyDragConfiguration { config: inner.clone() })
    }

    /// Replace the drag configuration; `None` removes it.
    #[setter]
    fn set_drag(&mut self, drag: Option<&PyDragConfiguration>) {
        let replacement = drag.map(|wrapper| wrapper.config.clone());
        self.config.drag = replacement;
    }

    /// Copy of the SRP configuration, or `None` when SRP is not configured.
    #[getter]
    fn srp(&self) -> Option<PySolarRadiationPressureConfiguration> {
        let inner = self.config.srp.as_ref()?;
        Some(PySolarRadiationPressureConfiguration { config: inner.clone() })
    }

    /// Replace the SRP configuration; `None` removes it.
    #[setter]
    fn set_srp(&mut self, srp: Option<&PySolarRadiationPressureConfiguration>) {
        let replacement = srp.map(|wrapper| wrapper.config.clone());
        self.config.srp = replacement;
    }

    /// Copy of the third-body configuration, or `None` when not configured.
    #[getter]
    fn third_body(&self) -> Option<PyThirdBodyConfiguration> {
        let inner = self.config.third_body.as_ref()?;
        Some(PyThirdBodyConfiguration { config: inner.clone() })
    }

    /// Replace the third-body configuration; `None` removes it.
    #[setter]
    fn set_third_body(&mut self, third_body: Option<&PyThirdBodyConfiguration>) {
        let replacement = third_body.map(|wrapper| wrapper.config.clone());
        self.config.third_body = replacement;
    }

    /// Current value of the relativity flag.
    #[getter]
    fn relativity(&self) -> bool {
        let cfg = &self.config;
        cfg.relativity
    }

    /// Overwrite the relativity flag.
    #[setter]
    fn set_relativity(&mut self, enabled: bool) {
        let cfg = &mut self.config;
        cfg.relativity = enabled;
    }

    /// Copy of the mass parameter source, or `None` when not configured.
    #[getter]
    fn mass(&self) -> Option<PyParameterSource> {
        let inner = self.config.mass.as_ref()?;
        Some(PyParameterSource { source: inner.clone() })
    }

    /// Replace the mass parameter source; `None` removes it.
    #[setter]
    fn set_mass(&mut self, mass: Option<&PyParameterSource>) {
        let replacement = mass.map(|wrapper| wrapper.source.clone());
        self.config.mass = replacement;
    }

    /// Python `repr()`; only reports whether parameters are required.
    fn __repr__(&self) -> String {
        let needs_params = self.config.requires_params();
        format!("ForceModelConfig(requires_params={})", needs_params)
    }
}
/// Python-facing wrapper (`NumericalOrbitPropagator`) around the core
/// `propagators::DNumericalOrbitPropagator`.
///
/// Unlike the configuration wrappers in this file it does not derive `Clone`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "NumericalOrbitPropagator")]
pub struct PyNumericalOrbitPropagator {
/// Wrapped core propagator; public so other binding code can reach it.
pub propagator: propagators::DNumericalOrbitPropagator,
}
#[pymethods]
impl PyNumericalOrbitPropagator {
/// Create a numerical orbit propagator.
///
/// # Arguments
/// * `epoch` - initial epoch.
/// * `state` - initial state as a contiguous 1-D float64 array.
/// * `propagation_config` / `force_config` - copied into the propagator.
/// * `params` - optional contiguous 1-D float64 parameter array.
/// * `initial_covariance` - optional 6x6 matrix, read in row-major order.
/// * `additional_dynamics` - optional Python callable `f(t, x, params)`
///   returning a 1-D float64 array; `params` is passed as `None` when the
///   propagator supplies none.
/// * `control_input` - optional Python callable with the same call shape.
///
/// # Errors
/// * `ValueError` if `initial_covariance` is not 6x6.
/// * The rust-numpy "not contiguous" error if `state`, `params` or
///   `initial_covariance` cannot be viewed as a contiguous slice.
/// * `RuntimeError` carrying the message of any error returned by the core
///   constructor.
///
/// # Panics
/// The callbacks have no error channel: if a Python callable raises, or
/// returns something that cannot be read as a contiguous 1-D float64 array,
/// the wrapping closure panics during propagation.
#[new]
#[pyo3(signature = (epoch, state, propagation_config, force_config, params=None, initial_covariance=None, additional_dynamics=None, control_input=None))]
#[allow(clippy::too_many_arguments)]
pub fn new(
    py: Python<'_>,
    epoch: &PyEpoch,
    state: PyReadonlyArray1<f64>,
    propagation_config: &PyNumericalPropagationConfig,
    force_config: &PyForceModelConfig,
    params: Option<PyReadonlyArray1<f64>>,
    initial_covariance: Option<PyReadonlyArray2<f64>>,
    additional_dynamics: Option<Py<PyAny>>,
    control_input: Option<Py<PyAny>>,
) -> PyResult<Self> {
    let state_vec = nalgebra::DVector::from_column_slice(state.as_slice()?);

    // A non-contiguous `params` array is reported as a Python error, exactly
    // like `state` above, instead of panicking inside `unwrap()`.
    let params_vec = params
        .map(|p| p.as_slice().map(|s| nalgebra::DVector::from_column_slice(s)))
        .transpose()?;

    let cov_matrix = if let Some(cov) = initial_covariance {
        let cov_shape = cov.shape();
        if cov_shape[0] != 6 || cov_shape[1] != 6 {
            return Err(exceptions::PyValueError::new_err(
                "Initial covariance must be a 6x6 matrix"
            ));
        }
        let cov_data: Vec<f64> = cov.as_slice()?.to_vec();
        Some(nalgebra::DMatrix::from_row_slice(6, 6, &cov_data))
    } else {
        None
    };

    // Wrap the Python callable in a Rust closure that attaches to the
    // interpreter on every call and converts arrays in both directions.
    let additional_dynamics_fn: Option<crate::integrators::traits::DStateDynamics> =
        additional_dynamics.map(|dyn_py| {
            let dyn_py = dyn_py.clone_ref(py);
            Box::new(
                move |t: f64, x: &nalgebra::DVector<f64>, p: Option<&nalgebra::DVector<f64>>| {
                    Python::attach(|py| {
                        let x_np = x.as_slice().to_pyarray(py);
                        let p_np: Option<Bound<'_, PyArray<f64, Ix1>>> =
                            p.map(|pv| pv.as_slice().to_pyarray(py).to_owned());
                        let result = match p_np {
                            Some(params_arr) => dyn_py.call1(py, (t, x_np, params_arr)),
                            None => dyn_py.call1(py, (t, x_np, py.None())),
                        };
                        match result {
                            Ok(res) => {
                                let res_arr: PyReadonlyArray1<f64> = res.extract(py).unwrap();
                                nalgebra::DVector::from_column_slice(res_arr.as_slice().unwrap())
                            }
                            Err(e) => {
                                panic!("Error calling additional_dynamics: {e}")
                            }
                        }
                    })
                },
            ) as crate::integrators::traits::DStateDynamics
        });

    // Same wrapping for the optional control-input callable.
    let control_input_fn: crate::integrators::traits::DControlInput =
        control_input.map(|ctrl_py| {
            let ctrl_py = ctrl_py.clone_ref(py);
            Box::new(
                move |t: f64, x: &nalgebra::DVector<f64>, p: Option<&nalgebra::DVector<f64>>| {
                    Python::attach(|py| {
                        let x_np = x.as_slice().to_pyarray(py);
                        let p_np: Option<Bound<'_, PyArray<f64, Ix1>>> =
                            p.map(|pv| pv.as_slice().to_pyarray(py).to_owned());
                        let result = match p_np {
                            Some(params_arr) => ctrl_py.call1(py, (t, x_np, params_arr)),
                            None => ctrl_py.call1(py, (t, x_np, py.None())),
                        };
                        match result {
                            Ok(res) => {
                                let res_arr: PyReadonlyArray1<f64> = res.extract(py).unwrap();
                                nalgebra::DVector::from_column_slice(res_arr.as_slice().unwrap())
                            }
                            Err(e) => {
                                panic!("Error calling control_input: {e}")
                            }
                        }
                    })
                },
            ) as Box<dyn Fn(f64, &nalgebra::DVector<f64>, Option<&nalgebra::DVector<f64>>) -> nalgebra::DVector<f64> + Send + Sync>
        });

    let prop = propagators::DNumericalOrbitPropagator::new(
        epoch.obj,
        state_vec,
        propagation_config.config.clone(),
        force_config.config.clone(),
        params_vec,
        additional_dynamics_fn,
        control_input_fn,
        cov_matrix,
    ).map_err(|e| exceptions::PyRuntimeError::new_err(e.to_string()))?;
    Ok(PyNumericalOrbitPropagator { propagator: prop })
}
/// Create a propagator from a state vector using the default
/// `NumericalPropagationConfig`.
///
/// `force_config` falls back to its `Default` when omitted. No additional
/// dynamics, control input or initial covariance are configured.
///
/// # Errors
/// * `ValueError` if `state` has fewer than 6 elements.
/// * The rust-numpy "not contiguous" error if `state` or `params` cannot be
///   viewed as a contiguous slice.
/// * `RuntimeError` carrying the message of any error returned by the core
///   constructor.
#[classmethod]
#[pyo3(signature = (epoch, state, params=None, force_config=None))]
pub fn from_eci(
    _cls: &Bound<'_, PyType>,
    epoch: &PyEpoch,
    state: PyReadonlyArray1<f64>,
    params: Option<PyReadonlyArray1<f64>>,
    force_config: Option<&PyForceModelConfig>,
) -> PyResult<Self> {
    let state_slice = state.as_slice()?;
    if state_slice.len() < 6 {
        return Err(exceptions::PyValueError::new_err(
            "State vector must have at least 6 elements"
        ));
    }
    let state_vec = nalgebra::DVector::from_column_slice(state_slice);

    // A non-contiguous `params` array is reported as a Python error, exactly
    // like `state` above, instead of panicking inside `unwrap()`.
    let params_vec = params
        .map(|p| p.as_slice().map(|s| nalgebra::DVector::from_column_slice(s)))
        .transpose()?;

    let fc = force_config
        .map(|c| c.config.clone())
        .unwrap_or_default();
    let prop = propagators::DNumericalOrbitPropagator::new(
        epoch.obj,
        state_vec,
        propagators::NumericalPropagationConfig::default(),
        fc,
        params_vec,
        None,
        None,
        None,
    ).map_err(|e| exceptions::PyRuntimeError::new_err(e.to_string()))?;
    Ok(PyNumericalOrbitPropagator { propagator: prop })
}
/// Epoch reported by the wrapped propagator's `current_epoch()`.
#[pyo3(text_signature = "()")]
pub fn current_epoch(&self) -> PyEpoch {
    let obj = self.propagator.current_epoch();
    PyEpoch { obj }
}
/// Epoch reported by the wrapped propagator's `initial_epoch()` (read-only property).
#[getter]
pub fn initial_epoch(&self) -> PyEpoch {
    let obj = self.propagator.initial_epoch();
    PyEpoch { obj }
}
/// Current state vector, copied into a new 1-D NumPy array.
#[pyo3(text_signature = "()")]
pub fn current_state<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    let current = self.propagator.current_state();
    let array = current.as_slice().to_pyarray(py);
    array.to_owned()
}
/// Initial state vector (via `DStatePropagator::initial_state`), copied into
/// a new 1-D NumPy array.
#[pyo3(text_signature = "()")]
pub fn initial_state<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    let initial = DStatePropagator::initial_state(&self.propagator);
    let array = initial.as_slice().to_pyarray(py);
    array.to_owned()
}
/// State dimension reported by `DStatePropagator::state_dim` (read-only property).
#[getter]
pub fn state_dim(&self) -> usize {
    let inner = &self.propagator;
    DStatePropagator::state_dim(inner)
}
/// Step size reported by `DStatePropagator::step_size` (property getter).
#[getter]
pub fn step_size(&self) -> f64 {
    let inner = &self.propagator;
    DStatePropagator::step_size(inner)
}
/// Forward a new step size to `DStatePropagator::set_step_size` (property setter).
#[setter]
pub fn set_step_size(&mut self, step_size: f64) {
    let inner = &mut self.propagator;
    DStatePropagator::set_step_size(inner, step_size);
}
/// Delegate to `DStatePropagator::step` on the wrapped propagator.
#[pyo3(text_signature = "()")]
pub fn step(&mut self) {
    let inner = &mut self.propagator;
    DStatePropagator::step(inner);
}
/// Delegate to `DStatePropagator::step_by` with the given `step_size`.
#[pyo3(text_signature = "(step_size)")]
pub fn step_by(&mut self, step_size: f64) {
    let inner = &mut self.propagator;
    DStatePropagator::step_by(inner, step_size);
}
/// Delegate to `DStatePropagator::step_past` with `target_epoch`.
#[pyo3(text_signature = "(target_epoch)")]
pub fn step_past(&mut self, target_epoch: &PyEpoch) {
    let target = target_epoch.obj;
    DStatePropagator::step_past(&mut self.propagator, target);
}
/// Delegate to `DStatePropagator::propagate_steps` with `num_steps`.
#[pyo3(text_signature = "(num_steps)")]
pub fn propagate_steps(&mut self, num_steps: usize) {
    let inner = &mut self.propagator;
    DStatePropagator::propagate_steps(inner, num_steps);
}
/// Delegate to `DStatePropagator::propagate_to` with `target_epoch`.
#[pyo3(text_signature = "(target_epoch)")]
pub fn propagate_to(&mut self, target_epoch: &PyEpoch) {
    let target = target_epoch.obj;
    DStatePropagator::propagate_to(&mut self.propagator, target);
}
/// Delegate to `DStatePropagator::reset` on the wrapped propagator.
#[pyo3(text_signature = "()")]
pub fn reset(&mut self) {
    let inner = &mut self.propagator;
    DStatePropagator::reset(inner);
}
/// State at `epoch` from `DStateProvider::state`, as a new 1-D NumPy array.
///
/// Raises `RuntimeError` carrying the core error's message on failure.
#[pyo3(text_signature = "(epoch)")]
pub fn state<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    match DStateProvider::state(&self.propagator, epoch.obj) {
        Ok(vector) => Ok(vector.as_slice().to_pyarray(py).to_owned()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// State at `epoch` from `DOrbitStateProvider::state_eci`, as a new 1-D NumPy array.
///
/// Raises `RuntimeError` carrying the core error's message on failure.
#[pyo3(text_signature = "(epoch)")]
pub fn state_eci<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    match DOrbitStateProvider::state_eci(&self.propagator, epoch.obj) {
        Ok(vector) => Ok(vector.as_slice().to_pyarray(py).to_owned()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// State at `epoch` from `DOrbitStateProvider::state_ecef`, as a new 1-D NumPy array.
///
/// Raises `RuntimeError` carrying the core error's message on failure.
#[pyo3(text_signature = "(epoch)")]
pub fn state_ecef<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    match DOrbitStateProvider::state_ecef(&self.propagator, epoch.obj) {
        Ok(vector) => Ok(vector.as_slice().to_pyarray(py).to_owned()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// State at `epoch` from `DOrbitStateProvider::state_gcrf`, as a new 1-D NumPy array.
///
/// Raises `RuntimeError` carrying the core error's message on failure.
#[pyo3(text_signature = "(epoch)")]
pub fn state_gcrf<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    match DOrbitStateProvider::state_gcrf(&self.propagator, epoch.obj) {
        Ok(vector) => Ok(vector.as_slice().to_pyarray(py).to_owned()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// State at `epoch` from `DOrbitStateProvider::state_itrf`, as a new 1-D NumPy array.
///
/// Raises `RuntimeError` carrying the core error's message on failure.
#[pyo3(text_signature = "(epoch)")]
pub fn state_itrf<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    match DOrbitStateProvider::state_itrf(&self.propagator, epoch.obj) {
        Ok(vector) => Ok(vector.as_slice().to_pyarray(py).to_owned()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// Query `DOrbitStateProvider::state_eme2000` at `epoch` and return the
/// result as a 1-D NumPy array.
///
/// Raises:
///     RuntimeError: If the provider reports an error.
#[pyo3(text_signature = "(epoch)")]
pub fn state_eme2000<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    match DOrbitStateProvider::state_eme2000(&self.propagator, epoch.obj) {
        Ok(vector) => Ok(vector.as_slice().to_pyarray(py).to_owned()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// Query `DOrbitStateProvider::state_koe_osc` at `epoch` and return the
/// result as a 1-D NumPy array.
///
/// Args:
///     epoch: Epoch wrapper; its inner epoch is forwarded.
///     angle_format: Angle-format wrapper; its `value` is forwarded.
///
/// Raises:
///     RuntimeError: If the provider reports an error.
#[pyo3(text_signature = "(epoch, angle_format)")]
pub fn state_koe_osc<'a>(
    &self,
    py: Python<'a>,
    epoch: &PyEpoch,
    angle_format: &PyAngleFormat,
) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    match DOrbitStateProvider::state_koe_osc(&self.propagator, epoch.obj, angle_format.value) {
        Ok(elements) => Ok(elements.as_slice().to_pyarray(py).to_owned()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// Query `DOrbitStateProvider::state_koe_mean` at `epoch` and return the
/// result as a 1-D NumPy array.
///
/// Args:
///     epoch: Epoch wrapper; its inner epoch is forwarded.
///     angle_format: Angle-format wrapper; its `value` is forwarded.
///
/// Raises:
///     RuntimeError: If the provider reports an error.
#[pyo3(text_signature = "(epoch, angle_format)")]
pub fn state_koe_mean<'a>(
    &self,
    py: Python<'a>,
    epoch: &PyEpoch,
    angle_format: &PyAngleFormat,
) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    let lookup = DOrbitStateProvider::state_koe_mean(&self.propagator, epoch.obj, angle_format.value);
    let elements = lookup.map_err(|err| exceptions::PyRuntimeError::new_err(err.to_string()))?;
    let array = elements.as_slice().to_pyarray(py);
    Ok(array.to_owned())
}
/// Batch form of `state_koe_osc`: forwards every epoch to
/// `DOrbitStateProvider::states_koe_osc` and returns one 1-D NumPy array
/// per returned element set, in the order the provider returned them.
///
/// Raises:
///     RuntimeError: If the provider reports an error.
#[pyo3(text_signature = "(epochs, angle_format)")]
pub fn states_koe_osc<'a>(
    &self,
    py: Python<'a>,
    epochs: Vec<PyRef<PyEpoch>>,
    angle_format: &PyAngleFormat,
) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    // Unwrap the Python epoch handles into plain epoch values.
    let mut raw_epochs = Vec::with_capacity(epochs.len());
    for wrapped in &epochs {
        raw_epochs.push(wrapped.obj);
    }
    let element_sets =
        match DOrbitStateProvider::states_koe_osc(&self.propagator, &raw_epochs, angle_format.value) {
            Ok(sets) => sets,
            Err(err) => return Err(exceptions::PyRuntimeError::new_err(err.to_string())),
        };
    let mut arrays = Vec::new();
    for set in element_sets.iter() {
        let set: &Vector6<f64> = set;
        arrays.push(set.as_slice().to_pyarray(py).to_owned());
    }
    Ok(arrays)
}
/// Batch form of `state_koe_mean`: forwards every epoch to
/// `DOrbitStateProvider::states_koe_mean` and returns one 1-D NumPy array
/// per returned element set, in the order the provider returned them.
///
/// Raises:
///     RuntimeError: If the provider reports an error.
#[pyo3(text_signature = "(epochs, angle_format)")]
pub fn states_koe_mean<'a>(
    &self,
    py: Python<'a>,
    epochs: Vec<PyRef<PyEpoch>>,
    angle_format: &PyAngleFormat,
) -> PyResult<Vec<Bound<'a, PyArray<f64, Ix1>>>> {
    // Unwrap the Python epoch handles into plain epoch values.
    let mut raw_epochs = Vec::with_capacity(epochs.len());
    for wrapped in &epochs {
        raw_epochs.push(wrapped.obj);
    }
    let element_sets =
        match DOrbitStateProvider::states_koe_mean(&self.propagator, &raw_epochs, angle_format.value) {
            Ok(sets) => sets,
            Err(err) => return Err(exceptions::PyRuntimeError::new_err(err.to_string())),
        };
    let mut arrays = Vec::new();
    for set in element_sets.iter() {
        let set: &Vector6<f64> = set;
        arrays.push(set.as_slice().to_pyarray(py).to_owned());
    }
    Ok(arrays)
}
/// Return a clone of the propagator's trajectory wrapped for Python.
///
/// Because the trajectory is cloned, the returned object is independent of
/// the propagator's own copy.
#[getter]
pub fn trajectory(&self) -> PyOrbitalTrajectory {
    let snapshot = self.propagator.trajectory().clone();
    PyOrbitalTrajectory { trajectory: snapshot }
}
#[pyo3(text_signature = "()")]
pub fn stm<'a>(&self, py: Python<'a>) -> Option<Bound<'a, PyArray<f64, Ix2>>> {
self.propagator.stm().map(|stm| {
let n = stm.nrows();
let flat: Vec<f64> = (0..n).flat_map(|i| (0..n).map(move |j| stm[(i, j)])).collect();
flat.into_pyarray(py).reshape([n, n]).unwrap()
})
}
#[pyo3(text_signature = "()")]
pub fn sensitivity<'a>(&self, py: Python<'a>) -> Option<Bound<'a, PyArray<f64, Ix2>>> {
self.propagator.sensitivity().map(|sens| {
let n = sens.nrows();
let p = sens.ncols();
let flat: Vec<f64> = (0..n).flat_map(|i| (0..p).map(move |j| sens[(i, j)])).collect();
flat.into_pyarray(py).reshape([n, p]).unwrap()
})
}
/// Query `DCovarianceProvider::covariance` at `epoch` and return the
/// matrix as an `(n, n)` NumPy array copied in row-major order.
///
/// Raises:
///     RuntimeError: If the provider reports an error.
#[pyo3(text_signature = "(epoch)")]
pub fn covariance<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix2>>> {
    let matrix = match DCovarianceProvider::covariance(&self.propagator, epoch.obj) {
        Ok(m) => m,
        Err(err) => return Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    };
    let dim = matrix.nrows();
    let view = &matrix;
    let row_major: Vec<f64> = (0..dim)
        .flat_map(|row| (0..dim).map(move |col| view[(row, col)]))
        .collect();
    // The buffer holds exactly dim * dim values, so the reshape cannot fail.
    Ok(row_major.into_pyarray(py).reshape([dim, dim]).unwrap())
}
/// Forward `max_size` to `DStatePropagator::set_eviction_policy_max_size`.
///
/// Raises:
///     RuntimeError: If the propagator rejects the setting.
#[pyo3(text_signature = "(max_size)")]
pub fn set_eviction_policy_max_size(&mut self, max_size: usize) -> PyResult<()> {
    match DStatePropagator::set_eviction_policy_max_size(&mut self.propagator, max_size) {
        Ok(()) => Ok(()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// Forward `max_age` to `DStatePropagator::set_eviction_policy_max_age`.
///
/// Raises:
///     RuntimeError: If the propagator rejects the setting.
#[pyo3(text_signature = "(max_age)")]
pub fn set_eviction_policy_max_age(&mut self, max_age: f64) -> PyResult<()> {
    // NOTE(review): the unit of `max_age` is not visible here (presumably
    // seconds) — confirm against the trait documentation.
    match DStatePropagator::set_eviction_policy_max_age(&mut self.propagator, max_age) {
        Ok(()) => Ok(()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// Builder-style setter: store `name` on the propagator and return the
/// same object so calls can be chained.
pub fn with_name(mut slf: PyRefMut<'_, Self>, name: String) -> PyRefMut<'_, Self> {
    let label = Some(name);
    slf.propagator.name = label;
    slf
}
/// Builder-style setter: parse `uuid_str`, store the UUID on the
/// propagator and return the same object so calls can be chained.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID; the propagator is
///         left unchanged in that case.
pub fn with_uuid(mut slf: PyRefMut<'_, Self>, uuid_str: String) -> PyResult<PyRefMut<'_, Self>> {
    match uuid::Uuid::parse_str(&uuid_str) {
        Ok(parsed) => {
            slf.propagator.uuid = Some(parsed);
            Ok(slf)
        }
        Err(e) => Err(exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e))),
    }
}
/// Builder-style setter: assign a freshly generated UUID (via
/// `Uuid::now_v7`) and return the same object so calls can be chained.
pub fn with_new_uuid(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
    let fresh = uuid::Uuid::now_v7();
    slf.propagator.uuid = Some(fresh);
    slf
}
/// Builder-style setter: store the numeric `id` on the propagator and
/// return the same object so calls can be chained.
pub fn with_id(mut slf: PyRefMut<'_, Self>, id: u64) -> PyRefMut<'_, Self> {
    let numeric_id = Some(id);
    slf.propagator.id = numeric_id;
    slf
}
/// Return a copy of the propagator's name, or `None` when no name is set.
pub fn get_name(&self) -> Option<String> {
    self.propagator.name.as_deref().map(str::to_owned)
}
/// Return the propagator's UUID rendered as a string, or `None` when no
/// UUID is set.
pub fn get_uuid(&self) -> Option<String> {
    let current = self.propagator.uuid.as_ref()?;
    Some(current.to_string())
}
/// Return the propagator's numeric id, or `None` when no id is set.
pub fn get_id(&self) -> Option<u64> {
    let inner = &self.propagator;
    inner.id
}
/// Debug-style representation showing the current epoch (via its `Debug`
/// form) and the state dimension.
fn __repr__(&self) -> String {
    let epoch = DStatePropagator::current_epoch(&self.propagator);
    let dim = DStatePropagator::state_dim(&self.propagator);
    format!("NumericalOrbitPropagator(epoch={:?}, state_dim={})", epoch, dim)
}
fn __str__(&self) -> String {
self.__repr__()
}
#[pyo3(text_signature = "(event)")]
pub fn add_event_detector(&mut self, event: &Bound<'_, PyAny>) -> PyResult<()> {
if let Ok(mut time_event) = event.extract::<PyRefMut<PyTimeEvent>>() {
if let Some(inner) = time_event.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err(
"TimeEvent has already been consumed"
));
}
if let Ok(mut value_event) = event.extract::<PyRefMut<PyValueEvent>>() {
if let Some(inner) = value_event.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err(
"ValueEvent has already been consumed"
));
}
if let Ok(mut binary_event) = event.extract::<PyRefMut<PyBinaryEvent>>() {
if let Some(inner) = binary_event.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err(
"BinaryEvent has already been consumed"
));
}
if let Ok(mut altitude_event) = event.extract::<PyRefMut<PyAltitudeEvent>>() {
if let Some(inner) = altitude_event.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err(
"AltitudeEvent has already been consumed"
));
}
if let Ok(mut e) = event.extract::<PyRefMut<PySemiMajorAxisEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("SemiMajorAxisEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyEccentricityEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("EccentricityEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyInclinationEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("InclinationEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyArgumentOfPerigeeEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("ArgumentOfPerigeeEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyMeanAnomalyEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("MeanAnomalyEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyEccentricAnomalyEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("EccentricAnomalyEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyTrueAnomalyEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("TrueAnomalyEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyArgumentOfLatitudeEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("ArgumentOfLatitudeEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyAscendingNodeEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("AscendingNodeEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyDescendingNodeEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("DescendingNodeEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PySpeedEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("SpeedEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyLongitudeEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("LongitudeEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyLatitudeEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("LatitudeEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyUmbraEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("UmbraEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyPenumbraEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("PenumbraEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PyEclipseEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("EclipseEvent has already been consumed"));
}
if let Ok(mut e) = event.extract::<PyRefMut<PySunlitEvent>>() {
if let Some(inner) = e.event.take() {
self.propagator.add_event_detector(Box::new(inner));
return Ok(());
}
return Err(exceptions::PyValueError::new_err("SunlitEvent has already been consumed"));
}
Err(exceptions::PyTypeError::new_err(
"Expected event detector type (TimeEvent, ValueEvent, BinaryEvent, AltitudeEvent, or premade event)"
))
}
/// Return every entry of the propagator's event log, each cloned into a
/// Python-facing `PyDetectedEvent`, in log order.
#[pyo3(text_signature = "()")]
pub fn event_log(&self) -> Vec<PyDetectedEvent> {
    let mut wrapped = Vec::new();
    for entry in self.propagator.event_log().iter() {
        wrapped.push(PyDetectedEvent { event: entry.clone() });
    }
    wrapped
}
/// Build a `PyEventQuery` over a copy of the propagator's event log.
#[pyo3(text_signature = "()")]
pub fn query_events(&self) -> PyEventQuery {
    let snapshot = self.propagator.event_log().to_vec();
    PyEventQuery::new(snapshot)
}
/// Return clones of the events the propagator's `events_by_name` yields
/// for `name`, wrapped for Python.
#[pyo3(text_signature = "(name)")]
pub fn events_by_name(&self, name: &str) -> Vec<PyDetectedEvent> {
    // NOTE(review): the exact matching rule (exact vs. substring) lives in
    // the propagator and is not visible here.
    let mut wrapped = Vec::new();
    for found in self.propagator.events_by_name(name) {
        wrapped.push(PyDetectedEvent { event: found.clone() });
    }
    wrapped
}
/// Return a clone of the propagator's `latest_event()`, wrapped for
/// Python, or `None` when there is none.
#[pyo3(text_signature = "()")]
pub fn latest_event(&self) -> Option<PyDetectedEvent> {
    let newest = self.propagator.latest_event()?;
    Some(PyDetectedEvent { event: newest.clone() })
}
/// Return clones of the events the propagator's `events_in_range` yields
/// for the given epoch bounds, wrapped for Python.
#[pyo3(text_signature = "(start, end)")]
pub fn events_in_range(&self, start: &PyEpoch, end: &PyEpoch) -> Vec<PyDetectedEvent> {
    // NOTE(review): whether the bounds are inclusive is decided by the
    // propagator — confirm there.
    let mut wrapped = Vec::new();
    for found in self.propagator.events_in_range(start.obj, end.obj) {
        wrapped.push(PyDetectedEvent { event: found.clone() });
    }
    wrapped
}
/// Return clones of the events the propagator's
/// `events_by_detector_index` yields for `index`, wrapped for Python.
#[pyo3(text_signature = "(index)")]
pub fn events_by_detector_index(&self, index: usize) -> Vec<PyDetectedEvent> {
    let mut wrapped = Vec::new();
    for found in self.propagator.events_by_detector_index(index) {
        wrapped.push(PyDetectedEvent { event: found.clone() });
    }
    wrapped
}
/// Return clones of the events the propagator's
/// `events_by_detector_index_in_range` yields for `index` and the given
/// epoch bounds, wrapped for Python.
#[pyo3(text_signature = "(index, start, end)")]
pub fn events_by_detector_index_in_range(
    &self,
    index: usize,
    start: &PyEpoch,
    end: &PyEpoch,
) -> Vec<PyDetectedEvent> {
    let mut wrapped = Vec::new();
    for found in self
        .propagator
        .events_by_detector_index_in_range(index, start.obj, end.obj)
    {
        wrapped.push(PyDetectedEvent { event: found.clone() });
    }
    wrapped
}
/// Return clones of the events the propagator's `events_by_name_in_range`
/// yields for `name_pattern` and the given epoch bounds, wrapped for
/// Python.
#[pyo3(text_signature = "(name_pattern, start, end)")]
pub fn events_by_name_in_range(
    &self,
    name_pattern: &str,
    start: &PyEpoch,
    end: &PyEpoch,
) -> Vec<PyDetectedEvent> {
    let mut wrapped = Vec::new();
    for found in self
        .propagator
        .events_by_name_in_range(name_pattern, start.obj, end.obj)
    {
        wrapped.push(PyDetectedEvent { event: found.clone() });
    }
    wrapped
}
/// Report the propagator's `terminated()` flag.
#[pyo3(text_signature = "()")]
pub fn terminated(&self) -> bool {
    let inner = &self.propagator;
    inner.terminated()
}
/// Invoke `reset_termination()` on the wrapped propagator.
#[pyo3(text_signature = "()")]
pub fn reset_termination(&mut self) {
    let inner = &mut self.propagator;
    inner.reset_termination();
}
/// Invoke `clear_events()` on the wrapped propagator.
#[pyo3(text_signature = "()")]
pub fn clear_events(&mut self) {
    let inner = &mut self.propagator;
    inner.clear_events();
}
/// Return the propagator's `current_params()` copied into a new 1-D NumPy
/// array.
#[pyo3(text_signature = "()")]
pub fn current_params<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    let parameter_vector = self.propagator.current_params();
    let array = parameter_vector.as_slice().to_pyarray(py);
    array.to_owned()
}
/// Forward the wrapped trajectory mode to the propagator's
/// `set_trajectory_mode`.
#[pyo3(text_signature = "(mode)")]
pub fn set_trajectory_mode(&mut self, mode: &PyTrajectoryMode) {
    let requested = mode.mode;
    self.propagator.set_trajectory_mode(requested);
}
/// Return the propagator's current trajectory mode wrapped for Python.
#[getter]
pub fn trajectory_mode(&self) -> PyTrajectoryMode {
    let mode = self.propagator.trajectory_mode();
    PyTrajectoryMode { mode }
}
#[pyo3(text_signature = "(epoch)")]
pub fn stm_at<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> Option<Bound<'a, PyArray<f64, Ix2>>> {
self.propagator.stm_at(epoch.obj).map(|stm| {
let n = stm.nrows();
let mut flat = Vec::with_capacity(n * n);
for i in 0..n {
for j in 0..n {
flat.push(stm[(i, j)]);
}
}
flat.into_pyarray(py).reshape([n, n]).unwrap()
})
}
#[pyo3(text_signature = "(epoch)")]
pub fn sensitivity_at<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> Option<Bound<'a, PyArray<f64, Ix2>>> {
self.propagator.sensitivity_at(epoch.obj).map(|sens| {
let n = sens.nrows();
let p = sens.ncols();
let mut flat = Vec::with_capacity(n * p);
for i in 0..n {
for j in 0..p {
flat.push(sens[(i, j)]);
}
}
flat.into_pyarray(py).reshape([n, p]).unwrap()
})
}
#[pyo3(text_signature = "(epoch)")]
pub fn covariance_gcrf<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix2>>> {
let cov = DOrbitCovarianceProvider::covariance_gcrf(&self.propagator, epoch.obj)
.map_err(|e| exceptions::PyRuntimeError::new_err(e.to_string()))?;
let n = cov.nrows();
let mut flat = Vec::with_capacity(n * n);
for i in 0..n {
for j in 0..n {
flat.push(cov[(i, j)]);
}
}
Ok(flat.into_pyarray(py).reshape([n, n]).unwrap())
}
#[pyo3(text_signature = "(epoch)")]
pub fn covariance_rtn<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix2>>> {
let cov = DOrbitCovarianceProvider::covariance_rtn(&self.propagator, epoch.obj)
.map_err(|e| exceptions::PyRuntimeError::new_err(e.to_string()))?;
let n = cov.nrows();
let mut flat = Vec::with_capacity(n * n);
for i in 0..n {
for j in 0..n {
flat.push(cov[(i, j)]);
}
}
Ok(flat.into_pyarray(py).reshape([n, n]).unwrap())
}
/// Forward the wrapped interpolation method to
/// `InterpolationConfig::set_interpolation_method`.
#[pyo3(text_signature = "(method)")]
pub fn set_interpolation_method(&mut self, method: &PyInterpolationMethod) {
    let chosen = method.method;
    InterpolationConfig::set_interpolation_method(&mut self.propagator, chosen);
}
/// Return the propagator's interpolation method wrapped for Python.
#[pyo3(text_signature = "()")]
pub fn get_interpolation_method(&self) -> PyInterpolationMethod {
    let method = InterpolationConfig::get_interpolation_method(&self.propagator);
    PyInterpolationMethod { method }
}
/// Forward the wrapped covariance interpolation method to
/// `CovarianceInterpolationConfig::set_covariance_interpolation_method`.
#[pyo3(text_signature = "(method)")]
pub fn set_covariance_interpolation_method(&mut self, method: &PyCovarianceInterpolationMethod) {
    let chosen = method.method;
    CovarianceInterpolationConfig::set_covariance_interpolation_method(&mut self.propagator, chosen);
}
/// Return the propagator's covariance interpolation method wrapped for
/// Python.
#[pyo3(text_signature = "()")]
pub fn get_covariance_interpolation_method(&self) -> PyCovarianceInterpolationMethod {
    let method = CovarianceInterpolationConfig::get_covariance_interpolation_method(&self.propagator);
    PyCovarianceInterpolationMethod { method }
}
/// Replace the propagator's name; passing `None` clears it.
#[pyo3(text_signature = "(name)")]
pub fn set_name(&mut self, name: Option<String>) {
    let inner = &mut self.propagator;
    inner.name = name;
}
/// Replace the propagator's numeric id; passing `None` clears it.
#[pyo3(text_signature = "(id)")]
pub fn set_id(&mut self, id: Option<u64>) {
    let inner = &mut self.propagator;
    inner.id = id;
}
/// Replace the propagator's UUID from its string form; passing `None`
/// clears it.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID; the stored UUID is
///         left unchanged in that case.
#[pyo3(text_signature = "(uuid_str)")]
pub fn set_uuid(&mut self, uuid_str: Option<String>) -> PyResult<()> {
    let parsed = match uuid_str {
        Some(text) => Some(
            uuid::Uuid::parse_str(&text)
                .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?,
        ),
        None => None,
    };
    self.propagator.uuid = parsed;
    Ok(())
}
/// Builder-style setter: replace name, UUID and numeric id in one call and
/// return the same object so calls can be chained.
///
/// Each `None` argument clears the corresponding field.
///
/// The UUID string is parsed before any field is written, so an invalid
/// UUID leaves the existing identity completely untouched instead of
/// overwriting name and id and then failing.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID.
pub fn with_identity(
    mut slf: PyRefMut<'_, Self>,
    name: Option<String>,
    uuid_str: Option<String>,
    id: Option<u64>,
) -> PyResult<PyRefMut<'_, Self>> {
    // Validate first: nothing below this point can fail.
    let uuid = uuid_str
        .map(|s| uuid::Uuid::parse_str(&s))
        .transpose()
        .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
    slf.propagator.name = name;
    slf.propagator.uuid = uuid;
    slf.propagator.id = id;
    Ok(slf)
}
/// Replace name, UUID and numeric id in one call.
///
/// Each `None` argument clears the corresponding field.
///
/// The UUID string is parsed before any field is written, so an invalid
/// UUID leaves the existing identity completely untouched instead of
/// overwriting name and id and then failing.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID.
#[pyo3(text_signature = "(name, uuid_str, id)")]
pub fn set_identity(
    &mut self,
    name: Option<String>,
    uuid_str: Option<String>,
    id: Option<u64>,
) -> PyResult<()> {
    // Validate first: nothing below this point can fail.
    let uuid = uuid_str
        .map(|s| uuid::Uuid::parse_str(&s))
        .transpose()
        .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
    self.propagator.name = name;
    self.propagator.uuid = uuid;
    self.propagator.id = id;
    Ok(())
}
}
/// Python-facing wrapper (exposed as `TrajectoryMode`) around
/// `propagators::TrajectoryMode`.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "TrajectoryMode")]
#[derive(Clone)]
pub struct PyTrajectoryMode {
    /// Wrapped Rust-side mode value.
    pub(crate) mode: propagators::TrajectoryMode,
}

#[pymethods]
impl PyTrajectoryMode {
    /// Class attribute wrapping `TrajectoryMode::AllSteps`.
    #[classattr]
    #[pyo3(name = "ALL_STEPS")]
    fn all_steps() -> Self {
        let mode = propagators::TrajectoryMode::AllSteps;
        PyTrajectoryMode { mode }
    }

    /// Class attribute wrapping `TrajectoryMode::OutputStepsOnly`.
    #[classattr]
    #[pyo3(name = "OUTPUT_STEPS_ONLY")]
    fn output_steps_only() -> Self {
        let mode = propagators::TrajectoryMode::OutputStepsOnly;
        PyTrajectoryMode { mode }
    }

    /// Class attribute wrapping `TrajectoryMode::Disabled`.
    #[classattr]
    #[pyo3(name = "DISABLED")]
    fn disabled() -> Self {
        let mode = propagators::TrajectoryMode::Disabled;
        PyTrajectoryMode { mode }
    }

    /// String form: the `Debug` rendering of the wrapped mode.
    fn __str__(&self) -> String {
        let mode = &self.mode;
        format!("{:?}", mode)
    }

    /// Representation: `TrajectoryMode.` followed by the `Debug` rendering
    /// of the wrapped mode.
    fn __repr__(&self) -> String {
        let mode = &self.mode;
        format!("TrajectoryMode.{:?}", mode)
    }

    /// Rich comparison supporting only `==` and `!=` on the wrapped mode.
    ///
    /// Any ordering operator raises `NotImplementedError`.
    fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp) -> PyResult<bool> {
        use pyo3::pyclass::CompareOp;

        if matches!(op, CompareOp::Eq) {
            return Ok(self.mode == other.mode);
        }
        if matches!(op, CompareOp::Ne) {
            return Ok(self.mode != other.mode);
        }
        Err(pyo3::exceptions::PyNotImplementedError::new_err(
            "Comparison not supported",
        ))
    }
}
/// Python-facing wrapper (exposed as `NumericalPropagator` in module
/// `brahe._brahe`) around a `propagators::DNumericalPropagator`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "NumericalPropagator")]
pub struct PyNumericalPropagator {
/// Wrapped Rust propagator that the bound methods delegate to.
pub propagator: propagators::DNumericalPropagator,
}
#[pymethods]
impl PyNumericalPropagator {
#[new]
#[pyo3(signature = (epoch, state, dynamics, propagation_config, params=None, initial_covariance=None, control_input=None))]
#[allow(clippy::too_many_arguments)]
pub fn new(
py: Python<'_>,
epoch: &PyEpoch,
state: PyReadonlyArray1<f64>,
dynamics: Py<PyAny>,
propagation_config: &PyNumericalPropagationConfig,
params: Option<PyReadonlyArray1<f64>>,
initial_covariance: Option<PyReadonlyArray2<f64>>,
control_input: Option<Py<PyAny>>,
) -> PyResult<Self> {
let state_vec = nalgebra::DVector::from_column_slice(state.as_slice()?);
let state_dim = state_vec.len();
let params_vec = params.map(|p| nalgebra::DVector::from_column_slice(p.as_slice().unwrap()));
let cov_matrix = if let Some(cov) = initial_covariance {
let cov_shape = cov.shape();
if cov_shape[0] != state_dim || cov_shape[1] != state_dim {
return Err(exceptions::PyValueError::new_err(
format!("Initial covariance must be a {}x{} matrix", state_dim, state_dim)
));
}
let cov_data: Vec<f64> = cov.as_slice()?.to_vec();
Some(nalgebra::DMatrix::from_row_slice(state_dim, state_dim, &cov_data))
} else {
None
};
let dynamics_py = dynamics.clone_ref(py);
let dynamics_fn: crate::integrators::traits::DStateDynamics = Box::new(
move |t: f64, x: &nalgebra::DVector<f64>, p: Option<&nalgebra::DVector<f64>>| {
Python::attach(|py| {
let x_np = x.as_slice().to_pyarray(py);
let p_np: Option<Bound<'_, PyArray<f64, Ix1>>> = p.map(|pv| pv.as_slice().to_pyarray(py).to_owned());
let result = match p_np {
Some(params_arr) => dynamics_py.call1(py, (t, x_np, params_arr)),
None => dynamics_py.call1(py, (t, x_np, py.None())),
};
match result {
Ok(res) => {
let res_arr: PyReadonlyArray1<f64> = res.extract(py).unwrap();
nalgebra::DVector::from_column_slice(res_arr.as_slice().unwrap())
}
Err(e) => {
panic!("Error calling dynamics function: {e}")
}
}
})
}
);
let control_input_fn: crate::integrators::traits::DControlInput =
control_input.map(|ctrl_py| {
let ctrl_py = ctrl_py.clone_ref(py);
Box::new(
move |t: f64, x: &nalgebra::DVector<f64>, p: Option<&nalgebra::DVector<f64>>| {
Python::attach(|py| {
let x_np = x.as_slice().to_pyarray(py);
let p_np: Option<Bound<'_, PyArray<f64, Ix1>>> =
p.map(|pv| pv.as_slice().to_pyarray(py).to_owned());
let result = match p_np {
Some(params_arr) => ctrl_py.call1(py, (t, x_np, params_arr)),
None => ctrl_py.call1(py, (t, x_np, py.None())),
};
match result {
Ok(res) => {
let res_arr: PyReadonlyArray1<f64> = res.extract(py).unwrap();
nalgebra::DVector::from_column_slice(res_arr.as_slice().unwrap())
}
Err(e) => {
panic!("Error calling control_input: {e}")
}
}
})
},
) as Box<dyn Fn(f64, &nalgebra::DVector<f64>, Option<&nalgebra::DVector<f64>>) -> nalgebra::DVector<f64> + Send + Sync>
});
let prop = propagators::DNumericalPropagator::new(
epoch.obj,
state_vec,
dynamics_fn,
propagation_config.config.clone(),
params_vec,
control_input_fn,
cov_matrix,
).map_err(|e| exceptions::PyRuntimeError::new_err(e.to_string()))?;
Ok(PyNumericalPropagator { propagator: prop })
}
/// Return the propagator's current epoch wrapped for Python.
#[pyo3(text_signature = "()")]
pub fn current_epoch(&self) -> PyEpoch {
    let obj = DStatePropagator::current_epoch(&self.propagator);
    PyEpoch { obj }
}
/// Return the propagator's initial epoch wrapped for Python.
#[getter]
pub fn initial_epoch(&self) -> PyEpoch {
    let obj = DStatePropagator::initial_epoch(&self.propagator);
    PyEpoch { obj }
}
/// Return the propagator's current state copied into a new 1-D NumPy
/// array.
#[pyo3(text_signature = "()")]
pub fn current_state<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    let vector = DStatePropagator::current_state(&self.propagator);
    let array = vector.as_slice().to_pyarray(py);
    array.to_owned()
}
/// Return the propagator's initial state copied into a new 1-D NumPy
/// array.
#[pyo3(text_signature = "()")]
pub fn initial_state<'a>(&self, py: Python<'a>) -> Bound<'a, PyArray<f64, Ix1>> {
    let vector = DStatePropagator::initial_state(&self.propagator);
    let array = vector.as_slice().to_pyarray(py);
    array.to_owned()
}
/// Return the state dimension reported by `DStatePropagator::state_dim`.
#[getter]
pub fn state_dim(&self) -> usize {
    let inner = &self.propagator;
    DStatePropagator::state_dim(inner)
}
/// Return the step size reported by `DStatePropagator::step_size`.
#[getter]
pub fn step_size(&self) -> f64 {
    let inner = &self.propagator;
    DStatePropagator::step_size(inner)
}
/// Property setter: forward `step_size` to
/// `DStatePropagator::set_step_size`.
#[setter]
pub fn set_step_size(&mut self, step_size: f64) {
    let inner = &mut self.propagator;
    DStatePropagator::set_step_size(inner, step_size);
}
/// Invoke `DStatePropagator::step` on the wrapped propagator.
#[pyo3(text_signature = "()")]
pub fn step(&mut self) {
    let inner = &mut self.propagator;
    DStatePropagator::step(inner);
}
/// Forward `step_size` to `DStatePropagator::step_by`.
#[pyo3(text_signature = "(step_size)")]
pub fn step_by(&mut self, step_size: f64) {
    let inner = &mut self.propagator;
    DStatePropagator::step_by(inner, step_size);
}
/// Forward a `step_past` request to the wrapped propagator.
///
/// Args:
///     target_epoch: Epoch wrapper whose inner epoch is handed to
///         `DStatePropagator::step_past`.
#[pyo3(text_signature = "(target_epoch)")]
pub fn step_past(&mut self, target_epoch: &PyEpoch) {
    let target = target_epoch.obj;
    let inner = &mut self.propagator;
    DStatePropagator::step_past(inner, target);
}
/// Forward a multi-step propagation request to the wrapped propagator.
///
/// Args:
///     num_steps: Step count passed unchanged to
///         `DStatePropagator::propagate_steps`.
#[pyo3(text_signature = "(num_steps)")]
pub fn propagate_steps(&mut self, num_steps: usize) {
    let inner = &mut self.propagator;
    DStatePropagator::propagate_steps(inner, num_steps);
}
/// Forward a `propagate_to` request to the wrapped propagator.
///
/// Args:
///     target_epoch: Epoch wrapper whose inner epoch is handed to
///         `DStatePropagator::propagate_to`.
#[pyo3(text_signature = "(target_epoch)")]
pub fn propagate_to(&mut self, target_epoch: &PyEpoch) {
    let target = target_epoch.obj;
    let inner = &mut self.propagator;
    DStatePropagator::propagate_to(inner, target);
}
/// Invoke `DStatePropagator::reset` on the wrapped propagator.
#[pyo3(text_signature = "()")]
pub fn reset(&mut self) {
    let inner = &mut self.propagator;
    DStatePropagator::reset(inner);
}
/// Query `DStateProvider::state` at `epoch` and return the result as a
/// freshly created 1-D NumPy array.
///
/// Raises:
///     RuntimeError: If the provider reports an error; the message is the
///         error's string form.
#[pyo3(text_signature = "(epoch)")]
pub fn state<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix1>>> {
    match DStateProvider::state(&self.propagator, epoch.obj) {
        Ok(vector) => Ok(vector.as_slice().to_pyarray(py).to_owned()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// Return a clone of the propagator's trajectory wrapped for Python.
///
/// Because the trajectory is cloned, the returned object is independent of
/// the propagator's own copy.
#[getter]
pub fn trajectory(&self) -> PyTrajectory {
    let snapshot = self.propagator.trajectory().clone();
    PyTrajectory { trajectory: snapshot }
}
#[pyo3(text_signature = "()")]
pub fn stm<'a>(&self, py: Python<'a>) -> Option<Bound<'a, PyArray<f64, Ix2>>> {
self.propagator.stm().map(|stm| {
let n = stm.nrows();
let flat: Vec<f64> = (0..n).flat_map(|i| (0..n).map(move |j| stm[(i, j)])).collect();
flat.into_pyarray(py).reshape([n, n]).unwrap()
})
}
#[pyo3(text_signature = "()")]
pub fn sensitivity<'a>(&self, py: Python<'a>) -> Option<Bound<'a, PyArray<f64, Ix2>>> {
self.propagator.sensitivity().map(|sens| {
let n = sens.nrows();
let p = sens.ncols();
let flat: Vec<f64> = (0..n).flat_map(|i| (0..p).map(move |j| sens[(i, j)])).collect();
flat.into_pyarray(py).reshape([n, p]).unwrap()
})
}
/// Query `DCovarianceProvider::covariance` at `epoch` and return the
/// matrix as an `(n, n)` NumPy array copied in row-major order.
///
/// Raises:
///     RuntimeError: If the provider reports an error.
#[pyo3(text_signature = "(epoch)")]
pub fn covariance<'a>(&self, py: Python<'a>, epoch: &PyEpoch) -> PyResult<Bound<'a, PyArray<f64, Ix2>>> {
    let matrix = match DCovarianceProvider::covariance(&self.propagator, epoch.obj) {
        Ok(m) => m,
        Err(err) => return Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    };
    let dim = matrix.nrows();
    let view = &matrix;
    let row_major: Vec<f64> = (0..dim)
        .flat_map(|row| (0..dim).map(move |col| view[(row, col)]))
        .collect();
    // The buffer holds exactly dim * dim values, so the reshape cannot fail.
    Ok(row_major.into_pyarray(py).reshape([dim, dim]).unwrap())
}
/// Forward `max_size` to `DStatePropagator::set_eviction_policy_max_size`.
///
/// Raises:
///     RuntimeError: If the propagator rejects the setting.
#[pyo3(text_signature = "(max_size)")]
pub fn set_eviction_policy_max_size(&mut self, max_size: usize) -> PyResult<()> {
    match DStatePropagator::set_eviction_policy_max_size(&mut self.propagator, max_size) {
        Ok(()) => Ok(()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// Forward `max_age` to `DStatePropagator::set_eviction_policy_max_age`.
///
/// Raises:
///     RuntimeError: If the propagator rejects the setting.
#[pyo3(text_signature = "(max_age)")]
pub fn set_eviction_policy_max_age(&mut self, max_age: f64) -> PyResult<()> {
    match DStatePropagator::set_eviction_policy_max_age(&mut self.propagator, max_age) {
        Ok(()) => Ok(()),
        Err(err) => Err(exceptions::PyRuntimeError::new_err(err.to_string())),
    }
}
/// Forward the wrapped trajectory mode to the propagator's
/// `set_trajectory_mode`.
#[pyo3(text_signature = "(mode)")]
pub fn set_trajectory_mode(&mut self, mode: &PyTrajectoryMode) {
    let requested = mode.mode;
    self.propagator.set_trajectory_mode(requested);
}
/// Return the propagator's current trajectory mode wrapped for Python.
///
/// Exposed as a regular method (called with parentheses), not a property.
#[pyo3(text_signature = "()")]
pub fn trajectory_mode(&self) -> PyTrajectoryMode {
    let mode = self.propagator.trajectory_mode();
    PyTrajectoryMode { mode }
}
/// Builder-style setter: store `name` on the propagator and return the
/// same object so calls can be chained.
pub fn with_name(mut slf: PyRefMut<'_, Self>, name: String) -> PyRefMut<'_, Self> {
    let label = Some(name);
    slf.propagator.name = label;
    slf
}
/// Builder-style setter: parse `uuid_str`, store the UUID on the
/// propagator and return the same object so calls can be chained.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID; the propagator is
///         left unchanged in that case.
pub fn with_uuid(mut slf: PyRefMut<'_, Self>, uuid_str: String) -> PyResult<PyRefMut<'_, Self>> {
    match uuid::Uuid::parse_str(&uuid_str) {
        Ok(parsed) => {
            slf.propagator.uuid = Some(parsed);
            Ok(slf)
        }
        Err(e) => Err(exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e))),
    }
}
/// Builder-style setter: assign a freshly generated UUID (via
/// `Uuid::now_v7`) and return the same object so calls can be chained.
pub fn with_new_uuid(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
    let fresh = uuid::Uuid::now_v7();
    slf.propagator.uuid = Some(fresh);
    slf
}
/// Builder-style setter: store the numeric `id` on the propagator and
/// return the same object so calls can be chained.
pub fn with_id(mut slf: PyRefMut<'_, Self>, id: u64) -> PyRefMut<'_, Self> {
    let numeric_id = Some(id);
    slf.propagator.id = numeric_id;
    slf
}
/// Return a copy of the propagator's name, or `None` when no name is set.
pub fn get_name(&self) -> Option<String> {
    self.propagator.name.as_deref().map(str::to_owned)
}
/// Return the propagator's UUID rendered as a string, or `None` when no
/// UUID is set.
pub fn get_uuid(&self) -> Option<String> {
    let current = self.propagator.uuid.as_ref()?;
    Some(current.to_string())
}
/// Return the propagator's numeric id, or `None` when no id is set.
pub fn get_id(&self) -> Option<u64> {
    let inner = &self.propagator;
    inner.id
}
/// Debug-style representation showing the current epoch (via its `Debug`
/// form) and the state dimension.
fn __repr__(&self) -> String {
    let epoch = DStatePropagator::current_epoch(&self.propagator);
    let dim = DStatePropagator::state_dim(&self.propagator);
    format!("NumericalPropagator(epoch={:?}, state_dim={})", epoch, dim)
}
fn __str__(&self) -> String {
self.__repr__()
}
/// Register an event detector with the propagator.
///
/// `event` is tried as a `TimeEvent`, then a `ValueEvent`, then a
/// `BinaryEvent` wrapper. The first wrapper that can be mutably borrowed
/// has its inner detector moved out (`take()`) and handed to the
/// propagator, so a wrapper object can only be registered once.
///
/// Raises:
///     ValueError: If the matching wrapper's detector was already taken.
///     TypeError: If `event` is none of the three supported wrapper types.
#[pyo3(text_signature = "(event)")]
pub fn add_event_detector(&mut self, event: &Bound<'_, PyAny>) -> PyResult<()> {
    if let Ok(mut handle) = event.extract::<PyRefMut<PyTimeEvent>>() {
        let Some(inner) = handle.event.take() else {
            return Err(exceptions::PyValueError::new_err(
                "TimeEvent has already been consumed"
            ));
        };
        self.propagator.add_event_detector(Box::new(inner));
        return Ok(());
    }
    if let Ok(mut handle) = event.extract::<PyRefMut<PyValueEvent>>() {
        let Some(inner) = handle.event.take() else {
            return Err(exceptions::PyValueError::new_err(
                "ValueEvent has already been consumed"
            ));
        };
        self.propagator.add_event_detector(Box::new(inner));
        return Ok(());
    }
    if let Ok(mut handle) = event.extract::<PyRefMut<PyBinaryEvent>>() {
        let Some(inner) = handle.event.take() else {
            return Err(exceptions::PyValueError::new_err(
                "BinaryEvent has already been consumed"
            ));
        };
        self.propagator.add_event_detector(Box::new(inner));
        return Ok(());
    }
    Err(exceptions::PyTypeError::new_err(
        "Expected event detector type (TimeEvent, ValueEvent, or BinaryEvent)"
    ))
}
/// Return every entry of the propagator's event log, each cloned into a
/// Python-facing `PyDetectedEvent`, in log order.
#[pyo3(text_signature = "()")]
pub fn event_log(&self) -> Vec<PyDetectedEvent> {
    let mut wrapped = Vec::new();
    for entry in self.propagator.event_log().iter() {
        wrapped.push(PyDetectedEvent { event: entry.clone() });
    }
    wrapped
}
/// Build a `PyEventQuery` over a copy of the propagator's event log.
#[pyo3(text_signature = "()")]
pub fn query_events(&self) -> PyEventQuery {
    let snapshot = self.propagator.event_log().to_vec();
    PyEventQuery::new(snapshot)
}
/// Return clones of the events the propagator's `events_by_name` yields
/// for `name`, wrapped for Python.
#[pyo3(text_signature = "(name)")]
pub fn events_by_name(&self, name: &str) -> Vec<PyDetectedEvent> {
    let mut wrapped = Vec::new();
    for found in self.propagator.events_by_name(name) {
        wrapped.push(PyDetectedEvent { event: found.clone() });
    }
    wrapped
}
/// Return a clone of the propagator's `latest_event()`, wrapped for
/// Python, or `None` when there is none.
#[pyo3(text_signature = "()")]
pub fn latest_event(&self) -> Option<PyDetectedEvent> {
    let newest = self.propagator.latest_event()?;
    Some(PyDetectedEvent { event: newest.clone() })
}
/// Return clones of the events the propagator's `events_in_range` yields
/// for the given epoch bounds, wrapped for Python.
#[pyo3(text_signature = "(start, end)")]
pub fn events_in_range(&self, start: &PyEpoch, end: &PyEpoch) -> Vec<PyDetectedEvent> {
    let mut wrapped = Vec::new();
    for found in self.propagator.events_in_range(start.obj, end.obj) {
        wrapped.push(PyDetectedEvent { event: found.clone() });
    }
    wrapped
}
/// Return clones of the events the propagator's
/// `events_by_detector_index` yields for `index`, wrapped for Python.
#[pyo3(text_signature = "(index)")]
pub fn events_by_detector_index(&self, index: usize) -> Vec<PyDetectedEvent> {
    let mut wrapped = Vec::new();
    for found in self.propagator.events_by_detector_index(index) {
        wrapped.push(PyDetectedEvent { event: found.clone() });
    }
    wrapped
}
/// Report the propagator's `terminated()` flag.
#[pyo3(text_signature = "()")]
pub fn terminated(&self) -> bool {
    let inner = &self.propagator;
    inner.terminated()
}
/// Invoke `reset_termination()` on the wrapped propagator.
#[pyo3(text_signature = "()")]
pub fn reset_termination(&mut self) {
    let inner = &mut self.propagator;
    inner.reset_termination();
}
/// Invoke `clear_events()` on the wrapped propagator.
#[pyo3(text_signature = "()")]
pub fn clear_events(&mut self) {
    let inner = &mut self.propagator;
    inner.clear_events();
}
/// Replace the propagator's name; passing `None` clears it.
pub fn set_name(&mut self, name: Option<String>) {
    let inner = &mut self.propagator;
    inner.name = name;
}
/// Replace the propagator's numeric id; passing `None` clears it.
pub fn set_id(&mut self, id: Option<u64>) {
    let inner = &mut self.propagator;
    inner.id = id;
}
/// Replace the propagator's UUID from its string form; passing `None`
/// clears it.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID; the stored UUID is
///         left unchanged in that case.
pub fn set_uuid(&mut self, uuid_str: Option<String>) -> PyResult<()> {
    let parsed = uuid_str
        .map(|text| uuid::Uuid::parse_str(&text))
        .transpose()
        .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
    self.propagator.uuid = parsed;
    Ok(())
}
/// Assign a freshly generated UUID (via `Uuid::now_v7`) to the propagator,
/// replacing any existing one.
pub fn generate_uuid(&mut self) {
    let fresh = uuid::Uuid::now_v7();
    self.propagator.uuid = Some(fresh);
}
/// Builder-style setter: replace name, UUID and numeric id in one call and
/// return the same object so calls can be chained.
///
/// Each `None` argument clears the corresponding field. The UUID is parsed
/// before any field is written, so a parse failure changes nothing.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID.
pub fn with_identity(mut slf: PyRefMut<'_, Self>, name: Option<String>, uuid_str: Option<String>, id: Option<u64>) -> PyResult<PyRefMut<'_, Self>> {
    let mut parsed = None;
    if let Some(text) = uuid_str {
        let value = uuid::Uuid::parse_str(&text)
            .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
        parsed = Some(value);
    }
    slf.propagator.name = name;
    slf.propagator.uuid = parsed;
    slf.propagator.id = id;
    Ok(slf)
}
/// Replace name, UUID and numeric id in one call.
///
/// Each `None` argument clears the corresponding field. The UUID is parsed
/// before any field is written, so a parse failure changes nothing.
///
/// Raises:
///     ValueError: If `uuid_str` is not a valid UUID.
pub fn set_identity(&mut self, name: Option<String>, uuid_str: Option<String>, id: Option<u64>) -> PyResult<()> {
    let mut parsed = None;
    if let Some(text) = uuid_str {
        let value = uuid::Uuid::parse_str(&text)
            .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
        parsed = Some(value);
    }
    self.propagator.name = name;
    self.propagator.uuid = parsed;
    self.propagator.id = id;
    Ok(())
}
pub fn with_interpolation_method(&mut self, method: &PyInterpolationMethod) {
self.propagator.set_interpolation_method(method.method);
}
/// Forward the wrapped interpolation method to the propagator's
/// `set_interpolation_method`.
pub fn set_interpolation_method(&mut self, method: &PyInterpolationMethod) {
    let chosen = method.method;
    self.propagator.set_interpolation_method(chosen);
}
/// Return the propagator's interpolation method wrapped for Python.
pub fn get_interpolation_method(&self) -> PyInterpolationMethod {
    let method = self.propagator.get_interpolation_method();
    PyInterpolationMethod { method }
}
pub fn with_covariance_interpolation_method(&mut self, method: &PyCovarianceInterpolationMethod) {
self.propagator.set_covariance_interpolation_method(method.method);
}
/// Forward the wrapped covariance interpolation method to the propagator's
/// `set_covariance_interpolation_method`.
pub fn set_covariance_interpolation_method(&mut self, method: &PyCovarianceInterpolationMethod) {
    let chosen = method.method;
    self.propagator.set_covariance_interpolation_method(chosen);
}
/// Return the propagator's covariance interpolation method wrapped for
/// Python.
pub fn get_covariance_interpolation_method(&self) -> PyCovarianceInterpolationMethod {
    let method = self.propagator.get_covariance_interpolation_method();
    PyCovarianceInterpolationMethod { method }
}
}