use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
use anyhow::Result;
use csv;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::Path;
use std::sync::Arc;
use tracing::{debug, info};
/// Deduplicating pool of strings handed out as shared `Arc<String>` handles.
///
/// Interning the same text more than once returns clones of a single `Arc`, and the pool
/// counts how many times each distinct string was requested so that
/// [`StringInterner::stats`] can report on it.
#[derive(Debug, Clone)]
pub struct StringInterner {
    /// Maps string content to the shared handle returned for it.
    strings: HashMap<String, Arc<String>>,
    /// Number of `intern` calls answered with each handle.
    usage_count: HashMap<Arc<String>, usize>,
}

impl Default for StringInterner {
    /// Builds an empty pool.
    fn default() -> Self {
        Self {
            strings: HashMap::default(),
            usage_count: HashMap::default(),
        }
    }
}

impl StringInterner {
    /// Creates an empty pool; equivalent to `StringInterner::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the shared handle for `s`, registering the string on first sight.
    ///
    /// Every call counts as one reference to the string, including the call that
    /// first inserts it.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        // Fast path: the text is already pooled, so only its usage counter changes.
        if let Some(shared) = self.strings.get(s).cloned() {
            *self.usage_count.entry(Arc::clone(&shared)).or_insert(0) += 1;
            return shared;
        }

        // First sighting: allocate the shared copy and start its counter at one.
        let shared = Arc::new(s.to_owned());
        self.strings.insert(s.to_owned(), Arc::clone(&shared));
        self.usage_count.insert(Arc::clone(&shared), 1);
        shared
    }

    /// Summarises the pool: distinct strings, total `intern` calls, and estimated bytes saved.
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        InternerStats {
            unique_strings: self.strings.len(),
            total_references: self.usage_count.values().copied().sum(),
            memory_saved_bytes: self.calculate_memory_saved(),
        }
    }

    /// Estimates the bytes saved by sharing: for every string, its byte length times the
    /// number of requests beyond the first.
    fn calculate_memory_saved(&self) -> usize {
        self.usage_count
            .iter()
            .map(|(text, &uses)| text.len() * uses.saturating_sub(1))
            .sum()
    }
}

/// Snapshot of a [`StringInterner`]'s bookkeeping, as produced by `StringInterner::stats`.
#[derive(Debug)]
pub struct InternerStats {
    /// Number of distinct strings held by the pool.
    pub unique_strings: usize,
    /// Total number of `intern` calls served.
    pub total_references: usize,
    /// Estimated bytes saved by sharing repeated strings.
    pub memory_saved_bytes: usize,
}
/// Per-column result of the sampling pass performed by `AdvancedCsvLoader::analyze_columns`.
///
/// In the code visible in this file only `index` and `is_categorical` are read back; the
/// underscore-prefixed fields are filled in but not consumed.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the header row.
    index: usize,
    /// Header text of the column.
    _name: String,
    /// Number of distinct non-empty, non-numeric values seen in the sample.
    _cardinality: usize,
    /// Number of records that were sampled.
    _sample_size: usize,
    /// `_cardinality` divided by `_sample_size` (1.0 when no record was sampled).
    _unique_ratio: f64,
    /// Whether the column's strings should be interned.
    is_categorical: bool,
    /// Integer average byte length of the counted string values (0 when there were none).
    _avg_string_length: usize,
}
/// CSV loader that can sample a file up front and intern the strings of low-cardinality columns.
pub struct AdvancedCsvLoader {
    /// Maximum number of data records inspected by the column-analysis pass.
    sample_size: usize,
    /// A column whose distinct-value ratio in the sample is strictly below this value is
    /// treated as categorical.
    cardinality_threshold: f64,
    /// One interner per categorical column, keyed by zero-based column index. Entries are
    /// inserted by the column-analysis pass.
    interners: HashMap<usize, StringInterner>,
}
impl AdvancedCsvLoader {
/// Creates a loader with the built-in defaults: a 1000-record analysis sample, a 0.5
/// distinct-value ratio threshold, and no interners registered yet.
#[must_use]
pub fn new() -> Self {
    const DEFAULT_SAMPLE_SIZE: usize = 1000;
    const DEFAULT_CARDINALITY_THRESHOLD: f64 = 0.5;

    let interners = HashMap::new();
    Self {
        sample_size: DEFAULT_SAMPLE_SIZE,
        cardinality_threshold: DEFAULT_CARDINALITY_THRESHOLD,
        interners,
    }
}
/// Samples the first `self.sample_size` records of the CSV at `path` and decides, for each
/// column, whether its string values should be interned.
///
/// Only non-empty fields that do not parse as `f64` are counted. A column is marked
/// categorical when it does not look like a datetime column and either its distinct-value
/// ratio is below `self.cardinality_threshold` or `is_likely_categorical` accepts it. A fresh
/// `StringInterner` is stored in `self.interners` for every categorical column.
///
/// # Errors
/// Fails if the file cannot be opened, or if the header row or any sampled record cannot be
/// read.
fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
    // Running totals for the counted (non-empty, non-numeric) values of one column.
    #[derive(Default)]
    struct ColumnSample {
        distinct: HashSet<String>,
        total_len: usize,
        occurrences: usize,
    }

