use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
use anyhow::Result;
use csv;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::sync::Arc;
use tracing::{debug, info};
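/// Deduplicates repeated string values: identical strings share one `Arc<String>`
/// allocation, and a usage count is kept per interned string so savings can be reported.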
#[derive(Debug, Clone)]
pub struct StringInterner {
    strings: HashMap<String, Arc<String>>,
    usage_count: HashMap<Arc<String>, usize>,
}

impl Default for StringInterner {
    fn default() -> Self {
        Self::new()
    }
}

impl StringInterner {
    #[must_use]
    pub fn new() -> Self {
        Self {
            strings: HashMap::new(),
            usage_count: HashMap::new(),
        }
    }
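    /// Returns a shared handle for `s`, reusing the existing allocation when the same
    /// string has been interned before and bumping its usage count.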
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        if let Some(rc_str) = self.strings.get(s) {
            let rc = rc_str.clone();
            *self.usage_count.entry(rc.clone()).or_insert(0) += 1;
            rc
        } else {
            let rc_str = Arc::new(s.to_string());
            self.strings.insert(s.to_string(), rc_str.clone());
            self.usage_count.insert(rc_str.clone(), 1);
            rc_str
        }
    }
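    /// Summarizes interner usage: unique strings, total references handed out, and an
    /// estimate of the heap bytes saved by sharing allocations.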
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        let total_strings = self.strings.len();
        let total_references: usize = self.usage_count.values().sum();
        let memory_saved = self.calculate_memory_saved();

        InternerStats {
            unique_strings: total_strings,
            total_references,
            memory_saved_bytes: memory_saved,
        }
    }

    fn calculate_memory_saved(&self) -> usize {
        let mut saved = 0;
        for (rc_str, count) in &self.usage_count {
            if *count > 1 {
                saved += rc_str.len() * (*count - 1);
            }
        }
        saved
    }
}
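/// Usage statistics reported by [`StringInterner::stats`].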
#[derive(Debug)]
pub struct InternerStats {
    pub unique_strings: usize,
    pub total_references: usize,
    pub memory_saved_bytes: usize,
}
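/// Per-column profile gathered from a sample of the CSV, used to decide whether a
/// column should be backed by a `StringInterner`.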
#[derive(Debug)]
struct ColumnAnalysis {
    index: usize,
    name: String,
    cardinality: usize,
    sample_size: usize,
    unique_ratio: f64,
    is_categorical: bool,
    avg_string_length: usize,
}
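/// CSV loader that profiles each column up front and applies string interning to
/// low-cardinality (categorical) columns to reduce memory usage.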
pub struct AdvancedCsvLoader {
    sample_size: usize,
    cardinality_threshold: f64,
    interners: HashMap<usize, StringInterner>,
}

impl AdvancedCsvLoader {
    #[must_use]
    pub fn new() -> Self {
        Self {
            sample_size: 1000,
            cardinality_threshold: 0.5,
            interners: HashMap::new(),
        }
    }
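    /// Reads up to `sample_size` records and profiles every column, creating an
    /// interner for each column that looks categorical.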
    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
        info!("Analyzing CSV columns for optimization strategies");

        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        let num_columns = headers.len();
        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
        let mut total_lengths: Vec<usize> = vec![0; num_columns];
        let mut string_counts: Vec<usize> = vec![0; num_columns];

        let mut row_count = 0;
        for result in reader.records() {
            if row_count >= self.sample_size {
                break;
            }

            let record = result?;
            for (col_idx, field) in record.iter().enumerate() {
                if col_idx < num_columns && !field.is_empty() && field.parse::<f64>().is_err() {
                    unique_values[col_idx].insert(field.to_string());
                    total_lengths[col_idx] += field.len();
                    string_counts[col_idx] += 1;
                }
            }
            row_count += 1;
        }

        let mut analyses = Vec::new();
        for (idx, header) in headers.iter().enumerate() {
            let cardinality = unique_values[idx].len();
            let unique_ratio = if row_count > 0 {
                cardinality as f64 / row_count as f64
            } else {
                1.0
            };

            let avg_length = if string_counts[idx] > 0 {
                total_lengths[idx] / string_counts[idx]
            } else {
                0
            };

            let is_categorical = unique_ratio < self.cardinality_threshold
                || Self::is_likely_categorical(header, cardinality, avg_length);

            analyses.push(ColumnAnalysis {
                index: idx,
                name: header.to_string(),
                cardinality,
                sample_size: row_count,
                unique_ratio,
                is_categorical,
                avg_string_length: avg_length,
            });

            if is_categorical {
                debug!(
                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
                    header, cardinality, row_count, unique_ratio
                );
                self.interners.insert(idx, StringInterner::new());
            }
        }

        Ok(analyses)
    }
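    /// Heuristic fallback: a column is treated as categorical when its name matches a
    /// known pattern (status, currency, trader, ...), looks like a boolean flag, or has
    /// both low cardinality and short values.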
    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
        let name_lower = name.to_lowercase();

        let categorical_patterns = [
            "status",
            "state",
            "type",
            "category",
            "class",
            "group",
            "country",
            "region",
            "city",
            "currency",
            "side",
            "book",
            "desk",
            "trader",
            "portfolio",
            "strategy",
            "exchange",
            "venue",
            "counterparty",
            "product",
            "instrument",
        ];

        for pattern in &categorical_patterns {
            if name_lower.contains(pattern) {
                return true;
            }
        }

        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
            return true;
        }

        cardinality < 100 && avg_length < 50
    }
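    /// Loads a CSV file into a `DataTable`, interning values in the columns that the
    /// up-front analysis marked as categorical and using the raw line to distinguish
    /// truly missing fields (nulls) from empty strings.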
    pub fn load_csv_optimized<P: AsRef<Path>>(
        &mut self,
        path: P,
        table_name: &str,
    ) -> Result<DataTable> {
        let path = path.as_ref();
        info!(
            "Advanced CSV load: Loading {} with optimizations",
            path.display()
        );

        crate::utils::memory_tracker::track_memory("advanced_csv_start");

        let analyses = self.analyze_columns(path)?;
        let categorical_columns: HashSet<usize> = analyses
            .iter()
            .filter(|a| a.is_categorical)
            .map(|a| a.index)
            .collect();

        info!(
            "Column analysis complete: {} of {} columns will use string interning",
            categorical_columns.len(),
            analyses.len()
        );

        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        let mut table = DataTable::new(table_name);
        for header in &headers {
            table.add_column(DataColumn::new(header.to_string()));
        }

        crate::utils::memory_tracker::track_memory("advanced_csv_headers");

        let file_size = std::fs::metadata(path)?.len();
        let estimated_rows = (file_size / 100) as usize;
        table.reserve_rows(estimated_rows.min(1_000_000));

        let file2 = File::open(path)?;
        let mut line_reader = BufReader::new(file2);
        let mut raw_line = String::new();
        line_reader.read_line(&mut raw_line)?;
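        // The raw-line reader consumed the header line above; from here it advances in
        // lockstep with the csv reader (one raw line per record) so `is_null_field` can
        // inspect the original unparsed text. This pairing assumes no embedded newlines
        // inside quoted fields.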
        let mut row_count = 0;
        for result in reader.records() {
            let record = result?;

            raw_line.clear();
            line_reader.read_line(&mut raw_line)?;

            let mut values = Vec::with_capacity(headers.len());
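            // Per-field type inference: empty values become Null or empty strings; then
            // bool, i64, and f64 parses are tried; anything else is stored as an interned
            // string, a DateTime-looking string, or a plain string.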
            for (idx, field) in record.iter().enumerate() {
                let value = if field.is_empty() {
                    if Self::is_null_field(&raw_line, idx) {
                        DataValue::Null
                    } else if categorical_columns.contains(&idx) {
                        if let Some(interner) = self.interners.get_mut(&idx) {
                            DataValue::InternedString(interner.intern(""))
                        } else {
                            DataValue::String(String::new())
                        }
                    } else {
                        DataValue::String(String::new())
                    }
                } else if let Ok(b) = field.parse::<bool>() {
                    DataValue::Boolean(b)
                } else if let Ok(i) = field.parse::<i64>() {
                    DataValue::Integer(i)
                } else if let Ok(f) = field.parse::<f64>() {
                    DataValue::Float(f)
                } else if categorical_columns.contains(&idx) {
                    if let Some(interner) = self.interners.get_mut(&idx) {
                        DataValue::InternedString(interner.intern(field))
                    } else {
                        DataValue::String(field.to_string())
                    }
                } else if field.contains('-') && field.len() >= 8 && field.len() <= 30 {
                    DataValue::DateTime(field.to_string())
                } else {
                    DataValue::String(field.to_string())
                };
                values.push(value);
            }

            table
                .add_row(DataRow::new(values))
                .map_err(|e| anyhow::anyhow!(e))?;
            row_count += 1;

            if row_count % 10000 == 0 {
                crate::utils::memory_tracker::track_memory(&format!(
                    "advanced_csv_{row_count}rows"
                ));
                debug!("Loaded {} rows...", row_count);
            }
        }

        table.shrink_to_fit();

        table.infer_column_types();

        crate::utils::memory_tracker::track_memory("advanced_csv_complete");

        let mut total_saved = 0;
        for (col_idx, interner) in &self.interners {
            let stats = interner.stats();
            if stats.memory_saved_bytes > 0 {
                debug!(
                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
                    col_idx,
                    headers.get(*col_idx).unwrap_or("?"),
                    stats.unique_strings,
                    stats.total_references,
                    stats.memory_saved_bytes / 1024
                );
            }
            total_saved += stats.memory_saved_bytes;
        }

        info!(
            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
            table.row_count(),
            table.column_count(),
            table.estimate_memory_size() / 1024 / 1024,
            total_saved / 1024
        );

        Ok(table)
    }
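    /// Inspects the raw CSV line to tell a truly missing field (nothing between the
    /// delimiters) from an explicitly quoted empty string (`""`): only the former is
    /// treated as null.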
    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
        let mut comma_count = 0;
        let mut in_quotes = false;
        let mut field_start = 0;
        let mut prev_char = ' ';

        // Walk the raw line by byte offset so the slices below always land on UTF-8
        // character boundaries, even when fields contain multi-byte characters.
        for (i, ch) in raw_line.char_indices() {
            if ch == '"' && prev_char != '\\' {
                in_quotes = !in_quotes;
            }

            if ch == ',' && !in_quotes {
                if comma_count == field_index {
                    let field_content = raw_line[field_start..i].trim();
                    // Empty between delimiters means null; a quoted empty string ("")
                    // or any other content does not.
                    return field_content.is_empty();
                }
                comma_count += 1;
                field_start = i + 1;
            }
            prev_char = ch;
        }

        // Last field on the line (no trailing comma).
        if comma_count == field_index {
            let field_content = raw_line[field_start..]
                .trim()
                .trim_end_matches('\n')
                .trim_end_matches('\r');
            return field_content.is_empty();
        }

        false
    }
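    /// Returns per-column interner statistics, keyed by column index.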
    #[must_use]
    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
        self.interners
            .iter()
            .map(|(idx, interner)| (*idx, interner.stats()))
            .collect()
    }
}
impl Default for AdvancedCsvLoader {
    fn default() -> Self {
        Self::new()
    }
}