/// Dict-like view over the properties of a parent Python object.
///
/// The view holds no property data itself: reads call the parent's
/// `_get_properties_dict` method, writes call `_set_property`, and deletions
/// call `_remove_property`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "PropertiesDict")]
pub struct PyPropertiesDict {
    /// Object whose properties are exposed through this view.
    parent: Py<PyAny>,
}

#[pymethods]
impl PyPropertiesDict {
    /// Return the value stored under `key`.
    ///
    /// Raises `KeyError` when the key is absent from the parent's properties.
    fn __getitem__(&self, key: String, py: Python) -> PyResult<Py<PyAny>> {
        let props_dict = self.get_properties_dict(py)?;
        props_dict
            .get_item(&key)?
            .ok_or_else(|| exceptions::PyKeyError::new_err(format!("Key '{}' not found", key)))
            .map(|item| item.unbind())
    }

    /// Store `value` under `key` on the parent.
    ///
    /// The value is serialized with Python's `json.dumps` and re-parsed with
    /// `serde_json`; a `ValueError` is raised if `serde_json` rejects the text,
    /// and any exception raised by `json.dumps` propagates unchanged.
    fn __setitem__(&self, key: String, value: &Bound<'_, PyAny>, py: Python) -> PyResult<()> {
        let dumps = py.import("json")?.getattr("dumps")?;
        let json_str: String = dumps.call1((value,))?.extract()?;
        let json_value: serde_json::Value = serde_json::from_str(&json_str)
            .map_err(|e| exceptions::PyValueError::new_err(format!("JSON error: {}", e)))?;
        self.set_property(py, key, json_value)
    }

    /// Remove `key` by delegating to the parent's `_remove_property`.
    fn __delitem__(&self, key: String, py: Python) -> PyResult<()> {
        self.remove_property(py, key)
    }

    /// Number of entries in the dict returned by the parent.
    fn __len__(&self, py: Python) -> PyResult<usize> {
        let props_dict = self.get_properties_dict(py)?;
        Ok(props_dict.len())
    }

    /// Whether `key` is present in the dict returned by the parent.
    fn __contains__(&self, key: String, py: Python) -> PyResult<bool> {
        let props_dict = self.get_properties_dict(py)?;
        props_dict.contains(&key)
    }

    /// Iterator obtained by calling `__iter__` on the dict returned by the parent.
    fn __iter__(&self, py: Python) -> PyResult<Py<PyAny>> {
        let props_dict = self.get_properties_dict(py)?;
        Ok(props_dict.call_method0("__iter__")?.into())
    }

    /// `PropertiesDict(<repr of the underlying dict>)`.
    fn __repr__(&self, py: Python) -> PyResult<String> {
        let props_dict = self.get_properties_dict(py)?;
        Ok(format!("PropertiesDict({})", props_dict.repr()?))
    }

    /// Return the value for `key`, or `default` (Python `None` when omitted)
    /// if the key is absent.
    #[pyo3(signature = (key, default=None))]
    fn get(&self, key: String, default: Option<Py<PyAny>>, py: Python) -> PyResult<Py<PyAny>> {
        let props_dict = self.get_properties_dict(py)?;
        match props_dict.get_item(&key)? {
            Some(value) => Ok(value.into()),
            None => Ok(default.unwrap_or_else(|| py.None())),
        }
    }

    /// Result of calling `keys()` on the dict returned by the parent.
    fn keys(&self, py: Python) -> PyResult<Py<PyAny>> {
        let props_dict = self.get_properties_dict(py)?;
        Ok(props_dict.call_method0("keys")?.into())
    }

    /// Result of calling `values()` on the dict returned by the parent.
    fn values(&self, py: Python) -> PyResult<Py<PyAny>> {
        let props_dict = self.get_properties_dict(py)?;
        Ok(props_dict.call_method0("values")?.into())
    }

    /// Result of calling `items()` on the dict returned by the parent.
    fn items(&self, py: Python) -> PyResult<Py<PyAny>> {
        let props_dict = self.get_properties_dict(py)?;
        Ok(props_dict.call_method0("items")?.into())
    }

    /// Remove every property from the parent, one key at a time.
    fn clear(&self, py: Python) -> PyResult<()> {
        let props_dict = self.get_properties_dict(py)?;
        // `PyDict::keys` returns a real list. The `dict_keys` view produced by
        // calling the Python-level `keys()` method does not pass the sequence
        // check that PyO3's `Vec` extraction requires, so extracting from the
        // view raises `TypeError` instead of yielding the keys.
        let keys: Vec<String> = props_dict.keys().extract()?;
        for key in keys {
            self.remove_property(py, key)?;
        }
        Ok(())
    }

    /// Set every `(key, value)` pair of `other` through `__setitem__`.
    ///
    /// Keys must be extractable as `str`; the first failing key or value
    /// aborts the update with the corresponding exception.
    fn update(&self, other: &Bound<'_, PyDict>, py: Python) -> PyResult<()> {
        for (key, value) in other.iter() {
            let key_str: String = key.extract()?;
            self.__setitem__(key_str, &value, py)?;
        }
        Ok(())
    }
}
impl PyPropertiesDict {
fn new(parent: Py<PyAny>) -> Self {
Self { parent }
}
fn get_properties_dict<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyDict>> {
let parent_obj = self.parent.bind(py);
let props_obj = parent_obj.call_method0("_get_properties_dict")?;
props_obj.cast::<PyDict>().cloned().map_err(|e| e.into())
}
fn set_property(&self, py: Python, key: String, value: serde_json::Value) -> PyResult<()> {
let parent_obj = self.parent.bind(py);
parent_obj.call_method1("_set_property", (key, value.to_string()))?;
Ok(())
}
fn remove_property(&self, py: Python, key: String) -> PyResult<()> {
let parent_obj = self.parent.bind(py);
parent_obj.call_method1("_remove_property", (key,))?;
Ok(())
}
}
/// Python wrapper around the `LookDirection` enum.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "LookDirection")]
#[derive(Clone)]
pub struct PyLookDirection {
    /// Wrapped enum value.
    pub(crate) value: LookDirection,
}

#[pymethods]
impl PyLookDirection {
    /// Class attribute wrapping `LookDirection::Left`.
    #[classattr]
    #[allow(non_snake_case)]
    fn LEFT() -> Self {
        Self { value: LookDirection::Left }
    }

    /// Class attribute wrapping `LookDirection::Right`.
    #[classattr]
    #[allow(non_snake_case)]
    fn RIGHT() -> Self {
        Self { value: LookDirection::Right }
    }

    /// Class attribute wrapping `LookDirection::Either`.
    #[classattr]
    #[allow(non_snake_case)]
    fn EITHER() -> Self {
        Self { value: LookDirection::Either }
    }

    /// Debug rendering of the wrapped variant.
    fn __str__(&self) -> String {
        let variant = &self.value;
        format!("{:?}", variant)
    }

    /// Debug rendering of the wrapped variant, prefixed with `LookDirection.`.
    fn __repr__(&self) -> String {
        let variant = &self.value;
        format!("LookDirection.{:?}", variant)
    }

    /// Equality and inequality against another `LookDirection`.
    ///
    /// Any other comparison operator raises `NotImplementedError`.
    fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<bool> {
        if matches!(op, CompareOp::Eq) {
            return Ok(self.value == other.value);
        }
        if matches!(op, CompareOp::Ne) {
            return Ok(self.value != other.value);
        }
        Err(exceptions::PyNotImplementedError::new_err(
            "Comparison not supported",
        ))
    }
}
/// Python wrapper around the `AscDsc` enum.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "AscDsc")]
#[derive(Clone)]
pub struct PyAscDsc {
    /// Wrapped enum value.
    pub(crate) value: AscDsc,
}

#[pymethods]
impl PyAscDsc {
    /// Class attribute wrapping `AscDsc::Ascending`.
    #[classattr]
    #[allow(non_snake_case)]
    fn ASCENDING() -> Self {
        Self { value: AscDsc::Ascending }
    }

    /// Class attribute wrapping `AscDsc::Descending`.
    #[classattr]
    #[allow(non_snake_case)]
    fn DESCENDING() -> Self {
        Self { value: AscDsc::Descending }
    }

    /// Class attribute wrapping `AscDsc::Either`.
    #[classattr]
    #[allow(non_snake_case)]
    fn EITHER() -> Self {
        Self { value: AscDsc::Either }
    }

    /// Debug rendering of the wrapped variant.
    fn __str__(&self) -> String {
        let variant = &self.value;
        format!("{:?}", variant)
    }

    /// Debug rendering of the wrapped variant, prefixed with `AscDsc.`.
    fn __repr__(&self) -> String {
        let variant = &self.value;
        format!("AscDsc.{:?}", variant)
    }

    /// Equality and inequality against another `AscDsc`.
    ///
    /// Any other comparison operator raises `NotImplementedError`.
    fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<bool> {
        if matches!(op, CompareOp::Eq) {
            return Ok(self.value == other.value);
        }
        if matches!(op, CompareOp::Ne) {
            return Ok(self.value != other.value);
        }
        Err(exceptions::PyNotImplementedError::new_err(
            "Comparison not supported",
        ))
    }
}
/// Python wrapper around `ElevationConstraint`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "ElevationConstraint")]
pub struct PyElevationConstraint {
    /// Wrapped Rust constraint.
    pub constraint: ElevationConstraint,
}

#[pymethods]
impl PyElevationConstraint {
    /// Build the constraint from optional minimum and maximum bounds.
    ///
    /// Both bounds default to `None`. An error from `ElevationConstraint::new`
    /// is raised as `ValueError` carrying the error's message.
    #[new]
    #[pyo3(signature = (min_elevation_deg=None, max_elevation_deg=None))]
    fn new(min_elevation_deg: Option<f64>, max_elevation_deg: Option<f64>) -> PyResult<Self> {
        match ElevationConstraint::new(min_elevation_deg, max_elevation_deg) {
            Ok(constraint) => Ok(Self { constraint }),
            Err(err) => Err(exceptions::PyValueError::new_err(err.to_string())),
        }
    }

    /// Name reported by the wrapped constraint.
    fn name(&self) -> &str {
        self.constraint.name()
    }

    /// Evaluate the constraint.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.constraint.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Same text as `name()`.
    fn __str__(&self) -> String {
        self.name().to_owned()
    }

    /// Constructor-like rendering; bounds are shown as `Some(x)` or `None`.
    fn __repr__(&self) -> String {
        let min_str = self
            .constraint
            .min_elevation_deg
            .map_or_else(|| "None".to_string(), |v| format!("Some({})", v));
        let max_str = self
            .constraint
            .max_elevation_deg
            .map_or_else(|| "None".to_string(), |v| format!("Some({})", v));
        format!(
            "ElevationConstraint(min_elevation_deg={}, max_elevation_deg={}, name={:?})",
            min_str,
            max_str,
            self.constraint.name()
        )
    }
}
/// Python wrapper around `ElevationMaskConstraint`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "ElevationMaskConstraint")]
pub struct PyElevationMaskConstraint {
    /// Wrapped Rust constraint.
    pub constraint: ElevationMaskConstraint,
}

#[pymethods]
impl PyElevationMaskConstraint {
    /// Build the constraint from a list of `(f64, f64)` mask pairs, passed
    /// unchanged to `ElevationMaskConstraint::new`.
    #[new]
    fn new(mask: Vec<(f64, f64)>) -> Self {
        let constraint = ElevationMaskConstraint::new(mask);
        Self { constraint }
    }

    /// Name reported by the wrapped constraint.
    fn name(&self) -> &str {
        self.constraint.name()
    }

    /// Evaluate the constraint.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.constraint.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Same text as `name()`.
    fn __str__(&self) -> String {
        self.name().to_owned()
    }

    /// Constructor-like rendering using the Debug form of the mask and name.
    fn __repr__(&self) -> String {
        let mask = &self.constraint.mask;
        let label = self.constraint.name();
        format!("ElevationMaskConstraint(mask={:?}, name={:?})", mask, label)
    }
}
/// Python wrapper around `OffNadirConstraint`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "OffNadirConstraint")]
pub struct PyOffNadirConstraint {
    /// Wrapped Rust constraint.
    pub constraint: OffNadirConstraint,
}

#[pymethods]
impl PyOffNadirConstraint {
    /// Build the constraint from optional minimum and maximum bounds.
    ///
    /// Both bounds default to `None`. An error from `OffNadirConstraint::new`
    /// is raised as `ValueError` carrying the error's message.
    #[new]
    #[pyo3(signature = (min_off_nadir_deg=None, max_off_nadir_deg=None))]
    fn new(min_off_nadir_deg: Option<f64>, max_off_nadir_deg: Option<f64>) -> PyResult<Self> {
        match OffNadirConstraint::new(min_off_nadir_deg, max_off_nadir_deg) {
            Ok(constraint) => Ok(Self { constraint }),
            Err(err) => Err(exceptions::PyValueError::new_err(err.to_string())),
        }
    }

    /// Name reported by the wrapped constraint.
    fn name(&self) -> &str {
        self.constraint.name()
    }

    /// Evaluate the constraint.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.constraint.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Same text as `name()`.
    fn __str__(&self) -> String {
        self.name().to_owned()
    }

    /// Constructor-like rendering; bounds are shown as `Some(x)` or `None`.
    fn __repr__(&self) -> String {
        let min_str = self
            .constraint
            .min_off_nadir_deg
            .map_or_else(|| "None".to_string(), |v| format!("Some({})", v));
        let max_str = self
            .constraint
            .max_off_nadir_deg
            .map_or_else(|| "None".to_string(), |v| format!("Some({})", v));
        format!(
            "OffNadirConstraint(min_off_nadir_deg={}, max_off_nadir_deg={}, name={:?})",
            min_str,
            max_str,
            self.constraint.name()
        )
    }
}
/// Python wrapper around `LocalTimeConstraint`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "LocalTimeConstraint")]
pub struct PyLocalTimeConstraint {
    /// Wrapped Rust constraint.
    pub constraint: LocalTimeConstraint,
}

#[pymethods]
impl PyLocalTimeConstraint {
    /// Build the constraint from `(u16, u16)` window pairs.
    ///
    /// An error from `LocalTimeConstraint::new` is raised as `ValueError`.
    #[new]
    fn new(time_windows: Vec<(u16, u16)>) -> PyResult<Self> {
        match LocalTimeConstraint::new(time_windows) {
            Ok(constraint) => Ok(Self { constraint }),
            Err(err) => Err(exceptions::PyValueError::new_err(err.to_string())),
        }
    }

    /// Alternate constructor taking `(f64, f64)` window pairs.
    ///
    /// An error from `LocalTimeConstraint::from_hours` is raised as `ValueError`.
    #[classmethod]
    fn from_hours(_cls: &Bound<'_, PyType>, time_windows: Vec<(f64, f64)>) -> PyResult<Self> {
        match LocalTimeConstraint::from_hours(time_windows) {
            Ok(constraint) => Ok(Self { constraint }),
            Err(err) => Err(exceptions::PyValueError::new_err(err.to_string())),
        }
    }

    /// Name reported by the wrapped constraint.
    fn name(&self) -> &str {
        self.constraint.name()
    }

    /// Evaluate the constraint.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.constraint.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Same text as `name()`.
    fn __str__(&self) -> String {
        self.name().to_owned()
    }

