use std::path::Path;
use std::sync::Arc;
use polars::prelude::{
col, DataFrame, DataType, LazyCsvReader, LazyFileListReader, PlPath, Schema, SerReader,
};
use crate::errors::{IoError, RunError};
use crate::io::format::{self, FileReadError, InputAdapter, LocalInputFile, ReadInput};
use crate::{config, FloeResult};
/// Input adapter shared by the delimited text formats handled in this file.
///
/// The same type backs both the `csv` and the `tsv` adapter instances; the
/// format tag is its only state.
struct DelimitedInputAdapter {
/// Format tag: returned by `InputAdapter::format`, used to look up the default
/// source options, and used as the prefix of the `<format>_read_error` rule name.
format: &'static str,
}
// Shared adapter instances, one per supported format tag. They are handed out
// as `&'static dyn InputAdapter` by `csv_input_adapter` / `tsv_input_adapter`.
static CSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "csv" };
static TSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "tsv" };
/// Returns the shared adapter instance whose format tag is `"csv"`.
pub(crate) fn csv_input_adapter() -> &'static dyn InputAdapter {
    &CSV_INPUT_ADAPTER as &'static dyn InputAdapter
}
/// Returns the shared adapter instance whose format tag is `"tsv"`.
pub(crate) fn tsv_input_adapter() -> &'static dyn InputAdapter {
    &TSV_INPUT_ADAPTER as &'static dyn InputAdapter
}
/// Settings for a single CSV read performed through `read_csv_file`.
#[derive(Debug, Clone)]
pub struct CsvReadPlan {
/// Schema handed to the lazy CSV reader (`with_schema`).
pub schema: Schema,
/// Forwarded unchanged to the reader's `with_ignore_errors` setting.
pub ignore_errors: bool,
}
impl CsvReadPlan {
pub fn strict(schema: Schema) -> Self {
Self {
schema,
ignore_errors: false,
}
}
}
pub fn read_csv_file(
input_path: &Path,
source_options: &config::SourceOptions,
plan: &CsvReadPlan,
) -> FloeResult<DataFrame> {
read_csv_lazy(
input_path,
source_options,
&plan.schema,
plan.ignore_errors,
None,
)
}
/// Returns the column names of the frame obtained by reading the file with the
/// row limit `n_rows`.
///
/// The read options are derived from `source_options` for `input_path`, with
/// `n_rows` applied as the row limit.
///
/// # Errors
/// Fails when the read options cannot be built, when the reader cannot be
/// opened (`failed to open csv at …`), or when the read itself fails
/// (`csv header read failed: …`).
pub fn read_csv_header(
    input_path: &Path,
    source_options: &config::SourceOptions,
    n_rows: Option<usize>,
) -> FloeResult<Vec<String>> {
    let limited_options = source_options
        .to_csv_read_options(input_path)?
        .with_n_rows(n_rows);

    // The path argument is `None` here; the options were built from `input_path`.
    let reader = match limited_options.try_into_reader_with_file_path(None) {
        Ok(reader) => reader,
        Err(err) => {
            return Err(Box::new(IoError(format!(
                "failed to open csv at {}: {err}",
                input_path.display()
            ))));
        }
    };

    let frame = match reader.finish() {
        Ok(frame) => frame,
        Err(err) => {
            return Err(Box::new(IoError(format!("csv header read failed: {err}"))));
        }
    };

    let column_names = frame.get_column_names();
    let mut header = Vec::with_capacity(column_names.len());
    for name in column_names.iter() {
        header.push(name.to_string());
    }
    Ok(header)
}
impl InputAdapter for DelimitedInputAdapter {
    /// Returns the format tag this adapter was constructed with.
    fn format(&self) -> &'static str {
        self.format
    }

    /// Resolves the column names of a single input file.
    ///
    /// Uses the entity's source options when present, otherwise the defaults
    /// for this adapter's format. Any failure is reported as a `FileReadError`
    /// whose rule is `<format>_read_error`.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &LocalInputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError> {
        let fallback_options = config::SourceOptions::defaults_for_format(self.format);
        let options = match entity.source.options.as_ref() {
            Some(configured) => configured,
            None => &fallback_options,
        };

        match resolve_input_columns(&input_file.local_path, options, columns) {
            Ok(names) => Ok(names),
            Err(err) => Err(FileReadError {
                rule: format!("{}_read_error", self.format),
                message: err.to_string(),
            }),
        }
    }

    /// Reads every file in `files`, resolving each file's columns first.
    ///
    /// Files are processed in order and the first error aborts the whole call.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[LocalInputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>> {
        let fallback_options = config::SourceOptions::defaults_for_format(self.format);
        let options = match entity.source.options.as_ref() {
            Some(configured) => configured,
            None => &fallback_options,
        };

        // Collecting into a `Result` stops at the first failing file.
        files
            .iter()
            .map(|input_file| {
                let input_columns =
                    resolve_input_columns(&input_file.local_path, options, columns)?;
                read_csv_input_with_columns(
                    input_file,
                    options,
                    columns,
                    normalize_strategy,
                    collect_raw,
                    input_columns,
                )
            })
            .collect()
    }

    /// Same as `read_inputs`, but reuses already-resolved column names when
    /// they are supplied and `files` holds exactly one file.
    ///
    /// In every other case this simply delegates to `read_inputs`.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[LocalInputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        match (prechecked_input_columns, files) {
            (Some(prechecked), [input_file]) => {
                let fallback_options = config::SourceOptions::defaults_for_format(self.format);
                let options = match entity.source.options.as_ref() {
                    Some(configured) => configured,
                    None => &fallback_options,
                };
                let single_input = read_csv_input_with_columns(
                    input_file,
                    options,
                    columns,
                    normalize_strategy,
                    collect_raw,
                    prechecked.to_vec(),
                )?;
                Ok(vec![single_input])
            }
            _ => self.read_inputs(entity, files, columns, normalize_strategy, collect_raw),
        }
    }
}
/// Reads one delimited file into a `ReadInput`, given its resolved column names.
///
/// A typed schema is built from `input_columns`, the declared `columns` and
/// `normalize_strategy`. Then:
///
/// * `collect_raw == false`: the file is read directly with the typed schema
///   and `ignore_errors` enabled; no raw frame is kept.
/// * `collect_raw == true`: the file is first read with every input column
///   typed as `String` and `ignore_errors` disabled. That raw frame is cast to
///   the typed schema, and the typed frame is narrowed to the input columns
///   that are also declared (only when that is a non-empty strict subset).
///   Both frames are passed to `format::finalize_read_input`.
///
/// # Errors
/// Propagates schema-building, read, cast and finalize errors. A failed
/// projection is reported as a `RunError` (`failed to project typed columns: …`).
fn read_csv_input_with_columns(
    input_file: &LocalInputFile,
    source_options: &config::SourceOptions,
    columns: &[config::ColumnConfig],
    normalize_strategy: Option<&str>,
    collect_raw: bool,
    input_columns: Vec<String>,
) -> FloeResult<ReadInput> {
    let path = &input_file.local_path;
    let typed_schema =
        format::build_typed_schema(input_columns.as_slice(), columns, normalize_strategy)?;

    if !collect_raw {
        // Go through the plan so the `ignore_errors` flag is stated exactly once.
        let typed_plan = CsvReadPlan {
            schema: typed_schema,
            ignore_errors: true,
        };
        let typed_df = read_csv_file(path, source_options, &typed_plan)?;
        return format::finalize_read_input(input_file, None, typed_df, normalize_strategy);
    }

    let raw_plan = CsvReadPlan::strict(build_raw_schema(&input_columns));
    let raw_df = read_csv_file(path, source_options, &raw_plan)?;
    let mut typed_df = format::cast_df_to_schema(&raw_df, &typed_schema)?;

    if let Some(projection) = projected_columns(&input_columns, columns) {
        typed_df = typed_df.select(&projection).map_err(|err| {
            // Cast explicitly, like every other error site in this file. Without
            // the cast, `?` would wrap the `Box<RunError>` in a second box and
            // downcasting the resulting error to `RunError` would fail.
            Box::new(RunError(format!("failed to project typed columns: {err}")))
                as Box<dyn std::error::Error + Send + Sync>
        })?;
    }

    format::finalize_read_input(input_file, Some(raw_df), typed_df, normalize_strategy)
}
/// Determines the column names to use for the file at `path`.
///
/// * With a header (the default when `source_options.header` is unset), the
///   names come from `read_csv_header` called with a row limit of 0.
/// * Without a header, one row is read to learn the column count, and names
///   are assigned by `headless_columns` from the declared column names.
fn resolve_input_columns(
    path: &Path,
    source_options: &config::SourceOptions,
    declared_columns: &[config::ColumnConfig],
) -> FloeResult<Vec<String>> {
    if source_options.header.unwrap_or(true) {
        read_csv_header(path, source_options, Some(0))
    } else {
        let sampled_columns = read_csv_header(path, source_options, Some(1))?;
        let declared_names: Vec<String> = declared_columns
            .iter()
            .map(|declared| declared.name.clone())
            .collect();
        Ok(headless_columns(&declared_names, sampled_columns.len()))
    }
}
/// Assigns names to `input_count` columns of a file that has no header row.
///
/// The first names are taken, in order, from `declared_names`. Any columns
/// beyond the declared ones are named `extra_column_<n>`, where `<n>` is the
/// 1-based position of the column. If the file has fewer columns than were
/// declared, the surplus declared names are dropped.
fn headless_columns(declared_names: &[String], input_count: usize) -> Vec<String> {
    let declared_part = declared_names.iter().take(input_count).cloned();
    // Empty range whenever the file has no more columns than were declared.
    let extra_part = (declared_names.len()..input_count)
        .map(|position| format!("extra_column_{}", position + 1));
    declared_part.chain(extra_part).collect()
}
/// Builds a schema in which every name in `columns` has type `DataType::String`.
///
/// Names are inserted in slice order.
fn build_raw_schema(columns: &[String]) -> Schema {
    let mut raw_schema = Schema::with_capacity(columns.len());
    columns.iter().for_each(|column_name| {
        raw_schema.insert(column_name.as_str().into(), DataType::String);
    });
    raw_schema
}
/// Scans the CSV at `input_path` with a lazy reader and collects it into a `DataFrame`.
///
/// The reader is configured with the header flag from `source_options`
/// (defaulting to `true`), the given `schema`, `ignore_errors`, and the parse
/// options derived from `source_options`. A chunk size is set only when
/// `csv_chunk_size_for_path` suggests one. When `projection` is given, only
/// those columns are selected before collecting.
///
/// # Errors
/// Fails when the parse options cannot be built, when the scan cannot be set
/// up (`failed to scan csv at …`), or when collecting fails (`csv read failed: …`).
fn read_csv_lazy(
    input_path: &Path,
    source_options: &config::SourceOptions,
    schema: &Schema,
    ignore_errors: bool,
    projection: Option<&[String]>,
) -> FloeResult<DataFrame> {
    let has_header = source_options.header.unwrap_or(true);
    let parse_options = source_options.to_csv_parse_options()?;
    // NOTE(review): the path is passed as a lossily-converted string.
    let path_str = input_path.to_string_lossy();

    let configured_reader = LazyCsvReader::new(PlPath::new(path_str.as_ref()))
        .with_has_header(has_header)
        .with_schema(Some(Arc::new(schema.clone())))
        .with_ignore_errors(ignore_errors)
        .map_parse_options(|_| parse_options.clone());
    let reader = match csv_chunk_size_for_path(input_path) {
        Some(chunk_size) => configured_reader.with_chunk_size(chunk_size),
        None => configured_reader,
    };

    let scanned = reader.finish().map_err(|err| {
        Box::new(IoError(format!(
            "failed to scan csv at {}: {err}",
            input_path.display()
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;

    let selected = match projection {
        Some(columns) => {
            let exprs = columns.iter().map(col).collect::<Vec<_>>();
            scanned.select(exprs)
        }
        None => scanned,
    };

    selected.collect().map_err(|err| {
        Box::new(IoError(format!("csv read failed: {err}")))
            as Box<dyn std::error::Error + Send + Sync>
    })
}
/// Returns the input columns that are also declared, in input order, when
/// that set is worth projecting onto.
///
/// Yields `None` when no input column is declared, or when every input column
/// is kept (the projection would change nothing).
fn projected_columns(
    input_columns: &[String],
    declared_columns: &[config::ColumnConfig],
) -> Option<Vec<String>> {
    let declared_names = declared_columns
        .iter()
        .map(|declared| declared.name.as_str())
        .collect::<std::collections::HashSet<_>>();

    let mut kept = Vec::new();
    for input_name in input_columns {
        if declared_names.contains(input_name.as_str()) {
            kept.push(input_name.clone());
        }
    }

    let narrows_input = !kept.is_empty() && kept.len() != input_columns.len();
    if narrows_input {
        Some(kept)
    } else {
        None
    }
}
/// Suggests a reader chunk size based on the size of the file at `path`.
///
/// Returns `Some(50_000)` for files of at least 64 MiB, and `None` for smaller
/// files or when the file metadata cannot be read.
fn csv_chunk_size_for_path(path: &Path) -> Option<usize> {
    const LARGE_FILE_BYTES: u64 = 64 * 1024 * 1024;
    const LARGE_FILE_CHUNK_SIZE: usize = 50_000;

    match std::fs::metadata(path) {
        Ok(metadata) if metadata.len() >= LARGE_FILE_BYTES => Some(LARGE_FILE_CHUNK_SIZE),
        _ => None,
    }
}