pub mod error;
use crate::misc::{has_whitening_filter, vec_compare_f32, vec_compare_f64};
use crate::signal_chain_correction::SignalChainCorrection;
use crate::types::{Pol, ReceiverType};
use crate::{fits_open_hdu_by_name, fits_read::*};
use core::f32;
use error::RfinputError;
use fitsio::FitsFile;
use std::fmt;
pub mod ffi;
#[cfg(any(feature = "python", feature = "python-stubgen"))]
use pyo3::prelude::*;
#[cfg(feature = "python-stubgen")]
use pyo3_stub_gen_derive::gen_stub_pyclass;
#[cfg(test)]
mod test;
fn get_vcs_order(input: u32) -> u32 {
if input < 256 {
(input & 0xC0) | ((input & 0x30) >> 4) | ((input & 0x0F) << 2)
} else {
input
}
}
fn get_mwax_order(antenna: u32, pol: Pol) -> u32 {
(antenna * 2) + u32::from(pol == Pol::Y)
}
fn get_electrical_length(metafits_length_string: String, coax_v_factor: f64) -> f64 {
if metafits_length_string.starts_with("EL_") {
metafits_length_string
.replace("EL_", "")
.parse::<f64>()
.unwrap()
} else {
metafits_length_string.parse::<f64>().unwrap() * coax_v_factor
}
}
struct RfInputMetafitsTableRow {
input: u32,
antenna: u32,
tile_id: u32,
tile_name: String,
pol: Pol,
length_string: String,
north_m: f64,
east_m: f64,
height_m: f64,
flag: i32,
digital_gains: Vec<f64>,
dipole_delays: Vec<u32>,
dipole_gains: Vec<f64>,
rx: u32,
slot: u32,
rx_type: String,
flavour: String,
whitening_filter: i32,
}
struct RfInputMetafitsCalibDataTableRow {
antenna: u32,
tile: u32,
tilename: String,
pol: Pol,
calib_delay: f32,
calib_gains: Vec<f32>,
}
#[cfg_attr(feature = "python-stubgen", gen_stub_pyclass)]
#[cfg_attr(
any(feature = "python", feature = "python-stubgen"),
pyclass(get_all, set_all, from_py_object)
)]
#[derive(Clone, Debug)]
pub struct Rfinput {
pub input: u32,
pub ant: u32,
pub tile_id: u32,
pub tile_name: String,
pub pol: Pol,
pub electrical_length_m: f64,
pub north_m: f64,
pub east_m: f64,
pub height_m: f64,
pub vcs_order: u32,
pub subfile_order: u32,
pub flagged: bool,
pub digital_gains: Vec<f64>,
pub dipole_gains: Vec<f64>,
pub dipole_delays: Vec<u32>,
pub rec_number: u32,
pub rec_slot_number: u32,
pub rec_type: ReceiverType,
pub flavour: String,
pub has_whitening_filter: bool,
pub calib_delay: Option<f32>,
pub calib_gains: Option<Vec<f32>>,
pub signal_chain_corrections_index: Option<usize>,
}
impl PartialEq for Rfinput {
fn eq(&self, other: &Self) -> bool {
let initial_eq = self.input == other.input
&& self.ant == other.ant
&& self.tile_id == other.tile_id
&& self.tile_name == other.tile_name
&& self.pol == other.pol
&& self.electrical_length_m == other.electrical_length_m
&& self.north_m == other.north_m
&& self.east_m == other.east_m
&& self.height_m == other.height_m
&& self.vcs_order == other.vcs_order
&& self.subfile_order == other.subfile_order
&& self.flagged == other.flagged
&& self.rec_number == other.rec_number
&& self.rec_slot_number == other.rec_slot_number
&& self.rec_type == other.rec_type
&& self.dipole_delays == other.dipole_delays
&& self.flavour == other.flavour
&& self.has_whitening_filter == other.has_whitening_filter
&& self.signal_chain_corrections_index == other.signal_chain_corrections_index;
let calib_gains_eq: bool = if self.calib_gains.is_some() && other.calib_gains.is_some() {
vec_compare_f32(
&self.calib_gains.clone().unwrap(),
&other.calib_gains.clone().unwrap(),
)
} else {
self.calib_gains.is_none() && other.calib_gains.is_none()
};
let digital_gains_eq = vec_compare_f64(&self.digital_gains, &other.digital_gains);
let dipole_gains_eq = vec_compare_f64(&self.dipole_gains, &other.dipole_gains);
initial_eq && calib_gains_eq && digital_gains_eq && dipole_gains_eq
}
}
impl Rfinput {
fn read_metafits_tiledata_values(
metafits_fptr: &mut FitsFile,
metafits_tile_table_hdu: &fitsio::hdu::FitsHdu,
row: usize,
num_coarse_chans: usize,
) -> Result<RfInputMetafitsTableRow, RfinputError> {
let input = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "Input", row)?;
let antenna = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "Antenna", row)?;
let tile_id = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "Tile", row)?;
let tile_name = read_cell_string(metafits_fptr, metafits_tile_table_hdu, "TileName", row)?;
let pol = {
let p: String = read_cell_string(metafits_fptr, metafits_tile_table_hdu, "Pol", row)?;
match p.as_str() {
"X" => Pol::X,
"Y" => Pol::Y,
_ => {
return Err(RfinputError::UnrecognisedPol {
fits_filename: metafits_fptr.file_path().to_path_buf(),
hdu_num: metafits_tile_table_hdu.number + 1,
row_num: row,
got: p,
})
}
}
};
let length_string: String =
read_cell_string(metafits_fptr, metafits_tile_table_hdu, "Length", row)?;
let north_m = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "North", row)?;
let east_m = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "East", row)?;
let height_m = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "Height", row)?;
let flag = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "Flag", row)?;
let digital_gains = read_cell_array_u32(
metafits_fptr,
metafits_tile_table_hdu,
"Gains",
row,
num_coarse_chans,
)?
.iter()
.map(|gains| *gains as f64 / 64.0)
.collect();
let dipole_delays =
read_cell_array_u32(metafits_fptr, metafits_tile_table_hdu, "Delays", row, 16)?;
let rx = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "Rx", row)?;
let slot = read_cell_value(metafits_fptr, metafits_tile_table_hdu, "Slot", row)?;
let dipole_gains = dipole_delays
.iter()
.map(|&delay| if delay == 32 { 0.0 } else { 1.0 })
.collect();
let rx_type: String = read_cell_string(
metafits_fptr,
metafits_tile_table_hdu,
"Receiver_Types",
row,
)
.unwrap_or_default();
let flavour: String =
read_cell_string(metafits_fptr, metafits_tile_table_hdu, "Flavors", row)
.unwrap_or_default();
let whitening_filter: i32 = read_cell_value(
metafits_fptr,
metafits_tile_table_hdu,
"Whitening_Filter",
row,
)
.unwrap_or(-1);
Ok(RfInputMetafitsTableRow {
input,
antenna,
tile_id,
tile_name,
pol,
length_string,
north_m,
east_m,
height_m,
flag,
digital_gains,
dipole_delays,
dipole_gains,
rx,
slot,
rx_type,
flavour,
whitening_filter,
})
}
fn read_metafits_calibdata_values(
metafits_fptr: &mut FitsFile,
metafits_calibdata_table_hdu: &Option<fitsio::hdu::FitsHdu>,
row: usize,
num_coarse_chans: usize,
) -> Result<Option<RfInputMetafitsCalibDataTableRow>, RfinputError> {
if let Some(metafits_cal_hdu) = metafits_calibdata_table_hdu {
let antenna = read_cell_value(metafits_fptr, metafits_cal_hdu, "Antenna", row)?;
let tile_id = read_cell_value(metafits_fptr, metafits_cal_hdu, "Tile", row)?;
let tile_name = read_cell_string(metafits_fptr, metafits_cal_hdu, "TileName", row)?;
let pol = {
let p: String = read_cell_string(metafits_fptr, metafits_cal_hdu, "Pol", row)?;
match p.as_str() {
"X" => Pol::X,
"Y" => Pol::Y,
_ => {
return Err(RfinputError::UnrecognisedPol {
fits_filename: metafits_fptr.file_path().to_path_buf(),
hdu_num: metafits_cal_hdu.number + 1,
row_num: row,
got: p,
})
}
}
};
let calib_delay_m: f32 =
read_cell_value(metafits_fptr, metafits_cal_hdu, "Calib_Delay", row)?;
let calib_gains: Vec<f32> = read_cell_array_f32(
metafits_fptr,
metafits_cal_hdu,
"Calib_Gains",
row,
num_coarse_chans,
)?;
Ok(Some(RfInputMetafitsCalibDataTableRow {
antenna,
tile: tile_id,
tilename: tile_name,
pol,
calib_delay: calib_delay_m,
calib_gains,
}))
} else {
Ok(None)
}
}
pub(crate) fn populate_rf_inputs(
num_inputs: usize,
metafits_fptr: &mut FitsFile,
metafits_tile_table_hdu: &fitsio::hdu::FitsHdu,
coax_v_factor: f64,
num_coarse_chans: usize,
signal_chain_corrections: &Option<Vec<SignalChainCorrection>>,
) -> Result<Vec<Self>, RfinputError> {
let mut rf_inputs: Vec<Self> = Vec::with_capacity(num_inputs);
let metafits_cal_hdu_result = fits_open_hdu_by_name!(metafits_fptr, "CALIBDATA");
let metafits_cal_hdu_option = metafits_cal_hdu_result.ok();
for input in 0..num_inputs {
let metafits_row = Self::read_metafits_tiledata_values(
metafits_fptr,
metafits_tile_table_hdu,
input,
num_coarse_chans,
)?;
let electrical_length_m =
get_electrical_length(metafits_row.length_string, coax_v_factor);
let vcs_order = get_vcs_order(metafits_row.input);
let subfile_order = get_mwax_order(metafits_row.antenna, metafits_row.pol);
let has_whitening_filter: bool =
has_whitening_filter(&metafits_row.flavour, metafits_row.whitening_filter);
let rec_type = metafits_row.rx_type.parse::<ReceiverType>().unwrap();
let mut calib_delay: Option<f32> = None;
let mut calib_gains: Option<Vec<f32>> = None;
if let Some(calibdata_row) = Self::read_metafits_calibdata_values(
metafits_fptr,
&metafits_cal_hdu_option,
input,
num_coarse_chans,
)? {
assert_eq!(calibdata_row.antenna, metafits_row.antenna);
assert_eq!(calibdata_row.tile, metafits_row.tile_id);
assert_eq!(calibdata_row.tilename, metafits_row.tile_name);
assert_eq!(calibdata_row.pol, metafits_row.pol);
calib_delay = match calibdata_row.calib_delay.is_nan() {
true => None,
false => Some(calibdata_row.calib_delay),
};
calib_gains = Some(calibdata_row.calib_gains);
}
let signal_chain_corrections_index: Option<usize> = match signal_chain_corrections {
Some(s) => s.iter().position(|sc| {
sc.receiver_type == rec_type && sc.whitening_filter == has_whitening_filter
}),
None => None,
};
rf_inputs.push(Self {
input: metafits_row.input,
ant: metafits_row.antenna,
tile_id: metafits_row.tile_id,
tile_name: metafits_row.tile_name,
pol: metafits_row.pol,
electrical_length_m,
north_m: metafits_row.north_m,
east_m: metafits_row.east_m,
height_m: metafits_row.height_m,
vcs_order,
subfile_order,
flagged: metafits_row.flag == 1,
digital_gains: metafits_row.digital_gains,
dipole_gains: metafits_row.dipole_gains,
dipole_delays: metafits_row.dipole_delays,
rec_number: metafits_row.rx,
rec_slot_number: metafits_row.slot,
rec_type,
flavour: metafits_row.flavour,
has_whitening_filter,
calib_delay,
calib_gains,
signal_chain_corrections_index,
})
}
Ok(rf_inputs)
}
}
impl fmt::Display for Rfinput {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}{}", self.tile_name, self.pol)
}
}