1use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::collections::{HashMap, HashSet};
6use std::fs::File;
7use std::io::{BufRead, BufReader, Read};
8use std::path::Path;
9use std::sync::Arc;
10use tracing::{debug, info};
11
/// Deduplicating string store: identical values share a single `Arc<String>`
/// allocation instead of each row owning its own copy.
#[derive(Debug, Clone)]
pub struct StringInterner {
    // Canonical storage: each distinct string maps to its shared handle.
    strings: HashMap<String, Arc<String>>,
    // How many times each interned string has been handed out via `intern`.
    usage_count: HashMap<Arc<String>, usize>,
}
18
19impl Default for StringInterner {
20 fn default() -> Self {
21 Self::new()
22 }
23}
24
25impl StringInterner {
26 #[must_use]
27 pub fn new() -> Self {
28 Self {
29 strings: HashMap::new(),
30 usage_count: HashMap::new(),
31 }
32 }
33
34 pub fn intern(&mut self, s: &str) -> Arc<String> {
36 if let Some(rc_str) = self.strings.get(s) {
37 let rc = rc_str.clone();
38 *self.usage_count.entry(rc.clone()).or_insert(0) += 1;
39 rc
40 } else {
41 let rc_str = Arc::new(s.to_string());
42 self.strings.insert(s.to_string(), rc_str.clone());
43 self.usage_count.insert(rc_str.clone(), 1);
44 rc_str
45 }
46 }
47
48 #[must_use]
50 pub fn stats(&self) -> InternerStats {
51 let total_strings = self.strings.len();
52 let total_references: usize = self.usage_count.values().sum();
53 let memory_saved = self.calculate_memory_saved();
54
55 InternerStats {
56 unique_strings: total_strings,
57 total_references,
58 memory_saved_bytes: memory_saved,
59 }
60 }
61
62 fn calculate_memory_saved(&self) -> usize {
63 let mut saved = 0;
64 for (rc_str, count) in &self.usage_count {
65 if *count > 1 {
66 saved += rc_str.len() * (*count - 1);
68 }
69 }
70 saved
71 }
72}
73
/// Snapshot of a `StringInterner`'s effectiveness, produced by `StringInterner::stats`.
#[derive(Debug)]
pub struct InternerStats {
    /// Number of distinct strings held by the interner.
    pub unique_strings: usize,
    /// Total references handed out across all strings (sum of usage counts).
    pub total_references: usize,
    /// Estimated bytes saved versus storing every reference as its own copy.
    pub memory_saved_bytes: usize,
}
80
/// Per-column result of the sampling pass in `analyze_columns`.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based column index in the CSV header.
    index: usize,
    /// Column name taken from the header row.
    name: String,
    /// Number of distinct non-numeric values seen in the sample.
    cardinality: usize,
    /// Number of records actually sampled.
    sample_size: usize,
    /// `cardinality / sample_size` (1.0 when no rows were sampled).
    unique_ratio: f64,
    /// Whether the column was selected for string interning.
    is_categorical: bool,
    /// Average byte length of the sampled string values (0 if none).
    avg_string_length: usize,
}
92
/// CSV loader that samples the file first and interns strings in
/// low-cardinality ("categorical") columns to reduce memory use.
pub struct AdvancedCsvLoader {
    /// Maximum number of records sampled during column analysis.
    sample_size: usize,
    /// Unique-ratio cutoff below which a column is treated as categorical.
    cardinality_threshold: f64,
    /// Per-column interners keyed by column index, created for categorical columns.
    interners: HashMap<usize, StringInterner>,
}
98
99impl AdvancedCsvLoader {
100 #[must_use]
101 pub fn new() -> Self {
102 Self {
103 sample_size: 1000, cardinality_threshold: 0.5, interners: HashMap::new(),
106 }
107 }
108
109 fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
111 info!("Analyzing CSV columns for optimization strategies");
112
113 let file = File::open(path)?;
114 let mut reader = csv::Reader::from_reader(file);
115 let headers = reader.headers()?.clone();
116
117 let num_columns = headers.len();
119 let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
120 let mut total_lengths: Vec<usize> = vec![0; num_columns];
121 let mut string_counts: Vec<usize> = vec![0; num_columns];
122
123 let mut row_count = 0;
125 for result in reader.records() {
126 if row_count >= self.sample_size {
127 break;
128 }
129
130 let record = result?;
131 for (col_idx, field) in record.iter().enumerate() {
132 if col_idx < num_columns {
133 if !field.is_empty() && field.parse::<f64>().is_err() {
135 unique_values[col_idx].insert(field.to_string());
136 total_lengths[col_idx] += field.len();
137 string_counts[col_idx] += 1;
138 }
139 }
140 }
141 row_count += 1;
142 }
143
144 let mut analyses = Vec::new();
146 for (idx, header) in headers.iter().enumerate() {
147 let cardinality = unique_values[idx].len();
148 let unique_ratio = if row_count > 0 {
149 cardinality as f64 / row_count as f64
150 } else {
151 1.0
152 };
153
154 let avg_length = if string_counts[idx] > 0 {
155 total_lengths[idx] / string_counts[idx]
156 } else {
157 0
158 };
159
160 let is_datetime = Self::is_likely_datetime(&unique_values[idx]);
162
163 let is_categorical = !is_datetime
165 && (unique_ratio < self.cardinality_threshold
166 || Self::is_likely_categorical(header, cardinality, avg_length));
167
168 analyses.push(ColumnAnalysis {
169 index: idx,
170 name: header.to_string(),
171 cardinality,
172 sample_size: row_count,
173 unique_ratio,
174 is_categorical,
175 avg_string_length: avg_length,
176 });
177
178 if is_categorical {
179 debug!(
180 "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
181 header, cardinality, row_count, unique_ratio
182 );
183 self.interners.insert(idx, StringInterner::new());
184 }
185 }
186
187 Ok(analyses)
188 }
189
190 fn is_likely_datetime(unique_values: &HashSet<String>) -> bool {
192 if unique_values.is_empty() {
193 return false;
194 }
195
196 let sample_size = unique_values.len().min(10);
198 let mut datetime_count = 0;
199
200 for (i, value) in unique_values.iter().enumerate() {
201 if i >= sample_size {
202 break;
203 }
204
205 if (value.contains('-') || value.contains('/') || value.contains(':'))
207 && value.len() >= 8
208 && value.len() <= 30
209 {
210 datetime_count += 1;
211 }
212 }
213
214 datetime_count >= (sample_size * 7) / 10 }
217
218 fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
220 let name_lower = name.to_lowercase();
221
222 let categorical_patterns = [
224 "status",
225 "state",
226 "type",
227 "category",
228 "class",
229 "group",
230 "country",
231 "region",
232 "city",
233 "currency",
234 "side",
235 "book",
236 "desk",
237 "trader",
238 "portfolio",
239 "strategy",
240 "exchange",
241 "venue",
242 "counterparty",
243 "product",
244 "instrument",
245 ];
246
247 for pattern in &categorical_patterns {
248 if name_lower.contains(pattern) {
249 return true;
250 }
251 }
252
253 if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
255 return true;
256 }
257
258 cardinality < 100 && avg_length < 50
260 }
261
262 pub fn load_csv_from_reader<R: Read>(
264 &mut self,
265 reader: R,
266 table_name: &str,
267 source_path: &str,
268 ) -> Result<DataTable> {
269 use crate::data::stream_loader::StreamCsvLoader;
270
271 let mut stream_loader = StreamCsvLoader::new();
273 stream_loader.load_csv_from_reader(reader, table_name, "file", source_path)
274 }
275
276 pub fn load_csv_optimized<P: AsRef<Path>>(
278 &mut self,
279 path: P,
280 table_name: &str,
281 ) -> Result<DataTable> {
282 let path = path.as_ref();
283 info!(
284 "Advanced CSV load: Loading {} with optimizations",
285 path.display()
286 );
287
288 let file = File::open(path)?;
290 self.load_csv_from_reader(file, table_name, &path.display().to_string())
291 }
292
    /// Legacy whole-file CSV loader that applies per-column string interning.
    ///
    /// Runs `analyze_columns` first to pick categorical columns, then re-reads
    /// the file, converting each field to bool/int/float/datetime/string and
    /// interning string values in categorical columns. Memory usage is tracked
    /// at key points via `memory_tracker`.
    ///
    /// # Errors
    /// Returns an error on I/O failure, malformed CSV, or when a row cannot be
    /// appended to the table.
    pub fn load_csv_optimized_legacy<P: AsRef<Path>>(
        &mut self,
        path: P,
        table_name: &str,
    ) -> Result<DataTable> {
        let path = path.as_ref();
        info!(
            "Advanced CSV load: Loading {} with optimizations",
            path.display()
        );

        crate::utils::memory_tracker::track_memory("advanced_csv_start");

        // Sampling pass: decides which columns get interners.
        let analyses = self.analyze_columns(path)?;
        let categorical_columns: HashSet<usize> = analyses
            .iter()
            .filter(|a| a.is_categorical)
            .map(|a| a.index)
            .collect();

        info!(
            "Column analysis complete: {} of {} columns will use string interning",
            categorical_columns.len(),
            analyses.len()
        );

        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        let mut table = DataTable::new(table_name);
        for header in &headers {
            table.add_column(DataColumn::new(header.to_string()));
        }

        crate::utils::memory_tracker::track_memory("advanced_csv_headers");

        // Rough pre-allocation: assume ~100 bytes per row, capped at 1M rows.
        let file_size = std::fs::metadata(path)?.len();
        let estimated_rows = (file_size / 100) as usize;
        table.reserve_rows(estimated_rows.min(1_000_000));

        // Second handle on the same file, read line-by-line in lockstep with
        // the csv reader so `is_null_field` can inspect the raw text of the
        // current record (the csv crate yields "" for both `a,,b` and `a,"",b`).
        // The first read_line consumes the header row.
        // NOTE(review): this assumes one raw line per record; a quoted field
        // containing an embedded newline would desynchronize the two readers —
        // TODO confirm inputs never contain multi-line records.
        let file2 = File::open(path)?;
        let mut line_reader = BufReader::new(file2);
        let mut raw_line = String::new();
        line_reader.read_line(&mut raw_line)?;

        let mut row_count = 0;
        for result in reader.records() {
            let record = result?;

            // Advance the raw-line reader to the line backing this record.
            raw_line.clear();
            line_reader.read_line(&mut raw_line)?;

            let mut values = Vec::with_capacity(headers.len());

            for (idx, field) in record.iter().enumerate() {
                let value = if field.is_empty() {
                    // Distinguish true null (`a,,b`) from quoted empty (`a,"",b`)
                    // by looking at the raw line text.
                    if Self::is_null_field(&raw_line, idx) {
                        DataValue::Null
                    } else {
                        // Explicit empty string; intern it for categorical columns.
                        if categorical_columns.contains(&idx) {
                            if let Some(interner) = self.interners.get_mut(&idx) {
                                DataValue::InternedString(interner.intern(""))
                            } else {
                                DataValue::String(String::new())
                            }
                        } else {
                            DataValue::String(String::new())
                        }
                    }
                } else if let Ok(b) = field.parse::<bool>() {
                    DataValue::Boolean(b)
                } else if let Ok(i) = field.parse::<i64>() {
                    DataValue::Integer(i)
                } else if let Ok(f) = field.parse::<f64>() {
                    DataValue::Float(f)
                } else {
                    // Same shape heuristic as `is_likely_datetime`, applied per field.
                    if (field.contains('-') || field.contains('/') || field.contains(':'))
                        && field.len() >= 8
                        && field.len() <= 30
                    {
                        DataValue::DateTime(field.to_string())
                    } else if categorical_columns.contains(&idx) {
                        if let Some(interner) = self.interners.get_mut(&idx) {
                            DataValue::InternedString(interner.intern(field))
                        } else {
                            DataValue::String(field.to_string())
                        }
                    } else {
                        DataValue::String(field.to_string())
                    }
                };
                values.push(value);
            }

            table
                .add_row(DataRow::new(values))
                .map_err(|e| anyhow::anyhow!(e))?;
            row_count += 1;

            // Periodic progress/memory checkpoint.
            if row_count % 10000 == 0 {
                crate::utils::memory_tracker::track_memory(&format!(
                    "advanced_csv_{row_count}rows"
                ));
                debug!("Loaded {} rows...", row_count);
            }
        }

        // Release over-reserved capacity from the row-count estimate above.
        table.shrink_to_fit();

        table.infer_column_types();

        crate::utils::memory_tracker::track_memory("advanced_csv_complete");

        // Report per-column interning savings and the overall total.
        let mut total_saved = 0;
        for (col_idx, interner) in &self.interners {
            let stats = interner.stats();
            if stats.memory_saved_bytes > 0 {
                debug!(
                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
                    col_idx,
                    headers.get(*col_idx).unwrap_or("?"),
                    stats.unique_strings,
                    stats.total_references,
                    stats.memory_saved_bytes / 1024
                );
            }
            total_saved += stats.memory_saved_bytes;
        }

        info!(
            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
            table.row_count(),
            table.column_count(),
            table.estimate_memory_size() / 1024 / 1024,
            total_saved / 1024
        );

        Ok(table)
    }
