1use astrors_fork::fits;
2use astrors_fork::io::hdulist::HDU;
3
4use polars::{lazy::prelude::*, prelude::*};
5use rayon::prelude::*;
6use std::fs;
7use std::path::PathBuf;
8
9use crate::errors::FitsLoaderError;
10use crate::io::{add_calculated_domains, process_file_name, process_image, process_metadata};
11
12pub fn read_fits(
23 file_path: std::path::PathBuf,
24 header_items: &Vec<String>,
25) -> Result<DataFrame, FitsLoaderError> {
26 if file_path.extension().and_then(|ext| ext.to_str()) != Some("fits") {
27 return Err(FitsLoaderError::NoData);
28 }
29
30 let path_str = file_path
32 .to_str()
33 .ok_or_else(|| FitsLoaderError::InvalidFileName("Invalid UTF-8 in path".into()))?;
34
35 let result = (|| {
37 let hdul = fits::fromfile(path_str)?;
38
39 let meta = match hdul.hdus.get(0) {
41 Some(HDU::Primary(hdu)) => process_metadata(hdu, header_items)?,
42 _ => return Err(FitsLoaderError::NoData),
43 };
44
45 let img_data = match hdul.hdus.get(2) {
47 Some(HDU::Image(hdu)) => process_image(hdu)?,
48 _ => match hdul.hdus.get(1) {
50 Some(HDU::Image(hdu)) => process_image(hdu)?,
51 _ => return Err(FitsLoaderError::NoData),
52 },
53 };
54
55 let names = process_file_name(file_path.clone());
57
58 let mut columns = meta;
60 columns.extend(img_data);
61 columns.extend(names);
62
63 DataFrame::new(columns).map_err(FitsLoaderError::PolarsError)
65 })();
66
67 result.map_err(|e| {
69 FitsLoaderError::FitsError(format!("Error processing file '{}': {}", path_str, e))
70 })
71}
72
73fn combine_dataframes_with_alignment(
75 acc: DataFrame,
76 df: DataFrame,
77) -> Result<DataFrame, FitsLoaderError> {
78 match acc.vstack(&df) {
80 Ok(combined) => Ok(combined),
81 Err(_) => {
82 let acc_cols = acc.get_column_names();
84 let df_cols = df.get_column_names();
85
86 let missing_in_acc: Vec<_> = df_cols.iter().filter(|c| !acc_cols.contains(c)).collect();
88 let missing_in_df: Vec<_> = acc_cols.iter().filter(|c| !df_cols.contains(c)).collect();
89
90 let mut acc_aligned = acc.clone();
92 let mut df_aligned = df.clone();
93
94 for col in missing_in_acc {
95 let col_name: PlSmallStr = (*col).clone().into();
97 let null_series = Series::new_null(col_name, acc.height());
98 let _ = acc_aligned.with_column(null_series).unwrap();
99 }
100
101 for col in missing_in_df {
102 let col_name: PlSmallStr = (*col).clone().into();
104 let null_series = Series::new_null(col_name, df.height());
105 let _ = df_aligned.with_column(null_series).unwrap();
106 }
107
108 acc_aligned
110 .vstack(&df_aligned)
111 .map_err(|e| FitsLoaderError::PolarsError(e))
112 }
113 }
114}
115
116pub fn read_experiment(
127 dir: &str,
128 header_items: &Vec<String>,
129) -> Result<DataFrame, FitsLoaderError> {
130 let dir_path = std::path::PathBuf::from(dir);
131
132 if !dir_path.exists() {
133 return Err(FitsLoaderError::FitsError(format!(
134 "Directory not found: {}",
135 dir
136 )));
137 }
138
139 let entries: Vec<_> = fs::read_dir(dir)
141 .map_err(|e| FitsLoaderError::IoError(e))?
142 .par_bridge()
143 .filter_map(|entry| entry.ok())
144 .filter(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("fits"))
145 .collect();
146
147 if entries.is_empty() {
148 return Err(FitsLoaderError::FitsError(format!(
149 "No FITS files found in directory: {}",
150 dir
151 )));
152 }
153
154 let results: Vec<Result<DataFrame, FitsLoaderError>> = entries
156 .par_iter()
157 .map(|entry| read_fits(entry.path(), &header_items))
158 .collect();
159
160 let successful_dfs: Vec<DataFrame> = results
162 .into_iter()
163 .filter_map(|result| result.ok())
164 .collect();
165
166 if successful_dfs.is_empty() {
168 return Err(FitsLoaderError::FitsError(
169 "None of the files in the directory could be processed successfully".into(),
170 ));
171 }
172
173 let combined_df = successful_dfs
175 .into_par_iter()
176 .reduce_with(|acc, df| {
177 let acc_clone = acc.clone();
178 combine_dataframes_with_alignment(acc, df).unwrap_or(acc_clone)
179 })
180 .ok_or(FitsLoaderError::NoData)?;
181
182 Ok(add_calculated_domains(combined_df.lazy()))
184}
185
186pub fn read_multiple_fits(
197 file_paths: Vec<PathBuf>,
198 header_items: &Vec<String>,
199) -> Result<DataFrame, FitsLoaderError> {
200 if file_paths.is_empty() {
201 return Err(FitsLoaderError::FitsError("No files provided".into()));
202 }
203
204 for path in &file_paths {
206 if !path.exists() {
207 return Err(FitsLoaderError::FitsError(format!(
208 "File not found: {}",
209 path.display()
210 )));
211 }
212 }
213
214 let results: Vec<Result<DataFrame, FitsLoaderError>> = file_paths
216 .par_iter()
217 .map(|path| read_fits(path.clone(), header_items))
218 .collect();
219
220 let successful_dfs: Vec<DataFrame> = results
222 .into_iter()
223 .filter_map(|result| result.ok())
224 .collect();
225
226 if successful_dfs.is_empty() {
228 return Err(FitsLoaderError::FitsError(
229 "None of the provided files could be processed successfully".into(),
230 ));
231 }
232
233 let combined_df = successful_dfs
235 .into_par_iter()
236 .reduce_with(|acc, df| {
237 let acc_clone = acc.clone();
238 combine_dataframes_with_alignment(acc, df).unwrap_or(acc_clone)
239 })
240 .ok_or(FitsLoaderError::NoData)?;
241
242 Ok(add_calculated_domains(combined_df.lazy()))
243}
244
245pub fn read_experiment_pattern(
257 dir: &str,
258 pattern: &str,
259 header_items: &Vec<String>,
260) -> Result<DataFrame, FitsLoaderError> {
261 let dir_path = std::path::PathBuf::from(dir);
262
263 if !dir_path.exists() {
264 return Err(FitsLoaderError::FitsError(format!(
265 "Directory not found: {}",
266 dir
267 )));
268 }
269
270 let header_items = header_items
272 .iter()
273 .map(|s| s.to_string())
274 .collect::<Vec<_>>();
275
276 let entries: Vec<_> = fs::read_dir(dir)
278 .map_err(FitsLoaderError::IoError)?
279 .par_bridge()
280 .filter_map(|entry| entry.ok())
281 .filter(|entry| {
282 let path = entry.path();
283 path.extension().and_then(|ext| ext.to_str()) == Some("fits")
284 && match path.file_name().and_then(|name| name.to_str()) {
285 Some(name) => glob_match::glob_match(pattern, name),
286 None => false,
287 }
288 })
289 .map(|entry| entry.path())
290 .collect();
291
292 if entries.is_empty() {
293 return Err(FitsLoaderError::FitsError(format!(
294 "No FITS files matching pattern '{}' found in directory: {}",
295 pattern, dir
296 )));
297 }
298
299 read_multiple_fits(entries, &header_items)
300}