use polars::prelude::*;
use std::fs::File;
use std::io::BufRead;
use std::path::Path;
/// Names of the columns that hold each OHLCV field in a loaded `DataFrame`.
///
/// Every field is `None` when no column could be matched to that role, which is
/// also what [`Default`] produces.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FinancialColumns {
    /// Column holding the date / timestamp of each row.
    pub date: Option<String>,
    /// Column holding the opening price.
    pub open: Option<String>,
    /// Column holding the highest price.
    pub high: Option<String>,
    /// Column holding the lowest price.
    pub low: Option<String>,
    /// Column holding the closing price.
    pub close: Option<String>,
    /// Column holding the traded volume.
    pub volume: Option<String>,
}
pub fn read_csv<P: AsRef<Path>>(
file_path: P,
has_header: bool,
delimiter: char,
) -> PolarsResult<DataFrame> {
let file = File::open(file_path)?;
let csv_options = CsvReadOptions::default()
.with_has_header(has_header)
.map_parse_options(|opts| opts.with_separator(delimiter as u8));
CsvReader::new(file).with_options(csv_options).finish()
}
/// Reads a comma-separated file whose first row is a header.
///
/// Convenience wrapper around `read_csv` with the most common settings.
pub fn read_csv_default<P: AsRef<Path>>(file_path: P) -> PolarsResult<DataFrame> {
    const HAS_HEADER: bool = true;
    const COMMA: char = ',';
    read_csv(file_path, HAS_HEADER, COMMA)
}
/// Reads a Parquet file into a `DataFrame`.
///
/// # Errors
/// Fails if the file cannot be opened or is not valid Parquet.
pub fn read_parquet<P: AsRef<Path>>(file_path: P) -> PolarsResult<DataFrame> {
    ParquetReader::new(File::open(file_path)?).finish()
}
/// Delimiters tried when sniffing a CSV file; on equal frequency this order wins.
const CSV_DELIMITERS: [char; 4] = [',', ';', '\t', '|'];

/// Keywords whose presence (case-insensitive) in the first line marks it as a header.
const CSV_HEADER_KEYWORDS: [&str; 7] = ["date", "time", "open", "high", "low", "close", "volume"];

/// Loads a CSV or Parquet file and identifies its OHLCV columns.
///
/// The format is chosen from the (case-insensitive) file extension:
/// * `csv` – header presence and delimiter are sniffed from the first line;
///   headerless files get their columns renamed by heuristics.
/// * `parquet` – columns are matched by name.
///
/// # Errors
/// Fails if the extension is missing or unsupported, the file cannot be read,
/// no candidate delimiter parses, or column identification fails.
pub fn read_financial_data<P: AsRef<Path>>(
    file_path: P,
) -> PolarsResult<(DataFrame, FinancialColumns)> {
    let path = file_path.as_ref();
    let file_type = path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.to_lowercase())
        .ok_or_else(|| {
            PolarsError::ComputeError("Could not determine file type from extension".into())
        })?;
    match file_type.as_str() {
        "csv" => read_financial_csv(path),
        "parquet" => {
            let df = read_parquet(path)?;
            let columns = map_columns_with_headers(&df)?;
            Ok((df, columns))
        }
        _ => Err(PolarsError::ComputeError(
            format!("Unsupported file type: {}", file_type).into(),
        )),
    }
}

/// Reads a CSV file, sniffing the header row and delimiter from its first line.
fn read_financial_csv(path: &Path) -> PolarsResult<(DataFrame, FinancialColumns)> {
    let mut first_line = String::new();
    std::io::BufReader::new(File::open(path)?).read_line(&mut first_line)?;

    let lowered = first_line.to_lowercase();
    let has_header = CSV_HEADER_KEYWORDS
        .iter()
        .any(|&keyword| lowered.contains(keyword));

    let mut last_error = None;
    for delimiter in candidate_delimiters(&first_line) {
        match read_csv(path, has_header, delimiter) {
            Ok(df) => return process_dataframe(df, has_header),
            Err(e) => last_error = Some(e),
        }
    }
    Err(last_error.unwrap_or_else(|| {
        PolarsError::ComputeError("Failed to read CSV with any common delimiter".into())
    }))
}

