sql_cli/data/
advanced_csv_loader.rs

1/// Advanced CSV loader with string interning and memory optimization
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::collections::{HashMap, HashSet};
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9use std::sync::Arc;
10use tracing::{debug, info};
11
/// String interner for efficient memory usage with repeated strings.
///
/// Each distinct string passed to [`StringInterner::intern`] is stored once behind an
/// `Arc<String>`; every later request for the same text returns a clone of that `Arc`
/// instead of a fresh allocation. The interner also counts how often each string was
/// requested so that [`StringInterner::stats`] can report the bytes saved.
#[derive(Debug, Clone)]
pub struct StringInterner {
    /// Interned text -> shared handle plus the number of times it was requested.
    ///
    /// A single map keeps the handle and its counter together, so one hash lookup
    /// serves both on the hot path.
    entries: HashMap<String, InternedEntry>,
}

/// Bookkeeping for one interned string.
#[derive(Debug, Clone)]
struct InternedEntry {
    /// Shared allocation handed out to callers.
    handle: Arc<String>,
    /// How many times `intern` has been called with this text (always >= 1).
    uses: usize,
}

impl Default for StringInterner {
    fn default() -> Self {
        Self::new()
    }
}

impl StringInterner {
    /// Create an empty interner.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: HashMap::new(),
        }
    }

    /// Intern a string and return a reference-counted pointer.
    ///
    /// The first call for a given text allocates it; subsequent calls return a clone of
    /// the same `Arc` and bump the usage counter.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        if let Some(entry) = self.entries.get_mut(s) {
            entry.uses += 1;
            return Arc::clone(&entry.handle);
        }

        let handle = Arc::new(s.to_string());
        self.entries.insert(
            s.to_string(),
            InternedEntry {
                handle: Arc::clone(&handle),
                uses: 1,
            },
        );
        handle
    }

    /// Get statistics about interned strings.
    ///
    /// Returns the number of distinct strings, the total number of `intern` calls, and
    /// the estimated bytes saved by sharing.
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        InternerStats {
            unique_strings: self.entries.len(),
            total_references: self.entries.values().map(|entry| entry.uses).sum(),
            memory_saved_bytes: self.calculate_memory_saved(),
        }
    }

    /// Estimate the bytes saved: every request after the first avoids one copy of the
    /// string's bytes.
    fn calculate_memory_saved(&self) -> usize {
        self.entries
            .values()
            .map(|entry| entry.handle.len() * entry.uses.saturating_sub(1))
            .sum()
    }
}

