#![cfg(feature = "parallel")]
use itertools::Either;
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use crate::{
NightId, TrajId,
observation_dataset::{
ObsDataset,
index::{ObsDatasetIndex, ObsIndex, ObsMapIndex},
iter::MemLayoutObservations,
observation::Observation,
},
};
impl ObsDatasetIndex {
    /// Parallel iterator over the observation indices recorded for one night.
    ///
    /// Returns `None` when `get_by_night` has no entry for `night_id`.
    /// A `Contiguous` entry is expanded as the half-open range `start..end`;
    /// a `Split` entry yields a copy of every stored index.
    pub(crate) fn par_iter_night_obs_index(
        &self,
        night_id: &NightId,
    ) -> Option<impl ParallelIterator<Item = ObsIndex> + '_> {
        let entry = self.get_by_night(night_id)?;
        let positions = match entry {
            ObsMapIndex::Split(list) => Either::Right(list.par_iter().copied()),
            ObsMapIndex::Contiguous { start, end } => Either::Left((*start..*end).into_par_iter()),
        };
        Some(positions)
    }

    /// Parallel iterator over `(night, observation index)` pairs for every
    /// entry of the night map.
    ///
    /// Returns `None` when `obs_index_by_night` is `None`.
    pub(crate) fn par_iter_full_night(
        &self,
    ) -> Option<impl ParallelIterator<Item = (NightId, ObsIndex)> + '_> {
        let by_night = self.obs_index_by_night.as_ref()?;
        let pairs = by_night.par_iter().flat_map(|(night, entry)| match entry {
            ObsMapIndex::Split(list) => {
                Either::Right(list.par_iter().map(move |&pos| (*night, pos)))
            }
            ObsMapIndex::Contiguous { start, end } => Either::Left(
                (*start..*end)
                    .into_par_iter()
                    .map(move |pos| (*night, pos)),
            ),
        });
        Some(pairs)
    }

    /// Parallel iterator over the observation indices recorded for one
    /// trajectory.
    ///
    /// Returns `None` when `get_by_trajectory` has no entry for `traj_id`.
    pub(crate) fn par_iter_traj_obs_index(
        &self,
        traj_id: impl Into<TrajId>,
    ) -> Option<impl ParallelIterator<Item = ObsIndex> + '_> {
        let entry = self.get_by_trajectory(traj_id)?;
        let positions = match entry {
            ObsMapIndex::Split(list) => Either::Right(list.par_iter().copied()),
            ObsMapIndex::Contiguous { start, end } => Either::Left((*start..*end).into_par_iter()),
        };
        Some(positions)
    }

    /// Parallel iterator over `(trajectory, observation index)` pairs for
    /// every entry of the trajectory map.
    ///
    /// Returns `None` when `obs_index_by_trajectory` is `None`. The trajectory
    /// id is cloned once per yielded pair because each item owns its id.
    pub(crate) fn par_iter_full_trajectory(
        &self,
    ) -> Option<impl ParallelIterator<Item = (TrajId, ObsIndex)> + '_> {
        let by_traj = self.obs_index_by_trajectory.as_ref()?;
        let pairs = by_traj.par_iter().flat_map(|(traj, entry)| match entry {
            ObsMapIndex::Split(list) => {
                Either::Right(list.par_iter().map(move |&pos| (traj.clone(), pos)))
            }
            ObsMapIndex::Contiguous { start, end } => Either::Left(
                (*start..*end)
                    .into_par_iter()
                    .map(move |pos| (traj.clone(), pos)),
            ),
        });
        Some(pairs)
    }
}
impl ObsDataset {
pub fn par_iter_observations(&self) -> impl ParallelIterator<Item = &Observation> {
self.observations.par_iter()
}
pub fn par_iter_full_night(
&self,
) -> Option<impl ParallelIterator<Item = (NightId, &Observation)>> {
self.index
.par_iter_full_night()
.map(|night_iter| night_iter.map(|(night_id, idx)| (night_id, &self.observations[idx])))
}
pub fn par_iter_night_observations(
&self,
night_id: &NightId,
) -> Option<impl ParallelIterator<Item = &Observation> + '_> {
self.index
.par_iter_night_obs_index(night_id)
.map(|indices| indices.map(|idx| &self.observations[idx]))
}
pub fn materialize_night_par(&self, night_id: &NightId) -> Option<MemLayoutObservations<'_>> {
let night_index = self.index.obs_index_by_night.as_ref()?.get(night_id)?;
match night_index {
ObsMapIndex::Split(indices) => Some(MemLayoutObservations::Split(
indices
.par_iter()
.map(|idx| &self.observations[*idx])
.collect(),
)),
ObsMapIndex::Contiguous { start, end } => Some(MemLayoutObservations::Contiguous(
&self.observations[*start..*end],
)),
}
}
pub fn par_iter_trajectory_observations(
&self,
traj_id: impl Into<TrajId>,
) -> Option<impl ParallelIterator<Item = &Observation>> {
self.index
.par_iter_traj_obs_index(traj_id)
.map(|indices| indices.map(|idx| &self.observations[idx]))
}
pub fn par_iter_full_trajectory(
&self,
) -> Option<impl ParallelIterator<Item = (TrajId, &Observation)>> {
self.index.par_iter_full_trajectory().map(|traj_iter| {
traj_iter.map(|(traj_id, idx)| (traj_id.clone(), &self.observations[idx]))
})
}
pub fn materialize_trajectory_par(
&self,
traj_id: impl Into<TrajId>,
) -> Option<MemLayoutObservations<'_>> {
let traj_id = traj_id.into();
let traj_index = self.index.obs_index_by_trajectory.as_ref()?.get(&traj_id)?;
match traj_index {
ObsMapIndex::Split(indices) => Some(MemLayoutObservations::Split(
indices
.par_iter()
.map(|idx| &self.observations[*idx])
.collect(),
)),
ObsMapIndex::Contiguous { start, end } => Some(MemLayoutObservations::Contiguous(
&self.observations[*start..*end],
)),
}
}
}
#[cfg(test)]
mod obsdataset_parallel_tests {
use super::*;
use ahash::AHashMap;
use rayon::iter::ParallelIterator;
use crate::{
NightId, TrajId,
coordinates::equatorial::EquCoord,
observation_dataset::{
index::{NightIndexMap, TrajIndexMap},
observation::ObservationInput,
},
observer::error_model::ObsErrorModel,
photometry::{Filter, Photometry},
};
/// Builds an [`ObservationInput`] fixture identified by `id`.
///
/// Coordinates and photometry are fixed; `mjd_tt` is `60000.0 + id`.
/// The second argument is never read.
fn make_obs(id: u64, _index: usize) -> ObservationInput {
    let equ_coord = EquCoord::new(0.5, 1e-5, 0.2, 1e-5);
    let photometry = Photometry {
        magnitude: 15.0,
        error: 0.1,
        filter: Filter::String(String::from("G")),
    };
    let mjd_tt = 60000.0 + id as f64;
    ObservationInput {
        id,
        equ_coord,
        photometry,
        mjd_tt,
        observer: None,
    }
}
fn make_dataset_with_index() -> ObsDataset {
let obs = vec![
make_obs(1, 0),
make_obs(2, 1),
make_obs(3, 2),
make_obs(4, 3),
];
let mut night_map: NightIndexMap = AHashMap::new();
night_map.insert(NightId(1), ObsMapIndex::Split(vec![0, 1]));
night_map.insert(NightId(2), ObsMapIndex::Split(vec![2, 3]));
let mut traj_map: TrajIndexMap = AHashMap::new();
traj_map.insert(TrajId::Int(10), ObsMapIndex::Split(vec![0, 2]));
traj_map.insert(TrajId::Int(20), ObsMapIndex::Split(vec![1, 3]));
ObsDataset::new(
obs,
vec![],
Some(ObsErrorModel::FCCT14),
Some(night_map),
Some(traj_map),
)
}
/// Builds a two-observation dataset (ids 1 and 2) with neither a night map
/// nor a trajectory map.
fn make_dataset_no_index() -> ObsDataset {
    let observations = vec![make_obs(1, 0), make_obs(2, 1)];
    let error_model = Some(ObsErrorModel::FCCT14);
    ObsDataset::new(observations, vec![], error_model, None, None)
}
/// Sort/dedup key for integer trajectory ids.
///
/// # Panics
///
/// Panics when `id` is not a `TrajId::Int`; the fixtures only use integer ids.
fn traj_id_key(id: &TrajId) -> u32 {
    if let TrajId::Int(value) = id {
        return *value;
    }
    panic!("unexpected TrajId::Str in sort key")
}
/// Tests for `ObsDataset::par_iter_observations`.
mod parallel_obs_iter {
    use super::*;

