1use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::collections::{HashMap, HashSet};
6use std::fs::File;
7use std::io::{BufRead, BufReader, Read};
8use std::path::Path;
9use std::sync::Arc;
10use tracing::{debug, info};
11
/// Deduplicating string pool: equal strings share a single `Arc<String>`.
///
/// Besides the pool itself, the interner counts how many times
/// [`StringInterner::intern`] has been called for each distinct string so
/// that [`StringInterner::stats`] can report an estimate of bytes saved.
#[derive(Debug, Clone)]
pub struct StringInterner {
    /// String content -> the shared allocation handed out for it.
    strings: HashMap<String, Arc<String>>,
    /// Shared allocation -> number of `intern` calls that returned it.
    usage_count: HashMap<Arc<String>, usize>,
}

impl Default for StringInterner {
    /// Same as [`StringInterner::new`].
    fn default() -> Self {
        StringInterner::new()
    }
}

impl StringInterner {
    /// Creates an empty interner.
    #[must_use]
    pub fn new() -> Self {
        StringInterner {
            strings: HashMap::new(),
            usage_count: HashMap::new(),
        }
    }

    /// Returns the shared handle for `s`, allocating it on first sight.
    ///
    /// Every call (first or repeated) bumps the usage counter of the
    /// returned handle by one.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        let shared = match self.strings.get(s) {
            Some(existing) => Arc::clone(existing),
            None => {
                let fresh = Arc::new(s.to_owned());
                self.strings.insert(s.to_owned(), Arc::clone(&fresh));
                fresh
            }
        };
        // A brand-new string starts from 0 and ends at 1; a known one is incremented.
        *self.usage_count.entry(Arc::clone(&shared)).or_insert(0) += 1;
        shared
    }

    /// Summarizes the pool: distinct strings, total `intern` calls, and the
    /// estimated number of bytes that sharing avoided.
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        InternerStats {
            unique_strings: self.strings.len(),
            total_references: self.usage_count.values().sum(),
            memory_saved_bytes: self.calculate_memory_saved(),
        }
    }

    /// Sums `len * (uses - 1)` over every string interned more than once,
    /// i.e. the bytes that the extra copies would have occupied.
    fn calculate_memory_saved(&self) -> usize {
        self.usage_count
            .iter()
            .filter(|&(_, &uses)| uses > 1)
            .map(|(text, &uses)| text.len() * (uses - 1))
            .sum()
    }
}

