#![cfg(feature = "polars")]
use ahash::AHashMap;
use itertools::{Either, izip};
use polars::{
frame::DataFrame,
lazy::frame::LazyFrame,
prelude::{ChunkedArray, Column, DataType, SortMultipleOptions, StringType, UInt32Type},
};
use crate::{
NightId, TrajId,
coordinates::equatorial::EquCoord,
io::polars::{
base_field::BaseFields,
error::PolarsError,
observer_field::{RawObsRow, ResolvedObserver, resolve_observer},
},
observation_dataset::{
ObsDataset,
index::{NightIndexMap, ObsMapIndex, TrajIndexMap},
observation::ObservationInput,
},
observer::{Observer, dataset::ObserverId, error_model::ObsErrorModel},
photometry::Photometry,
};
pub(crate) mod base_field;
pub mod error;
pub(crate) mod observer_field;
mod sealed {
pub trait Sealed {}
}
pub trait IntoFrame: sealed::Sealed {
fn collect_frame(self) -> Result<DataFrame, PolarsError>;
}
impl sealed::Sealed for DataFrame {}
impl IntoFrame for DataFrame {
#[inline]
fn collect_frame(self) -> Result<DataFrame, PolarsError> {
Ok(self)
}
}
impl sealed::Sealed for &DataFrame {}
impl IntoFrame for &DataFrame {
#[inline]
fn collect_frame(self) -> Result<DataFrame, PolarsError> {
Ok(self.clone())
}
}
impl sealed::Sealed for LazyFrame {}
impl IntoFrame for LazyFrame {
#[inline]
fn collect_frame(self) -> Result<DataFrame, PolarsError> {
Ok(self.collect()?)
}
}
#[inline]
pub(crate) fn f64_slice(col: &Column) -> Result<&[f64], PolarsError> {
Ok(col.as_materialized_series().f64()?.cont_slice()?)
}
#[inline]
pub(crate) fn u64_slice(col: &Column) -> Result<&[u64], PolarsError> {
Ok(col.as_materialized_series().u64()?.cont_slice()?)
}
/// Iterates an optional `Float64` column row by row.
///
/// When `name` is absent from `df`, yields `n` `None`s instead, so a missing
/// optional column behaves like an all-null one.
///
/// # Errors
/// Fails if the column exists but is not `Float64`.
fn iter_opt_f64<'df>(
    df: &'df DataFrame,
    name: &str,
    n: usize,
) -> Result<impl Iterator<Item = Option<f64>> + 'df, PolarsError> {
    let rows = if let Ok(col) = df.column(name) {
        let values = col.as_materialized_series().f64()?;
        Either::Left(values.iter())
    } else {
        Either::Right(std::iter::repeat_n(None::<f64>, n))
    };
    Ok(rows)
}