    /// All four fixture observations are visited.
    #[test]
    fn par_iter_observations_count() {
        let total = make_dataset_with_index().par_iter_observations().count();
        assert_eq!(total, 4);
    }

    /// The visited observations carry exactly the fixture ids.
    #[test]
    fn par_iter_observations_ids() {
        let dataset = make_dataset_with_index();
        let mut seen: Vec<u64> = dataset
            .par_iter_observations()
            .map(|observation| observation.id)
            .collect();
        // Parallel collection order is not relied upon; normalise first.
        seen.sort_unstable();
        assert_eq!(seen, vec![1u64, 2, 3, 4]);
    }

    /// Iteration over raw observations does not need any index.
    #[test]
    fn par_iter_observations_no_index_count() {
        let total = make_dataset_no_index().par_iter_observations().count();
        assert_eq!(total, 2);
    }
}
/// Tests for the night-based parallel accessors of `ObsDataset`.
mod parallel_night {
    use super::*;

    /// A dataset with a night map yields an iterator.
    #[test]
    fn par_iter_full_night_some_when_night_index_present() {
        let dataset = make_dataset_with_index();
        let present = dataset.par_iter_full_night().is_some();
        assert!(present, "Expected Some when night index is present");
    }

    /// A dataset without a night map yields nothing.
    #[test]
    fn par_iter_full_night_none_when_no_night_index() {
        let dataset = make_dataset_no_index();
        let absent = dataset.par_iter_full_night().is_none();
        assert!(absent, "Expected None when no night index");
    }

