pub mod error;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::fmt;
use std::path::Path;
use fitsio::hdu::FitsHdu;
use regex::Regex;
use crate::*;
pub use error::GpuboxError;
use fitsio::FitsFile;
#[cfg(any(feature = "python", feature = "python-stubgen"))]
use pyo3::prelude::*;
#[cfg(feature = "python-stubgen")]
use pyo3_stub_gen_derive::gen_stub_pyclass;
#[cfg(test)]
mod test;
/// Describes the contiguous run of timesteps found by
/// `determine_common_obs_times_and_chans`, together with the channels of its
/// first timestep.
#[derive(Debug)]
pub(crate) struct ObsTimesAndChans {
    /// Time (UNIX, milliseconds) of the first timestep in the run.
    pub start_time_unix_ms: u64,
    /// Time (UNIX, milliseconds) of the last timestep in the run plus one
    /// integration time.
    pub end_time_unix_ms: u64,
    /// `end_time_unix_ms - start_time_unix_ms`.
    pub duration_ms: u64,
    /// Channel identifiers present in the first timestep of the run.
    pub coarse_chan_identifiers: Vec<usize>,
}
/// A group of gpubox files that share one batch number.
///
/// Two batches compare equal when their batch numbers and their file lists
/// (element by element, in order) are equal.
#[cfg_attr(feature = "python-stubgen", gen_stub_pyclass)]
#[cfg_attr(
    any(feature = "python", feature = "python-stubgen"),
    pyclass(get_all, set_all, from_py_object)
)]
#[derive(Clone, PartialEq)]
pub struct GpuBoxBatch {
    /// Batch number shared by every file in `gpubox_files`.
    pub batch_number: usize,
    /// The files that belong to this batch.
    pub gpubox_files: Vec<GpuBoxFile>,
}

impl GpuBoxBatch {
    /// Creates a batch with the given number and no files.
    pub fn new(batch_number: usize) -> Self {
        Self {
            batch_number,
            gpubox_files: vec![],
        }
    }
}

/// Compact `key=value` rendering used in diagnostics.
impl fmt::Debug for GpuBoxBatch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "batch_number={} gpubox_files={:?}",
            self.batch_number, self.gpubox_files,
        )
    }
}

/// A single gpubox file and the channel identifier it carries.
///
/// Two files compare equal when both the filename and the channel identifier
/// are equal.
#[cfg_attr(feature = "python-stubgen", gen_stub_pyclass)]
#[cfg_attr(
    any(feature = "python", feature = "python-stubgen"),
    pyclass(get_all, set_all, from_py_object)
)]
#[derive(Clone, PartialEq)]
pub struct GpuBoxFile {
    /// Filename exactly as it was supplied.
    pub filename: String,
    /// Channel identifier associated with this file.
    pub channel_identifier: usize,
}

