use super::{MeasurementType, measurement::Measurement};
use core::fmt;
use hifitime::prelude::{Duration, Epoch};
use indexmap::{IndexMap, IndexSet};
use log::{info, warn};
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::{Add, AddAssign, RangeBounds};
mod io_ccsds_tdm;
mod io_parquet;
#[cfg(feature = "python")]
use pyo3::prelude::*;
#[cfg(feature = "python")]
mod python;
#[derive(Clone, Default)]
#[cfg_attr(feature = "python", pyclass(from_py_object))]
pub struct TrackingDataArc {
pub measurements: Vec<Measurement>,
pub source: Option<String>,
pub moduli: Option<IndexMap<MeasurementType, f64>>,
pub force_reject: bool,
}
#[cfg_attr(feature = "python", pymethods)]
impl TrackingDataArc {
pub fn sort(&mut self) {
self.measurements.sort_unstable_by(|a, b| {
a.epoch
.cmp(&b.epoch)
.then_with(|| a.tracker.cmp(&b.tracker))
});
self.measurements.dedup_by(|next, kept| {
if next.epoch == kept.epoch && next.tracker == kept.tracker {
kept.data.extend(next.data.drain(..));
kept.rejected |= next.rejected;
true
} else {
false
}
});
}
pub fn start_epoch(&self) -> Option<Epoch> {
self.measurements.first().map(|msr| msr.epoch)
}
pub fn end_epoch(&self) -> Option<Epoch> {
self.measurements.last().map(|msr| msr.epoch)
}
pub fn duration(&self) -> Option<Duration> {
match self.start_epoch() {
Some(start) => self.end_epoch().map(|end| end - start),
None => None,
}
}
pub fn len(&self) -> usize {
self.measurements.len()
}
pub fn is_empty(&self) -> bool {
self.measurements.is_empty()
}
pub fn min_duration_sep(&self) -> Option<Duration> {
if self.is_empty() {
None
} else {
let mut min_sep = Duration::MAX;
let mut prev_epoch = self.start_epoch().unwrap();
for msr in self.measurements.iter().skip(1) {
let epoch = msr.epoch;
let this_sep = epoch - prev_epoch;
min_sep = min_sep.min(this_sep);
prev_epoch = epoch;
}
Some(min_sep)
}
}
pub fn set_moduli(&mut self, msr_type: MeasurementType, modulus: f64) {
if modulus.is_nan() || modulus.abs() < f64::EPSILON {
warn!("cannot set modulus for {msr_type:?} to {modulus}");
return;
}
if self.moduli.is_none() {
self.moduli = Some(IndexMap::new());
}
self.moduli.as_mut().unwrap().insert(msr_type, modulus);
}
pub fn apply_moduli(&mut self) {
if let Some(moduli) = &self.moduli {
for msr in &mut self.measurements {
for (msr_type, modulus) in moduli {
if let Some(msr_value) = msr.data.get_mut(msr_type) {
*msr_value %= *modulus;
}
}
}
}
}
pub fn downsample(&self, target_step: Duration) -> Self {
if self.is_empty() {
return self.clone();
}
let current_step = self.min_duration_sep().unwrap();
if current_step >= target_step {
warn!(
"cannot downsample tracking data from {current_step} to {target_step} (that would be upsampling)"
);
return self.clone();
}
let current_hz = 1.0 / current_step.to_seconds();
let target_hz = 1.0 / target_step.to_seconds();
let window_size = (current_hz / target_hz).round() as usize;
info!(
"downsampling tracking data from {current_step} ({current_hz:.6} Hz) to {target_step} ({target_hz:.6} Hz) (N = {window_size})"
);
let mut result = TrackingDataArc {
source: self.source.clone(),
..Default::default()
};
let measurements: Vec<_> = self.measurements.iter().collect();
for (i, msr) in measurements.iter().enumerate().step_by(window_size) {
let epoch = msr.epoch;
let start = i.saturating_sub(window_size / 2);
let end = (i + window_size / 2 + 1).min(measurements.len());
let window = &measurements[start..end];
let mut filtered_measurement = Measurement {
tracker: window[0].tracker.clone(),
epoch,
data: IndexMap::new(),
rejected: false,
};
for mtype in self.unique_types() {
let sum: f64 = window.iter().filter_map(|m| m.data.get(&mtype)).sum();
let count = window
.iter()
.filter(|m| m.data.contains_key(&mtype))
.count();
if count > 0 {
filtered_measurement.data.insert(mtype, sum / count as f64);
}
}
result.measurements.push(filtered_measurement);
}
result.sort();
result
}
pub fn chunk(&self, max_duration: Duration) -> Vec<TrackingDataArc> {
let mut chunks = Vec::new();
if self.is_empty() || max_duration <= Duration::ZERO {
return chunks;
}
let mut start_idx = 0;
let total_measurements = self.measurements.len();
while start_idx < total_measurements {
let chunk_start_epoch = self.measurements[start_idx].epoch;
let chunk_end_time = chunk_start_epoch + max_duration;
let remaining = &self.measurements[start_idx..];
let offset = remaining.partition_point(|msr| msr.epoch <= chunk_end_time);
let end_idx = start_idx + offset;
let chunk_measurements = self.measurements[start_idx..end_idx].to_vec();
chunks.push(TrackingDataArc {
measurements: chunk_measurements,
source: self.source.clone(),
moduli: self.moduli.clone(),
force_reject: self.force_reject,
});
start_idx = end_idx;
}
chunks
}
}
impl TrackingDataArc {
fn resolve_bounds<R: RangeBounds<Epoch>>(&self, bound: R) -> (usize, usize) {
let start_idx = match bound.start_bound() {
Bound::Included(&epoch) => self.measurements.partition_point(|m| m.epoch < epoch),
Bound::Excluded(&epoch) => self.measurements.partition_point(|m| m.epoch <= epoch),
Bound::Unbounded => 0,
};
let end_idx = match bound.end_bound() {
Bound::Included(&epoch) => self.measurements.partition_point(|m| m.epoch <= epoch),
Bound::Excluded(&epoch) => self.measurements.partition_point(|m| m.epoch < epoch),
Bound::Unbounded => self.measurements.len(),
};
(start_idx, end_idx)
}
pub fn unique_aliases(&self) -> IndexSet<String> {
self.unique().0
}
pub fn unique_types(&self) -> IndexSet<MeasurementType> {
self.unique().1
}
pub fn unique(&self) -> (IndexSet<String>, IndexSet<MeasurementType>) {
let mut aliases = IndexSet::new();
let mut types = IndexSet::new();
for msr in &self.measurements {
aliases.insert(msr.tracker.clone());
for k in msr.data.keys() {
types.insert(*k);
}
}
(aliases, types)
}
pub fn filter_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
let (start_idx, end_idx) = self.resolve_bounds(bound);
if start_idx >= end_idx || start_idx >= self.measurements.len() {
self.measurements.clear();
return self;
}
self.measurements.truncate(end_idx);
self.measurements.drain(0..start_idx);
self.measurements.shrink_to_fit();
self
}
pub fn filter_by_offset<R: RangeBounds<Duration>>(self, bound: R) -> Self {
if self.is_empty() {
return self;
}
let start = match bound.start_bound() {
Unbounded => self.start_epoch().unwrap(),
Included(offset) | Excluded(offset) => self.start_epoch().unwrap() + *offset,
};
let end = match bound.end_bound() {
Unbounded => self.end_epoch().unwrap(),
Included(offset) | Excluded(offset) => self.start_epoch().unwrap() + *offset,
};
self.filter_by_epoch(start..end)
}
pub fn filter_by_tracker(mut self, tracker: String) -> Self {
self.measurements = self
.measurements
.iter()
.filter_map(|msr| {
if msr.tracker == tracker {
Some(msr.clone())
} else {
None
}
})
.collect::<Vec<Measurement>>();
self
}
pub fn filter_by_measurement_type(mut self, included_type: MeasurementType) -> Self {
self.measurements.retain_mut(|msr| {
msr.data.retain(|msr_type, _| *msr_type == included_type);
!msr.data.is_empty()
});
self
}
pub fn exclude_tracker(mut self, excluded_tracker: String) -> Self {
self.measurements = self
.measurements
.iter()
.filter_map(|msr| {
if msr.tracker != excluded_tracker {
Some(msr.clone())
} else {
None
}
})
.collect::<Vec<Measurement>>();
self
}
pub fn exclude_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
let (start_idx, end_idx) = self.resolve_bounds(bound);
if start_idx < end_idx && start_idx < self.measurements.len() {
self.measurements.drain(start_idx..end_idx);
}
self
}
pub fn exclude_measurement_type(mut self, excluded_type: MeasurementType) -> Self {
self.measurements = self
.measurements
.iter_mut()
.map(|msr| {
msr.data.retain(|msr_type, _| *msr_type != excluded_type);
msr.clone()
})
.collect::<Vec<Measurement>>();
self
}
pub fn reject_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
let (start_idx, end_idx) = self.resolve_bounds(bound);
if start_idx < end_idx && start_idx < self.measurements.len() {
for msr in &mut self.measurements[start_idx..end_idx] {
msr.rejected = true;
}
}
self
}
pub fn reject_by_tracker(mut self, tracker: &str) -> Self {
for msr in &mut self.measurements {
if msr.tracker == tracker {
msr.rejected = true;
}
}
self
}
pub fn resid_vs_ref_check(mut self) -> Self {
self.force_reject = true;
self
}
}
impl fmt::Display for TrackingDataArc {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.is_empty() {
write!(f, "Empty tracking arc")
} else {
let start = self.start_epoch().unwrap();
let end = self.end_epoch().unwrap();
let src = match &self.source {
Some(src) => format!(" (source: {src})"),
None => String::new(),
};
write!(
f,
"Tracking arc with {} measurements of type {:?} over {} (from {start} to {end}) with trackers {:?}{src}",
self.len(),
self.unique_types(),
end - start,
self.unique_aliases()
)
}
}
}
impl fmt::Debug for TrackingDataArc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self} @ {self:p}")
}
}
impl PartialEq for TrackingDataArc {
fn eq(&self, other: &Self) -> bool {
self.measurements == other.measurements
}
}
impl Add for TrackingDataArc {
type Output = Self;
fn add(mut self, rhs: Self) -> Self::Output {
self.force_reject = false;
self.measurements.extend(rhs.measurements);
self.sort();
self.force_reject = false;
self
}
}
impl AddAssign for TrackingDataArc {
fn add_assign(&mut self, rhs: Self) {
*self = self.clone() + rhs;
}
}