/// Iterates an optional `String` column row by row.
///
/// When `name` is absent from `df`, yields `n` `None`s instead, so a missing
/// optional column behaves like an all-null one.
///
/// # Errors
/// Fails if the column exists but is not `String`.
fn iter_opt_str<'df>(
    df: &'df DataFrame,
    name: &str,
    n: usize,
) -> Result<impl Iterator<Item = Option<&'df str>> + 'df, PolarsError> {
    let rows = if let Ok(col) = df.column(name) {
        let values = col.as_materialized_series().str()?;
        Either::Left(values.iter())
    } else {
        Either::Right(std::iter::repeat_n(None::<&'df str>, n))
    };
    Ok(rows)
}
/// Which grouping column, if any, the loader should lay out contiguously.
///
/// When the chosen column is present, the frame is sorted by it so that this
/// column's index can store one `start..end` row range per key instead of a
/// list of row numbers. If the column is absent, no sorting happens.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ContiguousChoice {
    /// Sort by, and contiguously index, the `night_id` column.
    ContiguousNight,
    /// Sort by, and contiguously index, the `traj_id` column.
    ContiguousTraj,
}
/// Options controlling how observations are loaded from a Polars frame.
pub struct FromPolarsArgs {
    /// Error model forwarded unchanged to `ObsDataset::new`.
    pub error_model: Option<ObsErrorModel>,
    /// Whether to rechunk a frame whose columns consist of several chunks.
    /// `None` is treated as `true` by the loader, while
    /// `FromPolarsArgs::default()` sets `Some(false)`.
    // NOTE(review): with rechunking disabled and no sort column present, a
    // multi-chunk input may fail contiguous-slice access later on — confirm
    // that `Some(false)` is the intended default.
    pub do_rechunk: Option<bool>,
    /// Grouping column to sort by and index contiguously; `None` (or a choice
    /// whose column is absent) disables sorting.
    pub contiguous_choice: Option<ContiguousChoice>,
}
impl Default for FromPolarsArgs {
    /// No error model, rechunking explicitly disabled, and night-contiguous
    /// indexing requested.
    fn default() -> Self {
        let contiguous_choice = Some(ContiguousChoice::ContiguousNight);
        let do_rechunk = Some(false);
        Self {
            error_model: None,
            do_rechunk,
            contiguous_choice,
        }
    }
}
/// Returns a copy of `df` sorted by `col_name`, then rechunked.
///
/// The sort is stable (`maintain_order`) and places null keys last, so rows
/// sharing a key form one run and rows with a null key trail the frame.
/// Rechunking leaves every column as a single chunk.
fn sort_and_rechunk(df: &DataFrame, col_name: &str) -> Result<DataFrame, PolarsError> {
    let options = SortMultipleOptions::default()
        .with_maintain_order(true)
        .with_nulls_last(true);
    let mut out = df.sort([col_name], options)?;
    out.rechunk_mut();
    Ok(out)
}
#[inline]
fn intern_observer(
raw: &RawObsRow<'_>,
row_idx: usize,
custom_observers: &mut Vec<Observer>,
observer_lookup: &mut AHashMap<Observer, usize>,
) -> Result<Option<ObserverId>, PolarsError> {
match resolve_observer(raw, row_idx)? {
ResolvedObserver::Geodetic(observer) => {
let idx = match observer_lookup.get(&observer) {
Some(&i) => i,
None => {
let i = custom_observers.len();
custom_observers.push(observer.clone());
observer_lookup.insert(observer, i);
i
}
};
Ok(Some(ObserverId::IntId(idx)))
}
ResolvedObserver::Mpc(id) => Ok(Some(id)),
ResolvedObserver::None => Ok(None),
}
}
/// Streaming run detector for a key column that has already been sorted.
///
/// Rows are fed in order through [`on_row`](Self::on_row). A run of equal,
/// non-null keys is reported as `(key, make_entry(start, end))`, with `end`
/// exclusive. A run closes when a different key arrives, when a null key
/// arrives, or when [`finalize`](Self::finalize) is called at the end of
/// input. Null keys never open a run. A key appearing in two separate runs is
/// reported twice.
struct ContiguousGroupTracker<K, I> {
    /// Key and first row of the run currently open, if any.
    current: Option<(K, usize)>,
    /// Builds the entry for a closed run from `(start, end)`.
    make_entry: fn(usize, usize) -> I,
}

impl<K: Eq, I> ContiguousGroupTracker<K, I> {
    /// Creates a tracker with no open run.
    fn new(make_entry: fn(usize, usize) -> I) -> Self {
        Self {
            current: None,
            make_entry,
        }
    }

    /// Closes the open run (if any) with exclusive end `end` and returns it.
    fn close(&mut self, end: usize) -> Option<(K, I)> {
        self.current
            .take()
            .map(|(key, start)| (key, (self.make_entry)(start, end)))
    }

    /// Feeds row `row_idx` with key `key`.
    ///
    /// Returns the run that this row closed, if any.
    fn on_row(&mut self, row_idx: usize, key: Option<K>) -> Option<(K, I)> {
        let Some(key) = key else {
            return self.close(row_idx);
        };
        if matches!(&self.current, Some((open, _)) if *open == key) {
            return None;
        }
        // Open the new run and take the previous one out by value, which
        // avoids cloning its key.
        let previous = self.current.replace((key, row_idx));
        previous.map(|(prev_key, start)| (prev_key, (self.make_entry)(start, row_idx)))
    }

    /// Closes the last open run at `n`, the total number of rows fed.
    fn finalize(mut self, n: usize) -> Option<(K, I)> {
        self.close(n)
    }
}
/// Loads an [`ObsDataset`] from any [`IntoFrame`] input.
///
/// The input is first collected into an owned [`DataFrame`]. If rechunking is
/// allowed (`args.do_rechunk` is `None` or `Some(true)`) and any column has
/// more than one chunk, the whole frame is rechunked. This lets
/// contiguous-slice accessors such as [`f64_slice`] borrow a single buffer.
///
/// # Errors
/// Propagates collection errors and every error raised while building the
/// dataset from the frame (column type mismatches, invalid observers, ...).
pub(crate) fn load_observation_from_polars<T: IntoFrame>(
    frame: T,
    args: FromPolarsArgs,
) -> Result<ObsDataset, PolarsError> {
    let mut df = frame.collect_frame()?;
    // Test the cheap flag first: `as_materialized_series` may materialise
    // non-series columns just to count their chunks.
    let rechunk_allowed = args.do_rechunk.unwrap_or(true);
    if rechunk_allowed
        && df
            .columns()
            .iter()
            .any(|c: &Column| c.as_materialized_series().chunks().len() > 1)
    {
        df.rechunk_mut();
    }
    load_observation_from_frame(&df, args)
}
/// Builds an [`ObsDataset`] from an eager frame.
///
/// When `args.contiguous_choice` names a grouping column that exists in `df`
/// (`night_id` or `traj_id`), the frame is first stably sorted by it, nulls
/// last. The chosen index can then store one `ObsMapIndex::Contiguous` range
/// per key. The other index, if its column is present, stores
/// `ObsMapIndex::Split` row lists. Rows whose id is null are left out of that
/// id's index.
///
/// All row numbers refer to the (possibly sorted) frame, not to the caller's
/// original row order. This covers both index entries and the `row_idx`
/// passed to observer resolution.
///
/// Optional columns:
/// - `obs_lon`, `obs_lat`, `obs_alt`, `obs_ra_acc`, `obs_dec_acc` (Float64)
/// - `mpc_code_obs` (String)
/// - `night_id` (UInt32)
/// - `traj_id` (UInt32 or String)
///
/// A missing optional column behaves as all-null.
///
/// # Errors
/// - `PolarsError::NightIdColumnTypeError` if `night_id` is not UInt32.
/// - `PolarsError::TrajIdColumnTypeError` if `traj_id` is neither UInt32 nor
///   String.
/// - Errors from sorting, base-field materialisation, optional-column dtype
///   checks and observer resolution.
fn load_observation_from_frame(
    df: &DataFrame,
    args: FromPolarsArgs,
) -> Result<ObsDataset, PolarsError> {
    // Pick the grouping column for the contiguous layout. A requested column
    // that does not exist falls back to "no sorting".
    let has_night_col = df.column("night_id").is_ok();
    let has_traj_col = df.column("traj_id").is_ok();
    let contiguous_col: Option<&str> = match &args.contiguous_choice {
        Some(ContiguousChoice::ContiguousNight) if has_night_col => Some("night_id"),
        Some(ContiguousChoice::ContiguousTraj) if has_traj_col => Some("traj_id"),
        _ => None,
    };

    // Keep the sorted copy alive for the rest of the function while
    // shadowing `df` with whichever frame is actually read.
    let sorted_df_storage: DataFrame;
    let df: &DataFrame = if let Some(col_name) = contiguous_col {
        sorted_df_storage = sort_and_rechunk(df, col_name)?;
        &sorted_df_storage
    } else {
        df
    };

    let base = BaseFields::materialize_fields(df)?;
    let n = base.ids.len();

    // Optional observer columns; a missing column yields `n` nulls.
    let obs_lon = iter_opt_f64(df, "obs_lon", n)?;
    let obs_lat = iter_opt_f64(df, "obs_lat", n)?;
    let obs_alt = iter_opt_f64(df, "obs_alt", n)?;
    let obs_ra_acc = iter_opt_f64(df, "obs_ra_acc", n)?;
    let obs_dec_acc = iter_opt_f64(df, "obs_dec_acc", n)?;
    let mpc_codes = iter_opt_str(df, "mpc_code_obs", n)?;

    // `night_id`, when present, must be UInt32.
    let night_ca: Option<&ChunkedArray<UInt32Type>> = match df.column("night_id") {
        Err(_) => None,
        Ok(col) => Some(
            col.as_materialized_series()
                .u32()
                .map_err(|_| PolarsError::NightIdColumnTypeError(col.dtype().to_string()))?,
        ),
    };

    // `traj_id`, when present, may be UInt32 or String; at most one of these
    // two handles is `Some`.
    let traj_u32_ca: Option<&ChunkedArray<UInt32Type>>;
    let traj_str_ca: Option<&ChunkedArray<StringType>>;
    match df.column("traj_id") {
        Err(_) => {
            traj_u32_ca = None;
            traj_str_ca = None;
        }
        Ok(col) => match col.dtype() {
            DataType::UInt32 => {
                traj_u32_ca = Some(col.as_materialized_series().u32()?);
                traj_str_ca = None;
            }
            DataType::String => {
                traj_u32_ca = None;
                traj_str_ca = Some(col.as_materialized_series().str()?);
            }
            other => return Err(PolarsError::TrajIdColumnTypeError(other.to_string())),
        },
    }

    let has_night = night_ca.is_some();
    let has_traj = traj_u32_ca.is_some() || traj_str_ca.is_some();

    let night_iter = night_ca
        .map(|ca| Either::Left(ca.iter().map(|opt| opt.map(NightId))))
        .unwrap_or_else(|| Either::Right(std::iter::repeat_n(None, n)));
    let traj_iter = match (traj_u32_ca, traj_str_ca) {
        (Some(ca), _) => Either::Left(Either::Left(
            ca.iter().map(|opt: Option<u32>| opt.map(TrajId::Int)),
        )),
        (_, Some(ca)) => Either::Left(Either::Right(
            ca.iter()
                .map(|opt: Option<&str>| opt.map(|s| TrajId::Str(s.to_owned()))),
        )),
        (None, None) => Either::Right(std::iter::repeat_n(None::<TrajId>, n)),
    };

    let mut custom_observers: Vec<Observer> = Vec::with_capacity(16);
    let mut observer_lookup: AHashMap<Observer, usize> = AHashMap::with_capacity(16);

    // An index map exists only when its id column is present.
    let mut night_map: Option<NightIndexMap> = has_night.then(NightIndexMap::new);
    let mut traj_map: Option<TrajIndexMap> = has_traj.then(TrajIndexMap::new);

    // Only the column the frame was sorted by can use contiguous ranges.
    let night_is_contiguous = matches!(contiguous_col, Some("night_id"));
    let traj_is_contiguous = matches!(contiguous_col, Some("traj_id"));
    let mut night_tracker: ContiguousGroupTracker<NightId, ObsMapIndex> =
        ContiguousGroupTracker::new(|start, end| ObsMapIndex::Contiguous { start, end });
    let mut traj_tracker: ContiguousGroupTracker<TrajId, ObsMapIndex> =
        ContiguousGroupTracker::new(|start, end| ObsMapIndex::Contiguous { start, end });

    let mut observations: Vec<ObservationInput> = Vec::with_capacity(n);
    for (
        row_idx,
        (&id, &ra, &ra_err, &dec, &dec_err, &mag, &mag_err, &mjd_tt, filter),
        obs_lon,
        obs_lat,
        obs_alt,
        obs_ra_acc,
        obs_dec_acc,
        mpc_code,
        night_id,
        traj_id,
    ) in izip!(
        0usize..,
        base.iter_base_fields()?,
        obs_lon,
        obs_lat,
        obs_alt,
        obs_ra_acc,
        obs_dec_acc,
        mpc_codes,
        night_iter,
        traj_iter,
    ) {
        let raw = RawObsRow {
            obs_lon,
            obs_lat,
            obs_alt,
            obs_ra_acc,
            obs_dec_acc,
            mpc_code,
        };
        let observer_id =
            intern_observer(&raw, row_idx, &mut custom_observers, &mut observer_lookup)?;

        if let Some(map) = &mut night_map {
            if night_is_contiguous {
                if let Some((key, entry)) = night_tracker.on_row(row_idx, night_id) {
                    map.insert(key, entry);
                }
            } else if let Some(nid) = night_id {
                map.entry(nid)
                    .or_insert_with(|| ObsMapIndex::Split(Vec::new()))
                    .push_split(row_idx);
            }
        }

        // `traj_id` is owned per row and consumed by exactly one branch, so
        // it is moved rather than cloned (cloning allocates for string ids).
        if let Some(map) = &mut traj_map {
            if traj_is_contiguous {
                if let Some((key, entry)) = traj_tracker.on_row(row_idx, traj_id) {
                    map.insert(key, entry);
                }
            } else if let Some(tid) = traj_id {
                map.entry(tid)
                    .or_insert_with(|| ObsMapIndex::Split(Vec::new()))
                    .push_split(row_idx);
            }
        }

        observations.push(ObservationInput {
            id,
            equ_coord: EquCoord::new(ra, ra_err, dec, dec_err),
            photometry: Photometry {
                magnitude: mag,
                error: mag_err,
                filter,
            },
            mjd_tt,
            observer: observer_id,
        });
    }

    // Close the last open run; `n` is the exclusive end of the final group.
    if night_is_contiguous
        && let (Some(map), Some((key, entry))) = (&mut night_map, night_tracker.finalize(n))
    {
        map.insert(key, entry);
    }
    if traj_is_contiguous
        && let (Some(map), Some((key, entry))) = (&mut traj_map, traj_tracker.finalize(n))
    {
        map.insert(key, entry);
    }

    Ok(ObsDataset::new(
        observations,
        custom_observers,
        args.error_model,
        night_map,
        traj_map,
    ))
}
#[cfg(test)]
mod polars_reader_tests {
    // Unit tests for the Polars ingestion path: observer resolution, filter
    // column typing, and night/trajectory index construction.
    //
    // `use super::*` already brings in the parent's imports (`DataFrame`,
    // `Column`, `LazyFrame`, `ObsMapIndex`, `NightId`, `TrajId`, `ObserverId`,
    // `ObsErrorModel`, ...). Only names the parent does not import are listed.
    use super::*;
    use crate::observation_dataset::observation::Observation;
    use crate::photometry::Filter;

    /// Loader arguments shared by most tests: the FCCT14 error model, with
    /// every other option at its `Default` value.
    fn fcct14_args() -> FromPolarsArgs {
        FromPolarsArgs {
            error_model: Some(ObsErrorModel::FCCT14),
            ..Default::default()
        }
    }

    /// Builds a `DataFrame` from fixture columns.
    fn frame(cols: Vec<Column>) -> DataFrame {
        DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed")
    }

    /// Loads `df` with [`fcct14_args`].
    fn load(df: &DataFrame) -> Result<ObsDataset, PolarsError> {
        load_observation_from_polars(df, fcct14_args())
    }

    /// Collects every observation of `dataset` in iteration order.
    fn observations(dataset: &ObsDataset) -> Vec<&Observation> {
        dataset.iter_observations().collect()
    }

    /// Appends a nullable `Float64` column named `name`.
    fn push_f64(cols: &mut Vec<Column>, name: &str, values: &[Option<f64>]) {
        cols.push(Column::new(name.into(), values.to_vec()));
    }

    /// Appends a nullable `mpc_code_obs` string column.
    fn push_mpc_codes(cols: &mut Vec<Column>, codes: &[Option<&str>]) {
        cols.push(Column::new("mpc_code_obs".into(), codes.to_vec()));
    }

    /// Appends a fully specified geodetic observer (lon 15.0, lat 48.0,
    /// alt 200.0, both accuracies 1e-4), identical on each of `rows` rows.
    fn push_full_geodetic(cols: &mut Vec<Column>, rows: usize) {
        push_f64(cols, "obs_lon", &vec![Some(15.0); rows]);
        push_f64(cols, "obs_lat", &vec![Some(48.0); rows]);
        push_f64(cols, "obs_alt", &vec![Some(200.0); rows]);
        push_f64(cols, "obs_ra_acc", &vec![Some(1e-4); rows]);
        push_f64(cols, "obs_dec_acc", &vec![Some(1e-4); rows]);
    }

    /// Single-row base columns with a caller-supplied `filter` column.
    fn base_columns_single_row_with_filter(filter: Column) -> Vec<Column> {
        vec![
            Column::new("id".into(), &[42u64]),
            Column::new("ra".into(), &[10.5f64]),
            Column::new("ra_err".into(), &[0.001f64]),
            Column::new("dec".into(), &[-5.0f64]),
            Column::new("dec_err".into(), &[0.001f64]),
            Column::new("magnitude".into(), &[15.2f64]),
            Column::new("mag_err".into(), &[0.05f64]),
            filter,
            Column::new("mjd_tt".into(), &[60000.0f64]),
        ]
    }

    /// Single-row base columns with a `"G"` string filter.
    fn base_columns_single_row() -> Vec<Column> {
        base_columns_single_row_with_filter(Column::new("filter".into(), &["G"]))
    }

    /// Two-row base columns with distinct values per row.
    fn base_columns_two_rows() -> Vec<Column> {
        vec![
            Column::new("id".into(), &[1u64, 2u64]),
            Column::new("ra".into(), &[10.5f64, 20.0f64]),
            Column::new("ra_err".into(), &[0.001f64, 0.002f64]),
            Column::new("dec".into(), &[-5.0f64, 15.0f64]),
            Column::new("dec_err".into(), &[0.001f64, 0.002f64]),
            Column::new("magnitude".into(), &[15.2f64, 16.0f64]),
            Column::new("mag_err".into(), &[0.05f64, 0.06f64]),
            Column::new("filter".into(), &["G", "V"]),
            Column::new("mjd_tt".into(), &[60000.0f64, 60001.0f64]),
        ]
    }

    /// `n` rows of base columns: ids `1..=n`, mjd `60000 + i`, and constant
    /// astrometry/photometry. Used by the index-layout tests.
    fn uniform_base_columns(n: usize) -> Vec<Column> {
        let ids: Vec<u64> = (1..=(n as u64)).collect();
        let mjd: Vec<f64> = (0..n).map(|i| 60000.0 + i as f64).collect();
        vec![
            Column::new("id".into(), ids),
            Column::new("ra".into(), vec![0.1f64; n]),
            Column::new("ra_err".into(), vec![1e-4f64; n]),
            Column::new("dec".into(), vec![0.0f64; n]),
            Column::new("dec_err".into(), vec![1e-4f64; n]),
            Column::new("magnitude".into(), vec![15.0f64; n]),
            Column::new("mag_err".into(), vec![0.1f64; n]),
            Column::new("filter".into(), vec!["G"; n]),
            Column::new("mjd_tt".into(), mjd),
        ]
    }

    /// Four rows with `night_id = [1, 1, 2, 2]` and `traj_id = [10, 20, 10, 20]`.
    fn four_row_cols_with_both_ids() -> Vec<Column> {
        let mut cols = uniform_base_columns(4);
        cols.push(Column::new("night_id".into(), &[1u32, 1, 2, 2]));
        cols.push(Column::new("traj_id".into(), &[10u32, 20, 10, 20]));
        cols
    }

    /// Number of rows referenced by an index entry, whatever its layout.
    fn entry_len(entry: &ObsMapIndex) -> usize {
        match entry {
            ObsMapIndex::Split(rows) => rows.len(),
            ObsMapIndex::Contiguous { start, end } => end - start,
        }
    }

    /// `(start, end)` of a contiguous entry; panics, naming `what`, otherwise.
    fn contiguous_bounds(entry: &ObsMapIndex, what: &str) -> (usize, usize) {
        match entry {
            ObsMapIndex::Contiguous { start, end } => (*start, *end),
            ObsMapIndex::Split(_) => panic!("{what} must be Contiguous, got {entry:?}"),
        }
    }

    #[test]
    fn test_no_observer_columns() {
        let df = frame(base_columns_single_row());
        let dataset = load(&df).unwrap_or_else(|e| {
            panic!("Expected Ok for a DataFrame with only base columns, got: {e:?}")
        });
        let obs = observations(&dataset);
        assert_eq!(obs.len(), 1, "Expected exactly 1 observation");
        assert!(
            obs[0].observer.is_none(),
            "Expected observer to be None when no observer columns are present"
        );
    }

    #[test]
    fn test_mpc_code_observer() {
        let mut cols = base_columns_single_row();
        push_mpc_codes(&mut cols, &[Some("I41")]);
        let df = frame(cols);
        let dataset = load(&df)
            .unwrap_or_else(|e| panic!("Expected Ok for valid MPC code, got: {e:?}"));
        let obs = observations(&dataset);
        assert_eq!(obs.len(), 1);
        match obs[0].observer {
            Some(ObserverId::MpcCode(code)) => {
                assert_eq!(code, *b"I41", "MPC code bytes must match \"I41\"");
            }
            other => panic!("Expected Some(ObserverId::MpcCode(*b\"I41\")), got: {other:?}"),
        }
    }

    #[test]
    fn test_geodetic_observer() {
        let mut cols = base_columns_single_row();
        push_full_geodetic(&mut cols, 1);
        let df = frame(cols);
        let dataset = load(&df).unwrap_or_else(|e| {
            panic!("Expected Ok for fully specified geodetic observer, got: {e:?}")
        });
        let obs = observations(&dataset);
        assert_eq!(obs.len(), 1);
        assert!(
            matches!(obs[0].observer, Some(ObserverId::IntId(0))),
            "Expected Some(ObserverId::IntId(0)), got: {:?}",
            obs[0].observer
        );
    }

    #[test]
    fn test_geodetic_interning() {
        let mut cols = base_columns_two_rows();
        push_full_geodetic(&mut cols, 2);
        let df = frame(cols);
        let dataset = load(&df).unwrap_or_else(|e| {
            panic!("Expected Ok for two identical geodetic observers, got: {e:?}")
        });
        let obs = observations(&dataset);
        assert_eq!(obs.len(), 2, "Expected exactly 2 observations");
        assert!(
            matches!(obs[0].observer, Some(ObserverId::IntId(0))),
            "Expected first observation to reference IntId(0), got: {:?}",
            obs[0].observer
        );
        assert!(
            matches!(obs[1].observer, Some(ObserverId::IntId(0))),
            "Expected second observation to reference IntId(0) (interned), got: {:?}",
            obs[1].observer
        );
    }

    #[test]
    fn test_partial_triplet_error() {
        let mut cols = base_columns_single_row();
        push_f64(&mut cols, "obs_lon", &[Some(15.0)]);
        push_f64(&mut cols, "obs_lat", &[None]);
        push_f64(&mut cols, "obs_alt", &[None]);
        let df = frame(cols);
        match load(&df) {
            Err(PolarsError::PartialTripletNull { .. }) => {}
            Err(other) => panic!("Expected PolarsError::PartialTripletNull, got: {other:?}"),
            Ok(_) => panic!("Expected Err for partially-null geodetic triplet, got Ok"),
        }
    }

    #[test]
    fn test_missing_accuracy_error() {
        let mut cols = base_columns_single_row();
        push_f64(&mut cols, "obs_lon", &[Some(15.0)]);
        push_f64(&mut cols, "obs_lat", &[Some(48.0)]);
        push_f64(&mut cols, "obs_alt", &[Some(200.0)]);
        push_f64(&mut cols, "obs_ra_acc", &[None]);
        push_f64(&mut cols, "obs_dec_acc", &[Some(1e-4)]);
        let df = frame(cols);
        match load(&df) {
            Err(PolarsError::MissingAccuracyForGeodesic(_)) => {}
            Err(other) => {
                panic!("Expected PolarsError::MissingAccuracyForGeodesic, got: {other:?}")
            }
            Ok(_) => panic!(
                "Expected Err when obs_ra_acc is null but geodetic triplet is complete, got Ok"
            ),
        }
    }

    #[test]
    fn test_invalid_mpc_code() {
        let mut cols = base_columns_single_row();
        push_mpc_codes(&mut cols, &[Some("ABCD")]);
        let df = frame(cols);
        match load(&df) {
            Err(PolarsError::InvalidMpcCode(_, _)) => {}
            Err(other) => panic!("Expected PolarsError::InvalidMpcCode, got: {other:?}"),
            Ok(_) => panic!("Expected Err for a four-byte MPC code, got Ok"),
        }
    }

    #[test]
    fn test_invalid_mpc_code_too_short() {
        let mut cols = base_columns_single_row();
        push_mpc_codes(&mut cols, &[Some("AB")]);
        let df = frame(cols);
        match load(&df) {
            Err(PolarsError::InvalidMpcCode(_, _)) => {}
            Err(other) => panic!("Expected PolarsError::InvalidMpcCode, got: {other:?}"),
            Ok(_) => panic!("Expected Err for a two-byte MPC code, got Ok"),
        }
    }

    #[test]
    fn test_mpc_takes_precedence_over_geodetic() {
        let mut cols = base_columns_single_row();
        push_mpc_codes(&mut cols, &[Some("I41")]);
        push_full_geodetic(&mut cols, 1);
        let df = frame(cols);
        let dataset = load(&df).unwrap_or_else(|e| {
            panic!("Expected Ok when MPC code and geodetic triplet coexist, got: {e:?}")
        });
        let obs = observations(&dataset);
        assert_eq!(obs.len(), 1);
        assert!(
            matches!(obs[0].observer, Some(ObserverId::MpcCode(_))),
            "Expected MPC code to take precedence over geodetic triplet, \
             but got: {:?}",
            obs[0].observer
        );
    }

    #[test]
    fn test_filter_column_integer() {
        let cols = base_columns_single_row_with_filter(Column::new("filter".into(), &[7u32]));
        let df = frame(cols);
        let dataset = load(&df)
            .unwrap_or_else(|e| panic!("Expected Ok for a UInt32 filter column, got: {e:?}"));
        let obs = observations(&dataset);
        assert_eq!(obs.len(), 1);
        assert!(
            matches!(obs[0].photometry.filter, Filter::Int(7)),
            "Expected Filter::Int(7), got: {:?}",
            obs[0].photometry.filter
        );
    }

    #[test]
    fn test_filter_column_uint8() {
        use polars::prelude::{IntoSeries, NewChunkedArray, UInt8Chunked};
        let filter = Column::from(UInt8Chunked::from_slice("filter".into(), &[3u8]).into_series());
        let df = frame(base_columns_single_row_with_filter(filter));
        let dataset = load(&df)
            .unwrap_or_else(|e| panic!("Expected Ok for a UInt8 filter column, got: {e:?}"));
        let obs = observations(&dataset);
        assert_eq!(obs.len(), 1);
        assert!(
            matches!(obs[0].photometry.filter, Filter::Int(3)),
            "Expected Filter::Int(3), got: {:?}",
            obs[0].photometry.filter
        );
    }

    #[test]
    fn test_filter_column_uint16() {
        use polars::prelude::{IntoSeries, NewChunkedArray, UInt16Chunked};
        let filter =
            Column::from(UInt16Chunked::from_slice("filter".into(), &[512u16]).into_series());
        let df = frame(base_columns_single_row_with_filter(filter));
        let dataset = load(&df)
            .unwrap_or_else(|e| panic!("Expected Ok for a UInt16 filter column, got: {e:?}"));
        let obs = observations(&dataset);
        assert_eq!(obs.len(), 1);
        assert!(
            matches!(obs[0].photometry.filter, Filter::Int(512)),
            "Expected Filter::Int(512), got: {:?}",
            obs[0].photometry.filter
        );
    }

    #[test]
    fn test_filter_column_unsupported_type() {
        let cols = base_columns_single_row_with_filter(Column::new("filter".into(), &[7i64]));
        let df = frame(cols);
        match load(&df) {
            Err(PolarsError::FilterColumnTypeError(_)) => {}
            Err(other) => panic!("Expected PolarsError::FilterColumnTypeError, got: {other:?}"),
            Ok(_) => panic!("Expected Err for unsupported filter column type, got Ok"),
        }
    }

    #[test]
    fn test_index_contiguous_night() {
        let df = frame(four_row_cols_with_both_ids());
        let dataset = load_observation_from_polars(
            &df,
            FromPolarsArgs {
                contiguous_choice: Some(ContiguousChoice::ContiguousNight),
                ..fcct14_args()
            },
        )
        .expect("ingestion must succeed");
        let index = dataset.index_ref();

        let night_map = index
            .obs_index_by_night
            .as_ref()
            .expect("night index must be present");
        for (night_id, entry) in night_map.iter() {
            assert!(
                matches!(entry, ObsMapIndex::Contiguous { .. }),
                "night_id={night_id:?} expected Contiguous, got {entry:?}"
            );
        }
        let night_1 = night_map.get(&NightId(1)).expect("night 1 must be present");
        assert_eq!(contiguous_bounds(night_1, "night 1"), (0, 2), "night 1 bounds");
        let night_2 = night_map.get(&NightId(2)).expect("night 2 must be present");
        assert_eq!(contiguous_bounds(night_2, "night 2"), (2, 4), "night 2 bounds");

        let traj_map = index
            .obs_index_by_trajectory
            .as_ref()
            .expect("traj index must be present");
        for (traj_id, entry) in traj_map.iter() {
            assert!(
                matches!(entry, ObsMapIndex::Split(_)),
                "traj_id={traj_id:?} expected Split, got {entry:?}"
            );
        }
        for tid in [&TrajId::Int(10), &TrajId::Int(20)] {
            let entry = traj_map.get(tid).expect("traj entry must be present");
            assert_eq!(entry_len(entry), 2, "traj {tid:?} must have 2 observations");
        }
    }

    #[test]
    fn test_index_contiguous_traj() {
        let df = frame(four_row_cols_with_both_ids());
        let dataset = load_observation_from_polars(
            &df,
            FromPolarsArgs {
                contiguous_choice: Some(ContiguousChoice::ContiguousTraj),
                ..fcct14_args()
            },
        )
        .expect("ingestion must succeed");
        let index = dataset.index_ref();

        let traj_map = index
            .obs_index_by_trajectory
            .as_ref()
            .expect("traj index must be present");
        for (traj_id, entry) in traj_map.iter() {
            assert!(
                matches!(entry, ObsMapIndex::Contiguous { .. }),
                "traj_id={traj_id:?} expected Contiguous, got {entry:?}"
            );
        }
        let traj_10 = traj_map.get(&TrajId::Int(10)).expect("traj 10 must be present");
        assert_eq!(contiguous_bounds(traj_10, "traj 10"), (0, 2), "traj 10 bounds");
        let traj_20 = traj_map.get(&TrajId::Int(20)).expect("traj 20 must be present");
        assert_eq!(contiguous_bounds(traj_20, "traj 20"), (2, 4), "traj 20 bounds");

        let night_map = index
            .obs_index_by_night
            .as_ref()
            .expect("night index must be present");
        for (night_id, entry) in night_map.iter() {
            assert!(
                matches!(entry, ObsMapIndex::Split(_)),
                "night_id={night_id:?} expected Split, got {entry:?}"
            );
        }
        for nid in [&NightId(1), &NightId(2)] {
            let entry = night_map.get(nid).expect("night entry must be present");
            assert_eq!(entry_len(entry), 2, "night {nid:?} must have 2 observations");
        }
    }

    #[test]
    fn test_index_no_contiguous_choice() {
        let df = frame(four_row_cols_with_both_ids());
        let dataset = load_observation_from_polars(
            &df,
            FromPolarsArgs {
                contiguous_choice: None,
                ..fcct14_args()
            },
        )
        .expect("ingestion must succeed");
        let index = dataset.index_ref();

        let night_map = index
            .obs_index_by_night
            .as_ref()
            .expect("night index must be present");
        for (nid, entry) in night_map.iter() {
            assert!(
                matches!(entry, ObsMapIndex::Split(_)),
                "night_id={nid:?} expected Split with no contiguous_choice, got {entry:?}"
            );
        }
        let traj_map = index
            .obs_index_by_trajectory
            .as_ref()
            .expect("traj index must be present");
        for (tid, entry) in traj_map.iter() {
            assert!(
                matches!(entry, ObsMapIndex::Split(_)),
                "traj_id={tid:?} expected Split with no contiguous_choice, got {entry:?}"
            );
        }
    }

    #[test]
    fn test_contiguous_finalization_end_equals_n() {
        let mut cols = uniform_base_columns(3);
        cols.push(Column::new("night_id".into(), &[7u32, 7, 7]));
        let df = frame(cols);
        let dataset = load_observation_from_polars(
            &df,
            FromPolarsArgs {
                contiguous_choice: Some(ContiguousChoice::ContiguousNight),
                ..fcct14_args()
            },
        )
        .expect("ingestion must succeed");
        let index = dataset.index_ref();
        let night_map = index
            .obs_index_by_night
            .as_ref()
            .expect("night index must be present");
        let night_7 = night_map.get(&NightId(7)).expect("night 7 must be present");
        assert_eq!(
            contiguous_bounds(night_7, "night 7"),
            (0, 3),
            "single-night group must span [0, n=3)"
        );
    }

    /// Null `night_id`s are sorted last and must not appear in the index.
    #[test]
    fn test_contiguous_night_skips_null_keys() {
        let mut cols = uniform_base_columns(3);
        cols.push(Column::new(
            "night_id".into(),
            vec![Some(2u32), None, Some(1u32)],
        ));
        let df = frame(cols);
        let dataset = load_observation_from_polars(
            &df,
            FromPolarsArgs {
                contiguous_choice: Some(ContiguousChoice::ContiguousNight),
                ..fcct14_args()
            },
        )
        .expect("ingestion must succeed");
        let index = dataset.index_ref();
        let night_map = index
            .obs_index_by_night
            .as_ref()
            .expect("night index must be present");
        assert_eq!(night_map.iter().count(), 2, "null night_id must not be indexed");
        let night_1 = night_map.get(&NightId(1)).expect("night 1 must be present");
        assert_eq!(contiguous_bounds(night_1, "night 1"), (0, 1), "night 1 bounds");
        let night_2 = night_map.get(&NightId(2)).expect("night 2 must be present");
        assert_eq!(contiguous_bounds(night_2, "night 2"), (1, 2), "night 2 bounds");
    }

    /// Direct check of the run detector used for contiguous indices.
    #[test]
    fn test_group_tracker_runs_nulls_and_finalize() {
        let mut tracker: ContiguousGroupTracker<u32, (usize, usize)> =
            ContiguousGroupTracker::new(|start, end| (start, end));
        assert_eq!(tracker.on_row(0, Some(1)), None);
        assert_eq!(tracker.on_row(1, Some(1)), None);
        assert_eq!(tracker.on_row(2, Some(2)), Some((1, (0, 2))));
        assert_eq!(tracker.on_row(3, None), Some((2, (2, 3))));
        assert_eq!(tracker.on_row(4, None), None);
        assert_eq!(tracker.on_row(5, Some(3)), None);
        assert_eq!(tracker.finalize(6), Some((3, (5, 6))));
    }

    /// Every observation's stored index must equal its iteration position.
    fn assert_index_consistency(dataset: &ObsDataset) {
        for (idx, obs) in dataset.iter_observations().enumerate() {
            assert_eq!(
                idx,
                obs.index(),
                "index-consistency violated: enumeration position {idx} != obs.index() {}",
                obs.index()
            );
        }
    }

    /// Scans a parquet fixture from `tests/data` and loads it with default options.
    fn load_parquet_fixture(file_name: &str) -> ObsDataset {
        use polars::prelude::ScanArgsParquet;
        let path = format!("{}/tests/data/{file_name}", env!("CARGO_MANIFEST_DIR"));
        let lf = LazyFrame::scan_parquet(path.as_str().into(), ScanArgsParquet::default())
            .expect("scan_parquet must succeed for valid fixture");
        ObsDataset::from_lazy(lf, fcct14_args())
            .expect("from_lazy must succeed for valid parquet fixture")
    }

    #[test]
    fn index_consistency_from_parquet_traj_int() {
        let ds = load_parquet_fixture("test_data_traj_int.parquet");
        assert_index_consistency(&ds);
    }

    #[test]
    fn index_consistency_from_parquet_traj_str() {
        let ds = load_parquet_fixture("test_data_traj_str.parquet");
        assert_index_consistency(&ds);
    }
}
#[cfg(test)]
mod polars_reader_prop_tests {
    use super::*;
    use crate::observation_dataset::observation::Observation;
    use polars::frame::DataFrame;
    use proptest::prelude::*;

    /// Normal `f64` values of either sign. Restricting to the NORMAL class keeps
    /// them finite and non-zero.
    fn finite_f64() -> impl Strategy<Value = f64> {
        prop::num::f64::NORMAL | prop::num::f64::POSITIVE | prop::num::f64::NEGATIVE
    }

    /// Strictly positive values in `[1e-10, 1e6)`, used for uncertainties and accuracies.
    fn positive_f64() -> impl Strategy<Value = f64> {
        1e-10_f64..1e6_f64
    }

    /// Longitude in degrees, `[-180, 180]`.
    fn longitude() -> impl Strategy<Value = f64> {
        -180.0_f64..=180.0_f64
    }

    /// Latitude in degrees, `[-90, 90]`.
    fn latitude() -> impl Strategy<Value = f64> {
        -90.0_f64..=90.0_f64
    }

    /// Altitude in `[0, 8848]`. The upper bound suggests metres, but that is not
    /// confirmed here.
    fn altitude() -> impl Strategy<Value = f64> {
        0.0_f64..=8848.0_f64
    }

    /// Exactly three printable-ASCII bytes (`0x20..=0x7E`, space included).
    fn valid_mpc_code() -> impl Strategy<Value = String> {
        prop::collection::vec(0x20u8..=0x7Eu8, 3..=3)
            .prop_map(|bytes| String::from_utf8(bytes).unwrap())
    }

    /// Filter label: one ASCII letter followed by up to seven alphanumerics.
    fn filter_label() -> impl Strategy<Value = String> {
        prop::string::string_regex("[A-Za-z][A-Za-z0-9]{0,7}").unwrap()
    }

    /// The nine mandatory base columns for `n` rows. Ids are `1..=n`; all other
    /// values are random.
    fn base_columns(n: usize) -> impl Strategy<Value = Vec<Column>> {
        let ids: Vec<u64> = (1u64..=(n as u64)).collect();
        let ra_s = prop::collection::vec(finite_f64(), n..=n);
        let ra_err_s = prop::collection::vec(positive_f64(), n..=n);
        let dec_s = prop::collection::vec(finite_f64(), n..=n);
        let dec_err_s = prop::collection::vec(positive_f64(), n..=n);
        let mag_s = prop::collection::vec(finite_f64(), n..=n);
        let mag_err_s = prop::collection::vec(positive_f64(), n..=n);
        let filter_s = prop::collection::vec(filter_label(), n..=n);
        let mjd_s = prop::collection::vec(finite_f64(), n..=n);
        (
            ra_s, ra_err_s, dec_s, dec_err_s, mag_s, mag_err_s, filter_s, mjd_s,
        )
            .prop_map(
                move |(ra, ra_err, dec, dec_err, mag, mag_err, filter, mjd)| {
                    let filter_refs: Vec<&str> = filter.iter().map(|s| s.as_str()).collect();
                    vec![
                        Column::new("id".into(), ids.as_slice()),
                        Column::new("ra".into(), ra.as_slice()),
                        Column::new("ra_err".into(), ra_err.as_slice()),
                        Column::new("dec".into(), dec.as_slice()),
                        Column::new("dec_err".into(), dec_err.as_slice()),
                        Column::new("magnitude".into(), mag.as_slice()),
                        Column::new("mag_err".into(), mag_err.as_slice()),
                        Column::new("filter".into(), filter_refs.as_slice()),
                        Column::new("mjd_tt".into(), mjd.as_slice()),
                    ]
                },
            )
    }

    /// Ingestion arguments shared by every property: FCCT14 error model,
    /// everything else defaulted.
    fn fcct14_args() -> FromPolarsArgs {
        FromPolarsArgs {
            error_model: Some(ObsErrorModel::FCCT14),
            ..Default::default()
        }
    }

    proptest! {
        // An n-row base-only frame (1 <= n <= 32) must yield exactly n observations.
        // The row count is drawn first, then a frame of exactly that height is drawn.
        #[test]
        fn prop_row_count_equals_input(
            (n, cols) in (1usize..=32)
                .prop_flat_map(|n| base_columns(n).prop_map(move |cols| (n, cols))),
        ) {
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            prop_assert!(result.is_ok(), "Expected Ok for {} rows, got: {:?}", n, result.err());
            let dataset = result.unwrap();
            prop_assert_eq!(
                dataset.iter_observations().count(),
                n,
                "Observation count must equal input row count"
            );
        }

        // A single base-only row ingests to one observation with no observer.
        #[test]
        fn prop_single_row_base_only(cols in base_columns(1)) {
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            prop_assert!(result.is_ok(), "Expected Ok, got: {:?}", result.err());
            let dataset = result.unwrap();
            let obs: Vec<&Observation> = dataset.iter_observations().collect();
            prop_assert_eq!(obs.len(), 1, "Expected exactly 1 observation");
            prop_assert!(obs[0].observer.is_none(), "Expected observer None");
        }

        // Without any observer columns, every observation's observer is None.
        #[test]
        fn prop_base_only_all_observers_none(cols in base_columns(4)) {
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            prop_assert!(result.is_ok(), "Expected Ok, got: {:?}", result.err());
            let dataset = result.unwrap();
            for obs in dataset.iter_observations() {
                prop_assert!(
                    obs.observer.is_none(),
                    "Expected observer to be None for base-only DataFrame, got: {:?}",
                    obs.observer
                );
            }
        }

        // A 3-byte `mpc_code_obs` value comes back unchanged as `ObserverId::MpcCode`.
        #[test]
        fn prop_mpc_code_round_trips(
            mut cols in base_columns(1),
            code in valid_mpc_code(),
        ) {
            let mpc: Vec<Option<&str>> = vec![Some(code.as_str())];
            cols.push(Column::new("mpc_code_obs".into(), mpc));
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            prop_assert!(result.is_ok(), "Expected Ok for valid 3-byte code {:?}, got: {:?}", code, result.err());
            let dataset = result.unwrap();
            let obs: Vec<&Observation> = dataset.iter_observations().collect();
            prop_assert_eq!(obs.len(), 1);
            let code_bytes: [u8; 3] = code.as_bytes().try_into().unwrap();
            match obs[0].observer {
                Some(ObserverId::MpcCode(got)) => {
                    prop_assert_eq!(got, code_bytes, "MPC code bytes must round-trip");
                }
                other => prop_assert!(false, "Expected MpcCode, got: {other:?}"),
            }
        }

        // A complete geodetic observer (lon/lat/alt plus both accuracies) on a
        // single row resolves to `ObserverId::IntId(0)`.
        #[test]
        fn prop_geodetic_single_row_is_int_id_zero(
            mut cols in base_columns(1),
            lon in longitude(),
            lat in latitude(),
            alt in altitude(),
            ra_acc in positive_f64(),
            dec_acc in positive_f64(),
        ) {
            let obs_lon: Vec<Option<f64>> = vec![Some(lon)];
            let obs_lat: Vec<Option<f64>> = vec![Some(lat)];
            let obs_alt: Vec<Option<f64>> = vec![Some(alt)];
            let obs_ra_acc: Vec<Option<f64>> = vec![Some(ra_acc)];
            let obs_dec_acc: Vec<Option<f64>> = vec![Some(dec_acc)];
            cols.push(Column::new("obs_lon".into(), obs_lon));
            cols.push(Column::new("obs_lat".into(), obs_lat));
            cols.push(Column::new("obs_alt".into(), obs_alt));
            cols.push(Column::new("obs_ra_acc".into(), obs_ra_acc));
            cols.push(Column::new("obs_dec_acc".into(), obs_dec_acc));
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            prop_assert!(result.is_ok(), "Expected Ok for valid geodetic observer, got: {:?}", result.err());
            let dataset = result.unwrap();
            let obs: Vec<&Observation> = dataset.iter_observations().collect();
            prop_assert_eq!(obs.len(), 1);
            prop_assert!(
                matches!(obs[0].observer, Some(ObserverId::IntId(0))),
                "Expected IntId(0), got: {:?}", obs[0].observer
            );
        }

        // Only `obs_lon` set in the lon/lat/alt triplet -> PartialTripletNull.
        #[test]
        fn prop_partial_triplet_lon_only_is_error(
            mut cols in base_columns(1),
            lon in longitude(),
        ) {
            let obs_lon: Vec<Option<f64>> = vec![Some(lon)];
            let obs_lat: Vec<Option<f64>> = vec![None];
            let obs_alt: Vec<Option<f64>> = vec![None];
            cols.push(Column::new("obs_lon".into(), obs_lon));
            cols.push(Column::new("obs_lat".into(), obs_lat));
            cols.push(Column::new("obs_alt".into(), obs_alt));
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            match result {
                Err(PolarsError::PartialTripletNull { .. }) => { }
                Err(other) => prop_assert!(false, "Expected PartialTripletNull, got: {other:?}"),
                Ok(_) => prop_assert!(false, "Expected Err for lon-only partial triplet, got Ok"),
            }
        }

        // Only `obs_lat` set in the lon/lat/alt triplet -> PartialTripletNull.
        #[test]
        fn prop_partial_triplet_lat_only_is_error(
            mut cols in base_columns(1),
            lat in latitude(),
        ) {
            let obs_lon: Vec<Option<f64>> = vec![None];
            let obs_lat: Vec<Option<f64>> = vec![Some(lat)];
            let obs_alt: Vec<Option<f64>> = vec![None];
            cols.push(Column::new("obs_lon".into(), obs_lon));
            cols.push(Column::new("obs_lat".into(), obs_lat));
            cols.push(Column::new("obs_alt".into(), obs_alt));
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            match result {
                Err(PolarsError::PartialTripletNull { .. }) => { }
                Err(other) => prop_assert!(false, "Expected PartialTripletNull, got: {other:?}"),
                Ok(_) => prop_assert!(false, "Expected Err for lat-only partial triplet, got Ok"),
            }
        }

        // Only `obs_alt` set in the lon/lat/alt triplet -> PartialTripletNull.
        #[test]
        fn prop_partial_triplet_alt_only_is_error(
            mut cols in base_columns(1),
            alt in altitude(),
        ) {
            let obs_lon: Vec<Option<f64>> = vec![None];
            let obs_lat: Vec<Option<f64>> = vec![None];
            let obs_alt: Vec<Option<f64>> = vec![Some(alt)];
            cols.push(Column::new("obs_lon".into(), obs_lon));
            cols.push(Column::new("obs_lat".into(), obs_lat));
            cols.push(Column::new("obs_alt".into(), obs_alt));
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            match result {
                Err(PolarsError::PartialTripletNull { .. }) => { }
                Err(other) => prop_assert!(false, "Expected PartialTripletNull, got: {other:?}"),
                Ok(_) => prop_assert!(false, "Expected Err for alt-only partial triplet, got Ok"),
            }
        }

        // Full geodetic triplet with a null `obs_ra_acc` -> MissingAccuracyForGeodesic.
        #[test]
        fn prop_missing_ra_acc_is_error(
            mut cols in base_columns(1),
            lon in longitude(),
            lat in latitude(),
            alt in altitude(),
            dec_acc in positive_f64(),
        ) {
            let obs_lon: Vec<Option<f64>> = vec![Some(lon)];
            let obs_lat: Vec<Option<f64>> = vec![Some(lat)];
            let obs_alt: Vec<Option<f64>> = vec![Some(alt)];
            let obs_ra_acc: Vec<Option<f64>> = vec![None];
            let obs_dec_acc: Vec<Option<f64>> = vec![Some(dec_acc)];
            cols.push(Column::new("obs_lon".into(), obs_lon));
            cols.push(Column::new("obs_lat".into(), obs_lat));
            cols.push(Column::new("obs_alt".into(), obs_alt));
            cols.push(Column::new("obs_ra_acc".into(), obs_ra_acc));
            cols.push(Column::new("obs_dec_acc".into(), obs_dec_acc));
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            match result {
                Err(PolarsError::MissingAccuracyForGeodesic(_)) => { }
                Err(other) => prop_assert!(false, "Expected MissingAccuracyForGeodesic, got: {other:?}"),
                Ok(_) => prop_assert!(false, "Expected Err when obs_ra_acc is null, got Ok"),
            }
        }

        // When both an MPC code and a complete geodetic observer are present,
        // the MPC code takes precedence.
        #[test]
        fn prop_mpc_wins_over_geodetic(
            mut cols in base_columns(1),
            code in valid_mpc_code(),
            lon in longitude(),
            lat in latitude(),
            alt in altitude(),
            ra_acc in positive_f64(),
            dec_acc in positive_f64(),
        ) {
            let mpc: Vec<Option<&str>> = vec![Some(code.as_str())];
            let obs_lon: Vec<Option<f64>> = vec![Some(lon)];
            let obs_lat: Vec<Option<f64>> = vec![Some(lat)];
            let obs_alt: Vec<Option<f64>> = vec![Some(alt)];
            let obs_ra_acc: Vec<Option<f64>> = vec![Some(ra_acc)];
            let obs_dec_acc: Vec<Option<f64>> = vec![Some(dec_acc)];
            cols.push(Column::new("mpc_code_obs".into(), mpc));
            cols.push(Column::new("obs_lon".into(), obs_lon));
            cols.push(Column::new("obs_lat".into(), obs_lat));
            cols.push(Column::new("obs_alt".into(), obs_alt));
            cols.push(Column::new("obs_ra_acc".into(), obs_ra_acc));
            cols.push(Column::new("obs_dec_acc".into(), obs_dec_acc));
            let df = DataFrame::new_infer_height(cols)
                .expect("DataFrame construction must succeed");
            let result = load_observation_from_polars(&df, fcct14_args());
            prop_assert!(result.is_ok(), "Expected Ok, got: {:?}", result.err());
            let dataset = result.unwrap();
            let obs: Vec<&Observation> = dataset.iter_observations().collect();
            prop_assert_eq!(obs.len(), 1);
            prop_assert!(
                matches!(obs[0].observer, Some(ObserverId::MpcCode(_))),
                "Expected MpcCode to win over geodetic, got: {:?}", obs[0].observer
            );
        }
    }
}
#[cfg(test)]
mod lazy_frame_tests {
    use super::*;
    use crate::observation_dataset::observation::Observation;
    use polars::{frame::DataFrame, lazy::frame::IntoLazy as _};

    /// Ingestion arguments used by both tests: FCCT14 error model, defaults elsewhere.
    fn fcct14_args() -> FromPolarsArgs {
        FromPolarsArgs {
            error_model: Some(ObsErrorModel::FCCT14),
            ..Default::default()
        }
    }

    /// A one-row eager frame with only the mandatory base columns (id 1, filter "G").
    fn base_df_single_row() -> DataFrame {
        let columns = vec![
            Column::new("id".into(), &[1u64]),
            Column::new("ra".into(), &[10.0f64]),
            Column::new("ra_err".into(), &[0.001f64]),
            Column::new("dec".into(), &[-5.0f64]),
            Column::new("dec_err".into(), &[0.001f64]),
            Column::new("magnitude".into(), &[15.0f64]),
            Column::new("mag_err".into(), &[0.05f64]),
            Column::new("filter".into(), &["G"]),
            Column::new("mjd_tt".into(), &[60000.0f64]),
        ];
        DataFrame::new_infer_height(columns).expect("DataFrame construction must succeed")
    }

    /// Feeding the same data as a `DataFrame` and as a `LazyFrame` must produce
    /// matching observation counts, ids and `mjd_tt` values.
    #[test]
    fn test_lazy_obs_same_result_as_eager() {
        let frame = base_df_single_row();
        let lazy_frame = frame.clone().lazy();

        let from_eager =
            load_observation_from_polars(frame, fcct14_args()).expect("eager path must succeed");
        let from_lazy =
            load_observation_from_polars(lazy_frame, fcct14_args()).expect("lazy path must succeed");

        let eager_rows: Vec<&Observation> = from_eager.iter_observations().collect();
        let lazy_rows: Vec<&Observation> = from_lazy.iter_observations().collect();

        assert_eq!(eager_rows.len(), lazy_rows.len(), "row counts must match");
        let (first_eager, first_lazy) = (eager_rows[0], lazy_rows[0]);
        assert_eq!(first_eager.id, first_lazy.id, "observation ids must match");
        assert_eq!(first_eager.mjd_tt, first_lazy.mjd_tt, "mjd_tt must match");
    }

    /// An `mpc_code_obs` column survives the lazy path and resolves to an
    /// `ObserverId::MpcCode` observer.
    #[test]
    fn test_lazy_obs_mpc_code() {
        let mut frame = base_df_single_row();
        let codes: Vec<Option<&str>> = vec![Some("I41")];
        frame
            .with_column(Column::new("mpc_code_obs".into(), codes))
            .expect("column addition must succeed");

        let result = load_observation_from_polars(frame.lazy(), fcct14_args());
        assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());

        let dataset = result.unwrap();
        let rows: Vec<&Observation> = dataset.iter_observations().collect();
        assert_eq!(rows.len(), 1);
        assert!(
            matches!(rows[0].observer, Some(ObserverId::MpcCode(_))),
            "expected MpcCode observer, got: {:?}",
            rows[0].observer
        );
    }
}
#[cfg(test)]
mod index_tests {
    use super::*;
    use crate::{NightId, TrajId};
    use polars::frame::DataFrame;