    /// Constructor-like rendering using the Debug form of the windows and name.
    fn __repr__(&self) -> String {
        let windows = &self.constraint.time_windows;
        let label = self.constraint.name();
        format!("LocalTimeConstraint(time_windows={:?}, name={:?})", windows, label)
    }
}
/// Python wrapper around `LookDirectionConstraint`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "LookDirectionConstraint")]
pub struct PyLookDirectionConstraint {
    /// Wrapped Rust constraint.
    pub constraint: LookDirectionConstraint,
}

#[pymethods]
impl PyLookDirectionConstraint {
    /// Build the constraint from the `LookDirection` wrapped by `allowed`.
    #[new]
    fn new(allowed: &PyLookDirection) -> Self {
        let constraint = LookDirectionConstraint::new(allowed.value);
        Self { constraint }
    }

    /// Name reported by the wrapped constraint.
    fn name(&self) -> &str {
        self.constraint.name()
    }

    /// Evaluate the constraint.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.constraint.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Same text as `name()`.
    fn __str__(&self) -> String {
        self.name().to_owned()
    }

    /// Constructor-like rendering using the Debug form of `allowed` and the name.
    fn __repr__(&self) -> String {
        let allowed = &self.constraint.allowed;
        let label = self.constraint.name();
        format!("LookDirectionConstraint(allowed={:?}, name={:?})", allowed, label)
    }
}
/// Python wrapper around `AscDscConstraint`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AscDscConstraint")]
pub struct PyAscDscConstraint {
    /// Wrapped Rust constraint.
    pub constraint: AscDscConstraint,
}

#[pymethods]
impl PyAscDscConstraint {
    /// Build the constraint from the `AscDsc` wrapped by `allowed`.
    #[new]
    fn new(allowed: &PyAscDsc) -> Self {
        let constraint = AscDscConstraint::new(allowed.value);
        Self { constraint }
    }

    /// Name reported by the wrapped constraint.
    fn name(&self) -> &str {
        self.constraint.name()
    }

    /// Evaluate the constraint.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.constraint.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Same text as `name()`.
    fn __str__(&self) -> String {
        self.name().to_owned()
    }

    /// Constructor-like rendering using the Debug form of `allowed` and the name.
    fn __repr__(&self) -> String {
        let allowed = &self.constraint.allowed;
        let label = self.constraint.name();
        format!("AscDscConstraint(allowed={:?}, name={:?})", allowed, label)
    }
}
/// Python wrapper around `ConstraintComposite::All`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "ConstraintAll")]
pub struct PyConstraintAll {
    /// Wrapped composite; always built as the `All` variant by `new`.
    pub composite: ConstraintComposite,
}

#[pymethods]
impl PyConstraintAll {
    /// Build the composite from a list of constraint objects.
    ///
    /// Each element must be one of the six concrete constraint wrappers
    /// (elevation, elevation mask, off-nadir, local time, look direction,
    /// asc/dsc); its inner constraint is cloned into the composite. Any other
    /// object raises `TypeError`.
    #[new]
    fn new(constraints: Vec<Py<PyAny>>) -> PyResult<Self> {
        Python::attach(|py| {
            let converted = constraints
                .into_iter()
                .map(|candidate| -> PyResult<Box<dyn AccessConstraint>> {
                    // Try each supported wrapper type in turn; the first match wins.
                    let boxed: Box<dyn AccessConstraint> =
                        if let Ok(c) = candidate.extract::<PyRef<PyElevationConstraint>>(py) {
                            Box::new(c.constraint.clone())
                        } else if let Ok(c) = candidate.extract::<PyRef<PyElevationMaskConstraint>>(py) {
                            Box::new(c.constraint.clone())
                        } else if let Ok(c) = candidate.extract::<PyRef<PyOffNadirConstraint>>(py) {
                            Box::new(c.constraint.clone())
                        } else if let Ok(c) = candidate.extract::<PyRef<PyLocalTimeConstraint>>(py) {
                            Box::new(c.constraint.clone())
                        } else if let Ok(c) = candidate.extract::<PyRef<PyLookDirectionConstraint>>(py) {
                            Box::new(c.constraint.clone())
                        } else if let Ok(c) = candidate.extract::<PyRef<PyAscDscConstraint>>(py) {
                            Box::new(c.constraint.clone())
                        } else {
                            return Err(exceptions::PyTypeError::new_err(
                                "All constraints must be valid constraint objects"
                            ));
                        };
                    Ok(boxed)
                })
                .collect::<PyResult<Vec<Box<dyn AccessConstraint>>>>();
            converted.map(|members| PyConstraintAll {
                composite: ConstraintComposite::All(members),
            })
        })
    }

    /// Name reported by the wrapped composite.
    fn name(&self) -> &str {
        self.composite.name()
    }

    /// Evaluate the composite.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.composite.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Text produced by the composite's `format_string`.
    fn __str__(&self) -> String {
        self.composite.format_string()
    }

    /// Debug rendering of the composite.
    fn __repr__(&self) -> String {
        let composite = &self.composite;
        format!("{:?}", composite)
    }
}
/// Python wrapper around `ConstraintComposite::Any`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "ConstraintAny")]
pub struct PyConstraintAny {
    /// Wrapped composite; always built as the `Any` variant by `new`.
    pub composite: ConstraintComposite,
}

#[pymethods]
impl PyConstraintAny {
    /// Build the composite from a list of constraint objects.
    ///
    /// Each element must be one of the six concrete constraint wrappers
    /// (elevation, elevation mask, off-nadir, local time, look direction,
    /// asc/dsc); its inner constraint is cloned into the composite. Any other
    /// object raises `TypeError`.
    #[new]
    fn new(constraints: Vec<Py<PyAny>>) -> PyResult<Self> {
        Python::attach(|py| {
            let mut members: Vec<Box<dyn AccessConstraint>> = Vec::with_capacity(constraints.len());
            for candidate in constraints {
                // Try each supported wrapper type in turn; the first match wins.
                let boxed: Box<dyn AccessConstraint> =
                    if let Ok(c) = candidate.extract::<PyRef<PyElevationConstraint>>(py) {
                        Box::new(c.constraint.clone())
                    } else if let Ok(c) = candidate.extract::<PyRef<PyElevationMaskConstraint>>(py) {
                        Box::new(c.constraint.clone())
                    } else if let Ok(c) = candidate.extract::<PyRef<PyOffNadirConstraint>>(py) {
                        Box::new(c.constraint.clone())
                    } else if let Ok(c) = candidate.extract::<PyRef<PyLocalTimeConstraint>>(py) {
                        Box::new(c.constraint.clone())
                    } else if let Ok(c) = candidate.extract::<PyRef<PyLookDirectionConstraint>>(py) {
                        Box::new(c.constraint.clone())
                    } else if let Ok(c) = candidate.extract::<PyRef<PyAscDscConstraint>>(py) {
                        Box::new(c.constraint.clone())
                    } else {
                        return Err(exceptions::PyTypeError::new_err(
                            "All constraints must be valid constraint objects"
                        ));
                    };
                members.push(boxed);
            }
            Ok(PyConstraintAny {
                composite: ConstraintComposite::Any(members),
            })
        })
    }

    /// Name reported by the wrapped composite.
    fn name(&self) -> &str {
        self.composite.name()
    }

    /// Evaluate the composite.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.composite.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Text produced by the composite's `format_string`.
    fn __str__(&self) -> String {
        self.composite.format_string()
    }

    /// Debug rendering of the composite.
    fn __repr__(&self) -> String {
        let composite = &self.composite;
        format!("{:?}", composite)
    }
}
/// Python wrapper around `ConstraintComposite::Not`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "ConstraintNot")]
pub struct PyConstraintNot {
    /// Wrapped composite; always built as the `Not` variant by `new`.
    pub composite: ConstraintComposite,
}

#[pymethods]
impl PyConstraintNot {
    /// Build the composite from a single constraint object.
    ///
    /// `constraint` must be one of the six concrete constraint wrappers
    /// (elevation, elevation mask, off-nadir, local time, look direction,
    /// asc/dsc); its inner constraint is cloned into the composite. Any other
    /// object raises `TypeError`.
    #[new]
    fn new(constraint: Py<PyAny>) -> PyResult<Self> {
        Python::attach(|py| {
            // Try each supported wrapper type in turn; the first match wins.
            let resolve = || -> PyResult<Box<dyn AccessConstraint>> {
                if let Ok(c) = constraint.extract::<PyRef<PyElevationConstraint>>(py) {
                    return Ok(Box::new(c.constraint.clone()));
                }
                if let Ok(c) = constraint.extract::<PyRef<PyElevationMaskConstraint>>(py) {
                    return Ok(Box::new(c.constraint.clone()));
                }
                if let Ok(c) = constraint.extract::<PyRef<PyOffNadirConstraint>>(py) {
                    return Ok(Box::new(c.constraint.clone()));
                }
                if let Ok(c) = constraint.extract::<PyRef<PyLocalTimeConstraint>>(py) {
                    return Ok(Box::new(c.constraint.clone()));
                }
                if let Ok(c) = constraint.extract::<PyRef<PyLookDirectionConstraint>>(py) {
                    return Ok(Box::new(c.constraint.clone()));
                }
                if let Ok(c) = constraint.extract::<PyRef<PyAscDscConstraint>>(py) {
                    return Ok(Box::new(c.constraint.clone()));
                }
                Err(exceptions::PyTypeError::new_err(
                    "Constraint must be a valid constraint object"
                ))
            };
            resolve().map(|inner| PyConstraintNot {
                composite: ConstraintComposite::Not(inner),
            })
        })
    }

    /// Name reported by the wrapped composite.
    fn name(&self) -> &str {
        self.composite.name()
    }

    /// Evaluate the composite.
    ///
    /// `sat_state_ecef` is converted to a 6-element vector and `location_ecef`
    /// to a 3-element vector; a failed conversion propagates its error.
    fn evaluate(
        &self,
        _py: Python,
        epoch: &PyEpoch,
        sat_state_ecef: &Bound<'_, PyAny>,
        location_ecef: &Bound<'_, PyAny>,
    ) -> PyResult<bool> {
        // The satellite state is converted first so its error takes precedence.
        let state_vec = pyany_to_svector::<6>(sat_state_ecef)?;
        let location_vec = pyany_to_svector::<3>(location_ecef)?;
        let satisfied = self.composite.evaluate(&epoch.obj, &state_vec, &location_vec);
        Ok(satisfied)
    }

    /// Text produced by the composite's `format_string`.
    fn __str__(&self) -> String {
        self.composite.format_string()
    }

