use std::collections::{HashMap, HashSet};
use crate::analysis::assets::{AssetId, ConstellationId, DynScenario, GroundStation, Spacecraft};
use crate::analysis::imaging::{
AccessError, Aoi, AoiId, LookSide, OpticalAccessAnalysis, OpticalPayload, SarAccessAnalysis,
SarPayload,
};
use crate::analysis::power::{
PowerBudgetAnalysis, PowerBudgetResults, PowerError, SpacecraftFilter,
};
use crate::analysis::sun::AnalyticalSunEphemeris;
use crate::analysis::visibility::{
DynPass, ElevationMask, ElevationMaskError, PairType, Pass, VisibilityAnalysis,
VisibilityError, VisibilityResults,
};
use crate::bodies::DynOrigin;
use crate::bodies::python::PyOrigin;
use crate::comms::python::PyCommunicationSystem;
use crate::ephem::python::PySpk;
use crate::frames::python::PyFrame;
use crate::orbits::ground::Observables;
use crate::orbits::python::{
PyGroundLocation, PyInterval, PyJ2Propagator, PyJ4Propagator, PyNumericalPropagator, PySgp4,
PyTrajectory, PyVallado,
};
use crate::time::deltas::TimeDelta;
use crate::time::python::deltas::PyTimeDelta;
use crate::time::python::time::PyTime;
use crate::time::python::time_series::PyTimeSeries;
use crate::units::python::{PyAngle, PyAngularRate, PyDistance, PyVelocity};
use lox_frames::DynFrame;
use lox_frames::providers::DefaultRotationProvider;
use lox_orbits::orbits::Ensemble;
use lox_orbits::propagators::OrbitSource;
use lox_time::intervals::TimeInterval;
use lox_time::series::TimeSeries;
use lox_time::time_scales::Tai;
use lox_units::{Angle, Distance, Velocity};
use numpy::{PyArray1, PyArrayMethods};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyType;
/// Private newtype over [`VisibilityError`] that can be turned into a [`PyErr`].
struct PyVisibilityError(VisibilityError);

impl From<PyVisibilityError> for PyErr {
    /// Builds a Python `ValueError` whose message is the wrapped error's string form.
    fn from(PyVisibilityError(inner): PyVisibilityError) -> Self {
        let message = inner.to_string();
        PyValueError::new_err(message)
    }
}
/// Public newtype over [`ElevationMaskError`] that can be turned into a [`PyErr`].
pub struct PyElevationMaskError(pub ElevationMaskError);

impl From<PyElevationMaskError> for PyErr {
    /// Builds a Python `ValueError` whose message is the wrapped error's string form.
    fn from(PyElevationMaskError(inner): PyElevationMaskError) -> Self {
        let message = inner.to_string();
        PyValueError::new_err(message)
    }
}
#[pyclass(name = "GroundStation", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Debug)]
pub struct PyGroundStation(pub GroundStation);
#[pymethods]
impl PyGroundStation {
#[new]
#[pyo3(signature = (id, location, mask, body_fixed_frame=None, network_id=None, communication_systems=None))]
fn new(
id: String,
location: PyGroundLocation,
mask: PyElevationMask,
body_fixed_frame: Option<PyFrame>,
network_id: Option<String>,
communication_systems: Option<Vec<PyCommunicationSystem>>,
) -> Self {
let mut gs = GroundStation::new(id, location.0, mask.0);
if let Some(frame) = body_fixed_frame {
gs = gs.with_body_fixed_frame(frame.0);
}
if let Some(nid) = network_id {
gs = gs.with_network_id(nid);
}
if let Some(systems) = communication_systems {
for system in systems {
gs = gs.with_communication_system(system.0);
}
}
PyGroundStation(gs)
}
fn id(&self) -> String {
self.0.id().as_str().to_string()
}
fn location(&self) -> PyGroundLocation {
PyGroundLocation(self.0.location().clone())
}
fn mask(&self) -> PyElevationMask {
PyElevationMask(self.0.mask().clone())
}
fn network_id(&self) -> Option<String> {
self.0.network_id().map(|id| id.as_str().to_string())
}
fn body_fixed_frame(&self) -> PyFrame {
PyFrame(self.0.body_fixed_frame())
}
fn communication_systems(&self) -> Vec<PyCommunicationSystem> {
self.0
.communication_systems()
.iter()
.map(|s| PyCommunicationSystem(s.clone()))
.collect()
}
fn __repr__(&self) -> String {
format!(
"GroundStation(\"{}\", {}, {})",
self.id(),
self.location().__repr__(),
self.mask().__repr__(),
)
}
}
/// Converts a Python object into an [`OrbitSource`].
///
/// Extraction is attempted in a fixed order: SGP4, Vallado, numerical, J2 and
/// J4 propagators, then a trajectory. The first successful extraction wins.
///
/// # Errors
///
/// Returns a Python `ValueError` when none of the extractions succeeds.
fn extract_orbit_source(obj: &Bound<'_, PyAny>) -> PyResult<OrbitSource> {
    let source = if let Ok(sgp4) = obj.extract::<PySgp4>() {
        OrbitSource::Sgp4(sgp4.inner)
    } else if let Ok(vallado) = obj.extract::<PyVallado>() {
        OrbitSource::Vallado(vallado.0)
    } else if let Ok(numerical) = obj.extract::<PyNumericalPropagator>() {
        OrbitSource::Numerical(numerical.0)
    } else if let Ok(j2) = obj.extract::<PyJ2Propagator>() {
        OrbitSource::J2(j2.0)
    } else if let Ok(j4) = obj.extract::<PyJ4Propagator>() {
        OrbitSource::J4(j4.0)
    } else if let Ok(trajectory) = obj.extract::<PyTrajectory>() {
        OrbitSource::Trajectory(trajectory.0)
    } else {
        return Err(PyValueError::new_err(
            "expected a propagator (SGP4, Vallado, Numerical, J2, J4) or Trajectory object",
        ));
    };
    Ok(source)
}
#[pyclass(name = "Spacecraft", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Debug)]
pub struct PySpacecraft(pub Spacecraft);
#[pymethods]
impl PySpacecraft {
#[new]
#[pyo3(signature = (id, orbit, max_slew_rate=None, constellation_id=None, optical_payload=None, sar_payload=None, communication_systems=None))]
#[allow(clippy::too_many_arguments)]
fn new(
id: String,
orbit: &Bound<'_, PyAny>,
max_slew_rate: Option<PyAngularRate>,
constellation_id: Option<String>,
optical_payload: Option<PyOpticalPayload>,
sar_payload: Option<PySarPayload>,
communication_systems: Option<Vec<PyCommunicationSystem>>,
) -> PyResult<Self> {
let orbit_source = extract_orbit_source(orbit)?;
let mut asset = Spacecraft::new(id, orbit_source);
if let Some(rate) = max_slew_rate {
asset = asset.with_max_slew_rate(rate.0);
}
if let Some(cid) = constellation_id {
asset = asset.with_constellation_id(cid);
}
if let Some(payload) = optical_payload {
asset = asset.with_optical_payload(payload.0);
}
if let Some(payload) = sar_payload {
asset = asset.with_sar_payload(payload.0);
}
if let Some(systems) = communication_systems {
for system in systems {
asset = asset.with_communication_system(system.0);
}
}
Ok(PySpacecraft(asset))
}
fn id(&self) -> String {
self.0.id().as_str().to_string()
}
fn constellation_id(&self) -> Option<String> {
self.0.constellation_id().map(|id| id.as_str().to_string())
}
fn max_slew_rate(&self) -> Option<PyAngularRate> {
self.0.max_slew_rate().map(PyAngularRate)
}
fn optical_payload(&self) -> Option<PyOpticalPayload> {
self.0.optical_payload().map(PyOpticalPayload)
}
fn sar_payload(&self) -> Option<PySarPayload> {
self.0.sar_payload().map(PySarPayload)
}
fn communication_systems(&self) -> Vec<PyCommunicationSystem> {
self.0
.communication_systems()
.iter()
.map(|s| PyCommunicationSystem(s.clone()))
.collect()
}
fn __repr__(&self) -> String {
format!("Spacecraft(\"{}\")", self.id())
}
}
/// Python-facing wrapper (`lox_space.Scenario`) around [`DynScenario`].
#[pyclass(name = "Scenario", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Debug)]
pub struct PyScenario(pub DynScenario);

