use std::path::PathBuf;
use self::ndarray::prelude::*;
use num_complex::Complex64 as c64;
use numpy::*;
use pyo3::prelude::*;
use crate::fee::FEEBeam as FEEBeamRust;
#[cfg(any(feature = "cuda", feature = "hip"))]
use crate::GpuComplex;
/// Python-facing wrapper class around the Rust FEE beam type
/// (`crate::fee::FEEBeam`, imported in this file as `FEEBeamRust`).
#[pyclass]
// The class name keeps the "FEE" acronym in upper case, hence the lint exemption.
#[allow(clippy::upper_case_acronyms)]
pub(super) struct FEEBeam {
// The wrapped Rust beam object; the methods exposed to Python in this file
// delegate their work to it.
beam: FEEBeamRust,
}
#[pymethods]
impl FEEBeam {
/// Create a new FEE beam object.
///
/// If `hdf5_file` is not `None`, it is converted to a filesystem path and
/// passed to the Rust beam constructor. If it is `None`, the beam is built
/// with `new_from_env` instead (presumably locating the file through the
/// environment -- confirm against `crate::fee::FEEBeam::new_from_env`).
///
/// An exception is raised if `hdf5_file` cannot be converted to a path, or if
/// the underlying Rust constructor reports an error.
#[new]
#[pyo3(signature = (hdf5_file))]
fn new(hdf5_file: Option<Bound<'_, PyAny>>) -> PyResult<Self> {
    let beam = match hdf5_file {
        Some(obj) => {
            // A bad argument type is a caller error, not a broken invariant:
            // hand the conversion failure back to Python instead of panicking.
            let path: PathBuf = obj.extract()?;
            FEEBeamRust::new(path)?
        }
        None => FEEBeamRust::new_from_env()?,
    };
    Ok(FEEBeam { beam })
}
/// Calculate the beam-response Jones matrix for a single direction.
///
/// Arguments:
/// - `az_rad`, `za_rad`: the direction (azimuth and zenith angle; radians per
///   the parameter names).
/// - `freq_hz`: the frequency. It is accepted as a float and rounded to the
///   nearest whole number before being handed to the Rust beam code.
/// - `delays`: exactly 16 unsigned integer delays.
/// - `amps`: the amplitudes, forwarded to the Rust beam code as given.
/// - `norm_to_zenith`: forwarded unchanged to the Rust beam code.
/// - `latitude_rad`: optional; forwarded unchanged (`None` when omitted).
/// - `iau_order`: optional; treated as `False` when omitted.
///
/// Returns a 1-D complex numpy array holding the elements of the Jones matrix.
/// An exception is raised if the Rust beam code reports an error.
#[pyo3(
    signature = (az_rad, za_rad, freq_hz, delays, amps, norm_to_zenith, latitude_rad=None, iau_order=None)
)]
// The argument list mirrors the Python-facing signature, so it cannot be shortened.
#[allow(clippy::too_many_arguments)]
fn calc_jones<'py>(
    &self,
    py: Python<'py>,
    az_rad: f64,
    za_rad: f64,
    freq_hz: f64,
    delays: [u32; 16],
    amps: Vec<f64>,
    norm_to_zenith: bool,
    latitude_rad: Option<f64>,
    iau_order: Option<bool>,
) -> PyResult<Bound<'py, PyArray1<c64>>> {
    let jones = self.beam.calc_jones_pair(
        az_rad,
        za_rad,
        // Python hands over a float; the cast target is inferred from the
        // Rust API's frequency parameter.
        freq_hz.round() as _,
        &delays,
        &amps,
        norm_to_zenith,
        latitude_rad,
        iau_order.unwrap_or(false),
    )?;
    // Rebuild every element as this module's `c64` so the numpy array has the
    // complex element type promised by the return type.
    let jones_py: Vec<c64> = jones.iter().map(|c| c64::new(c.re, c.im)).collect();
    Ok(PyArray1::from_vec(py, jones_py))
}
/// Calculate beam-response Jones matrices for many directions at once.
///
/// Arguments:
/// - `az_rad`, `za_rad`: the directions (azimuths and zenith angles; radians
///   per the parameter names).
/// - `freq_hz`: the frequency. It is accepted as a float and rounded to the
///   nearest whole number before being handed to the Rust beam code.
/// - `delays`: exactly 16 unsigned integer delays.
/// - `amps`: the amplitudes, forwarded to the Rust beam code as given.
/// - `norm_to_zenith`: forwarded unchanged to the Rust beam code.
/// - `latitude_rad`: optional; forwarded unchanged (`None` when omitted).
/// - `iau_order`: optional; treated as `False` when omitted.
///
/// Returns a 2-D complex numpy array with one row per returned Jones matrix
/// and 4 columns. An exception is raised if the Rust beam code reports an
/// error.
#[pyo3(
    signature = (az_rad, za_rad, freq_hz, delays, amps, norm_to_zenith, latitude_rad=None, iau_order=None)
)]
// The argument list mirrors the Python-facing signature, so it cannot be shortened.
#[allow(clippy::too_many_arguments)]
fn calc_jones_array<'py>(
    &self,
    py: Python<'py>,
    az_rad: Vec<f64>,
    za_rad: Vec<f64>,
    freq_hz: f64,
    delays: [u32; 16],
    amps: Vec<f64>,
    norm_to_zenith: bool,
    latitude_rad: Option<f64>,
    iau_order: Option<bool>,
) -> PyResult<Bound<'py, PyArray2<c64>>> {
    let jones = self.beam.calc_jones_array_pair(
        &az_rad,
        &za_rad,
        // Python hands over a float; the cast target is inferred from the
        // Rust API's frequency parameter.
        freq_hz.round() as _,
        &delays,
        &amps,
        norm_to_zenith,
        latitude_rad,
        iau_order.unwrap_or(false),
    )?;

    // Reinterpret the vector of Jones matrices as a flat vector of complex
    // numbers without copying. `ManuallyDrop` stops the original vector from
    // freeing the buffer whose ownership `flat` takes over below.
    let mut jones = std::mem::ManuallyDrop::new(jones);
    let num_rows = jones.len();
    let flat_len = num_rows * 4;
    let flat_cap = jones.capacity() * 4;
    let flat_ptr = jones.as_mut_ptr() as *mut c64;
    // SAFETY: sound only if every Jones element is laid out exactly like four
    // contiguous `c64` values (same total size, same alignment), so that the
    // allocation described by (ptr, len * 4, capacity * 4) is the very same
    // allocation. NOTE(review): confirm against the Jones type's definition.
    // The buffer is not freed twice because the source vector is wrapped in
    // `ManuallyDrop`.
    let flat = unsafe { Vec::from_raw_parts(flat_ptr, flat_len, flat_cap) };
    let a2 = Array2::from_shape_vec((num_rows, 4), flat)
        .expect("the flat vector holds exactly num_rows * 4 elements");
    Ok(a2.into_pyarray(py))
}
fn get_fee_beam_freqs<'py>(&self, py: Python<'py>) -> Bound<'py, PyArray1<u32>> {
self.beam.get_freqs().to_vec().into_pyarray(py)
}
/// Return the result of the underlying beam object's `find_closest_freq` for
/// the given frequency.
///
/// `freq_hz` is accepted as a float and rounded to the nearest whole number
/// before the lookup.
#[pyo3(text_signature = "(freq_hz)")]
fn closest_freq(&self, freq_hz: f64) -> u32 {
    let beam = &self.beam;
    // The cast target is inferred from the parameter type of `find_closest_freq`.
    beam.find_closest_freq(f64::round(freq_hz) as _)
}
/// Calculate beam-response Jones matrices on a GPU.
///
/// Arguments:
/// - `az_rad`, `za_rad`: the directions (azimuths and zenith angles; radians
///   per the parameter names). Each value is cast to the float type the GPU
///   code expects.
/// - `freqs_hz`: the frequencies. They are accepted as floats and rounded to
///   the nearest whole number.
/// - `delays_array`: a flat list of delays, 16 per tile. Its length must be a
///   non-zero multiple of 16; the number of tiles is that length divided by 16.
/// - `amps_array`: a flat list of amplitudes. Its length must be divisible by
///   the number of tiles; every tile gets the same number of amplitudes.
/// - `norm_to_zenith`: forwarded unchanged to the Rust beam code.
/// - `latitude_rad`: optional; forwarded unchanged (`None` when omitted).
/// - `iau_order`: optional; treated as `False` when omitted.
///
/// Returns a 4-D complex numpy array: the three dimensions of the array
/// produced by the GPU beam code, followed by a final axis of length 4.
///
/// Raises `ValueError` if `delays_array` or `amps_array` has an unusable
/// length. An exception is also raised if the Rust beam code reports an error.
#[cfg(any(feature = "cuda", feature = "hip"))]
#[pyo3(
    signature = (az_rad, za_rad, freqs_hz, delays_array, amps_array, norm_to_zenith, latitude_rad=None, iau_order=None)
)]
// The argument list mirrors the Python-facing signature, so it cannot be shortened.
#[allow(clippy::too_many_arguments)]
fn calc_jones_gpu<'py>(
    &self,
    py: Python<'py>,
    az_rad: Vec<f64>,
    za_rad: Vec<f64>,
    freqs_hz: Vec<f64>,
    delays_array: Vec<u32>,
    amps_array: Vec<f64>,
    norm_to_zenith: bool,
    latitude_rad: Option<f64>,
    iau_order: Option<bool>,
) -> PyResult<Bound<'py, PyArray4<GpuComplex>>> {
    // Validate the flat inputs up front. Without these checks an empty or
    // ragged list would hit a division by zero or a failed reshape and panic
    // instead of giving the caller a catchable error.
    if delays_array.is_empty() || delays_array.len() % 16 != 0 {
        return Err(pyo3::exceptions::PyValueError::new_err(format!(
            "delays_array must contain a non-zero multiple of 16 values, but it has {}",
            delays_array.len()
        )));
    }
    let num_tiles = delays_array.len() / 16;
    if amps_array.len() % num_tiles != 0 {
        return Err(pyo3::exceptions::PyValueError::new_err(format!(
            "amps_array has {} values, which cannot be split evenly between {} tiles",
            amps_array.len(),
            num_tiles
        )));
    }
    let amps_per_tile = amps_array.len() / num_tiles;

    // Python hands over floats; the integer type is fixed by the annotation.
    let freqs: Vec<u32> = freqs_hz.iter().map(|&f| f.round() as _).collect();
    let delays = Array2::from_shape_vec((num_tiles, 16), delays_array)
        .expect("delays_array length was checked to be num_tiles * 16");
    let amps = Array2::from_shape_vec((num_tiles, amps_per_tile), amps_array)
        .expect("amps_array length was checked to be num_tiles * amps_per_tile");

    // The element type of these vectors is inferred from what the GPU beam's
    // `calc_jones_pair` accepts.
    let azs: Vec<_> = az_rad.into_iter().map(|f| f as _).collect();
    let zas: Vec<_> = za_rad.into_iter().map(|f| f as _).collect();

    // SAFETY: NOTE(review): `gpu_prepare` is an unsafe function whose contract
    // is not visible from this file; confirm that these arguments uphold it.
    let gpu_beam = unsafe {
        self.beam
            .gpu_prepare(&freqs, delays.view(), amps.view(), norm_to_zenith)?
    };
    let jones =
        gpu_beam.calc_jones_pair(&azs, &zas, latitude_rad, iau_order.unwrap_or(false))?;

    // Reinterpret the array of Jones matrices as a flat vector of complex
    // numbers without copying. `ManuallyDrop` stops the original vector from
    // freeing the buffer whose ownership `flat` takes over below.
    let old_dim = jones.dim();
    let mut jones = std::mem::ManuallyDrop::new(jones.into_raw_vec_and_offset().0);
    let flat_len = jones.len() * 4;
    let flat_cap = jones.capacity() * 4;
    let flat_ptr = jones.as_mut_ptr() as *mut GpuComplex;
    // SAFETY: sound only if every Jones element is laid out exactly like four
    // contiguous `GpuComplex` values (same total size, same alignment).
    // The discarded offset also means the reshape below only reproduces the
    // original element order if the array was in standard layout starting at
    // the beginning of its buffer. NOTE(review): confirm both assumptions
    // against the GPU Jones type and `calc_jones_pair`. The buffer is not
    // freed twice because the source vector is wrapped in `ManuallyDrop`.
    let flat = unsafe { Vec::from_raw_parts(flat_ptr, flat_len, flat_cap) };
    let a4 = Array4::from_shape_vec((old_dim.0, old_dim.1, old_dim.2, 4), flat)
        .expect("the flat vector holds exactly four values per Jones matrix");
    Ok(a4.into_pyarray(py))
}
}