Skip to main content

sql_cli/data/
advanced_csv_loader.rs

//! Advanced CSV loader with string interning and memory optimization
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::collections::{HashMap, HashSet};
6use std::fs::File;
7use std::io::{BufRead, BufReader, Read};
8use std::path::Path;
9use std::sync::Arc;
10use tracing::{debug, info};
11
/// Deduplicating string pool.
///
/// Every distinct string handed to [`StringInterner::intern`] is stored once
/// behind an `Arc<String>`; repeated requests for the same text return clones
/// of that same `Arc`. A per-string request counter is kept so that
/// [`StringInterner::stats`] can report how often each entry was reused.
#[derive(Debug, Clone)]
pub struct StringInterner {
    /// Lookup from string content to its shared allocation.
    strings: HashMap<String, Arc<String>>,
    /// Number of `intern` calls that resolved to each shared allocation.
    usage_count: HashMap<Arc<String>, usize>,
}

impl Default for StringInterner {
    /// Equivalent to [`StringInterner::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl StringInterner {
    /// Create an interner that holds no strings.
    #[must_use]
    pub fn new() -> Self {
        Self {
            strings: HashMap::new(),
            usage_count: HashMap::new(),
        }
    }

    /// Return the shared `Arc<String>` for `s`, creating it on first sight.
    ///
    /// Each call increments the request counter for that string, including
    /// the call that creates the entry.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        // Fast path: the text is already pooled, so hand out another handle.
        if let Some(existing) = self.strings.get(s) {
            let shared = Arc::clone(existing);
            *self.usage_count.entry(Arc::clone(&shared)).or_default() += 1;
            return shared;
        }

        // First occurrence: allocate once and register it in both maps.
        let shared = Arc::new(s.to_owned());
        self.strings.insert(s.to_owned(), Arc::clone(&shared));
        self.usage_count.insert(Arc::clone(&shared), 1);
        shared
    }

    /// Summarise the pool: number of distinct strings, total `intern` calls,
    /// and the estimated bytes saved by sharing.
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        InternerStats {
            unique_strings: self.strings.len(),
            total_references: self.usage_count.values().sum(),
            memory_saved_bytes: self.calculate_memory_saved(),
        }
    }

    /// Estimate saved bytes as `len * (requests - 1)` summed over every string
    /// requested more than once (each repeat avoids one copy of the text).
    fn calculate_memory_saved(&self) -> usize {
        self.usage_count
            .iter()
            .filter(|(_, &requests)| requests > 1)
            .map(|(text, &requests)| text.len() * (requests - 1))
            .sum()
    }
}