    /// Builds the nine mandatory base columns for `n` rows.
    ///
    /// Observation ids are `1..=n` in row order. Every float column is `1.0` and
    /// every filter is `"G"`, so tests identify rows purely by id.
    fn base_cols(n: usize) -> Vec<Column> {
        let ids: Vec<u64> = (1u64..=n as u64).collect();
        let f: Vec<f64> = vec![1.0; n];
        let s: Vec<&str> = vec!["G"; n];
        vec![
            Column::new("id".into(), ids.as_slice()),
            Column::new("ra".into(), f.as_slice()),
            Column::new("ra_err".into(), f.as_slice()),
            Column::new("dec".into(), f.as_slice()),
            Column::new("dec_err".into(), f.as_slice()),
            Column::new("magnitude".into(), f.as_slice()),
            Column::new("mag_err".into(), f.as_slice()),
            Column::new("filter".into(), s.as_slice()),
            Column::new("mjd_tt".into(), f.as_slice()),
        ]
    }

    /// Ingestion arguments shared by every test in this module: FCCT14 error
    /// model, everything else defaulted.
    fn fcct14_args() -> FromPolarsArgs {
        FromPolarsArgs {
            error_model: Some(ObsErrorModel::FCCT14),
            ..Default::default()
        }
    }

    /// Returns `ids` sorted ascending.
    ///
    /// Used for every trajectory group: these tests do not treat the order of
    /// observations within a trajectory as part of the contract. Night groups,
    /// by contrast, are compared in row order.
    fn sorted(mut ids: Vec<u64>) -> Vec<u64> {
        ids.sort_unstable();
        ids
    }

