sql_cli/data/
advanced_csv_loader.rs

//! Advanced CSV loader with string interning and memory optimization.
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::collections::{HashMap, HashSet};
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9use std::sync::Arc;
10use tracing::{debug, info};
11
/// String interner for efficient memory usage with repeated strings.
///
/// Every distinct string passed to [`StringInterner::intern`] is allocated
/// once; later calls with equal text return another handle to that same
/// allocation. The interner also counts how often each string was requested
/// so that [`StringInterner::stats`] can report the bytes saved.
#[derive(Debug, Clone)]
pub struct StringInterner {
    /// One entry per distinct string. The key shares its allocation with the
    /// entry's `handle`, so the text itself is stored only once.
    entries: HashMap<InternKey, InternEntry>,
}

/// Map key wrapping the shared string so that lookups can be made with a
/// plain `&str`, i.e. without allocating on the hit path.
///
/// The derived `Hash`/`Eq` delegate to the inner string, which keeps them
/// consistent with `str` as the `Borrow` contract requires.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct InternKey(Arc<String>);

impl std::borrow::Borrow<str> for InternKey {
    fn borrow(&self) -> &str {
        self.0.as_str()
    }
}

/// Bookkeeping kept for each interned string.
#[derive(Debug, Clone)]
struct InternEntry {
    /// Handle that is cloned out to callers.
    handle: Arc<String>,
    /// Number of `intern` calls that returned this string (always >= 1).
    count: usize,
}

impl Default for StringInterner {
    fn default() -> Self {
        Self::new()
    }
}

impl StringInterner {
    /// Creates an empty interner.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: HashMap::new(),
        }
    }

    /// Intern a string and return a reference-counted pointer.
    ///
    /// Calls with equal text return handles to the same allocation. Each call
    /// also increments that string's usage counter.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        // Hit path: a single hash lookup and a reference-count bump.
        if let Some(entry) = self.entries.get_mut(s) {
            entry.count += 1;
            return Arc::clone(&entry.handle);
        }

        // Miss path: allocate the text once and share it between the map key,
        // the stored entry and the caller.
        let handle = Arc::new(s.to_owned());
        self.entries.insert(
            InternKey(Arc::clone(&handle)),
            InternEntry {
                handle: Arc::clone(&handle),
                count: 1,
            },
        );
        handle
    }

    /// Get statistics about interned strings.
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        InternerStats {
            unique_strings: self.entries.len(),
            total_references: self.entries.values().map(|entry| entry.count).sum(),
            memory_saved_bytes: self.calculate_memory_saved(),
        }
    }

    /// Sums, over all strings, the text length times the number of repeat
    /// requests (every request after the first would otherwise have needed
    /// its own copy of the text).
    fn calculate_memory_saved(&self) -> usize {
        self.entries
            .values()
            .map(|entry| entry.handle.len() * entry.count.saturating_sub(1))
            .sum()
    }
}