    /// Every indexed observation is visited once across all nights.
    #[test]
    fn par_iter_full_night_total_count() {
        let dataset = make_dataset_with_index();
        assert_eq!(dataset.par_iter_full_night().unwrap().count(), 4);
    }

    /// The set of nights seen matches the fixture's night map keys.
    #[test]
    fn par_iter_full_night_night_ids() {
        let dataset = make_dataset_with_index();
        let mut nights: Vec<NightId> = dataset
            .par_iter_full_night()
            .unwrap()
            .map(|(night, _)| night)
            .collect();
        nights.sort_unstable();
        nights.dedup();
        assert_eq!(nights, vec![NightId(1), NightId(2)]);
    }

    /// A night present in the map yields an iterator.
    #[test]
    fn par_iter_night_observations_some_for_existing_night() {
        let dataset = make_dataset_with_index();
        let present = dataset.par_iter_night_observations(&NightId(1)).is_some();
        assert!(
            present,
            "Expected Some for NightId(1) which is present in the index"
        );
    }

    /// A night absent from the map yields nothing.
    #[test]
    fn par_iter_night_observations_none_for_missing_night() {
        let dataset = make_dataset_with_index();
        let absent = dataset.par_iter_night_observations(&NightId(99)).is_none();
        assert!(
            absent,
            "Expected None for NightId(99) which is absent from the index"
        );
    }

    /// Without any night map every lookup yields nothing.
    #[test]
    fn par_iter_night_observations_none_without_index() {
        let dataset = make_dataset_no_index();
        let absent = dataset.par_iter_night_observations(&NightId(1)).is_none();
        assert!(absent, "Expected None when the dataset has no night index");
    }

    /// Night 1 holds two observations in the fixture.
    #[test]
    fn par_iter_night_observations_count() {
        let dataset = make_dataset_with_index();
        assert_eq!(
            dataset
                .par_iter_night_observations(&NightId(1))
                .unwrap()
                .count(),
            2
        );
    }

    /// Night 1 resolves to the observations with ids 1 and 2.
    #[test]
    fn par_iter_night_observations_ids() {
        let dataset = make_dataset_with_index();
        let mut seen: Vec<u64> = dataset
            .par_iter_night_observations(&NightId(1))
            .unwrap()
            .map(|observation| observation.id)
            .collect();
        seen.sort_unstable();
        assert_eq!(seen, vec![1u64, 2]);
    }

    /// Materializing a known night succeeds.
    #[test]
    fn materialize_night_par_some_for_existing_night() {
        let dataset = make_dataset_with_index();
        let present = dataset.materialize_night_par(&NightId(1)).is_some();
        assert!(present, "Expected Some(vec) for NightId(1)");
    }

    /// Materializing an unknown night yields nothing.
    #[test]
    fn materialize_night_par_none_for_missing_night() {
        let dataset = make_dataset_with_index();
        let absent = dataset.materialize_night_par(&NightId(99)).is_none();
        assert!(absent, "Expected None for NightId(99)");
    }

    /// The materialized night 1 contains two observations.
    #[test]
    fn materialize_night_par_count() {
        let dataset = make_dataset_with_index();
        let layout = dataset.materialize_night_par(&NightId(1)).unwrap();
        assert_eq!(layout.len(), 2);
    }