    info!("Analyzing CSV columns for optimization strategies");
    let mut reader = csv::Reader::from_reader(File::open(path)?);
    let headers = reader.headers()?.clone();
    let mut samples: Vec<ColumnSample> =
        headers.iter().map(|_| ColumnSample::default()).collect();

    let mut sampled_rows: usize = 0;
    for record in reader.records().take(self.sample_size) {
        let record = record?;
        // `zip` stops at the shorter side, so fields beyond the header width are ignored.
        for (field, sample) in record.iter().zip(samples.iter_mut()) {
            let is_text = !field.is_empty() && field.parse::<f64>().is_err();
            if is_text {
                sample.distinct.insert(field.to_string());
                sample.total_len += field.len();
                sample.occurrences += 1;
            }
        }
        sampled_rows += 1;
    }

    let mut analyses = Vec::with_capacity(samples.len());
    for (idx, (header, sample)) in headers.iter().zip(&samples).enumerate() {
        let cardinality = sample.distinct.len();
        // With no sampled rows the ratio defaults to 1.0 (treated as fully unique).
        let unique_ratio = match sampled_rows {
            0 => 1.0,
            rows => cardinality as f64 / rows as f64,
        };
        let avg_length = sample
            .total_len
            .checked_div(sample.occurrences)
            .unwrap_or(0);

        let looks_categorical = unique_ratio < self.cardinality_threshold
            || Self::is_likely_categorical(header, cardinality, avg_length);
        let is_categorical = looks_categorical && !Self::is_likely_datetime(&sample.distinct);

        analyses.push(ColumnAnalysis {
            index: idx,
            _name: header.to_string(),
            _cardinality: cardinality,
            _sample_size: sampled_rows,
            _unique_ratio: unique_ratio,
            is_categorical,
            _avg_string_length: avg_length,
        });

        if is_categorical {
            debug!(
                "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
                header, cardinality, sampled_rows, unique_ratio
            );
            self.interners.insert(idx, StringInterner::new());
        }
    }
    Ok(analyses)
}
/// Heuristically decides whether a column's distinct string values look like dates or
/// timestamps.
///
/// At most ten of the values are inspected (in the set's iteration order). A value counts as
/// datetime-like when it contains `-`, `/` or `:` and is between 8 and 30 bytes long. The
/// column is reported as datetime when at least 70% of the inspected values are
/// datetime-like. An empty set is never a datetime column.
fn is_likely_datetime(unique_values: &HashSet<String>) -> bool {
    const MAX_INSPECTED: usize = 10;
    const SEPARATORS: &[char] = &['-', '/', ':'];

    if unique_values.is_empty() {
        return false;
    }

    let inspected = unique_values.len().min(MAX_INSPECTED);
    let datetime_like = unique_values
        .iter()
        .take(inspected)
        .filter(|value| value.contains(SEPARATORS) && (8..=30).contains(&value.len()))
        .count();

    // Cross-multiply instead of computing `inspected * 7 / 10`: integer division rounds the
    // threshold down, which made it 0 for a single value (every one-value column was then
    // reported as datetime) and 1 for two values (a 50% match was accepted).
    datetime_like * 10 >= inspected * 7
}
/// Heuristically decides whether a column is categorical from its header and sample figures.
///
/// Returns `true` when any of the following holds:
/// - the lower-cased `name` contains one of a fixed list of keywords (substring match);
/// - the lower-cased `name` starts with `is_` or `has_`;
/// - `cardinality` is below 100 and `avg_length` is below 50.
fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
    const NAME_HINTS: &[&str] = &[
        "status",
        "state",
        "type",
        "category",
        "class",
        "group",
        "country",
        "region",
        "city",
        "currency",
        "side",
        "book",
        "desk",
        "trader",
        "portfolio",
        "strategy",
        "exchange",
        "venue",
        "counterparty",
        "product",
        "instrument",
    ];
    const FLAG_PREFIXES: &[&str] = &["is_", "has_"];

    let lowered = name.to_lowercase();
    let name_has_hint = NAME_HINTS.iter().any(|hint| lowered.contains(*hint));
    let name_is_flag = FLAG_PREFIXES
        .iter()
        .any(|prefix| lowered.starts_with(*prefix));
    let small_and_short = cardinality < 100 && avg_length < 50;

    name_has_hint || name_is_flag || small_and_short
}
/// Loads CSV data from `reader` into a table named `table_name`, using
/// `CsvReadOptions::default()`.
///
/// This is a convenience wrapper around `load_csv_from_reader_with_opts`; `source_path` is
/// forwarded unchanged.
///
/// # Errors
/// Returns whatever error `load_csv_from_reader_with_opts` reports.
pub fn load_csv_from_reader<R: Read>(
    &mut self,
    reader: R,
    table_name: &str,
    source_path: &str,
) -> Result<DataTable> {
    let default_opts = crate::data::stream_loader::CsvReadOptions::default();
    self.load_csv_from_reader_with_opts(reader, table_name, source_path, &default_opts)
}
/// Loads CSV data from `reader` with explicit read options.
///
/// The work is delegated to a freshly created `StreamCsvLoader`, which receives the literal
/// source kind `"file"` alongside `source_path`. This method does not touch the loader's own
/// interners.
///
/// # Errors
/// Returns whatever error the stream loader reports.
pub fn load_csv_from_reader_with_opts<R: Read>(
    &mut self,
    reader: R,
    table_name: &str,
    source_path: &str,
    opts: &crate::data::stream_loader::CsvReadOptions,
) -> Result<DataTable> {
    let mut delegate = crate::data::stream_loader::StreamCsvLoader::new();
    delegate.load_csv_from_reader_with_opts(reader, table_name, "file", source_path, opts)
}
/// Loads the CSV file at `path` into a table named `table_name`.
///
/// The delimiter is chosen by `detect_delimiter_from_path` from the path's display string,
/// and the file is read with `has_headers` set to `true`. The actual loading is done by
/// `load_csv_optimized_with_opts`.
///
/// # Errors
/// Returns whatever error `load_csv_optimized_with_opts` reports.
pub fn load_csv_optimized<P: AsRef<Path>>(
    &mut self,
    path: P,
    table_name: &str,
) -> Result<DataTable> {
    let path_text = path.as_ref().display().to_string();
    let delimiter = crate::data::stream_loader::detect_delimiter_from_path(&path_text);
    let opts = crate::data::stream_loader::CsvReadOptions {
        delimiter,
        has_headers: true,
    };
    self.load_csv_optimized_with_opts(path, table_name, &opts)
}
/// Opens the file at `path` and loads it with the given read options.
///
/// The opened file is handed to `load_csv_from_reader_with_opts`, with the path's display
/// string used as the source path.
///
/// # Errors
/// Fails if the file cannot be opened, or with whatever error
/// `load_csv_from_reader_with_opts` reports.
pub fn load_csv_optimized_with_opts<P: AsRef<Path>>(
    &mut self,
    path: P,
    table_name: &str,
    opts: &crate::data::stream_loader::CsvReadOptions,
) -> Result<DataTable> {
    let file_path = path.as_ref();
    info!(
        "Advanced CSV load: Loading {} with optimizations",
        file_path.display()
    );
    let source_label = file_path.display().to_string();
    let handle = File::open(file_path)?;
    self.load_csv_from_reader_with_opts(handle, table_name, &source_label, opts)
}
/// Loads the CSV at `path` into a `DataTable` named `table_name`, interning the strings of
/// columns that a sampling pass marked as categorical.
///
/// Steps, in order:
/// 1. `analyze_columns` samples the file and registers an interner per categorical column.
/// 2. The file is reopened with a `csv::Reader` and one `DataColumn` is added per header.
/// 3. The file is opened once more as a plain line reader so that `is_null_field` can tell an
///    unquoted empty field (stored as `Null`) from a quoted empty string.
/// 4. Every field is typed by the first rule that matches: empty, `bool`, `i64`, `f64`,
///    datetime-looking text, interned string (categorical columns only), plain string.
/// 5. The table is shrunk, column types are inferred, and interning statistics are logged.
///
/// # Errors
/// Fails if the file cannot be opened or read, if its metadata cannot be queried, if a CSV
/// record cannot be parsed, or if `add_row` rejects a row.
pub fn load_csv_optimized_legacy<P: AsRef<Path>>(
&mut self,
path: P,
table_name: &str,
) -> Result<DataTable> {
let path = path.as_ref();
info!(
"Advanced CSV load: Loading {} with optimizations",
path.display()
);
crate::utils::memory_tracker::track_memory("advanced_csv_start");
// Pass 1: sample the file; this also fills `self.interners` for categorical columns.
let analyses = self.analyze_columns(path)?;
let categorical_columns: HashSet<usize> = analyses
.iter()
.filter(|a| a.is_categorical)
.map(|a| a.index)
.collect();
info!(
"Column analysis complete: {} of {} columns will use string interning",
categorical_columns.len(),
analyses.len()
);
// Pass 2: parsed view of the file.
let file = File::open(path)?;
let mut reader = csv::Reader::from_reader(file);
let headers = reader.headers()?.clone();
let mut table = DataTable::new(table_name);
for header in &headers {
table.add_column(DataColumn::new(header.to_string()));
}
crate::utils::memory_tracker::track_memory("advanced_csv_headers");
// Rough capacity guess: one row per 100 bytes of file, capped at one million rows.
let file_size = std::fs::metadata(path)?.len();
let estimated_rows = (file_size / 100) as usize; table.reserve_rows(estimated_rows.min(1_000_000));
// Raw view of the same file, read line by line alongside the parsed records.
let file2 = File::open(path)?;
let mut line_reader = BufReader::new(file2);
let mut raw_line = String::new();
// Consume the first raw line, which corresponds to the header row read above.
line_reader.read_line(&mut raw_line)?;
let mut row_count = 0;
for result in reader.records() {
let record = result?;
// Advance the raw reader by one line for this record.
// NOTE(review): this pairing assumes every record occupies exactly one physical line;
// records with embedded newlines, or lines the csv reader skips, would presumably
// desynchronise the two readers — confirm against the csv crate's behaviour.
raw_line.clear();
line_reader.read_line(&mut raw_line)?;
let mut values = Vec::with_capacity(headers.len());
for (idx, field) in record.iter().enumerate() {
// Empty parsed field: the raw text decides between null and an empty string.
let value = if field.is_empty() {
if Self::is_null_field(&raw_line, idx) {
DataValue::Null
} else {
if categorical_columns.contains(&idx) {
if let Some(interner) = self.interners.get_mut(&idx) {
DataValue::InternedString(interner.intern(""))
} else {
DataValue::String(String::new())
}
} else {
DataValue::String(String::new())
}
}
// Typed parses are tried in a fixed order: bool, then i64, then f64.
} else if let Ok(b) = field.parse::<bool>() {
DataValue::Boolean(b)
} else if let Ok(i) = field.parse::<i64>() {
DataValue::Integer(i)
} else if let Ok(f) = field.parse::<f64>() {
DataValue::Float(f)
} else {
// Datetime heuristic: contains '-', '/' or ':' and is 8 to 30 bytes long. It is checked
// before interning, so such values are never interned even in categorical columns.
if (field.contains('-') || field.contains('/') || field.contains(':'))
&& field.len() >= 8
&& field.len() <= 30
{
DataValue::DateTime(field.to_string())
} else if categorical_columns.contains(&idx) {
if let Some(interner) = self.interners.get_mut(&idx) {
DataValue::InternedString(interner.intern(field))
} else {
DataValue::String(field.to_string())
}
} else {
DataValue::String(field.to_string())
}
};
values.push(value);
}
table
.add_row(DataRow::new(values))
.map_err(|e| anyhow::anyhow!(e))?;
row_count += 1;
// Progress checkpoint every 10,000 rows.
if row_count % 10000 == 0 {
crate::utils::memory_tracker::track_memory(&format!(
"advanced_csv_{row_count}rows"
));
debug!("Loaded {} rows...", row_count);
}
}
table.shrink_to_fit();
table.infer_column_types();
crate::utils::memory_tracker::track_memory("advanced_csv_complete");
// Report interning statistics for every interner held by the loader.
// NOTE(review): in the visible code `self.interners` is only ever inserted into, so this
// may include interners left over from an earlier load — confirm whether that is intended.
let mut total_saved = 0;
for (col_idx, interner) in &self.interners {
let stats = interner.stats();
if stats.memory_saved_bytes > 0 {
debug!(
"Column {} ('{}'): {} unique strings, {} references, saved {} KB",
col_idx,
headers.get(*col_idx).unwrap_or("?"),
stats.unique_strings,
stats.total_references,
stats.memory_saved_bytes / 1024
);
}
total_saved += stats.memory_saved_bytes;
}
info!(
"Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
table.row_count(),
table.column_count(),
table.estimate_memory_size() / 1024 / 1024,
total_saved / 1024
);
Ok(table)
}
/// Reports whether field number `field_index` (zero-based) of `raw_line` is an unquoted
/// empty field, i.e. a null rather than an empty string.
///
/// The line is split on commas that are outside double quotes. A field is null when its raw
/// text is empty after trimming whitespace (including a trailing line ending). A quoted
/// empty string (`""`), any non-empty field, and an index past the last field all yield
/// `false`.
///
/// A `"` directly preceded by a backslash does not toggle the quoted state; doubled quotes
/// toggle twice and therefore leave it unchanged.
fn is_null_field(raw_line: &str, field_index: usize) -> bool {
    let mut commas_seen = 0;
    let mut in_quotes = false;
    let mut field_start = 0;
    let mut prev_char = ' ';

    // `char_indices` yields byte offsets, which is what string slicing requires. Counting
    // characters instead would slice at the wrong place, or panic on a non-boundary offset,
    // as soon as a multi-byte character precedes the field.
    for (byte_pos, ch) in raw_line.char_indices() {
        if ch == '"' && prev_char != '\\' {
            in_quotes = !in_quotes;
        }
        if ch == ',' && !in_quotes {
            if commas_seen == field_index {
                return raw_line[field_start..byte_pos].trim().is_empty();
            }
            commas_seen += 1;
            // A comma is one byte long, so the next field starts right after it.
            field_start = byte_pos + 1;
        }
        prev_char = ch;
    }

    // The requested field is the last one on the line (no trailing comma after it).
    commas_seen == field_index && raw_line[field_start..].trim().is_empty()
}
/// Returns a statistics snapshot for every interner held by the loader, keyed by column index.
#[must_use]
pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
    let mut per_column = HashMap::with_capacity(self.interners.len());
    for (&column, interner) in &self.interners {
        per_column.insert(column, interner.stats());
    }
    per_column
}
}
impl Default for AdvancedCsvLoader {
fn default() -> Self {
Self::new()
}
}