/// Summary returned by [`StringInterner::stats`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InternerStats {
    /// Number of distinct strings held by the interner.
    pub unique_strings: usize,
    /// Total number of `intern` calls served.
    pub total_references: usize,
    /// Text bytes that repeat requests did not have to allocate again.
    pub memory_saved_bytes: usize,
}
80
/// Per-column result of the sampling pass, used to decide whether a column's
/// string values should be interned.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the header record.
    index: usize,
    /// Header text of the column.
    name: String,
    /// Number of distinct non-empty, non-numeric values seen in the sample.
    cardinality: usize,
    /// Number of data rows that were sampled.
    sample_size: usize,
    /// `cardinality` divided by `sample_size` (1.0 when no rows were sampled).
    unique_ratio: f64,
    /// Whether the column was selected for string interning.
    is_categorical: bool,
    /// Mean byte length (integer division) of the counted string values.
    avg_string_length: usize,
}
92
93pub struct AdvancedCsvLoader {
94    sample_size: usize,
95    cardinality_threshold: f64, // Ratio threshold for considering a column categorical
96    interners: HashMap<usize, StringInterner>, // Column index -> interner
97}
98
99impl AdvancedCsvLoader {
100    #[must_use]
101    pub fn new() -> Self {
102        Self {
103            sample_size: 1000,          // Sample first 1000 rows for analysis
104            cardinality_threshold: 0.5, // If < 50% unique values, consider categorical
105            interners: HashMap::new(),
106        }
107    }
108
109    /// Analyze columns to determine which should use string interning
110    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
111        info!("Analyzing CSV columns for optimization strategies");
112
113        let file = File::open(path)?;
114        let mut reader = csv::Reader::from_reader(file);
115        let headers = reader.headers()?.clone();
116
117        // Initialize tracking for each column
118        let num_columns = headers.len();
119        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
120        let mut total_lengths: Vec<usize> = vec![0; num_columns];
121        let mut string_counts: Vec<usize> = vec![0; num_columns];
122
123        // Sample rows to analyze cardinality
124        let mut row_count = 0;
125        for result in reader.records() {
126            if row_count >= self.sample_size {
127                break;
128            }
129
130            let record = result?;
131            for (col_idx, field) in record.iter().enumerate() {
132                if col_idx < num_columns {
133                    // Only track non-numeric strings
134                    if !field.is_empty() && field.parse::<f64>().is_err() {
135                        unique_values[col_idx].insert(field.to_string());
136                        total_lengths[col_idx] += field.len();
137                        string_counts[col_idx] += 1;
138                    }
139                }
140            }
141            row_count += 1;
142        }
143
144        // Build analysis for each column
145        let mut analyses = Vec::new();
146        for (idx, header) in headers.iter().enumerate() {
147            let cardinality = unique_values[idx].len();
148            let unique_ratio = if row_count > 0 {
149                cardinality as f64 / row_count as f64
150            } else {
151                1.0
152            };
153
154            let avg_length = if string_counts[idx] > 0 {
155                total_lengths[idx] / string_counts[idx]
156            } else {
157                0
158            };
159
160            // Consider categorical if low cardinality or common patterns
161            let is_categorical = unique_ratio < self.cardinality_threshold
162                || Self::is_likely_categorical(header, cardinality, avg_length);
163
164            analyses.push(ColumnAnalysis {
165                index: idx,
166                name: header.to_string(),
167                cardinality,
168                sample_size: row_count,
169                unique_ratio,
170                is_categorical,
171                avg_string_length: avg_length,
172            });
173
174            if is_categorical {
175                debug!(
176                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
177                    header, cardinality, row_count, unique_ratio
178                );
179                self.interners.insert(idx, StringInterner::new());
180            }
181        }
182
183        Ok(analyses)
184    }
185
186    /// Heuristic to identify likely categorical columns by name and characteristics
187    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
188        let name_lower = name.to_lowercase();
189
190        // Common categorical column patterns
191        let categorical_patterns = [
192            "status",
193            "state",
194            "type",
195            "category",
196            "class",
197            "group",
198            "country",
199            "region",
200            "city",
201            "currency",
202            "side",
203            "book",
204            "desk",
205            "trader",
206            "portfolio",
207            "strategy",
208            "exchange",
209            "venue",
210            "counterparty",
211            "product",
212            "instrument",
213        ];
214
215        for pattern in &categorical_patterns {
216            if name_lower.contains(pattern) {
217                return true;
218            }
219        }
220
221        // Boolean-like columns
222        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
223            return true;
224        }
225
226        // Low cardinality with short strings often indicates categories
227        cardinality < 100 && avg_length < 50
228    }
229
230    /// Load CSV with advanced optimizations
231    pub fn load_csv_optimized<P: AsRef<Path>>(
232        &mut self,
233        path: P,
234        table_name: &str,
235    ) -> Result<DataTable> {
236        let path = path.as_ref();
237        info!(
238            "Advanced CSV load: Loading {} with optimizations",
239            path.display()
240        );
241
242        // Track memory before loading
243        crate::utils::memory_tracker::track_memory("advanced_csv_start");
244
245        // Analyze columns first
246        let analyses = self.analyze_columns(path)?;
247        let categorical_columns: HashSet<usize> = analyses
248            .iter()
249            .filter(|a| a.is_categorical)
250            .map(|a| a.index)
251            .collect();
252
253        info!(
254            "Column analysis complete: {} of {} columns will use string interning",
255            categorical_columns.len(),
256            analyses.len()
257        );
258
259        // Now load the actual data
260        let file = File::open(path)?;
261        let mut reader = csv::Reader::from_reader(file);
262        let headers = reader.headers()?.clone();
263
264        let mut table = DataTable::new(table_name);
265        for header in &headers {
266            table.add_column(DataColumn::new(header.to_string()));
267        }
268
269        crate::utils::memory_tracker::track_memory("advanced_csv_headers");
270
271        // Pre-allocate with estimated capacity if possible
272        let file_size = std::fs::metadata(path)?.len();
273        let estimated_rows = (file_size / 100) as usize; // Rough estimate
274        table.reserve_rows(estimated_rows.min(1_000_000)); // Cap at 1M for safety
275
276        // Re-open file to read raw lines for null detection
277        let file2 = File::open(path)?;
278        let mut line_reader = BufReader::new(file2);
279        let mut raw_line = String::new();
280        // Skip header line
281        line_reader.read_line(&mut raw_line)?;
282
283        // Read rows with optimizations
284        let mut row_count = 0;
285        for result in reader.records() {
286            let record = result?;
287
288            // Read the corresponding raw line
289            raw_line.clear();
290            line_reader.read_line(&mut raw_line)?;
291
292            let mut values = Vec::with_capacity(headers.len());
293
294            for (idx, field) in record.iter().enumerate() {
295                let value = if field.is_empty() {
296                    // Distinguish between null and empty string
297                    if Self::is_null_field(&raw_line, idx) {
298                        DataValue::Null
299                    } else {
300                        // Empty string - check if should be interned
301                        if categorical_columns.contains(&idx) {
302                            if let Some(interner) = self.interners.get_mut(&idx) {
303                                DataValue::InternedString(interner.intern(""))
304                            } else {
305                                DataValue::String(String::new())
306                            }
307                        } else {
308                            DataValue::String(String::new())
309                        }
310                    }
311                } else if let Ok(b) = field.parse::<bool>() {
312                    DataValue::Boolean(b)
313                } else if let Ok(i) = field.parse::<i64>() {
314                    DataValue::Integer(i)
315                } else if let Ok(f) = field.parse::<f64>() {
316                    DataValue::Float(f)
317                } else {
318                    // Check if this column should use interning
319                    if categorical_columns.contains(&idx) {
320                        if let Some(interner) = self.interners.get_mut(&idx) {
321                            // Use interned string
322                            DataValue::InternedString(interner.intern(field))
323                        } else {
324                            DataValue::String(field.to_string())
325                        }
326                    } else if field.contains('-') && field.len() >= 8 && field.len() <= 30 {
327                        DataValue::DateTime(field.to_string())
328                    } else {
329                        DataValue::String(field.to_string())
330                    }
331                };
332                values.push(value);
333            }
334
335            table
336                .add_row(DataRow::new(values))
337                .map_err(|e| anyhow::anyhow!(e))?;
338            row_count += 1;
339
340            // Track memory periodically
341            if row_count % 10000 == 0 {
342                crate::utils::memory_tracker::track_memory(&format!(
343                    "advanced_csv_{row_count}rows"
344                ));
345                debug!("Loaded {} rows...", row_count);
346            }
347        }
348
349        // Shrink vectors to fit actual data
350        table.shrink_to_fit();
351
352        // Infer column types
353        table.infer_column_types();
354
355        crate::utils::memory_tracker::track_memory("advanced_csv_complete");
356
357        // Report statistics
358        let mut total_saved = 0;
359        for (col_idx, interner) in &self.interners {
360            let stats = interner.stats();
361            if stats.memory_saved_bytes > 0 {
362                debug!(
363                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
364                    col_idx,
365                    headers.get(*col_idx).unwrap_or("?"),
366                    stats.unique_strings,
367                    stats.total_references,
368                    stats.memory_saved_bytes / 1024
369                );
370            }
371            total_saved += stats.memory_saved_bytes;
372        }
373
374        info!(
375            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
376            table.row_count(),
377            table.column_count(),
378            table.estimate_memory_size() / 1024 / 1024,
379            total_saved / 1024
380        );
381
382        Ok(table)
383    }
384
385    /// Helper to detect if a field in the raw CSV line is a null (unquoted empty)
386    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
387        let mut comma_count = 0;
388        let mut in_quotes = false;
389        let mut field_start = 0;
390        let mut prev_char = ' ';
391
392        for (i, ch) in raw_line.chars().enumerate() {
393            if ch == '"' && prev_char != '\\' {
394                in_quotes = !in_quotes;
395            }
396
397            if ch == ',' && !in_quotes {
398                if comma_count == field_index {
399                    let field_end = i;
400                    let field_content = &raw_line[field_start..field_end].trim();
401                    // If empty, check if it was quoted (quoted empty = empty string, unquoted empty = NULL)
402                    if field_content.is_empty() {
403                        return true; // Unquoted empty field -> NULL
404                    }
405                    // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
406                    if field_content.starts_with('"')
407                        && field_content.ends_with('"')
408                        && field_content.len() == 2
409                    {
410                        return false; // Quoted empty field -> empty string
411                    }
412                    return false; // Non-empty field -> not NULL
413                }
414                comma_count += 1;
415                field_start = i + 1;
416            }
417            prev_char = ch;
418        }
419
420        // Check last field
421        if comma_count == field_index {
422            let field_content = raw_line[field_start..]
423                .trim()
424                .trim_end_matches('\n')
425                .trim_end_matches('\r');
426            // If empty, check if it was quoted
427            if field_content.is_empty() {
428                return true; // Unquoted empty field -> NULL
429            }
430            // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
431            if field_content.starts_with('"')
432                && field_content.ends_with('"')
433                && field_content.len() == 2
434            {
435                return false; // Quoted empty field -> empty string
436            }
437            return false; // Non-empty field -> not NULL
438        }
439
440        false // Field not found -> not NULL (shouldn't happen)
441    }
442
443    /// Get interner statistics for debugging
444    #[must_use]
445    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
446        self.interners
447            .iter()
448            .map(|(idx, interner)| (*idx, interner.stats()))
449            .collect()
450    }
451}
452
453impl Default for AdvancedCsvLoader {
454    fn default() -> Self {
455        Self::new()
456    }
457}