/// Orders the candidate delimiters by how often each appears in `first_line`.
///
/// Parsing with the wrong delimiter usually still succeeds (as a single-column
/// frame), so trying the most frequent one first is what actually selects the
/// right separator. The sort is stable, so ties keep `CSV_DELIMITERS` order.
fn candidate_delimiters(first_line: &str) -> Vec<char> {
    let mut delimiters = CSV_DELIMITERS.to_vec();
    delimiters.sort_by_key(|&d| std::cmp::Reverse(first_line.matches(d).count()));
    delimiters
}
/// Identifies the OHLCV columns of a freshly parsed CSV frame.
///
/// With a header row the existing names are matched; without one the columns
/// are renamed in place from their contents. Returns the (possibly renamed)
/// frame together with the detected column mapping.
fn process_dataframe(
    mut df: DataFrame,
    has_header: bool,
) -> PolarsResult<(DataFrame, FinancialColumns)> {
    if !has_header {
        let inferred = rename_columns_without_headers(&mut df)?;
        return Ok((df, inferred));
    }
    let matched = map_columns_with_headers(&df)?;
    Ok((df, matched))
}
/// Header aliases per financial field, in assignment order:
/// date, open, high, low, close, volume.
const HEADER_ALIASES: [&[&str]; 6] = [
    &["date", "time", "datetime", "timestamp", "dt"],
    &["open", "o", "opening"],
    &["high", "h", "highest"],
    &["low", "l", "lowest"],
    &["close", "c", "closing"],
    &["volume", "vol", "v"],
];

/// Aliases shorter than this only match a whole header, never a substring:
/// a one-letter alias like `"o"` would otherwise claim "Close" as the open column.
const MIN_SUBSTRING_ALIAS_LEN: usize = 3;

/// Maps a frame's existing column names to OHLCV roles.
///
/// Names are compared trimmed and case-insensitively. Exact alias matches are
/// resolved first (so `Close` beats `Adj Close`), then substring matches using
/// only aliases of at least `MIN_SUBSTRING_ALIAS_LEN` characters. Each column is
/// assigned to at most one role; unmatched roles stay `None`.
fn map_columns_with_headers(df: &DataFrame) -> PolarsResult<FinancialColumns> {
    let column_names: Vec<String> = df
        .get_column_names()
        .into_iter()
        .map(|s| s.to_string())
        .collect();
    Ok(match_header_names(&column_names))
}

/// Pure matching logic behind `map_columns_with_headers`; returns original-case names.
fn match_header_names(names: &[String]) -> FinancialColumns {
    let normalized: Vec<String> = names.iter().map(|n| n.trim().to_lowercase()).collect();
    let mut claimed = vec![false; names.len()];
    let mut slots: [Option<String>; 6] = Default::default();

    for exact in [true, false] {
        for (slot, aliases) in slots.iter_mut().zip(HEADER_ALIASES.iter()) {
            if slot.is_some() {
                continue;
            }
            let hit = (0..names.len()).find(|&i| {
                !claimed[i]
                    && aliases.iter().any(|&alias| {
                        if exact {
                            normalized[i] == alias
                        } else {
                            alias.len() >= MIN_SUBSTRING_ALIAS_LEN
                                && normalized[i].contains(alias)
                        }
                    })
            });
            if let Some(i) = hit {
                claimed[i] = true;
                *slot = Some(names[i].clone());
            }
        }
    }

    let [date, open, high, low, close, volume] = slots;
    FinancialColumns {
        date,
        open,
        high,
        low,
        close,
        volume,
    }
}
/// Mean and population (ddof = 0) standard deviation of a numeric column.
struct ColumnStats {
    mean: Option<f64>,
    std_dev: Option<f64>,
}

/// Statistics for column `idx`, or `None` if it is missing, not primitive
/// numeric, or cannot be cast to `Float64`.
fn numeric_column_stats(df: &DataFrame, idx: usize) -> Option<ColumnStats> {
    let column = df.select_at_idx(idx)?;
    if !column.dtype().is_primitive_numeric() {
        return None;
    }
    let as_f64 = column.cast(&DataType::Float64).ok()?;
    let values = as_f64.f64().ok()?;
    Some(ColumnStats {
        mean: values.mean(),
        std_dev: values.std(0),
    })
}

/// Column indices chosen for each price role.
#[derive(Debug, Default, PartialEq)]
struct PriceRoles {
    open: Option<usize>,
    high: Option<usize>,
    low: Option<usize>,
    close: Option<usize>,
}

/// Assigns price roles to `(column index, mean)` candidates given in column order.
///
/// Row-wise, high >= every other price and low <= every other price, so the
/// column with the largest mean is taken as high and the smallest as low.
/// Open and close are statistically indistinguishable, so the remaining columns
/// follow the conventional O-H-L-C layout: one leftover is close; two or more
/// leftovers are open then close, and any further columns stay unassigned.
/// A lone candidate is treated as close (the usual date/close/volume layout).
fn assign_price_roles(mut candidates: Vec<(usize, f64)>) -> PriceRoles {
    let mut roles = PriceRoles::default();
    if candidates.len() == 1 {
        roles.close = Some(candidates[0].0);
        return roles;
    }
    if let Some(pos) = position_of_extreme(&candidates, std::cmp::Ordering::Greater) {
        roles.high = Some(candidates.remove(pos).0);
    }
    if let Some(pos) = position_of_extreme(&candidates, std::cmp::Ordering::Less) {
        roles.low = Some(candidates.remove(pos).0);
    }
    match candidates.as_slice() {
        [] => {}
        [only] => roles.close = Some(only.0),
        [first, second, ..] => {
            roles.open = Some(first.0);
            roles.close = Some(second.0);
        }
    }
    roles
}

