pyref_core/
loader.rs

1use astrors_fork::fits;
2use astrors_fork::io::hdulist::HDU;
3
4use polars::{lazy::prelude::*, prelude::*};
5use rayon::prelude::*;
6use std::fs;
7use std::path::PathBuf;
8
9use crate::errors::FitsLoaderError;
10use crate::io::{add_calculated_domains, process_file_name, process_image, process_metadata};
11
12/// Reads a single FITS file and converts it to a Polars DataFrame.
13///
14/// # Arguments
15///
16/// * `file_path` - Path to the FITS file to read
17/// * `header_items` - List of header values to extract
18///
19/// # Returns
20///
21/// A `Result` containing either the DataFrame or a `FitsLoaderError`.
22pub fn read_fits(
23    file_path: std::path::PathBuf,
24    header_items: &Vec<String>,
25) -> Result<DataFrame, FitsLoaderError> {
26    if file_path.extension().and_then(|ext| ext.to_str()) != Some("fits") {
27        return Err(FitsLoaderError::NoData);
28    }
29
30    // Safely get path as string
31    let path_str = file_path
32        .to_str()
33        .ok_or_else(|| FitsLoaderError::InvalidFileName("Invalid UTF-8 in path".into()))?;
34
35    // Use try block pattern for more concise error handling
36    let result = (|| {
37        let hdul = fits::fromfile(path_str)?;
38
39        // Process primary header metadata
40        let meta = match hdul.hdus.get(0) {
41            Some(HDU::Primary(hdu)) => process_metadata(hdu, header_items)?,
42            _ => return Err(FitsLoaderError::NoData),
43        };
44
45        // Process image data
46        let img_data = match hdul.hdus.get(2) {
47            Some(HDU::Image(hdu)) => process_image(hdu)?,
48            // If there's no image at index 2, try index 1 as a fallback
49            _ => match hdul.hdus.get(1) {
50                Some(HDU::Image(hdu)) => process_image(hdu)?,
51                _ => return Err(FitsLoaderError::NoData),
52            },
53        };
54
55        // Extract file name information
56        let names = process_file_name(file_path.clone());
57
58        // Combine all columns
59        let mut columns = meta;
60        columns.extend(img_data);
61        columns.extend(names);
62
63        // Create DataFrame
64        DataFrame::new(columns).map_err(FitsLoaderError::PolarsError)
65    })();
66
67    // Add file path to error context if an error occurred
68    result.map_err(|e| {
69        FitsLoaderError::FitsError(format!("Error processing file '{}': {}", path_str, e))
70    })
71}
72
73/// Helper function to combine DataFrames with schema alignment
74fn combine_dataframes_with_alignment(
75    acc: DataFrame,
76    df: DataFrame,
77) -> Result<DataFrame, FitsLoaderError> {
78    // Try simple vstack first
79    match acc.vstack(&df) {
80        Ok(combined) => Ok(combined),
81        Err(_) => {
82            // If vstack fails, align the schemas and try again
83            let acc_cols = acc.get_column_names();
84            let df_cols = df.get_column_names();
85
86            // Find missing columns in each DataFrame
87            let missing_in_acc: Vec<_> = df_cols.iter().filter(|c| !acc_cols.contains(c)).collect();
88            let missing_in_df: Vec<_> = acc_cols.iter().filter(|c| !df_cols.contains(c)).collect();
89
90            // Add missing columns to each DataFrame with null values
91            let mut acc_aligned = acc.clone();
92            let mut df_aligned = df.clone();
93
94            for col in missing_in_acc {
95                // Convert to PlSmallStr
96                let col_name: PlSmallStr = (*col).clone().into();
97                let null_series = Series::new_null(col_name, acc.height());
98                let _ = acc_aligned.with_column(null_series).unwrap();
99            }
100
101            for col in missing_in_df {
102                // Convert to PlSmallStr
103                let col_name: PlSmallStr = (*col).clone().into();
104                let null_series = Series::new_null(col_name, df.height());
105                let _ = df_aligned.with_column(null_series).unwrap();
106            }
107
108            // Try again with aligned schemas
109            acc_aligned
110                .vstack(&df_aligned)
111                .map_err(|e| FitsLoaderError::PolarsError(e))
112        }
113    }
114}
115
116/// Reads all FITS files in a directory and combines them into a single DataFrame.
117///
118/// # Arguments
119///
120/// * `dir` - Path to the directory containing FITS files
121/// * `header_items` - List of header values to extract
122///
123/// # Returns
124///
125/// A `Result` containing either the combined DataFrame or a `FitsLoaderError`.
126pub fn read_experiment(
127    dir: &str,
128    header_items: &Vec<String>,
129) -> Result<DataFrame, FitsLoaderError> {
130    let dir_path = std::path::PathBuf::from(dir);
131
132    if !dir_path.exists() {
133        return Err(FitsLoaderError::FitsError(format!(
134            "Directory not found: {}",
135            dir
136        )));
137    }
138
139    // Find all FITS files in the directory
140    let entries: Vec<_> = fs::read_dir(dir)
141        .map_err(|e| FitsLoaderError::IoError(e))?
142        .par_bridge()
143        .filter_map(|entry| entry.ok())
144        .filter(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("fits"))
145        .collect();
146
147    if entries.is_empty() {
148        return Err(FitsLoaderError::FitsError(format!(
149            "No FITS files found in directory: {}",
150            dir
151        )));
152    }
153
154    // Process each file in parallel, collect results
155    let results: Vec<Result<DataFrame, FitsLoaderError>> = entries
156        .par_iter()
157        .map(|entry| read_fits(entry.path(), &header_items))
158        .collect();
159
160    // Filter out errors and keep only successful DataFrames
161    let successful_dfs: Vec<DataFrame> = results
162        .into_iter()
163        .filter_map(|result| result.ok())
164        .collect();
165
166    // If no files were successfully processed, return an error
167    if successful_dfs.is_empty() {
168        return Err(FitsLoaderError::FitsError(
169            "None of the files in the directory could be processed successfully".into(),
170        ));
171    }
172
173    // Combine all successful DataFrames
174    let combined_df = successful_dfs
175        .into_par_iter()
176        .reduce_with(|acc, df| {
177            let acc_clone = acc.clone();
178            combine_dataframes_with_alignment(acc, df).unwrap_or(acc_clone)
179        })
180        .ok_or(FitsLoaderError::NoData)?;
181
182    // If there is a column for energy, theta add the q column
183    Ok(add_calculated_domains(combined_df.lazy()))
184}
185
186/// Reads multiple specific FITS files and combines them into a single DataFrame.
187///
188/// # Arguments
189///
190/// * `file_paths` - Vector of paths to the FITS files to read
191/// * `header_items` - List of header values to extract
192///
193/// # Returns
194///
195/// A `Result` containing either the combined DataFrame or a `FitsLoaderError`.
196pub fn read_multiple_fits(
197    file_paths: Vec<PathBuf>,
198    header_items: &Vec<String>,
199) -> Result<DataFrame, FitsLoaderError> {
200    if file_paths.is_empty() {
201        return Err(FitsLoaderError::FitsError("No files provided".into()));
202    }
203
204    // Check that all files exist
205    for path in &file_paths {
206        if !path.exists() {
207            return Err(FitsLoaderError::FitsError(format!(
208                "File not found: {}",
209                path.display()
210            )));
211        }
212    }
213
214    // Process each file in parallel, collect results
215    let results: Vec<Result<DataFrame, FitsLoaderError>> = file_paths
216        .par_iter()
217        .map(|path| read_fits(path.clone(), header_items))
218        .collect();
219
220    // Filter out errors and keep only successful DataFrames
221    let successful_dfs: Vec<DataFrame> = results
222        .into_iter()
223        .filter_map(|result| result.ok())
224        .collect();
225
226    // If no files were successfully processed, return an error
227    if successful_dfs.is_empty() {
228        return Err(FitsLoaderError::FitsError(
229            "None of the provided files could be processed successfully".into(),
230        ));
231    }
232
233    // Combine all successful DataFrames
234    let combined_df = successful_dfs
235        .into_par_iter()
236        .reduce_with(|acc, df| {
237            let acc_clone = acc.clone();
238            combine_dataframes_with_alignment(acc, df).unwrap_or(acc_clone)
239        })
240        .ok_or(FitsLoaderError::NoData)?;
241
242    Ok(add_calculated_domains(combined_df.lazy()))
243}
244
245/// Reads FITS files matching a pattern and combines them into a single DataFrame.
246///
247/// # Arguments
248///
249/// * `dir` - Directory containing FITS files
250/// * `pattern` - Glob pattern to match files (e.g., "Y6_refl_*.fits")
251/// * `header_items` - List of header values to extract
252///
253/// # Returns
254///
255/// A `Result` containing either the combined DataFrame or a `FitsLoaderError`.
256pub fn read_experiment_pattern(
257    dir: &str,
258    pattern: &str,
259    header_items: &Vec<String>,
260) -> Result<DataFrame, FitsLoaderError> {
261    let dir_path = std::path::PathBuf::from(dir);
262
263    if !dir_path.exists() {
264        return Err(FitsLoaderError::FitsError(format!(
265            "Directory not found: {}",
266            dir
267        )));
268    }
269
270    // Clone the header items to avoid borrowing issues
271    let header_items = header_items
272        .iter()
273        .map(|s| s.to_string())
274        .collect::<Vec<_>>();
275
276    // Find all matching FITS files
277    let entries: Vec<_> = fs::read_dir(dir)
278        .map_err(FitsLoaderError::IoError)?
279        .par_bridge()
280        .filter_map(|entry| entry.ok())
281        .filter(|entry| {
282            let path = entry.path();
283            path.extension().and_then(|ext| ext.to_str()) == Some("fits")
284                && match path.file_name().and_then(|name| name.to_str()) {
285                    Some(name) => glob_match::glob_match(pattern, name),
286                    None => false,
287                }
288        })
289        .map(|entry| entry.path())
290        .collect();
291
292    if entries.is_empty() {
293        return Err(FitsLoaderError::FitsError(format!(
294            "No FITS files matching pattern '{}' found in directory: {}",
295            pattern, dir
296        )));
297    }
298
299    read_multiple_fits(entries, &header_items)
300}