pub mod error;
use crate::*;
pub use error::VoltageFileError;
use regex::Regex;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
use std::path::Path;
#[cfg(any(feature = "python", feature = "python-stubgen"))]
use pyo3::prelude::*;
#[cfg(feature = "python-stubgen")]
use pyo3_stub_gen_derive::gen_stub_pyclass;
#[cfg(test)]
mod test;
/// The contiguous time range, and the coarse channels, that are common to the provided
/// voltage files. Produced by `determine_common_obs_times_and_chans`.
#[derive(Debug)]
pub(crate) struct ObsTimesAndChans {
    /// GPS time (milliseconds) at which the common range starts.
    pub start_gps_time_ms: u64,
    /// GPS time (milliseconds) at which the common range ends, i.e. the start of the last
    /// common timestep plus one timestep duration.
    pub end_gps_time_ms: u64,
    /// Length of the common range in milliseconds (`end_gps_time_ms - start_gps_time_ms`).
    pub duration_ms: u64,
    /// Coarse channel identifiers present in the first common timestep.
    pub coarse_chan_identifiers: Vec<usize>,
}
/// A group of voltage files which all share the same GPS time.
#[cfg_attr(feature = "python-stubgen", gen_stub_pyclass)]
#[cfg_attr(
    any(feature = "python", feature = "python-stubgen"),
    pyclass(get_all, set_all, from_py_object)
)]
#[derive(Clone)]
pub struct VoltageFileBatch {
    /// GPS time, in seconds, shared by every file in this batch.
    pub gps_time_seconds: u64,
    /// The voltage files that belong to this GPS time.
    pub voltage_files: Vec<VoltageFile>,
}
impl VoltageFileBatch {
pub fn new(gps_time: u64) -> Self {
Self {
gps_time_seconds: gps_time,
voltage_files: vec![],
}
}
}
impl fmt::Debug for VoltageFileBatch {
    /// Writes the batch as `gps_time=<seconds> voltage_files=<debug list of files>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            gps_time_seconds,
            voltage_files,
        } = self;
        f.write_fmt(format_args!(
            "gps_time={} voltage_files={:?}",
            gps_time_seconds, voltage_files,
        ))
    }
}
/// A single voltage file and the channel it holds data for.
#[cfg_attr(feature = "python-stubgen", gen_stub_pyclass)]
#[cfg_attr(
    any(feature = "python", feature = "python-stubgen"),
    pyclass(get_all, set_all, from_py_object)
)]
#[derive(Clone)]
pub struct VoltageFile {
    /// Filename of the voltage file, exactly as it was provided by the caller.
    pub filename: String,
    /// Channel identifier parsed from the filename.
    pub channel_identifier: usize,
}
impl fmt::Debug for VoltageFile {
    /// Writes the file as `filename=<name> channelidentifier=<channel>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            filename,
            channel_identifier,
        } = self;
        f.write_fmt(format_args!(
            "filename={} channelidentifier={}",
            filename, channel_identifier,
        ))
    }
}
impl std::cmp::PartialEq for VoltageFileBatch {
    /// Two batches are equal when they have the same GPS time and the same voltage files in
    /// the same order.
    fn eq(&self, other: &Self) -> bool {
        if self.gps_time_seconds != other.gps_time_seconds {
            return false;
        }
        self.voltage_files == other.voltage_files
    }
}
impl std::cmp::PartialEq for VoltageFile {
    /// Two voltage files are equal when both the filename and the channel identifier match.
    fn eq(&self, other: &Self) -> bool {
        let lhs = (self.filename.as_str(), self.channel_identifier);
        let rhs = (other.filename.as_str(), other.channel_identifier);
        lhs == rhs
    }
}
/// Intermediate, borrowed view of a voltage file, holding the values parsed out of its
/// filename. Used while the provided files are being validated.
#[derive(Clone, Debug)]
struct TempVoltageFile<'a> {
    /// The filename exactly as provided by the caller.
    filename: &'a str,
    /// Observation ID parsed from the filename.
    obs_id: usize,
    /// Channel identifier parsed from the filename.
    channel_identifier: usize,
    /// GPS time, in seconds, parsed from the filename.
    gps_time_seconds: u64,
}
impl std::cmp::PartialEq for TempVoltageFile<'_> {
    /// Two temporary voltage files are equal when every field matches.
    fn eq(&self, other: &Self) -> bool {
        let lhs = (
            self.filename,
            self.obs_id,
            self.channel_identifier,
            self.gps_time_seconds,
        );
        let rhs = (
            other.filename,
            other.obs_id,
            other.channel_identifier,
            other.gps_time_seconds,
        );
        lhs == rhs
    }
}
/// Maps GPS time (seconds) to a map of channel identifier to voltage filename.
/// Both levels are `BTreeMap`s, so iteration is ordered by GPS time and then by channel.
pub(crate) type VoltageFileTimeMap = BTreeMap<u64, BTreeMap<usize, String>>;
/// Everything learned about the provided voltage files by `examine_voltage_files`.
#[derive(Debug)]
pub(crate) struct VoltageFileInfo {
    /// Voltage files grouped by GPS time, sorted by GPS time.
    pub gpstime_batches: Vec<VoltageFileBatch>,
    /// The voltage file format detected from the filenames.
    pub mwa_version: MWAVersion,
    /// GPS time (seconds) -> channel identifier -> filename.
    pub time_map: VoltageFileTimeMap,
    /// Size in bytes shared by every voltage file.
    // NOTE(review): not read anywhere in this part of the file, hence the allow - confirm
    // whether it is still needed.
    #[allow(dead_code)]
    pub file_size: u64,
    /// Interval between consecutive voltage files, in milliseconds.
    pub voltage_file_interval_ms: u64,
}
/// Groups temporary voltage files into [`VoltageFileBatch`]es keyed by GPS time.
///
/// # Arguments
///
/// * `temp_voltage_files` - the parsed voltage files to group.
///
/// # Returns
///
/// A map of GPS time (seconds) to the batch of files for that GPS time. Within each batch
/// the files are sorted by channel identifier.
fn convert_temp_voltage_files(
    temp_voltage_files: Vec<TempVoltageFile>,
) -> HashMap<u64, VoltageFileBatch> {
    let mut voltage_file_batches: HashMap<u64, VoltageFileBatch> = HashMap::new();

    // Single pass: create the batch the first time a GPS time is seen, then append to it.
    for temp_v in &temp_voltage_files {
        voltage_file_batches
            .entry(temp_v.gps_time_seconds)
            .or_insert_with(|| VoltageFileBatch::new(temp_v.gps_time_seconds))
            .voltage_files
            .push(VoltageFile {
                filename: temp_v.filename.to_string(),
                channel_identifier: temp_v.channel_identifier,
            });
    }

    for batch in voltage_file_batches.values_mut() {
        batch
            .voltage_files
            .sort_unstable_by_key(|v| v.channel_identifier);
    }

    voltage_file_batches
}
/// Parses and validates the provided voltage filenames.
///
/// Each filename must match either the MWAX VCS pattern (`<obsid>_<gpstime>_<chan>.sub`) or
/// the legacy recombined VCS pattern (`<obsid>_<gpstime>_ch<chan>.dat`), where obsid and
/// gpstime are 10 digits and chan is 1 to 3 digits.
///
/// # Arguments
///
/// * `voltage_filenames` - the voltage file paths to examine.
/// * `metafits_obs_id` - the obs id every filename is required to carry.
///
/// # Returns
///
/// On success, a tuple of:
/// * the parsed files, sorted by GPS time and then channel identifier,
/// * the detected `MWAVersion`,
/// * the number of distinct GPS times,
/// * the interval between consecutive voltage files, in milliseconds.
///
/// # Errors
///
/// * `NoVoltageFiles` - `voltage_filenames` is empty.
/// * `Unrecognised` - a filename matches neither pattern, or is not valid UTF-8.
/// * `Mixture` - the filenames are a mix of the two formats.
/// * `MetafitsObsidMismatch` - a filename's obs id differs from `metafits_obs_id`.
/// * `InvalidMwaVersion` - the detected version has no known file interval.
/// * `GpsTimeMissing` - the GPS times are not evenly spaced by the file interval.
/// * `UnevenChannelsForGpsTime` - the GPS times do not all have the same number of files.
fn determine_voltage_file_gpstime_batches<T: AsRef<Path>>(
    voltage_filenames: &'_ [T],
    metafits_obs_id: usize,
) -> Result<(Vec<TempVoltageFile<'_>>, MWAVersion, usize, u64), VoltageFileError> {
    if voltage_filenames.is_empty() {
        return Err(VoltageFileError::NoVoltageFiles);
    }

    // Both patterns are hard-coded literals, so compiling them cannot fail.
    let re_legacy_vcs_recombined: Regex =
        Regex::new(r"(?P<obs_id>\d{10})_(?P<gpstime>\d{10})_ch(?P<channel>\d{1,3})\.dat").unwrap();
    let re_mwax_vcs: Regex =
        Regex::new(r"(?P<obs_id>\d{10})_(?P<gpstime>\d{10})_(?P<channel>\d{1,3})\.sub").unwrap();

    let mut format: Option<MWAVersion> = None;
    let mut temp_voltage_files: Vec<TempVoltageFile> = Vec::with_capacity(voltage_filenames.len());

    for v_path in voltage_filenames {
        let path: &Path = v_path.as_ref();

        // A non UTF-8 filename cannot match either pattern; report it rather than panic.
        let v = match path.to_str() {
            Some(s) => s,
            None => {
                return Err(VoltageFileError::Unrecognised(
                    path.to_string_lossy().into_owned(),
                ))
            }
        };

        // The MWAX pattern is tried first, then the legacy recombined pattern.
        let (caps, file_format) = if let Some(caps) = re_mwax_vcs.captures(v) {
            (caps, MWAVersion::VCSMWAXv2)
        } else if let Some(caps) = re_legacy_vcs_recombined.captures(v) {
            (caps, MWAVersion::VCSLegacyRecombined)
        } else {
            return Err(VoltageFileError::Unrecognised(v.to_string()));
        };

        // Every file has to be of the same format as the first one.
        match (format, file_format) {
            (None, _) => format = Some(file_format),
            (Some(MWAVersion::VCSMWAXv2), MWAVersion::VCSMWAXv2)
            | (Some(MWAVersion::VCSLegacyRecombined), MWAVersion::VCSLegacyRecombined) => (),
            _ => return Err(VoltageFileError::Mixture),
        }

        // The regex only captures ASCII digits of bounded length for these groups.
        let new_temp_voltage_file = TempVoltageFile {
            filename: v,
            obs_id: caps["obs_id"].parse().unwrap(),
            channel_identifier: caps["channel"].parse().unwrap(),
            gps_time_seconds: caps["gpstime"].parse().unwrap(),
        };

        if new_temp_voltage_file.obs_id != metafits_obs_id {
            return Err(VoltageFileError::MetafitsObsidMismatch);
        }
        temp_voltage_files.push(new_temp_voltage_file);
    }

    // The slice is non-empty and every iteration either sets `format` or returns early, so
    // this is always `Some`; the error is a safeguard instead of an `unwrap`.
    let mwa_version = format.ok_or(VoltageFileError::NoVoltageFiles)?;

    let voltage_file_interval_seconds: u64 = match mwa_version {
        MWAVersion::VCSMWAXv2 => MWA_VCS_MWAXV2_SUBFILE_SECONDS,
        MWAVersion::VCSLegacyRecombined => MWA_VCS_LEGACY_RECOMBINED_FILE_SECONDS,
        _ => return Err(VoltageFileError::InvalidMwaVersion { mwa_version }),
    };

    // Number of files per GPS time. Counted as usize so that more than 255 files for one GPS
    // time cannot overflow the counter.
    let mut batches_and_files: BTreeMap<u64, usize> = BTreeMap::new();
    for voltage_file in &temp_voltage_files {
        *batches_and_files
            .entry(voltage_file.gps_time_seconds)
            .or_insert(0) += 1;
    }

    // The error variant carries u8 counts; clamp rather than truncate when reporting.
    let clamp_to_u8 = |n: usize| n.min(usize::from(u8::MAX)) as u8;

    let mut file_count: Option<usize> = None;
    let mut prev_gps_time: Option<u64> = None;
    for (&gps_time, &num_files) in &batches_and_files {
        // GPS times must be contiguous, i.e. exactly one file interval apart.
        if let Some(prev) = prev_gps_time {
            let expected = prev + voltage_file_interval_seconds;
            if expected != gps_time {
                return Err(VoltageFileError::GpsTimeMissing {
                    expected,
                    got: gps_time,
                });
            }
        }
        prev_gps_time = Some(gps_time);

        // Every GPS time must have the same number of files as the first one.
        match file_count {
            None => file_count = Some(num_files),
            Some(c) if c != num_files => {
                return Err(VoltageFileError::UnevenChannelsForGpsTime {
                    expected: clamp_to_u8(c),
                    got: clamp_to_u8(num_files),
                });
            }
            Some(_) => (),
        }
    }

    temp_voltage_files.sort_unstable_by_key(|v| (v.gps_time_seconds, v.channel_identifier));

    Ok((
        temp_voltage_files,
        mwa_version,
        batches_and_files.len(),
        voltage_file_interval_seconds * 1000,
    ))
}
/// Validates the provided voltage files against the metafits context and gathers the
/// information needed to read them.
///
/// # Arguments
///
/// * `metafits_context` - the metafits context; its `obs_id` must match every filename.
/// * `voltage_filenames` - the voltage file paths to examine.
///
/// # Returns
///
/// A [`VoltageFileInfo`] whose batches are sorted by GPS time.
///
/// # Errors
///
/// * Any error returned by `determine_voltage_file_gpstime_batches`.
/// * `VoltageFileError::VoltageFileError` - the metadata of a file could not be read.
/// * `VoltageFileError::UnequalFileSizes` - the files are not all the same size.
///
/// Files are checked in GPS time then channel order, so the error reported for a given set
/// of files is always the same.
pub(crate) fn examine_voltage_files<T: AsRef<Path>>(
    metafits_context: &MetafitsContext,
    voltage_filenames: &[T],
) -> Result<VoltageFileInfo, VoltageFileError> {
    let (temp_voltage_files, mwa_version, _, voltage_file_interval_ms) =
        determine_voltage_file_gpstime_batches(
            voltage_filenames,
            metafits_context.obs_id as usize,
        )?;

    let time_map: VoltageFileTimeMap = create_time_map(&temp_voltage_files);

    // Sort the batches up front so the file checks below run in a deterministic order
    // (HashMap iteration order is arbitrary). GPS times are unique map keys, so an unstable
    // sort gives the same result as a stable one.
    let mut gpstime_batches: Vec<VoltageFileBatch> =
        convert_temp_voltage_files(temp_voltage_files)
            .into_values()
            .collect();
    gpstime_batches.sort_unstable_by_key(|b| b.gps_time_seconds);

    // Every file must exist and be the same size as the first one.
    let mut voltage_file_size: Option<u64> = None;
    for v in gpstime_batches.iter().flat_map(|b| b.voltage_files.iter()) {
        let this_size = std::fs::metadata(&v.filename)
            .map_err(|e| VoltageFileError::VoltageFileError(v.filename.clone(), e.to_string()))?
            .len();

        match voltage_file_size {
            None => voltage_file_size = Some(this_size),
            Some(s) if s != this_size => return Err(VoltageFileError::UnequalFileSizes),
            Some(_) => (),
        }
    }

    // At least one file was examined by this point, so this is always `Some`; the error is a
    // safeguard instead of an `unwrap`.
    let file_size = voltage_file_size.ok_or(VoltageFileError::NoVoltageFiles)?;

    Ok(VoltageFileInfo {
        gpstime_batches,
        mwa_version,
        time_map,
        file_size,
        voltage_file_interval_ms,
    })
}
fn create_time_map(voltage_file_batches: &[TempVoltageFile]) -> VoltageFileTimeMap {
let mut voltage_time_map = BTreeMap::new();
for voltage_file in voltage_file_batches.iter() {
voltage_time_map
.entry(voltage_file.gps_time_seconds)
.or_insert_with(BTreeMap::new)
.entry(voltage_file.channel_identifier)
.or_insert_with(|| voltage_file.filename.to_string());
}
voltage_time_map
}
/// Returns the sorted indices, within `volt_timesteps`, of every GPS time present in
/// `voltage_time_map`.
///
/// # Panics
///
/// Panics if a GPS time in `voltage_time_map` has no timestep in `volt_timesteps` with a
/// matching `gps_time_ms`.
pub(crate) fn populate_provided_timesteps(
    voltage_time_map: &VoltageFileTimeMap,
    volt_timesteps: &[TimeStep],
) -> Vec<usize> {
    let mut indices: Vec<usize> = Vec::with_capacity(voltage_time_map.len());

    for gps_time_seconds in voltage_time_map.keys() {
        // The map is keyed in seconds, timesteps are in milliseconds.
        let gps_time_ms = *gps_time_seconds * 1000;
        let index = volt_timesteps
            .iter()
            .position(|timestep| timestep.gps_time_ms == gps_time_ms)
            .unwrap();
        indices.push(index);
    }

    indices.sort_unstable();
    indices
}
/// Returns the sorted indices, within `volt_coarse_chans`, of every distinct channel
/// identifier present in `voltage_time_map`.
///
/// # Panics
///
/// Panics if a channel identifier in `voltage_time_map` has no coarse channel in
/// `volt_coarse_chans` with a matching `gpubox_number`.
pub(crate) fn populate_provided_coarse_channels(
    voltage_time_map: &VoltageFileTimeMap,
    volt_coarse_chans: &[CoarseChannel],
) -> Vec<usize> {
    // Distinct channel identifiers across all GPS times.
    let unique_chans: HashSet<usize> = voltage_time_map
        .values()
        .flat_map(|chans| chans.keys().copied())
        .collect();

    let mut indices: Vec<usize> = Vec::with_capacity(unique_chans.len());
    for chan in &unique_chans {
        let index = volt_coarse_chans
            .iter()
            .position(|coarse_chan| coarse_chan.gpubox_number == *chan)
            .unwrap();
        indices.push(index);
    }

    // The set iterates in arbitrary order, so sort before returning.
    indices.sort_unstable();
    indices
}
/// Determines the first contiguous run of timesteps that contain every channel, and the
/// channels of that run.
///
/// A timestep is a candidate when it holds as many channels as there are distinct channel
/// identifiers in the whole of `voltage_time_map` and, when `good_time_gps_time_ms` is
/// provided, it starts at or after that time. The run begins at the first candidate and
/// extends while each following candidate starts exactly `timestep_duration_ms` after the
/// previous one.
///
/// # Arguments
///
/// * `voltage_time_map` - GPS time (seconds) -> channel identifier -> filename.
/// * `timestep_duration_ms` - duration of one timestep in milliseconds.
/// * `good_time_gps_time_ms` - optional GPS time (milliseconds) before which timesteps are
///   ignored.
///
/// # Returns
///
/// * `Ok(Some(..))` with the start, end and duration of the run and its channel identifiers.
/// * `Ok(None)` when no timestep qualifies.
pub(crate) fn determine_common_obs_times_and_chans(
    voltage_time_map: &VoltageFileTimeMap,
    timestep_duration_ms: u64,
    good_time_gps_time_ms: Option<u64>,
) -> Result<Option<ObsTimesAndChans>, VoltageFileError> {
    // Number of distinct channels across the whole (unfiltered) map.
    let max_chans = voltage_time_map
        .values()
        .flat_map(|chans| chans.keys().copied())
        .collect::<HashSet<usize>>()
        .len();

    let is_at_or_after_good_time = |gps_time_ms: u64| match good_time_gps_time_ms {
        Some(good_time) => gps_time_ms >= good_time,
        None => true,
    };

    // Borrow from the map rather than cloning it; only GPS times and channel counts are needed.
    let mut candidate_timesteps = voltage_time_map
        .iter()
        .map(|(gps_time_seconds, chans)| (*gps_time_seconds * 1000, chans))
        .filter(|(gps_time_ms, chans)| {
            is_at_or_after_good_time(*gps_time_ms) && chans.len() == max_chans
        });

    let (common_start_gps_ms, first_chans) = match candidate_timesteps.next() {
        Some(ts) => ts,
        None => return Ok(None),
    };
    let first_ts_chans: Vec<usize> = first_chans.keys().copied().collect();

    // `common_end_gps_ms` is always the previous timestep's start plus one duration, which is
    // exactly where the next contiguous timestep has to start. Every candidate already holds
    // `max_chans` channels, so no further channel-count check is needed here.
    let mut common_end_gps_ms = common_start_gps_ms + timestep_duration_ms;
    for (gps_time_ms, _) in candidate_timesteps {
        if gps_time_ms != common_end_gps_ms {
            break;
        }
        common_end_gps_ms = gps_time_ms + timestep_duration_ms;
    }

    Ok(Some(ObsTimesAndChans {
        start_gps_time_ms: common_start_gps_ms,
        end_gps_time_ms: common_end_gps_ms,
        duration_ms: common_end_gps_ms - common_start_gps_ms,
        coarse_chan_identifiers: first_ts_chans,
    }))
}