/// Position of the first candidate whose mean compares as `want` against all
/// others (`Greater` = maximum, `Less` = minimum); ties keep the earliest.
fn position_of_extreme(candidates: &[(usize, f64)], want: std::cmp::Ordering) -> Option<usize> {
    let mut best: Option<usize> = None;
    for (pos, &(_, mean)) in candidates.iter().enumerate() {
        match best {
            Some(b) if mean.total_cmp(&candidates[b].1) != want => {}
            _ => best = Some(pos),
        }
    }
    best
}

/// Infers OHLCV roles for a headerless frame and renames its columns in place.
///
/// Heuristics, applied in order to columns not yet identified:
/// 1. date – the first `String`, `Date` or `Datetime` column;
/// 2. volume – the first numeric column whose mean exceeds 100x the mean of the
///    other numeric columns and whose std-dev exceeds 10% of its own mean;
/// 3. prices – remaining numeric columns with a non-NaN mean, assigned by
///    `assign_price_roles`.
///
/// Columns left over are renamed `unknown_{index}` so every name stays unique
/// (duplicate names would make `set_column_names` fail).
fn rename_columns_without_headers(df: &mut DataFrame) -> PolarsResult<FinancialColumns> {
    let n_cols = df.width();
    // An empty name marks a column that has not been identified yet.
    let mut col_names = vec![String::new(); n_cols];
    let mut financial_columns = FinancialColumns {
        date: None,
        open: None,
        high: None,
        low: None,
        close: None,
        volume: None,
    };

    let date_idx = (0..n_cols).find(|&i| {
        df.select_at_idx(i).is_some_and(|column| {
            matches!(
                column.dtype(),
                DataType::String | DataType::Date | DataType::Datetime(_, _)
            )
        })
    });
    if let Some(i) = date_idx {
        col_names[i] = "date".to_string();
        financial_columns.date = Some("date".to_string());
    }

    for i in 0..n_cols {
        if !col_names[i].is_empty() {
            continue;
        }
        let Some(ColumnStats {
            mean: Some(mean),
            std_dev: Some(std_dev),
        }) = numeric_column_stats(df, i)
        else {
            continue;
        };
        let other_cols_mean = get_numeric_columns_mean(df, i)?;
        if mean > other_cols_mean * 100.0 && std_dev > mean * 0.1 {
            col_names[i] = "volume".to_string();
            financial_columns.volume = Some("volume".to_string());
            break;
        }
    }

    let candidates: Vec<(usize, f64)> = (0..n_cols)
        .filter(|&i| col_names[i].is_empty())
        .filter_map(|i| Some((i, numeric_column_stats(df, i)?.mean?)))
        .filter(|&(_, mean)| !mean.is_nan())
        .collect();
    let roles = assign_price_roles(candidates);
    let assignments = [
        (roles.open, "open", &mut financial_columns.open),
        (roles.high, "high", &mut financial_columns.high),
        (roles.low, "low", &mut financial_columns.low),
        (roles.close, "close", &mut financial_columns.close),
    ];
    for (idx, name, slot) in assignments {
        if let Some(i) = idx {
            col_names[i] = name.to_string();
            *slot = Some(name.to_string());
        }
    }

    for (i, name) in col_names.iter_mut().enumerate() {
        if name.is_empty() {
            *name = format!("unknown_{}", i);
        }
    }
    df.set_column_names(&col_names)?;
    Ok(financial_columns)
}
fn get_numeric_columns_mean(df: &DataFrame, exclude_idx: usize) -> PolarsResult<f64> {
let mut sum = 0.0;
let mut count = 0;
for i in 0..df.width() {
if i == exclude_idx {
continue;
}
if let Some(series) = df.select_at_idx(i) {
if series.dtype().is_primitive_numeric() {
if let Ok(f64_series) = series.cast(&DataType::Float64) {
if let Ok(nums) = f64_series.f64() {
if let Some(mean) = nums.mean() {
sum += mean;
count += 1;
}
}
}
}
}
}
if count > 0 {
Ok(sum / count as f64)
} else {
Ok(0.0)
}
}