#[pymethods]
impl PyScenario {
    /// Creates a scenario spanning `start` to `end`.
    ///
    /// Both epochs are converted to TAI and the scenario is created with an
    /// Earth origin and the ICRF frame. Spacecraft and ground stations are
    /// added only when the corresponding list is provided.
    #[new]
    #[pyo3(signature = (start, end, spacecraft=None, ground_stations=None))]
    fn new(
        start: PyTime,
        end: PyTime,
        spacecraft: Option<Vec<PySpacecraft>>,
        ground_stations: Option<Vec<PyGroundStation>>,
    ) -> Self {
        let mut scenario = DynScenario::new(
            start.0.to_scale(Tai),
            end.0.to_scale(Tai),
            DynOrigin::Earth,
            DynFrame::Icrf,
        );
        if let Some(wrapped) = spacecraft {
            let unwrapped: Vec<Spacecraft> = wrapped.into_iter().map(|sc| sc.0).collect();
            scenario = scenario.with_spacecraft(&unwrapped);
        }
        if let Some(wrapped) = ground_stations {
            let unwrapped: Vec<GroundStation> = wrapped.into_iter().map(|gs| gs.0).collect();
            scenario = scenario.with_ground_stations(&unwrapped);
        }
        Self(scenario)
    }

    /// Propagates the scenario using [`DefaultRotationProvider`].
    ///
    /// The propagation itself runs inside `py.detach`; a propagation error is
    /// reported as a Python `ValueError`.
    fn propagate(&self, py: Python<'_>) -> PyResult<PyEnsemble> {
        match py.detach(|| self.0.propagate(&DefaultRotationProvider)) {
            Ok(ensemble) => Ok(PyEnsemble(ensemble)),
            Err(err) => Err(PyValueError::new_err(err.to_string())),
        }
    }

    /// Returns the start of the scenario interval.
    fn start(&self) -> PyTime {
        let start = self.0.interval().start();
        PyTime(start.into_dyn())
    }

    /// Returns a copy of this scenario with `constellation` added.
    ///
    /// A failure reported by the underlying scenario becomes a `ValueError`.
    fn with_constellation(
        &self,
        constellation: crate::constellations::python::PyConstellation,
    ) -> PyResult<Self> {
        let copy = self.0.clone();
        match copy.with_constellation(constellation.0) {
            Ok(scenario) => Ok(Self(scenario)),
            Err(err) => Err(PyValueError::new_err(err.to_string())),
        }
    }

    /// Returns the end of the scenario interval.
    fn end(&self) -> PyTime {
        let end = self.0.interval().end();
        PyTime(end.into_dyn())
    }

    /// Formats the scenario with its spacecraft and ground station counts.
    fn __repr__(&self) -> String {
        let spacecraft_count = self.0.spacecraft().len();
        let station_count = self.0.ground_stations().len();
        format!(
            "Scenario({} spacecraft, {} ground stations)",
            spacecraft_count, station_count,
        )
    }
}
/// Python-facing wrapper (`lox_space.Ensemble`) around an [`Ensemble`] keyed by [`AssetId`].
#[pyclass(name = "Ensemble", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Debug)]
pub struct PyEnsemble(pub Ensemble<AssetId, Tai, DynOrigin, DynFrame>);

#[pymethods]
impl PyEnsemble {
    /// Returns a copy of the trajectory stored under `id`, or `None` when absent.
    fn get(&self, id: &str) -> Option<PyTrajectory> {
        let key = AssetId::new(id);
        let trajectory = self.0.get(&key)?;
        Some(PyTrajectory(trajectory.clone().into_dyn()))
    }

    /// Returns the number of trajectories in the ensemble.
    fn __len__(&self) -> usize {
        self.0.len()
    }

    /// Formats the ensemble with its trajectory count.
    fn __repr__(&self) -> String {
        let count = self.0.len();
        format!("Ensemble({} trajectories)", count)
    }
}
/// Python-facing configuration (`lox_space.VisibilityAnalysis`) for a [`VisibilityAnalysis`].
///
/// The object only stores the configuration; the analysis itself is built and
/// executed by `compute`.
#[pyclass(name = "VisibilityAnalysis", module = "lox_space", frozen)]
pub struct PyVisibilityAnalysis {
    scenario: DynScenario,
    /// Pre-computed ensemble; when `None`, `compute` propagates the scenario.
    ensemble: Option<Ensemble<AssetId, Tai, DynOrigin, DynFrame>>,
    occulting_bodies: Vec<DynOrigin>,
    step: TimeDelta,
    min_pass_duration: Option<TimeDelta>,
    inter_satellite: bool,
    /// Python callable invoked as `filter(ground_station, spacecraft)`.
    ground_space_filter: Option<Py<PyAny>>,
    /// Python callable invoked as `filter(spacecraft_1, spacecraft_2)`.
    inter_satellite_filter: Option<Py<PyAny>>,
    min_range: Option<Distance>,
    max_range: Option<Distance>,
}

