use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
use anyhow::Result;
use csv;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use tracing::{debug, info};
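/// Interns strings so that repeated values share a single `Arc<String>`
/// allocation; a per-value usage count feeds the memory-savings estimate
/// reported by `stats()`.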
#[derive(Debug, Clone)]
pub struct StringInterner {
    strings: HashMap<String, Arc<String>>,
    usage_count: HashMap<Arc<String>, usize>,
}

impl StringInterner {
    pub fn new() -> Self {
        Self {
            strings: HashMap::new(),
            usage_count: HashMap::new(),
        }
    }
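    /// Returns a shared `Arc<String>` for `s`, reusing the existing allocation
    /// when the same string has been interned before and bumping its usage count.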
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        if let Some(rc_str) = self.strings.get(s) {
            let rc = rc_str.clone();
            *self.usage_count.entry(rc.clone()).or_insert(0) += 1;
            rc
        } else {
            let rc_str = Arc::new(s.to_string());
            self.strings.insert(s.to_string(), rc_str.clone());
            self.usage_count.insert(rc_str.clone(), 1);
            rc_str
        }
    }
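    /// Summarizes the interner: distinct strings held, total references handed
    /// out, and an estimate of the bytes saved by sharing duplicates.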
    pub fn stats(&self) -> InternerStats {
        let total_strings = self.strings.len();
        let total_references: usize = self.usage_count.values().sum();
        let memory_saved = self.calculate_memory_saved();

        InternerStats {
            unique_strings: total_strings,
            total_references,
            memory_saved_bytes: memory_saved,
        }
    }

    fn calculate_memory_saved(&self) -> usize {
        // Every reference beyond the first would otherwise have carried its own
        // copy of the string data, so count `len * (count - 1)` bytes per value.
        let mut saved = 0;
        for (rc_str, count) in &self.usage_count {
            if *count > 1 {
                saved += rc_str.len() * (*count - 1);
            }
        }
        saved
    }
}
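/// Summary statistics produced by `StringInterner::stats()`.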
#[derive(Debug)]
pub struct InternerStats {
    pub unique_strings: usize,
    pub total_references: usize,
    pub memory_saved_bytes: usize,
}
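/// Per-column result of the sampling pass: how many distinct non-numeric values
/// were observed, their ratio to the sampled rows, and whether the column
/// should use string interning.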
#[derive(Debug)]
struct ColumnAnalysis {
    index: usize,
    name: String,
    cardinality: usize,
    sample_size: usize,
    unique_ratio: f64,
    is_categorical: bool,
    avg_string_length: usize,
}
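/// Two-pass CSV loader: a sampling pass flags columns whose values repeat often
/// enough (or whose names look categorical) for string interning, then the full
/// load interns those columns while building the `DataTable`.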
pub struct AdvancedCsvLoader {
    sample_size: usize,
    cardinality_threshold: f64,
    interners: HashMap<usize, StringInterner>,
}

impl AdvancedCsvLoader {
    pub fn new() -> Self {
        Self {
            sample_size: 1000,
            cardinality_threshold: 0.5,
            interners: HashMap::new(),
        }
    }
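    /// Samples up to `sample_size` rows and builds a `ColumnAnalysis` for every
    /// column, creating a `StringInterner` for each column flagged as categorical.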
    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
        info!("Analyzing CSV columns for optimization strategies");

        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        let num_columns = headers.len();
        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
        let mut total_lengths: Vec<usize> = vec![0; num_columns];
        let mut string_counts: Vec<usize> = vec![0; num_columns];

        // Sample the first `sample_size` rows, collecting cardinality and
        // string-length statistics for the non-numeric fields of each column.
        let mut row_count = 0;
        for result in reader.records() {
            if row_count >= self.sample_size {
                break;
            }

            let record = result?;
            for (col_idx, field) in record.iter().enumerate() {
                if col_idx < num_columns && !field.is_empty() && field.parse::<f64>().is_err() {
                    unique_values[col_idx].insert(field.to_string());
                    total_lengths[col_idx] += field.len();
                    string_counts[col_idx] += 1;
                }
            }
            row_count += 1;
        }

        let mut analyses = Vec::new();
        for (idx, header) in headers.iter().enumerate() {
            let cardinality = unique_values[idx].len();
            let unique_ratio = if row_count > 0 {
                cardinality as f64 / row_count as f64
            } else {
                1.0
            };

            let avg_length = if string_counts[idx] > 0 {
                total_lengths[idx] / string_counts[idx]
            } else {
                0
            };

            // A column is treated as categorical when its unique ratio is low
            // or when its name and shape match the categorical heuristic.
            let is_categorical = unique_ratio < self.cardinality_threshold
                || Self::is_likely_categorical(header, cardinality, avg_length);

            analyses.push(ColumnAnalysis {
                index: idx,
                name: header.to_string(),
                cardinality,
                sample_size: row_count,
                unique_ratio,
                is_categorical,
                avg_string_length: avg_length,
            });

            if is_categorical {
                debug!(
                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
                    header, cardinality, row_count, unique_ratio
                );
                self.interners.insert(idx, StringInterner::new());
            }
        }

        Ok(analyses)
    }
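    /// Name- and shape-based heuristic applied on top of the measured unique
    /// ratio: known categorical column names, boolean-style `is_`/`has_`
    /// prefixes, or a small set of short values.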
    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
        let name_lower = name.to_lowercase();

        let categorical_patterns = [
            "status",
            "state",
            "type",
            "category",
            "class",
            "group",
            "country",
            "region",
            "city",
            "currency",
            "side",
            "book",
            "desk",
            "trader",
            "portfolio",
            "strategy",
            "exchange",
            "venue",
            "counterparty",
            "product",
            "instrument",
        ];

        for pattern in &categorical_patterns {
            if name_lower.contains(pattern) {
                return true;
            }
        }

        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
            return true;
        }

        cardinality < 100 && avg_length < 50
    }
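    /// Loads a CSV file into a `DataTable`, interning string values in the
    /// columns that the analysis pass flagged as categorical and tracking
    /// memory usage along the way.
    ///
    /// Illustrative usage only (hypothetical file name, not compiled as a
    /// doctest):
    ///
    /// ```ignore
    /// let mut loader = AdvancedCsvLoader::new();
    /// let table = loader.load_csv_optimized("trades.csv", "trades")?;
    /// println!("loaded {} rows", table.row_count());
    /// ```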
    pub fn load_csv_optimized<P: AsRef<Path>>(
        &mut self,
        path: P,
        table_name: &str,
    ) -> Result<DataTable> {
        let path = path.as_ref();
        info!(
            "Advanced CSV load: Loading {} with optimizations",
            path.display()
        );

        crate::utils::memory_tracker::track_memory("advanced_csv_start");

        // First pass: decide which columns should use string interning.
        let analyses = self.analyze_columns(path)?;
        let categorical_columns: HashSet<usize> = analyses
            .iter()
            .filter(|a| a.is_categorical)
            .map(|a| a.index)
            .collect();

        info!(
            "Column analysis complete: {} of {} columns will use string interning",
            categorical_columns.len(),
            analyses.len()
        );

        // Second pass: read the full file and build the table.
        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        let mut table = DataTable::new(table_name);
        for header in headers.iter() {
            table.add_column(DataColumn::new(header.to_string()));
        }

        crate::utils::memory_tracker::track_memory("advanced_csv_headers");

        // Pre-allocate row storage from a rough bytes-per-row estimate, capped
        // so a very large file cannot trigger an oversized reservation.
        let file_size = std::fs::metadata(path)?.len();
        let estimated_rows = (file_size / 100) as usize;
        table.reserve_rows(estimated_rows.min(1_000_000));

        let mut row_count = 0;
        for result in reader.records() {
            let record = result?;
            let mut values = Vec::with_capacity(headers.len());

            for (idx, field) in record.iter().enumerate() {
                let value = if field.is_empty() {
                    DataValue::Null
                } else if let Ok(b) = field.parse::<bool>() {
                    DataValue::Boolean(b)
                } else if let Ok(i) = field.parse::<i64>() {
                    DataValue::Integer(i)
                } else if let Ok(f) = field.parse::<f64>() {
                    DataValue::Float(f)
                } else if categorical_columns.contains(&idx) {
                    if let Some(interner) = self.interners.get_mut(&idx) {
                        DataValue::InternedString(interner.intern(field))
                    } else {
                        DataValue::String(field.to_string())
                    }
                } else if field.contains('-') && field.len() >= 8 && field.len() <= 30 {
                    DataValue::DateTime(field.to_string())
                } else {
                    DataValue::String(field.to_string())
                };
                values.push(value);
            }

            table
                .add_row(DataRow::new(values))
                .map_err(|e| anyhow::anyhow!(e))?;
            row_count += 1;

            if row_count % 10000 == 0 {
                crate::utils::memory_tracker::track_memory(&format!(
                    "advanced_csv_{}rows",
                    row_count
                ));
                debug!("Loaded {} rows...", row_count);
            }
        }

        table.shrink_to_fit();

        table.infer_column_types();

        crate::utils::memory_tracker::track_memory("advanced_csv_complete");

        // Report per-column interning statistics.
        let mut total_saved = 0;
        for (col_idx, interner) in &self.interners {
            let stats = interner.stats();
            if stats.memory_saved_bytes > 0 {
                debug!(
                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
                    col_idx,
                    headers.get(*col_idx).unwrap_or("?"),
                    stats.unique_strings,
                    stats.total_references,
                    stats.memory_saved_bytes / 1024
                );
            }
            total_saved += stats.memory_saved_bytes;
        }

        info!(
            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
            table.row_count(),
            table.column_count(),
            table.estimate_memory_size() / 1024 / 1024,
            total_saved / 1024
        );

        Ok(table)
    }
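    /// Returns interning statistics for each column that received an interner,
    /// keyed by column index.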
    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
        self.interners
            .iter()
            .map(|(idx, interner)| (*idx, interner.stats()))
            .collect()
    }
}

impl Default for AdvancedCsvLoader {
    fn default() -> Self {
        Self::new()
    }
}
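// A minimal sanity-check sketch: it only exercises the pieces of this file that
// have no external dependencies (the interner and the column-name heuristic),
// not the CSV loading path itself.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn interner_reuses_allocations_and_counts_references() {
        let mut interner = StringInterner::new();
        let a = interner.intern("NEW_YORK");
        let b = interner.intern("NEW_YORK");
        let c = interner.intern("LONDON");

        // Repeated values share one allocation; distinct values do not.
        assert!(Arc::ptr_eq(&a, &b));
        assert!(!Arc::ptr_eq(&a, &c));

        let stats = interner.stats();
        assert_eq!(stats.unique_strings, 2);
        // "NEW_YORK" was requested twice, "LONDON" once.
        assert_eq!(stats.total_references, 3);
        // Only the duplicate "NEW_YORK" counts toward the savings estimate.
        assert_eq!(stats.memory_saved_bytes, "NEW_YORK".len());
    }

    #[test]
    fn name_based_categorical_heuristic() {
        assert!(AdvancedCsvLoader::is_likely_categorical("trade_status", 5, 10));
        assert!(AdvancedCsvLoader::is_likely_categorical("is_active", 2, 5));
        // High cardinality, long values, no categorical keyword in the name.
        assert!(!AdvancedCsvLoader::is_likely_categorical("comment", 5_000, 120));
    }
}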