    /// The materialized night 1 contains the observations with ids 1 and 2.
    #[test]
    fn materialize_night_par_ids() {
        let dataset = make_dataset_with_index();
        let layout = dataset.materialize_night_par(&NightId(1)).unwrap();
        let mut seen: Vec<u64> = layout.iter().map(|observation| observation.id).collect();
        seen.sort_unstable();
        assert_eq!(seen, vec![1u64, 2]);
    }
}
/// Tests for the trajectory-based parallel accessors of `ObsDataset`.
mod parallel_trajectory {
    use super::*;

    /// A trajectory present in the map yields an iterator.
    #[test]
    fn par_iter_trajectory_observations_some_for_existing_traj() {
        let dataset = make_dataset_with_index();
        let present = dataset
            .par_iter_trajectory_observations(&TrajId::Int(10))
            .is_some();
        assert!(
            present,
            "Expected Some for TrajId::Int(10) which is present in the index"
        );
    }

    /// A trajectory absent from the map yields nothing.
    #[test]
    fn par_iter_trajectory_observations_none_for_missing_traj() {
        let dataset = make_dataset_with_index();
        let absent = dataset
            .par_iter_trajectory_observations(&TrajId::Int(99))
            .is_none();
        assert!(
            absent,
            "Expected None for TrajId::Int(99) which is absent from the index"
        );
    }

    /// Without any trajectory map every lookup yields nothing.
    #[test]
    fn par_iter_trajectory_observations_none_without_index() {
        let dataset = make_dataset_no_index();
        let absent = dataset
            .par_iter_trajectory_observations(&TrajId::Int(10))
            .is_none();
        assert!(
            absent,
            "Expected None when the dataset has no trajectory index"
        );
    }

    /// Trajectory 10 holds two observations in the fixture.
    #[test]
    fn par_iter_trajectory_observations_count() {
        let dataset = make_dataset_with_index();
        assert_eq!(
            dataset
                .par_iter_trajectory_observations(&TrajId::Int(10))
                .unwrap()
                .count(),
            2
        );
    }

    /// Trajectory 10 resolves to the observations with ids 1 and 3.
    #[test]
    fn par_iter_trajectory_observations_ids() {
        let dataset = make_dataset_with_index();
        let mut seen: Vec<u64> = dataset
            .par_iter_trajectory_observations(&TrajId::Int(10))
            .unwrap()
            .map(|observation| observation.id)
            .collect();
        seen.sort_unstable();
        assert_eq!(seen, vec![1u64, 3]);
    }

    /// A dataset with a trajectory map yields an iterator.
    #[test]
    fn par_iter_full_trajectory_some_when_traj_index_present() {
        let dataset = make_dataset_with_index();
        let present = dataset.par_iter_full_trajectory().is_some();
        assert!(present, "Expected Some when trajectory index is present");
    }

    /// A dataset without a trajectory map yields nothing.
    #[test]
    fn par_iter_full_trajectory_none_when_no_traj_index() {
        let dataset = make_dataset_no_index();
        let absent = dataset.par_iter_full_trajectory().is_none();
        assert!(absent, "Expected None when no trajectory index");
    }

    /// Every indexed observation is visited once across all trajectories.
    #[test]
    fn par_iter_full_trajectory_total_count() {
        let dataset = make_dataset_with_index();
        assert_eq!(dataset.par_iter_full_trajectory().unwrap().count(), 4);
    }

    /// The set of trajectories seen matches the fixture's trajectory map keys.
    #[test]
    fn par_iter_full_trajectory_traj_ids() {
        let dataset = make_dataset_with_index();
        let mut trajectories: Vec<TrajId> = dataset
            .par_iter_full_trajectory()
            .unwrap()
            .map(|(traj, _)| traj)
            .collect();
        trajectories.sort_unstable_by_key(traj_id_key);
        trajectories.dedup_by_key(|traj| traj_id_key(traj));
        assert_eq!(trajectories, vec![TrajId::Int(10), TrajId::Int(20)]);
    }

    /// Materializing a known trajectory succeeds.
    #[test]
    fn materialize_trajectory_par_some_for_existing_traj() {
        let dataset = make_dataset_with_index();
        let present = dataset
            .materialize_trajectory_par(TrajId::Int(10))
            .is_some();
        assert!(present, "Expected Some(vec) for TrajId::Int(10)");
    }