    /// Debug rendering of the composite.
    fn __repr__(&self) -> String {
        let composite = &self.composite;
        format!("{:?}", composite)
    }
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "PointLocation")]
pub struct PyPointLocation {
pub location: PointLocation,
}
#[pymethods]
impl PyPointLocation {
#[new]
#[pyo3(signature = (lon, lat, alt=0.0))]
fn new(lon: f64, lat: f64, alt: f64) -> Self {
PyPointLocation {
location: PointLocation::new(lon, lat, alt),
}
}
#[classmethod]
fn from_geojson(_cls: &Bound<'_, PyType>, geojson: &Bound<'_, PyAny>) -> PyResult<Self> {
let json_str = Python::attach(|py| -> PyResult<String> {
let json_module = py.import("json")?;
let dumps = json_module.getattr("dumps")?;
let json_str = dumps.call1((geojson,))?;
json_str.extract()
})?;
let json_value: serde_json::Value = serde_json::from_str(&json_str)
.map_err(|e| exceptions::PyValueError::new_err(format!("Invalid JSON: {}", e)))?;
PointLocation::from_geojson(&json_value)
.map(|location| PyPointLocation { location })
.map_err(|e| exceptions::PyValueError::new_err(e.to_string()))
}
#[getter]
fn lon(&self) -> f64 {
self.location.lon()
}
#[getter]
fn lat(&self) -> f64 {
self.location.lat()
}
#[getter]
fn alt(&self) -> f64 {
self.location.alt()
}
fn longitude(&self, angle_format: &PyAngleFormat) -> f64 {
self.location.longitude(angle_format.value)
}
fn latitude(&self, angle_format: &PyAngleFormat) -> f64 {
self.location.latitude(angle_format.value)
}
fn altitude(&self) -> f64 {
self.location.altitude()
}
fn center_geodetic(&self, py: Python) -> Py<PyAny> {
let center = self.location.center_geodetic();
vec![center.x, center.y, center.z].into_pyarray(py).into_any().unbind()
}
fn center_ecef(&self, py: Python) -> Py<PyAny> {
let ecef = self.location.center_ecef();
vec![ecef.x, ecef.y, ecef.z].into_pyarray(py).into_any().unbind()
}
#[getter]
fn properties(slf: Bound<'_, Self>) -> PyResult<Py<PyPropertiesDict>> {
let py = slf.py();
let parent: Py<PyAny> = slf.clone().into_any().unbind();
Py::new(py, PyPropertiesDict::new(parent))
}
#[pyo3(name = "_get_properties_dict")]
fn get_properties_dict(&self, py: Python) -> PyResult<Py<PyAny>> {
let props = self.location.properties();
let py_dict = PyDict::new(py);
for (key, value) in props.iter() {
let py_value = serde_json::to_string(value)
.map_err(|e| exceptions::PyValueError::new_err(format!("JSON error: {}", e)))?;
let json_module = py.import("json")?;
let loads = json_module.getattr("loads")?;
let py_val = loads.call1((py_value,))?;
py_dict.set_item(key, py_val)?;
}
Ok(py_dict.into())
}
#[pyo3(name = "_set_property")]
fn set_property(&mut self, key: String, json_str: String) -> PyResult<()> {
let value: serde_json::Value = serde_json::from_str(&json_str)
.map_err(|e| exceptions::PyValueError::new_err(format!("JSON error: {}", e)))?;
self.location.properties_mut().insert(key, value);
Ok(())
}
#[pyo3(name = "_remove_property")]
fn remove_property(&mut self, key: String) -> PyResult<()> {
self.location.properties_mut().remove(&key)
.ok_or_else(|| exceptions::PyKeyError::new_err(format!("Key '{}' not found", key)))?;
Ok(())
}
fn to_geojson(&self, py: Python) -> PyResult<Py<PyAny>> {
let geojson = self.location.to_geojson();
let json_str = serde_json::to_string(&geojson)
.map_err(|e| exceptions::PyValueError::new_err(format!("JSON error: {}", e)))?;
let json_module = py.import("json")?;
let loads = json_module.getattr("loads")?;
let result = loads.call1((json_str,))?;
Ok(result.into())
}
fn add_property<'py>(mut slf: PyRefMut<'py, Self>, key: String, value: &Bound<'py, PyAny>) -> PyResult<PyRefMut<'py, Self>> {
let json_str = Python::attach(|py| -> PyResult<String> {
let json_module = py.import("json")?;
let dumps = json_module.getattr("dumps")?;
let json_str = dumps.call1((value,))?;
json_str.extract()
})?;
let json_value: serde_json::Value = serde_json::from_str(&json_str)
.map_err(|e| exceptions::PyValueError::new_err(format!("Invalid JSON: {}", e)))?;
slf.location = slf.location.clone().add_property(&key, json_value);
Ok(slf)
}
fn with_name(mut slf: PyRefMut<'_, Self>, name: String) -> PyRefMut<'_, Self> {
slf.location = slf.location.clone().with_name(&name);
slf
}
fn with_id(mut slf: PyRefMut<'_, Self>, id: u64) -> PyRefMut<'_, Self> {
slf.location = slf.location.clone().with_id(id);
slf
}
fn with_uuid(mut slf: PyRefMut<'_, Self>, uuid_str: String) -> PyResult<PyRefMut<'_, Self>> {
let uuid = uuid::Uuid::parse_str(&uuid_str)
.map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
slf.location = slf.location.clone().with_uuid(uuid);
Ok(slf)
}
fn with_new_uuid(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
slf.location = slf.location.clone().with_new_uuid();
slf
}
fn get_name(&self) -> Option<String> {
self.location.get_name().map(|s| s.to_string())
}
fn get_id(&self) -> Option<u64> {
self.location.get_id()
}
fn get_uuid(&self) -> Option<String> {
self.location.get_uuid().map(|u| u.to_string())
}
fn set_name(&mut self, name: Option<String>) {
self.location.set_name(name.as_deref());
}
fn set_id(&mut self, id: Option<u64>) {
self.location.set_id(id);
}
fn generate_uuid(&mut self) {
self.location.generate_uuid();
}
fn __str__(&self) -> String {
if let Some(name) = self.location.get_name() {
format!("PointLocation({}, lon={:.4}°, lat={:.4}°, alt={:.1}m)",
name, self.location.lon(), self.location.lat(), self.location.alt())
} else {
format!("PointLocation(lon={:.4}°, lat={:.4}°, alt={:.1}m)",
self.location.lon(), self.location.lat(), self.location.alt())
}
}
fn __repr__(&self) -> String {
self.__str__()
}
}
/// Python wrapper around `PolygonLocation`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "PolygonLocation")]
pub struct PyPolygonLocation {
    /// Wrapped Rust location.
    pub location: PolygonLocation,
}

#[pymethods]
impl PyPolygonLocation {
    /// Build a polygon from a list of `[lon, lat, alt]` vertices.
    ///
    /// Raises `ValueError` when any vertex does not have exactly three
    /// elements, or when `PolygonLocation::new` rejects the vertex list.
    #[new]
    fn new(vertices: Vec<Vec<f64>>) -> PyResult<Self> {
        let vec3_vertices = vertices
            .iter()
            .map(|vertex| match vertex.as_slice() {
                [lon, lat, alt] => Ok(Vector3::new(*lon, *lat, *alt)),
                _ => Err(exceptions::PyValueError::new_err(
                    "Each vertex must have [lon, lat, alt]",
                )),
            })
            .collect::<PyResult<Vec<_>>>()?;
        PolygonLocation::new(vec3_vertices)
            .map(|location| PyPolygonLocation { location })
            .map_err(|e| exceptions::PyValueError::new_err(e.to_string()))
    }

    /// Build a polygon from a GeoJSON-like Python object.
    ///
    /// The object is serialized with Python's `json.dumps`, parsed with
    /// `serde_json`, and handed to `PolygonLocation::from_geojson`. Parse and
    /// conversion failures raise `ValueError`; exceptions from `json.dumps`
    /// propagate unchanged.
    #[classmethod]
    fn from_geojson(_cls: &Bound<'_, PyType>, geojson: &Bound<'_, PyAny>) -> PyResult<Self> {
        // The bound argument already carries the interpreter token, so there is
        // no need to re-attach to the interpreter.
        let py = geojson.py();
        let dumps = py.import("json")?.getattr("dumps")?;
        let json_str: String = dumps.call1((geojson,))?.extract()?;
        let json_value: serde_json::Value = serde_json::from_str(&json_str)
            .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid JSON: {}", e)))?;
        PolygonLocation::from_geojson(&json_value)
            .map(|location| PyPolygonLocation { location })
            .map_err(|e| exceptions::PyValueError::new_err(e.to_string()))
    }

    /// Vertices as an `(n, 3)` NumPy array, one row per vertex.
    #[getter]
    fn vertices(&self, py: Python) -> Py<PyAny> {
        let verts = self.location.vertices();
        let n = verts.len();
        let mut data = Vec::with_capacity(n * 3);
        for v in verts {
            data.extend_from_slice(&[v.x, v.y, v.z]);
        }
        data.into_pyarray(py)
            .reshape([n, 3])
            .expect("buffer holds exactly n * 3 elements, so reshaping to [n, 3] cannot fail")
            .into_any()
            .unbind()
    }

    /// Value of `PolygonLocation::num_vertices`.
    #[getter]
    fn num_vertices(&self) -> usize {
        self.location.num_vertices()
    }

    /// Value of `PolygonLocation::lon`.
    #[getter]
    fn lon(&self) -> f64 {
        self.location.lon()
    }

    /// Value of `PolygonLocation::lat`.
    #[getter]
    fn lat(&self) -> f64 {
        self.location.lat()
    }

    /// Value of `PolygonLocation::alt`.
    #[getter]
    fn alt(&self) -> f64 {
        self.location.alt()
    }

    /// Longitude expressed in the requested angle format.
    fn longitude(&self, angle_format: &PyAngleFormat) -> f64 {
        self.location.longitude(angle_format.value)
    }

    /// Latitude expressed in the requested angle format.
    fn latitude(&self, angle_format: &PyAngleFormat) -> f64 {
        self.location.latitude(angle_format.value)
    }

    /// Value of `PolygonLocation::altitude`.
    fn altitude(&self) -> f64 {
        self.location.altitude()
    }

    /// Geodetic center as a 3-element NumPy array (`x`, `y`, `z` components
    /// of `PolygonLocation::center_geodetic`).
    fn center_geodetic(&self, py: Python) -> Py<PyAny> {
        let center = self.location.center_geodetic();
        vec![center.x, center.y, center.z].into_pyarray(py).into_any().unbind()
    }

    /// ECEF center as a 3-element NumPy array (`x`, `y`, `z` components of
    /// `PolygonLocation::center_ecef`).
    fn center_ecef(&self, py: Python) -> Py<PyAny> {
        let ecef = self.location.center_ecef();
        vec![ecef.x, ecef.y, ecef.z].into_pyarray(py).into_any().unbind()
    }

    /// Dict-like view of the properties; reads and writes go through this object.
    #[getter]
    fn properties(slf: Bound<'_, Self>) -> PyResult<Py<PyPropertiesDict>> {
        let py = slf.py();
        let parent: Py<PyAny> = slf.clone().into_any().unbind();
        Py::new(py, PyPropertiesDict::new(parent))
    }

    /// Build a fresh Python dict from the location's properties.
    ///
    /// Each value is serialized with `serde_json` and decoded with Python's
    /// `json.loads`. Used by `PropertiesDict`.
    #[pyo3(name = "_get_properties_dict")]
    fn get_properties_dict(&self, py: Python) -> PyResult<Py<PyAny>> {
        let props = self.location.properties();
        let py_dict = PyDict::new(py);
        // Resolve `json.loads` once instead of once per property.
        let loads = py.import("json")?.getattr("loads")?;
        for (key, value) in props.iter() {
            let json_str = serde_json::to_string(value)
                .map_err(|e| exceptions::PyValueError::new_err(format!("JSON error: {}", e)))?;
            py_dict.set_item(key, loads.call1((json_str,))?)?;
        }
        Ok(py_dict.into())
    }

    /// Parse `json_str` and insert it under `key`. Used by `PropertiesDict`.
    ///
    /// Raises `ValueError` when `json_str` is not valid JSON.
    #[pyo3(name = "_set_property")]
    fn set_property(&mut self, key: String, json_str: String) -> PyResult<()> {
        let value: serde_json::Value = serde_json::from_str(&json_str)
            .map_err(|e| exceptions::PyValueError::new_err(format!("JSON error: {}", e)))?;
        self.location.properties_mut().insert(key, value);
        Ok(())
    }

    /// Remove `key` from the properties. Used by `PropertiesDict`.
    ///
    /// Raises `KeyError` when the key is absent.
    #[pyo3(name = "_remove_property")]
    fn remove_property(&mut self, key: String) -> PyResult<()> {
        self.location.properties_mut().remove(&key)
            .ok_or_else(|| exceptions::PyKeyError::new_err(format!("Key '{}' not found", key)))?;
        Ok(())
    }

    /// Convert the location to a Python object via its GeoJSON representation.
    ///
    /// The result of `PolygonLocation::to_geojson` is serialized with
    /// `serde_json` and decoded with Python's `json.loads`.
    fn to_geojson(&self, py: Python) -> PyResult<Py<PyAny>> {
        let geojson = self.location.to_geojson();
        let json_str = serde_json::to_string(&geojson)
            .map_err(|e| exceptions::PyValueError::new_err(format!("JSON error: {}", e)))?;
        let loads = py.import("json")?.getattr("loads")?;
        let result = loads.call1((json_str,))?;
        Ok(result.into())
    }

    /// Add a property in place and return this same object for chaining.
    ///
    /// `value` is serialized with Python's `json.dumps` and parsed with
    /// `serde_json`; a parse failure raises `ValueError`.
    fn add_property<'py>(mut slf: PyRefMut<'py, Self>, key: String, value: &Bound<'py, PyAny>) -> PyResult<PyRefMut<'py, Self>> {
        // The bound argument already carries the interpreter token.
        let py = value.py();
        let dumps = py.import("json")?.getattr("dumps")?;
        let json_str: String = dumps.call1((value,))?.extract()?;
        let json_value: serde_json::Value = serde_json::from_str(&json_str)
            .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid JSON: {}", e)))?;
        // The Rust builder consumes `self`, so it runs on a clone that then
        // replaces the stored location.
        slf.location = slf.location.clone().add_property(&key, json_value);
        Ok(slf)
    }

    /// Set the name in place and return this same object for chaining.
    fn with_name(mut slf: PyRefMut<'_, Self>, name: String) -> PyRefMut<'_, Self> {
        slf.location = slf.location.clone().with_name(&name);
        slf
    }

    /// Set the numeric id in place and return this same object for chaining.
    fn with_id(mut slf: PyRefMut<'_, Self>, id: u64) -> PyRefMut<'_, Self> {
        slf.location = slf.location.clone().with_id(id);
        slf
    }

    /// Set the UUID from its string form and return this same object.
    ///
    /// Raises `ValueError` when `uuid_str` cannot be parsed as a UUID.
    fn with_uuid(mut slf: PyRefMut<'_, Self>, uuid_str: String) -> PyResult<PyRefMut<'_, Self>> {
        let uuid = uuid::Uuid::parse_str(&uuid_str)
            .map_err(|e| exceptions::PyValueError::new_err(format!("Invalid UUID: {}", e)))?;
        slf.location = slf.location.clone().with_uuid(uuid);
        Ok(slf)
    }

    /// Apply `PolygonLocation::with_new_uuid` in place and return this same object.
    fn with_new_uuid(mut slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> {
        slf.location = slf.location.clone().with_new_uuid();
        slf
    }

    /// Name of the location, or `None` when unset.
    fn get_name(&self) -> Option<String> {
        self.location.get_name().map(|s| s.to_string())
    }

    /// Numeric id of the location, or `None` when unset.
    fn get_id(&self) -> Option<u64> {
        self.location.get_id()
    }

    /// UUID of the location as a string, or `None` when unset.
    fn get_uuid(&self) -> Option<String> {
        self.location.get_uuid().map(|u| u.to_string())
    }

    /// Set or clear (`None`) the name.
    fn set_name(&mut self, name: Option<String>) {
        self.location.set_name(name.as_deref());
    }

    /// Set or clear (`None`) the numeric id.
    fn set_id(&mut self, id: Option<u64>) {
        self.location.set_id(id);
    }

    /// Delegate to `PolygonLocation::generate_uuid`.
    fn generate_uuid(&mut self) {
        self.location.generate_uuid();
    }

    /// Human-readable summary; includes the name first when one is set.
    fn __str__(&self) -> String {
        if let Some(name) = self.location.get_name() {
            format!("PolygonLocation({}, {} vertices, center=[{:.4}°, {:.4}°, {:.1}m])",
                name, self.location.num_vertices(),
                self.location.lon(), self.location.lat(), self.location.alt())
        } else {
            format!("PolygonLocation({} vertices, center=[{:.4}°, {:.4}°, {:.1}m])",
                self.location.num_vertices(),
                self.location.lon(), self.location.lat(), self.location.alt())
        }
    }

    /// Same text as `__str__`.
    fn __repr__(&self) -> String {
        self.__str__()
    }
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "PropertyValue")]
pub struct PyPropertyValue {
pub(crate) value: PropertyValue,
}
#[pymethods]
impl PyPropertyValue {
#[new]
fn new(value: &Bound<'_, PyAny>) -> PyResult<Self> {
let property_value = if let Ok(v) = value.extract::<bool>() {
PropertyValue::Boolean(v)
} else if let Ok(v) = value.extract::<f64>() {
PropertyValue::Scalar(v)
} else if let Ok(v) = value.extract::<Vec<f64>>() {
PropertyValue::Vector(v)
} else if let Ok(v) = value.extract::<String>() {
PropertyValue::String(v)
} else if let Ok(dict) = value.cast::<PyDict>() {
let json_module = value.py().import("json")?;
let dumps = json_module.getattr("dumps")?;
let json_str: String = dumps.call1((dict,))?.extract()?;
let json_value: serde_json::Value = serde_json::from_str(&json_str)
.map_err(|e| PyErr::new::<exceptions::PyValueError, _>(format!("JSON error: {}", e)))?;
PropertyValue::Json(json_value)
} else {
return Err(PyErr::new::<exceptions::PyTypeError, _>(
"PropertyValue must be a number, list, bool, str, or dict"
));
};
Ok(Self { value: property_value })
}
fn to_python(&self, py: Python) -> PyResult<Py<PyAny>> {
match &self.value {
PropertyValue::Scalar(v) => {
let float_obj = PyFloat::new(py, *v);
Ok(float_obj.unbind().into())
}
PropertyValue::Vector(v) => {
let list = PyList::new(py, v.iter())?;
Ok(list.unbind().into())
}
PropertyValue::TimeSeries { times, values } => {
let dict = PyDict::new(py);
dict.set_item("times", times)?;
dict.set_item("values", values)?;
Ok(dict.into())
}
PropertyValue::Boolean(v) => {
let builtins = py.import("builtins")?;
let bool_func = builtins.getattr("bool")?;
Ok(bool_func.call1((*v,))?.into())
}
PropertyValue::String(v) => {
let str_obj = PyString::new(py, v);
Ok(str_obj.unbind().into())
}
PropertyValue::Json(v) => {
let json_module = py.import("json")?;
let loads = json_module.getattr("loads")?;
let json_str = serde_json::to_string(v)
.map_err(|e| PyErr::new::<exceptions::PyValueError, _>(format!("JSON error: {}", e)))?;
Ok(loads.call1((json_str,))?.into())
}
}
}
fn __repr__(&self, py: Python) -> PyResult<String> {
let value_str = self.to_python(py)?;
Ok(format!("PropertyValue({})", value_str))
}
}
/// Python wrapper around `AccessWindow`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AccessWindow")]
pub struct PyAccessWindow {
    /// Wrapped Rust access window.
    pub(crate) window: AccessWindow,
}
#[pymethods]
impl PyAccessWindow {
#[new]
fn new(window_open: &PyEpoch, window_close: &PyEpoch) -> Self {
Self {
window: AccessWindow {
window_open: window_open.obj,
window_close: window_close.obj,
location_name: None,
location_id: None,
location_uuid: None,
satellite_name: None,
satellite_id: None,
satellite_uuid: None,
name: None,
id: None,
uuid: None,
properties: AccessProperties {
azimuth_open: 0.0,
azimuth_close: 0.0,
elevation_min: 0.0,
elevation_max: 0.0,
elevation_open: 0.0,
elevation_close: 0.0,
off_nadir_min: 0.0,
off_nadir_max: 0.0,
local_time: 0.0,
look_direction: LookDirection::Either,
asc_dsc: AscDsc::Either,
center_lon: 0.0,
center_lat: 0.0,
center_alt: 0.0,
center_ecef: [0.0, 0.0, 0.0],
additional: std::collections::HashMap::new(),
},
},
}
}
#[getter]
fn window_open(&self) -> PyEpoch {
PyEpoch { obj: self.window.window_open }
}
#[getter]
fn window_close(&self) -> PyEpoch {
PyEpoch { obj: self.window.window_close }
}
#[getter]
fn start(&self) -> PyEpoch {
PyEpoch { obj: self.window.start() }
}
#[getter]
fn end(&self) -> PyEpoch {
PyEpoch { obj: self.window.end() }
}
#[getter]
fn t_start(&self) -> PyEpoch {
PyEpoch { obj: self.window.t_start() }
}
#[getter]
fn t_end(&self) -> PyEpoch {
PyEpoch { obj: self.window.t_end() }
}
#[getter]
fn midtime(&self) -> PyEpoch {
PyEpoch { obj: self.window.midtime() }
}
#[getter]
fn duration(&self) -> f64 {
self.window.duration()
}
#[getter]
fn name(&self) -> Option<String> {
self.window.name.clone()
}
#[getter]
fn id(&self) -> Option<u64> {
self.window.id
}
#[getter]
fn uuid(&self) -> Option<String> {
self.window.uuid.map(|u| u.to_string())
}
#[getter]
fn location_name(&self) -> Option<String> {
self.window.location_name.clone()
}
#[getter]
fn location_id(&self) -> Option<u64> {
self.window.location_id
}
#[getter]
fn satellite_name(&self) -> Option<String> {
self.window.satellite_name.clone()
}
#[getter]
fn satellite_id(&self) -> Option<u64> {
self.window.satellite_id
}
#[getter]
fn location_uuid(&self) -> Option<String> {
self.window.location_uuid.map(|u| u.to_string())
}
#[getter]
fn satellite_uuid(&self) -> Option<String> {
self.window.satellite_uuid.map(|u| u.to_string())
}
#[getter]
fn properties(slf: Bound<'_, Self>) -> PyResult<Py<PyAccessPropertiesView>> {
let py = slf.py();
let view = PyAccessPropertiesView { parent: slf.into_any().unbind() };
Py::new(py, view)
}
#[setter]
fn set_properties(&mut self, value: PyAccessProperties) {
self.window.properties = value.properties;
}
#[getter]
fn azimuth_open(&self) -> f64 {
self.window.properties.azimuth_open
}
#[getter]
fn azimuth_close(&self) -> f64 {
self.window.properties.azimuth_close
}
#[getter]
fn elevation_min(&self) -> f64 {
self.window.properties.elevation_min
}
#[getter]
fn elevation_max(&self) -> f64 {
self.window.properties.elevation_max
}
#[getter]
fn elevation_open(&self) -> f64 {
self.window.properties.elevation_open
}
#[getter]
fn elevation_close(&self) -> f64 {
self.window.properties.elevation_close
}
#[getter]
fn off_nadir_min(&self) -> f64 {
self.window.properties.off_nadir_min
}
#[getter]
fn off_nadir_max(&self) -> f64 {
self.window.properties.off_nadir_max
}
#[getter]
fn local_time(&self) -> f64 {
self.window.properties.local_time
}
#[getter]
fn look_direction(&self) -> PyLookDirection {
PyLookDirection { value: self.window.properties.look_direction }
}
#[getter]
fn asc_dsc(&self) -> PyAscDsc {
PyAscDsc { value: self.window.properties.asc_dsc }
}
#[getter]
fn center_lon(&self) -> f64 {
self.window.properties.center_lon
}
#[getter]
fn center_lat(&self) -> f64 {
self.window.properties.center_lat
}
#[getter]
fn center_alt(&self) -> f64 {
self.window.properties.center_alt
}
#[getter]
fn center_ecef(&self) -> [f64; 3] {
self.window.properties.center_ecef
}
fn __repr__(&self) -> String {
format!(
"AccessWindow(start={}, end={}, duration={:.2}s)",
self.window.start(),
self.window.end(),
self.window.duration()
)
}
fn _get_properties_azimuth_open(&self) -> f64 {
self.window.properties.azimuth_open
}
fn _get_properties_azimuth_close(&self) -> f64 {
self.window.properties.azimuth_close
}
fn _get_properties_elevation_min(&self) -> f64 {
self.window.properties.elevation_min
}
fn _get_properties_elevation_max(&self) -> f64 {
self.window.properties.elevation_max
}
fn _get_properties_elevation_open(&self) -> f64 {
self.window.properties.elevation_open
}
fn _get_properties_elevation_close(&self) -> f64 {
self.window.properties.elevation_close
}
fn _get_properties_off_nadir_min(&self) -> f64 {
self.window.properties.off_nadir_min
}
fn _get_properties_off_nadir_max(&self) -> f64 {
self.window.properties.off_nadir_max
}
fn _get_properties_local_time(&self) -> f64 {
self.window.properties.local_time
}
fn _get_properties_look_direction(&self) -> PyLookDirection {
PyLookDirection { value: self.window.properties.look_direction }
}
fn _get_properties_asc_dsc(&self) -> PyAscDsc {
PyAscDsc { value: self.window.properties.asc_dsc }
}
fn _get_properties_center_lon(&self) -> f64 {
self.window.properties.center_lon
}
fn _get_properties_center_lat(&self) -> f64 {
self.window.properties.center_lat
}
fn _get_properties_center_alt(&self) -> f64 {
self.window.properties.center_alt
}
fn _get_properties_center_ecef(&self) -> [f64; 3] {
self.window.properties.center_ecef
}
fn _get_additional_properties_dict(&self, py: Python) -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
for (key, value) in &self.window.properties.additional {
let py_value: Py<PyAny> = match value {
PropertyValue::Scalar(v) => {
let float_obj = PyFloat::new(py, *v);
float_obj.unbind().into()
}
PropertyValue::Vector(v) => {
let list = PyList::new(py, v.iter())?;
list.unbind().into()
}
PropertyValue::TimeSeries { times, values } => {
let ts_dict = PyDict::new(py);
ts_dict.set_item("times", times)?;
ts_dict.set_item("values", values)?;
ts_dict.into()
}
PropertyValue::Boolean(v) => {
let builtins = py.import("builtins")?;
let bool_func = builtins.getattr("bool")?;
bool_func.call1((*v,))?.into()
}
PropertyValue::String(v) => {
let str_obj = PyString::new(py, v);
str_obj.unbind().into()
}
PropertyValue::Json(v) => {
let json_module = py.import("json")?;
let loads = json_module.getattr("loads")?;
let json_str = serde_json::to_string(v)
.map_err(|e| PyErr::new::<exceptions::PyValueError, _>(format!("JSON error: {}", e)))?;
loads.call1((json_str,))?.into()
}
};
dict.set_item(key, py_value)?;
}
Ok(dict.into())
}
fn _set_additional_property(&mut self, key: String, value: &Bound<'_, PyAny>) -> PyResult<()> {
let property_value = if let Ok(v) = value.extract::<bool>() {
PropertyValue::Boolean(v)
} else if let Ok(v) = value.extract::<f64>() {
PropertyValue::Scalar(v)
} else if let Ok(v) = value.extract::<Vec<f64>>() {
PropertyValue::Vector(v)
} else if value.hasattr("tolist")? {
let as_list = value.call_method0("tolist")?;
if let Ok(v) = as_list.extract::<Vec<f64>>() {
PropertyValue::Vector(v)
} else {
return Err(PyErr::new::<exceptions::PyTypeError, _>(
"Numpy array must contain numeric values"
));
}
} else if let Ok(v) = value.extract::<String>() {
PropertyValue::String(v)
} else if let Ok(dict) = value.cast::<PyDict>() {
let json_module = value.py().import("json")?;
let dumps = json_module.getattr("dumps")?;
let json_str: String = dumps.call1((dict,))?.extract()?;
let json_value: serde_json::Value = serde_json::from_str(&json_str)
.map_err(|e| PyErr::new::<exceptions::PyValueError, _>(format!("JSON error: {}", e)))?;
PropertyValue::Json(json_value)
} else {
return Err(PyErr::new::<exceptions::PyTypeError, _>(
"Property value must be a number, list, numpy array, bool, str, or dict"
));
};
self.window.properties.additional.insert(key, property_value);
Ok(())
}
fn _remove_additional_property(&mut self, key: String) -> PyResult<()> {
self.window.properties.additional.remove(&key)
.ok_or_else(|| exceptions::PyKeyError::new_err(format!("Key '{}' not found", key)))?;
Ok(())
}
}
/// View over the properties of a parent object, exposed to Python as
/// `AccessPropertiesView`.
///
/// The view owns no property data. Every read or write is forwarded to the
/// parent by calling one of its `_get_properties_*` or `_*_additional_*`
/// methods by name.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AccessPropertiesView")]
pub struct PyAccessPropertiesView {
    /// The Python object whose properties this view exposes.
    parent: Py<PyAny>,
}

#[pymethods]
impl PyAccessPropertiesView {
    /// Result of the parent's `_get_properties_azimuth_open()`.
    #[getter]
    fn azimuth_open(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_azimuth_open")
    }

    /// Result of the parent's `_get_properties_azimuth_close()`.
    #[getter]
    fn azimuth_close(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_azimuth_close")
    }

    /// Result of the parent's `_get_properties_elevation_min()`.
    #[getter]
    fn elevation_min(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_elevation_min")
    }

    /// Result of the parent's `_get_properties_elevation_max()`.
    #[getter]
    fn elevation_max(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_elevation_max")
    }

    /// Result of the parent's `_get_properties_elevation_open()`.
    #[getter]
    fn elevation_open(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_elevation_open")
    }

    /// Result of the parent's `_get_properties_elevation_close()`.
    #[getter]
    fn elevation_close(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_elevation_close")
    }

    /// Result of the parent's `_get_properties_off_nadir_min()`.
    #[getter]
    fn off_nadir_min(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_off_nadir_min")
    }

    /// Result of the parent's `_get_properties_off_nadir_max()`.
    #[getter]
    fn off_nadir_max(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_off_nadir_max")
    }

    /// Result of the parent's `_get_properties_local_time()`.
    #[getter]
    fn local_time(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_local_time")
    }

    /// Object returned by the parent's `_get_properties_look_direction()`.
    #[getter]
    fn look_direction(&self, py: Python) -> PyResult<Py<PyAny>> {
        self.parent
            .bind(py)
            .call_method0("_get_properties_look_direction")
            .map(|obj| obj.unbind())
    }

    /// Object returned by the parent's `_get_properties_asc_dsc()`.
    #[getter]
    fn asc_dsc(&self, py: Python) -> PyResult<Py<PyAny>> {
        self.parent
            .bind(py)
            .call_method0("_get_properties_asc_dsc")
            .map(|obj| obj.unbind())
    }

    /// Result of the parent's `_get_properties_center_lon()`.
    #[getter]
    fn center_lon(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_center_lon")
    }

    /// Result of the parent's `_get_properties_center_lat()`.
    #[getter]
    fn center_lat(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_center_lat")
    }

    /// Result of the parent's `_get_properties_center_alt()`.
    #[getter]
    fn center_alt(&self, py: Python) -> PyResult<f64> {
        self.parent_f64(py, "_get_properties_center_alt")
    }

    /// Result of the parent's `_get_properties_center_ecef()` as three floats.
    #[getter]
    fn center_ecef(&self, py: Python) -> PyResult<[f64; 3]> {
        self.parent
            .bind(py)
            .call_method0("_get_properties_center_ecef")?
            .extract()
    }

    /// Return an `AdditionalPropertiesDict` whose parent is this view.
    #[getter]
    fn additional(slf: Bound<'_, Self>) -> PyResult<Py<PyAdditionalPropertiesDict>> {
        let py = slf.py();
        let parent = slf.into_any().unbind();
        Py::new(py, PyAdditionalPropertiesDict::new(parent))
    }

    /// Look up an additional property by key.
    ///
    /// Raises `KeyError` when the key is not present.
    fn __getitem__(&self, key: String, py: Python) -> PyResult<Py<PyAny>> {
        let snapshot = self._get_additional_properties_dict(py)?;
        match snapshot.bind(py).get_item(&key)? {
            Some(found) => Ok(found.unbind()),
            None => Err(exceptions::PyKeyError::new_err(format!(
                "Key '{}' not found",
                key
            ))),
        }
    }

    /// Store an additional property by forwarding to the parent's
    /// `_set_additional_property`.
    fn __setitem__(&self, key: String, value: &Bound<'_, PyAny>, py: Python) -> PyResult<()> {
        self._set_additional_property(key, value, py)
    }

    /// Fetch the parent's additional-properties dict via
    /// `_get_additional_properties_dict()`.
    fn _get_additional_properties_dict(&self, py: Python) -> PyResult<Py<PyDict>> {
        let fetched = self
            .parent
            .bind(py)
            .call_method0("_get_additional_properties_dict")?;
        Ok(fetched.extract()?)
    }

    /// Forward `(key, value)` to the parent's `_set_additional_property`.
    fn _set_additional_property(&self, key: String, value: &Bound<'_, PyAny>, py: Python) -> PyResult<()> {
        self.parent
            .bind(py)
            .call_method1("_set_additional_property", (key, value))
            .map(|_| ())
    }

    /// Forward `key` to the parent's `_remove_additional_property`.
    fn _remove_additional_property(&self, key: String, py: Python) -> PyResult<()> {
        self.parent
            .bind(py)
            .call_method1("_remove_additional_property", (key,))
            .map(|_| ())
    }

    /// Summarize the azimuth, elevation and off-nadir ranges read from the
    /// parent, each printed to one decimal followed by a degree sign.
    fn __repr__(&self, py: Python) -> PyResult<String> {
        let az_open = self.parent_f64(py, "_get_properties_azimuth_open")?;
        let az_close = self.parent_f64(py, "_get_properties_azimuth_close")?;
        let el_min = self.parent_f64(py, "_get_properties_elevation_min")?;
        let el_max = self.parent_f64(py, "_get_properties_elevation_max")?;
        let on_min = self.parent_f64(py, "_get_properties_off_nadir_min")?;
        let on_max = self.parent_f64(py, "_get_properties_off_nadir_max")?;
        let summary = format!(
            "AccessPropertiesView(az=[{:.1}°, {:.1}°], el=[{:.1}°, {:.1}°], off_nadir=[{:.1}°, {:.1}°])",
            az_open, az_close, el_min, el_max, on_min, on_max
        );
        Ok(summary)
    }
}

impl PyAccessPropertiesView {
    /// Call the zero-argument method `method` on the parent and extract the
    /// result as an `f64`.
    fn parent_f64(&self, py: Python, method: &str) -> PyResult<f64> {
        self.parent.bind(py).call_method0(method)?.extract()
    }
}
#[derive(Clone)]
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "AccessProperties")]
pub struct PyAccessProperties {
pub(crate) properties: AccessProperties,
}
#[pymethods]
impl PyAccessProperties {
#[new]
#[allow(clippy::too_many_arguments)]
fn new(
azimuth_open: f64,
azimuth_close: f64,
elevation_min: f64,
elevation_max: f64,
elevation_open: f64,
elevation_close: f64,
off_nadir_min: f64,
off_nadir_max: f64,
local_time: f64,
look_direction: &PyLookDirection,
asc_dsc: &PyAscDsc,
center_lon: f64,
center_lat: f64,
center_alt: f64,
center_ecef: [f64; 3],
) -> Self {
Self {
properties: AccessProperties {
azimuth_open,
azimuth_close,
elevation_min,
elevation_max,
elevation_open,
elevation_close,
off_nadir_min,
off_nadir_max,
local_time,
look_direction: look_direction.value,
asc_dsc: asc_dsc.value,
center_lon,
center_lat,
center_alt,
center_ecef,
additional: std::collections::HashMap::new(),
},
}
}
#[getter]
fn azimuth_open(&self) -> f64 {
self.properties.azimuth_open
}
#[getter]
fn azimuth_close(&self) -> f64 {
self.properties.azimuth_close
}
#[getter]
fn elevation_min(&self) -> f64 {
self.properties.elevation_min
}
#[getter]
fn elevation_max(&self) -> f64 {
self.properties.elevation_max
}
#[getter]
fn elevation_open(&self) -> f64 {
self.properties.elevation_open
}
#[getter]
fn elevation_close(&self) -> f64 {
self.properties.elevation_close
}
#[getter]
fn off_nadir_min(&self) -> f64 {
self.properties.off_nadir_min
}
#[getter]
fn off_nadir_max(&self) -> f64 {
self.properties.off_nadir_max
}
#[getter]
fn local_time(&self) -> f64 {
self.properties.local_time
}
#[getter]
fn look_direction(&self) -> PyLookDirection {
PyLookDirection { value: self.properties.look_direction }
}
#[getter]
fn asc_dsc(&self) -> PyAscDsc {
PyAscDsc { value: self.properties.asc_dsc }
}
#[getter]
fn center_lon(&self) -> f64 {
self.properties.center_lon
}
#[getter]
fn center_lat(&self) -> f64 {
self.properties.center_lat
}
#[getter]
fn center_alt(&self) -> f64 {
self.properties.center_alt
}
#[getter]
fn center_ecef(&self) -> [f64; 3] {
self.properties.center_ecef
}
#[getter]
fn additional(slf: Bound<'_, Self>) -> PyResult<Py<PyAdditionalPropertiesDict>> {
let py = slf.py();
let dict = PyAdditionalPropertiesDict::new(slf.into_any().unbind());
Py::new(py, dict)
}
fn _get_additional_properties_dict(&self, py: Python) -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
for (key, value) in &self.properties.additional {
let py_value: Py<PyAny> = match value {
PropertyValue::Scalar(v) => {
let float_obj = PyFloat::new(py, *v);
float_obj.unbind().into()
}
PropertyValue::Vector(v) => {
let list = PyList::new(py, v.iter())?;
list.unbind().into()
}
PropertyValue::TimeSeries { times, values } => {
let ts_dict = PyDict::new(py);
ts_dict.set_item("times", times)?;
ts_dict.set_item("values", values)?;
ts_dict.into()
}
PropertyValue::Boolean(v) => {
let builtins = py.import("builtins")?;
let bool_func = builtins.getattr("bool")?;
bool_func.call1((*v,))?.into()
}
PropertyValue::String(v) => {
let str_obj = PyString::new(py, v);
str_obj.unbind().into()
}
PropertyValue::Json(v) => {
let json_module = py.import("json")?;
let loads = json_module.getattr("loads")?;
let json_str = serde_json::to_string(v)
.map_err(|e| PyErr::new::<exceptions::PyValueError, _>(format!("JSON error: {}", e)))?;
loads.call1((json_str,))?.into()
}
};
dict.set_item(key, py_value)?;
}
Ok(dict.into())
}
fn _set_additional_property(&mut self, key: String, value: &Bound<'_, PyAny>) -> PyResult<()> {
let property_value = if let Ok(v) = value.extract::<bool>() {
PropertyValue::Boolean(v)
} else if let Ok(v) = value.extract::<f64>() {
PropertyValue::Scalar(v)
} else if let Ok(v) = value.extract::<Vec<f64>>() {
PropertyValue::Vector(v)
} else if value.hasattr("tolist")? {
let as_list = value.call_method0("tolist")?;
if let Ok(v) = as_list.extract::<Vec<f64>>() {
PropertyValue::Vector(v)
} else {
return Err(PyErr::new::<exceptions::PyTypeError, _>(
"Numpy array must contain numeric values"
));
}
} else if let Ok(v) = value.extract::<String>() {
PropertyValue::String(v)
} else if let Ok(dict) = value.cast::<PyDict>() {
let json_module = value.py().import("json")?;
let dumps = json_module.getattr("dumps")?;
let json_str: String = dumps.call1((dict,))?.extract()?;
let json_value: serde_json::Value = serde_json::from_str(&json_str)
.map_err(|e| PyErr::new::<exceptions::PyValueError, _>(format!("JSON error: {}", e)))?;
PropertyValue::Json(json_value)
} else {
return Err(PyErr::new::<exceptions::PyTypeError, _>(
"Property value must be a number, list, numpy array, bool, str, or dict"
));
};
self.properties.additional.insert(key, property_value);
Ok(())
}
fn _remove_additional_property(&mut self, key: String) -> PyResult<()> {
self.properties.additional.remove(&key)
.ok_or_else(|| exceptions::PyKeyError::new_err(format!("Key '{}' not found", key)))?;
Ok(())
}
fn __repr__(&self) -> String {
format!(
"AccessProperties(az=[{:.1}°, {:.1}°], el=[{:.1}°, {:.1}°], off_nadir=[{:.1}°, {:.1}°])",
self.properties.azimuth_open,
self.properties.azimuth_close,
self.properties.elevation_min,
self.properties.elevation_max,
self.properties.off_nadir_min,
self.properties.off_nadir_max
)
}
}
/// Dict-like proxy over a parent's additional properties, exposed to Python
/// as `AdditionalPropertiesDict`.
///
/// Reads work on a dict obtained from the parent's
/// `_get_additional_properties_dict()`. Writes and removals are forwarded to
/// the parent's `_set_additional_property` / `_remove_additional_property`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AdditionalPropertiesDict")]
pub struct PyAdditionalPropertiesDict {
    /// The Python object that owns the properties.
    parent: Py<PyAny>,
}

#[pymethods]
impl PyAdditionalPropertiesDict {
    /// Return the value stored under `key`; raises `KeyError` if it is absent.
    fn __getitem__(&self, key: String, py: Python) -> PyResult<Py<PyAny>> {
        let snapshot = self.get_additional_properties_dict(py)?;
        match snapshot.get_item(&key)? {
            Some(found) => Ok(found.unbind()),
            None => Err(exceptions::PyKeyError::new_err(format!(
                "Key '{}' not found",
                key
            ))),
        }
    }

    /// Store `value` under `key` on the parent.
    fn __setitem__(&self, key: String, value: &Bound<'_, PyAny>, py: Python) -> PyResult<()> {
        self.set_additional_property(py, key, value)
    }

    /// Remove `key` from the parent.
    fn __delitem__(&self, key: String, py: Python) -> PyResult<()> {
        self.remove_additional_property(py, key)
    }

    /// Number of additional properties currently on the parent.
    fn __len__(&self, py: Python) -> PyResult<usize> {
        self.get_additional_properties_dict(py)
            .map(|snapshot| snapshot.len())
    }

    /// Whether `key` is present among the additional properties.
    fn __contains__(&self, key: String, py: Python) -> PyResult<bool> {
        self.get_additional_properties_dict(py)?.contains(&key)
    }

    /// Iterator over the keys of a dict fetched from the parent at call time.
    fn __iter__(&self, py: Python) -> PyResult<Py<PyAny>> {
        let snapshot = self.get_additional_properties_dict(py)?;
        let iterator = snapshot.call_method0("__iter__")?;
        Ok(iterator.unbind())
    }

    /// `AdditionalPropertiesDict(<repr of the current dict>)`.
    fn __repr__(&self, py: Python) -> PyResult<String> {
        let snapshot = self.get_additional_properties_dict(py)?;
        let inner = snapshot.repr()?;
        Ok(format!("AdditionalPropertiesDict({})", inner))
    }

    /// Return the value under `key`, or `default` (Python `None` when
    /// omitted) if the key is absent.
    #[pyo3(signature = (key, default=None))]
    fn get(&self, key: String, default: Option<Py<PyAny>>, py: Python) -> PyResult<Py<PyAny>> {
        let snapshot = self.get_additional_properties_dict(py)?;
        let found = snapshot.get_item(&key)?;
        Ok(match found {
            Some(value) => value.unbind(),
            None => default.unwrap_or_else(|| py.None()),
        })
    }

    /// Result of `keys()` on a dict fetched from the parent at call time.
    fn keys(&self, py: Python) -> PyResult<Py<PyAny>> {
        let snapshot = self.get_additional_properties_dict(py)?;
        let view = snapshot.call_method0("keys")?;
        Ok(view.unbind())
    }

    /// Result of `values()` on a dict fetched from the parent at call time.
    fn values(&self, py: Python) -> PyResult<Py<PyAny>> {
        let snapshot = self.get_additional_properties_dict(py)?;
        let view = snapshot.call_method0("values")?;
        Ok(view.unbind())
    }

    /// Result of `items()` on a dict fetched from the parent at call time.
    fn items(&self, py: Python) -> PyResult<Py<PyAny>> {
        let snapshot = self.get_additional_properties_dict(py)?;
        let view = snapshot.call_method0("items")?;
        Ok(view.unbind())
    }

    /// Remove every additional property from the parent, one key at a time.
    fn clear(&self, py: Python) -> PyResult<()> {
        // Collect the key names first; each removal goes back to the parent.
        let snapshot = self.get_additional_properties_dict(py)?;
        let names: Vec<String> = snapshot.keys().extract()?;
        for name in names {
            self.remove_additional_property(py, name)?;
        }
        Ok(())
    }

    /// Store every `(key, value)` pair of `other` on the parent.
    ///
    /// Keys must be extractable as `str`; the first failure stops the update.
    fn update(&self, other: &Bound<'_, PyDict>, py: Python) -> PyResult<()> {
        for (raw_key, raw_value) in other.iter() {
            let name: String = raw_key.extract()?;
            self.set_additional_property(py, name, &raw_value)?;
        }
        Ok(())
    }
}

impl PyAdditionalPropertiesDict {
    /// Wrap `parent` in a new proxy.
    fn new(parent: Py<PyAny>) -> Self {
        Self { parent }
    }

    /// Call the parent's `_get_additional_properties_dict()` and cast the
    /// result to a Python dict.
    fn get_additional_properties_dict<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyDict>> {
        let fetched = self
            .parent
            .bind(py)
            .call_method0("_get_additional_properties_dict")?;
        let as_dict = fetched.cast::<PyDict>()?;
        Ok(as_dict.clone())
    }

    /// Call the parent's `_set_additional_property(key, value)`.
    fn set_additional_property(&self, py: Python, key: String, value: &Bound<'_, PyAny>) -> PyResult<()> {
        self.parent
            .bind(py)
            .call_method1("_set_additional_property", (key, value))
            .map(|_| ())
    }

    /// Call the parent's `_remove_additional_property(key)`.
    fn remove_additional_property(&self, py: Python, key: String) -> PyResult<()> {
        self.parent
            .bind(py)
            .call_method1("_remove_additional_property", (key,))
            .map(|_| ())
    }
}
/// Python wrapper around a Rust [`SamplingConfig`], exposed to Python as
/// `SamplingConfig`.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "SamplingConfig")]
#[derive(Clone)]
pub struct PySamplingConfig {
    /// The wrapped sampling configuration.
    config: SamplingConfig,
}

#[pymethods]
impl PySamplingConfig {
    /// Build a configuration from whichever keyword arguments are supplied.
    ///
    /// Precedence, first match wins:
    /// 1. `relative_times` -> `RelativePoints`
    /// 2. `interval` AND `offset` both given -> `FixedInterval`
    /// 3. `count` -> `FixedCount`
    /// 4. otherwise -> `Midpoint`
    ///
    /// Note that `interval` without `offset` (or vice versa) is ignored and
    /// falls through to rules 3 and 4.
    #[new]
    #[pyo3(signature = (relative_times=None, interval=None, offset=None, count=None))]
    fn new(
        relative_times: Option<Vec<f64>>,
        interval: Option<f64>,
        offset: Option<f64>,
        count: Option<usize>,
    ) -> Self {
        let config = match (relative_times, interval, offset, count) {
            (Some(times), _, _, _) => SamplingConfig::RelativePoints(times),
            (None, Some(interval), Some(offset), _) => {
                SamplingConfig::FixedInterval { interval, offset }
            }
            (None, _, _, Some(count)) => SamplingConfig::FixedCount(count),
            _ => SamplingConfig::Midpoint,
        };
        Self { config }
    }

    /// Configuration wrapping `SamplingConfig::Midpoint`.
    #[staticmethod]
    fn midpoint() -> Self {
        let config = SamplingConfig::Midpoint;
        Self { config }
    }

    /// Configuration wrapping `SamplingConfig::RelativePoints(relative_times)`.
    #[staticmethod]
    fn relative_points(relative_times: Vec<f64>) -> Self {
        let config = SamplingConfig::RelativePoints(relative_times);
        Self { config }
    }

    /// Configuration wrapping `SamplingConfig::FixedInterval { interval, offset }`.
    #[staticmethod]
    fn fixed_interval(interval: f64, offset: f64) -> Self {
        let config = SamplingConfig::FixedInterval { interval, offset };
        Self { config }
    }

    /// Configuration wrapping `SamplingConfig::FixedCount(count)`.
    #[staticmethod]
    fn fixed_count(count: usize) -> Self {
        let config = SamplingConfig::FixedCount(count);
        Self { config }
    }

    /// Render the configuration in the form of the static constructor call
    /// that would create it.
    fn __repr__(&self) -> String {
        match &self.config {
            SamplingConfig::Midpoint => String::from("SamplingConfig.midpoint()"),
            SamplingConfig::RelativePoints(times) => {
                format!("SamplingConfig.relative_points({:?})", times)
            }
            SamplingConfig::FixedInterval { interval, offset } => format!(
                "SamplingConfig.fixed_interval(interval={}, offset={})",
                interval, offset
            ),
            SamplingConfig::FixedCount(count) => {
                format!("SamplingConfig.fixed_count({})", count)
            }
        }
    }
}
/// Python wrapper around a Rust [`DopplerComputer`], exposed to Python as
/// `DopplerComputer`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "DopplerComputer")]
pub struct PyDopplerComputer {
    /// The wrapped Rust computer.
    #[allow(dead_code)]
    computer: DopplerComputer,
}

#[pymethods]
impl PyDopplerComputer {
    /// Create a Doppler computer.
    ///
    /// Both frequencies are optional and passed through unchanged to
    /// `DopplerComputer::new`, together with a clone of the sampling
    /// configuration held by `sampling_config`.
    #[new]
    fn new(
        uplink_frequency: Option<f64>,
        downlink_frequency: Option<f64>,
        sampling_config: &PySamplingConfig,
    ) -> Self {
        let sampling = sampling_config.config.clone();
        let computer = DopplerComputer::new(uplink_frequency, downlink_frequency, sampling);
        Self { computer }
    }

    /// Show both frequencies using their `Debug` form (`Some(..)` / `None`).
    fn __repr__(&self) -> String {
        let uplink = &self.computer.uplink_frequency;
        let downlink = &self.computer.downlink_frequency;
        format!(
            "DopplerComputer(uplink_frequency={:?}, downlink_frequency={:?})",
            uplink, downlink
        )
    }
}
/// Python wrapper around a Rust [`RangeComputer`], exposed to Python as
/// `RangeComputer`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "RangeComputer")]
pub struct PyRangeComputer {
    /// The wrapped Rust computer.
    #[allow(dead_code)]
    computer: RangeComputer,
}

#[pymethods]
impl PyRangeComputer {
    /// Create a range computer from a clone of the given sampling
    /// configuration.
    #[new]
    fn new(sampling_config: &PySamplingConfig) -> Self {
        let sampling = sampling_config.config.clone();
        Self {
            computer: RangeComputer::new(sampling),
        }
    }

    /// Always the fixed text `RangeComputer()`.
    fn __repr__(&self) -> String {
        String::from("RangeComputer()")
    }
}
/// Python wrapper around a Rust [`RangeRateComputer`], exposed to Python as
/// `RangeRateComputer`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "RangeRateComputer")]
pub struct PyRangeRateComputer {
    /// The wrapped Rust computer.
    #[allow(dead_code)]
    computer: RangeRateComputer,
}

#[pymethods]
impl PyRangeRateComputer {
    /// Create a range-rate computer from a clone of the given sampling
    /// configuration.
    #[new]
    fn new(sampling_config: &PySamplingConfig) -> Self {
        let sampling = sampling_config.config.clone();
        Self {
            computer: RangeRateComputer::new(sampling),
        }
    }

    /// Always the fixed text `RangeRateComputer()`.
    fn __repr__(&self) -> String {
        String::from("RangeRateComputer()")
    }
}
#[pyclass(module = "brahe._brahe", subclass)]
#[pyo3(name = "AccessPropertyComputer")]
pub struct PyAccessPropertyComputer {
}
#[pymethods]
impl PyAccessPropertyComputer {
#[new]
#[pyo3(signature = (*_args, **_kwargs))]
fn new(_args: &Bound<'_, PyTuple>, _kwargs: Option<&Bound<'_, PyDict>>) -> Self {
PyAccessPropertyComputer {}
}
fn sampling_config(&self) -> PyResult<PySamplingConfig> {
Err(exceptions::PyNotImplementedError::new_err(
"Subclasses must implement sampling_config() method",
))
}
#[allow(unused_variables)]
fn compute(
&self,
window: &PyAccessWindow,
sample_epochs: PyReadonlyArray1<f64>,
sample_states_ecef: PyReadonlyArray2<f64>,
location_ecef: &Bound<'_, PyAny>,
location_geodetic: &Bound<'_, PyAny>,
) -> PyResult<Py<PyDict>> {
Err(exceptions::PyNotImplementedError::new_err(
"Subclasses must implement compute() method",
))
}
fn property_names(&self) -> PyResult<Vec<String>> {
Err(exceptions::PyNotImplementedError::new_err(
"Subclasses must implement property_names() method",
))
}
}
/// Holds either a boxed Rust property computer or a wrapper around a Python
/// one, and implements [`AccessPropertyComputer`] by delegating to whichever
/// is held.
enum PropertyComputerHolder {
    /// A computer stored as a boxed trait object.
    RustNative(Box<dyn AccessPropertyComputer>),
    /// A computer backed by a Python object.
    PythonWrapper(RustAccessPropertyComputerWrapper),
}

impl PropertyComputerHolder {
    /// Borrow the held computer as a trait object, whichever variant it is.
    fn inner(&self) -> &dyn AccessPropertyComputer {
        match self {
            PropertyComputerHolder::RustNative(computer) => &**computer,
            PropertyComputerHolder::PythonWrapper(wrapper) => wrapper,
        }
    }
}

impl AccessPropertyComputer for PropertyComputerHolder {
    /// Sampling configuration reported by the held computer.
    fn sampling_config(&self) -> SamplingConfig {
        self.inner().sampling_config()
    }

    /// Forward all arguments unchanged to the held computer's `compute`.
    fn compute(
        &self,
        window: &AccessWindow,
        sample_epochs: &[f64],
        sample_states_ecef: &[nalgebra::SVector<f64, 6>],
        location_ecef: &nalgebra::Vector3<f64>,
        location_geodetic: &nalgebra::Vector3<f64>,
    ) -> Result<HashMap<String, PropertyValue>, RustBraheError> {
        self.inner().compute(
            window,
            sample_epochs,
            sample_states_ecef,
            location_ecef,
            location_geodetic,
        )
    }

    /// Property names reported by the held computer.
    fn property_names(&self) -> Vec<String> {
        self.inner().property_names()
    }
}
#[allow(dead_code)]
pub(crate) struct RustAccessPropertyComputerWrapper {
py_computer: Py<PyAny>,
}
#[allow(dead_code)]
impl RustAccessPropertyComputerWrapper {
pub fn new(py_computer: Py<PyAny>) -> Self {
RustAccessPropertyComputerWrapper { py_computer }
}
}
impl AccessPropertyComputer for RustAccessPropertyComputerWrapper {
fn sampling_config(&self) -> SamplingConfig {
Python::attach(|py| {
let py_obj = self.py_computer.bind(py);
py_obj
.call_method0("sampling_config")
.and_then(|result| {
let py_config: &Bound<'_, PyAny> = &result;
if let Ok(config) = py_config.extract::<PySamplingConfig>() {
Ok(config.config.clone())
} else {
Err(pyo3::PyErr::new::<pyo3::exceptions::PyTypeError, _>(
"sampling_config() must return a SamplingConfig",
))
}
})
.unwrap_or(SamplingConfig::Midpoint)
})
}
fn compute(
&self,
window: &AccessWindow,
sample_epochs: &[f64],
sample_states_ecef: &[nalgebra::SVector<f64, 6>],
location_ecef: &nalgebra::Vector3<f64>,
location_geodetic: &nalgebra::Vector3<f64>,
) -> Result<HashMap<String, PropertyValue>, RustBraheError> {
Python::attach(|py| {
let epochs_array = sample_epochs.to_pyarray(py).to_owned();
let states_array = numpy::PyArray2::from_vec2(
py,
&sample_states_ecef
.iter()
.map(|state| state.as_slice().to_vec())
.collect::<Vec<_>>(),
)
.map_err(|e| RustBraheError::Error(format!("Failed to create states array: {}", e)))?
.to_owned();
let loc_array = location_ecef.as_slice().to_pyarray(py).to_owned();
let loc_geodetic_array = location_geodetic.as_slice().to_pyarray(py).to_owned();
let py_window = Py::new(
py,
PyAccessWindow {
window: window.clone(),
},
)
.map_err(|e| RustBraheError::Error(format!("Failed to create PyAccessWindow: {}", e)))?;
let py_obj = self.py_computer.bind(py);
let result_dict = py_obj
.call_method1(
"compute",
(py_window, epochs_array, states_array, loc_array, loc_geodetic_array),
)
.map_err(|e| {
RustBraheError::Error(format!("Python compute() method failed: {}", e))
})?;
let dict: &Bound<'_, PyDict> = result_dict
.cast()
.map_err(|e| RustBraheError::Error(format!("compute() must return a dict: {}", e)))?;
let mut props = HashMap::new();
for (key, value) in dict.iter() {
let key_str: String = key.extract().map_err(|e| {
RustBraheError::Error(format!("Property keys must be strings: {}", e))
})?;
let prop_value = python_value_to_property_value(&value)?;
props.insert(key_str, prop_value);
}
Ok(props)
})
}
fn property_names(&self) -> Vec<String> {
Python::attach(|py| {
let py_obj = self.py_computer.bind(py);
py_obj
.call_method0("property_names")
.and_then(|result| result.extract())
.unwrap_or_else(|_| Vec::new())
})
}
}
#[allow(dead_code)]
fn python_value_to_property_value(value: &Bound<'_, PyAny>) -> Result<PropertyValue, RustBraheError> {
if let Ok(b) = value.extract::<bool>() {
return Ok(PropertyValue::Boolean(b));
}
if let Ok(f) = value.extract::<f64>() {
return Ok(PropertyValue::Scalar(f));
}
if let Ok(s) = value.extract::<String>() {
return Ok(PropertyValue::String(s));
}
if let Ok(vec) = value.extract::<Vec<f64>>() {
return Ok(PropertyValue::Vector(vec));
}
if let Ok(dict) = value.cast::<PyDict>() {
let has_times = dict.contains("times").map_err(|e| {
RustBraheError::Error(format!("Failed to check for 'times' key: {}", e))
})?;
let has_values = dict.contains("values").map_err(|e| {
RustBraheError::Error(format!("Failed to check for 'values' key: {}", e))
})?;
if has_times && has_values {
let times: Vec<f64> = dict
.get_item("times")
.map_err(|e| RustBraheError::Error(format!("Failed to get 'times': {}", e)))?
.ok_or_else(|| RustBraheError::Error("Missing 'times' key".to_string()))?
.extract()
.map_err(|e| RustBraheError::Error(format!("Failed to extract 'times': {}", e)))?;
let values: Vec<f64> = dict
.get_item("values")
.map_err(|e| RustBraheError::Error(format!("Failed to get 'values': {}", e)))?
.ok_or_else(|| RustBraheError::Error("Missing 'values' key".to_string()))?
.extract()
.map_err(|e| RustBraheError::Error(format!("Failed to extract 'values': {}", e)))?;
return Ok(PropertyValue::TimeSeries { times, values });
}
let json_module = value.py().import("json").map_err(|e| {
RustBraheError::Error(format!("Failed to import json module: {}", e))
})?;
let dumps = json_module.getattr("dumps").map_err(|e| {
RustBraheError::Error(format!("Failed to get json.dumps: {}", e))
})?;
let json_str: String = dumps
.call1((value,))
.and_then(|s| s.extract::<String>())
.map_err(|e| RustBraheError::Error(format!("Failed to serialize to JSON: {}", e)))?;
let json_value: serde_json::Value = serde_json::from_str(&json_str)
.map_err(|e| RustBraheError::ParseError(format!("Invalid JSON: {}", e)))?;
return Ok(PropertyValue::Json(json_value));
}
Python::attach(|py| {
let json_module = py
.import("json")
.map_err(|e| RustBraheError::Error(format!("Failed to import json: {}", e)))?;
let dumps = json_module
.getattr("dumps")
.map_err(|e| RustBraheError::Error(format!("Failed to get json.dumps: {}", e)))?;
let json_str: String = dumps
.call1((value,))
.and_then(|s| s.extract::<String>())
.map_err(|e| RustBraheError::Error(format!("Failed to serialize value: {}", e)))?;
let json_value: serde_json::Value = serde_json::from_str(&json_str)
.map_err(|e| RustBraheError::ParseError(format!("Invalid JSON: {}", e)))?;
Ok(PropertyValue::Json(json_value))
})
}
#[pyclass(module = "brahe._brahe", subclass)]
#[pyo3(name = "AccessConstraintComputer")]
pub struct PyAccessConstraintComputer {
}
#[pymethods]
impl PyAccessConstraintComputer {
#[new]
fn new() -> Self {
PyAccessConstraintComputer {}
}
#[allow(unused_variables)]
fn evaluate(
&self,
epoch: &PyEpoch,
satellite_state_ecef: PyReadonlyArray1<f64>,
location_ecef: &Bound<'_, PyAny>,
) -> PyResult<bool> {
Err(exceptions::PyNotImplementedError::new_err(
"Subclasses must implement evaluate() method",
))
}
fn name(&self) -> PyResult<String> {
Err(exceptions::PyNotImplementedError::new_err(
"Subclasses must implement name() method",
))
}
}
/// Adapter that lets a Python object act as a Rust
/// [`AccessConstraintComputer`].
#[allow(dead_code)]
pub(crate) struct RustAccessConstraintComputerWrapper {
    /// The Python object that provides `evaluate`.
    py_computer: Py<PyAny>,
}

#[allow(dead_code)]
impl RustAccessConstraintComputerWrapper {
    /// Wrap the given Python object.
    pub fn new(py_computer: Py<PyAny>) -> Self {
        Self { py_computer }
    }
}

impl AccessConstraintComputer for RustAccessConstraintComputerWrapper {
    /// Call the Python `evaluate(epoch, sat_state, location)` method.
    ///
    /// The state and location are passed as numpy arrays and the epoch as a
    /// new `PyEpoch`. Any failure (creating the epoch object, the Python call
    /// raising, or a result that is not a bool) prints a warning to stderr
    /// and yields `false`.
    fn evaluate(
        &self,
        epoch: &Epoch,
        sat_state_ecef: &nalgebra::Vector6<f64>,
        location_ecef: &nalgebra::Vector3<f64>,
    ) -> bool {
        Python::attach(|py| {
            let sat_state_array = sat_state_ecef.as_slice().to_pyarray(py).to_owned();
            let loc_array = location_ecef.as_slice().to_pyarray(py).to_owned();

            let py_epoch = match Py::new(py, PyEpoch { obj: *epoch }) {
                Ok(wrapped) => wrapped,
                Err(err) => {
                    eprintln!("Warning: Failed to create PyEpoch: {}", err);
                    return false;
                }
            };

            let outcome = self
                .py_computer
                .bind(py)
                .call_method1("evaluate", (py_epoch, sat_state_array, loc_array));
            let returned = match outcome {
                Ok(returned) => returned,
                Err(err) => {
                    eprintln!("Warning: Python evaluate() method failed: {}", err);
                    return false;
                }
            };

            match returned.extract::<bool>() {
                Ok(satisfied) => satisfied,
                Err(err) => {
                    eprintln!("Warning: evaluate() must return bool: {}", err);
                    false
                }
            }
        })
    }

    /// Fixed name for every Python-backed constraint; the Python object's own
    /// `name()` method is not consulted here.
    fn name(&self) -> &str {
        "PythonConstraintComputer"
    }
}
/// Python wrapper around `SubdivisionConfig`.
///
/// A configuration is either an equal-count subdivision or a fixed-duration
/// subdivision; the getters return `None` for fields that do not belong to
/// the active variant.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "SubdivisionConfig")]
#[derive(Clone)]
pub struct PySubdivisionConfig {
    /// The wrapped Rust configuration.
    pub(crate) config: SubdivisionConfig,
}

#[pymethods]
impl PySubdivisionConfig {
    /// Build a configuration from either `count` or `duration`.
    ///
    /// Args:
    ///     count: Selects the equal-count variant. `offset`, `gap` and
    ///         `truncate_partial` are not used in this case.
    ///     duration: Selects the fixed-duration variant, together with
    ///         `offset`, `gap` and `truncate_partial`.
    ///     offset: Forwarded to `SubdivisionConfig::fixed_duration`.
    ///     gap: Forwarded to `SubdivisionConfig::fixed_duration`.
    ///     truncate_partial: Forwarded to `SubdivisionConfig::fixed_duration`.
    ///
    /// Raises:
    ///     ValueError: If both or neither of `count` and `duration` are given.
    ///     Any error produced by the underlying `SubdivisionConfig` constructor.
    #[new]
    #[pyo3(signature = (count=None, duration=None, offset=0.0, gap=0.0, truncate_partial=false))]
    fn new(
        count: Option<usize>,
        duration: Option<f64>,
        offset: f64,
        gap: f64,
        truncate_partial: bool,
    ) -> PyResult<Self> {
        let config = match (count, duration) {
            (Some(_), Some(_)) => {
                return Err(exceptions::PyValueError::new_err(
                    "Specify either count or duration, not both",
                ));
            }
            (None, None) => {
                return Err(exceptions::PyValueError::new_err(
                    "Must specify either count or duration",
                ));
            }
            (Some(n), None) => SubdivisionConfig::equal_count(n)?,
            (None, Some(d)) => SubdivisionConfig::fixed_duration(d, offset, gap, truncate_partial)?,
        };
        Ok(Self { config })
    }

    /// Build the equal-count variant via `SubdivisionConfig::equal_count`.
    #[staticmethod]
    #[pyo3(signature = (count,))]
    fn equal_count(count: usize) -> PyResult<Self> {
        let config = SubdivisionConfig::equal_count(count)?;
        Ok(Self { config })
    }

    /// Build the fixed-duration variant via `SubdivisionConfig::fixed_duration`.
    #[staticmethod]
    #[pyo3(signature = (duration, offset=0.0, gap=0.0, truncate_partial=false))]
    fn fixed_duration(
        duration: f64,
        offset: f64,
        gap: f64,
        truncate_partial: bool,
    ) -> PyResult<Self> {
        let config = SubdivisionConfig::fixed_duration(duration, offset, gap, truncate_partial)?;
        Ok(Self { config })
    }

    /// Subdivision count, or `None` for a fixed-duration configuration.
    #[getter]
    fn count(&self) -> Option<usize> {
        if let SubdivisionConfig::EqualCount { count } = self.config {
            Some(count)
        } else {
            None
        }
    }

    /// Subdivision duration, or `None` for an equal-count configuration.
    #[getter]
    fn duration(&self) -> Option<f64> {
        if let SubdivisionConfig::FixedDuration { duration, .. } = self.config {
            Some(duration)
        } else {
            None
        }
    }

    /// Offset of a fixed-duration configuration, otherwise `None`.
    #[getter]
    fn offset(&self) -> Option<f64> {
        if let SubdivisionConfig::FixedDuration { offset, .. } = self.config {
            Some(offset)
        } else {
            None
        }
    }

    /// Gap of a fixed-duration configuration, otherwise `None`.
    #[getter]
    fn gap(&self) -> Option<f64> {
        if let SubdivisionConfig::FixedDuration { gap, .. } = self.config {
            Some(gap)
        } else {
            None
        }
    }

    /// `truncate_partial` flag of a fixed-duration configuration, otherwise `None`.
    #[getter]
    fn truncate_partial(&self) -> Option<bool> {
        if let SubdivisionConfig::FixedDuration {
            truncate_partial, ..
        } = self.config
        {
            Some(truncate_partial)
        } else {
            None
        }
    }

    /// Render the configuration as the static-method call that would rebuild it.
    fn __repr__(&self) -> String {
        match self.config {
            SubdivisionConfig::FixedDuration {
                duration,
                offset,
                gap,
                truncate_partial,
            } => {
                // Spell the flag the way Python prints booleans.
                let flag = if truncate_partial { "True" } else { "False" };
                format!(
                    "SubdivisionConfig.fixed_duration(duration={}, offset={}, gap={}, truncate_partial={})",
                    duration, offset, gap, flag
                )
            }
            SubdivisionConfig::EqualCount { count } => {
                format!("SubdivisionConfig.equal_count({})", count)
            }
        }
    }
}
/// Python wrapper around `AccessSearchConfig`.
///
/// Every field of the wrapped configuration is exposed as a read/write
/// property. Values are stored as given; no validation is performed here.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "AccessSearchConfig")]
#[derive(Clone)]
pub struct PyAccessSearchConfig {
    /// The wrapped Rust configuration.
    pub(crate) config: AccessSearchConfig,
}

#[pymethods]
impl PyAccessSearchConfig {
    /// Create a search configuration.
    ///
    /// Args:
    ///     initial_time_step: Stored as `initial_time_step` (units not visible
    ///         here; presumably seconds, confirm against `AccessSearchConfig`).
    ///     adaptive_step: Stored as `adaptive_step`.
    ///     adaptive_fraction: Stored as `adaptive_fraction`.
    ///     parallel: Stored as `parallel`.
    ///     num_threads: Stored as `num_threads`; `None` leaves it unset.
    ///     time_tolerance: Stored as `time_tolerance`.
    ///     subdivisions: Optional `SubdivisionConfig`; its inner configuration
    ///         is stored.
    #[new]
    #[pyo3(signature = (initial_time_step=60.0, adaptive_step=false, adaptive_fraction=0.75, parallel=true, num_threads=None, time_tolerance=0.001, subdivisions=None))]
    fn new(
        initial_time_step: f64,
        adaptive_step: bool,
        adaptive_fraction: f64,
        parallel: bool,
        num_threads: Option<usize>,
        time_tolerance: f64,
        subdivisions: Option<PySubdivisionConfig>,
    ) -> Self {
        Self {
            config: AccessSearchConfig {
                initial_time_step,
                adaptive_step,
                adaptive_fraction,
                parallel,
                num_threads,
                time_tolerance,
                subdivisions: subdivisions.map(|s| s.config),
            },
        }
    }

    /// Current `initial_time_step` value.
    #[getter]
    fn initial_time_step(&self) -> f64 {
        self.config.initial_time_step
    }

    /// Replace `initial_time_step`.
    #[setter]
    fn set_initial_time_step(&mut self, value: f64) {
        self.config.initial_time_step = value;
    }

    /// Current `adaptive_step` flag.
    #[getter]
    fn adaptive_step(&self) -> bool {
        self.config.adaptive_step
    }

    /// Replace the `adaptive_step` flag.
    #[setter]
    fn set_adaptive_step(&mut self, value: bool) {
        self.config.adaptive_step = value;
    }

    /// Current `adaptive_fraction` value.
    #[getter]
    fn adaptive_fraction(&self) -> f64 {
        self.config.adaptive_fraction
    }

    /// Replace `adaptive_fraction`.
    #[setter]
    fn set_adaptive_fraction(&mut self, value: f64) {
        self.config.adaptive_fraction = value;
    }

    /// Current `parallel` flag.
    #[getter]
    fn parallel(&self) -> bool {
        self.config.parallel
    }

    /// Replace the `parallel` flag.
    #[setter]
    fn set_parallel(&mut self, value: bool) {
        self.config.parallel = value;
    }

    /// Current `num_threads` value, or `None` when unset.
    #[getter]
    fn num_threads(&self) -> Option<usize> {
        self.config.num_threads
    }

    /// Replace `num_threads`; passing `None` clears it.
    #[setter]
    fn set_num_threads(&mut self, value: Option<usize>) {
        self.config.num_threads = value;
    }

    /// Current `time_tolerance` value.
    #[getter]
    fn time_tolerance(&self) -> f64 {
        self.config.time_tolerance
    }

    /// Replace `time_tolerance`.
    #[setter]
    fn set_time_tolerance(&mut self, value: f64) {
        self.config.time_tolerance = value;
    }

    /// Copy of the subdivision configuration, or `None` when unset.
    #[getter]
    fn subdivisions(&self) -> Option<PySubdivisionConfig> {
        self.config
            .subdivisions
            .map(|config| PySubdivisionConfig { config })
    }

    /// Replace the subdivision configuration; passing `None` clears it.
    #[setter]
    fn set_subdivisions(&mut self, value: Option<PySubdivisionConfig>) {
        self.config.subdivisions = value.map(|s| s.config);
    }

    /// Render the configuration using Python literals.
    ///
    /// Booleans are printed as `True`/`False` and an unset `num_threads` as
    /// `None`, so the output matches `SubdivisionConfig.__repr__` and does not
    /// leak Rust's `true`/`Some(4)` formatting into Python.
    fn __repr__(&self) -> String {
        let py_bool = |flag: bool| if flag { "True" } else { "False" };
        let threads_repr = self
            .config
            .num_threads
            .map_or_else(|| "None".to_string(), |n| n.to_string());
        let sub_repr = self
            .config
            .subdivisions
            .map_or_else(
                || "None".to_string(),
                |config| PySubdivisionConfig { config }.__repr__(),
            );
        format!(
            "AccessSearchConfig(initial_time_step={}, adaptive_step={}, adaptive_fraction={}, parallel={}, num_threads={}, time_tolerance={}, subdivisions={})",
            self.config.initial_time_step,
            py_bool(self.config.adaptive_step),
            self.config.adaptive_fraction,
            py_bool(self.config.parallel),
            threads_repr,
            self.config.time_tolerance,
            sub_repr
        )
    }
}
/// Convert the optional list of Python property computers into holders.
///
/// Instances of the built-in Doppler, range and range-rate computers are
/// unwrapped: their inner Rust computer is cloned into a `RustNative` holder.
/// Any other object is kept as a Python object inside a `PythonWrapper`
/// holder. `None` yields an empty vector.
fn process_property_computers(
    property_computers: Option<Vec<Py<PyAny>>>,
) -> Vec<PropertyComputerHolder> {
    let Some(computers) = property_computers else {
        return Vec::new();
    };

    let mut holders = Vec::with_capacity(computers.len());
    for py_computer in computers {
        let holder = Python::attach(|py| {
            let bound = py_computer.bind(py);

            if let Ok(doppler) = bound.cast::<PyDopplerComputer>() {
                return PropertyComputerHolder::RustNative(Box::new(
                    doppler.borrow().computer.clone(),
                ));
            }
            if let Ok(range) = bound.cast::<PyRangeComputer>() {
                return PropertyComputerHolder::RustNative(Box::new(
                    range.borrow().computer.clone(),
                ));
            }
            if let Ok(range_rate) = bound.cast::<PyRangeRateComputer>() {
                return PropertyComputerHolder::RustNative(Box::new(
                    range_rate.borrow().computer.clone(),
                ));
            }

            // Not one of the known native computers: keep it as a Python object.
            PropertyComputerHolder::PythonWrapper(RustAccessPropertyComputerWrapper::new(
                py_computer,
            ))
        });
        holders.push(holder);
    }
    holders
}
/// Compute access windows between locations and propagators/trajectories.
///
/// Args:
///     locations: A `PointLocation`, a `PolygonLocation`, or a list containing
///         only one of these two types.
///     propagators: An `SGPPropagator`, `KeplerianPropagator`,
///         `NumericalOrbitPropagator` or `OrbitTrajectory`, or a list
///         containing only one of these types.
///     search_start: Start epoch of the search.
///     search_end: End epoch of the search.
///     constraint: One of the built-in constraint types or a composite
///         (`ConstraintAll`, `ConstraintAny`, `ConstraintNot`).
///     property_computers: Optional list of property computers; see
///         `process_property_computers` for how they are converted.
///     config: Optional `AccessSearchConfig`; the default configuration is
///         used when omitted.
///
/// Returns:
///     The access windows found by `location_accesses`, each wrapped in a
///     `PyAccessWindow`.
///
/// Raises:
///     TypeError: If `constraint`, `locations` or `propagators` has an
///         unsupported type, or if a list mixes different location or
///         propagator types.
///     Any error returned by the underlying `location_accesses` call.
#[pyfunction(name = "location_accesses")]
#[pyo3(signature = (locations, propagators, search_start, search_end, constraint, property_computers=None, config=None))]
#[allow(clippy::too_many_arguments)]
fn py_location_accesses(
    py: Python,
    locations: &Bound<'_, PyAny>,
    propagators: &Bound<'_, PyAny>,
    search_start: &PyEpoch,
    search_end: &PyEpoch,
    constraint: &Bound<'_, PyAny>,
    property_computers: Option<Vec<Py<PyAny>>>,
    config: Option<&PyAccessSearchConfig>,
) -> PyResult<Vec<PyAccessWindow>> {
    let search_config = config.map(|c| c.config).unwrap_or_default();

    // Resolve the Python constraint object to the Rust trait object it wraps.
    let constraint_trait: &dyn AccessConstraint = if let Ok(c) = constraint.cast::<PyElevationConstraint>() {
        &c.borrow().constraint
    } else if let Ok(c) = constraint.cast::<PyOffNadirConstraint>() {
        &c.borrow().constraint
    } else if let Ok(c) = constraint.cast::<PyLocalTimeConstraint>() {
        &c.borrow().constraint
    } else if let Ok(c) = constraint.cast::<PyLookDirectionConstraint>() {
        &c.borrow().constraint
    } else if let Ok(c) = constraint.cast::<PyAscDscConstraint>() {
        &c.borrow().constraint
    } else if let Ok(c) = constraint.cast::<PyElevationMaskConstraint>() {
        &c.borrow().constraint
    } else if let Ok(c) = constraint.cast::<PyConstraintAll>() {
        &c.borrow().composite
    } else if let Ok(c) = constraint.cast::<PyConstraintAny>() {
        &c.borrow().composite
    } else if let Ok(c) = constraint.cast::<PyConstraintNot>() {
        &c.borrow().composite
    } else {
        return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
            "constraint must be an AccessConstraint type"
        ));
    };