/// Snapshot of a [`StringInterner`]'s bookkeeping.
#[derive(Debug)]
pub struct InternerStats {
    /// Number of distinct strings held by the interner.
    pub unique_strings: usize,
    /// Total number of `intern` calls across all strings.
    pub total_references: usize,
    /// Estimated bytes of string data not duplicated thanks to sharing.
    pub memory_saved_bytes: usize,
}
80
/// Column analysis results for determining interning strategy.
///
/// One value is produced per header column by `analyze_columns`. The legacy
/// loader reads only `index` and `is_categorical`; the underscore-prefixed
/// fields are populated but not consumed by that loader.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the header row.
    index: usize,
    /// Header text of the column.
    _name: String,
    /// Number of distinct non-empty, non-numeric values seen in the sample.
    _cardinality: usize,
    /// Number of rows that were sampled.
    _sample_size: usize,
    /// `_cardinality / _sample_size`, or `1.0` when no rows were sampled.
    _unique_ratio: f64,
    /// Whether the column was selected for string interning.
    is_categorical: bool,
    /// Mean byte length (integer division) of the sampled non-numeric values.
    _avg_string_length: usize,
}
92
/// CSV loader that can sample a file, pick columns whose text values repeat,
/// and store those values through per-column [`StringInterner`]s.
pub struct AdvancedCsvLoader {
    /// Maximum number of data rows inspected during column analysis.
    sample_size: usize,
    /// Ratio threshold for considering a column categorical: a column whose
    /// unique-value ratio is below this is selected for interning.
    cardinality_threshold: f64,
    /// Column index -> interner for that column.
    interners: HashMap<usize, StringInterner>,
}
98
99impl AdvancedCsvLoader {
100    #[must_use]
101    pub fn new() -> Self {
102        Self {
103            sample_size: 1000,          // Sample first 1000 rows for analysis
104            cardinality_threshold: 0.5, // If < 50% unique values, consider categorical
105            interners: HashMap::new(),
106        }
107    }
108
109    /// Analyze columns to determine which should use string interning
110    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
111        info!("Analyzing CSV columns for optimization strategies");
112
113        let file = File::open(path)?;
114        let mut reader = csv::Reader::from_reader(file);
115        let headers = reader.headers()?.clone();
116
117        // Initialize tracking for each column
118        let num_columns = headers.len();
119        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
120        let mut total_lengths: Vec<usize> = vec![0; num_columns];
121        let mut string_counts: Vec<usize> = vec![0; num_columns];
122
123        // Sample rows to analyze cardinality
124        let mut row_count = 0;
125        for result in reader.records() {
126            if row_count >= self.sample_size {
127                break;
128            }
129
130            let record = result?;
131            for (col_idx, field) in record.iter().enumerate() {
132                if col_idx < num_columns {
133                    // Only track non-numeric strings
134                    if !field.is_empty() && field.parse::<f64>().is_err() {
135                        unique_values[col_idx].insert(field.to_string());
136                        total_lengths[col_idx] += field.len();
137                        string_counts[col_idx] += 1;
138                    }
139                }
140            }
141            row_count += 1;
142        }
143
144        // Build analysis for each column
145        let mut analyses = Vec::new();
146        for (idx, header) in headers.iter().enumerate() {
147            let cardinality = unique_values[idx].len();
148            let unique_ratio = if row_count > 0 {
149                cardinality as f64 / row_count as f64
150            } else {
151                1.0
152            };
153
154            let avg_length = if string_counts[idx] > 0 {
155                total_lengths[idx] / string_counts[idx]
156            } else {
157                0
158            };
159
160            // Check if this might be a datetime column based on sample values
161            let is_datetime = Self::is_likely_datetime(&unique_values[idx]);
162
163            // Consider categorical if low cardinality or common patterns, but NOT if it's a datetime
164            let is_categorical = !is_datetime
165                && (unique_ratio < self.cardinality_threshold
166                    || Self::is_likely_categorical(header, cardinality, avg_length));
167
168            analyses.push(ColumnAnalysis {
169                index: idx,
170                _name: header.to_string(),
171                _cardinality: cardinality,
172                _sample_size: row_count,
173                _unique_ratio: unique_ratio,
174                is_categorical,
175                _avg_string_length: avg_length,
176            });
177
178            if is_categorical {
179                debug!(
180                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
181                    header, cardinality, row_count, unique_ratio
182                );
183                self.interners.insert(idx, StringInterner::new());
184            }
185        }
186
187        Ok(analyses)
188    }
189
190    /// Heuristic to identify likely datetime columns based on sample values
191    fn is_likely_datetime(unique_values: &HashSet<String>) -> bool {
192        if unique_values.is_empty() {
193            return false;
194        }
195
196        // Check a sample of values to see if they look like datetimes
197        let sample_size = unique_values.len().min(10);
198        let mut datetime_count = 0;
199
200        for (i, value) in unique_values.iter().enumerate() {
201            if i >= sample_size {
202                break;
203            }
204
205            // Simple heuristic: contains date/time separators and has right length
206            if (value.contains('-') || value.contains('/') || value.contains(':'))
207                && value.len() >= 8
208                && value.len() <= 30
209            {
210                datetime_count += 1;
211            }
212        }
213
214        // If most samples look like datetimes, consider it a datetime column
215        datetime_count >= (sample_size * 7) / 10 // 70% threshold
216    }
217
218    /// Heuristic to identify likely categorical columns by name and characteristics
219    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
220        let name_lower = name.to_lowercase();
221
222        // Common categorical column patterns
223        let categorical_patterns = [
224            "status",
225            "state",
226            "type",
227            "category",
228            "class",
229            "group",
230            "country",
231            "region",
232            "city",
233            "currency",
234            "side",
235            "book",
236            "desk",
237            "trader",
238            "portfolio",
239            "strategy",
240            "exchange",
241            "venue",
242            "counterparty",
243            "product",
244            "instrument",
245        ];
246
247        for pattern in &categorical_patterns {
248            if name_lower.contains(pattern) {
249                return true;
250            }
251        }
252
253        // Boolean-like columns
254        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
255            return true;
256        }
257
258        // Low cardinality with short strings often indicates categories
259        cardinality < 100 && avg_length < 50
260    }
261
262    /// Load CSV from a reader with advanced optimizations (default comma delimiter).
263    pub fn load_csv_from_reader<R: Read>(
264        &mut self,
265        reader: R,
266        table_name: &str,
267        source_path: &str,
268    ) -> Result<DataTable> {
269        use crate::data::stream_loader::CsvReadOptions;
270        self.load_csv_from_reader_with_opts(
271            reader,
272            table_name,
273            source_path,
274            &CsvReadOptions::default(),
275        )
276    }
277
278    /// Load CSV from a reader honouring caller-supplied options (delimiter, headers).
279    pub fn load_csv_from_reader_with_opts<R: Read>(
280        &mut self,
281        reader: R,
282        table_name: &str,
283        source_path: &str,
284        opts: &crate::data::stream_loader::CsvReadOptions,
285    ) -> Result<DataTable> {
286        use crate::data::stream_loader::StreamCsvLoader;
287
288        let mut stream_loader = StreamCsvLoader::new();
289        stream_loader.load_csv_from_reader_with_opts(reader, table_name, "file", source_path, opts)
290    }
291
292    /// Load CSV with advanced optimizations. Delimiter is auto-detected from
293    /// the file extension (`.tsv` → tab, `.psv` → pipe, else comma). To override,
294    /// use [`load_csv_optimized_with_opts`].
295    pub fn load_csv_optimized<P: AsRef<Path>>(
296        &mut self,
297        path: P,
298        table_name: &str,
299    ) -> Result<DataTable> {
300        use crate::data::stream_loader::{detect_delimiter_from_path, CsvReadOptions};
301        let path_str = path.as_ref().display().to_string();
302        let opts = CsvReadOptions {
303            delimiter: detect_delimiter_from_path(&path_str),
304            has_headers: true,
305        };
306        self.load_csv_optimized_with_opts(path, table_name, &opts)
307    }
308
309    /// Load CSV honouring caller-supplied options (delimiter, headers).
310    pub fn load_csv_optimized_with_opts<P: AsRef<Path>>(
311        &mut self,
312        path: P,
313        table_name: &str,
314        opts: &crate::data::stream_loader::CsvReadOptions,
315    ) -> Result<DataTable> {
316        let path = path.as_ref();
317        info!(
318            "Advanced CSV load: Loading {} with optimizations",
319            path.display()
320        );
321
322        let file = File::open(path)?;
323        self.load_csv_from_reader_with_opts(file, table_name, &path.display().to_string(), opts)
324    }
325
326    /// Original file-based implementation (kept for backward compatibility)
327    pub fn load_csv_optimized_legacy<P: AsRef<Path>>(
328        &mut self,
329        path: P,
330        table_name: &str,
331    ) -> Result<DataTable> {
332        let path = path.as_ref();
333        info!(
334            "Advanced CSV load: Loading {} with optimizations",
335            path.display()
336        );
337
338        // Track memory before loading
339        crate::utils::memory_tracker::track_memory("advanced_csv_start");
340
341        // Analyze columns first
342        let analyses = self.analyze_columns(path)?;
343        let categorical_columns: HashSet<usize> = analyses
344            .iter()
345            .filter(|a| a.is_categorical)
346            .map(|a| a.index)
347            .collect();
348
349        info!(
350            "Column analysis complete: {} of {} columns will use string interning",
351            categorical_columns.len(),
352            analyses.len()
353        );
354
355        // Now load the actual data
356        let file = File::open(path)?;
357        let mut reader = csv::Reader::from_reader(file);
358        let headers = reader.headers()?.clone();
359
360        let mut table = DataTable::new(table_name);
361        for header in &headers {
362            table.add_column(DataColumn::new(header.to_string()));
363        }
364
365        crate::utils::memory_tracker::track_memory("advanced_csv_headers");
366
367        // Pre-allocate with estimated capacity if possible
368        let file_size = std::fs::metadata(path)?.len();
369        let estimated_rows = (file_size / 100) as usize; // Rough estimate
370        table.reserve_rows(estimated_rows.min(1_000_000)); // Cap at 1M for safety
371
372        // Re-open file to read raw lines for null detection
373        let file2 = File::open(path)?;
374        let mut line_reader = BufReader::new(file2);
375        let mut raw_line = String::new();
376        // Skip header line
377        line_reader.read_line(&mut raw_line)?;
378
379        // Read rows with optimizations
380        let mut row_count = 0;
381        for result in reader.records() {
382            let record = result?;
383
384            // Read the corresponding raw line
385            raw_line.clear();
386            line_reader.read_line(&mut raw_line)?;
387
388            let mut values = Vec::with_capacity(headers.len());
389
390            for (idx, field) in record.iter().enumerate() {
391                let value = if field.is_empty() {
392                    // Distinguish between null and empty string
393                    if Self::is_null_field(&raw_line, idx) {
394                        DataValue::Null
395                    } else {
396                        // Empty string - check if should be interned
397                        if categorical_columns.contains(&idx) {
398                            if let Some(interner) = self.interners.get_mut(&idx) {
399                                DataValue::InternedString(interner.intern(""))
400                            } else {
401                                DataValue::String(String::new())
402                            }
403                        } else {
404                            DataValue::String(String::new())
405                        }
406                    }
407                } else if let Ok(b) = field.parse::<bool>() {
408                    DataValue::Boolean(b)
409                } else if let Ok(i) = field.parse::<i64>() {
410                    DataValue::Integer(i)
411                } else if let Ok(f) = field.parse::<f64>() {
412                    DataValue::Float(f)
413                } else {
414                    // Check for DateTime first, before considering interning
415                    // Support both - and / as date separators, and : for time
416                    if (field.contains('-') || field.contains('/') || field.contains(':'))
417                        && field.len() >= 8
418                        && field.len() <= 30
419                    {
420                        DataValue::DateTime(field.to_string())
421                    } else if categorical_columns.contains(&idx) {
422                        // Only intern if it's not a datetime
423                        if let Some(interner) = self.interners.get_mut(&idx) {
424                            // Use interned string
425                            DataValue::InternedString(interner.intern(field))
426                        } else {
427                            DataValue::String(field.to_string())
428                        }
429                    } else {
430                        DataValue::String(field.to_string())
431                    }
432                };
433                values.push(value);
434            }
435
436            table
437                .add_row(DataRow::new(values))
438                .map_err(|e| anyhow::anyhow!(e))?;
439            row_count += 1;
440
441            // Track memory periodically
442            if row_count % 10000 == 0 {
443                crate::utils::memory_tracker::track_memory(&format!(
444                    "advanced_csv_{row_count}rows"
445                ));
446                debug!("Loaded {} rows...", row_count);
447            }
448        }
449
450        // Shrink vectors to fit actual data
451        table.shrink_to_fit();
452
453        // Infer column types
454        table.infer_column_types();
455
456        crate::utils::memory_tracker::track_memory("advanced_csv_complete");
457
458        // Report statistics
459        let mut total_saved = 0;
460        for (col_idx, interner) in &self.interners {
461            let stats = interner.stats();
462            if stats.memory_saved_bytes > 0 {
463                debug!(
464                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
465                    col_idx,
466                    headers.get(*col_idx).unwrap_or("?"),
467                    stats.unique_strings,
468                    stats.total_references,
469                    stats.memory_saved_bytes / 1024
470                );
471            }
472            total_saved += stats.memory_saved_bytes;
473        }
474
475        info!(
476            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
477            table.row_count(),
478            table.column_count(),
479            table.estimate_memory_size() / 1024 / 1024,
480            total_saved / 1024
481        );
482
483        Ok(table)
484    }
485
486    /// Helper to detect if a field in the raw CSV line is a null (unquoted empty)
487    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
488        let mut comma_count = 0;
489        let mut in_quotes = false;
490        let mut field_start = 0;
491        let mut prev_char = ' ';
492
493        for (i, ch) in raw_line.chars().enumerate() {
494            if ch == '"' && prev_char != '\\' {
495                in_quotes = !in_quotes;
496            }
497
498            if ch == ',' && !in_quotes {
499                if comma_count == field_index {
500                    let field_end = i;
501                    let field_content = &raw_line[field_start..field_end].trim();
502                    // If empty, check if it was quoted (quoted empty = empty string, unquoted empty = NULL)
503                    if field_content.is_empty() {
504                        return true; // Unquoted empty field -> NULL
505                    }
506                    // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
507                    if field_content.starts_with('"')
508                        && field_content.ends_with('"')
509                        && field_content.len() == 2
510                    {
511                        return false; // Quoted empty field -> empty string
512                    }
513                    return false; // Non-empty field -> not NULL
514                }
515                comma_count += 1;
516                field_start = i + 1;
517            }
518            prev_char = ch;
519        }
520
521        // Check last field
522        if comma_count == field_index {
523            let field_content = raw_line[field_start..]
524                .trim()
525                .trim_end_matches('\n')
526                .trim_end_matches('\r');
527            // If empty, check if it was quoted
528            if field_content.is_empty() {
529                return true; // Unquoted empty field -> NULL
530            }
531            // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
532            if field_content.starts_with('"')
533                && field_content.ends_with('"')
534                && field_content.len() == 2
535            {
536                return false; // Quoted empty field -> empty string
537            }
538            return false; // Non-empty field -> not NULL
539        }
540
541        false // Field not found -> not NULL (shouldn't happen)
542    }
543
544    /// Get interner statistics for debugging
545    #[must_use]
546    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
547        self.interners
548            .iter()
549            .map(|(idx, interner)| (*idx, interner.stats()))
550            .collect()
551    }
552}
553
/// `Default` delegates to [`AdvancedCsvLoader::new`], so a defaulted loader
/// has the same settings as one built explicitly.
impl Default for AdvancedCsvLoader {
    fn default() -> Self {
        Self::new()
    }
}