#[pymethods]
impl PyVisibilityAnalysis {
    /// Stores the analysis configuration.
    ///
    /// `step` defaults to 60 seconds and `occulting_bodies` to an empty list.
    ///
    /// # Errors
    ///
    /// Returns a `ValueError` when one of the filters is provided but is not
    /// callable, and propagates the error raised when an occulting body cannot
    /// be converted into an origin.
    #[new]
    #[pyo3(signature = (scenario, ensemble=None, occulting_bodies=None, step=None, min_pass_duration=None, inter_satellite=false, ground_space_filter=None, inter_satellite_filter=None, min_range=None, max_range=None))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        py: Python<'_>,
        scenario: PyScenario,
        ensemble: Option<PyEnsemble>,
        occulting_bodies: Option<Vec<Bound<'_, PyAny>>>,
        step: Option<PyTimeDelta>,
        min_pass_duration: Option<PyTimeDelta>,
        inter_satellite: bool,
        ground_space_filter: Option<Py<PyAny>>,
        inter_satellite_filter: Option<Py<PyAny>>,
        min_range: Option<PyDistance>,
        max_range: Option<PyDistance>,
    ) -> PyResult<Self> {
        let occulting_bodies: Vec<DynOrigin> = occulting_bodies
            .unwrap_or_default()
            .iter()
            .map(|b| Ok(PyOrigin::try_from(b)?.0))
            .collect::<PyResult<_>>()?;
        // Reject non-callable filters up front so the mistake is reported at
        // construction time rather than in the middle of `compute`.
        if let Some(ref f) = ground_space_filter
            && !f.bind(py).is_callable()
        {
            return Err(PyValueError::new_err(
                "ground_space_filter must be callable",
            ));
        }
        if let Some(ref f) = inter_satellite_filter
            && !f.bind(py).is_callable()
        {
            return Err(PyValueError::new_err(
                "inter_satellite_filter must be callable",
            ));
        }
        Ok(Self {
            scenario: scenario.0,
            ensemble: ensemble.map(|e| e.0),
            occulting_bodies,
            step: step
                .map(|s| s.0)
                .unwrap_or_else(|| TimeDelta::from_seconds_f64(60.0)),
            min_pass_duration: min_pass_duration.map(|d| d.0),
            inter_satellite,
            ground_space_filter,
            inter_satellite_filter,
            min_range: min_range.map(|d| d.0),
            max_range: max_range.map(|d| d.0),
        })
    }

    /// Runs the visibility analysis and returns its results.
    ///
    /// When no ensemble was supplied at construction, the scenario is
    /// propagated first. The Python filter callables are evaluated up front for
    /// every candidate pair, so the analysis itself can run inside `py.detach`
    /// without calling back into Python.
    ///
    /// # Errors
    ///
    /// Returns a `ValueError` when propagation or the analysis fails, and
    /// propagates any exception raised by a filter callable or by converting
    /// its return value to `bool`.
    fn compute(
        &self,
        py: Python<'_>,
        ephemeris: &Bound<'_, PySpk>,
    ) -> PyResult<PyVisibilityResults> {
        let ephemeris = &ephemeris.get().0;
        let step = self.step;
        let scenario = &self.scenario;
        let auto_ensemble;
        let ensemble = match &self.ensemble {
            Some(e) => e,
            None => {
                // Propagation does not touch Python objects, so it runs
                // detached exactly like `Scenario.propagate` does.
                auto_ensemble = py
                    .detach(|| scenario.propagate(&DefaultRotationProvider))
                    .map_err(|e| PyValueError::new_err(e.to_string()))?;
                &auto_ensemble
            }
        };
        let occulting_bodies = self.occulting_bodies.clone();
        let min_pass_duration = self.min_pass_duration;
        let inter_satellite = self.inter_satellite;
        let min_range = self.min_range;
        let max_range = self.max_range;

        // Evaluate the ground-to-space filter once per (station, spacecraft)
        // pair and remember the accepted identifier pairs.
        let gs_accepted: Option<HashSet<(AssetId, AssetId)>> =
            if let Some(ref filter) = self.ground_space_filter {
                let ground_stations = scenario.ground_stations();
                let spacecraft = scenario.spacecraft();
                let mut set = HashSet::new();
                for gs in ground_stations {
                    for sc in spacecraft {
                        let py_gs = PyGroundStation(gs.clone());
                        let py_sc = PySpacecraft(sc.clone());
                        let accept: bool = filter.call1(py, (py_gs, py_sc))?.extract(py)?;
                        if accept {
                            set.insert((gs.id().clone(), sc.id().clone()));
                        }
                    }
                }
                Some(set)
            } else {
                None
            };

        // Evaluate the inter-satellite filter once per unordered pair; the pair
        // is stored in scenario order (lower index first).
        let isl_accepted: Option<HashSet<(AssetId, AssetId)>> =
            if let Some(ref filter) = self.inter_satellite_filter {
                let spacecraft = scenario.spacecraft();
                let n = spacecraft.len();
                let mut set = HashSet::new();
                for i in 0..n {
                    for j in (i + 1)..n {
                        let py_sc1 = PySpacecraft(spacecraft[i].clone());
                        let py_sc2 = PySpacecraft(spacecraft[j].clone());
                        let accept: bool = filter.call1(py, (py_sc1, py_sc2))?.extract(py)?;
                        if accept {
                            set.insert((spacecraft[i].id().clone(), spacecraft[j].id().clone()));
                        }
                    }
                }
                Some(set)
            } else {
                None
            };

        let results = py.detach(|| {
            let mut analysis = VisibilityAnalysis::new(scenario, ensemble, ephemeris)
                .with_occulting_bodies(occulting_bodies)
                .with_step(step);
            if let Some(mpd) = min_pass_duration {
                analysis = analysis.with_min_pass_duration(mpd);
            }
            if let Some(ref accepted) = gs_accepted {
                analysis = analysis.with_ground_space_filter(|gs, sc| {
                    accepted.contains(&(gs.id().clone(), sc.id().clone()))
                });
            }
            // An explicit inter-satellite filter takes the place of the plain
            // `inter_satellite` flag.
            if let Some(ref accepted) = isl_accepted {
                analysis = analysis.with_inter_satellite_filter(|sc1, sc2| {
                    accepted.contains(&(sc1.id().clone(), sc2.id().clone()))
                });
            } else if inter_satellite {
                analysis = analysis.with_inter_satellite();
            }
            if let Some(min_range) = min_range {
                analysis = analysis.with_min_range(min_range);
            }
            if let Some(max_range) = max_range {
                analysis = analysis.with_max_range(max_range);
            }
            analysis.compute()
        });
        Ok(PyVisibilityResults {
            results: results.map_err(PyVisibilityError)?,
            scenario: self.scenario.clone(),
            ensemble: ensemble.clone(),
            step: self.step,
        })
    }

    /// Formats the analysis with its asset counts and any enabled filter flags.
    fn __repr__(&self) -> String {
        let sc_count = self.scenario.spacecraft().len();
        let gs_count = self.scenario.ground_stations().len();
        let mut extras = Vec::new();
        if self.ground_space_filter.is_some() {
            extras.push("ground_space_filter=True".to_string());
        }
        if self.inter_satellite_filter.is_some() {
            extras.push("inter_satellite_filter=True".to_string());
        } else if self.inter_satellite {
            extras.push("inter_satellite=True".to_string());
        }
        if extras.is_empty() {
            format!("VisibilityAnalysis({gs_count} ground assets, {sc_count} space assets)")
        } else {
            format!(
                "VisibilityAnalysis({gs_count} ground assets, {sc_count} space assets, {})",
                extras.join(", "),
            )
        }
    }
}
#[pyclass(name = "VisibilityResults", module = "lox_space", frozen)]
pub struct PyVisibilityResults {
results: VisibilityResults,
scenario: DynScenario,
ensemble: Ensemble<AssetId, Tai, DynOrigin, DynFrame>,
step: TimeDelta,
}
#[pymethods]
impl PyVisibilityResults {
fn intervals(&self, id1: &str, id2: &str) -> Vec<PyInterval> {
let id1 = AssetId::new(id1);
let id2 = AssetId::new(id2);
self.results
.intervals_for(&id1, &id2)
.map(|intervals| {
intervals
.iter()
.map(|i| {
PyInterval(TimeInterval::new(i.start().into_dyn(), i.end().into_dyn()))
})
.collect()
})
.unwrap_or_default()
}
fn all_intervals(&self) -> HashMap<(String, String), Vec<PyInterval>> {
self.results
.all_intervals()
.iter()
.map(|((id1, id2), intervals)| {
(
(id1.as_str().to_string(), id2.as_str().to_string()),
intervals
.iter()
.map(|i| {
PyInterval(TimeInterval::new(i.start().into_dyn(), i.end().into_dyn()))
})
.collect(),
)
})
.collect()
}
fn ground_space_intervals(&self) -> HashMap<(String, String), Vec<PyInterval>> {
self.results
.ground_space_pair_ids()
.into_iter()
.filter_map(|(gs_id, sc_id)| {
let intervals = self.results.intervals_for(gs_id, sc_id)?;
Some((
(gs_id.as_str().to_string(), sc_id.as_str().to_string()),
intervals
.iter()
.map(|i| {
PyInterval(TimeInterval::new(i.start().into_dyn(), i.end().into_dyn()))
})
.collect(),
))
})
.collect()
}
fn inter_satellite_intervals(&self) -> HashMap<(String, String), Vec<PyInterval>> {
self.results
.inter_satellite_pair_ids()
.into_iter()
.filter_map(|(sc1_id, sc2_id)| {
let intervals = self.results.intervals_for(sc1_id, sc2_id)?;
Some((
(sc1_id.as_str().to_string(), sc2_id.as_str().to_string()),
intervals
.iter()
.map(|i| {
PyInterval(TimeInterval::new(i.start().into_dyn(), i.end().into_dyn()))
})
.collect(),
))
})
.collect()
}
fn passes(&self, ground_id: &str, space_id: &str) -> PyResult<Vec<PyPass>> {
let gs_id = AssetId::new(ground_id);
let sc_id = AssetId::new(space_id);
if self.results.pair_type(&gs_id, &sc_id) == Some(PairType::InterSatellite) {
return Err(PyValueError::new_err(format!(
"passes are not supported for inter-satellite pair ({}, {}): use intervals() instead",
ground_id, space_id,
)));
}
let gs = self
.scenario
.ground_stations()
.iter()
.find(|g| g.id() == &gs_id);
let sc_traj = self.ensemble.get(&sc_id);
match (gs, sc_traj) {
(Some(gs), Some(sc_traj)) => {
let dyn_traj = sc_traj.clone().into_dyn();
let passes = self
.results
.to_passes(
&gs_id,
&sc_id,
gs.location(),
gs.mask(),
&dyn_traj,
self.step,
gs.body_fixed_frame(),
)
.map_err(|e| PyValueError::new_err(e.to_string()))?;
Ok(passes.into_iter().map(PyPass).collect())
}
_ => Ok(vec![]),
}
}
fn all_passes(&self) -> HashMap<(String, String), Vec<PyPass>> {
let gs_map: HashMap<&AssetId, &GroundStation> = self
.scenario
.ground_stations()
.iter()
.map(|g| (g.id(), g))
.collect();
self.results
.ground_space_pair_ids()
.into_iter()
.filter_map(|(gs_id, sc_id)| {
let gs = gs_map.get(gs_id)?;
let sc_traj = self.ensemble.get(sc_id)?;
let dyn_traj = sc_traj.clone().into_dyn();
let intervals = self.results.intervals_for(gs_id, sc_id)?;
let passes: Vec<PyPass> = intervals
.iter()
.filter_map(|interval| {
let dyn_interval = TimeInterval::new(
interval.start().into_dyn(),
interval.end().into_dyn(),
);
DynPass::from_interval(
dyn_interval,
self.step,
gs.location(),
gs.mask(),
&dyn_traj,
gs.body_fixed_frame(),
)
})
.map(PyPass)
.collect();
Some((
(gs_id.as_str().to_string(), sc_id.as_str().to_string()),
passes,
))
})
.collect()
}
fn pair_ids(&self) -> Vec<(String, String)> {
self.results
.pair_ids()
.map(|(id1, id2)| (id1.as_str().to_string(), id2.as_str().to_string()))
.collect()
}
fn ground_space_pair_ids(&self) -> Vec<(String, String)> {
self.results
.ground_space_pair_ids()
.into_iter()
.map(|(id1, id2)| (id1.as_str().to_string(), id2.as_str().to_string()))
.collect()
}
fn inter_satellite_pair_ids(&self) -> Vec<(String, String)> {
self.results
.inter_satellite_pair_ids()
.into_iter()
.map(|(id1, id2)| (id1.as_str().to_string(), id2.as_str().to_string()))
.collect()
}
fn num_pairs(&self) -> usize {
self.results.num_pairs()
}
fn total_intervals(&self) -> usize {
self.results.total_intervals()
}
fn __repr__(&self) -> String {
format!(
"VisibilityResults({} pairs, {} intervals)",
self.results.num_pairs(),
self.results.total_intervals(),
)
}
}
#[pyclass(
name = "ElevationMask",
module = "lox_space",
frozen,
eq,
from_py_object
)]
#[derive(Debug, Clone, PartialEq)]
pub struct PyElevationMask(pub ElevationMask);
#[pymethods]
impl PyElevationMask {
#[new]
#[pyo3(signature = (azimuth=None, elevation=None, min_elevation=None))]
fn new(
azimuth: Option<&Bound<'_, PyArray1<f64>>>,
elevation: Option<&Bound<'_, PyArray1<f64>>>,
min_elevation: Option<PyAngle>,
) -> PyResult<Self> {
if let Some(min_elevation) = min_elevation {
return Ok(PyElevationMask(ElevationMask::with_fixed_elevation(
min_elevation.0.to_radians(),
)));
}
if let (Some(azimuth), Some(elevation)) = (azimuth, elevation) {
let azimuth = azimuth.to_vec()?;
let elevation = elevation.to_vec()?;
return Ok(PyElevationMask(
ElevationMask::new(azimuth, elevation).map_err(PyElevationMaskError)?,
));
}
Err(PyValueError::new_err(
"invalid argument combination, either `min_elevation` or `azimuth` and `elevation` arrays need to be present",
))
}
#[classmethod]
fn fixed(_cls: &Bound<'_, PyType>, min_elevation: PyAngle) -> Self {
PyElevationMask(ElevationMask::with_fixed_elevation(
min_elevation.0.to_radians(),
))
}
#[classmethod]
fn variable(
_cls: &Bound<'_, PyType>,
azimuth: &Bound<'_, PyArray1<f64>>,
elevation: &Bound<'_, PyArray1<f64>>,
) -> PyResult<Self> {
let azimuth = azimuth.to_vec()?;
let elevation = elevation.to_vec()?;
Ok(PyElevationMask(
ElevationMask::new(azimuth, elevation).map_err(PyElevationMaskError)?,
))
}
fn __getnewargs__(&self) -> (Option<Vec<f64>>, Option<Vec<f64>>, Option<PyAngle>) {
(self.azimuth(), self.elevation(), self.fixed_elevation())
}
fn azimuth(&self) -> Option<Vec<f64>> {
match &self.0 {
ElevationMask::Fixed(_) => None,
ElevationMask::Variable(series) => Some(series.x().to_vec()),
}
}
fn elevation(&self) -> Option<Vec<f64>> {
match &self.0 {
ElevationMask::Fixed(_) => None,
ElevationMask::Variable(series) => Some(series.y().to_vec()),
}
}
fn fixed_elevation(&self) -> Option<PyAngle> {
match &self.0 {
ElevationMask::Fixed(min_elevation) => Some(PyAngle(Angle::radians(*min_elevation))),
ElevationMask::Variable(_) => None,
}
}
fn min_elevation(&self, azimuth: PyAngle) -> PyAngle {
PyAngle(Angle::radians(self.0.min_elevation(azimuth.0.to_radians())))
}
fn __repr__(&self) -> String {
match &self.0 {
ElevationMask::Fixed(min_elevation) => {
format!(
"ElevationMask(min_elevation={})",
PyAngle(Angle::radians(*min_elevation)).__repr__(),
)
}
ElevationMask::Variable(series) => {
let n = series.x().len();
format!("ElevationMask({n} azimuth/elevation pairs)")
}
}
}
}
/// Python-facing wrapper (`lox_space.Observables`) around [`Observables`].
#[pyclass(name = "Observables", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Debug)]
pub struct PyObservables(pub Observables);