    let loc_is_list = locations.is_instance_of::<PyList>();
    let prop_is_list = propagators.is_instance_of::<PyList>();

    // Locations are collected into a single homogeneous vector.
    enum LocationVec {
        Point(Vec<PointLocation>),
        Polygon(Vec<PolygonLocation>),
    }

    let locations_vec = if loc_is_list {
        let list = locations.cast::<PyList>()?;
        let mut point_locs = Vec::new();
        let mut polygon_locs = Vec::new();
        for item in list.iter() {
            if let Ok(loc) = item.cast::<PyPointLocation>() {
                point_locs.push(loc.borrow().location.clone());
            } else if let Ok(loc) = item.cast::<PyPolygonLocation>() {
                polygon_locs.push(loc.borrow().location.clone());
            } else {
                return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
                    "locations list must contain only PointLocation or PolygonLocation objects"
                ));
            }
        }
        // Only one of the two vectors can be passed on, so a mixed list would
        // silently lose every point location. Reject it instead.
        if !point_locs.is_empty() && !polygon_locs.is_empty() {
            return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
                "locations list cannot mix PointLocation and PolygonLocation objects"
            ));
        }
        if polygon_locs.is_empty() {
            LocationVec::Point(point_locs)
        } else {
            LocationVec::Polygon(polygon_locs)
        }
    } else if let Ok(loc) = locations.cast::<PyPointLocation>() {
        LocationVec::Point(vec![loc.borrow().location.clone()])
    } else if let Ok(loc) = locations.cast::<PyPolygonLocation>() {
        LocationVec::Polygon(vec![loc.borrow().location.clone()])
    } else {
        return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
            "locations must be PointLocation, PolygonLocation, or a list of these types"
        ));
    };

    // For lists, the propagator kind is decided by the first element; the
    // branch that handles that kind then validates every element.
    let is_numerical = if prop_is_list {
        let list = propagators.cast::<PyList>()?;
        !list.is_empty() && list.get_item(0)?.is_instance_of::<PyNumericalOrbitPropagator>()
    } else {
        propagators.is_instance_of::<PyNumericalOrbitPropagator>()
    };
    let is_trajectory = if prop_is_list {
        let list = propagators.cast::<PyList>()?;
        !list.is_empty() && list.get_item(0)?.is_instance_of::<PyOrbitalTrajectory>()
    } else {
        propagators.is_instance_of::<PyOrbitalTrajectory>()
    };

    if is_trajectory {
        // Trajectories are borrowed rather than cloned; the guards must stay
        // alive for as long as `traj_refs` is in use.
        let borrow_guards: Vec<pyo3::PyRef<'_, PyOrbitalTrajectory>> = if prop_is_list {
            let list = propagators.cast::<PyList>()?;
            let mut guards = Vec::new();
            for item in list.iter() {
                if let Ok(traj) = item.cast::<PyOrbitalTrajectory>() {
                    guards.push(traj.borrow());
                } else {
                    return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
                        "propagators list must contain only OrbitTrajectory objects (cannot mix types)"
                    ));
                }
            }
            guards
        } else {
            vec![propagators.cast::<PyOrbitalTrajectory>()?.borrow()]
        };
        let traj_refs: Vec<&trajectories::DOrbitTrajectory> =
            borrow_guards.iter().map(|g| &g.trajectory).collect();
        let rust_property_computers = process_property_computers(property_computers);
        let property_computer_refs: Vec<&dyn AccessPropertyComputer> = rust_property_computers
            .iter()
            .map(|c| c as &dyn AccessPropertyComputer)
            .collect();
        let property_computers_option = if property_computer_refs.is_empty() {
            None
        } else {
            Some(property_computer_refs.as_slice())
        };
        // NOTE(review): unlike the SGP/Keplerian path below, this call is not
        // wrapped in `py.detach`; presumably because the borrow guards cannot
        // be used while detached - confirm.
        let windows = match &locations_vec {
            LocationVec::Point(locs) => location_accesses(
                locs,
                traj_refs.as_slice(),
                search_start.obj,
                search_end.obj,
                constraint_trait,
                property_computers_option,
                Some(&search_config),
            ),
            LocationVec::Polygon(locs) => location_accesses(
                locs,
                traj_refs.as_slice(),
                search_start.obj,
                search_end.obj,
                constraint_trait,
                property_computers_option,
                Some(&search_config),
            ),
        }?;
        return Ok(windows.into_iter().map(|w| PyAccessWindow { window: w }).collect());
    }

    if is_numerical {
        // Numerical propagators are borrowed rather than cloned, as above.
        let borrow_guards: Vec<pyo3::PyRef<'_, PyNumericalOrbitPropagator>> = if prop_is_list {
            let list = propagators.cast::<PyList>()?;
            let mut guards = Vec::new();
            for item in list.iter() {
                if let Ok(prop) = item.cast::<PyNumericalOrbitPropagator>() {
                    guards.push(prop.borrow());
                } else {
                    return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
                        "propagators list must contain only NumericalOrbitPropagator objects (cannot mix propagator types)"
                    ));
                }
            }
            guards
        } else {
            vec![propagators.cast::<PyNumericalOrbitPropagator>()?.borrow()]
        };
        let prop_refs: Vec<&propagators::DNumericalOrbitPropagator> =
            borrow_guards.iter().map(|g| &g.propagator).collect();
        let rust_property_computers = process_property_computers(property_computers);
        let property_computer_refs: Vec<&dyn AccessPropertyComputer> = rust_property_computers
            .iter()
            .map(|c| c as &dyn AccessPropertyComputer)
            .collect();
        let property_computers_option = if property_computer_refs.is_empty() {
            None
        } else {
            Some(property_computer_refs.as_slice())
        };
        let windows = match &locations_vec {
            LocationVec::Point(locs) => location_accesses(
                locs,
                prop_refs.as_slice(),
                search_start.obj,
                search_end.obj,
                constraint_trait,
                property_computers_option,
                Some(&search_config),
            ),
            LocationVec::Polygon(locs) => location_accesses(
                locs,
                prop_refs.as_slice(),
                search_start.obj,
                search_end.obj,
                constraint_trait,
                property_computers_option,
                Some(&search_config),
            ),
        }?;
        return Ok(windows.into_iter().map(|w| PyAccessWindow { window: w }).collect());
    }

    // Remaining case: SGP or Keplerian propagators, which are cloned out of
    // their Python wrappers into a single homogeneous vector.
    enum PropagatorVec {
        Sgp(Vec<crate::propagators::sgp_propagator::SGPPropagator>),
        Keplerian(Vec<crate::propagators::keplerian_propagator::KeplerianPropagator>),
    }

    let propagators_vec = if prop_is_list {
        let list = propagators.cast::<PyList>()?;
        let mut sgp_props = Vec::new();
        let mut kep_props = Vec::new();
        for item in list.iter() {
            if let Ok(prop) = item.cast::<PySGPPropagator>() {
                sgp_props.push(prop.borrow().propagator.clone());
            } else if let Ok(prop) = item.cast::<PyKeplerianPropagator>() {
                kep_props.push(prop.borrow().propagator.clone());
            } else {
                return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
                    "propagators list must contain only SGPPropagator or KeplerianPropagator objects"
                ));
            }
        }
        // Only one of the two vectors can be passed on, so a mixed list would
        // silently lose every SGP propagator. Reject it instead.
        if !sgp_props.is_empty() && !kep_props.is_empty() {
            return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
                "propagators list cannot mix SGPPropagator and KeplerianPropagator objects"
            ));
        }
        if kep_props.is_empty() {
            PropagatorVec::Sgp(sgp_props)
        } else {
            PropagatorVec::Keplerian(kep_props)
        }
    } else if let Ok(prop) = propagators.cast::<PySGPPropagator>() {
        PropagatorVec::Sgp(vec![prop.borrow().propagator.clone()])
    } else if let Ok(prop) = propagators.cast::<PyKeplerianPropagator>() {
        PropagatorVec::Keplerian(vec![prop.borrow().propagator.clone()])
    } else {
        return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
            "propagators must be SGPPropagator, KeplerianPropagator, NumericalOrbitPropagator, OrbitTrajectory, or a list of these types"
        ));
    };

    let rust_property_computers = process_property_computers(property_computers);
    let property_computer_refs: Vec<&dyn AccessPropertyComputer> = rust_property_computers
        .iter()
        .map(|c| c as &dyn AccessPropertyComputer)
        .collect();
    let property_computers_option = if property_computer_refs.is_empty() {
        None
    } else {
        Some(property_computer_refs.as_slice())
    };

    // The search runs inside `py.detach`, i.e. detached from the interpreter.
    let windows = py.detach(|| match (&locations_vec, &propagators_vec) {
        (LocationVec::Point(locs), PropagatorVec::Sgp(props)) => {
            location_accesses(
                locs,
                props,
                search_start.obj,
                search_end.obj,
                constraint_trait,
                property_computers_option,
                Some(&search_config),
            )
        }
        (LocationVec::Point(locs), PropagatorVec::Keplerian(props)) => {
            location_accesses(
                locs,
                props,
                search_start.obj,
                search_end.obj,
                constraint_trait,
                property_computers_option,
                Some(&search_config),
            )
        }
        (LocationVec::Polygon(locs), PropagatorVec::Sgp(props)) => {
            location_accesses(
                locs,
                props,
                search_start.obj,
                search_end.obj,
                constraint_trait,
                property_computers_option,
                Some(&search_config),
            )
        }
        (LocationVec::Polygon(locs), PropagatorVec::Keplerian(props)) => {
            location_accesses(
                locs,
                props,
                search_start.obj,
                search_end.obj,
                constraint_trait,
                property_computers_option,
                Some(&search_config),
            )
        }
    })?;
    Ok(windows.into_iter().map(|w| PyAccessWindow { window: w }).collect())
}
/// Python wrapper around `OrbitGeometryTessellatorConfig`.
///
/// All fields are exposed as read-only properties; no setters are defined.
#[pyclass(module = "brahe._brahe", from_py_object)]
#[pyo3(name = "OrbitGeometryTessellatorConfig")]
#[derive(Clone)]
pub struct PyOrbitGeometryTessellatorConfig {
    /// The wrapped Rust configuration.
    pub(crate) config: access::tessellation::OrbitGeometryTessellatorConfig,
}

