1use std::collections::{HashMap, HashSet};
2use std::io::{Read, Seek};
3use std::path::Path;
4
5use calamine::{open_workbook, Data, Reader, Xlsx};
6use polars::prelude::{DataFrame, NamedFrom, Series};
7
8use crate::errors::IoError;
9use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
10use crate::{config, FloeResult};
11
12struct XlsxInputAdapter;
13
14static XLSX_INPUT_ADAPTER: XlsxInputAdapter = XlsxInputAdapter;
15
16pub(crate) fn xlsx_input_adapter() -> &'static dyn InputAdapter {
17 &XLSX_INPUT_ADAPTER
18}
19
20#[derive(Debug, Clone)]
21pub struct XlsxReadError {
22 pub rule: String,
23 pub message: String,
24}
25
26impl std::fmt::Display for XlsxReadError {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 write!(f, "{}: {}", self.rule, self.message)
29 }
30}
31
32impl std::error::Error for XlsxReadError {}
33
34#[derive(Debug, Clone)]
35struct XlsxOptions {
36 sheet: Option<String>,
37 header_row: u64,
38 data_row: u64,
39}
40
41impl XlsxOptions {
42 fn from_source_options(source_options: Option<&config::SourceOptions>) -> Self {
43 let defaults = config::SourceOptions::default();
44 let options = source_options.unwrap_or(&defaults);
45 let header_row = options.header_row.unwrap_or(1);
46 let data_row = options.data_row.unwrap_or(header_row + 1);
47 Self {
48 sheet: options.sheet.clone(),
49 header_row,
50 data_row,
51 }
52 }
53}
54
55fn read_xlsx_header(
56 input_path: &Path,
57 options: &XlsxOptions,
58) -> Result<Vec<String>, XlsxReadError> {
59 let range = open_sheet_range(input_path, options)?;
60 let (_height, width) = range.get_size();
61 if width == 0 {
62 return Err(XlsxReadError {
63 rule: "xlsx_read_error".to_string(),
64 message: format!("sheet is empty in {}", input_path.display()),
65 });
66 }
67 let header_idx = options.header_row.saturating_sub(1) as usize;
68 if header_idx >= range.get_size().0 {
69 return Err(XlsxReadError {
70 rule: "xlsx_read_error".to_string(),
71 message: format!(
72 "header_row={} is outside sheet bounds in {}",
73 options.header_row,
74 input_path.display()
75 ),
76 });
77 }
78 Ok(extract_header(&range, header_idx, width))
79}
80
81fn read_xlsx_file(input_path: &Path, options: &XlsxOptions) -> Result<DataFrame, XlsxReadError> {
82 let range = open_sheet_range(input_path, options)?;
83 let (height, width) = range.get_size();
84 if width == 0 {
85 return Err(XlsxReadError {
86 rule: "xlsx_read_error".to_string(),
87 message: format!("sheet is empty in {}", input_path.display()),
88 });
89 }
90 let header_idx = options.header_row.saturating_sub(1) as usize;
91 if header_idx >= height {
92 return Err(XlsxReadError {
93 rule: "xlsx_read_error".to_string(),
94 message: format!(
95 "header_row={} is outside sheet bounds in {}",
96 options.header_row,
97 input_path.display()
98 ),
99 });
100 }
101
102 let header = extract_header(&range, header_idx, width);
103 let data_idx = options.data_row.saturating_sub(1) as usize;
104 let row_count = height.saturating_sub(data_idx);
105 let mut columns = vec![Vec::with_capacity(row_count); width];
106
107 if data_idx < height {
108 for row in data_idx..height {
109 for (col, column) in columns.iter_mut().enumerate() {
110 let cell = range.get((row, col)).and_then(cell_to_string);
111 column.push(cell);
112 }
113 }
114 }
115
116 let mut series = Vec::with_capacity(width);
117 for (index, name) in header.iter().enumerate() {
118 let values = std::mem::take(&mut columns[index]);
119 series.push(Series::new(name.as_str().into(), values).into());
120 }
121
122 DataFrame::new(series).map_err(|err| XlsxReadError {
123 rule: "xlsx_read_error".to_string(),
124 message: format!("failed to build dataframe: {err}"),
125 })
126}
127
128fn open_sheet_range(
129 input_path: &Path,
130 options: &XlsxOptions,
131) -> Result<calamine::Range<Data>, XlsxReadError> {
132 let mut workbook: Xlsx<_> = open_workbook(input_path).map_err(|err| XlsxReadError {
133 rule: "xlsx_read_error".to_string(),
134 message: format!("failed to open xlsx at {}: {err}", input_path.display()),
135 })?;
136 let sheet_name = resolve_sheet_name(&mut workbook, options, input_path)?;
137 let range = workbook
138 .worksheet_range(&sheet_name)
139 .map_err(|err| XlsxReadError {
140 rule: "xlsx_read_error".to_string(),
141 message: format!(
142 "failed to read sheet {} in {}: {err}",
143 sheet_name,
144 input_path.display()
145 ),
146 })?;
147 Ok(range)
148}
149
150fn resolve_sheet_name<R: Read + Seek>(
151 workbook: &mut Xlsx<R>,
152 options: &XlsxOptions,
153 input_path: &Path,
154) -> Result<String, XlsxReadError> {
155 let names = workbook.sheet_names().to_vec();
156 if names.is_empty() {
157 return Err(XlsxReadError {
158 rule: "xlsx_read_error".to_string(),
159 message: format!("no sheets found in {}", input_path.display()),
160 });
161 }
162 if let Some(sheet) = options.sheet.as_ref() {
163 if names.iter().any(|name| name == sheet) {
164 return Ok(sheet.clone());
165 }
166 return Err(XlsxReadError {
167 rule: "xlsx_sheet_error".to_string(),
168 message: format!(
169 "sheet {} not found in {} (available: {})",
170 sheet,
171 input_path.display(),
172 names.join(", ")
173 ),
174 });
175 }
176 Ok(names[0].clone())
177}
178
179fn extract_header(range: &calamine::Range<Data>, row: usize, width: usize) -> Vec<String> {
180 let mut header = Vec::with_capacity(width);
181 for col in 0..width {
182 let name = range
183 .get((row, col))
184 .and_then(cell_to_string)
185 .unwrap_or_else(|| format!("column_{}", col + 1));
186 let name = name.trim();
187 if name.is_empty() {
188 header.push(format!("column_{}", col + 1));
189 } else {
190 header.push(name.to_string());
191 }
192 }
193 dedupe_header_names(header)
196}
197
198fn dedupe_header_names(names: Vec<String>) -> Vec<String> {
199 let mut base_counts: HashMap<String, usize> = HashMap::new();
200 let mut used: HashSet<String> = HashSet::new();
201 let mut deduped = Vec::with_capacity(names.len());
202
203 for name in names {
204 let count = base_counts.entry(name.clone()).or_insert(0);
205 *count += 1;
206 let mut candidate = if *count == 1 {
207 name.clone()
208 } else {
209 format!("{name}_dup_{}", *count)
210 };
211 while used.contains(&candidate) {
212 *count += 1;
213 candidate = format!("{name}_dup_{}", *count);
214 }
215 used.insert(candidate.clone());
216 deduped.push(candidate);
217 }
218
219 deduped
220}
221
222fn cell_to_string(cell: &Data) -> Option<String> {
223 match cell {
224 Data::Empty => None,
225 _ => Some(cell.to_string()),
226 }
227}
228
229impl InputAdapter for XlsxInputAdapter {
230 fn format(&self) -> &'static str {
231 "xlsx"
232 }
233
234 fn read_input_columns(
235 &self,
236 entity: &config::EntityConfig,
237 input_file: &InputFile,
238 _columns: &[config::ColumnConfig],
239 ) -> Result<Vec<String>, FileReadError> {
240 let options = XlsxOptions::from_source_options(entity.source.options.as_ref());
241 let header = read_xlsx_header(&input_file.source_local_path, &options).map_err(|err| {
242 FileReadError {
243 rule: err.rule,
244 message: err.message,
245 }
246 })?;
247 Ok(header)
248 }
249
250 fn read_inputs(
251 &self,
252 entity: &config::EntityConfig,
253 files: &[InputFile],
254 columns: &[config::ColumnConfig],
255 normalize_strategy: Option<&str>,
256 collect_raw: bool,
257 ) -> FloeResult<Vec<ReadInput>> {
258 let options = XlsxOptions::from_source_options(entity.source.options.as_ref());
259 let mut inputs = Vec::with_capacity(files.len());
260 for input_file in files {
261 let path = &input_file.source_local_path;
262 let df = read_xlsx_file(path, &options).map_err(|err| {
263 Box::new(IoError(err.to_string())) as Box<dyn std::error::Error + Send + Sync>
264 })?;
265 let input = format::read_input_from_df(
266 input_file,
267 &df,
268 columns,
269 normalize_strategy,
270 collect_raw,
271 )?;
272 inputs.push(input);
273 }
274 Ok(inputs)
275 }
276}