    /// Without a `night_id` column no night index is built, so lookups return None.
    #[test]
    fn night_index_absent_when_no_column() {
        let df =
            DataFrame::new_infer_height(base_cols(2)).expect("DataFrame construction must succeed");
        let ds = load_observation_from_polars(&df, fcct14_args()).expect("ingestion must succeed");
        assert!(
            ds.iter_night_observations(&NightId(0)).is_none(),
            "Expected None when night_id column is absent"
        );
    }

    /// Rows sharing a `night_id` are grouped together, in row order.
    #[test]
    fn night_index_groups_correctly() {
        let mut cols = base_cols(3);
        let nights: Vec<Option<u32>> = vec![Some(10), Some(20), Some(10)];
        cols.push(Column::new("night_id".into(), nights));
        let df = DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed");
        let ds = load_observation_from_polars(&df, fcct14_args()).expect("ingestion must succeed");
        let night10: Vec<u64> = ds
            .iter_night_observations(&NightId(10))
            .expect("night_id column present, NightId(10) must exist")
            .map(|o| o.id)
            .collect();
        assert_eq!(
            night10,
            vec![1u64, 3u64],
            "Night 10 must contain obs ids 1 and 3"
        );
        let night20: Vec<u64> = ds
            .iter_night_observations(&NightId(20))
            .expect("NightId(20) must exist")
            .map(|o| o.id)
            .collect();
        assert_eq!(night20, vec![2u64], "Night 20 must contain obs id 2");
    }