#[pymethods]
impl PyOrbitGeometryTessellatorConfig {
    /// Create a tessellator configuration.
    ///
    /// Every argument is stored unchanged in the field of the same name.
    /// When `asc_dsc` is omitted, `AscDsc::Either` is stored.
    /// Units of the numeric fields are not visible here (presumably metres;
    /// confirm against `OrbitGeometryTessellatorConfig`).
    #[new]
    #[pyo3(signature = (
        image_width=5000.0,
        image_length=5000.0,
        crosstrack_overlap=200.0,
        alongtrack_overlap=200.0,
        asc_dsc=None,
        min_image_length=5000.0,
        max_image_length=5000.0,
    ))]
    fn new(
        image_width: f64,
        image_length: f64,
        crosstrack_overlap: f64,
        alongtrack_overlap: f64,
        asc_dsc: Option<PyAscDsc>,
        min_image_length: f64,
        max_image_length: f64,
    ) -> Self {
        let direction = match asc_dsc {
            Some(wrapper) => wrapper.value,
            None => AscDsc::Either,
        };
        let config = access::tessellation::OrbitGeometryTessellatorConfig {
            image_width,
            image_length,
            crosstrack_overlap,
            alongtrack_overlap,
            asc_dsc: direction,
            min_image_length,
            max_image_length,
        };
        Self { config }
    }

    /// Stored `image_width` value.
    #[getter]
    fn image_width(&self) -> f64 {
        self.config.image_width
    }

    /// Stored `image_length` value.
    #[getter]
    fn image_length(&self) -> f64 {
        self.config.image_length
    }

    /// Stored `crosstrack_overlap` value.
    #[getter]
    fn crosstrack_overlap(&self) -> f64 {
        self.config.crosstrack_overlap
    }

    /// Stored `alongtrack_overlap` value.
    #[getter]
    fn alongtrack_overlap(&self) -> f64 {
        self.config.alongtrack_overlap
    }

    /// Stored ascending/descending selection, wrapped for Python.
    #[getter]
    fn asc_dsc(&self) -> PyAscDsc {
        let value = self.config.asc_dsc;
        PyAscDsc { value }
    }

    /// Stored `min_image_length` value.
    #[getter]
    fn min_image_length(&self) -> f64 {
        self.config.min_image_length
    }

    /// Stored `max_image_length` value.
    #[getter]
    fn max_image_length(&self) -> f64 {
        self.config.max_image_length
    }

    /// Short summary showing only the image size and the `asc_dsc` selection.
    fn __repr__(&self) -> String {
        let cfg = &self.config;
        format!(
            "OrbitGeometryTessellatorConfig(image_width={}, image_length={}, asc_dsc={:?})",
            cfg.image_width, cfg.image_length, cfg.asc_dsc,
        )
    }
}
/// Python wrapper around `OrbitGeometryTessellator`.
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "OrbitGeometryTessellator")]
pub struct PyOrbitGeometryTessellator {
    /// The wrapped Rust tessellator.
    tessellator: access::tessellation::OrbitGeometryTessellator,
}

