use astrors_fork::fits;
use astrors_fork::io::hdulist::HDU;
use polars::{lazy::prelude::*, prelude::*};
use rayon::prelude::*;
use std::fs;
use std::path::PathBuf;
use crate::errors::FitsLoaderError;
use crate::io::{add_calculated_domains, process_file_name, process_image, process_metadata};
/// Reads a single FITS file into a `DataFrame`.
///
/// The frame is built from three groups of columns, in this order: the output
/// of `process_metadata` on the primary HDU (HDU 0), the output of
/// `process_image` on the image HDU, and the output of `process_file_name`
/// on the path. The image HDU is looked up at index 2 first and at index 1 if
/// index 2 is absent or is not an image.
///
/// # Errors
/// * `FitsLoaderError::NoData` if the path's extension is not exactly `fits`.
/// * `FitsLoaderError::InvalidFileName` if the path is not valid UTF-8.
/// * `FitsLoaderError::FitsError` for any failure while opening or processing
///   the file (including a missing primary/image HDU); the message carries
///   the path and the underlying error's `Display` text.
pub fn read_fits(
    file_path: std::path::PathBuf,
    header_items: &Vec<String>,
) -> Result<DataFrame, FitsLoaderError> {
    let has_fits_extension = matches!(
        file_path.extension().and_then(|ext| ext.to_str()),
        Some("fits")
    );
    if !has_fits_extension {
        return Err(FitsLoaderError::NoData);
    }

    let path_str = match file_path.to_str() {
        Some(text) => text,
        None => {
            return Err(FitsLoaderError::InvalidFileName(
                "Invalid UTF-8 in path".into(),
            ))
        }
    };

    // Everything that can fail while touching the file lives in this closure
    // so that a single `map_err` below can attach the path to the error.
    let load = || -> Result<DataFrame, FitsLoaderError> {
        let hdul = fits::fromfile(path_str)?;

        let primary = match hdul.hdus.get(0) {
            Some(HDU::Primary(primary)) => primary,
            _ => return Err(FitsLoaderError::NoData),
        };
        let mut columns = process_metadata(primary, header_items)?;

        // Prefer the HDU at index 2; fall back to index 1.
        let image = [2usize, 1]
            .iter()
            .find_map(|&index| match hdul.hdus.get(index) {
                Some(HDU::Image(image)) => Some(image),
                _ => None,
            })
            .ok_or(FitsLoaderError::NoData)?;
        columns.extend(process_image(image)?);

        // `file_path` is still borrowed through `path_str`, hence the clone.
        columns.extend(process_file_name(file_path.clone()));

        DataFrame::new(columns).map_err(FitsLoaderError::PolarsError)
    };

    load().map_err(|e| {
        FitsLoaderError::FitsError(format!("Error processing file '{}': {}", path_str, e))
    })
}
fn combine_dataframes_with_alignment(
acc: DataFrame,
df: DataFrame,
) -> Result<DataFrame, FitsLoaderError> {
match acc.vstack(&df) {
Ok(combined) => Ok(combined),
Err(_) => {
let acc_cols = acc.get_column_names();
let df_cols = df.get_column_names();
let missing_in_acc: Vec<_> = df_cols.iter().filter(|c| !acc_cols.contains(c)).collect();
let missing_in_df: Vec<_> = acc_cols.iter().filter(|c| !df_cols.contains(c)).collect();
let mut acc_aligned = acc.clone();
let mut df_aligned = df.clone();
for col in missing_in_acc {
let col_name: PlSmallStr = (*col).clone().into();
let null_series = Series::new_null(col_name, acc.height());
let _ = acc_aligned.with_column(null_series).unwrap();
}
for col in missing_in_df {
let col_name: PlSmallStr = (*col).clone().into();
let null_series = Series::new_null(col_name, df.height());
let _ = df_aligned.with_column(null_series).unwrap();
}
acc_aligned
.vstack(&df_aligned)
.map_err(|e| FitsLoaderError::PolarsError(e))
}
}
}
pub fn read_experiment(
dir: &str,
header_items: &Vec<String>,
) -> Result<DataFrame, FitsLoaderError> {
let dir_path = std::path::PathBuf::from(dir);
if !dir_path.exists() {
return Err(FitsLoaderError::FitsError(format!(
"Directory not found: {}",
dir
)));
}
let entries: Vec<_> = fs::read_dir(dir)
.map_err(|e| FitsLoaderError::IoError(e))?
.par_bridge()
.filter_map(|entry| entry.ok())
.filter(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("fits"))
.collect();
if entries.is_empty() {
return Err(FitsLoaderError::FitsError(format!(
"No FITS files found in directory: {}",
dir
)));
}
let results: Vec<Result<DataFrame, FitsLoaderError>> = entries
.par_iter()
.map(|entry| read_fits(entry.path(), &header_items))
.collect();
let successful_dfs: Vec<DataFrame> = results
.into_iter()
.filter_map(|result| result.ok())
.collect();
if successful_dfs.is_empty() {
return Err(FitsLoaderError::FitsError(
"None of the files in the directory could be processed successfully".into(),
));
}
let combined_df = successful_dfs
.into_par_iter()
.reduce_with(|acc, df| {
let acc_clone = acc.clone();
combine_dataframes_with_alignment(acc, df).unwrap_or(acc_clone)
})
.ok_or(FitsLoaderError::NoData)?;
Ok(add_calculated_domains(combined_df.lazy()))
}
/// Loads an explicit list of FITS files and stacks them into one `DataFrame`,
/// which is then passed through `add_calculated_domains`.
///
/// Every path must exist up front. After that, reading is best-effort: a file
/// that `read_fits` rejects is skipped, and a frame that cannot be combined
/// with the accumulated result is dropped.
///
/// # Errors
/// * `FitsLoaderError::FitsError` if `file_paths` is empty, if any path does
///   not exist, or if none of the files could be read.
/// * `FitsLoaderError::NoData` if the reduction yields no frame.
pub fn read_multiple_fits(
    file_paths: Vec<PathBuf>,
    header_items: &Vec<String>,
) -> Result<DataFrame, FitsLoaderError> {
    if file_paths.is_empty() {
        return Err(FitsLoaderError::FitsError("No files provided".into()));
    }

    // Report the first missing path, in input order.
    if let Some(missing) = file_paths.iter().find(|candidate| !candidate.exists()) {
        return Err(FitsLoaderError::FitsError(format!(
            "File not found: {}",
            missing.display()
        )));
    }

    // Parse in parallel, keeping only the files that loaded; the collected
    // `Vec` follows the order of `file_paths`.
    let frames: Vec<DataFrame> = file_paths
        .par_iter()
        .filter_map(|candidate| read_fits(candidate.clone(), header_items).ok())
        .collect();
    if frames.is_empty() {
        return Err(FitsLoaderError::FitsError(
            "None of the provided files could be processed successfully".into(),
        ));
    }

    frames
        .into_par_iter()
        .reduce_with(|merged, next| {
            // On a failed merge keep what has been accumulated so far.
            match combine_dataframes_with_alignment(merged.clone(), next) {
                Ok(stacked) => stacked,
                Err(_) => merged,
            }
        })
        .map(|stacked| add_calculated_domains(stacked.lazy()))
        .ok_or(FitsLoaderError::NoData)
}
pub fn read_experiment_pattern(
dir: &str,
pattern: &str,
header_items: &Vec<String>,
) -> Result<DataFrame, FitsLoaderError> {
let dir_path = std::path::PathBuf::from(dir);
if !dir_path.exists() {
return Err(FitsLoaderError::FitsError(format!(
"Directory not found: {}",
dir
)));
}
let header_items = header_items
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
let entries: Vec<_> = fs::read_dir(dir)
.map_err(FitsLoaderError::IoError)?
.par_bridge()
.filter_map(|entry| entry.ok())
.filter(|entry| {
let path = entry.path();
path.extension().and_then(|ext| ext.to_str()) == Some("fits")
&& match path.file_name().and_then(|name| name.to_str()) {
Some(name) => glob_match::glob_match(pattern, name),
None => false,
}
})
.map(|entry| entry.path())
.collect();
if entries.is_empty() {
return Err(FitsLoaderError::FitsError(format!(
"No FITS files matching pattern '{}' found in directory: {}",
pattern, dir
)));
}
read_multiple_fits(entries, &header_items)
}