use super::{Measurement, MeasurementType, TrackingDataArc};
use crate::io::{ExportCfg, InputOutputError};
use hifitime::{Duration, Epoch};
use pyo3::prelude::*;
use pyo3::types::PyType;
use std::collections::HashMap;
use std::ops::Bound::{Excluded, Included, Unbounded};
#[pymethods]
impl TrackingDataArc {
#[new]
fn py_new(measurements: Vec<Measurement>) -> Self {
let mut trk_data = Self {
measurements,
source: None,
moduli: None,
force_reject: false,
};
trk_data.sort();
trk_data
}
#[classmethod]
#[pyo3(name = "from_ccsds_tdm")]
fn py_from_ccsds_tdm_file(
_cls: Bound<'_, PyType>,
path: &str,
aliases: Option<HashMap<String, String>>,
) -> Result<Self, InputOutputError> {
TrackingDataArc::from_tdm(path, aliases)
}
#[pyo3(name = "write_ccsds_tdm")]
fn py_write_ccsds_tdm(
&self,
spacecraft_name: String,
aliases: Option<HashMap<String, String>>,
path: &str,
) -> Result<String, InputOutputError> {
Ok(self
.clone()
.to_tdm_file(spacecraft_name, aliases, path, ExportCfg::default())?
.to_str()
.unwrap_or("woah_bug_building_path")
.to_string())
}
#[pyo3(name = "unique_aliases")]
fn py_unique_aliases(&self) -> Vec<String> {
self.unique_aliases().iter().cloned().collect()
}
#[pyo3(name = "unique_types")]
fn py_unique_types(&self) -> Vec<MeasurementType> {
self.unique_types().iter().cloned().collect()
}
fn __str__(&self) -> String {
format!("{self}")
}
fn __repr__(&self) -> String {
format!("{self} @ {self:p}")
}
#[getter]
fn get_force_reject(&self) -> bool {
self.force_reject
}
#[setter]
fn set_force_reject(&mut self, reject: bool) {
self.force_reject = reject;
}
#[pyo3(name = "filter_by_epoch")]
fn py_filter_by_epoch(&self, start: Option<Epoch>, end: Option<Epoch>) -> Self {
let start_bound = start.map(Included).unwrap_or(Unbounded);
let end_bound = end.map(Excluded).unwrap_or(Unbounded);
self.clone().filter_by_epoch((start_bound, end_bound))
}
#[pyo3(name = "filter_by_offset")]
fn py_filter_by_offset(&self, start: Option<Duration>, end: Option<Duration>) -> Self {
let start_bound = match start {
Some(s) => Included(s),
None => Unbounded,
};
let end_bound = match end {
Some(e) => Excluded(e),
None => Unbounded,
};
self.clone().filter_by_offset((start_bound, end_bound))
}
#[pyo3(name = "filter_by_tracker")]
fn py_filter_by_tracker(&self, tracker: String) -> Self {
self.clone().filter_by_tracker(tracker)
}
#[pyo3(name = "filter_by_measurement_type")]
fn py_filter_by_measurement_type(&self, msr_type: MeasurementType) -> Self {
self.clone().filter_by_measurement_type(msr_type)
}
#[pyo3(name = "exclude_tracker")]
fn py_exclude_tracker(&self, tracker: String) -> Self {
self.clone().exclude_tracker(tracker)
}
#[pyo3(name = "exclude_by_epoch")]
fn py_exclude_by_epoch(&self, start: Option<Epoch>, end: Option<Epoch>) -> Self {
let start_bound = match start {
Some(s) => Included(s),
None => Unbounded,
};
let end_bound = match end {
Some(e) => Excluded(e),
None => Unbounded,
};
self.clone().exclude_by_epoch((start_bound, end_bound))
}
#[pyo3(name = "exclude_measurement_type")]
fn py_exclude_measurement_type(&self, msr_type: MeasurementType) -> Self {
self.clone().exclude_measurement_type(msr_type)
}
#[pyo3(name = "resid_vs_ref_check")]
fn py_resid_vs_ref_check(&self) -> Self {
self.clone().resid_vs_ref_check()
}
}