#[pymethods]
impl PyObservables {
    /// Creates observables from azimuth, elevation, range and range rate.
    ///
    /// The values are handed to [`Observables::new`] in radians, meters and
    /// meters per second.
    #[new]
    fn new(
        azimuth: PyAngle,
        elevation: PyAngle,
        range: PyDistance,
        range_rate: PyVelocity,
    ) -> Self {
        let azimuth_rad = azimuth.0.to_radians();
        let elevation_rad = elevation.0.to_radians();
        let range_m = range.0.to_meters();
        let range_rate_mps = range_rate.0.to_meters_per_second();
        Self(Observables::new(
            azimuth_rad,
            elevation_rad,
            range_m,
            range_rate_mps,
        ))
    }

    /// Returns the azimuth as an angle.
    fn azimuth(&self) -> PyAngle {
        let radians = self.0.azimuth();
        PyAngle(Angle::radians(radians))
    }

    /// Returns the elevation as an angle.
    fn elevation(&self) -> PyAngle {
        let radians = self.0.elevation();
        PyAngle(Angle::radians(radians))
    }

    /// Returns the range as a distance.
    fn range(&self) -> PyDistance {
        let meters = self.0.range();
        PyDistance(Distance::meters(meters))
    }

    /// Returns the range rate as a velocity.
    fn range_rate(&self) -> PyVelocity {
        let meters_per_second = self.0.range_rate();
        PyVelocity(Velocity::meters_per_second(meters_per_second))
    }

