use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
use anyhow::Result;
use csv;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::sync::Arc;
use tracing::{debug, info};

/// Interns repeated string values so identical strings share a single
/// reference-counted allocation, and tracks how often each interned string is reused.
#[derive(Debug, Clone)]
pub struct StringInterner {
    strings: HashMap<String, Arc<String>>,
    usage_count: HashMap<Arc<String>, usize>,
}

impl Default for StringInterner {
    fn default() -> Self {
        Self::new()
    }
}

impl StringInterner {
    #[must_use]
    pub fn new() -> Self {
        Self {
            strings: HashMap::new(),
            usage_count: HashMap::new(),
        }
    }

    /// Returns a shared `Arc<String>` for `s`, allocating it only the first
    /// time the string is seen and bumping its usage count on every call.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        if let Some(rc_str) = self.strings.get(s) {
            let rc = rc_str.clone();
            *self.usage_count.entry(rc.clone()).or_insert(0) += 1;
            rc
        } else {
            let rc_str = Arc::new(s.to_string());
            self.strings.insert(s.to_string(), rc_str.clone());
            self.usage_count.insert(rc_str.clone(), 1);
            rc_str
        }
    }

    /// Summarizes how many unique strings are interned, how many references
    /// were handed out, and an estimate of the bytes saved by sharing them.
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        let total_strings = self.strings.len();
        let total_references: usize = self.usage_count.values().sum();
        let memory_saved = self.calculate_memory_saved();

        InternerStats {
            unique_strings: total_strings,
            total_references,
            memory_saved_bytes: memory_saved,
        }
    }

    fn calculate_memory_saved(&self) -> usize {
        let mut saved = 0;
        for (rc_str, count) in &self.usage_count {
            if *count > 1 {
                // Every reference beyond the first would otherwise be a separate allocation.
                saved += rc_str.len() * (*count - 1);
            }
        }
        saved
    }
}

/// Summary statistics produced by [`StringInterner::stats`].
#[derive(Debug)]
pub struct InternerStats {
    pub unique_strings: usize,
    pub total_references: usize,
    pub memory_saved_bytes: usize,
}

/// Per-column profile gathered during the sampling pass, used to decide
/// whether a column should use string interning.
#[derive(Debug)]
struct ColumnAnalysis {
    index: usize,
    name: String,
    cardinality: usize,
    sample_size: usize,
    unique_ratio: f64,
    is_categorical: bool,
    avg_string_length: usize,
}

/// CSV loader that samples the file first, classifies columns, and interns
/// string values in low-cardinality (categorical) columns to reduce memory use.
pub struct AdvancedCsvLoader {
    /// Number of rows to sample when analyzing columns.
    sample_size: usize,
    /// Maximum unique-to-sampled ratio for a column to be considered categorical.
    cardinality_threshold: f64,
    /// One interner per column index that was classified as categorical.
    interners: HashMap<usize, StringInterner>,
}

impl AdvancedCsvLoader {
    #[must_use]
    pub fn new() -> Self {
        Self {
            sample_size: 1000,
            cardinality_threshold: 0.5,
            interners: HashMap::new(),
        }
    }

    /// Samples up to `sample_size` rows and profiles each column: cardinality,
    /// average string length, and whether it looks categorical or datetime-like.
    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
        info!("Analyzing CSV columns for optimization strategies");

        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        let num_columns = headers.len();
        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
        let mut total_lengths: Vec<usize> = vec![0; num_columns];
        let mut string_counts: Vec<usize> = vec![0; num_columns];

        let mut row_count = 0;
        for result in reader.records() {
            if row_count >= self.sample_size {
                break;
            }

            let record = result?;
            for (col_idx, field) in record.iter().enumerate() {
                // Only profile non-empty, non-numeric fields; numeric columns
                // do not benefit from string interning.
                if col_idx < num_columns && !field.is_empty() && field.parse::<f64>().is_err() {
                    unique_values[col_idx].insert(field.to_string());
                    total_lengths[col_idx] += field.len();
                    string_counts[col_idx] += 1;
                }
            }
            row_count += 1;
        }

        let mut analyses = Vec::new();
        for (idx, header) in headers.iter().enumerate() {
            let cardinality = unique_values[idx].len();
            let unique_ratio = if row_count > 0 {
                cardinality as f64 / row_count as f64
            } else {
                1.0
            };

            let avg_length = if string_counts[idx] > 0 {
                total_lengths[idx] / string_counts[idx]
            } else {
                0
            };

            let is_datetime = Self::is_likely_datetime(&unique_values[idx]);

            // A column is interned when it is not datetime-like and either its
            // unique ratio is low or its name/shape matches a categorical pattern.
            let is_categorical = !is_datetime
                && (unique_ratio < self.cardinality_threshold
                    || Self::is_likely_categorical(header, cardinality, avg_length));

            analyses.push(ColumnAnalysis {
                index: idx,
                name: header.to_string(),
                cardinality,
                sample_size: row_count,
                unique_ratio,
                is_categorical,
                avg_string_length: avg_length,
            });

            if is_categorical {
                debug!(
                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
                    header, cardinality, row_count, unique_ratio
                );
                self.interners.insert(idx, StringInterner::new());
            }
        }

        Ok(analyses)
    }

    /// Heuristic: a column looks datetime-like when most sampled values contain
    /// date/time separators and have a plausible length (8..=30 characters).
    fn is_likely_datetime(unique_values: &HashSet<String>) -> bool {
        if unique_values.is_empty() {
            return false;
        }

        let sample_size = unique_values.len().min(10);
        let mut datetime_count = 0;

        for (i, value) in unique_values.iter().enumerate() {
            if i >= sample_size {
                break;
            }

            if (value.contains('-') || value.contains('/') || value.contains(':'))
                && value.len() >= 8
                && value.len() <= 30
            {
                datetime_count += 1;
            }
        }

        // At least 70% of the sampled values must look like datetimes.
        datetime_count >= (sample_size * 7) / 10
    }

    /// Heuristic: decides whether a column is categorical from its name and the
    /// cardinality/length statistics gathered during sampling.
    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
        let name_lower = name.to_lowercase();

        // Column names that commonly hold a small, repeating set of values.
        let categorical_patterns = [
            "status",
            "state",
            "type",
            "category",
            "class",
            "group",
            "country",
            "region",
            "city",
            "currency",
            "side",
            "book",
            "desk",
            "trader",
            "portfolio",
            "strategy",
            "exchange",
            "venue",
            "counterparty",
            "product",
            "instrument",
        ];

        for pattern in &categorical_patterns {
            if name_lower.contains(pattern) {
                return true;
            }
        }

        // Boolean-style flag columns are categorical by definition.
        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
            return true;
        }

        // Fall back to a size-based check: few unique values, short strings.
        cardinality < 100 && avg_length < 50
    }

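    /// Loads a CSV file into a [`DataTable`], interning string values in the
    /// columns that the sampling pass classified as categorical.
    ///
    /// A minimal usage sketch; the file path below is illustrative only:
    ///
    /// ```ignore
    /// let mut loader = AdvancedCsvLoader::new();
    /// let table = loader.load_csv_optimized("data/trades.csv", "trades")?;
    /// println!("loaded {} rows", table.row_count());
    /// ```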
    pub fn load_csv_optimized<P: AsRef<Path>>(
        &mut self,
        path: P,
        table_name: &str,
    ) -> Result<DataTable> {
        let path = path.as_ref();
        info!(
            "Advanced CSV load: Loading {} with optimizations",
            path.display()
        );

        crate::utils::memory_tracker::track_memory("advanced_csv_start");

        // Sampling pass: decide which columns get string interning.
        let analyses = self.analyze_columns(path)?;
        let categorical_columns: HashSet<usize> = analyses
            .iter()
            .filter(|a| a.is_categorical)
            .map(|a| a.index)
            .collect();

        info!(
            "Column analysis complete: {} of {} columns will use string interning",
            categorical_columns.len(),
            analyses.len()
        );

        // Full pass: re-open the file and load every row.
        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        let mut table = DataTable::new(table_name);
        for header in &headers {
            table.add_column(DataColumn::new(header.to_string()));
        }

        crate::utils::memory_tracker::track_memory("advanced_csv_headers");

        // Estimate the row count from the file size (~100 bytes per row) to
        // pre-allocate row storage, capped at one million rows.
        let file_size = std::fs::metadata(path)?.len();
        let estimated_rows = (file_size / 100) as usize;
        table.reserve_rows(estimated_rows.min(1_000_000));

        // Open the file a second time so each raw line can be inspected next to
        // its parsed record (used to tell empty fields from quoted empty strings).
        let file2 = File::open(path)?;
        let mut line_reader = BufReader::new(file2);
        let mut raw_line = String::new();
        // Skip the header line in the raw reader.
        line_reader.read_line(&mut raw_line)?;

        let mut row_count = 0;
        for result in reader.records() {
            let record = result?;

            raw_line.clear();
            line_reader.read_line(&mut raw_line)?;

            let mut values = Vec::with_capacity(headers.len());

            for (idx, field) in record.iter().enumerate() {
                let value = if field.is_empty() {
                    if Self::is_null_field(&raw_line, idx) {
                        DataValue::Null
                    } else if categorical_columns.contains(&idx) {
                        // Quoted empty string in a categorical column: intern it.
                        if let Some(interner) = self.interners.get_mut(&idx) {
                            DataValue::InternedString(interner.intern(""))
                        } else {
                            DataValue::String(String::new())
                        }
                    } else {
                        DataValue::String(String::new())
                    }
                } else if let Ok(b) = field.parse::<bool>() {
                    DataValue::Boolean(b)
                } else if let Ok(i) = field.parse::<i64>() {
                    DataValue::Integer(i)
                } else if let Ok(f) = field.parse::<f64>() {
                    DataValue::Float(f)
                } else if (field.contains('-') || field.contains('/') || field.contains(':'))
                    && field.len() >= 8
                    && field.len() <= 30
                {
                    DataValue::DateTime(field.to_string())
                } else if categorical_columns.contains(&idx) {
                    // Categorical string column: share the allocation via the interner.
                    if let Some(interner) = self.interners.get_mut(&idx) {
                        DataValue::InternedString(interner.intern(field))
                    } else {
                        DataValue::String(field.to_string())
                    }
                } else {
                    DataValue::String(field.to_string())
                };
                values.push(value);
            }

            table
                .add_row(DataRow::new(values))
                .map_err(|e| anyhow::anyhow!(e))?;
            row_count += 1;

            if row_count % 10000 == 0 {
                crate::utils::memory_tracker::track_memory(&format!(
                    "advanced_csv_{row_count}rows"
                ));
                debug!("Loaded {} rows...", row_count);
            }
        }

        // Release over-allocated capacity and finalize column types.
        table.shrink_to_fit();

        table.infer_column_types();

        crate::utils::memory_tracker::track_memory("advanced_csv_complete");

        // Report per-column interning savings.
        let mut total_saved = 0;
        for (col_idx, interner) in &self.interners {
            let stats = interner.stats();
            if stats.memory_saved_bytes > 0 {
                debug!(
                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
                    col_idx,
                    headers.get(*col_idx).unwrap_or("?"),
                    stats.unique_strings,
                    stats.total_references,
                    stats.memory_saved_bytes / 1024
                );
            }
            total_saved += stats.memory_saved_bytes;
        }

        info!(
            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
            table.row_count(),
            table.column_count(),
            table.estimate_memory_size() / 1024 / 1024,
            total_saved / 1024
        );

        Ok(table)
    }

    /// Scans the raw CSV line to decide whether the field at `field_index` is
    /// truly empty (NULL) or an explicitly quoted empty string ("").
    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
        let mut comma_count = 0;
        let mut in_quotes = false;
        let mut field_start = 0;
        let mut prev_char = ' ';

        // Use char_indices so `field_start` and `i` are byte offsets that are
        // safe to slice with, even when the line contains multi-byte characters.
        for (i, ch) in raw_line.char_indices() {
            if ch == '"' && prev_char != '\\' {
                in_quotes = !in_quotes;
            }

            if ch == ',' && !in_quotes {
                if comma_count == field_index {
                    let field_end = i;
                    let field_content = raw_line[field_start..field_end].trim();
                    if field_content.is_empty() {
                        // No characters at all: treat as NULL.
                        return true;
                    }
                    if field_content.starts_with('"')
                        && field_content.ends_with('"')
                        && field_content.len() == 2
                    {
                        // Explicit "" is an empty string, not NULL.
                        return false;
                    }
                    return false;
                }
                comma_count += 1;
                field_start = i + 1;
            }
            prev_char = ch;
        }

        // Handle the last field on the line (no trailing comma).
        if comma_count == field_index {
            let field_content = raw_line[field_start..]
                .trim()
                .trim_end_matches('\n')
                .trim_end_matches('\r');
            if field_content.is_empty() {
                return true;
            }
            if field_content.starts_with('"')
                && field_content.ends_with('"')
                && field_content.len() == 2
            {
                return false;
            }
            return false;
        }

        false
    }

    /// Returns interning statistics for every column that has an interner.
    #[must_use]
    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
        self.interners
            .iter()
            .map(|(idx, interner)| (*idx, interner.stats()))
            .collect()
    }
}

impl Default for AdvancedCsvLoader {
    fn default() -> Self {
        Self::new()
    }
}