452
453 fn is_null_field(raw_line: &str, field_index: usize) -> bool {
455 let mut comma_count = 0;
456 let mut in_quotes = false;
457 let mut field_start = 0;
458 let mut prev_char = ' ';
459
460 for (i, ch) in raw_line.chars().enumerate() {
461 if ch == '"' && prev_char != '\\' {
462 in_quotes = !in_quotes;
463 }
464
465 if ch == ',' && !in_quotes {
466 if comma_count == field_index {
467 let field_end = i;
468 let field_content = &raw_line[field_start..field_end].trim();
469 if field_content.is_empty() {
471 return true; }
473 if field_content.starts_with('"')
475 && field_content.ends_with('"')
476 && field_content.len() == 2
477 {
478 return false; }
480 return false; }
482 comma_count += 1;
483 field_start = i + 1;
484 }
485 prev_char = ch;
486 }
487
488 if comma_count == field_index {
490 let field_content = raw_line[field_start..]
491 .trim()
492 .trim_end_matches('\n')
493 .trim_end_matches('\r');
494 if field_content.is_empty() {
496 return true; }
498 if field_content.starts_with('"')
500 && field_content.ends_with('"')
501 && field_content.len() == 2
502 {
503 return false; }
505 return false; }
507
508 false }
510
511 #[must_use]
513 pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
514 self.interners
515 .iter()
516 .map(|(idx, interner)| (*idx, interner.stats()))
517 .collect()
518 }
519}
520
521impl Default for AdvancedCsvLoader {
522 fn default() -> Self {
523 Self::new()
524 }
525}