1use std::path::Path;
18
19use anyhow::{Result, anyhow};
20use polars::prelude::*;
21
/// Tabular file formats this module can ingest.
///
/// Normally obtained from a file extension via `IngestFormat::from_path`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IngestFormat {
    /// Comma-separated values (`.csv`).
    Csv,
    /// Tab-separated values (`.tsv`).
    Tsv,
    /// Apache Parquet (`.parquet`, `.pq`).
    Parquet,
    /// Excel workbook (`.xlsx`, `.xls`); reading it requires the `excel` feature.
    Excel,
}
30
31impl IngestFormat {
32 pub fn from_path(path: &Path) -> Result<Self> {
34 let ext = path
35 .extension()
36 .and_then(|e| e.to_str())
37 .map(|e| e.to_ascii_lowercase())
38 .unwrap_or_default();
39
40 match ext.as_str() {
41 "csv" => Ok(Self::Csv),
42 "tsv" => Ok(Self::Tsv),
43 "parquet" | "pq" => Ok(Self::Parquet),
44 "xlsx" | "xls" => Ok(Self::Excel),
45 _ => Err(anyhow!(
46 "unsupported file format: .{ext} (expected csv, tsv, parquet, xlsx, or xls)"
47 )),
48 }
49 }
50}
51
52pub fn read_file(path: &Path) -> Result<DataFrame> {
57 let format = IngestFormat::from_path(path)?;
58
59 match format {
60 IngestFormat::Csv => read_csv(path),
61 IngestFormat::Tsv => read_tsv(path),
62 IngestFormat::Parquet => read_parquet(path),
63 IngestFormat::Excel => {
64 #[cfg(feature = "excel")]
65 {
66 read_excel(path, None)
67 }
68 #[cfg(not(feature = "excel"))]
69 {
70 Err(anyhow!(
71 "Excel support requires the 'excel' feature. Rebuild with: cargo build --features excel"
72 ))
73 }
74 }
75 }
76}
77
78pub fn read_csv(path: &Path) -> Result<DataFrame> {
80 let df = CsvReadOptions::default()
81 .with_has_header(true)
82 .with_infer_schema_length(Some(1000))
83 .try_into_reader_with_file_path(Some(path.into()))?
84 .finish()?;
85
86 tracing::info!(
87 path = %path.display(),
88 rows = df.height(),
89 cols = df.width(),
90 "ingested CSV"
91 );
92
93 Ok(df)
94}
95
96pub fn read_tsv(path: &Path) -> Result<DataFrame> {
98 let df = CsvReadOptions::default()
99 .with_has_header(true)
100 .with_parse_options(CsvParseOptions::default().with_separator(b'\t'))
101 .with_infer_schema_length(Some(1000))
102 .try_into_reader_with_file_path(Some(path.into()))?
103 .finish()?;
104
105 tracing::info!(
106 path = %path.display(),
107 rows = df.height(),
108 cols = df.width(),
109 "ingested TSV"
110 );
111
112 Ok(df)
113}
114
115pub fn read_parquet(path: &Path) -> Result<DataFrame> {
117 let path_str = path
118 .to_str()
119 .ok_or_else(|| anyhow!("path is not valid UTF-8: {}", path.display()))?;
120
121 let df = LazyFrame::scan_parquet(PlPath::from_str(path_str), Default::default())?.collect()?;
122
123 tracing::info!(
124 path = %path.display(),
125 rows = df.height(),
126 cols = df.width(),
127 "ingested Parquet"
128 );
129
130 Ok(df)
131}
132
/// Read one worksheet of an Excel workbook into a [`DataFrame`].
///
/// `sheet` selects a worksheet by name. `None` uses the first name returned by the
/// workbook's `sheet_names()`.
///
/// The first row of the sheet's range supplies the column names:
/// - blank header cells become `column_{index}`;
/// - repeated names get a `_1`, `_2`, … suffix, so the frame always has unique column names.
///
/// Every cell is rendered through its `Display` impl. A column is then typed as `Float64`
/// when all of its non-empty values parse as `f64`, and as `String` otherwise. Empty cells
/// become nulls in both cases.
///
/// Only available with the `excel` feature.
///
/// # Errors
///
/// Returns an error when:
/// - the workbook cannot be opened;
/// - it has no sheets (when `sheet` is `None`);
/// - the sheet cannot be read;
/// - the sheet has fewer than two rows (a header plus at least one data row) or no columns.
#[cfg(feature = "excel")]
pub fn read_excel(path: &Path, sheet: Option<&str>) -> Result<DataFrame> {
    use calamine::{Reader, open_workbook_auto};

    let mut workbook =
        open_workbook_auto(path).map_err(|e| anyhow!("failed to open Excel file: {e}"))?;

    let sheet_name = match sheet {
        Some(name) => name.to_string(),
        None => {
            let names = workbook.sheet_names();
            names
                .first()
                .ok_or_else(|| anyhow!("Excel file has no sheets"))?
                .clone()
        }
    };

    let range = workbook
        .worksheet_range(&sheet_name)
        .map_err(|e| anyhow!("failed to read sheet '{sheet_name}': {e}"))?;

    let (rows, cols) = range.get_size();
    if rows < 2 || cols == 0 {
        return Err(anyhow!(
            "sheet '{sheet_name}' has no data (rows={rows}, cols={cols})"
        ));
    }

    let headers = excel_column_names(
        range
            .rows()
            .next()
            .into_iter()
            .flatten()
            .map(|cell| cell.to_string()),
    );

    // Build each Vec separately: `vec![Vec::with_capacity(n); cols]` fills the slots by
    // cloning the prototype, and cloning an empty Vec does not keep its capacity.
    let mut columns: Vec<Vec<String>> =
        (0..cols).map(|_| Vec::with_capacity(rows - 1)).collect();
    for row in range.rows().skip(1) {
        // `zip` stops at the shorter side, so any cells beyond `cols` are ignored.
        for (column, cell) in columns.iter_mut().zip(row) {
            column.push(cell.to_string());
        }
    }

    let series: Vec<Column> = headers
        .iter()
        .zip(&columns)
        .map(|(name, values)| excel_column(name, values))
        .collect();

    let df = DataFrame::new(series)?;

    tracing::info!(
        path = %path.display(),
        sheet = sheet_name,
        rows = df.height(),
        cols = df.width(),
        "ingested Excel"
    );

    Ok(df)
}