    /// Materializing an unknown trajectory yields nothing.
    #[test]
    fn materialize_trajectory_par_none_for_missing_traj() {
        let dataset = make_dataset_with_index();
        let absent = dataset
            .materialize_trajectory_par(TrajId::Int(99))
            .is_none();
        assert!(absent, "Expected None for TrajId::Int(99)");
    }

    /// The materialized trajectory 10 contains two observations.
    #[test]
    fn materialize_trajectory_par_count() {
        let dataset = make_dataset_with_index();
        let layout = dataset.materialize_trajectory_par(TrajId::Int(10)).unwrap();
        assert_eq!(layout.len(), 2);
    }

    /// The materialized trajectory 10 contains the observations with ids 1 and 3.
    #[test]
    fn materialize_trajectory_par_ids() {
        let dataset = make_dataset_with_index();
        let layout = dataset.materialize_trajectory_par(TrajId::Int(10)).unwrap();
        let mut seen: Vec<u64> = layout.iter().map(|observation| observation.id).collect();
        seen.sort_unstable();
        assert_eq!(seen, vec![1u64, 3]);
    }
}
/// Tests for the parallel accessors exposed directly on the dataset index.
mod parallel_index {
    use super::*;

    /// Without a night map, a per-night lookup yields nothing.
    #[test]
    fn index_par_iter_night_obs_index_none_without_index() {
        let dataset = make_dataset_no_index();
        let absent = dataset
            .index
            .par_iter_night_obs_index(&NightId(1))
            .is_none();
        assert!(
            absent,
            "Expected None when the dataset index has no night map"
        );
    }

    /// A night present in the map yields an iterator.
    #[test]
    fn index_par_iter_night_obs_index_some_for_existing() {
        let dataset = make_dataset_with_index();
        let present = dataset
            .index
            .par_iter_night_obs_index(&NightId(1))
            .is_some();
        assert!(
            present,
            "Expected Some for NightId(1) which is in the night map"
        );
    }

    /// Night 1 maps to two observation indices.
    #[test]
    fn index_par_iter_night_obs_index_count() {
        let dataset = make_dataset_with_index();
        assert_eq!(
            dataset
                .index
                .par_iter_night_obs_index(&NightId(1))
                .unwrap()
                .count(),
            2
        );
    }

    /// Without a night map, the full-night iterator is unavailable.
    #[test]
    fn index_par_iter_full_night_none_without_index() {
        let dataset = make_dataset_no_index();
        let absent = dataset.index.par_iter_full_night().is_none();
        assert!(
            absent,
            "Expected None from index.par_iter_full_night() when no night map"
        );
    }

    /// The full-night iterator visits all four indexed positions.
    #[test]
    fn index_par_iter_full_night_some_and_count() {
        let dataset = make_dataset_with_index();
        assert_eq!(dataset.index.par_iter_full_night().unwrap().count(), 4);
    }

    /// Without a trajectory map, a per-trajectory lookup yields nothing.
    #[test]
    fn index_par_iter_traj_obs_index_none_without_index() {
        let dataset = make_dataset_no_index();
        let absent = dataset
            .index
            .par_iter_traj_obs_index(TrajId::Int(10))
            .is_none();
        assert!(
            absent,
            "Expected None when the dataset index has no trajectory map"
        );
    }

    /// A trajectory present in the map yields an iterator.
    #[test]
    fn index_par_iter_traj_obs_index_some_for_existing() {
        let dataset = make_dataset_with_index();
        let present = dataset
            .index
            .par_iter_traj_obs_index(TrajId::Int(10))
            .is_some();
        assert!(
            present,
            "Expected Some for TrajId::Int(10) which is in the trajectory map"
        );
    }

    /// Trajectory 10 maps to two observation indices.
    #[test]
    fn index_par_iter_traj_obs_index_count() {
        let dataset = make_dataset_with_index();
        assert_eq!(
            dataset
                .index
                .par_iter_traj_obs_index(TrajId::Int(10))
                .unwrap()
                .count(),
            2
        );
    }

    /// Without a trajectory map, the full-trajectory iterator is unavailable.
    #[test]
    fn index_par_iter_full_trajectory_none_without_index() {
        let dataset = make_dataset_no_index();
        let absent = dataset.index.par_iter_full_trajectory().is_none();
        assert!(
            absent,
            "Expected None from index.par_iter_full_trajectory() when no trajectory map"
        );
    }

    /// The full-trajectory iterator visits all four indexed positions.
    #[test]
    fn index_par_iter_full_trajectory_some_and_count() {
        let dataset = make_dataset_with_index();
        assert_eq!(dataset.index.par_iter_full_trajectory().unwrap().count(), 4);
    }
}
}