1use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::collections::{HashMap, HashSet};
6use std::fs::File;
7use std::path::Path;
8use std::sync::Arc;
9use tracing::{debug, info};
10
/// Deduplicates repeated strings: every call to [`StringInterner::intern`] with
/// equal text returns a clone of the same [`Arc<String>`], and the interner
/// tracks how many times each distinct string was requested.
#[derive(Debug, Clone)]
pub struct StringInterner {
    /// Distinct text -> (shared handle, number of `intern` calls that returned it).
    ///
    /// Keeping the handle and its counter in a single map means each lookup
    /// hashes the string once and the two can never drift out of sync.
    entries: HashMap<String, (Arc<String>, usize)>,
}

impl Default for StringInterner {
    fn default() -> Self {
        Self::new()
    }
}

impl StringInterner {
    /// Creates an empty interner.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: HashMap::new(),
        }
    }

    /// Returns the shared handle for `s`, allocating it the first time `s` is
    /// seen, and increments the use count for that string.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        if let Some((shared, uses)) = self.entries.get_mut(s) {
            *uses += 1;
            return Arc::clone(shared);
        }

        let shared = Arc::new(s.to_owned());
        self.entries.insert(s.to_owned(), (Arc::clone(&shared), 1));
        shared
    }

    /// Summarizes the interner's contents; see [`InternerStats`] for the
    /// meaning of each figure.
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        InternerStats {
            unique_strings: self.entries.len(),
            total_references: self.entries.values().map(|(_, uses)| uses).sum(),
            memory_saved_bytes: self.calculate_memory_saved(),
        }
    }

    /// Bytes of string data that interning avoided duplicating: each distinct
    /// string used `n` times contributes `len * (n - 1)`. Allocator, `Arc`
    /// and map overhead are not counted.
    fn calculate_memory_saved(&self) -> usize {
        self.entries
            .values()
            .map(|(shared, uses)| shared.len() * uses.saturating_sub(1))
            .sum()
    }
}

