use std::error::Error as StdError;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::error::{IngestionError, IngestionResult};
use crate::types::{DataSet, Schema, Value};
use super::observability::{
IngestionContext, IngestionObserver, IngestionSeverity, IngestionStats,
};
use super::polars_bridge::{infer_schema_from_dataframe_lossy, polars_error_to_ingestion};
use super::watermark::{
apply_watermark_after_ingest, max_value_in_column, validate_watermark_config,
};
use super::{csv, excel, json, parquet};
use polars::prelude::*;
/// On-disk formats this module knows how to ingest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestionFormat {
    /// Delimited text (`.csv`).
    Csv,
    /// JSON documents or newline-delimited JSON (`.json`, `.ndjson`).
    Json,
    /// Apache Parquet (`.parquet`, `.pq`).
    Parquet,
    /// Spreadsheet workbooks (`.xlsx`, `.xls`, `.xlsm`, `.xlsb`, `.ods`).
    Excel,
}
impl IngestionFormat {
    /// Maps a file extension (without the leading dot) to a format.
    ///
    /// Matching is ASCII case-insensitive, so `"CSV"` and `"csv"` are equivalent.
    /// Returns `None` for any extension not listed on the variants above.
    pub fn from_extension(ext: &str) -> Option<Self> {
        // Every key is lowercase ASCII, so an ASCII-case-insensitive comparison accepts
        // exactly the inputs whose ASCII-lowercased form equals one of the keys.
        const KNOWN: &[(&str, IngestionFormat)] = &[
            ("csv", IngestionFormat::Csv),
            ("json", IngestionFormat::Json),
            ("ndjson", IngestionFormat::Json),
            ("parquet", IngestionFormat::Parquet),
            ("pq", IngestionFormat::Parquet),
            ("xlsx", IngestionFormat::Excel),
            ("xls", IngestionFormat::Excel),
            ("xlsm", IngestionFormat::Excel),
            ("xlsb", IngestionFormat::Excel),
            ("ods", IngestionFormat::Excel),
        ];
        KNOWN
            .iter()
            .find(|(known, _)| known.eq_ignore_ascii_case(ext))
            .map(|&(_, format)| format)
    }
}
/// Which worksheet(s) of a spreadsheet workbook to read.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ExcelSheetSelection {
    /// Only the first sheet (the default).
    #[default]
    First,
    /// A single sheet, looked up by name.
    Sheet(String),
    /// Every sheet in the workbook.
    AllSheets,
    /// An explicit list of sheets, looked up by name.
    Sheets(Vec<String>),
}
/// Caller-tunable settings shared by the ingestion entry points in this module.
#[derive(Clone)]
pub struct IngestionOptions {
    /// Explicit input format; `None` means "infer from the path's extension".
    pub format: Option<IngestionFormat>,
    /// Which worksheet(s) to read when the input is a spreadsheet.
    pub excel_sheet_selection: ExcelSheetSelection,
    /// Optional sink that receives success / failure / alert notifications.
    pub observer: Option<Arc<dyn IngestionObserver>>,
    /// Failures whose severity is at or above this level are also sent to `on_alert`.
    pub alert_at_or_above: IngestionSeverity,
    /// Column used for watermark filtering, if any.
    pub watermark_column: Option<String>,
    /// Watermark threshold; presumably rows not strictly above it are dropped
    /// (NOTE(review): confirm against `apply_watermark_after_ingest`).
    pub watermark_exclusive_above: Option<Value>,
}
impl fmt::Debug for IngestionOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructuring makes this impl fail to compile if a field is added and forgotten here.
        let Self {
            format,
            excel_sheet_selection,
            observer,
            alert_at_or_above,
            watermark_column,
            watermark_exclusive_above,
        } = self;
        // The observer trait object is reported only as present/absent (presumably because it
        // is not `Debug`).
        f.debug_struct("IngestionOptions")
            .field("format", format)
            .field("excel_sheet_selection", excel_sheet_selection)
            .field("observer_set", &observer.is_some())
            .field("alert_at_or_above", alert_at_or_above)
            .field("watermark_column", watermark_column)
            .field("watermark_exclusive_above", watermark_exclusive_above)
            .finish()
    }
}
impl Default for IngestionOptions {
    /// Infer the format, read the first sheet, no observer, alert threshold of `Critical`,
    /// and no watermark filtering.
    fn default() -> Self {
        Self {
            alert_at_or_above: IngestionSeverity::Critical,
            excel_sheet_selection: ExcelSheetSelection::default(),
            format: None,
            observer: None,
            watermark_column: None,
            watermark_exclusive_above: None,
        }
    }
}
/// Ingests a single file into a [`DataSet`] using `schema`.
///
/// The format is taken from `options.format`, or inferred from the path's extension when unset.
/// After the format-specific reader runs, the watermark settings in `options` are applied.
/// If an observer is configured, it receives `on_success` with the resulting row count. On
/// failure it receives `on_failure`, plus `on_alert` when the error's severity is at or above
/// `options.alert_at_or_above`.
///
/// # Errors
///
/// Fails if the watermark configuration is invalid or the format cannot be inferred; the
/// observer is NOT notified in those two cases. Also fails if reading or watermarking fails;
/// the observer IS notified in those cases.
pub fn ingest_from_path(
    path: impl AsRef<Path>,
    schema: &Schema,
    options: &IngestionOptions,
) -> IngestionResult<DataSet> {
    // Reject an inconsistent watermark setup before touching the filesystem.
    validate_watermark_config(schema, options)?;
    let path = path.as_ref();
    let format = if let Some(explicit) = options.format {
        explicit
    } else {
        infer_format_from_path(path)?
    };

    let ingested = match format {
        IngestionFormat::Csv => csv::ingest_csv_from_path(path, schema),
        IngestionFormat::Json => json::ingest_json_from_path(path, schema),
        IngestionFormat::Parquet => parquet::ingest_parquet_from_path(path, schema),
        IngestionFormat::Excel => {
            ingest_excel_dispatch(path, schema, &options.excel_sheet_selection)
        }
    };
    let outcome = ingested.and_then(|ds| apply_watermark_after_ingest(ds, schema, options));

    if let Some(observer) = options.observer.as_ref() {
        let ctx = IngestionContext {
            path: path.to_path_buf(),
            format,
        };
        match &outcome {
            Ok(ds) => observer.on_success(
                &ctx,
                IngestionStats {
                    rows: ds.row_count(),
                },
            ),
            Err(err) => {
                let severity = severity_for_error(err);
                observer.on_failure(&ctx, severity, err);
                if severity >= options.alert_at_or_above {
                    observer.on_alert(&ctx, severity, err);
                }
            }
        }
    }
    outcome
}
fn options_without_watermark(options: &IngestionOptions) -> IngestionOptions {
let mut o = options.clone();
o.watermark_column = None;
o.watermark_exclusive_above = None;
o
}
/// Bookkeeping returned alongside the combined data set by [`ingest_from_ordered_paths`].
#[derive(Debug, Clone, PartialEq)]
pub struct OrderedBatchIngestMetadata {
    /// Every input path, in the order the files were ingested.
    pub paths: Vec<PathBuf>,
    /// The final entry of `paths`.
    pub last_path: Option<PathBuf>,
    /// Largest value of the watermark column in the returned (watermark-filtered) data set.
    /// `None` when no watermark column is configured, or when `max_value_in_column` yields
    /// nothing.
    pub max_watermark_value: Option<Value>,
}
/// Ingests several files in order and concatenates their rows into one [`DataSet`].
///
/// Each file is read via [`ingest_from_path`] with watermarking disabled, so an observer in
/// `options` is notified once per file. Watermark filtering from `options` is then applied
/// once to the concatenated rows. Any failure in that final step is returned but is not
/// reported to the observer.
///
/// # Errors
///
/// Fails on an invalid watermark configuration, an empty `paths` slice (reported as
/// `SchemaMismatch`), the first file that fails to ingest, or a watermarking failure.
pub fn ingest_from_ordered_paths<P: AsRef<Path>>(
    paths: &[P],
    schema: &Schema,
    options: &IngestionOptions,
) -> IngestionResult<(DataSet, OrderedBatchIngestMetadata)> {
    validate_watermark_config(schema, options)?;
    if paths.is_empty() {
        return Err(IngestionError::SchemaMismatch {
            message: "ingest_from_ordered_paths: empty path list".to_string(),
        });
    }
    let owned_paths: Vec<PathBuf> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();

    // Filtering per file would be wrong for a batch: the watermark applies to the whole set.
    let file_options = options_without_watermark(options);
    let merged_rows = owned_paths.iter().try_fold(
        Vec::new(),
        |mut acc: Vec<Vec<Value>>, file| -> IngestionResult<Vec<Vec<Value>>> {
            acc.extend(ingest_from_path(file, schema, &file_options)?.rows);
            Ok(acc)
        },
    )?;

    let combined = apply_watermark_after_ingest(
        DataSet::new(schema.clone(), merged_rows),
        schema,
        options,
    )?;
    let max_watermark_value = options
        .watermark_column
        .as_ref()
        .and_then(|col| max_value_in_column(&combined, schema, col));
    let meta = OrderedBatchIngestMetadata {
        // Evaluated before `paths` moves `owned_paths` below.
        last_path: owned_paths.last().cloned(),
        paths: owned_paths,
        max_watermark_value,
    };
    Ok((combined, meta))
}
/// Infers a [`Schema`] from the contents of the file at `path`.
///
/// The format is taken from `options.format`, or inferred from the extension. CSV (with a header
/// row), JSON and Parquet are fully loaded into a polars `DataFrame`, whose column types are
/// then mapped lossily into a schema. A `.ndjson` extension selects line-delimited JSON.
/// Spreadsheets are handed to the Excel-specific inference according to
/// `options.excel_sheet_selection`.
///
/// # Errors
///
/// Fails if the format cannot be inferred, the file cannot be opened or parsed, or the
/// resulting frame cannot be mapped to a schema.
pub fn infer_schema_from_path(
    path: impl AsRef<Path>,
    options: &IngestionOptions,
) -> IngestionResult<Schema> {
    let path = path.as_ref();
    let format = if let Some(explicit) = options.format {
        explicit
    } else {
        infer_format_from_path(path)?
    };

    let frame = match format {
        // Spreadsheets have their own inference path that does not go through a DataFrame.
        IngestionFormat::Excel => {
            return infer_excel_schema_dispatch(path, &options.excel_sheet_selection);
        }
        IngestionFormat::Csv => LazyCsvReader::new(path.to_string_lossy().as_ref().into())
            .with_has_header(true)
            .finish()
            .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
            .collect()
            .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?,
        IngestionFormat::Json => {
            let is_ndjson = matches!(
                path.extension().and_then(|s| s.to_str()),
                Some(ext) if ext.eq_ignore_ascii_case("ndjson")
            );
            let json_format = if is_ndjson {
                JsonFormat::JsonLines
            } else {
                JsonFormat::Json
            };
            let file = std::fs::File::open(path)?;
            JsonReader::new(file)
                .with_json_format(json_format)
                .finish()
                .map_err(|e| polars_error_to_ingestion("failed to read json with polars", e))?
        }
        IngestionFormat::Parquet => LazyFrame::scan_parquet(
            path.to_string_lossy().as_ref().into(),
            ScanArgsParquet::default(),
        )
        .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
        .collect()
        .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?,
    };
    infer_schema_from_dataframe_lossy(&frame)
}
/// Infers a schema from `path` and then ingests the same file with it.
///
/// Equivalent to [`infer_schema_from_path`] followed by [`ingest_from_path`] with the same
/// `options`. Errors from either step are returned unchanged.
pub fn ingest_from_path_infer(
    path: impl AsRef<Path>,
    options: &IngestionOptions,
) -> IngestionResult<DataSet> {
    let path: &Path = path.as_ref();
    let inferred = infer_schema_from_path(path, options)?;
    ingest_from_path(path, &inferred, options)
}
/// Classifies an ingestion error for observer reporting.
///
/// Failures rooted in I/O are `Critical`: a direct I/O error, a CSV error of I/O kind, or a
/// Parquet/engine error whose source chain contains an `std::io::Error`. Every other failure
/// is `Error`.
fn severity_for_error(e: &IngestionError) -> IngestionSeverity {
    let io_rooted = match e {
        IngestionError::Io(_) => true,
        IngestionError::Parquet(err) => error_chain_contains_io(err),
        IngestionError::Csv(err) => matches!(err.kind(), ::csv::ErrorKind::Io(_)),
        #[cfg(feature = "excel")]
        IngestionError::Excel(_) => false,
        IngestionError::Engine { source, .. } => error_chain_contains_io(source.as_ref()),
        IngestionError::SchemaMismatch { .. } | IngestionError::ParseError { .. } => false,
    };
    if io_rooted {
        IngestionSeverity::Critical
    } else {
        IngestionSeverity::Error
    }
}
fn error_chain_contains_io(e: &(dyn StdError + 'static)) -> bool {
let mut cur: Option<&(dyn StdError + 'static)> = Some(e);
while let Some(err) = cur {
if err.is::<std::io::Error>() {
return true;
}
cur = err.source();
}
false
}
/// Infers the [`IngestionFormat`] of `path` from its file extension.
///
/// # Errors
///
/// Returns `IngestionError::SchemaMismatch` in three cases: the path has no extension, the
/// extension is not valid UTF-8, or the extension is not recognised by
/// [`IngestionFormat::from_extension`].
fn infer_format_from_path(path: &Path) -> IngestionResult<IngestionFormat> {
    let raw_ext = path
        .extension()
        .ok_or_else(|| IngestionError::SchemaMismatch {
            message: format!(
                "cannot infer format: path has no extension ({})",
                path.display()
            ),
        })?;
    // Distinguish a present-but-non-UTF-8 extension from a missing one so the message is
    // accurate instead of claiming the extension does not exist.
    let ext = raw_ext
        .to_str()
        .ok_or_else(|| IngestionError::SchemaMismatch {
            message: format!(
                "cannot infer format: path extension is not valid UTF-8 ({})",
                path.display()
            ),
        })?;
    IngestionFormat::from_extension(ext).ok_or_else(|| IngestionError::SchemaMismatch {
        message: format!(
            "cannot infer format from extension '{ext}' for path ({})",
            path.display()
        ),
    })
}
/// Reads a spreadsheet according to `sel`.
///
/// `First` and `Sheet` go to the single-sheet reader (with `None` or the named sheet).
/// `AllSheets` and `Sheets` go to the workbook reader; `AllSheets` passes `None` as the
/// sheet list.
fn ingest_excel_dispatch(
    path: &Path,
    schema: &Schema,
    sel: &ExcelSheetSelection,
) -> IngestionResult<DataSet> {
    match sel {
        ExcelSheetSelection::Sheets(names) => {
            let wanted: Vec<&str> = names.iter().map(String::as_str).collect();
            excel::ingest_excel_workbook_from_path(path, Some(&wanted[..]), schema)
        }
        ExcelSheetSelection::AllSheets => {
            excel::ingest_excel_workbook_from_path(path, None, schema)
        }
        ExcelSheetSelection::Sheet(name) => {
            excel::ingest_excel_from_path(path, Some(name.as_str()), schema)
        }
        ExcelSheetSelection::First => excel::ingest_excel_from_path(path, None, schema),
    }
}
/// Infers a schema from a spreadsheet according to `sel`.
///
/// `First` and `Sheet` use single-sheet inference (with `None` or the named sheet).
/// `AllSheets` and `Sheets` use workbook-level inference; `AllSheets` passes `None` as the
/// sheet list.
fn infer_excel_schema_dispatch(path: &Path, sel: &ExcelSheetSelection) -> IngestionResult<Schema> {
    match sel {
        ExcelSheetSelection::Sheets(names) => {
            let wanted: Vec<&str> = names.iter().map(String::as_str).collect();
            excel::infer_excel_schema_workbook_from_path(path, Some(&wanted[..]))
        }
        ExcelSheetSelection::AllSheets => {
            excel::infer_excel_schema_workbook_from_path(path, None)
        }
        ExcelSheetSelection::Sheet(name) => {
            excel::infer_excel_schema_from_path(path, Some(name.as_str()))
        }
        ExcelSheetSelection::First => excel::infer_excel_schema_from_path(path, None),
    }
}
/// A self-contained, cloneable description of one single-file ingestion job.
#[derive(Clone)]
pub struct IngestionRequest {
    /// File to ingest.
    pub path: PathBuf,
    /// Schema passed to the ingestion routines.
    pub schema: Schema,
    /// Format, sheet selection, observer and watermark settings.
    pub options: IngestionOptions,
}
impl fmt::Debug for IngestionRequest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Summarise the schema by its field count rather than printing every field.
        let field_count = self.schema.fields.len();
        f.debug_struct("IngestionRequest")
            .field("path", &self.path)
            .field("schema_fields", &field_count)
            .field("options", &self.options)
            .finish()
    }
}
impl IngestionRequest {
    /// Executes the request via [`ingest_from_path`].
    pub fn run(&self) -> IngestionResult<DataSet> {
        let Self {
            path,
            schema,
            options,
        } = self;
        ingest_from_path(path, schema, options)
    }
}