/// Summary returned by [`StringInterner::stats`].
#[derive(Debug)]
pub struct InternerStats {
    /// Number of distinct strings held by the interner.
    pub unique_strings: usize,
    /// Total number of `intern` calls across all strings.
    pub total_references: usize,
    /// Sum over all strings of `len * (references - 1)`.
    pub memory_saved_bytes: usize,
}
80
/// Profile of a single CSV column, produced by the sampling pass and used to decide
/// whether the column's string values should be interned.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the header row.
    index: usize,
    /// Header text of the column.
    name: String,
    /// Number of distinct non-empty, non-numeric values seen in the sample.
    cardinality: usize,
    /// Number of rows that were actually sampled.
    sample_size: usize,
    /// `cardinality / sample_size`, or `1.0` when no rows were sampled.
    unique_ratio: f64,
    /// Whether the column was selected for string interning.
    is_categorical: bool,
    /// Mean byte length of the sampled string values (integer division; `0` if none).
    avg_string_length: usize,
}
92
93pub struct AdvancedCsvLoader {
94    sample_size: usize,
95    cardinality_threshold: f64, // Ratio threshold for considering a column categorical
96    interners: HashMap<usize, StringInterner>, // Column index -> interner
97}
98
99impl AdvancedCsvLoader {
100    #[must_use]
101    pub fn new() -> Self {
102        Self {
103            sample_size: 1000,          // Sample first 1000 rows for analysis
104            cardinality_threshold: 0.5, // If < 50% unique values, consider categorical
105            interners: HashMap::new(),
106        }
107    }
108
109    /// Analyze columns to determine which should use string interning
110    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
111        info!("Analyzing CSV columns for optimization strategies");
112
113        let file = File::open(path)?;
114        let mut reader = csv::Reader::from_reader(file);
115        let headers = reader.headers()?.clone();
116
117        // Initialize tracking for each column
118        let num_columns = headers.len();
119        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
120        let mut total_lengths: Vec<usize> = vec![0; num_columns];
121        let mut string_counts: Vec<usize> = vec![0; num_columns];
122
123        // Sample rows to analyze cardinality
124        let mut row_count = 0;
125        for result in reader.records() {
126            if row_count >= self.sample_size {
127                break;
128            }
129
130            let record = result?;
131            for (col_idx, field) in record.iter().enumerate() {
132                if col_idx < num_columns {
133                    // Only track non-numeric strings
134                    if !field.is_empty() && field.parse::<f64>().is_err() {
135                        unique_values[col_idx].insert(field.to_string());
136                        total_lengths[col_idx] += field.len();
137                        string_counts[col_idx] += 1;
138                    }
139                }
140            }
141            row_count += 1;
142        }
143
144        // Build analysis for each column
145        let mut analyses = Vec::new();
146        for (idx, header) in headers.iter().enumerate() {
147            let cardinality = unique_values[idx].len();
148            let unique_ratio = if row_count > 0 {
149                cardinality as f64 / row_count as f64
150            } else {
151                1.0
152            };
153
154            let avg_length = if string_counts[idx] > 0 {
155                total_lengths[idx] / string_counts[idx]
156            } else {
157                0
158            };
159
160            // Check if this might be a datetime column based on sample values
161            let is_datetime = Self::is_likely_datetime(&unique_values[idx]);
162
163            // Consider categorical if low cardinality or common patterns, but NOT if it's a datetime
164            let is_categorical = !is_datetime
165                && (unique_ratio < self.cardinality_threshold
166                    || Self::is_likely_categorical(header, cardinality, avg_length));
167
168            analyses.push(ColumnAnalysis {
169                index: idx,
170                name: header.to_string(),
171                cardinality,
172                sample_size: row_count,
173                unique_ratio,
174                is_categorical,
175                avg_string_length: avg_length,
176            });
177
178            if is_categorical {
179                debug!(
180                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
181                    header, cardinality, row_count, unique_ratio
182                );
183                self.interners.insert(idx, StringInterner::new());
184            }
185        }
186
187        Ok(analyses)
188    }
189
190    /// Heuristic to identify likely datetime columns based on sample values
191    fn is_likely_datetime(unique_values: &HashSet<String>) -> bool {
192        if unique_values.is_empty() {
193            return false;
194        }
195
196        // Check a sample of values to see if they look like datetimes
197        let sample_size = unique_values.len().min(10);
198        let mut datetime_count = 0;
199
200        for (i, value) in unique_values.iter().enumerate() {
201            if i >= sample_size {
202                break;
203            }
204
205            // Simple heuristic: contains date/time separators and has right length
206            if (value.contains('-') || value.contains('/') || value.contains(':'))
207                && value.len() >= 8
208                && value.len() <= 30
209            {
210                datetime_count += 1;
211            }
212        }
213
214        // If most samples look like datetimes, consider it a datetime column
215        datetime_count >= (sample_size * 7) / 10 // 70% threshold
216    }
217
218    /// Heuristic to identify likely categorical columns by name and characteristics
219    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
220        let name_lower = name.to_lowercase();
221
222        // Common categorical column patterns
223        let categorical_patterns = [
224            "status",
225            "state",
226            "type",
227            "category",
228            "class",
229            "group",
230            "country",
231            "region",
232            "city",
233            "currency",
234            "side",
235            "book",
236            "desk",
237            "trader",
238            "portfolio",
239            "strategy",
240            "exchange",
241            "venue",
242            "counterparty",
243            "product",
244            "instrument",
245        ];
246
247        for pattern in &categorical_patterns {
248            if name_lower.contains(pattern) {
249                return true;
250            }
251        }
252
253        // Boolean-like columns
254        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
255            return true;
256        }
257
258        // Low cardinality with short strings often indicates categories
259        cardinality < 100 && avg_length < 50
260    }
261
262    /// Load CSV with advanced optimizations
263    pub fn load_csv_optimized<P: AsRef<Path>>(
264        &mut self,
265        path: P,
266        table_name: &str,
267    ) -> Result<DataTable> {
268        let path = path.as_ref();
269        info!(
270            "Advanced CSV load: Loading {} with optimizations",
271            path.display()
272        );
273
274        // Track memory before loading
275        crate::utils::memory_tracker::track_memory("advanced_csv_start");
276
277        // Analyze columns first
278        let analyses = self.analyze_columns(path)?;
279        let categorical_columns: HashSet<usize> = analyses
280            .iter()
281            .filter(|a| a.is_categorical)
282            .map(|a| a.index)
283            .collect();
284
285        info!(
286            "Column analysis complete: {} of {} columns will use string interning",
287            categorical_columns.len(),
288            analyses.len()
289        );
290
291        // Now load the actual data
292        let file = File::open(path)?;
293        let mut reader = csv::Reader::from_reader(file);
294        let headers = reader.headers()?.clone();
295
296        let mut table = DataTable::new(table_name);
297        for header in &headers {
298            table.add_column(DataColumn::new(header.to_string()));
299        }
300
301        crate::utils::memory_tracker::track_memory("advanced_csv_headers");
302
303        // Pre-allocate with estimated capacity if possible
304        let file_size = std::fs::metadata(path)?.len();
305        let estimated_rows = (file_size / 100) as usize; // Rough estimate
306        table.reserve_rows(estimated_rows.min(1_000_000)); // Cap at 1M for safety
307
308        // Re-open file to read raw lines for null detection
309        let file2 = File::open(path)?;
310        let mut line_reader = BufReader::new(file2);
311        let mut raw_line = String::new();
312        // Skip header line
313        line_reader.read_line(&mut raw_line)?;
314
315        // Read rows with optimizations
316        let mut row_count = 0;
317        for result in reader.records() {
318            let record = result?;
319
320            // Read the corresponding raw line
321            raw_line.clear();
322            line_reader.read_line(&mut raw_line)?;
323
324            let mut values = Vec::with_capacity(headers.len());
325
326            for (idx, field) in record.iter().enumerate() {
327                let value = if field.is_empty() {
328                    // Distinguish between null and empty string
329                    if Self::is_null_field(&raw_line, idx) {
330                        DataValue::Null
331                    } else {
332                        // Empty string - check if should be interned
333                        if categorical_columns.contains(&idx) {
334                            if let Some(interner) = self.interners.get_mut(&idx) {
335                                DataValue::InternedString(interner.intern(""))
336                            } else {
337                                DataValue::String(String::new())
338                            }
339                        } else {
340                            DataValue::String(String::new())
341                        }
342                    }
343                } else if let Ok(b) = field.parse::<bool>() {
344                    DataValue::Boolean(b)
345                } else if let Ok(i) = field.parse::<i64>() {
346                    DataValue::Integer(i)
347                } else if let Ok(f) = field.parse::<f64>() {
348                    DataValue::Float(f)
349                } else {
350                    // Check for DateTime first, before considering interning
351                    // Support both - and / as date separators, and : for time
352                    if (field.contains('-') || field.contains('/') || field.contains(':'))
353                        && field.len() >= 8
354                        && field.len() <= 30
355                    {
356                        DataValue::DateTime(field.to_string())
357                    } else if categorical_columns.contains(&idx) {
358                        // Only intern if it's not a datetime
359                        if let Some(interner) = self.interners.get_mut(&idx) {
360                            // Use interned string
361                            DataValue::InternedString(interner.intern(field))
362                        } else {
363                            DataValue::String(field.to_string())
364                        }
365                    } else {
366                        DataValue::String(field.to_string())
367                    }
368                };
369                values.push(value);
370            }
371
372            table
373                .add_row(DataRow::new(values))
374                .map_err(|e| anyhow::anyhow!(e))?;
375            row_count += 1;
376
377            // Track memory periodically
378            if row_count % 10000 == 0 {
379                crate::utils::memory_tracker::track_memory(&format!(
380                    "advanced_csv_{row_count}rows"
381                ));
382                debug!("Loaded {} rows...", row_count);
383            }
384        }
385
386        // Shrink vectors to fit actual data
387        table.shrink_to_fit();
388
389        // Infer column types
390        table.infer_column_types();
391
392        crate::utils::memory_tracker::track_memory("advanced_csv_complete");
393
394        // Report statistics
395        let mut total_saved = 0;
396        for (col_idx, interner) in &self.interners {
397            let stats = interner.stats();
398            if stats.memory_saved_bytes > 0 {
399                debug!(
400                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
401                    col_idx,
402                    headers.get(*col_idx).unwrap_or("?"),
403                    stats.unique_strings,
404                    stats.total_references,
405                    stats.memory_saved_bytes / 1024
406                );
407            }
408            total_saved += stats.memory_saved_bytes;
409        }
410
411        info!(
412            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
413            table.row_count(),
414            table.column_count(),
415            table.estimate_memory_size() / 1024 / 1024,
416            total_saved / 1024
417        );
418
419        Ok(table)
420    }
421
422    /// Helper to detect if a field in the raw CSV line is a null (unquoted empty)
423    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
424        let mut comma_count = 0;
425        let mut in_quotes = false;
426        let mut field_start = 0;
427        let mut prev_char = ' ';
428
429        for (i, ch) in raw_line.chars().enumerate() {
430            if ch == '"' && prev_char != '\\' {
431                in_quotes = !in_quotes;
432            }
433
434            if ch == ',' && !in_quotes {
435                if comma_count == field_index {
436                    let field_end = i;
437                    let field_content = &raw_line[field_start..field_end].trim();
438                    // If empty, check if it was quoted (quoted empty = empty string, unquoted empty = NULL)
439                    if field_content.is_empty() {
440                        return true; // Unquoted empty field -> NULL
441                    }
442                    // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
443                    if field_content.starts_with('"')
444                        && field_content.ends_with('"')
445                        && field_content.len() == 2
446                    {
447                        return false; // Quoted empty field -> empty string
448                    }
449                    return false; // Non-empty field -> not NULL
450                }
451                comma_count += 1;
452                field_start = i + 1;
453            }
454            prev_char = ch;
455        }
456
457        // Check last field
458        if comma_count == field_index {
459            let field_content = raw_line[field_start..]
460                .trim()
461                .trim_end_matches('\n')
462                .trim_end_matches('\r');
463            // If empty, check if it was quoted
464            if field_content.is_empty() {
465                return true; // Unquoted empty field -> NULL
466            }
467            // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
468            if field_content.starts_with('"')
469                && field_content.ends_with('"')
470                && field_content.len() == 2
471            {
472                return false; // Quoted empty field -> empty string
473            }
474            return false; // Non-empty field -> not NULL
475        }
476
477        false // Field not found -> not NULL (shouldn't happen)
478    }
479
480    /// Get interner statistics for debugging
481    #[must_use]
482    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
483        self.interners
484            .iter()
485            .map(|(idx, interner)| (*idx, interner.stats()))
486            .collect()
487    }
488}
489
490impl Default for AdvancedCsvLoader {
491    fn default() -> Self {
492        Self::new()
493    }
494}