/// Compact `key=value` rendering used in diagnostics.
impl fmt::Debug for GpuBoxFile {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "filename={} channelidentifier={}",
            self.filename, self.channel_identifier,
        )
    }
}
/// Borrowed, intermediate description of one gpubox file, produced while the
/// filenames are being classified and before the owned `GpuBoxFile` /
/// `GpuBoxBatch` values are built.
///
/// Equality compares all three fields.
#[derive(Clone, Debug, PartialEq)]
struct TempGpuBoxFile<'a> {
    /// Filename, borrowed from the caller-supplied path.
    filename: &'a str,
    /// Channel identifier parsed from the filename.
    channel_identifier: usize,
    /// Batch number parsed from the filename.
    batch_number: usize,
}
/// Two-level lookup built by `create_time_map`:
/// HDU time in milliseconds (derived from the `TIME` and `MILLITIM` keys)
/// -> channel identifier -> `(batch_number, hdu_index)`.
pub(crate) type GpuboxTimeMap = BTreeMap<u64, BTreeMap<usize, (usize, usize)>>;
/// Result of `examine_gpubox_files`: everything gathered from the supplied
/// gpubox files.
pub(crate) struct GpuboxInfo {
/// The files grouped into batches, as produced by `convert_temp_gpuboxes`.
pub batches: Vec<GpuBoxBatch>,
/// Correlator version deduced from the filename format.
pub mwa_version: MWAVersion,
/// Time -> channel identifier -> `(batch_number, hdu_index)` lookup.
pub time_map: GpuboxTimeMap,
/// Product of the values returned by `get_hdu_image_size!` for HDU 1;
/// `examine_gpubox_files` fails if this differs between files.
pub hdu_size: usize,
}
/// Groups the borrowed, temporary file descriptions into owned batches.
///
/// The result holds one `GpuBoxBatch` for every batch number from 0 up to the
/// highest one present in `temp_gpuboxes`, so the batch at index `i` always
/// has `batch_number == i` (batch numbers that no file uses yield empty
/// batches). Within each batch the files are ordered by channel identifier.
///
/// An empty input produces an empty `Vec`.
fn convert_temp_gpuboxes(temp_gpuboxes: Vec<TempGpuBoxFile>) -> Vec<GpuBoxBatch> {
    // Without any files there is no highest batch number; return no batches
    // instead of panicking.
    let num_batches = match temp_gpuboxes.iter().map(|g| g.batch_number).max() {
        Some(highest) => highest + 1,
        None => return Vec::new(),
    };

    // Batches are created in index order, so no later sort by batch number is
    // required.
    let mut gpubox_batches: Vec<GpuBoxBatch> = (0..num_batches).map(GpuBoxBatch::new).collect();

    for temp_g in temp_gpuboxes {
        gpubox_batches[temp_g.batch_number]
            .gpubox_files
            .push(GpuBoxFile {
                filename: temp_g.filename.to_string(),
                channel_identifier: temp_g.channel_identifier,
            });
    }

    for batch in &mut gpubox_batches {
        batch
            .gpubox_files
            .sort_unstable_by_key(|g| g.channel_identifier);
    }

    gpubox_batches
}
/// Classifies the supplied gpubox files, builds the time map, and checks each
/// file for consistency.
///
/// For every file, in batch order, this opens the file, requires that it has
/// more than just a single HDU, compares the size of HDU 1 against the first
/// file's, and validates the primary HDU's correlator version and obs id.
///
/// # Errors
///
/// * Any error from `determine_gpubox_batches` or `create_time_map`.
/// * `GpuboxError::NoDataHDUsInGpuboxFile` when a file contains exactly one HDU.
/// * `GpuboxError::UnequalHduSizes` when the HDU 1 size differs between files.
/// * Any error from the FITS helpers or from the two metadata validators.
pub(crate) fn examine_gpubox_files<T: AsRef<Path>>(
    gpubox_filenames: &[T],
    metafits_obs_id: u32,
) -> Result<GpuboxInfo, GpuboxError> {
    let (temp_gpuboxes, corr_format) = determine_gpubox_batches(gpubox_filenames)?;
    let time_map: GpuboxTimeMap = create_time_map(&temp_gpuboxes, corr_format)?;
    let batches = convert_temp_gpuboxes(temp_gpuboxes);

    // Size of HDU 1 in the first file examined; every later file must match it.
    let mut hdu_size: Option<usize> = None;

    for gpubox in batches.iter().flat_map(|batch| batch.gpubox_files.iter()) {
        let mut fptr = fits_open!(&gpubox.filename)?;

        // A file holding only one HDU has no data HDU to inspect.
        if fptr.iter().count() == 1 {
            return Err(GpuboxError::NoDataHDUsInGpuboxFile {
                gpubox_filename: gpubox.filename.clone(),
            });
        }
        let data_hdu = fits_open_hdu!(&mut fptr, 1)?;

        let this_size: usize = get_hdu_image_size!(&mut fptr, &data_hdu)?
            .iter()
            .product();
        match hdu_size {
            Some(expected) if expected != this_size => {
                return Err(GpuboxError::UnequalHduSizes);
            }
            Some(_) => {}
            None => hdu_size = Some(this_size),
        }

        let primary_hdu = fits_open_hdu!(&mut fptr, 0)?;
        validate_gpubox_metadata_mwa_version(
            &mut fptr,
            &primary_hdu,
            &gpubox.filename,
            corr_format,
        )?;
        validate_gpubox_metadata_obs_id(
            &mut fptr,
            &primary_hdu,
            &gpubox.filename,
            metafits_obs_id,
        )?;
    }

    Ok(GpuboxInfo {
        batches,
        mwa_version: corr_format,
        time_map,
        hdu_size: hdu_size.unwrap(),
    })
}
/// Classifies every gpubox filename and extracts its channel identifier and
/// batch number.
///
/// Three filename layouts are recognised, tried in this order:
///
/// 1. `..._chCCC_BBB.fits` -> `MWAVersion::CorrMWAXv2`
/// 2. `..._gpuboxNN_BB.fits` -> `MWAVersion::CorrLegacy`
/// 3. `..._gpuboxNN.fits` -> `MWAVersion::CorrOldLegacy` (batch number is always 0)
///
/// The returned files borrow their names from `gpubox_filenames` and are
/// sorted by `(batch_number, channel_identifier)`.
///
/// # Errors
///
/// * `GpuboxError::NoGpuboxes` when `gpubox_filenames` is empty.
/// * `GpuboxError::Mixture` when the filenames do not all share one layout.
/// * `GpuboxError::Unrecognised` when a filename matches none of the layouts,
///   or is not valid UTF-8 (the name is then reported lossily).
fn determine_gpubox_batches<T: AsRef<Path>>(
    gpubox_filenames: &'_ [T],
) -> Result<(Vec<TempGpuBoxFile<'_>>, MWAVersion), GpuboxError> {
    if gpubox_filenames.is_empty() {
        return Err(GpuboxError::NoGpuboxes);
    }

    // The dot in front of `fits` is escaped so that only a literal `.`
    // matches, not an arbitrary character.
    let re_mwax: Regex =
        Regex::new(r"\d{10}_\d{8}(.)?\d{6}_ch(?P<channel>\d{3})_(?P<batch>\d{3})\.fits").unwrap();
    let re_legacy_batch: Regex =
        Regex::new(r"\d{10}_\d{14}_gpubox(?P<band>\d{2})_(?P<batch>\d{2})\.fits").unwrap();
    let re_old_legacy_format: Regex =
        Regex::new(r"\d{10}_\d{14}_gpubox(?P<band>\d{2})\.fits").unwrap();

    let mut format: Option<MWAVersion> = None;
    let mut temp_gpuboxes: Vec<TempGpuBoxFile> = Vec::with_capacity(gpubox_filenames.len());

    for g_path in gpubox_filenames {
        let path: &Path = g_path.as_ref();
        // Filenames come from the caller; a non-UTF-8 one is reported as an
        // error rather than aborting the process.
        let g = match path.to_str() {
            Some(name) => name,
            None => {
                return Err(GpuboxError::Unrecognised(
                    path.to_string_lossy().into_owned(),
                ))
            }
        };

        // The capture groups only ever match ASCII digits of bounded length,
        // so parsing them as `usize` cannot fail.
        let (version, channel_identifier, batch_number): (MWAVersion, usize, usize) =
            if let Some(caps) = re_mwax.captures(g) {
                (
                    MWAVersion::CorrMWAXv2,
                    caps["channel"].parse().unwrap(),
                    caps["batch"].parse().unwrap(),
                )
            } else if let Some(caps) = re_legacy_batch.captures(g) {
                (
                    MWAVersion::CorrLegacy,
                    caps["band"].parse().unwrap(),
                    caps["batch"].parse().unwrap(),
                )
            } else if let Some(caps) = re_old_legacy_format.captures(g) {
                (MWAVersion::CorrOldLegacy, caps["band"].parse().unwrap(), 0)
            } else {
                return Err(GpuboxError::Unrecognised(g.to_string()));
            };

        // Every file must use the same layout as the first one.
        match format {
            None => format = Some(version),
            Some(seen) if seen == version => (),
            Some(_) => return Err(GpuboxError::Mixture),
        }

        temp_gpuboxes.push(TempGpuBoxFile {
            filename: g,
            channel_identifier,
            batch_number,
        });
    }

    temp_gpuboxes.sort_unstable_by_key(|g| (g.batch_number, g.channel_identifier));

    // `format` is always set here: the input is non-empty and every iteration
    // either sets/validates it or returns early.
    Ok((temp_gpuboxes, format.unwrap()))
}
/// Reads the `TIME` and `MILLITIM` keys of the given HDU and combines them
/// into a single millisecond value: `TIME * 1000 + MILLITIM`.
///
/// # Errors
///
/// Returns the `FitsError` from `get_required_fits_key!` if either key cannot
/// be read.
fn determine_hdu_time(
    gpubox_fptr: &mut FitsFile,
    gpubox_hdu_fptr: &FitsHdu,
) -> Result<u64, FitsError> {
    let seconds: u64 = get_required_fits_key!(gpubox_fptr, gpubox_hdu_fptr, "TIME")?;
    let millis: u64 = get_required_fits_key!(gpubox_fptr, gpubox_hdu_fptr, "MILLITIM")?;

    let unix_time_ms = seconds * 1000 + millis;
    Ok(unix_time_ms)
}
/// Builds a map from HDU time (see `determine_hdu_time`) to HDU index for one
/// gpubox file.
///
/// HDU 0 is skipped. For `MWAVersion::CorrMWAXv2` every second HDU is visited
/// (1, 3, 5, ...); for every other version each HDU from 1 onwards is visited.
/// If two visited HDUs report the same time, the later index wins.
///
/// # Errors
///
/// Returns the first `FitsError` raised while opening an HDU or reading its
/// time keys.
fn map_unix_times_to_hdus(
    gpubox_fptr: &mut FitsFile,
    mwa_version: MWAVersion,
) -> Result<BTreeMap<u64, usize>, FitsError> {
    // Total number of HDUs in the file; valid indices are 0..hdu_count.
    let hdu_count = gpubox_fptr.iter().count();
    let stride = if mwa_version == MWAVersion::CorrMWAXv2 {
        2
    } else {
        1
    };

    (1..hdu_count)
        .step_by(stride)
        .map(|hdu_index| -> Result<(u64, usize), FitsError> {
            let hdu = fits_open_hdu!(gpubox_fptr, hdu_index)?;
            let unix_time_ms = determine_hdu_time(gpubox_fptr, &hdu)?;
            Ok((unix_time_ms, hdu_index))
        })
        .collect()
}
/// Checks that the optional `CORR_VER` key in a gpubox file's primary HDU
/// agrees with the correlator version deduced from the filename.
///
/// * `MWAVersion::CorrMWAXv2`: the key must be present and equal to 2.
/// * `MWAVersion::CorrOldLegacy` / `MWAVersion::CorrLegacy`: the key must be
///   absent.
/// * Any other version is rejected.
///
/// # Errors
///
/// * A FITS error if the key cannot be read.
/// * `GpuboxError::MwaxCorrVerMissing` / `GpuboxError::MwaxCorrVerMismatch`
///   for MWAX files with a missing or wrong key.
/// * `GpuboxError::CorrVerMismatch` for legacy files that carry the key.
/// * `GpuboxError::InvalidMwaVersion` for every other `mwa_version`.
fn validate_gpubox_metadata_mwa_version(
    gpubox_fptr: &mut FitsFile,
    gpubox_primary_hdu: &FitsHdu,
    gpubox_filename: &str,
    mwa_version: MWAVersion,
) -> Result<(), GpuboxError> {
    let corr_ver: Option<u8> =
        get_optional_fits_key!(gpubox_fptr, gpubox_primary_hdu, "CORR_VER")?;

    match (mwa_version, corr_ver) {
        (MWAVersion::CorrMWAXv2, Some(2)) => Ok(()),
        (MWAVersion::CorrMWAXv2, Some(_)) => Err(GpuboxError::MwaxCorrVerMismatch(
            gpubox_filename.to_string(),
        )),
        (MWAVersion::CorrMWAXv2, None) => {
            Err(GpuboxError::MwaxCorrVerMissing(gpubox_filename.to_string()))
        }
        (MWAVersion::CorrOldLegacy, None) | (MWAVersion::CorrLegacy, None) => Ok(()),
        (MWAVersion::CorrOldLegacy, Some(gpu_corr_version_value))
        | (MWAVersion::CorrLegacy, Some(gpu_corr_version_value)) => {
            Err(GpuboxError::CorrVerMismatch {
                gpubox_filename: gpubox_filename.to_string(),
                gpu_corr_version_value,
            })
        }
        _ => Err(GpuboxError::InvalidMwaVersion { mwa_version }),
    }
}
/// Checks that the `OBSID` key in a gpubox file's primary HDU equals the obs
/// id taken from the metafits.
///
/// # Errors
///
/// * `GpuboxError::MissingObsid` if reading the `OBSID` key fails for any
///   reason (the underlying FITS error is discarded).
/// * `GpuboxError::ObsidMismatch` if the key is present but differs from
///   `metafits_obs_id`.
fn validate_gpubox_metadata_obs_id(
    gpubox_fptr: &mut FitsFile,
    gpubox_primary_hdu: &FitsHdu,
    gpubox_filename: &str,
    metafits_obs_id: u32,
) -> Result<(), GpuboxError> {
    let gpubox_obsid: u32 = get_required_fits_key!(gpubox_fptr, gpubox_primary_hdu, "OBSID")
        .map_err(|_| GpuboxError::MissingObsid(gpubox_filename.to_string()))?;

    if gpubox_obsid == metafits_obs_id {
        return Ok(());
    }

    Err(GpuboxError::ObsidMismatch {
        obsid: metafits_obs_id,
        gpubox_filename: gpubox_filename.to_string(),
        gpubox_obsid,
    })
}
fn create_time_map(
gpuboxes: &[TempGpuBoxFile],
mwa_version: MWAVersion,
) -> Result<GpuboxTimeMap, GpuboxError> {
let maps = gpuboxes
.into_iter()
.map(|g| {
let mut fptr = fits_open!(&g.filename)?;
let hdu = fits_open_hdu!(&mut fptr, 0)?;
if mwa_version == MWAVersion::CorrMWAXv2 {
let v: u8 = get_required_fits_key!(&mut fptr, &hdu, "CORR_VER")?;
if v != 2 {
return Err(GpuboxError::MwaxCorrVerMismatch(g.filename.to_string()));
}
}
map_unix_times_to_hdus(&mut fptr, mwa_version).map_err(GpuboxError::from)
})
.collect::<Vec<Result<BTreeMap<u64, usize>, GpuboxError>>>();
let mut gpubox_time_map = BTreeMap::new();
for (map_maybe_error, gpubox) in maps.into_iter().zip(gpuboxes.iter()) {
let map = map_maybe_error?;
for (time, hdu_index) in map {
gpubox_time_map
.entry(time)
.or_insert_with(BTreeMap::new)
.entry(gpubox.channel_identifier)
.or_insert((gpubox.batch_number, hdu_index));
}
}
Ok(gpubox_time_map)
}
/// Returns, in ascending order, the index within `corr_timesteps` of every
/// time that appears as a key of `gpubox_time_map`.
///
/// # Panics
///
/// Panics if a time in `gpubox_time_map` has no entry in `corr_timesteps`
/// with the same `unix_time_ms`.
pub(crate) fn populate_provided_timesteps(
    gpubox_time_map: &GpuboxTimeMap,
    corr_timesteps: &[TimeStep],
) -> Vec<usize> {
    let mut indices: Vec<usize> = Vec::with_capacity(gpubox_time_map.len());

    for unix_time_ms in gpubox_time_map.keys() {
        let index = corr_timesteps
            .iter()
            .position(|timestep| timestep.unix_time_ms == *unix_time_ms)
            .unwrap();
        indices.push(index);
    }

    indices.sort_unstable();
    indices
}
/// Returns, in ascending order, the index within `corr_coarse_chans` of every
/// distinct channel identifier that appears anywhere in `gpubox_time_map`.
///
/// A channel identifier is matched against `CoarseChannel::gpubox_number`.
///
/// # Panics
///
/// Panics if a channel identifier in `gpubox_time_map` has no entry in
/// `corr_coarse_chans` with the same `gpubox_number`.
pub(crate) fn populate_provided_coarse_channels(
    gpubox_time_map: &GpuboxTimeMap,
    corr_coarse_chans: &[CoarseChannel],
) -> Vec<usize> {
    // Each channel identifier is looked up once, however many timesteps it
    // appears in.
    let distinct_chans: HashSet<usize> = gpubox_time_map
        .values()
        .flat_map(|chans| chans.keys().copied())
        .collect();

    let mut indices: Vec<usize> = Vec::with_capacity(distinct_chans.len());
    for chan in &distinct_chans {
        let index = corr_coarse_chans
            .iter()
            .position(|coarse_chan| coarse_chan.gpubox_number == *chan)
            .unwrap();
        indices.push(index);
    }

    indices.sort_unstable();
    indices
}
/// Finds the first contiguous run of timesteps in which every channel is
/// present.
///
/// "Every channel" means the number of distinct channel identifiers found
/// anywhere in `gpubox_time_map` (including timesteps before the good time).
/// Only timesteps at or after `good_time_unix_time_ms` (when given) that hold
/// that many channels are considered. The run starts at the first such
/// timestep and continues while each following one is exactly
/// `integration_time_ms` after the previous.
///
/// Returns `Ok(None)` when no timestep qualifies. Otherwise the result holds
/// the start time of the run, its end time (start of the last timestep plus
/// `integration_time_ms`), the difference between the two, and the channel
/// identifiers of the first timestep.
///
/// # Errors
///
/// This implementation never returns `Err`; the `Result` is kept for the
/// callers' sake.
pub(crate) fn determine_common_obs_times_and_chans(
    gpubox_time_map: &GpuboxTimeMap,
    integration_time_ms: u64,
    good_time_unix_time_ms: Option<u64>,
) -> Result<Option<ObsTimesAndChans>, GpuboxError> {
    // Count distinct channel identifiers across the whole, unfiltered map.
    let max_chans = gpubox_time_map
        .values()
        .flat_map(|chans| chans.keys().copied())
        .collect::<HashSet<usize>>()
        .len();

    // Borrow the relevant part of the map instead of cloning it. Without a
    // good time, starting at 0 covers every key.
    let earliest_unix_ms = good_time_unix_time_ms.unwrap_or(0);
    let mut complete_timesteps = gpubox_time_map
        .range(earliest_unix_ms..)
        .filter(|(_, chans)| chans.len() == max_chans);

    let (first_unix_ms, first_chans) = match complete_timesteps.next() {
        Some(timestep) => timestep,
        None => return Ok(None),
    };
    let common_start_unix_ms = *first_unix_ms;
    let coarse_chan_identifiers: Vec<usize> = first_chans.keys().copied().collect();

    // Extend the run while timesteps are exactly one integration apart.
    // Every timestep seen here already has `max_chans` channels.
    let mut last_unix_ms = common_start_unix_ms;
    for (unix_ms, _) in complete_timesteps {
        if *unix_ms != last_unix_ms + integration_time_ms {
            break;
        }
        last_unix_ms = *unix_ms;
    }
    let common_end_unix_ms = last_unix_ms + integration_time_ms;

    Ok(Some(ObsTimesAndChans {
        start_time_unix_ms: common_start_unix_ms,
        end_time_unix_ms: common_end_unix_ms,
        duration_ms: common_end_unix_ms - common_start_unix_ms,
        coarse_chan_identifiers,
    }))
}
// NOTE(review): this module is empty. A separate `#[cfg(test)] mod test;` is
// declared near the top of this file — presumably the actual unit tests live
// there; confirm whether this placeholder is still needed.
#[cfg(test)]
mod tests {}