    /// A null `night_id` keeps the observation in the dataset but out of every night group.
    #[test]
    fn night_index_null_cell_is_skipped() {
        let mut cols = base_cols(3);
        let nights: Vec<Option<u32>> = vec![Some(5), None, Some(5)];
        cols.push(Column::new("night_id".into(), nights));
        let df = DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed");
        let ds = load_observation_from_polars(&df, fcct14_args()).expect("ingestion must succeed");
        assert_eq!(
            ds.iter_observations().count(),
            3,
            "All 3 observations must be present"
        );
        let night5: Vec<u64> = ds
            .iter_night_observations(&NightId(5))
            .expect("NightId(5) must exist")
            .map(|o| o.id)
            .collect();
        assert_eq!(
            night5,
            vec![1u64, 3u64],
            "Night 5 must contain obs ids 1 and 3 (null skipped)"
        );
    }

    /// A signed-integer `night_id` column is rejected with `NightIdColumnTypeError`.
    #[test]
    fn night_id_wrong_type_is_error() {
        let mut cols = base_cols(1);
        let bad: Vec<i32> = vec![1];
        cols.push(Column::new("night_id".into(), bad.as_slice()));
        let df = DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed");
        let result = load_observation_from_polars(&df, fcct14_args());
        match result {
            Err(PolarsError::NightIdColumnTypeError(_)) => {}
            Err(other) => panic!("Expected NightIdColumnTypeError, got: {other:?}"),
            Ok(_) => panic!("Expected Err for wrong night_id type, got Ok"),
        }
    }