#[pymethods]
impl PyOrbitGeometryTessellator {
    /// Create a tessellator from an SGP propagator.
    ///
    /// Args:
    ///     propagator: SGP propagator; it is cloned, so the Python object is
    ///         not modified by the tessellator.
    ///     epoch: Epoch handed to the tessellator.
    ///     config: Optional configuration; the default configuration is used
    ///         when omitted.
    ///     spacecraft_id: Optional identifier passed through to the tessellator.
    #[new]
    #[pyo3(signature = (propagator, epoch, config=None, spacecraft_id=None))]
    fn new(
        propagator: &PySGPPropagator,
        epoch: &PyEpoch,
        config: Option<&PyOrbitGeometryTessellatorConfig>,
        spacecraft_id: Option<String>,
    ) -> PyResult<Self> {
        let rust_config = match config {
            Some(wrapper) => wrapper.config.clone(),
            None => Default::default(),
        };
        let boxed_propagator = Box::new(propagator.propagator.clone());
        let tessellator = access::tessellation::OrbitGeometryTessellator::new(
            boxed_propagator,
            epoch.obj,
            rust_config,
            spacecraft_id,
        );
        Ok(Self { tessellator })
    }

    /// Tessellate a point location into polygon tiles.
    ///
    /// Raises:
    ///     BraheError: If the underlying tessellation fails.
    fn tessellate_point(&self, point: &PyPointLocation) -> PyResult<Vec<PyPolygonLocation>> {
        let tiles = self
            .tessellator
            .tessellate(&point.location)
            .map_err(|e| BraheError::new_err(e.to_string()))?;
        Ok(tiles
            .into_iter()
            .map(|location| PyPolygonLocation { location })
            .collect())
    }

