// converge_analytics/ingest.rs
1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! File ingest: read CSV, Parquet, and Excel into Polars DataFrames.
5//!
6//! This is the entry point for structured data import. It detects the format
7//! from the file extension, reads the data, and returns a `DataFrame`.
8//!
9//! # Supported formats
10//!
//! | Format  | Extension         | Feature gate |
//! |---------|-------------------|--------------|
//! | CSV     | `.csv`, `.tsv`    | always       |
//! | Parquet | `.parquet`, `.pq` | always       |
//! | Excel   | `.xlsx`, `.xls`   | `excel`      |
16
17use std::path::Path;
18
19use anyhow::{Result, anyhow};
20use polars::prelude::*;
21
/// Detected file format for ingest.
///
/// Produced by [`IngestFormat::from_path`] from a file extension and used by
/// [`read_file`] to dispatch to the matching reader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestFormat {
    /// Comma-separated values (`.csv`).
    Csv,
    /// Tab-separated values (`.tsv`).
    Tsv,
    /// Apache Parquet (`.parquet`, `.pq`).
    Parquet,
    /// Excel workbook (`.xlsx`, `.xls`); reading requires the `excel` feature.
    Excel,
}
30
31impl IngestFormat {
32    /// Detect format from file extension.
33    pub fn from_path(path: &Path) -> Result<Self> {
34        let ext = path
35            .extension()
36            .and_then(|e| e.to_str())
37            .map(|e| e.to_ascii_lowercase())
38            .unwrap_or_default();
39
40        match ext.as_str() {
41            "csv" => Ok(Self::Csv),
42            "tsv" => Ok(Self::Tsv),
43            "parquet" | "pq" => Ok(Self::Parquet),
44            "xlsx" | "xls" => Ok(Self::Excel),
45            _ => Err(anyhow!(
46                "unsupported file format: .{ext} (expected csv, tsv, parquet, xlsx, or xls)"
47            )),
48        }
49    }
50}
51
52/// Read a file into a Polars DataFrame. Format is detected from the extension.
53///
54/// For Excel files, reads the first sheet by default. Use [`read_excel_sheet`]
55/// to specify a sheet name.
56pub fn read_file(path: &Path) -> Result<DataFrame> {
57    let format = IngestFormat::from_path(path)?;
58
59    match format {
60        IngestFormat::Csv => read_csv(path),
61        IngestFormat::Tsv => read_tsv(path),
62        IngestFormat::Parquet => read_parquet(path),
63        IngestFormat::Excel => {
64            #[cfg(feature = "excel")]
65            {
66                read_excel(path, None)
67            }
68            #[cfg(not(feature = "excel"))]
69            {
70                Err(anyhow!(
71                    "Excel support requires the 'excel' feature. Rebuild with: cargo build --features excel"
72                ))
73            }
74        }
75    }
76}
77
78/// Read a CSV file into a DataFrame.
79pub fn read_csv(path: &Path) -> Result<DataFrame> {
80    let df = CsvReadOptions::default()
81        .with_has_header(true)
82        .with_infer_schema_length(Some(1000))
83        .try_into_reader_with_file_path(Some(path.into()))?
84        .finish()?;
85
86    tracing::info!(
87        path = %path.display(),
88        rows = df.height(),
89        cols = df.width(),
90        "ingested CSV"
91    );
92
93    Ok(df)
94}
95
96/// Read a TSV (tab-separated) file into a DataFrame.
97pub fn read_tsv(path: &Path) -> Result<DataFrame> {
98    let df = CsvReadOptions::default()
99        .with_has_header(true)
100        .with_parse_options(CsvParseOptions::default().with_separator(b'\t'))
101        .with_infer_schema_length(Some(1000))
102        .try_into_reader_with_file_path(Some(path.into()))?
103        .finish()?;
104
105    tracing::info!(
106        path = %path.display(),
107        rows = df.height(),
108        cols = df.width(),
109        "ingested TSV"
110    );
111
112    Ok(df)
113}
114
/// Read a Parquet file into a DataFrame.
///
/// Goes through the lazy scan API and collects immediately, so Polars can
/// apply its reader-side optimizations while callers still get an eager
/// `DataFrame`.
///
/// # Errors
///
/// Fails if the path is not valid UTF-8, or if scanning/collecting the
/// Parquet file fails.
pub fn read_parquet(path: &Path) -> Result<DataFrame> {
    // `PlPath::from_str` needs a UTF-8 string; reject non-UTF-8 paths up
    // front with a clearer error than a lossy conversion would produce.
    let path_str = path
        .to_str()
        .ok_or_else(|| anyhow!("path is not valid UTF-8: {}", path.display()))?;

    let df = LazyFrame::scan_parquet(PlPath::from_str(path_str), Default::default())?.collect()?;

    tracing::info!(
        path = %path.display(),
        rows = df.height(),
        cols = df.width(),
        "ingested Parquet"
    );

    Ok(df)
}
132
/// Read an Excel file into a DataFrame.
///
/// If `sheet` is None, reads the first sheet.
///
/// The first row of the sheet is treated as the header row; empty header
/// cells are replaced with `column_{i}`. Each column is ingested as `f64` if
/// every non-empty cell parses as a float (empty cells become nulls),
/// otherwise as strings.
///
/// # Errors
///
/// Fails if the workbook cannot be opened, the named (or first) sheet cannot
/// be read, or the sheet has no data rows.
#[cfg(feature = "excel")]
pub fn read_excel(path: &Path, sheet: Option<&str>) -> Result<DataFrame> {
    use calamine::{Reader, open_workbook_auto};

    // `open_workbook_auto` picks the backend from the file contents, so both
    // .xlsx and legacy .xls are handled here.
    let mut workbook =
        open_workbook_auto(path).map_err(|e| anyhow!("failed to open Excel file: {e}"))?;

    let sheet_name = match sheet {
        Some(name) => name.to_string(),
        None => {
            // Default to the first sheet in workbook order.
            let names = workbook.sheet_names();
            names
                .first()
                .ok_or_else(|| anyhow!("Excel file has no sheets"))?
                .clone()
        }
    };

    let range = workbook
        .worksheet_range(&sheet_name)
        .map_err(|e| anyhow!("failed to read sheet '{sheet_name}': {e}"))?;

    // Require at least a header row plus one data row, and at least one column.
    let (rows, cols) = range.get_size();
    if rows < 2 || cols == 0 {
        return Err(anyhow!(
            "sheet '{sheet_name}' has no data (rows={rows}, cols={cols})"
        ));
    }

    // First row is headers
    let headers: Vec<String> = range
        .rows()
        .next()
        .map(|row| {
            row.iter()
                .enumerate()
                .map(|(i, cell)| {
                    let val = cell.to_string();
                    // Synthesize a name for blank header cells so every
                    // DataFrame column has a usable, unique-ish name.
                    if val.is_empty() {
                        format!("column_{i}")
                    } else {
                        val
                    }
                })
                .collect()
        })
        .unwrap_or_default();

    // Build columns
    // Transpose row-major cells into per-column string vectors.
    // NOTE(review): assumes calamine ranges are rectangular (every row has
    // `cols` cells) — confirm; short rows would leave columns of unequal
    // length and fail `DataFrame::new` below.
    let mut columns: Vec<Vec<String>> = vec![Vec::with_capacity(rows - 1); cols];
    for row in range.rows().skip(1) {
        for (col_idx, cell) in row.iter().enumerate() {
            if col_idx < cols {
                columns[col_idx].push(cell.to_string());
            }
        }
    }

    // Try to infer numeric columns, otherwise keep as string
    let series: Vec<Column> = headers
        .iter()
        .zip(columns.iter())
        .map(|(name, values)| {
            // Try f64 first
            // `floats` is Some only if every non-empty cell parses as f64;
            // empty cells map to None (null). A single non-numeric cell makes
            // the whole collect yield None and the column falls back to text.
            let floats: Option<Vec<Option<f64>>> = values
                .iter()
                .map(|v| {
                    if v.is_empty() {
                        Some(None)
                    } else {
                        v.parse::<f64>().ok().map(Some)
                    }
                })
                .collect();

            if let Some(floats) = floats {
                Column::new(
                    name.as_str().into(),
                    floats.into_iter().collect::<Float64Chunked>(),
                )
            } else {
                Column::new(
                    name.as_str().into(),
                    values.iter().map(String::as_str).collect::<StringChunked>(),
                )
            }
        })
        .collect();

    let df = DataFrame::new(series)?;

    tracing::info!(
        path = %path.display(),
        sheet = sheet_name,
        rows = df.height(),
        cols = df.width(),
        "ingested Excel"
    );

    Ok(df)
}
237
/// List sheet names in an Excel file.
///
/// Returns the names in workbook order without reading any cell data.
#[cfg(feature = "excel")]
pub fn list_excel_sheets(path: &Path) -> Result<Vec<String>> {
    use calamine::{Reader, open_workbook_auto};

    let wb = open_workbook_auto(path).map_err(|e| anyhow!("failed to open Excel file: {e}"))?;
    let names = wb.sheet_names().iter().cloned().collect();
    Ok(names)
}
248
/// Summary of an ingested file.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestSummary {
    /// Source format label (currently always "dataframe" from [`summarize`]).
    pub format: String,
    /// Number of data rows.
    pub rows: usize,
    /// Per-column summaries, in DataFrame column order.
    pub columns: Vec<ColumnSummary>,
}
256
/// Summary of a single column.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ColumnSummary {
    /// Column name.
    pub name: String,
    /// Polars dtype rendered via `Display` (e.g. "i64", "str").
    pub dtype: String,
    /// Number of null values in the column.
    pub null_count: usize,
    /// Up to the first three values, rendered as strings.
    pub sample_values: Vec<String>,
}
265
266/// Produce a summary of a DataFrame for inspection.
267pub fn summarize(df: &DataFrame) -> IngestSummary {
268    let columns = df
269        .get_columns()
270        .iter()
271        .map(|col| {
272            let sample_values: Vec<String> = (0..col.len().min(3))
273                .map(|i| format!("{}", col.get(i).unwrap_or(AnyValue::Null)))
274                .collect();
275
276            ColumnSummary {
277                name: col.name().to_string(),
278                dtype: format!("{}", col.dtype()),
279                null_count: col.null_count(),
280                sample_values,
281            }
282        })
283        .collect();
284
285    IngestSummary {
286        format: "dataframe".to_string(),
287        rows: df.height(),
288        columns,
289    }
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    #[test]
    fn read_csv_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let csv_path = tmp.path().join("test.csv");
        std::fs::write(
            &csv_path,
            "name,arr,growth\nAcme,5000000,0.25\nBeta,12000000,0.15\n",
        )
        .unwrap();

        let df = read_csv(&csv_path).unwrap();
        assert_eq!((df.height(), df.width()), (2, 3));
    }

    #[test]
    fn read_tsv_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let tsv_path = tmp.path().join("test.tsv");
        let mut out = std::fs::File::create(&tsv_path).unwrap();
        writeln!(out, "name\tarr\tgrowth").unwrap();
        writeln!(out, "Acme\t5000000\t0.25").unwrap();
        drop(out);

        let df = read_tsv(&tsv_path).unwrap();
        assert_eq!((df.height(), df.width()), (1, 3));
    }

    #[test]
    fn read_parquet_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let pq_path = tmp.path().join("test.parquet");

        let mut original = df! {
            "company" => &["Acme", "Beta"],
            "arr" => &[5_000_000i64, 12_000_000],
        }
        .unwrap();

        let file = std::fs::File::create(&pq_path).unwrap();
        ParquetWriter::new(file).finish(&mut original).unwrap();

        let loaded = read_parquet(&pq_path).unwrap();
        assert_eq!(loaded.height(), 2);
    }

    #[test]
    fn format_detection() {
        let cases = [
            ("data.csv", IngestFormat::Csv),
            ("data.XLSX", IngestFormat::Excel),
            ("data.parquet", IngestFormat::Parquet),
        ];
        for (name, expected) in cases {
            assert_eq!(IngestFormat::from_path(Path::new(name)).unwrap(), expected);
        }
        assert!(IngestFormat::from_path(Path::new("data.txt")).is_err());
    }

    #[test]
    fn summarize_df() {
        let frame = df! {
            "name" => &["Acme", "Beta", "Gamma"],
            "arr" => &[5_000_000i64, 12_000_000, 3_000_000],
        }
        .unwrap();

        let summary = summarize(&frame);
        assert_eq!(summary.rows, 3);
        assert_eq!(summary.columns.len(), 2);
        assert_eq!(summary.columns[0].name, "name");
    }
}