nyx-space 2.4.0

Flight-proven, blazing fast astrodynamics from preliminary design to operations
Documentation
/*
    Nyx, blazing fast astrodynamics
    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published
    by the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
*/

use super::{Measurement, MeasurementType, TrackingDataArc};
use crate::io::{ExportCfg, InputOutputError};
use hifitime::{Duration, Epoch};
use pyo3::prelude::*;
use pyo3::types::PyType;
use std::collections::HashMap;
use std::ops::Bound::{Excluded, Included, Unbounded};

#[pymethods]
impl TrackingDataArc {
    #[new]
    fn py_new(measurements: Vec<Measurement>) -> Self {
        let mut trk_data = Self {
            measurements,
            source: None,
            moduli: None,
            force_reject: false,
        };

        trk_data.sort();

        trk_data
    }

    /// Initializes a new Almanac from a file path to CCSDS OEM file, after converting to to SPICE SPK/BSP
    ///
    /// :type path: str
    /// :type aliases: dict
    /// :rtype: nyx_space.od.TrackingDataArc
    #[classmethod]
    #[pyo3(name = "from_ccsds_tdm")]
    fn py_from_ccsds_tdm_file(
        _cls: Bound<'_, PyType>,
        path: &str,
        aliases: Option<HashMap<String, String>>,
    ) -> Result<Self, InputOutputError> {
        TrackingDataArc::from_tdm(path, aliases)
    }

    #[pyo3(name = "write_ccsds_tdm")]
    fn py_write_ccsds_tdm(
        &self,
        spacecraft_name: String,
        aliases: Option<HashMap<String, String>>,
        path: &str,
    ) -> Result<String, InputOutputError> {
        Ok(self
            .clone()
            .to_tdm_file(spacecraft_name, aliases, path, ExportCfg::default())?
            .to_str()
            .unwrap_or("woah_bug_building_path")
            .to_string())
    }

    #[pyo3(name = "unique_aliases")]
    fn py_unique_aliases(&self) -> Vec<String> {
        self.unique_aliases().iter().cloned().collect()
    }
    #[pyo3(name = "unique_types")]
    fn py_unique_types(&self) -> Vec<MeasurementType> {
        self.unique_types().iter().cloned().collect()
    }

    fn __str__(&self) -> String {
        format!("{self}")
    }

    fn __repr__(&self) -> String {
        format!("{self} @ {self:p}")
    }

    #[getter]
    fn get_force_reject(&self) -> bool {
        self.force_reject
    }

    #[setter]
    fn set_force_reject(&mut self, reject: bool) {
        self.force_reject = reject;
    }

    #[pyo3(name = "filter_by_epoch")]
    fn py_filter_by_epoch(&self, start: Option<Epoch>, end: Option<Epoch>) -> Self {
        let start_bound = start.map(Included).unwrap_or(Unbounded);
        let end_bound = end.map(Excluded).unwrap_or(Unbounded);
        self.clone().filter_by_epoch((start_bound, end_bound))
    }

    #[pyo3(name = "filter_by_offset")]
    fn py_filter_by_offset(&self, start: Option<Duration>, end: Option<Duration>) -> Self {
        let start_bound = match start {
            Some(s) => Included(s),
            None => Unbounded,
        };
        let end_bound = match end {
            Some(e) => Excluded(e),
            None => Unbounded,
        };
        self.clone().filter_by_offset((start_bound, end_bound))
    }

    #[pyo3(name = "filter_by_tracker")]
    fn py_filter_by_tracker(&self, tracker: String) -> Self {
        self.clone().filter_by_tracker(tracker)
    }

    #[pyo3(name = "filter_by_measurement_type")]
    fn py_filter_by_measurement_type(&self, msr_type: MeasurementType) -> Self {
        self.clone().filter_by_measurement_type(msr_type)
    }

    #[pyo3(name = "exclude_tracker")]
    fn py_exclude_tracker(&self, tracker: String) -> Self {
        self.clone().exclude_tracker(tracker)
    }

    #[pyo3(name = "exclude_by_epoch")]
    fn py_exclude_by_epoch(&self, start: Option<Epoch>, end: Option<Epoch>) -> Self {
        let start_bound = match start {
            Some(s) => Included(s),
            None => Unbounded,
        };
        let end_bound = match end {
            Some(e) => Excluded(e),
            None => Unbounded,
        };
        self.clone().exclude_by_epoch((start_bound, end_bound))
    }

    #[pyo3(name = "exclude_measurement_type")]
    fn py_exclude_measurement_type(&self, msr_type: MeasurementType) -> Self {
        self.clone().exclude_measurement_type(msr_type)
    }

    #[pyo3(name = "resid_vs_ref_check")]
    fn py_resid_vs_ref_check(&self) -> Self {
        self.clone().resid_vs_ref_check()
    }
}