    /// Tessellate a polygon location into polygon tiles.
    ///
    /// Raises:
    ///     BraheError: If the underlying tessellation fails.
    fn tessellate_polygon(&self, polygon: &PyPolygonLocation) -> PyResult<Vec<PyPolygonLocation>> {
        let tiles = self
            .tessellator
            .tessellate(&polygon.location)
            .map_err(|e| BraheError::new_err(e.to_string()))?;
        Ok(tiles
            .into_iter()
            .map(|location| PyPolygonLocation { location })
            .collect())
    }

    /// Tessellate either a `PointLocation` or a `PolygonLocation`.
    ///
    /// Raises:
    ///     TypeError: If `location` is neither of the two supported types.
    ///     BraheError: If the underlying tessellation fails.
    fn tessellate(&self, location: &Bound<'_, PyAny>) -> PyResult<Vec<PyPolygonLocation>> {
        // Dispatch on the concrete type and reuse the typed entry points.
        if let Ok(point_ref) = location.cast::<PyPointLocation>() {
            return self.tessellate_point(&point_ref.borrow());
        }
        if let Ok(polygon_ref) = location.cast::<PyPolygonLocation>() {
            return self.tessellate_polygon(&polygon_ref.borrow());
        }
        Err(pyo3::exceptions::PyTypeError::new_err(
            "Expected PointLocation or PolygonLocation",
        ))
    }

