Skip to main content

floe_core/io/read/
xlsx.rs

1use std::collections::{HashMap, HashSet};
2use std::io::{Read, Seek};
3use std::path::Path;
4
5use calamine::{open_workbook, Data, Reader, Xlsx};
6use polars::prelude::{DataFrame, NamedFrom, Series};
7
8use crate::errors::IoError;
9use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
10use crate::{config, FloeResult};
11
12struct XlsxInputAdapter;
13
14static XLSX_INPUT_ADAPTER: XlsxInputAdapter = XlsxInputAdapter;
15
16pub(crate) fn xlsx_input_adapter() -> &'static dyn InputAdapter {
17    &XLSX_INPUT_ADAPTER
18}
19
20#[derive(Debug, Clone)]
21pub struct XlsxReadError {
22    pub rule: String,
23    pub message: String,
24}
25
26impl std::fmt::Display for XlsxReadError {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        write!(f, "{}: {}", self.rule, self.message)
29    }
30}
31
32impl std::error::Error for XlsxReadError {}
33
34#[derive(Debug, Clone)]
35struct XlsxOptions {
36    sheet: Option<String>,
37    header_row: u64,
38    data_row: u64,
39}
40
41impl XlsxOptions {
42    fn from_source_options(source_options: Option<&config::SourceOptions>) -> Self {
43        let defaults = config::SourceOptions::default();
44        let options = source_options.unwrap_or(&defaults);
45        let header_row = options.header_row.unwrap_or(1);
46        let data_row = options.data_row.unwrap_or(header_row + 1);
47        Self {
48            sheet: options.sheet.clone(),
49            header_row,
50            data_row,
51        }
52    }
53}
54
55fn read_xlsx_header(
56    input_path: &Path,
57    options: &XlsxOptions,
58) -> Result<Vec<String>, XlsxReadError> {
59    let range = open_sheet_range(input_path, options)?;
60    let (_height, width) = range.get_size();
61    if width == 0 {
62        return Err(XlsxReadError {
63            rule: "xlsx_read_error".to_string(),
64            message: format!("sheet is empty in {}", input_path.display()),
65        });
66    }
67    let header_idx = options.header_row.saturating_sub(1) as usize;
68    if header_idx >= range.get_size().0 {
69        return Err(XlsxReadError {
70            rule: "xlsx_read_error".to_string(),
71            message: format!(
72                "header_row={} is outside sheet bounds in {}",
73                options.header_row,
74                input_path.display()
75            ),
76        });
77    }
78    Ok(extract_header(&range, header_idx, width))
79}
80
81fn read_xlsx_file(input_path: &Path, options: &XlsxOptions) -> Result<DataFrame, XlsxReadError> {
82    let range = open_sheet_range(input_path, options)?;
83    let (height, width) = range.get_size();
84    if width == 0 {
85        return Err(XlsxReadError {
86            rule: "xlsx_read_error".to_string(),
87            message: format!("sheet is empty in {}", input_path.display()),
88        });
89    }
90    let header_idx = options.header_row.saturating_sub(1) as usize;
91    if header_idx >= height {
92        return Err(XlsxReadError {
93            rule: "xlsx_read_error".to_string(),
94            message: format!(
95                "header_row={} is outside sheet bounds in {}",
96                options.header_row,
97                input_path.display()
98            ),
99        });
100    }
101
102    let header = extract_header(&range, header_idx, width);
103    let data_idx = options.data_row.saturating_sub(1) as usize;
104    let row_count = height.saturating_sub(data_idx);
105    let mut columns = vec![Vec::with_capacity(row_count); width];
106
107    if data_idx < height {
108        for row in data_idx..height {
109            for (col, column) in columns.iter_mut().enumerate() {
110                let cell = range.get((row, col)).and_then(cell_to_string);
111                column.push(cell);
112            }
113        }
114    }
115
116    let mut series = Vec::with_capacity(width);
117    for (index, name) in header.iter().enumerate() {
118        let values = std::mem::take(&mut columns[index]);
119        series.push(Series::new(name.as_str().into(), values).into());
120    }
121
122    DataFrame::new(series).map_err(|err| XlsxReadError {
123        rule: "xlsx_read_error".to_string(),
124        message: format!("failed to build dataframe: {err}"),
125    })
126}
127
128fn open_sheet_range(
129    input_path: &Path,
130    options: &XlsxOptions,
131) -> Result<calamine::Range<Data>, XlsxReadError> {
132    let mut workbook: Xlsx<_> = open_workbook(input_path).map_err(|err| XlsxReadError {
133        rule: "xlsx_read_error".to_string(),
134        message: format!("failed to open xlsx at {}: {err}", input_path.display()),
135    })?;
136    let sheet_name = resolve_sheet_name(&mut workbook, options, input_path)?;
137    let range = workbook
138        .worksheet_range(&sheet_name)
139        .map_err(|err| XlsxReadError {
140            rule: "xlsx_read_error".to_string(),
141            message: format!(
142                "failed to read sheet {} in {}: {err}",
143                sheet_name,
144                input_path.display()
145            ),
146        })?;
147    Ok(range)
148}
149
150fn resolve_sheet_name<R: Read + Seek>(
151    workbook: &mut Xlsx<R>,
152    options: &XlsxOptions,
153    input_path: &Path,
154) -> Result<String, XlsxReadError> {
155    let names = workbook.sheet_names().to_vec();
156    if names.is_empty() {
157        return Err(XlsxReadError {
158            rule: "xlsx_read_error".to_string(),
159            message: format!("no sheets found in {}", input_path.display()),
160        });
161    }
162    if let Some(sheet) = options.sheet.as_ref() {
163        if names.iter().any(|name| name == sheet) {
164            return Ok(sheet.clone());
165        }
166        return Err(XlsxReadError {
167            rule: "xlsx_sheet_error".to_string(),
168            message: format!(
169                "sheet {} not found in {} (available: {})",
170                sheet,
171                input_path.display(),
172                names.join(", ")
173            ),
174        });
175    }
176    Ok(names[0].clone())
177}
178
179fn extract_header(range: &calamine::Range<Data>, row: usize, width: usize) -> Vec<String> {
180    let mut header = Vec::with_capacity(width);
181    for col in 0..width {
182        let name = range
183            .get((row, col))
184            .and_then(cell_to_string)
185            .unwrap_or_else(|| format!("column_{}", col + 1));
186        let name = name.trim();
187        if name.is_empty() {
188            header.push(format!("column_{}", col + 1));
189        } else {
190            header.push(name.to_string());
191        }
192    }
193    // Excel exports often repeat header labels. Polars requires unique column names,
194    // so we dedupe here to avoid DataFrame::new errors during read.
195    dedupe_header_names(header)
196}
197
198fn dedupe_header_names(names: Vec<String>) -> Vec<String> {
199    let mut base_counts: HashMap<String, usize> = HashMap::new();
200    let mut used: HashSet<String> = HashSet::new();
201    let mut deduped = Vec::with_capacity(names.len());
202
203    for name in names {
204        let count = base_counts.entry(name.clone()).or_insert(0);
205        *count += 1;
206        let mut candidate = if *count == 1 {
207            name.clone()
208        } else {
209            format!("{name}_dup_{}", *count)
210        };
211        while used.contains(&candidate) {
212            *count += 1;
213            candidate = format!("{name}_dup_{}", *count);
214        }
215        used.insert(candidate.clone());
216        deduped.push(candidate);
217    }
218
219    deduped
220}
221
222fn cell_to_string(cell: &Data) -> Option<String> {
223    match cell {
224        Data::Empty => None,
225        _ => Some(cell.to_string()),
226    }
227}
228
229impl InputAdapter for XlsxInputAdapter {
230    fn format(&self) -> &'static str {
231        "xlsx"
232    }
233
234    fn read_input_columns(
235        &self,
236        entity: &config::EntityConfig,
237        input_file: &InputFile,
238        _columns: &[config::ColumnConfig],
239    ) -> Result<Vec<String>, FileReadError> {
240        let options = XlsxOptions::from_source_options(entity.source.options.as_ref());
241        let header = read_xlsx_header(&input_file.source_local_path, &options).map_err(|err| {
242            FileReadError {
243                rule: err.rule,
244                message: err.message,
245            }
246        })?;
247        Ok(header)
248    }
249
250    fn read_inputs(
251        &self,
252        entity: &config::EntityConfig,
253        files: &[InputFile],
254        columns: &[config::ColumnConfig],
255        normalize_strategy: Option<&str>,
256        collect_raw: bool,
257    ) -> FloeResult<Vec<ReadInput>> {
258        let options = XlsxOptions::from_source_options(entity.source.options.as_ref());
259        let mut inputs = Vec::with_capacity(files.len());
260        for input_file in files {
261            let path = &input_file.source_local_path;
262            let df = read_xlsx_file(path, &options).map_err(|err| {
263                Box::new(IoError(err.to_string())) as Box<dyn std::error::Error + Send + Sync>
264            })?;
265            let input = format::read_input_from_df(
266                input_file,
267                &df,
268                columns,
269                normalize_strategy,
270                collect_raw,
271            )?;
272            inputs.push(input);
273        }
274        Ok(inputs)
275    }
276}