sql_cli/data/advanced_csv_loader.rs

//! Advanced CSV loader with string interning and memory optimization
use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
use anyhow::Result;
use csv;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::Path;
use std::sync::Arc;
use tracing::{debug, info};

/// String interner for efficient memory usage with repeated strings
#[derive(Debug, Clone)]
pub struct StringInterner {
    strings: HashMap<String, Arc<String>>,
    usage_count: HashMap<Arc<String>, usize>,
}

impl Default for StringInterner {
    fn default() -> Self {
        Self::new()
    }
}

impl StringInterner {
    #[must_use]
    pub fn new() -> Self {
        Self {
            strings: HashMap::new(),
            usage_count: HashMap::new(),
        }
    }

    /// Intern a string and return a reference-counted pointer
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        if let Some(rc_str) = self.strings.get(s) {
            let rc = rc_str.clone();
            *self.usage_count.entry(rc.clone()).or_insert(0) += 1;
            rc
        } else {
            let rc_str = Arc::new(s.to_string());
            self.strings.insert(s.to_string(), rc_str.clone());
            self.usage_count.insert(rc_str.clone(), 1);
            rc_str
        }
    }

    /// Get statistics about interned strings
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        let total_strings = self.strings.len();
        let total_references: usize = self.usage_count.values().sum();
        let memory_saved = self.calculate_memory_saved();

        InternerStats {
            unique_strings: total_strings,
            total_references,
            memory_saved_bytes: memory_saved,
        }
    }

    fn calculate_memory_saved(&self) -> usize {
        let mut saved = 0;
        for (rc_str, count) in &self.usage_count {
            if *count > 1 {
                // Each additional reference saves the string size
                saved += rc_str.len() * (*count - 1);
            }
        }
        saved
    }
}
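
// Illustrative sketch of the interner's behavior (hypothetical call site, not
// part of the loader flow): repeated values interned through one
// `StringInterner` share a single allocation, so both handles point at the
// same `Arc` and the stats reflect the duplicate reference.
//
//     let mut interner = StringInterner::new();
//     let a = interner.intern("NYSE");
//     let b = interner.intern("NYSE");
//     assert!(Arc::ptr_eq(&a, &b));
//     assert_eq!(interner.stats().unique_strings, 1);
//     assert_eq!(interner.stats().total_references, 2);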

#[derive(Debug)]
pub struct InternerStats {
    pub unique_strings: usize,
    pub total_references: usize,
    pub memory_saved_bytes: usize,
}

/// Column analysis results for determining interning strategy
#[derive(Debug)]
struct ColumnAnalysis {
    index: usize,
    name: String,
    cardinality: usize,
    sample_size: usize,
    unique_ratio: f64,
    is_categorical: bool,
    avg_string_length: usize,
}

pub struct AdvancedCsvLoader {
    sample_size: usize,
    cardinality_threshold: f64, // Ratio threshold for considering a column categorical
    interners: HashMap<usize, StringInterner>, // Column index -> interner
}

impl AdvancedCsvLoader {
    #[must_use]
    pub fn new() -> Self {
        Self {
            sample_size: 1000,          // Sample first 1000 rows for analysis
            cardinality_threshold: 0.5, // If < 50% unique values, consider categorical
            interners: HashMap::new(),
        }
    }

    /// Analyze columns to determine which should use string interning
    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
        info!("Analyzing CSV columns for optimization strategies");

        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        // Initialize tracking for each column
        let num_columns = headers.len();
        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
        let mut total_lengths: Vec<usize> = vec![0; num_columns];
        let mut string_counts: Vec<usize> = vec![0; num_columns];

        // Sample rows to analyze cardinality
        let mut row_count = 0;
        for result in reader.records() {
            if row_count >= self.sample_size {
                break;
            }

            let record = result?;
            for (col_idx, field) in record.iter().enumerate() {
                if col_idx < num_columns {
                    // Only track non-numeric strings
                    if !field.is_empty() && field.parse::<f64>().is_err() {
                        unique_values[col_idx].insert(field.to_string());
                        total_lengths[col_idx] += field.len();
                        string_counts[col_idx] += 1;
                    }
                }
            }
            row_count += 1;
        }

        // Build analysis for each column
        let mut analyses = Vec::new();
        for (idx, header) in headers.iter().enumerate() {
            let cardinality = unique_values[idx].len();
            let unique_ratio = if row_count > 0 {
                cardinality as f64 / row_count as f64
            } else {
                1.0
            };

            let avg_length = if string_counts[idx] > 0 {
                total_lengths[idx] / string_counts[idx]
            } else {
                0
            };

            // Check if this might be a datetime column based on sample values
            let is_datetime = Self::is_likely_datetime(&unique_values[idx]);

            // Consider categorical if low cardinality or common patterns, but NOT if it's a datetime
            let is_categorical = !is_datetime
                && (unique_ratio < self.cardinality_threshold
                    || Self::is_likely_categorical(header, cardinality, avg_length));

            analyses.push(ColumnAnalysis {
                index: idx,
                name: header.to_string(),
                cardinality,
                sample_size: row_count,
                unique_ratio,
                is_categorical,
                avg_string_length: avg_length,
            });

            if is_categorical {
                debug!(
                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
                    header, cardinality, row_count, unique_ratio
                );
                self.interners.insert(idx, StringInterner::new());
            }
        }

        Ok(analyses)
    }
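
    // Sketch of the intended outcome (column name and counts are illustrative):
    // a "currency" column with 5 unique values across 1000 sampled rows has a
    // unique_ratio of 0.005, well under cardinality_threshold (0.5), so it is
    // marked categorical and receives its own StringInterner.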

    /// Heuristic to identify likely datetime columns based on sample values
    fn is_likely_datetime(unique_values: &HashSet<String>) -> bool {
        if unique_values.is_empty() {
            return false;
        }

        // Check a sample of values to see if they look like datetimes
        let sample_size = unique_values.len().min(10);
        let mut datetime_count = 0;

        for (i, value) in unique_values.iter().enumerate() {
            if i >= sample_size {
                break;
            }

            // Simple heuristic: contains date/time separators and has right length
            if (value.contains('-') || value.contains('/') || value.contains(':'))
                && value.len() >= 8
                && value.len() <= 30
            {
                datetime_count += 1;
            }
        }

        // If most samples look like datetimes, consider it a datetime column
        // Require at least one match: with integer division the 70% threshold
        // collapses to 0 for very small samples, which would misclassify them.
        datetime_count > 0 && datetime_count >= (sample_size * 7) / 10 // 70% threshold
    }

    /// Heuristic to identify likely categorical columns by name and characteristics
    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
        let name_lower = name.to_lowercase();

        // Common categorical column patterns
        let categorical_patterns = [
            "status",
            "state",
            "type",
            "category",
            "class",
            "group",
            "country",
            "region",
            "city",
            "currency",
            "side",
            "book",
            "desk",
            "trader",
            "portfolio",
            "strategy",
            "exchange",
            "venue",
            "counterparty",
            "product",
            "instrument",
        ];

        for pattern in &categorical_patterns {
            if name_lower.contains(pattern) {
                return true;
            }
        }

        // Boolean-like columns
        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
            return true;
        }

        // Low cardinality with short strings often indicates categories
        cardinality < 100 && avg_length < 50
    }

    /// Load CSV from a reader with advanced optimizations
    pub fn load_csv_from_reader<R: Read>(
        &mut self,
        reader: R,
        table_name: &str,
        source_path: &str,
    ) -> Result<DataTable> {
        use crate::data::stream_loader::StreamCsvLoader;

        // Use the stream-based loader with string interning
        let mut stream_loader = StreamCsvLoader::new();
        stream_loader.load_csv_from_reader(reader, table_name, "file", source_path)
    }

    /// Load CSV with advanced optimizations
    pub fn load_csv_optimized<P: AsRef<Path>>(
        &mut self,
        path: P,
        table_name: &str,
    ) -> Result<DataTable> {
        let path = path.as_ref();
        info!(
            "Advanced CSV load: Loading {} with optimizations",
            path.display()
        );

        // Use stream-based approach for consistency
        let file = File::open(path)?;
        self.load_csv_from_reader(file, table_name, &path.display().to_string())
    }
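
    // Hypothetical call-site sketch (file name and table name are made up):
    //
    //     let mut loader = AdvancedCsvLoader::new();
    //     let table = loader.load_csv_optimized("trades.csv", "trades")?;
    //     info!("loaded {} rows", table.row_count());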

    /// Original file-based implementation (kept for backward compatibility)
    pub fn load_csv_optimized_legacy<P: AsRef<Path>>(
        &mut self,
        path: P,
        table_name: &str,
    ) -> Result<DataTable> {
        let path = path.as_ref();
        info!(
            "Advanced CSV load: Loading {} with optimizations",
            path.display()
        );

        // Track memory before loading
        crate::utils::memory_tracker::track_memory("advanced_csv_start");

        // Analyze columns first
        let analyses = self.analyze_columns(path)?;
        let categorical_columns: HashSet<usize> = analyses
            .iter()
            .filter(|a| a.is_categorical)
            .map(|a| a.index)
            .collect();

        info!(
            "Column analysis complete: {} of {} columns will use string interning",
            categorical_columns.len(),
            analyses.len()
        );

        // Now load the actual data
        let file = File::open(path)?;
        let mut reader = csv::Reader::from_reader(file);
        let headers = reader.headers()?.clone();

        let mut table = DataTable::new(table_name);
        for header in &headers {
            table.add_column(DataColumn::new(header.to_string()));
        }

        crate::utils::memory_tracker::track_memory("advanced_csv_headers");

        // Pre-allocate with estimated capacity if possible
        let file_size = std::fs::metadata(path)?.len();
        let estimated_rows = (file_size / 100) as usize; // Rough estimate
        table.reserve_rows(estimated_rows.min(1_000_000)); // Cap at 1M for safety

        // Re-open the file to read raw physical lines for null detection.
        // Note: this assumes one CSV record per line; a quoted field containing
        // an embedded newline would desynchronize the two readers.
        let file2 = File::open(path)?;
        let mut line_reader = BufReader::new(file2);
        let mut raw_line = String::new();
        // Skip header line
        line_reader.read_line(&mut raw_line)?;

        // Read rows with optimizations
        let mut row_count = 0;
        for result in reader.records() {
            let record = result?;

            // Read the corresponding raw line
            raw_line.clear();
            line_reader.read_line(&mut raw_line)?;

            let mut values = Vec::with_capacity(headers.len());

            for (idx, field) in record.iter().enumerate() {
                let value = if field.is_empty() {
                    // Distinguish between null and empty string
                    if Self::is_null_field(&raw_line, idx) {
                        DataValue::Null
                    } else {
                        // Empty string - check if should be interned
                        if categorical_columns.contains(&idx) {
                            if let Some(interner) = self.interners.get_mut(&idx) {
                                DataValue::InternedString(interner.intern(""))
                            } else {
                                DataValue::String(String::new())
                            }
                        } else {
                            DataValue::String(String::new())
                        }
                    }
                } else if let Ok(b) = field.parse::<bool>() {
                    DataValue::Boolean(b)
                } else if let Ok(i) = field.parse::<i64>() {
                    DataValue::Integer(i)
                } else if let Ok(f) = field.parse::<f64>() {
                    DataValue::Float(f)
                } else {
                    // Check for DateTime first, before considering interning
                    // Support both - and / as date separators, and : for time
                    if (field.contains('-') || field.contains('/') || field.contains(':'))
                        && field.len() >= 8
                        && field.len() <= 30
                    {
                        DataValue::DateTime(field.to_string())
                    } else if categorical_columns.contains(&idx) {
                        // Only intern if it's not a datetime
                        if let Some(interner) = self.interners.get_mut(&idx) {
                            // Use interned string
                            DataValue::InternedString(interner.intern(field))
                        } else {
                            DataValue::String(field.to_string())
                        }
                    } else {
                        DataValue::String(field.to_string())
                    }
                };
                values.push(value);
            }

            table
                .add_row(DataRow::new(values))
                .map_err(|e| anyhow::anyhow!(e))?;
            row_count += 1;

            // Track memory periodically
            if row_count % 10000 == 0 {
                crate::utils::memory_tracker::track_memory(&format!(
                    "advanced_csv_{row_count}rows"
                ));
                debug!("Loaded {} rows...", row_count);
            }
        }

        // Shrink vectors to fit actual data
        table.shrink_to_fit();

        // Infer column types
        table.infer_column_types();

        crate::utils::memory_tracker::track_memory("advanced_csv_complete");

        // Report statistics
        let mut total_saved = 0;
        for (col_idx, interner) in &self.interners {
            let stats = interner.stats();
            if stats.memory_saved_bytes > 0 {
                debug!(
                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
                    col_idx,
                    headers.get(*col_idx).unwrap_or("?"),
                    stats.unique_strings,
                    stats.total_references,
                    stats.memory_saved_bytes / 1024
                );
            }
            total_saved += stats.memory_saved_bytes;
        }

        info!(
            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
            table.row_count(),
            table.column_count(),
            table.estimate_memory_size() / 1024 / 1024,
            total_saved / 1024
        );

        Ok(table)
    }

    /// Helper to detect if a field in the raw CSV line is a null (unquoted empty).
    /// For a raw line like `a,,"",d`, field 1 (unquoted empty) is treated as NULL
    /// while field 2 (a quoted empty) is treated as an empty string.
    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
        let mut comma_count = 0;
        let mut in_quotes = false;
        let mut field_start = 0;
        let mut prev_char = ' ';

        // Iterate by byte offset so the slices below stay valid for multi-byte
        // UTF-8 content (`chars().enumerate()` would yield character counts).
        for (i, ch) in raw_line.char_indices() {
            if ch == '"' && prev_char != '\\' {
                in_quotes = !in_quotes;
            }

            if ch == ',' && !in_quotes {
                if comma_count == field_index {
                    let field_content = raw_line[field_start..i].trim();
                    // If empty, check if it was quoted (quoted empty = empty string, unquoted empty = NULL)
                    if field_content.is_empty() {
                        return true; // Unquoted empty field -> NULL
                    }
                    // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
                    if field_content.starts_with('"')
                        && field_content.ends_with('"')
                        && field_content.len() == 2
                    {
                        return false; // Quoted empty field -> empty string
                    }
                    return false; // Non-empty field -> not NULL
                }
                comma_count += 1;
                field_start = i + 1; // ',' is a single byte, so the next field starts right after it
            }
            prev_char = ch;
        }

        // Check last field
        if comma_count == field_index {
            let field_content = raw_line[field_start..]
                .trim()
                .trim_end_matches('\n')
                .trim_end_matches('\r');
            // If empty, check if it was quoted
            if field_content.is_empty() {
                return true; // Unquoted empty field -> NULL
            }
            // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
            if field_content.starts_with('"')
                && field_content.ends_with('"')
                && field_content.len() == 2
            {
                return false; // Quoted empty field -> empty string
            }
            return false; // Non-empty field -> not NULL
        }

        false // Field not found -> not NULL (shouldn't happen)
    }

    /// Get interner statistics for debugging
    #[must_use]
    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
        self.interners
            .iter()
            .map(|(idx, interner)| (*idx, interner.stats()))
            .collect()
    }
}

impl Default for AdvancedCsvLoader {
    fn default() -> Self {
        Self::new()
    }
}
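
// Minimal illustrative test sketch for the interner and the column heuristics.
// The sample values and column names below are made up; the tests rely only on
// items defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn interner_shares_repeated_strings() {
        let mut interner = StringInterner::new();
        let a = interner.intern("NYSE");
        let b = interner.intern("NYSE");
        let c = interner.intern("LSE");

        // Both handles for "NYSE" point at the same allocation.
        assert!(Arc::ptr_eq(&a, &b));
        assert!(!Arc::ptr_eq(&a, &c));

        let stats = interner.stats();
        assert_eq!(stats.unique_strings, 2);
        assert_eq!(stats.total_references, 3);
        // One duplicate reference to "NYSE" saves its 4 bytes.
        assert_eq!(stats.memory_saved_bytes, 4);
    }

    #[test]
    fn datetime_and_categorical_heuristics() {
        let timestamps: HashSet<String> = ["2024-01-15 09:30:00", "2024-01-16 10:00:00"]
            .iter()
            .map(|s| s.to_string())
            .collect();
        assert!(AdvancedCsvLoader::is_likely_datetime(&timestamps));

        let sides: HashSet<String> = ["BUY", "SELL"].iter().map(|s| s.to_string()).collect();
        assert!(!AdvancedCsvLoader::is_likely_datetime(&sides));

        // Name-based match, then the low-cardinality/short-string fallback.
        assert!(AdvancedCsvLoader::is_likely_categorical("trade_status", 3, 6));
        assert!(AdvancedCsvLoader::is_likely_categorical("misc", 20, 8));
        assert!(!AdvancedCsvLoader::is_likely_categorical("notes", 5000, 120));
    }
}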