use std::error::Error as StdError;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::error::{IngestionError, IngestionResult};
use crate::types::{DataSet, Schema};
use super::observability::{
IngestionContext, IngestionObserver, IngestionSeverity, IngestionStats,
};
use super::polars_bridge::{infer_schema_from_dataframe_lossy, polars_error_to_ingestion};
use super::{csv, excel, json, parquet};
use polars::prelude::*;
/// File formats that the ingestion entry points can dispatch on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestionFormat {
    /// Selected for the `csv` extension.
    Csv,
    /// Selected for the `json` and `ndjson` extensions.
    Json,
    /// Selected for the `parquet` and `pq` extensions.
    Parquet,
    /// Selected for the `xlsx`, `xls`, `xlsm`, `xlsb` and `ods` extensions.
    Excel,
}

impl IngestionFormat {
    /// Looks up the format associated with a file extension.
    ///
    /// `ext` is compared ASCII case-insensitively against dot-less names such
    /// as `csv` or `parquet`. Returns `None` when the extension is not one of
    /// the recognised names.
    pub fn from_extension(ext: &str) -> Option<Self> {
        // Every recognised extension, spelled in lowercase, with its format.
        const KNOWN_EXTENSIONS: &[(&str, IngestionFormat)] = &[
            ("csv", IngestionFormat::Csv),
            ("json", IngestionFormat::Json),
            ("ndjson", IngestionFormat::Json),
            ("parquet", IngestionFormat::Parquet),
            ("pq", IngestionFormat::Parquet),
            ("xlsx", IngestionFormat::Excel),
            ("xls", IngestionFormat::Excel),
            ("xlsm", IngestionFormat::Excel),
            ("xlsb", IngestionFormat::Excel),
            ("ods", IngestionFormat::Excel),
        ];

        KNOWN_EXTENSIONS
            .iter()
            .find(|(name, _)| ext.eq_ignore_ascii_case(name))
            .map(|&(_, format)| format)
    }
}
/// Which sheet(s) of an Excel workbook an ingestion call should read.
///
/// The default is [`ExcelSheetSelection::First`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ExcelSheetSelection {
    /// Use the single-sheet reader without naming a sheet.
    #[default]
    First,
    /// Use the single-sheet reader with the given sheet name.
    Sheet(String),
    /// Use the workbook reader without restricting the sheet list.
    AllSheets,
    /// Use the workbook reader restricted to the given sheet names.
    Sheets(Vec<String>),
}
/// Options shared by the path-based ingestion and schema-inference entry points.
#[derive(Clone)]
pub struct IngestionOptions {
/// Explicit format to use; when `None` the format is inferred from the
/// path's extension.
pub format: Option<IngestionFormat>,
/// Sheet selection consulted when the resolved format is Excel.
pub excel_sheet_selection: ExcelSheetSelection,
/// Optional observer notified by `ingest_from_path` about the outcome of
/// an ingestion.
pub observer: Option<Arc<dyn IngestionObserver>>,
/// Failures whose severity is at or above this level additionally trigger
/// the observer's alert callback.
pub alert_at_or_above: IngestionSeverity,
}
/// Manual `Debug` implementation: the observer is a trait object, so only
/// whether one is configured is reported (as `observer_set`).
impl fmt::Debug for IngestionOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring makes a newly added field a compile error
        // here instead of a silently missing entry in the debug output.
        let Self {
            format,
            excel_sheet_selection,
            observer,
            alert_at_or_above,
        } = self;

        let has_observer = observer.is_some();
        let mut out = f.debug_struct("IngestionOptions");
        out.field("format", format);
        out.field("excel_sheet_selection", excel_sheet_selection);
        out.field("observer_set", &has_observer);
        out.field("alert_at_or_above", alert_at_or_above);
        out.finish()
    }
}
impl Default for IngestionOptions {
fn default() -> Self {
Self {
format: None,
excel_sheet_selection: ExcelSheetSelection::default(),
observer: None,
alert_at_or_above: IngestionSeverity::Critical,
}
}
}
/// Ingests the file at `path` into a [`DataSet`] using the given `schema`.
///
/// The format is taken from `options.format` when set, otherwise inferred
/// from the path's extension. If an observer is configured it is told about
/// the outcome: `on_success` with the row count, or `on_failure` with a
/// severity derived from the error, followed by `on_alert` when that severity
/// is at or above `options.alert_at_or_above`.
///
/// # Errors
///
/// Returns the format-inference error (before any observer is notified) when
/// no format is set and none can be inferred, or whatever error the
/// format-specific reader produced.
pub fn ingest_from_path(
    path: impl AsRef<Path>,
    schema: &Schema,
    options: &IngestionOptions,
) -> IngestionResult<DataSet> {
    let source = path.as_ref();
    let format = options
        .format
        .map_or_else(|| infer_format_from_path(source), Ok)?;

    let ctx = IngestionContext {
        path: source.to_path_buf(),
        format,
    };

    let outcome = match format {
        IngestionFormat::Csv => csv::ingest_csv_from_path(source, schema),
        IngestionFormat::Json => json::ingest_json_from_path(source, schema),
        IngestionFormat::Parquet => parquet::ingest_parquet_from_path(source, schema),
        IngestionFormat::Excel => {
            ingest_excel_dispatch(source, schema, &options.excel_sheet_selection)
        }
    };

    if let Some(observer) = &options.observer {
        match outcome.as_ref() {
            Ok(dataset) => {
                let stats = IngestionStats {
                    rows: dataset.row_count(),
                };
                observer.on_success(&ctx, stats);
            }
            Err(failure) => {
                let severity = severity_for_error(failure);
                // Every failure is reported; only sufficiently severe ones alert.
                observer.on_failure(&ctx, severity, failure);
                if severity >= options.alert_at_or_above {
                    observer.on_alert(&ctx, severity, failure);
                }
            }
        }
    }

    outcome
}
/// Infers a [`Schema`] for the file at `path`.
///
/// The format is taken from `options.format` when set, otherwise inferred
/// from the path's extension. CSV (read with a header row), JSON and Parquet
/// files are loaded into a polars `DataFrame` whose columns drive the
/// inference; Excel files are delegated to the Excel schema helpers using
/// `options.excel_sheet_selection`.
///
/// For JSON, a path whose extension is `ndjson` (ASCII case-insensitive) is
/// read as JSON Lines; anything else is read as a regular JSON document.
///
/// # Errors
///
/// Returns an error when the format cannot be determined, when the file
/// cannot be opened or read by polars, or when schema inference itself fails.
pub fn infer_schema_from_path(
    path: impl AsRef<Path>,
    options: &IngestionOptions,
) -> IngestionResult<Schema> {
    let path = path.as_ref();
    let format = match options.format {
        Some(explicit) => explicit,
        None => infer_format_from_path(path)?,
    };

    // Excel produces a schema directly; every other format first yields a
    // DataFrame that is fed to the shared inference routine below.
    let frame = match format {
        IngestionFormat::Excel => {
            return infer_excel_schema_dispatch(path, &options.excel_sheet_selection);
        }
        IngestionFormat::Csv => {
            let lazy = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
                .with_has_header(true)
                .finish()
                .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?;
            lazy.collect()
                .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?
        }
        IngestionFormat::Json => {
            let is_ndjson = path
                .extension()
                .and_then(|raw| raw.to_str())
                .map_or(false, |ext| ext.eq_ignore_ascii_case("ndjson"));
            let json_format = if is_ndjson {
                JsonFormat::JsonLines
            } else {
                JsonFormat::Json
            };
            let file = std::fs::File::open(path)?;
            JsonReader::new(file)
                .with_json_format(json_format)
                .finish()
                .map_err(|e| polars_error_to_ingestion("failed to read json with polars", e))?
        }
        IngestionFormat::Parquet => {
            let lazy = LazyFrame::scan_parquet(
                path.to_string_lossy().as_ref().into(),
                ScanArgsParquet::default(),
            )
            .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?;
            lazy.collect().map_err(|e| {
                polars_error_to_ingestion("failed to collect parquet with polars", e)
            })?
        }
    };

    infer_schema_from_dataframe_lossy(&frame)
}
/// Ingests the file at `path` without a caller-supplied schema.
///
/// The schema is first inferred from the file with the same `options`, then
/// the file is ingested against that inferred schema.
///
/// # Errors
///
/// Returns the schema-inference error if inference fails (in which case no
/// ingestion is attempted), otherwise any error from the ingestion itself.
pub fn ingest_from_path_infer(
    path: impl AsRef<Path>,
    options: &IngestionOptions,
) -> IngestionResult<DataSet> {
    let source = path.as_ref();
    infer_schema_from_path(source, options)
        .and_then(|inferred| ingest_from_path(source, &inferred, options))
}
/// Classifies an ingestion error for observer reporting.
///
/// Errors that are I/O failures are `Critical`; everything else is `Error`.
/// An error counts as I/O related when it is the `Io` variant, a CSV error
/// whose kind is `Io`, or a Parquet/engine error with a `std::io::Error`
/// somewhere in its source chain.
fn severity_for_error(e: &IngestionError) -> IngestionSeverity {
    let io_related = match e {
        IngestionError::Io(_) => true,
        IngestionError::Parquet(err) => error_chain_contains_io(err),
        IngestionError::Csv(err) => matches!(err.kind(), ::csv::ErrorKind::Io(_)),
        #[cfg(feature = "excel")]
        IngestionError::Excel(_) => false,
        IngestionError::Engine { source, .. } => error_chain_contains_io(source.as_ref()),
        IngestionError::SchemaMismatch { .. } | IngestionError::ParseError { .. } => false,
    };

    if io_related {
        IngestionSeverity::Critical
    } else {
        IngestionSeverity::Error
    }
}
/// Returns `true` when `e` itself, or any error reachable by repeatedly
/// following `source()`, is a `std::io::Error`.
fn error_chain_contains_io(e: &(dyn StdError + 'static)) -> bool {
    // Walk the chain lazily: start at `e`, then follow each error's source.
    std::iter::successors(Some(e), |&cause| cause.source())
        .any(|cause| cause.is::<std::io::Error>())
}
/// Determines the ingestion format from the extension of `path`.
///
/// # Errors
///
/// Returns `IngestionError::SchemaMismatch` when the path has no extension
/// (or the extension is not valid UTF-8), or when the extension is not one
/// recognised by `IngestionFormat::from_extension`.
fn infer_format_from_path(path: &Path) -> IngestionResult<IngestionFormat> {
    let ext = match path.extension().and_then(|raw| raw.to_str()) {
        Some(ext) => ext,
        None => {
            return Err(IngestionError::SchemaMismatch {
                message: format!(
                    "cannot infer format: path has no extension ({})",
                    path.display()
                ),
            });
        }
    };

    match IngestionFormat::from_extension(ext) {
        Some(format) => Ok(format),
        None => Err(IngestionError::SchemaMismatch {
            message: format!(
                "cannot infer format from extension '{ext}' for path ({})",
                path.display()
            ),
        }),
    }
}
/// Routes an Excel ingestion to the single-sheet or workbook reader
/// according to the sheet selection `sel`.
///
/// `First` and `Sheet` use the single-sheet reader (without / with a sheet
/// name); `AllSheets` and `Sheets` use the workbook reader (without / with a
/// list of sheet names).
fn ingest_excel_dispatch(
    path: &Path,
    schema: &Schema,
    sel: &ExcelSheetSelection,
) -> IngestionResult<DataSet> {
    let read_sheet = |sheet: Option<&str>| excel::ingest_excel_from_path(path, sheet, schema);
    let read_workbook =
        |sheets: Option<&[&str]>| excel::ingest_excel_workbook_from_path(path, sheets, schema);

    match sel {
        ExcelSheetSelection::First => read_sheet(None),
        ExcelSheetSelection::Sheet(name) => read_sheet(Some(name.as_str())),
        ExcelSheetSelection::AllSheets => read_workbook(None),
        ExcelSheetSelection::Sheets(names) => {
            // The workbook reader takes borrowed names, so view the owned
            // strings as `&str` for the duration of the call.
            let wanted: Vec<&str> = names.iter().map(String::as_str).collect();
            read_workbook(Some(wanted.as_slice()))
        }
    }
}
/// Routes Excel schema inference to the single-sheet or workbook helper
/// according to the sheet selection `sel`.
///
/// `First` and `Sheet` use the single-sheet helper (without / with a sheet
/// name); `AllSheets` and `Sheets` use the workbook helper (without / with a
/// list of sheet names).
fn infer_excel_schema_dispatch(path: &Path, sel: &ExcelSheetSelection) -> IngestionResult<Schema> {
    let from_sheet = |sheet: Option<&str>| excel::infer_excel_schema_from_path(path, sheet);
    let from_workbook =
        |sheets: Option<&[&str]>| excel::infer_excel_schema_workbook_from_path(path, sheets);

    match sel {
        ExcelSheetSelection::First => from_sheet(None),
        ExcelSheetSelection::Sheet(name) => from_sheet(Some(name.as_str())),
        ExcelSheetSelection::AllSheets => from_workbook(None),
        ExcelSheetSelection::Sheets(names) => {
            // Borrow the owned sheet names as `&str` for the helper call.
            let wanted: Vec<&str> = names.iter().map(String::as_str).collect();
            from_workbook(Some(wanted.as_slice()))
        }
    }
}
/// A self-contained ingestion job: the file to read, the schema to ingest it
/// against, and the options to use. Execute it with `run`.
#[derive(Clone)]
pub struct IngestionRequest {
/// Path of the file to ingest.
pub path: PathBuf,
/// Schema passed to the ingestion call.
pub schema: Schema,
/// Options (format override, Excel sheet selection, observer, alert
/// threshold) passed to the ingestion call.
pub options: IngestionOptions,
}
/// Manual `Debug` implementation: the schema is summarised by its field
/// count (as `schema_fields`) instead of being printed in full.
impl fmt::Debug for IngestionRequest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            path,
            schema,
            options,
        } = self;

        let field_count = schema.fields.len();
        let mut out = f.debug_struct("IngestionRequest");
        out.field("path", path);
        out.field("schema_fields", &field_count);
        out.field("options", options);
        out.finish()
    }
}
impl IngestionRequest {
    /// Executes the request by ingesting `self.path` against `self.schema`
    /// with `self.options`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `ingest_from_path`.
    pub fn run(&self) -> IngestionResult<DataSet> {
        let Self {
            path,
            schema,
            options,
        } = self;
        ingest_from_path(path, schema, options)
    }
}