    /// Formats the observables as `Observables(<az>, <el>, <range>, <range rate>)`.
    fn __repr__(&self) -> String {
        let azimuth = self.azimuth().__repr__();
        let elevation = self.elevation().__repr__();
        let range = self.range().__repr__();
        let range_rate = self.range_rate().__repr__();
        format!(
            "Observables({}, {}, {}, {})",
            azimuth, elevation, range, range_rate,
        )
    }
}
#[pyclass(name = "Pass", module = "lox_space", frozen, from_py_object)]
#[derive(Debug, Clone)]
pub struct PyPass(pub DynPass);
#[pymethods]
impl PyPass {
#[new]
fn new(
interval: PyInterval,
times: Vec<PyTime>,
observables: Vec<PyObservables>,
) -> PyResult<Self> {
let times: Vec<crate::time::DynTime> = times.into_iter().map(|t| t.0).collect();
let observables: Vec<Observables> = observables.into_iter().map(|o| o.0).collect();
let pass = Pass::try_new(interval.0, times, observables)
.map_err(|e| PyValueError::new_err(e.to_string()))?;
Ok(PyPass(pass))
}
fn interval(&self) -> PyInterval {
PyInterval(*self.0.interval())
}
fn times(&self) -> Vec<PyTime> {
self.0.times().iter().map(|&t| PyTime(t)).collect()
}
fn observables(&self) -> Vec<PyObservables> {
self.0
.observables()
.iter()
.map(|o| PyObservables(o.clone()))
.collect()
}
fn interpolate(&self, time: PyTime) -> Option<PyObservables> {
self.0.interpolate(time.0).map(PyObservables)
}
fn __repr__(&self) -> String {
format!(
"Pass(interval={}, {} observables)",
self.interval().__repr__(),
self.0.observables().len(),
)
}
}
/// Private newtype over [`PowerError`] that can be turned into a [`PyErr`].
struct PyPowerError(PowerError);

impl From<PyPowerError> for PyErr {
    /// Builds a Python `ValueError` whose message is the wrapped error's string form.
    fn from(PyPowerError(inner): PyPowerError) -> Self {
        let message = inner.to_string();
        PyValueError::new_err(message)
    }
}
/// Python-facing configuration (`lox_space.PowerBudgetAnalysis`) for a [`PowerBudgetAnalysis`].
#[pyclass(name = "PowerBudgetAnalysis", module = "lox_space", frozen)]
pub struct PyPowerBudgetAnalysis {
    scenario: DynScenario,
    /// Pre-computed ensemble; when `None`, `compute` propagates the scenario.
    ensemble: Option<Ensemble<AssetId, Tai, DynOrigin, DynFrame>>,
    step: TimeDelta,
    /// Optional restriction of the analysis to some spacecraft.
    filter: Option<SpacecraftFilter>,
}