    /// Without a `traj_id` column no trajectory index is built, so lookups return None.
    #[test]
    fn traj_index_absent_when_no_column() {
        let df =
            DataFrame::new_infer_height(base_cols(2)).expect("DataFrame construction must succeed");
        let ds = load_observation_from_polars(&df, fcct14_args()).expect("ingestion must succeed");
        assert!(
            ds.iter_trajectory_observations(&TrajId::Int(0)).is_none(),
            "Expected None when traj_id column is absent"
        );
    }

    /// A UInt32 `traj_id` column groups rows under `TrajId::Int`.
    /// (Previously misnamed `..._uint64_...`; the column built here is `u32`.)
    #[test]
    fn traj_index_uint32_groups_correctly() {
        let mut cols = base_cols(4);
        let trajs: Vec<Option<u32>> = vec![Some(100), Some(200), Some(100), Some(200)];
        cols.push(Column::new("traj_id".into(), trajs));
        let df = DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed");
        let ds = load_observation_from_polars(&df, fcct14_args()).expect("ingestion must succeed");
        let t100 = sorted(
            ds.iter_trajectory_observations(&TrajId::Int(100))
                .expect("TrajId::Int(100) must exist")
                .map(|o| o.id)
                .collect(),
        );
        assert_eq!(
            t100,
            vec![1u64, 3u64],
            "Traj 100 must contain obs ids 1 and 3"
        );
        let t200 = sorted(
            ds.iter_trajectory_observations(&TrajId::Int(200))
                .expect("TrajId::Int(200) must exist")
                .map(|o| o.id)
                .collect(),
        );
        assert_eq!(
            t200,
            vec![2u64, 4u64],
            "Traj 200 must contain obs ids 2 and 4"
        );
    }