/// Snapshot of a [`StringInterner`]'s contents, produced by
/// [`StringInterner::stats`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InternerStats {
    /// Number of distinct strings held by the interner.
    pub unique_strings: usize,
    /// Total number of `intern` calls across all strings.
    pub total_references: usize,
    /// Estimated bytes of string data saved by sharing instead of copying:
    /// the sum over strings of `len * (uses - 1)`.
    pub memory_saved_bytes: usize,
}
79
/// Per-column result of sampling a CSV file before the full load.
///
/// Only `index` and `is_categorical` drive the load itself; the remaining
/// fields record the sampling figures behind that decision.
#[derive(Debug)]
// The diagnostic fields are only consumed through the derived `Debug` impl,
// which dead-code analysis deliberately ignores, so they would otherwise warn.
#[allow(dead_code)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the CSV header.
    index: usize,
    /// Header text of the column.
    name: String,
    /// Distinct non-empty, non-numeric values seen in the sample.
    cardinality: usize,
    /// Number of data rows actually sampled.
    sample_size: usize,
    /// `cardinality / sample_size`, or 1.0 when no rows were sampled.
    unique_ratio: f64,
    /// Whether this column's text values should be interned.
    is_categorical: bool,
    /// Mean byte length (integer division) of the sampled non-empty,
    /// non-numeric values; 0 when there were none.
    avg_string_length: usize,
}
91
92pub struct AdvancedCsvLoader {
93 sample_size: usize,
94 cardinality_threshold: f64, interners: HashMap<usize, StringInterner>, }
97
98impl AdvancedCsvLoader {
99 #[must_use]
100 pub fn new() -> Self {
101 Self {
102 sample_size: 1000, cardinality_threshold: 0.5, interners: HashMap::new(),
105 }
106 }
107
108 fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
110 info!("Analyzing CSV columns for optimization strategies");
111
112 let file = File::open(path)?;
113 let mut reader = csv::Reader::from_reader(file);
114 let headers = reader.headers()?.clone();
115
116 let num_columns = headers.len();
118 let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
119 let mut total_lengths: Vec<usize> = vec![0; num_columns];
120 let mut string_counts: Vec<usize> = vec![0; num_columns];
121
122 let mut row_count = 0;
124 for result in reader.records() {
125 if row_count >= self.sample_size {
126 break;
127 }
128
129 let record = result?;
130 for (col_idx, field) in record.iter().enumerate() {
131 if col_idx < num_columns {
132 if !field.is_empty() && field.parse::<f64>().is_err() {
134 unique_values[col_idx].insert(field.to_string());
135 total_lengths[col_idx] += field.len();
136 string_counts[col_idx] += 1;
137 }
138 }
139 }
140 row_count += 1;
141 }
142
143 let mut analyses = Vec::new();
145 for (idx, header) in headers.iter().enumerate() {
146 let cardinality = unique_values[idx].len();
147 let unique_ratio = if row_count > 0 {
148 cardinality as f64 / row_count as f64
149 } else {
150 1.0
151 };
152
153 let avg_length = if string_counts[idx] > 0 {
154 total_lengths[idx] / string_counts[idx]
155 } else {
156 0
157 };
158
159 let is_categorical = unique_ratio < self.cardinality_threshold
161 || Self::is_likely_categorical(header, cardinality, avg_length);
162
163 analyses.push(ColumnAnalysis {
164 index: idx,
165 name: header.to_string(),
166 cardinality,
167 sample_size: row_count,
168 unique_ratio,
169 is_categorical,
170 avg_string_length: avg_length,
171 });
172
173 if is_categorical {
174 debug!(
175 "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
176 header, cardinality, row_count, unique_ratio
177 );
178 self.interners.insert(idx, StringInterner::new());
179 }
180 }
181
182 Ok(analyses)
183 }
184
185 fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
187 let name_lower = name.to_lowercase();
188
189 let categorical_patterns = [
191 "status",
192 "state",
193 "type",
194 "category",
195 "class",
196 "group",
197 "country",
198 "region",
199 "city",
200 "currency",
201 "side",
202 "book",
203 "desk",
204 "trader",
205 "portfolio",
206 "strategy",
207 "exchange",
208 "venue",
209 "counterparty",
210 "product",
211 "instrument",
212 ];
213
214 for pattern in &categorical_patterns {
215 if name_lower.contains(pattern) {
216 return true;
217 }
218 }
219
220 if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
222 return true;
223 }
224
225 cardinality < 100 && avg_length < 50
227 }
228
229 pub fn load_csv_optimized<P: AsRef<Path>>(
231 &mut self,
232 path: P,
233 table_name: &str,
234 ) -> Result<DataTable> {
235 let path = path.as_ref();
236 info!(
237 "Advanced CSV load: Loading {} with optimizations",
238 path.display()
239 );
240
241 crate::utils::memory_tracker::track_memory("advanced_csv_start");
243
244 let analyses = self.analyze_columns(path)?;
246 let categorical_columns: HashSet<usize> = analyses
247 .iter()
248 .filter(|a| a.is_categorical)
249 .map(|a| a.index)
250 .collect();
251
252 info!(
253 "Column analysis complete: {} of {} columns will use string interning",
254 categorical_columns.len(),
255 analyses.len()
256 );
257
258 let file = File::open(path)?;
260 let mut reader = csv::Reader::from_reader(file);
261 let headers = reader.headers()?.clone();
262
263 let mut table = DataTable::new(table_name);
264 for header in &headers {
265 table.add_column(DataColumn::new(header.to_string()));
266 }
267
268 crate::utils::memory_tracker::track_memory("advanced_csv_headers");
269
270 let file_size = std::fs::metadata(path)?.len();
272 let estimated_rows = (file_size / 100) as usize; table.reserve_rows(estimated_rows.min(1_000_000)); let mut row_count = 0;
277 for result in reader.records() {
278 let record = result?;
279 let mut values = Vec::with_capacity(headers.len());
280
281 for (idx, field) in record.iter().enumerate() {
282 let value = if field.is_empty() {
283 DataValue::Null
284 } else if let Ok(b) = field.parse::<bool>() {
285 DataValue::Boolean(b)
286 } else if let Ok(i) = field.parse::<i64>() {
287 DataValue::Integer(i)
288 } else if let Ok(f) = field.parse::<f64>() {
289 DataValue::Float(f)
290 } else {
291 if categorical_columns.contains(&idx) {
293 if let Some(interner) = self.interners.get_mut(&idx) {
294 DataValue::InternedString(interner.intern(field))
296 } else {
297 DataValue::String(field.to_string())
298 }
299 } else if field.contains('-') && field.len() >= 8 && field.len() <= 30 {
300 DataValue::DateTime(field.to_string())
301 } else {
302 DataValue::String(field.to_string())
303 }
304 };
305 values.push(value);
306 }
307
308 table
309 .add_row(DataRow::new(values))
310 .map_err(|e| anyhow::anyhow!(e))?;
311 row_count += 1;
312
313 if row_count % 10000 == 0 {
315 crate::utils::memory_tracker::track_memory(&format!(
316 "advanced_csv_{row_count}rows"
317 ));
318 debug!("Loaded {} rows...", row_count);
319 }
320 }
321
322 table.shrink_to_fit();
324
325 table.infer_column_types();
327
328 crate::utils::memory_tracker::track_memory("advanced_csv_complete");
329
330 let mut total_saved = 0;
332 for (col_idx, interner) in &self.interners {
333 let stats = interner.stats();
334 if stats.memory_saved_bytes > 0 {
335 debug!(
336 "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
337 col_idx,
338 headers.get(*col_idx).unwrap_or("?"),
339 stats.unique_strings,
340 stats.total_references,
341 stats.memory_saved_bytes / 1024
342 );
343 }
344 total_saved += stats.memory_saved_bytes;
345 }
346
347 info!(
348 "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
349 table.row_count(),
350 table.column_count(),
351 table.estimate_memory_size() / 1024 / 1024,
352 total_saved / 1024
353 );
354
355 Ok(table)
356 }
357
358 #[must_use]
360 pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
361 self.interners
362 .iter()
363 .map(|(idx, interner)| (*idx, interner.stats()))
364 .collect()
365 }
366}
367
368impl Default for AdvancedCsvLoader {
369 fn default() -> Self {
370 Self::new()
371 }
372}