/// Turn raw header-row texts into unique, non-empty column names.
///
/// Empty headers become `column_{index}`. A name already taken gets the first free
/// `_{n}` suffix (n = 1, 2, …), so the result can be passed to `DataFrame::new` without
/// duplicate-name errors.
#[cfg(feature = "excel")]
fn excel_column_names<I>(header_cells: I) -> Vec<String>
where
    I: IntoIterator<Item = String>,
{
    use std::collections::HashSet;

    let mut seen: HashSet<String> = HashSet::new();
    header_cells
        .into_iter()
        .enumerate()
        .map(|(i, val)| {
            let base = if val.is_empty() {
                format!("column_{i}")
            } else {
                val
            };
            let mut name = base.clone();
            let mut suffix = 1;
            while !seen.insert(name.clone()) {
                name = format!("{base}_{suffix}");
                suffix += 1;
            }
            name
        })
        .collect()
}

/// Build one column from the stringified cells of a sheet column.
///
/// The column is `Float64` when every non-empty value parses as `f64`, and `String`
/// otherwise. Empty strings become nulls in either case.
#[cfg(feature = "excel")]
fn excel_column(name: &str, values: &[String]) -> Column {
    // `None` overall means at least one non-empty value is not numeric.
    let floats: Option<Vec<Option<f64>>> = values
        .iter()
        .map(|v| {
            if v.is_empty() {
                Some(None)
            } else {
                v.parse::<f64>().ok().map(Some)
            }
        })
        .collect();

    match floats {
        Some(floats) => Column::new(name.into(), floats.into_iter().collect::<Float64Chunked>()),
        None => Column::new(
            name.into(),
            values
                .iter()
                .map(|v| (!v.is_empty()).then_some(v.as_str()))
                .collect::<StringChunked>(),
        ),
    }
}
237
/// List the worksheet names of an Excel workbook, as reported by its `sheet_names()`.
///
/// Only available with the `excel` feature.
///
/// # Errors
///
/// Returns an error if the workbook cannot be opened.
#[cfg(feature = "excel")]
pub fn list_excel_sheets(path: &Path) -> Result<Vec<String>> {
    use calamine::{Reader, open_workbook_auto};

    let names = open_workbook_auto(path)
        .map_err(|e| anyhow!("failed to open Excel file: {e}"))?
        .sheet_names()
        .to_vec();

    Ok(names)
}
248
249#[derive(Debug, Clone, serde::Serialize)]
251pub struct IngestSummary {
252 pub format: String,
253 pub rows: usize,
254 pub columns: Vec<ColumnSummary>,
255}
256
257#[derive(Debug, Clone, serde::Serialize)]
259pub struct ColumnSummary {
260 pub name: String,
261 pub dtype: String,
262 pub null_count: usize,
263 pub sample_values: Vec<String>,
264}
265
266pub fn summarize(df: &DataFrame) -> IngestSummary {
268 let columns = df
269 .get_columns()
270 .iter()
271 .map(|col| {
272 let sample_values: Vec<String> = (0..col.len().min(3))
273 .map(|i| format!("{}", col.get(i).unwrap_or(AnyValue::Null)))
274 .collect();
275
276 ColumnSummary {
277 name: col.name().to_string(),
278 dtype: format!("{}", col.dtype()),
279 null_count: col.null_count(),
280 sample_values,
281 }
282 })
283 .collect();
284
285 IngestSummary {
286 format: "dataframe".to_string(),
287 rows: df.height(),
288 columns,
289 }
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Create `name` inside `dir`, write each of `lines` followed by a newline, and return
    /// the file's path.
    fn write_lines(dir: &tempfile::TempDir, name: &str, lines: &[&str]) -> std::path::PathBuf {
        let path = dir.path().join(name);
        let mut f = std::fs::File::create(&path).unwrap();
        for line in lines {
            writeln!(f, "{line}").unwrap();
        }
        path
    }

    #[test]
    fn read_csv_basic() {
        let dir = tempfile::tempdir().unwrap();
        let path = write_lines(
            &dir,
            "test.csv",
            &["name,arr,growth", "Acme,5000000,0.25", "Beta,12000000,0.15"],
        );

        let df = read_csv(&path).unwrap();
        assert_eq!(df.height(), 2);
        assert_eq!(df.width(), 3);
        assert_eq!(df.column("arr").unwrap().dtype(), &DataType::Int64);
        assert_eq!(df.column("growth").unwrap().dtype(), &DataType::Float64);
    }

    #[test]
    fn read_tsv_basic() {
        let dir = tempfile::tempdir().unwrap();
        let path = write_lines(&dir, "test.tsv", &["name\tarr\tgrowth", "Acme\t5000000\t0.25"]);

        let df = read_tsv(&path).unwrap();
        assert_eq!(df.height(), 1);
        assert_eq!(df.width(), 3);
        assert_eq!(df.column("growth").unwrap().dtype(), &DataType::Float64);
    }

    #[test]
    fn read_parquet_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.parquet");

        let mut df = df! {
            "company" => &["Acme", "Beta"],
            "arr" => &[5_000_000i64, 12_000_000],
        }
        .unwrap();

        let file = std::fs::File::create(&path).unwrap();
        ParquetWriter::new(file).finish(&mut df).unwrap();

        let loaded = read_parquet(&path).unwrap();
        assert_eq!(loaded.height(), 2);
        assert_eq!(loaded.width(), 2);
        assert_eq!(loaded.column("arr").unwrap().dtype(), &DataType::Int64);
    }

    #[test]
    fn read_file_dispatches_on_extension() {
        let dir = tempfile::tempdir().unwrap();
        let csv = write_lines(&dir, "data.csv", &["a,b", "1,2"]);
        let tsv = write_lines(&dir, "data.tsv", &["a\tb", "1\t2"]);

        // Width 2 proves the right separator was used for each file.
        assert_eq!(read_file(&csv).unwrap().width(), 2);
        assert_eq!(read_file(&tsv).unwrap().width(), 2);
        assert!(read_file(&dir.path().join("data.txt")).is_err());
    }

    #[test]
    fn format_detection() {
        let supported = [
            ("data.csv", IngestFormat::Csv),
            ("data.TSV", IngestFormat::Tsv),
            ("data.parquet", IngestFormat::Parquet),
            ("data.pq", IngestFormat::Parquet),
            ("data.XLSX", IngestFormat::Excel),
            ("data.xls", IngestFormat::Excel),
        ];
        for (name, expected) in supported {
            assert_eq!(
                IngestFormat::from_path(Path::new(name)).unwrap(),
                expected,
                "{name}"
            );
        }

        for name in ["data.txt", "data", "archive.csv.gz"] {
            assert!(IngestFormat::from_path(Path::new(name)).is_err(), "{name}");
        }
    }

    #[test]
    fn summarize_df() {
        let df = df! {
            "name" => &["Acme", "Beta", "Gamma"],
            "arr" => &[5_000_000i64, 12_000_000, 3_000_000],
        }
        .unwrap();

        let summary = summarize(&df);
        assert_eq!(summary.format, "dataframe");
        assert_eq!(summary.rows, 3);
        assert_eq!(summary.columns.len(), 2);
        assert_eq!(summary.columns[0].name, "name");
        assert_eq!(summary.columns[1].name, "arr");
        assert_eq!(summary.columns[1].null_count, 0);
        assert_eq!(summary.columns[1].sample_values.len(), 3);
    }

    #[test]
    fn summarize_caps_samples_and_counts_nulls() {
        let df = df! {
            "x" => &[Some(1i64), None, Some(3), Some(4)],
        }
        .unwrap();

        let summary = summarize(&df);
        assert_eq!(summary.rows, 4);
        assert_eq!(summary.columns[0].null_count, 1);
        assert_eq!(summary.columns[0].sample_values.len(), 3);
    }
}