/// Snapshot of a [`StringInterner`]'s bookkeeping.
#[derive(Debug)]
pub struct InternerStats {
    /// Number of distinct strings held by the interner.
    pub unique_strings: usize,
    /// Total number of `intern` calls across all strings.
    pub total_references: usize,
    /// Estimated bytes saved by sharing repeated strings.
    pub memory_saved_bytes: usize,
}
80
/// Per-column result of the sampling pass performed by `analyze_columns`.
///
/// Only `index` and `is_categorical` are read back by the loader; the
/// underscore-prefixed fields are recorded but not consumed in this file.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the header row.
    index: usize,
    /// Header text of the column.
    _name: String,
    /// Number of distinct non-empty, non-numeric values seen in the sample.
    _cardinality: usize,
    /// Number of rows that were sampled.
    _sample_size: usize,
    /// `_cardinality / _sample_size` (1.0 when no rows were sampled).
    _unique_ratio: f64,
    /// Whether the column was selected for string interning.
    is_categorical: bool,
    /// Integer average byte length of the sampled string values.
    _avg_string_length: usize,
}
92
93pub struct AdvancedCsvLoader {
94 sample_size: usize,
95 cardinality_threshold: f64, interners: HashMap<usize, StringInterner>, }
98
99impl AdvancedCsvLoader {
100 #[must_use]
101 pub fn new() -> Self {
102 Self {
103 sample_size: 1000, cardinality_threshold: 0.5, interners: HashMap::new(),
106 }
107 }
108
109 fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
111 info!("Analyzing CSV columns for optimization strategies");
112
113 let file = File::open(path)?;
114 let mut reader = csv::Reader::from_reader(file);
115 let headers = reader.headers()?.clone();
116
117 let num_columns = headers.len();
119 let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
120 let mut total_lengths: Vec<usize> = vec![0; num_columns];
121 let mut string_counts: Vec<usize> = vec![0; num_columns];
122
123 let mut row_count = 0;
125 for result in reader.records() {
126 if row_count >= self.sample_size {
127 break;
128 }
129
130 let record = result?;
131 for (col_idx, field) in record.iter().enumerate() {
132 if col_idx < num_columns {
133 if !field.is_empty() && field.parse::<f64>().is_err() {
135 unique_values[col_idx].insert(field.to_string());
136 total_lengths[col_idx] += field.len();
137 string_counts[col_idx] += 1;
138 }
139 }
140 }
141 row_count += 1;
142 }
143
144 let mut analyses = Vec::new();
146 for (idx, header) in headers.iter().enumerate() {
147 let cardinality = unique_values[idx].len();
148 let unique_ratio = if row_count > 0 {
149 cardinality as f64 / row_count as f64
150 } else {
151 1.0
152 };
153
154 let avg_length = if string_counts[idx] > 0 {
155 total_lengths[idx] / string_counts[idx]
156 } else {
157 0
158 };
159
160 let is_datetime = Self::is_likely_datetime(&unique_values[idx]);
162
163 let is_categorical = !is_datetime
165 && (unique_ratio < self.cardinality_threshold
166 || Self::is_likely_categorical(header, cardinality, avg_length));
167
168 analyses.push(ColumnAnalysis {
169 index: idx,
170 _name: header.to_string(),
171 _cardinality: cardinality,
172 _sample_size: row_count,
173 _unique_ratio: unique_ratio,
174 is_categorical,
175 _avg_string_length: avg_length,
176 });
177
178 if is_categorical {
179 debug!(
180 "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
181 header, cardinality, row_count, unique_ratio
182 );
183 self.interners.insert(idx, StringInterner::new());
184 }
185 }
186
187 Ok(analyses)
188 }
189
/// Heuristically decides whether a column's distinct string values look
/// like dates or times.
///
/// At most ten values are inspected (in the set's iteration order). A value
/// counts as datetime-like when it contains `-`, `/` or `:` and is between
/// 8 and 30 bytes long. The column qualifies when at least 70% of the
/// inspected values are datetime-like. An empty set never qualifies.
fn is_likely_datetime(unique_values: &HashSet<String>) -> bool {
    const MAX_INSPECTED: usize = 10;

    if unique_values.is_empty() {
        return false;
    }

    let sample_size = unique_values.len().min(MAX_INSPECTED);
    let datetime_count = unique_values
        .iter()
        .take(sample_size)
        .filter(|value| {
            let has_separator = value.contains('-') || value.contains('/') || value.contains(':');
            has_separator && value.len() >= 8 && value.len() <= 30
        })
        .count();

    // Cross-multiplied so the 70% threshold is not rounded down: with integer
    // division a single-value sample would have a threshold of 0 and always pass.
    datetime_count * 10 >= sample_size * 7
}
217
/// Heuristically decides whether a column should be treated as categorical.
///
/// The column qualifies when its lower-cased name contains one of a fixed
/// list of keywords, when the name starts with `is_` or `has_`, or when it
/// has fewer than 100 distinct values with an average length below 50.
fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
    // Substrings of a column name that suggest a small, repeated vocabulary.
    const NAME_HINTS: &[&str] = &[
        "status",
        "state",
        "type",
        "category",
        "class",
        "group",
        "country",
        "region",
        "city",
        "currency",
        "side",
        "book",
        "desk",
        "trader",
        "portfolio",
        "strategy",
        "exchange",
        "venue",
        "counterparty",
        "product",
        "instrument",
    ];
    // Prefixes conventionally used for boolean-like flag columns.
    const FLAG_PREFIXES: &[&str] = &["is_", "has_"];

    let lowered = name.to_lowercase();

    let name_suggests_category = NAME_HINTS.iter().any(|hint| lowered.contains(*hint))
        || FLAG_PREFIXES
            .iter()
            .any(|prefix| lowered.starts_with(*prefix));

    // Fall back to the shape of the data when the name gives no hint.
    name_suggests_category || (cardinality < 100 && avg_length < 50)
}
261
262 pub fn load_csv_from_reader<R: Read>(
264 &mut self,
265 reader: R,
266 table_name: &str,
267 source_path: &str,
268 ) -> Result<DataTable> {
269 use crate::data::stream_loader::CsvReadOptions;
270 self.load_csv_from_reader_with_opts(
271 reader,
272 table_name,
273 source_path,
274 &CsvReadOptions::default(),
275 )
276 }
277
278 pub fn load_csv_from_reader_with_opts<R: Read>(
280 &mut self,
281 reader: R,
282 table_name: &str,
283 source_path: &str,
284 opts: &crate::data::stream_loader::CsvReadOptions,
285 ) -> Result<DataTable> {
286 use crate::data::stream_loader::StreamCsvLoader;
287
288 let mut stream_loader = StreamCsvLoader::new();
289 stream_loader.load_csv_from_reader_with_opts(reader, table_name, "file", source_path, opts)
290 }
291
292 pub fn load_csv_optimized<P: AsRef<Path>>(
296 &mut self,
297 path: P,
298 table_name: &str,
299 ) -> Result<DataTable> {
300 use crate::data::stream_loader::{detect_delimiter_from_path, CsvReadOptions};
301 let path_str = path.as_ref().display().to_string();
302 let opts = CsvReadOptions {
303 delimiter: detect_delimiter_from_path(&path_str),
304 has_headers: true,
305 };
306 self.load_csv_optimized_with_opts(path, table_name, &opts)
307 }
308
309 pub fn load_csv_optimized_with_opts<P: AsRef<Path>>(
311 &mut self,
312 path: P,
313 table_name: &str,
314 opts: &crate::data::stream_loader::CsvReadOptions,
315 ) -> Result<DataTable> {
316 let path = path.as_ref();
317 info!(
318 "Advanced CSV load: Loading {} with optimizations",
319 path.display()
320 );
321
322 let file = File::open(path)?;
323 self.load_csv_from_reader_with_opts(file, table_name, &path.display().to_string(), opts)
324 }
325
326 pub fn load_csv_optimized_legacy<P: AsRef<Path>>(
328 &mut self,
329 path: P,
330 table_name: &str,
331 ) -> Result<DataTable> {
332 let path = path.as_ref();
333 info!(
334 "Advanced CSV load: Loading {} with optimizations",
335 path.display()
336 );
337
338 crate::utils::memory_tracker::track_memory("advanced_csv_start");
340
341 let analyses = self.analyze_columns(path)?;
343 let categorical_columns: HashSet<usize> = analyses
344 .iter()
345 .filter(|a| a.is_categorical)
346 .map(|a| a.index)
347 .collect();
348
349 info!(
350 "Column analysis complete: {} of {} columns will use string interning",
351 categorical_columns.len(),
352 analyses.len()
353 );
354
355 let file = File::open(path)?;
357 let mut reader = csv::Reader::from_reader(file);
358 let headers = reader.headers()?.clone();
359
360 let mut table = DataTable::new(table_name);
361 for header in &headers {
362 table.add_column(DataColumn::new(header.to_string()));
363 }
364
365 crate::utils::memory_tracker::track_memory("advanced_csv_headers");
366
367 let file_size = std::fs::metadata(path)?.len();
369 let estimated_rows = (file_size / 100) as usize; table.reserve_rows(estimated_rows.min(1_000_000)); let file2 = File::open(path)?;
374 let mut line_reader = BufReader::new(file2);
375 let mut raw_line = String::new();
376 line_reader.read_line(&mut raw_line)?;
378
379 let mut row_count = 0;
381 for result in reader.records() {
382 let record = result?;
383
384 raw_line.clear();
386 line_reader.read_line(&mut raw_line)?;
387
388 let mut values = Vec::with_capacity(headers.len());
389
390 for (idx, field) in record.iter().enumerate() {
391 let value = if field.is_empty() {
392 if Self::is_null_field(&raw_line, idx) {
394 DataValue::Null
395 } else {
396 if categorical_columns.contains(&idx) {
398 if let Some(interner) = self.interners.get_mut(&idx) {
399 DataValue::InternedString(interner.intern(""))
400 } else {
401 DataValue::String(String::new())
402 }
403 } else {
404 DataValue::String(String::new())
405 }
406 }
407 } else if let Ok(b) = field.parse::<bool>() {
408 DataValue::Boolean(b)
409 } else if let Ok(i) = field.parse::<i64>() {
410 DataValue::Integer(i)
411 } else if let Ok(f) = field.parse::<f64>() {
412 DataValue::Float(f)
413 } else {
414 if (field.contains('-') || field.contains('/') || field.contains(':'))
417 && field.len() >= 8
418 && field.len() <= 30
419 {
420 DataValue::DateTime(field.to_string())
421 } else if categorical_columns.contains(&idx) {
422 if let Some(interner) = self.interners.get_mut(&idx) {
424 DataValue::InternedString(interner.intern(field))
426 } else {
427 DataValue::String(field.to_string())
428 }
429 } else {
430 DataValue::String(field.to_string())
431 }
432 };
433 values.push(value);
434 }
435
436 table
437 .add_row(DataRow::new(values))
438 .map_err(|e| anyhow::anyhow!(e))?;
439 row_count += 1;
440
441 if row_count % 10000 == 0 {
443 crate::utils::memory_tracker::track_memory(&format!(
444 "advanced_csv_{row_count}rows"
445 ));
446 debug!("Loaded {} rows...", row_count);
447 }
448 }
449
450 table.shrink_to_fit();
452
453 table.infer_column_types();
455
456 crate::utils::memory_tracker::track_memory("advanced_csv_complete");
457
458 let mut total_saved = 0;
460 for (col_idx, interner) in &self.interners {
461 let stats = interner.stats();
462 if stats.memory_saved_bytes > 0 {
463 debug!(
464 "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
465 col_idx,
466 headers.get(*col_idx).unwrap_or("?"),
467 stats.unique_strings,
468 stats.total_references,
469 stats.memory_saved_bytes / 1024
470 );
471 }
472 total_saved += stats.memory_saved_bytes;
473 }
474
475 info!(
476 "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
477 table.row_count(),
478 table.column_count(),
479 table.estimate_memory_size() / 1024 / 1024,
480 total_saved / 1024
481 );
482
483 Ok(table)
484 }
485
/// Reports whether field number `field_index` (zero-based) of a raw,
/// comma-separated line is completely absent, as opposed to being present
/// but empty (for example `""`).
///
/// Commas inside double quotes do not separate fields. A field is
/// considered null when nothing but whitespace lies between its delimiters
/// (or between its leading delimiter and the end of the line). Returns
/// `false` when the line has fewer fields than `field_index + 1`.
fn is_null_field(raw_line: &str, field_index: usize) -> bool {
    let mut comma_count = 0;
    let mut in_quotes = false;
    // Byte offset at which the current field starts.
    let mut field_start = 0;
    let mut prev_char = ' ';

    // `char_indices` yields byte offsets, which is what string slicing needs;
    // character counts diverge from byte offsets as soon as the line holds
    // non-ASCII text.
    for (byte_idx, ch) in raw_line.char_indices() {
        // A quote toggles quoting unless it follows a backslash.
        if ch == '"' && prev_char != '\\' {
            in_quotes = !in_quotes;
        }

        if ch == ',' && !in_quotes {
            if comma_count == field_index {
                return raw_line[field_start..byte_idx].trim().is_empty();
            }
            comma_count += 1;
            // ',' is a single byte, so the next field starts right after it.
            field_start = byte_idx + 1;
        }
        prev_char = ch;
    }

    // The requested field is the last one on the line; `trim` also drops the
    // trailing line terminator.
    if comma_count == field_index {
        return raw_line[field_start..].trim().is_empty();
    }

    false
}
543
544 #[must_use]
546 pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
547 self.interners
548 .iter()
549 .map(|(idx, interner)| (*idx, interner.stats()))
550 .collect()
551 }
552}
553
554impl Default for AdvancedCsvLoader {
555 fn default() -> Self {
556 Self::new()
557 }
558}