    /// Copy of the tessellator's configuration.
    #[getter]
    fn config(&self) -> PyOrbitGeometryTessellatorConfig {
        let config = self.tessellator.config().clone();
        PyOrbitGeometryTessellatorConfig { config }
    }

    /// Name reported by the tessellator's `Tessellator` implementation.
    fn name(&self) -> String {
        let label = access::tessellation::Tessellator::name(&self.tessellator);
        label.to_string()
    }

    /// Fixed representation; no tessellator state is included.
    fn __repr__(&self) -> String {
        String::from("OrbitGeometryTessellator()")
    }
}
/// Merge tiles with `access::tessellation::tile_merge_orbit_geometry`.
///
/// Args:
///     tiles: List whose items must all be `PolygonLocation` objects.
///     at_overlap: Forwarded as the `at_overlap` argument.
///     ct_overlap: Forwarded as the `ct_overlap` argument.
///     mergable_range_deg: Forwarded as the `mergable_range_deg` argument.
///
/// Returns:
///     The merged tiles, each wrapped as a `PolygonLocation`.
///
/// Raises:
///     TypeError: If any item of `tiles` is not a `PolygonLocation`.
#[pyfunction]
#[pyo3(name = "tile_merge_orbit_geometry")]
#[pyo3(signature = (tiles, at_overlap=200.0, ct_overlap=200.0, mergable_range_deg=2.0))]
pub fn py_tile_merge_orbit_geometry(
    tiles: &Bound<'_, PyList>,
    at_overlap: f64,
    ct_overlap: f64,
    mergable_range_deg: f64,
) -> PyResult<Vec<PyPolygonLocation>> {
    // Clone every polygon out of its Python wrapper, stopping at the first
    // item of the wrong type.
    let rust_tiles = tiles
        .iter()
        .map(|item| -> PyResult<_> {
            let polygon = item.cast::<PyPolygonLocation>().map_err(|_| {
                pyo3::exceptions::PyTypeError::new_err(
                    "All items in tiles must be PolygonLocation",
                )
            })?;
            Ok(polygon.borrow().location.clone())
        })
        .collect::<PyResult<Vec<_>>>()?;

    let merged = access::tessellation::tile_merge_orbit_geometry(
        &rust_tiles,
        at_overlap,
        ct_overlap,
        mergable_range_deg,
    );

    let wrapped = merged
        .into_iter()
        .map(|location| PyPolygonLocation { location })
        .collect();
    Ok(wrapped)
}