use crate::{
Arguments, DEFAULT_OVERRIDE_REGEX, DEFAULT_QUERY, FileExtension, PathExtension,
PolarsViewError, PolarsViewResult, UniqueElements, sql_commands,
};
use egui::{
Align, CollapsingHeader, Color32, DragValue, Frame, Grid, Layout, Stroke, TextEdit, Ui, Vec2,
};
use polars::{io::RowIndex, prelude::*};
use regex::Regex;
use tokio::task::spawn_blocking;
use std::{
fmt::Debug,
fs::File,
num::NonZero,
path::{Path, PathBuf},
sync::Arc,
};
// Default comma-separated list of strings treated as null while loading.
// Parsed by `DataFilter::parse_null_values`: yields `""` (empty) and `<N/D>`.
pub static NULL_VALUES: &str = r#""", <N/D>"#;
// Initial CSV delimiter before auto-detection runs.
pub static DEFAULT_CSV_DELIMITER: &str = ";";
// Default header for the optional row-index column.
pub const DEFAULT_INDEX_COLUMN_NAME: &str = "Row Number";
// Default regex selecting columns to normalize (names starting with "Val").
const DEFAULT_NORM_REGEX: &str = "^Val.*$";
// Default regex selecting columns to drop (names starting with "Temp").
const DEFAULT_DROP_REGEX: &str = "^Temp.*$";
// Default first value of the row-index column.
const DEFAULT_INDEX_COLUMN_OFFSET: u32 = 1;
// Default number of rows scanned when inferring column data types.
const DEFAULT_INFER_SCHEMA_ROWS: usize = 200;
// Upper bound on suffix probing in `resolve_unique_column_name`.
pub const MAX_ATTEMPTS: u32 = 1000;
/// Configuration controlling how a data file is loaded, transformed and queried.
///
/// `PartialEq` is used by the UI (`render_query`) to detect user edits by
/// comparing a pre-render snapshot against the mutated value.
#[derive(Debug, Clone, PartialEq)] pub struct DataFilter {
/// Canonicalized path of the current file (empty `PathBuf` when none).
pub absolute_path: PathBuf,
/// Table name used in SQL queries (e.g. `FROM <table_name>`).
pub table_name: String,
/// CSV delimiter as a one-character string; updated after auto-detection.
pub csv_delimiter: String,
/// When true, the file must be (re)loaded from disk.
pub read_data_from_file: bool,
/// Schema of the currently loaded data; drives SQL example generation.
pub schema: Arc<Schema>,
/// Number of rows scanned to infer dtypes (CSV/JSON/NDJSON).
pub infer_schema_rows: usize,
/// Remove columns containing only null values.
pub exclude_null_cols: bool,
/// Comma-separated strings interpreted as null (see `parse_null_values`).
pub null_values: String,
/// Optional regex selecting columns forced to String dtype on CSV read.
pub force_string_patterns: Option<String>,
/// When true, `query` should be executed against the data.
pub apply_sql: bool,
/// The SQL query text.
pub query: String,
/// Add a row-index column as the first column.
pub add_row_index: bool,
/// Desired name for the row-index column (made unique on conflict).
pub index_column_name: String,
/// Starting value of the row index (e.g. 0 or 1).
pub index_column_offset: u32,
/// Normalize Euro-style number strings to Float64 in matching columns.
pub normalize: bool,
/// Regex selecting columns to normalize (anchored `^...$` or `*`).
pub normalize_regex: String,
/// Drop columns whose names match `drop_regex`.
pub drop: bool,
/// Regex selecting columns to drop (anchored `^...$` or `*`).
pub drop_regex: String,
}
/// Default configuration: no file, `;` delimiter, SQL and all optional
/// transformations disabled, module-level default constants everywhere else.
impl Default for DataFilter {
fn default() -> Self {
DataFilter {
absolute_path: PathBuf::new(),
table_name: "AllData".to_string(),
csv_delimiter: DEFAULT_CSV_DELIMITER.to_string(),
// A fresh filter always needs an initial load from disk.
read_data_from_file: true,
schema: Schema::default().into(),
infer_schema_rows: DEFAULT_INFER_SCHEMA_ROWS,
exclude_null_cols: false,
null_values: NULL_VALUES.to_string(),
// Compile-time default override pattern, when one is configured.
force_string_patterns: DEFAULT_OVERRIDE_REGEX.map(ToString::to_string),
apply_sql: false,
query: DEFAULT_QUERY.to_string(),
add_row_index: false, index_column_name: DEFAULT_INDEX_COLUMN_NAME.to_string(),
index_column_offset: DEFAULT_INDEX_COLUMN_OFFSET,
normalize: false,
normalize_regex: DEFAULT_NORM_REGEX.to_string(),
drop: false,
drop_regex: DEFAULT_DROP_REGEX.to_string(),
}
}
}
impl DataFilter {
/// Builds a `DataFilter` from parsed command-line `Arguments`.
///
/// Options that carry their own value (query, normalize regex) both enable
/// the feature and supply its text; everything not covered by `args` falls
/// back to `Default::default()`.
///
/// # Errors
/// Propagates the I/O error from canonicalizing a provided path.
pub fn new(args: &Arguments) -> PolarsViewResult<Self> {
    // Canonicalize the CLI path when present; an empty PathBuf is the
    // "no file provided" sentinel used elsewhere in this module.
    let absolute_path = match args.path.as_deref() {
        Some(path) => path.canonicalize()?,
        None => PathBuf::new(),
    };

    // Supplying a query on the command line also turns SQL execution on.
    let apply_sql = args.query.is_some();
    let query = args
        .query
        .clone()
        .unwrap_or_else(|| DEFAULT_QUERY.to_string());

    // Same convention for the normalization regex.
    let normalize = args.regex.is_some();
    let normalize_regex = args
        .regex
        .clone()
        .unwrap_or_else(|| DEFAULT_NORM_REGEX.to_string());

    // CLI override takes priority; otherwise use the compiled-in default.
    let force_string_patterns = args
        .force_string_patterns
        .clone()
        .or(DEFAULT_OVERRIDE_REGEX.map(ToString::to_string));

    Ok(DataFilter {
        absolute_path,
        table_name: args.table_name.clone(),
        csv_delimiter: args.delimiter.clone(),
        apply_sql,
        query,
        exclude_null_cols: args.exclude_null_cols,
        null_values: args.null_values.clone(),
        force_string_patterns,
        normalize,
        normalize_regex,
        ..Default::default()
    })
}
/// Replaces the stored path with the canonical form of `path`.
///
/// # Errors
/// Fails when the path does not exist or cannot be canonicalized.
pub fn set_path(&mut self, path: &Path) -> PolarsViewResult<()> {
    let canonical = path.canonicalize()?;
    tracing::debug!("absolute_path set to: {:#?}", canonical);
    self.absolute_path = canonical;
    Ok(())
}
/// Returns the lower-cased extension of the current file, if it has one.
pub fn get_extension(&self) -> Option<String> {
self.absolute_path.extension_as_lowercase()
}
/// Builds the optional `RowIndex` for CSV/Polars readers.
///
/// Returns `Ok(None)` when the row-index feature is off. Otherwise the
/// configured column name is deduplicated against `schema` before use.
///
/// # Errors
/// Propagates the failure from `resolve_unique_column_name` when no free
/// name can be found.
pub fn get_row_index(&self, schema: &Schema) -> PolarsResult<Option<RowIndex>> {
    if !self.add_row_index {
        tracing::trace!("Row index addition disabled in filter.");
        return Ok(None);
    }
    // Guarantee the index column does not collide with an existing column.
    let name = resolve_unique_column_name(&self.index_column_name, schema)?;
    Ok(Some(RowIndex {
        name,
        offset: self.index_column_offset,
    }))
}
/// Loads the DataFrame for `absolute_path`, dispatching on its extension.
///
/// On success returns the DataFrame together with the detected
/// [`FileExtension`]. For CSV files the auto-detected delimiter is written
/// back into `self.csv_delimiter` so the UI reflects reality.
///
/// # Errors
/// [`PolarsViewError::FileType`] for unsupported or missing extensions, plus
/// anything bubbled up by the format-specific readers.
pub async fn get_df_and_extension(&mut self) -> PolarsViewResult<(DataFrame, FileExtension)> {
let extension = FileExtension::from_path(&self.absolute_path);
// Each reader returns (DataFrame, Option<detected delimiter byte>);
// only the CSV reader ever yields Some(..) for the delimiter.
let (df, detected_delimiter) = match &extension {
FileExtension::Csv => self.read_csv_data().await?,
FileExtension::Json => self.read_json_data().await?,
FileExtension::NDJson => self.read_ndjson_data().await?,
FileExtension::Parquet => self.read_parquet_data().await?,
FileExtension::Unknown(ext) => {
return Err(PolarsViewError::FileType(format!(
"Unsupported extension: `{}` for file: `{}`",
ext,
self.absolute_path.display()
)));
}
FileExtension::Missing => {
return Err(PolarsViewError::FileType(format!(
"Missing extension for file: `{}`",
self.absolute_path.display()
)));
}
};
// Persist the delimiter that actually worked.
if let Some(byte) = detected_delimiter {
self.csv_delimiter = (byte as char).to_string();
}
tracing::debug!(
"fn get_df_and_extension(): Successfully loaded DataFrame with extension: {:?}",
extension
);
Ok((df, extension)) }
/// Reads a whole JSON document into a DataFrame on the blocking pool.
///
/// The second tuple element is always `None`: JSON has no delimiter.
///
/// # Errors
/// Fails when the file cannot be opened or Polars cannot parse it.
async fn read_json_data(&self) -> PolarsViewResult<(DataFrame, Option<u8>)> {
    tracing::debug!("Reading JSON data from: {}", self.absolute_path.display());
    let file = File::open(&self.absolute_path)?;
    // Copy the row count so the closure below can be `move` + `'static`.
    let infer_rows = self.infer_schema_rows;
    let df = execute_polars_blocking(move || {
        JsonReader::new(file)
            .infer_schema_len(NonZero::new(infer_rows))
            .finish()
    })
    .await?;
    tracing::debug!("JSON read complete. Shape: {:?}", df.shape());
    Ok((df, None))
}
/// Reads newline-delimited JSON via the lazy streaming reader.
///
/// Parse errors on individual lines are ignored (`with_ignore_errors(true)`)
/// and collection runs with the new streaming engine. The second tuple
/// element is always `None`: NDJSON has no delimiter.
async fn read_ndjson_data(&self) -> PolarsViewResult<(DataFrame, Option<u8>)> {
tracing::debug!("Reading NDJSON data from: {}", self.absolute_path.display());
let pl_ref_path = PlRefPath::try_from_path(&self.absolute_path)?;
// Copied out so the `move` closure below does not borrow `self`.
let infer_schema_rows_for_task = self.infer_schema_rows;
let df = execute_polars_blocking(move || {
let lazyframe = LazyJsonLineReader::new(pl_ref_path) .low_memory(false) .with_infer_schema_length(NonZero::new(infer_schema_rows_for_task))
.with_ignore_errors(true)
.finish()?;
lazyframe.with_new_streaming(true).collect() })
.await?;
tracing::debug!("NDJSON read complete. Shape: {:?}", df.shape());
Ok((df, None))
}
/// Reads a Parquet file via `LazyFrame::scan_parquet` on the blocking pool.
///
/// Collection uses the new streaming engine. The second tuple element is
/// always `None`: Parquet has no delimiter.
async fn read_parquet_data(&self) -> PolarsViewResult<(DataFrame, Option<u8>)> {
tracing::debug!(
"Reading Parquet data from: {}",
self.absolute_path.display()
);
let pl_ref_path = PlRefPath::try_from_path(&self.absolute_path)?;
let args = ScanArgsParquet {
low_memory: false, ..Default::default()
};
let df = execute_polars_blocking(move || {
let lazyframe = LazyFrame::scan_parquet(pl_ref_path, args)?;
lazyframe.with_new_streaming(true).collect() })
.await?;
tracing::debug!("Parquet read complete. Shape: {:?}", df.shape());
Ok((df, None))
}
/// Reads a CSV file, auto-detecting the delimiter.
///
/// Strategy: try the configured delimiter first, then common alternatives
/// (`, ; | \t :`, deduplicated). For each candidate a cheap partial parse
/// (`attempt_csv_parse_structure`) sanity-checks the column count before the
/// full read is attempted. On success returns the DataFrame and the
/// delimiter byte that worked.
///
/// # Errors
/// [`PolarsViewError::CsvParsing`] when every candidate delimiter fails.
async fn read_csv_data(&self) -> PolarsViewResult<(DataFrame, Option<u8>)> {
let initial_separator = self.get_csv_separator()?;
// Configured delimiter first so it wins ties; `unique()` removes dups.
let mut delimiters_to_try = vec![initial_separator, b',', b';', b'|', b'\t', b':'];
delimiters_to_try.unique();
tracing::debug!(
"Attempting CSV read. Delimiters to try: {:?}",
delimiters_to_try
.iter()
.map(|&b| b as char)
.collect::<Vec<_>>()
);
// Peekable so the last candidate can relax the structure check.
let mut iterator = delimiters_to_try.iter().peekable();
while let Some(&delimiter) = iterator.next() {
let is_last_element = iterator.peek().is_none();
if let Ok(schema) = self
.attempt_csv_parse_structure(delimiter, is_last_element)
.await
{
tracing::debug!(
"Trying to read full CSV file with delimiter: '{}'",
delimiter as char
);
match self.attempt_read_csv(delimiter, &schema).await {
Ok(lazyframe) => {
tracing::info!(
"Successfully read CSV with delimiter: '{}'",
delimiter as char
);
let df = execute_polars_blocking(move || {
lazyframe.with_new_streaming(true).collect()
})
.await?;
tracing::debug!("Data collection complete. Shape: {:?}", df.shape());
return Ok((df, Some(delimiter)));
}
Err(e) => {
// Quick check passed but the full read failed: fall
// through to the next candidate delimiter.
tracing::warn!(
"Full CSV read failed with delimiter '{}' after quick check passed: {}",
delimiter as char,
e
);
continue; }
}
}
}
let msg = format!(
"Failed to read CSV '{}' with common delimiters. Check format or specify delimiter.",
self.absolute_path.display()
);
let error = PolarsViewError::CsvParsing(msg);
tracing::error!("{}", error);
Err(error)
}
/// Returns the configured CSV delimiter as a single byte.
///
/// # Errors
/// Returns [`PolarsViewError::InvalidDelimiter`] when `csv_delimiter` is
/// empty or longer than one byte. The previous implementation silently took
/// the first byte, so a multi-byte UTF-8 character (or a multi-character
/// string from the CLI) became a garbage delimiter instead of an error.
fn get_csv_separator(&self) -> PolarsViewResult<u8> {
    match self.csv_delimiter.as_bytes() {
        [byte] => Ok(*byte),
        _ => Err(PolarsViewError::InvalidDelimiter(
            self.csv_delimiter.clone(),
        )),
    }
}
/// Cheap structural probe: reads the first `ROW_LIMIT` rows with `delimiter`
/// and rejects the delimiter when the result has too few columns.
///
/// When `is_last_element` is true (final candidate), the column-count
/// heuristic is skipped so the caller gets a best-effort schema.
///
/// Returns the schema of the partially-read frame on success. Because the
/// partial read uses `infer_schema_length(Some(0))`, every column in that
/// schema is String-typed.
async fn attempt_csv_parse_structure(
&self,
delimiter: u8,
is_last_element: bool,
) -> PolarsViewResult<Arc<Schema>> {
const ROW_LIMIT: usize = 100;
tracing::debug!(
"Trying to parse CSV with delimiter: '{}'",
delimiter as char,
);
let file_path = &self.absolute_path;
let data_frame = read_csv_partial_from_path(delimiter, ROW_LIMIT, file_path).await?;
// NOTE(review): the partial read never adds the row-index column, yet the
// threshold is raised to 2 when `add_row_index` is set — a legitimate
// 2-column CSV would be rejected until the last delimiter. Confirm intent.
let min_expected_cols_on_success = if self.add_row_index { 2 } else { 1 };
if data_frame.width() <= min_expected_cols_on_success && !is_last_element {
tracing::warn!(
"CSV read with delimiter '{}' resulted in {} columns (expected > {}). Assuming incorrect delimiter.",
delimiter as char,
data_frame.width(),
min_expected_cols_on_success
);
return Err(PolarsViewError::CsvParsing(format!(
"Delimiter '{}' likely incorrect (resulted in {} columns)",
delimiter as char,
data_frame.width()
)));
}
tracing::debug!(
"CSV read successful with delimiter '{}'. Final shape (rows, columns): {:?}",
delimiter as char,
data_frame.shape()
);
Ok(data_frame.schema().clone())
}
/// Builds the full lazy CSV scan for `delimiter`.
///
/// `previous_scheme` (from the structural probe) supplies the column names
/// used to compute dtype overrides: any column matching
/// `force_string_patterns` is forced to String via `with_dtype_overwrite`.
///
/// # Errors
/// Invalid override pattern, path conversion failure, or reader setup errors.
async fn attempt_read_csv(
&self,
delimiter: u8,
previous_scheme: &Arc<Schema>,
) -> PolarsViewResult<LazyFrame> {
tracing::debug!(
"Attempting CSV read with delimiter: '{}'",
delimiter as char,
);
// Only pass an override schema when at least one column matched.
let mut dtypes_opt: Option<Arc<Schema>> = None;
if let Some(force_string_patterns) = &self.force_string_patterns {
let override_schema = build_dtype_override_schema(
previous_scheme,
force_string_patterns, )?;
if !override_schema.is_empty() {
dtypes_opt = Some(Arc::new(override_schema));
};
}
let pl_ref_path = PlRefPath::try_from_path(&self.absolute_path)?;
let lazyframe = LazyCsvReader::new(pl_ref_path)
.with_low_memory(false) .with_encoding(CsvEncoding::LossyUtf8) .with_has_header(true) .with_try_parse_dates(true) .with_separator(delimiter) .with_infer_schema_length(Some(self.infer_schema_rows)) .with_dtype_overwrite(dtypes_opt)
.with_ignore_errors(true) .with_missing_is_null(true) .with_null_values(None) .with_n_rows(None) .with_decimal_comma(false) .with_row_index(None) .with_rechunk(true) .finish()?;
Ok(lazyframe)
}
/// Splits `null_values` on commas into individual null markers.
///
/// Each entry is trimmed; one pair of surrounding double quotes is removed
/// (so `""` denotes the empty string) and the unquoted content is trimmed
/// again. Returns slices borrowing from `self.null_values`.
pub fn parse_null_values(&self) -> Vec<&str> {
    self.null_values
        .split(',')
        .map(|raw| {
            let value = raw.trim();
            // Unwrap exactly one layer of double quotes, if present.
            match value
                .strip_prefix('"')
                .and_then(|rest| rest.strip_suffix('"'))
            {
                Some(inner) => inner.trim(),
                None => value,
            }
        })
        .collect()
}
/// Renders the full filter/query panel inside a two-column grid.
///
/// A snapshot of `self` taken before rendering is compared against the
/// mutated state afterwards: any change flips `apply_sql`, and delimiter or
/// infer-rows changes additionally force a reload from disk. Returns a clone
/// of the (changed) filter when the user clicks "Apply SQL commands",
/// otherwise `None`.
pub fn render_query(&mut self, ui: &mut Ui) -> Option<DataFilter> {
// Snapshot for change detection after the widgets have run.
let filters_before_render = self.clone();
let mut result = None;
let width_min = 450.0;
let grid = Grid::new("data_query_grid")
.num_columns(2)
.spacing([10.0, 20.0]) .striped(true);
ui.allocate_ui_with_layout(
Vec2::new(ui.available_width(), ui.available_height()), Layout::top_down(Align::LEFT),
|ui| {
grid.show(ui, |ui| {
ui.set_min_width(width_min);
self.render_add_row_number(ui);
self.render_exclude_null_cols(ui);
self.render_exclude_columns(ui);
self.render_normalize_numbers(ui);
self.render_null_values(ui);
// Schema inference only applies to text-based formats.
if matches!(
self.get_extension().as_deref(), Some("csv" | "json" | "ndjson") ) {
self.render_schema_length_input(ui);
}
if self.get_extension().as_deref() == Some("csv") {
self.render_csv_delimiter(ui);
}
self.render_table_name_input(ui);
self.render_sql_query_input(ui);
// Any edit marks the SQL pipeline as needing a re-run.
if *self != filters_before_render {
self.apply_sql = true;
tracing::debug!("Change detected in DataFilter UI.");
}
// Delimiter / inference changes invalidate the loaded data itself.
if (self.csv_delimiter != filters_before_render.csv_delimiter)
|| (self.infer_schema_rows != filters_before_render.infer_schema_rows)
{
self.read_data_from_file = true;
}
ui.label(""); ui.with_layout(Layout::top_down(Align::Center), |ui| {
if ui.button("Apply SQL commands").clicked() {
if self.apply_sql {
result = Some(self.clone());
}
tracing::debug!("Apply SQL commands: {}", self.apply_sql);
}
});
ui.end_row();
}); }, );
self.render_sql_examples(ui);
result }
/// Renders the row-index controls: an enable checkbox plus, when enabled,
/// the column-name and starting-offset inputs.
fn render_add_row_number(&mut self, ui: &mut Ui) {
    ui.label("Add Row Number:");
    ui.checkbox(&mut self.add_row_index, "")
        .on_hover_text("Add a new column that counts the rows (first column).");
    ui.end_row();

    // Name/offset rows are only shown while the feature is enabled.
    if !self.add_row_index {
        return;
    }

    ui.label("\tName:");
    let name_field = TextEdit::singleline(&mut self.index_column_name)
        .desired_width(f32::INFINITY);
    ui.add(name_field)
        .on_hover_text("Name for the new index column (uniqueness checked later).");
    ui.end_row();

    ui.label("\tOffset:");
    let offset_field = DragValue::new(&mut self.index_column_offset)
        .speed(1)
        .range(0..=u32::MAX);
    ui.add(offset_field)
        .on_hover_text("Starting value for the index (e.g., 0 or 1).");
    ui.end_row();
}
/// Renders the single checkbox row toggling removal of all-null columns.
fn render_exclude_null_cols(&mut self, ui: &mut Ui) {
    ui.label("Exclude Null Cols:");
    let response = ui.checkbox(&mut self.exclude_null_cols, "");
    response.on_hover_text("Remove columns containing only null values.");
    ui.end_row();
}
/// Renders the column-drop controls: an enable checkbox and, when enabled,
/// the regex input selecting columns to remove by name.
fn render_exclude_columns(&mut self, ui: &mut Ui) {
ui.label("Remove Columns:");
ui.checkbox(&mut self.drop, "")
.on_hover_text("Remove columns whose names match the specified regex pattern.");
ui.end_row();
if self.drop {
ui.label("\tRegex:");
let name_edit = TextEdit::singleline(&mut self.drop_regex).desired_width(f32::INFINITY); ui.add(name_edit).on_hover_text(
"Enter the regex pattern to identify columns to drop by name.\n\n\
Format Requirements:\n\
- Use `*` to drop ALL columns.\n\
- Use `^YourPattern$` to match the entire column name.\n \
(Must start with `^` and end with `$`).\n\n\
Regex Examples:\n\
- `^Temp.*$` (Matches columns starting with 'Temp')\n\
- `^Value B$` (Matches the exact column named 'Value B')\n\
- `^(ID|Key|Index)$` (Matches 'ID', 'Key', or 'Index' exactly)\n\
- `^.*_OLD$` (Matches columns ending with '_OLD')\n\n\
(Invalid regex syntax or format will cause errors.)",
);
ui.end_row();
}
}
/// Renders the number-normalization controls: an enable checkbox and, when
/// enabled, the regex input selecting which String columns to convert from
/// Euro-style number strings to Float64.
fn render_normalize_numbers(&mut self, ui: &mut Ui) {
ui.label("Normalize Columns:");
ui.checkbox(&mut self.normalize, "").on_hover_text(
"Normalize Euro-style number strings in selected column names (via regex) to Float64.\n\
Example: '1.234,56' (String) to '1234.56' (Float64).",
);
ui.end_row();
if self.normalize {
ui.label("\tRegex:");
let name_edit =
TextEdit::singleline(&mut self.normalize_regex).desired_width(f32::INFINITY); ui.add(name_edit).on_hover_text(
r#"
Enter a regex pattern to select String columns by name.
Rules:
- Use '*' for ALL String columns (caution!).
- Use '^PATTERN$' for specific names (matches entire name).
Example Columns:
Row Number, Value1, Value2, ValueA, Valor, Total, SubTotal, Last Info
Example Patterns:
1. To select 'Value1', 'Value2':
^Value\d$
2. To select 'Value1', 'Value2', 'ValueA':
^Value.*$
3. To select 'Value1', 'Value2', 'ValueA', 'Valor':
^Val.*$
4. To select 'Value1', 'Value2', 'ValueA', 'Valor', 'Total', 'SubTotal':
^(Val|.*Total).*$
5. To select only 'Last Info' (note the space):
^Last Info$
(Applies only to columns that Polars identifies as String type.)"#,
);
ui.end_row();
}
}
/// Renders the editor for the comma-separated list of null markers.
fn render_null_values(&mut self, ui: &mut Ui) {
    ui.label("Null Values:");
    let editor = TextEdit::singleline(&mut self.null_values).desired_width(f32::INFINITY);
    let response = ui.add(editor);
    response.on_hover_text(
        "Comma-separated values to interpret as null during loading.\n\
        Leading/trailing whitespace for each value is automatically trimmed.",
    );
    ui.end_row();
}
/// Renders the drag-value controlling how many rows are scanned when
/// inferring column data types (0 disables inference).
fn render_schema_length_input(&mut self, ui: &mut Ui) {
    ui.label("Infer Rows:");
    let drag = DragValue::new(&mut self.infer_schema_rows)
        .speed(1)
        .range(0..=usize::MAX);
    ui.add(drag).on_hover_text(
        "Number of rows to scan for inferring data types (CSV/JSON)\n0: No inference",
    );
    ui.end_row();
}
/// Renders the one-character CSV delimiter editor (shown for CSV files only).
fn render_csv_delimiter(&mut self, ui: &mut Ui) {
    ui.label("CSV Delimiter:");
    let editor = TextEdit::singleline(&mut self.csv_delimiter)
        .char_limit(1)
        .desired_width(f32::INFINITY);
    let response = ui.add(editor);
    response.on_hover_text("Enter the single character CSV delimiter");
    ui.end_row();
}
/// Renders the editor for the SQL table name used in FROM clauses.
fn render_table_name_input(&mut self, ui: &mut Ui) {
    ui.label("SQL Table Name:");
    let editor = TextEdit::singleline(&mut self.table_name).desired_width(f32::INFINITY);
    let response = ui.add(editor);
    response.on_hover_text("Name of the table to use in SQL queries (e.g., FROM TableName)");
    ui.end_row();
}
/// Renders the SQL query editor plus numbered example tabs.
///
/// The selected tab index is persisted in egui memory under this widget's
/// id; clicking a tab replaces the query text with that example. When no
/// examples exist (empty schema), only a plain multiline editor is shown.
fn render_sql_query_input(&mut self, ui: &mut Ui) {
ui.label("SQL Query:"); ui.vertical(|ui| {
ui.set_min_width(300.0);
let examples = sql_commands(&self.schema);
// No schema-derived examples: fall back to a bare editor.
if examples.is_empty() {
ui.add(
TextEdit::multiline(&mut self.query)
.desired_width(f32::INFINITY)
.desired_rows(8) .font(egui::TextStyle::Monospace),
);
return; }
// Restore the previously-selected tab from persisted UI memory,
// clamped in case the number of examples shrank.
let tab_id = ui.id().with("sql_query_tab_index");
let mut selected_tab_index =
ui.memory_mut(|mem| *mem.data.get_persisted_mut_or_default::<usize>(tab_id));
selected_tab_index = selected_tab_index.min(examples.len().saturating_sub(1));
ui.separator();
ui.label("Examples:");
ui.horizontal_wrapped(|ui| {
for i in 0..examples.len() {
let is_selected = selected_tab_index == i;
let tab_name = format!("{}", i + 1);
let resp = ui
.selectable_label(is_selected, tab_name)
.on_hover_text(
examples
.get(i) .and_then(|s| s.lines().next()) .unwrap_or(""), );
// Switching tabs overwrites the query text and persists the choice.
if resp.clicked() && !is_selected {
selected_tab_index = i;
if let Some(example_query) = examples.get(i) {
self.query = example_query.clone(); tracing::debug!(
"Switched SQL Query tab to Example {}, query text updated.",
i + 1
);
}
ui.memory_mut(|mem| mem.data.insert_persisted(tab_id, selected_tab_index));
}
}
});
ui.separator();
ui.add(
TextEdit::multiline(&mut self.query)
.desired_width(f32::INFINITY) .desired_rows(6) .font(egui::TextStyle::Monospace), )
.on_hover_text(
"Enter SQL query (Polars SQL).\n\
Click Example tabs above.\n\
Changes trigger reload on Apply/focus change.",
);
}); ui.end_row(); }
/// Renders a collapsible panel listing every generated SQL example for the
/// current schema, plus a link to the Polars SQL reference.
fn render_sql_examples(&self, ui: &mut Ui) {
CollapsingHeader::new("SQL Command Examples")
.default_open(false)
.show(ui, |ui| {
let quoting_tip = "Tip: Use double quotes (\") or backticks (`) around column names with spaces or special characters (e.g., \"Column Name\" or `Column Name`).";
ui.label(quoting_tip);
Frame::default()
.stroke(Stroke::new(1.0, Color32::GRAY))
.outer_margin(2.0)
.inner_margin(10.0)
.show(ui, |ui| {
ui.vertical_centered(|ui| {
let polars_sql_url = "https://docs.pola.rs/api/python/stable/reference/sql/index.html";
ui.hyperlink_to("Polars SQL Reference", polars_sql_url).on_hover_text(polars_sql_url);
});
ui.separator();
let examples = sql_commands(&self.schema);
// Number each example for display: "Example N:\n<sql>".
let mut ex_num = Vec::new();
for (index, example) in examples.iter().enumerate() {
ex_num.push(format!("Example {count}:\n{example}", count = index + 1));
}
ui.add(egui::Label::new(ex_num.join("\n\n")).selectable(true));
});
});
}
}
/// Reads at most `n_rows` of a CSV file with the given `delimiter`.
///
/// Used as a cheap structural probe: `with_infer_schema_length(Some(0))`
/// disables dtype inference, so every column in the result is String-typed.
/// Runs the blocking Polars read on tokio's blocking pool.
pub async fn read_csv_partial_from_path(
delimiter: u8,
n_rows: usize,
path: &Path,
) -> PolarsViewResult<DataFrame> {
tracing::debug!("Read a CSV file using Polars limited to {} rows.", n_rows,);
let csv_parse_options = CsvParseOptions::default()
.with_encoding(CsvEncoding::LossyUtf8) .with_missing_is_null(true) .with_separator(delimiter);
let csv_read_options = CsvReadOptions::default()
.with_parse_options(csv_parse_options) .with_has_header(true) .with_infer_schema_length(Some(0)) .with_ignore_errors(true) .with_n_rows(Some(n_rows)) .try_into_reader_with_file_path(Some(path.to_path_buf()))?;
let df = execute_polars_blocking(move || csv_read_options.finish()).await?;
tracing::debug!("Partial CSV read complete. Shape: {:?}", df.shape());
Ok(df)
}
/// Builds a dtype-override schema mapping each column of `input_schema`
/// whose name matches `regex_pattern` to `DataType::String`.
///
/// Accepted patterns:
/// - `"*"` — override every column;
/// - `"^...$"` — a regex anchored at both ends, matched against full names.
///
/// # Errors
/// - [`PolarsViewError::InvalidRegexPattern`] when the pattern is neither
///   `*` nor anchored with `^` and `$`.
/// - [`PolarsViewError::InvalidRegexSyntax`] when the regex fails to compile.
fn build_dtype_override_schema(
    input_schema: &Arc<Schema>,
    regex_pattern: &str,
) -> PolarsViewResult<Schema> {
    // Wildcard: force every column to String explicitly. The previous code
    // returned a clone of `input_schema`, which only overrode everything to
    // String because the caller's partial read (infer_schema_length(0))
    // happened to infer all columns as String — build it explicitly instead
    // of relying on that hidden coupling.
    if regex_pattern.trim() == "*" {
        tracing::debug!(
            "Wildcard pattern '{regex_pattern}' provided. Overriding all columns to String."
        );
        let mut all_string = Schema::default();
        for col_name in input_schema.iter_names() {
            all_string.insert(col_name.clone(), DataType::String);
        }
        return Ok(all_string);
    }

    // Require a fully-anchored pattern so substrings cannot silently match.
    if !(regex_pattern.starts_with('^') && regex_pattern.ends_with('$')) {
        return Err(PolarsViewError::InvalidRegexPattern(
            regex_pattern.to_string(),
        ));
    }

    let compiled_regex =
        Regex::new(regex_pattern).map_err(|e| PolarsViewError::InvalidRegexSyntax {
            pattern: regex_pattern.to_string(),
            error: e.to_string(),
        })?;

    // Collect every column whose full name matches the pattern.
    let mut overrides_schema = Schema::default();
    for col_name in input_schema.iter_names() {
        if compiled_regex.is_match(col_name) {
            overrides_schema.insert(col_name.clone(), DataType::String);
        }
    }

    if overrides_schema.is_empty() {
        tracing::debug!("Provided regex patterns did not match any header columns.");
    } else {
        tracing::debug!(
            override_cols = ?overrides_schema.iter_names().collect::<Vec<_>>(),
            "Pattern '{}' matched {} columns: ",
            regex_pattern,
            overrides_schema.len()
        );
    }

    Ok(overrides_schema)
}
/// Finds a column name not already present in `schema`, starting from
/// `base_name` and probing `base_name_1`, `base_name_2`, … on conflict.
///
/// # Errors
/// `PolarsError::ComputeError` when no free name is found before
/// `MAX_ATTEMPTS` suffixes have been tried.
fn resolve_unique_column_name(base_name: &str, schema: &Schema) -> PolarsResult<PlSmallStr> {
    // Fast path: the requested name is already free.
    if schema.get(base_name).is_none() {
        tracing::debug!("Base name '{}' is available.", base_name);
        return Ok(base_name.into());
    }
    tracing::debug!(
        "Base name '{}' conflicts. Searching unique name.",
        base_name
    );
    // Probe numbered suffixes; like the original loop, suffixes 1..=MAX-1
    // are tried before giving up.
    for suffix in 1..MAX_ATTEMPTS {
        let candidate = format!("{base_name}_{suffix}");
        if schema.get(&candidate).is_none() {
            tracing::debug!("Found unique name: '{}'.", candidate);
            return Ok(candidate.into());
        }
    }
    let msg = format!(
        "Failed to find a unique column name starting with '{base_name}' after {MAX_ATTEMPTS} attempts."
    );
    tracing::error!("{}", msg);
    Err(PolarsError::ComputeError(msg.into()))
}
/// Runs a blocking Polars operation on tokio's blocking thread pool.
///
/// Two failure layers are surfaced separately: the task join error and the
/// `PolarsError` from the operation itself; both convert into
/// `PolarsViewError` via `From`.
async fn execute_polars_blocking<T, F>(op: F) -> PolarsViewResult<T>
where
    F: FnOnce() -> Result<T, PolarsError> + Send + 'static,
    T: Debug + Send + 'static,
{
    let joined = spawn_blocking(op).await.map_err(PolarsViewError::from)?;
    joined.map_err(PolarsViewError::from)
}
#[cfg(test)]
mod tests_override_columns {
use super::*;
use std::{fs::File, io::Write};
use tempfile::NamedTempFile;
/// Test helper: writes `content` to a temp file and builds a `DataFilter`
/// pointing at it with the given delimiter and override pattern.
///
/// Returns the `NamedTempFile` alongside the filter so the file outlives
/// the test body (it is deleted when the guard drops).
fn setup_test_csv(
content: &str, delimiter: char,
force_string_patterns: Option<String>, ) -> PolarsViewResult<(NamedTempFile, DataFilter)> {
let temp_file = NamedTempFile::new()?;
let file_path = temp_file.path().to_path_buf();
let mut file = File::create(&file_path)?;
file.write_all(content.as_bytes())?;
// Flush so the reader sees the full content immediately.
file.flush()?;
let filter = DataFilter {
absolute_path: file_path, force_string_patterns, csv_delimiter: delimiter.to_string(), ..Default::default() };
Ok((temp_file, filter))
}
/// When the override regex matches `long_id`, the column must be read as
/// String (preserving digits that would overflow an integer dtype) while the
/// other columns keep their inferred types.
#[tokio::test] async fn test_csv_read_with_override_success() -> PolarsViewResult<()> {
println!("\n--- Test: Override Applied Successfully ---");
// `long_id` values exceed i64 range, so without a String override they
// could not round-trip losslessly.
let csv_content = "\
long_id;value;text
12345678901234567890123456789012345678901234;10.5;abc
98765432109876543210987654321098765432109876;20.0;def
12345;30.7;ghi";
println!("Input CSV Content:\n{csv_content}\n");
let df_expected = df!(
"long_id" => &[
"12345678901234567890123456789012345678901234",
"98765432109876543210987654321098765432109876",
"12345"
],
"value" => &[10.5, 20.0, 30.7],
"text" => &["abc", "def", "ghi"]
)
.expect("Failed to create expected DataFrame");
println!("Expected DF (After Read):\n{df_expected}");
let delimiter = ';';
// Anchored pattern matching exactly the `long_id` column.
let col_regex = "^long_id$".to_string();
let (_temp_file, filter) = setup_test_csv(csv_content, delimiter, Some(col_regex))?;
// Structural probe first, mirroring the production read path.
let schema = filter
.attempt_csv_parse_structure(delimiter as u8, false)
.await?;
println!("schema: {schema:#?}");
let lazyframe = filter.attempt_read_csv(delimiter as u8, &schema).await?;
println!("get lazyframe");
let df_output =
execute_polars_blocking(move || lazyframe.with_new_streaming(true).collect()).await?;
println!("Output DF (Actual Read):\n{df_output}");
assert_eq!(
df_output.schema().get("long_id"),
Some(&DataType::String),
"Schema Check Failed: 'long_id' should be DataType::String"
);
assert_eq!(
df_output.schema().get("value"),
Some(&DataType::Float64),
"Schema Check Failed: 'value' should be DataType::Float64"
);
assert_eq!(
df_output.schema().get("text"),
Some(&DataType::String),
"Schema Check Failed: 'text' should be DataType::String"
);
assert_eq!(
df_output, df_expected,
"Content Check Failed: Output DF does not match expected DF"
);
Ok(())
}
/// When the override regex matches nothing, `long_id` is inferred as a
/// numeric dtype; its oversized values fail to parse and become nulls
/// (because the reader runs with `ignore_errors`).
#[tokio::test] async fn test_csv_read_without_override_yields_nulls() -> PolarsViewResult<()> {
println!("\n--- Test: No Override Applied (Expect Nulls) ---");
let csv_content = "\
long_id;value;text
12345678901234567890123456789012345678901234;10.5;abc
98765432109876543210987654321098765432109876;20.0;def";
println!("Input CSV Content:\n{csv_content}\n");
// Expected pattern: long_id all-null (Int64), other columns intact.
let df_expected_pattern = df!(
"long_id" => Series::new_null("long_id".into(), 2).cast(&DataType::Int64)?, "value" => &[10.5, 20.0],
"text" => &["abc", "def"]
)
.expect("Failed to create expected pattern DataFrame");
println!("Expected DF Pattern (After Read, note long_id nulls):\n{df_expected_pattern}");
let delimiter = ';';
// Deliberately matches no column header.
let col_regex = "^Col Name$".to_string();
let (_temp_file, filter) = setup_test_csv(csv_content, delimiter, Some(col_regex))?;
let schema = filter
.attempt_csv_parse_structure(delimiter as u8, false)
.await?;
println!("schema: {schema:#?}");
let lazyframe = filter.attempt_read_csv(delimiter as u8, &schema).await?;
println!("get lazyframe");
let df_output = spawn_blocking(move || lazyframe.with_new_streaming(true).collect())
.await
.map_err(PolarsViewError::from)? .map_err(PolarsViewError::from)?;
println!("Output DF (Actual Read):\n{df_output}");
let long_id_col = df_output.column("long_id")?;
assert!(
long_id_col.is_null().all(), "Content Check Failed: 'long_id' column should be all nulls without override. Type: {:?}, Null count: {}",
long_id_col.dtype(),
long_id_col.null_count()
);
assert_eq!(
df_output.column("value")?,
df_expected_pattern.column("value")?
);
assert_eq!(
df_output.column("text")?,
df_expected_pattern.column("text")?
);
Ok(())
}
}