    /// A string `traj_id` column groups rows under `TrajId::Str`.
    #[test]
    fn traj_index_string_groups_correctly() {
        let mut cols = base_cols(3);
        let trajs: Vec<Option<&str>> = vec![Some("alpha"), Some("beta"), Some("alpha")];
        cols.push(Column::new("traj_id".into(), trajs));
        let df = DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed");
        let ds = load_observation_from_polars(&df, fcct14_args()).expect("ingestion must succeed");
        let alpha = sorted(
            ds.iter_trajectory_observations(&TrajId::Str("alpha".to_owned()))
                .expect("TrajId::Str(\"alpha\") must exist")
                .map(|o| o.id)
                .collect(),
        );
        assert_eq!(
            alpha,
            vec![1u64, 3u64],
            "Traj 'alpha' must contain obs ids 1 and 3"
        );
        let beta = sorted(
            ds.iter_trajectory_observations(&TrajId::Str("beta".to_owned()))
                .expect("TrajId::Str(\"beta\") must exist")
                .map(|o| o.id)
                .collect(),
        );
        assert_eq!(beta, vec![2u64], "Traj 'beta' must contain obs id 2");
    }

    /// A null `traj_id` keeps the observation in the dataset but out of every trajectory.
    #[test]
    fn traj_index_null_cell_is_skipped() {
        let mut cols = base_cols(3);
        let trajs: Vec<Option<u32>> = vec![Some(1), None, Some(1)];
        cols.push(Column::new("traj_id".into(), trajs));
        let df = DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed");
        let ds = load_observation_from_polars(&df, fcct14_args()).expect("ingestion must succeed");
        assert_eq!(
            ds.iter_observations().count(),
            3,
            "All 3 observations must be present"
        );
        let t1 = sorted(
            ds.iter_trajectory_observations(&TrajId::Int(1))
                .expect("TrajId::Int(1) must exist")
                .map(|o| o.id)
                .collect(),
        );
        assert_eq!(
            t1,
            vec![1u64, 3u64],
            "Traj 1 must contain obs ids 1 and 3 (null skipped)"
        );
    }