#[pymethods]
impl PyPowerBudgetAnalysis {
    /// Stores the analysis configuration.
    ///
    /// `step` defaults to 60 seconds. `spacecraft_ids` and `constellation_id`
    /// each select a spacecraft filter; giving neither leaves the analysis
    /// unfiltered.
    ///
    /// # Errors
    ///
    /// Returns a `ValueError` when both `spacecraft_ids` and
    /// `constellation_id` are given.
    #[new]
    #[pyo3(signature = (scenario, ensemble=None, step=None, spacecraft_ids=None, constellation_id=None))]
    fn new(
        scenario: PyScenario,
        ensemble: Option<PyEnsemble>,
        step: Option<PyTimeDelta>,
        spacecraft_ids: Option<Vec<String>>,
        constellation_id: Option<String>,
    ) -> PyResult<Self> {
        let filter = match (spacecraft_ids, constellation_id) {
            (Some(_), Some(_)) => {
                return Err(PyValueError::new_err(
                    "spacecraft_ids and constellation_id are mutually exclusive",
                ));
            }
            (Some(ids), None) => Some(SpacecraftFilter::Ids(
                ids.into_iter().map(AssetId::new).collect(),
            )),
            (None, Some(cid)) => Some(SpacecraftFilter::Constellation(ConstellationId::new(cid))),
            (None, None) => None,
        };
        Ok(Self {
            scenario: scenario.0,
            ensemble: ensemble.map(|e| e.0),
            step: step
                .map(|s| s.0)
                .unwrap_or_else(|| TimeDelta::from_seconds_f64(60.0)),
            filter,
        })
    }

    /// Runs the power budget analysis and returns its results.
    ///
    /// When no ensemble was supplied at construction, the scenario is
    /// propagated first. The analysis uses the given SPK ephemeris, or
    /// [`AnalyticalSunEphemeris`] when `ephemeris` is omitted. Both the
    /// fallback propagation and the analysis run inside `py.detach`.
    ///
    /// # Errors
    ///
    /// Returns a `ValueError` when propagation or the analysis fails.
    #[pyo3(signature = (ephemeris=None))]
    fn compute(
        &self,
        py: Python<'_>,
        ephemeris: Option<&Bound<'_, PySpk>>,
    ) -> PyResult<PyPowerBudgetResults> {
        let scenario = &self.scenario;
        let step = self.step;
        // Borrow the stored filter; it is cloned once, only when it is applied.
        let filter = self.filter.as_ref();
        let auto_ensemble;
        let ensemble = match &self.ensemble {
            Some(e) => e,
            None => {
                // Propagation does not touch Python objects, so it runs
                // detached exactly like `Scenario.propagate` does.
                auto_ensemble = py
                    .detach(|| scenario.propagate(&DefaultRotationProvider))
                    .map_err(|e| PyValueError::new_err(e.to_string()))?;
                &auto_ensemble
            }
        };
        let results = if let Some(spk_bound) = ephemeris {
            let spk = &spk_bound.get().0;
            py.detach(|| {
                let mut a = PowerBudgetAnalysis::new(scenario, ensemble, spk).with_step(step);
                if let Some(f) = filter {
                    a = a.with_filter(f.clone());
                }
                a.compute()
            })
        } else {
            let analytical = AnalyticalSunEphemeris;
            py.detach(|| {
                let mut a =
                    PowerBudgetAnalysis::new(scenario, ensemble, &analytical).with_step(step);
                if let Some(f) = filter {
                    a = a.with_filter(f.clone());
                }
                a.compute()
            })
        };
        Ok(PyPowerBudgetResults {
            results: results.map_err(PyPowerError)?,
        })
    }

    /// Formats the analysis with its spacecraft count and the active filter, if any.
    fn __repr__(&self) -> String {
        let sc_count = self.scenario.spacecraft().len();
        match &self.filter {
            Some(SpacecraftFilter::Ids(ids)) => format!(
                "PowerBudgetAnalysis({sc_count} spacecraft, filtered to {} ids)",
                ids.len()
            ),
            Some(SpacecraftFilter::Constellation(cid)) => {
                format!("PowerBudgetAnalysis({sc_count} spacecraft, constellation=\"{cid}\")",)
            }
            None => format!("PowerBudgetAnalysis({sc_count} spacecraft)"),
        }
    }
}
/// Copies a TAI time series into a [`PyTimeSeries`] with a dynamic epoch.
fn to_py_time_series(ts: &TimeSeries<Tai>) -> PyTimeSeries {
    let epoch = ts.epoch().into_dyn();
    let samples = ts.series().clone();
    PyTimeSeries(TimeSeries::new(epoch, samples))
}
#[pyclass(name = "PowerBudgetResults", module = "lox_space", frozen)]
pub struct PyPowerBudgetResults {
results: PowerBudgetResults,
}
#[pymethods]
impl PyPowerBudgetResults {
fn eclipse_intervals(&self, id: &str) -> Vec<PyInterval> {
let asset_id = AssetId::new(id);
self.results
.eclipse_intervals_for(&asset_id)
.map(|intervals| {
intervals
.iter()
.map(|i| {
PyInterval(TimeInterval::new(i.start().into_dyn(), i.end().into_dyn()))
})
.collect()
})
.unwrap_or_default()
}
fn eclipse_fraction(&self, id: &str) -> Option<f64> {
self.results.eclipse_fraction(&AssetId::new(id))
}
fn sunlit_fraction(&self, id: &str) -> Option<f64> {
self.results.sunlit_fraction(&AssetId::new(id))
}
fn beta_angles(&self, id: &str) -> Option<PyTimeSeries> {
let ts = self.results.beta_angles_for(&AssetId::new(id))?;
Some(to_py_time_series(ts))
}
fn solar_flux(&self, id: &str) -> Option<PyTimeSeries> {
let ts = self.results.solar_flux_for(&AssetId::new(id))?;
Some(to_py_time_series(ts))
}
fn __repr__(&self) -> String {
let n = self.results.all_eclipse_intervals().len();
format!("PowerBudgetResults({n} spacecraft)")
}
}
/// Private newtype over [`AccessError`] that can be turned into a [`PyErr`].
struct PyAccessError(AccessError);

impl From<PyAccessError> for PyErr {
    /// Builds a Python `ValueError` whose message is the wrapped error's string form.
    fn from(PyAccessError(inner): PyAccessError) -> Self {
        let message = inner.to_string();
        PyValueError::new_err(message)
    }
}
/// Python-facing wrapper (`lox_space.Aoi`) around [`Aoi`].
#[pyclass(name = "Aoi", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Debug)]
pub struct PyAoi(pub Aoi);

#[pymethods]
impl PyAoi {
    /// Creates an area of interest whose exterior ring is `coords`, without interior rings.
    #[new]
    fn new(coords: Vec<(f64, f64)>) -> Self {
        let exterior = geo::LineString::from(coords);
        let interiors = Vec::new();
        Self(Aoi::new(geo::Polygon::new(exterior, interiors)))
    }