    /// A signed-integer `traj_id` column is rejected with `TrajIdColumnTypeError`.
    #[test]
    fn traj_id_wrong_type_is_error() {
        let mut cols = base_cols(1);
        let bad: Vec<i32> = vec![1];
        cols.push(Column::new("traj_id".into(), bad.as_slice()));
        let df = DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed");
        let result = load_observation_from_polars(&df, fcct14_args());
        match result {
            Err(PolarsError::TrajIdColumnTypeError(_)) => {}
            Err(other) => panic!("Expected TrajIdColumnTypeError, got: {other:?}"),
            Ok(_) => panic!("Expected Err for wrong traj_id type, got Ok"),
        }
    }

    /// Night and trajectory indices are built together from the same frame.
    #[test]
    fn both_night_and_traj_index_built_simultaneously() {
        let mut cols = base_cols(4);
        let nights: Vec<Option<u32>> = vec![Some(1), Some(1), Some(2), Some(2)];
        let trajs: Vec<Option<u32>> = vec![Some(10), Some(20), Some(10), Some(20)];
        cols.push(Column::new("night_id".into(), nights));
        cols.push(Column::new("traj_id".into(), trajs));
        let df = DataFrame::new_infer_height(cols).expect("DataFrame construction must succeed");
        let ds = load_observation_from_polars(&df, fcct14_args()).expect("ingestion must succeed");
        let n1: Vec<u64> = ds
            .iter_night_observations(&NightId(1))
            .expect("NightId(1) must exist")
            .map(|o| o.id)
            .collect();
        assert_eq!(n1, vec![1u64, 2u64]);
        let n2: Vec<u64> = ds
            .iter_night_observations(&NightId(2))
            .expect("NightId(2) must exist")
            .map(|o| o.id)
            .collect();
        assert_eq!(n2, vec![3u64, 4u64]);
        let t10 = sorted(
            ds.iter_trajectory_observations(&TrajId::Int(10))
                .expect("TrajId::Int(10) must exist")
                .map(|o| o.id)
                .collect(),
        );
        assert_eq!(t10, vec![1u64, 3u64]);
        let t20 = sorted(
            ds.iter_trajectory_observations(&TrajId::Int(20))
                .expect("TrajId::Int(20) must exist")
                .map(|o| o.id)
                .collect(),
        );
        assert_eq!(t20, vec![2u64, 4u64]);
    }
}