    /// Creates an area of interest from a GeoJSON string.
    ///
    /// # Errors
    ///
    /// Returns a `ValueError` when [`Aoi::from_geojson`] fails.
    #[classmethod]
    fn from_geojson(_cls: &Bound<'_, PyType>, geojson: &str) -> PyResult<Self> {
        match Aoi::from_geojson(geojson) {
            Ok(aoi) => Ok(Self(aoi)),
            Err(err) => Err(PyValueError::new_err(err.to_string())),
        }
    }

    /// Formats the area of interest with the number of points in its exterior ring.
    fn __repr__(&self) -> String {
        let polygon = self.0.polygon();
        let n = polygon.exterior().0.len();
        format!("Aoi({n} vertices)")
    }
}
/// Python-facing wrapper (`lox_space.OpticalPayload`) around [`OpticalPayload`].
#[pyclass(name = "OpticalPayload", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Debug)]
pub struct PyOpticalPayload(pub OpticalPayload);

#[pymethods]
impl PyOpticalPayload {
    /// Creates a payload via [`OpticalPayload::nadir_only`] with the given swath width.
    #[classmethod]
    fn nadir_only(_cls: &Bound<'_, PyType>, swath_width: PyDistance) -> Self {
        let payload = OpticalPayload::nadir_only(swath_width.0);
        Self(payload)
    }

    /// Creates a payload via [`OpticalPayload::off_nadir`] with the given swath
    /// width and maximum off-nadir angle.
    #[classmethod]
    fn off_nadir(
        _cls: &Bound<'_, PyType>,
        swath_width: PyDistance,
        max_off_nadir: PyAngle,
    ) -> Self {
        let payload = OpticalPayload::off_nadir(swath_width.0, max_off_nadir.0);
        Self(payload)
    }

    /// Returns a fixed placeholder representation.
    fn __repr__(&self) -> String {
        String::from("OpticalPayload(...)")
    }
}
/// Python-facing configuration (`lox_space.OpticalAccessAnalysis`) for an [`OpticalAccessAnalysis`].
#[pyclass(name = "OpticalAccessAnalysis", module = "lox_space", frozen)]
pub struct PyOpticalAccessAnalysis {
    scenario: DynScenario,
    /// Areas of interest paired with their identifiers.
    aois: Vec<(AoiId, Aoi)>,
    /// Pre-computed ensemble; when `None`, `compute` propagates the scenario.
    ensemble: Option<Ensemble<AssetId, Tai, DynOrigin, DynFrame>>,
    step: TimeDelta,
    body_fixed_frame: Option<DynFrame>,
}

#[pymethods]
impl PyOpticalAccessAnalysis {
    /// Stores the analysis configuration.
    ///
    /// `aois` is a list of `(identifier, Aoi)` pairs. `step` defaults to
    /// 60 seconds.
    #[new]
    #[pyo3(signature = (scenario, aois, ensemble=None, step=None, body_fixed_frame=None))]
    fn new(
        scenario: PyScenario,
        aois: Vec<(String, PyAoi)>,
        ensemble: Option<PyEnsemble>,
        step: Option<PyTimeDelta>,
        body_fixed_frame: Option<PyFrame>,
    ) -> Self {
        let aois = aois
            .into_iter()
            .map(|(id, aoi)| (AoiId::new(id), aoi.0))
            .collect();
        Self {
            scenario: scenario.0,
            aois,
            ensemble: ensemble.map(|e| e.0),
            step: step
                .map(|s| s.0)
                .unwrap_or_else(|| TimeDelta::from_seconds_f64(60.0)),
            body_fixed_frame: body_fixed_frame.map(|f| f.0),
        }
    }

    /// Runs the optical access analysis and returns its results.
    ///
    /// When no ensemble was supplied at construction, the scenario is
    /// propagated first. Both the fallback propagation and the analysis run
    /// inside `py.detach`.
    ///
    /// # Errors
    ///
    /// Returns a `ValueError` when propagation or the analysis fails.
    fn compute(&self, py: Python<'_>) -> PyResult<PyAccessResults> {
        let scenario = &self.scenario;
        let step = self.step;
        let body_fixed_frame = self.body_fixed_frame;
        let auto_ensemble;
        let ensemble = match &self.ensemble {
            Some(e) => e,
            None => {
                // Propagation does not touch Python objects, so it runs
                // detached exactly like `Scenario.propagate` does.
                auto_ensemble = py
                    .detach(|| scenario.propagate(&DefaultRotationProvider))
                    .map_err(|e| PyValueError::new_err(e.to_string()))?;
                &auto_ensemble
            }
        };
        let aois = self.aois.clone();
        let results = py.detach(|| {
            let mut analysis = OpticalAccessAnalysis::new(scenario, ensemble, aois).with_step(step);
            if let Some(frame) = body_fixed_frame {
                analysis = analysis.with_body_fixed_frame(frame);
            }
            analysis.compute()
        });
        Ok(PyAccessResults {
            results: results.map_err(PyAccessError)?,
        })
    }

    /// Formats the analysis with its spacecraft and AOI counts.
    fn __repr__(&self) -> String {
        let sc_count = self.scenario.spacecraft().len();
        let aoi_count = self.aois.len();
        let aoi_label = if aoi_count == 1 { "AOI" } else { "AOIs" };
        format!("OpticalAccessAnalysis({sc_count} spacecraft, {aoi_count} {aoi_label})")
    }
}
/// Python-facing mirror (`lox_space.PassDirection`) of the imaging `PassDirection` enum.
#[pyclass(
    name = "PassDirection",
    module = "lox_space",
    eq,
    eq_int,
    hash,
    frozen,
    from_py_object
)]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub enum PyPassDirection {
    Ascending,
    Descending,
}

impl From<PyPassDirection> for crate::analysis::imaging::PassDirection {
    /// Maps each Python-facing variant onto the variant of the same name.
    fn from(direction: PyPassDirection) -> Self {
        use crate::analysis::imaging::PassDirection as Native;
        match direction {
            PyPassDirection::Ascending => Native::Ascending,
            PyPassDirection::Descending => Native::Descending,
        }
    }
}

impl From<crate::analysis::imaging::PassDirection> for PyPassDirection {
    /// Maps each native variant onto the Python-facing variant of the same name.
    fn from(direction: crate::analysis::imaging::PassDirection) -> Self {
        use crate::analysis::imaging::PassDirection as Native;
        match direction {
            Native::Ascending => PyPassDirection::Ascending,
            Native::Descending => PyPassDirection::Descending,
        }
    }
}
#[pyclass(name = "AccessWindow", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Copy)]
pub struct PyAccessWindow(pub crate::analysis::imaging::AccessWindow);
#[pymethods]
impl PyAccessWindow {
fn interval(&self) -> PyInterval {
PyInterval(TimeInterval::new(
self.0.interval.start().into_dyn(),
self.0.interval.end().into_dyn(),
))
}
fn direction(&self) -> PyPassDirection {
self.0.direction.into()
}
fn __repr__(&self) -> String {
let dir = match self.0.direction {
crate::analysis::imaging::PassDirection::Ascending => "Ascending",
crate::analysis::imaging::PassDirection::Descending => "Descending",
};
format!(
"AccessWindow({} → {}, {dir})",
self.0.interval.start(),
self.0.interval.end(),
)
}
}
#[pyclass(name = "AccessResults", module = "lox_space", frozen)]
pub struct PyAccessResults {
results: crate::analysis::imaging::AccessResults,
}
#[pymethods]
impl PyAccessResults {
fn windows(&self, spacecraft_id: &str, aoi_id: &str) -> Vec<PyAccessWindow> {
let sc_id = AssetId::new(spacecraft_id);
let aoi_id = AoiId::new(aoi_id);
self.results
.windows(&sc_id, &aoi_id)
.iter()
.map(|w| PyAccessWindow(*w))
.collect()
}
fn all_windows(&self) -> HashMap<(String, String), Vec<PyAccessWindow>> {
self.results
.all_windows()
.iter()
.map(|((sc_id, aoi_id), windows)| {
(
(sc_id.as_str().to_string(), aoi_id.as_str().to_string()),
windows.iter().map(|w| PyAccessWindow(*w)).collect(),
)
})
.collect()
}
fn __repr__(&self) -> String {
let n = self.results.num_pairs();
let label = if n == 1 { "pair" } else { "pairs" };
format!("AccessResults({n} {label})")
}
}
/// Python-facing mirror (`lox_space.LookSide`) of the [`LookSide`] enum.
#[pyclass(
    name = "LookSide",
    module = "lox_space",
    eq,
    eq_int,
    hash,
    frozen,
    from_py_object
)]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub enum PyLookSide {
    Left,
    Right,
    Either,
}

impl From<PyLookSide> for LookSide {
    /// Maps each Python-facing variant onto the variant of the same name.
    fn from(side: PyLookSide) -> Self {
        match side {
            PyLookSide::Left => Self::Left,
            PyLookSide::Right => Self::Right,
            PyLookSide::Either => Self::Either,
        }
    }
}

impl From<LookSide> for PyLookSide {
    /// Maps each native variant onto the Python-facing variant of the same name.
    fn from(side: LookSide) -> Self {
        match side {
            LookSide::Left => Self::Left,
            LookSide::Right => Self::Right,
            LookSide::Either => Self::Either,
        }
    }
}
#[pyclass(name = "SarPayload", module = "lox_space", frozen, from_py_object)]
#[derive(Clone, Copy)]
pub struct PySarPayload(pub SarPayload);
#[pymethods]
impl PySarPayload {
#[classmethod]
fn with_look_angles(
_cls: &Bound<'_, PyType>,
min: PyAngle,
max: PyAngle,
side: PyLookSide,
) -> PyResult<Self> {
SarPayload::with_look_angles(min.0, max.0, side.into())
.map(PySarPayload)
.map_err(|e| PyValueError::new_err(e.to_string()))
}
#[classmethod]
fn with_incidence_angles(
_cls: &Bound<'_, PyType>,
min: PyAngle,
max: PyAngle,
side: PyLookSide,
) -> PyResult<Self> {
SarPayload::with_incidence_angles(min.0, max.0, side.into())
.map(PySarPayload)
.map_err(|e| PyValueError::new_err(e.to_string()))
}
fn side(&self) -> PyLookSide {
self.0.side().into()
}
fn __repr__(&self) -> String {
"SarPayload(...)".to_string()
}
}
/// Python-facing configuration (`lox_space.SarAccessAnalysis`) for a [`SarAccessAnalysis`].
#[pyclass(name = "SarAccessAnalysis", module = "lox_space", frozen)]
pub struct PySarAccessAnalysis {
    scenario: DynScenario,
    /// Areas of interest paired with their identifiers.
    aois: Vec<(AoiId, Aoi)>,
    /// Pre-computed ensemble; when `None`, `compute` propagates the scenario.
    ensemble: Option<Ensemble<AssetId, Tai, DynOrigin, DynFrame>>,
    step: TimeDelta,
    body_fixed_frame: Option<DynFrame>,
}

#[pymethods]
impl PySarAccessAnalysis {
    /// Stores the analysis configuration.
    ///
    /// `aois` is a list of `(identifier, Aoi)` pairs. `step` defaults to
    /// 60 seconds.
    #[new]
    #[pyo3(signature = (scenario, aois, ensemble=None, step=None, body_fixed_frame=None))]
    fn new(
        scenario: PyScenario,
        aois: Vec<(String, PyAoi)>,
        ensemble: Option<PyEnsemble>,
        step: Option<PyTimeDelta>,
        body_fixed_frame: Option<PyFrame>,
    ) -> Self {
        let aois = aois
            .into_iter()
            .map(|(id, aoi)| (AoiId::new(id), aoi.0))
            .collect();
        Self {
            scenario: scenario.0,
            aois,
            ensemble: ensemble.map(|e| e.0),
            step: step
                .map(|s| s.0)
                .unwrap_or_else(|| TimeDelta::from_seconds_f64(60.0)),
            body_fixed_frame: body_fixed_frame.map(|f| f.0),
        }
    }

    /// Runs the SAR access analysis and returns its results.
    ///
    /// When no ensemble was supplied at construction, the scenario is
    /// propagated first. Both the fallback propagation and the analysis run
    /// inside `py.detach`.
    ///
    /// # Errors
    ///
    /// Returns a `ValueError` when propagation or the analysis fails.
    fn compute(&self, py: Python<'_>) -> PyResult<PyAccessResults> {
        let scenario = &self.scenario;
        let step = self.step;
        let body_fixed_frame = self.body_fixed_frame;
        let auto_ensemble;
        let ensemble = match &self.ensemble {
            Some(e) => e,
            None => {
                // Propagation does not touch Python objects, so it runs
                // detached exactly like `Scenario.propagate` does.
                auto_ensemble = py
                    .detach(|| scenario.propagate(&DefaultRotationProvider))
                    .map_err(|e| PyValueError::new_err(e.to_string()))?;
                &auto_ensemble
            }
        };
        let aois = self.aois.clone();
        let results = py.detach(|| {
            let mut analysis = SarAccessAnalysis::new(scenario, ensemble, aois).with_step(step);
            if let Some(frame) = body_fixed_frame {
                analysis = analysis.with_body_fixed_frame(frame);
            }
            analysis.compute()
        });
        Ok(PyAccessResults {
            results: results.map_err(PyAccessError)?,
        })
    }

    /// Formats the analysis with its spacecraft and AOI counts.
    fn __repr__(&self) -> String {
        let sc_count = self.scenario.spacecraft().len();
        let aoi_count = self.aois.len();
        let aoi_label = if aoi_count == 1 { "AOI" } else { "AOIs" };
        format!("SarAccessAnalysis({sc_count} spacecraft, {aoi_count} {aoi_label})")
    }
}