sql_cli/data/
advanced_csv_loader.rs

1/// Advanced CSV loader with string interning and memory optimization
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::collections::{HashMap, HashSet};
6use std::fs::File;
7use std::path::Path;
8use std::sync::Arc;
9use tracing::{debug, info};
10
/// String interner for efficient memory usage with repeated strings.
///
/// Each distinct string is allocated once as an `Arc<String>`; later calls to
/// [`StringInterner::intern`] with equal text return clones of that same `Arc`.
/// The interner also counts how often each string was requested so it can
/// estimate how much memory the sharing saved.
#[derive(Debug, Clone)]
pub struct StringInterner {
    /// Distinct text -> (shared allocation handed to callers, number of `intern` calls).
    ///
    /// Keeping the count next to the shared pointer means a repeated string costs a
    /// single hash lookup of the borrowed `&str`, with no extra map keyed by `Arc`.
    entries: HashMap<String, (Arc<String>, usize)>,
}

impl Default for StringInterner {
    fn default() -> Self {
        Self::new()
    }
}

impl StringInterner {
    /// Create an empty interner.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: HashMap::new(),
        }
    }

    /// Intern `s` and return a reference-counted pointer to its shared copy.
    ///
    /// Equal inputs always yield pointers to the same allocation (`Arc::ptr_eq`
    /// holds between them), and every call increments that string's usage count.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        if let Some((shared, count)) = self.entries.get_mut(s) {
            *count += 1;
            return Arc::clone(shared);
        }
        let shared = Arc::new(s.to_string());
        self.entries.insert(s.to_string(), (Arc::clone(&shared), 1));
        shared
    }

    /// Get statistics about interned strings.
    #[must_use]
    pub fn stats(&self) -> InternerStats {
        InternerStats {
            unique_strings: self.entries.len(),
            total_references: self.entries.values().map(|(_, count)| count).sum(),
            memory_saved_bytes: self.calculate_memory_saved(),
        }
    }

    /// Estimate bytes saved by sharing: every request beyond the first for a string
    /// would otherwise have needed its own copy of that string's bytes.
    /// Allocation and `Arc` bookkeeping overhead is not included.
    fn calculate_memory_saved(&self) -> usize {
        self.entries
            .values()
            .map(|(shared, count)| shared.len() * count.saturating_sub(1))
            .sum()
    }
}

/// Snapshot of a [`StringInterner`]'s usage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct InternerStats {
    /// Number of distinct strings stored.
    pub unique_strings: usize,
    /// Total number of `intern` calls (sum of the per-string usage counts).
    pub total_references: usize,
    /// Estimated bytes saved by sharing, excluding allocation overhead.
    pub memory_saved_bytes: usize,
}
79
/// Per-column outcome of the sampling pass, used to decide whether the column's
/// string values should be interned.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the CSV header.
    index: usize,
    /// Header text of the column.
    name: String,
    /// Count of distinct non-empty values in the sample that do not parse as `f64`.
    cardinality: usize,
    /// Number of data rows that were sampled.
    sample_size: usize,
    /// `cardinality / sample_size`, or 1.0 when no rows were sampled.
    unique_ratio: f64,
    /// Whether the column was selected for string interning.
    is_categorical: bool,
    /// Mean byte length of the tracked (non-empty, non-numeric) values, truncated.
    avg_string_length: usize,
}
91
92pub struct AdvancedCsvLoader {
93    sample_size: usize,
94    cardinality_threshold: f64, // Ratio threshold for considering a column categorical
95    interners: HashMap<usize, StringInterner>, // Column index -> interner
96}
97
98impl AdvancedCsvLoader {
99    #[must_use]
100    pub fn new() -> Self {
101        Self {
102            sample_size: 1000,          // Sample first 1000 rows for analysis
103            cardinality_threshold: 0.5, // If < 50% unique values, consider categorical
104            interners: HashMap::new(),
105        }
106    }
107
108    /// Analyze columns to determine which should use string interning
109    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
110        info!("Analyzing CSV columns for optimization strategies");
111
112        let file = File::open(path)?;
113        let mut reader = csv::Reader::from_reader(file);
114        let headers = reader.headers()?.clone();
115
116        // Initialize tracking for each column
117        let num_columns = headers.len();
118        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
119        let mut total_lengths: Vec<usize> = vec![0; num_columns];
120        let mut string_counts: Vec<usize> = vec![0; num_columns];
121
122        // Sample rows to analyze cardinality
123        let mut row_count = 0;
124        for result in reader.records() {
125            if row_count >= self.sample_size {
126                break;
127            }
128
129            let record = result?;
130            for (col_idx, field) in record.iter().enumerate() {
131                if col_idx < num_columns {
132                    // Only track non-numeric strings
133                    if !field.is_empty() && field.parse::<f64>().is_err() {
134                        unique_values[col_idx].insert(field.to_string());
135                        total_lengths[col_idx] += field.len();
136                        string_counts[col_idx] += 1;
137                    }
138                }
139            }
140            row_count += 1;
141        }
142
143        // Build analysis for each column
144        let mut analyses = Vec::new();
145        for (idx, header) in headers.iter().enumerate() {
146            let cardinality = unique_values[idx].len();
147            let unique_ratio = if row_count > 0 {
148                cardinality as f64 / row_count as f64
149            } else {
150                1.0
151            };
152
153            let avg_length = if string_counts[idx] > 0 {
154                total_lengths[idx] / string_counts[idx]
155            } else {
156                0
157            };
158
159            // Consider categorical if low cardinality or common patterns
160            let is_categorical = unique_ratio < self.cardinality_threshold
161                || Self::is_likely_categorical(header, cardinality, avg_length);
162
163            analyses.push(ColumnAnalysis {
164                index: idx,
165                name: header.to_string(),
166                cardinality,
167                sample_size: row_count,
168                unique_ratio,
169                is_categorical,
170                avg_string_length: avg_length,
171            });
172
173            if is_categorical {
174                debug!(
175                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
176                    header, cardinality, row_count, unique_ratio
177                );
178                self.interners.insert(idx, StringInterner::new());
179            }
180        }
181
182        Ok(analyses)
183    }
184
185    /// Heuristic to identify likely categorical columns by name and characteristics
186    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
187        let name_lower = name.to_lowercase();
188
189        // Common categorical column patterns
190        let categorical_patterns = [
191            "status",
192            "state",
193            "type",
194            "category",
195            "class",
196            "group",
197            "country",
198            "region",
199            "city",
200            "currency",
201            "side",
202            "book",
203            "desk",
204            "trader",
205            "portfolio",
206            "strategy",
207            "exchange",
208            "venue",
209            "counterparty",
210            "product",
211            "instrument",
212        ];
213
214        for pattern in &categorical_patterns {
215            if name_lower.contains(pattern) {
216                return true;
217            }
218        }
219
220        // Boolean-like columns
221        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
222            return true;
223        }
224
225        // Low cardinality with short strings often indicates categories
226        cardinality < 100 && avg_length < 50
227    }
228
229    /// Load CSV with advanced optimizations
230    pub fn load_csv_optimized<P: AsRef<Path>>(
231        &mut self,
232        path: P,
233        table_name: &str,
234    ) -> Result<DataTable> {
235        let path = path.as_ref();
236        info!(
237            "Advanced CSV load: Loading {} with optimizations",
238            path.display()
239        );
240
241        // Track memory before loading
242        crate::utils::memory_tracker::track_memory("advanced_csv_start");
243
244        // Analyze columns first
245        let analyses = self.analyze_columns(path)?;
246        let categorical_columns: HashSet<usize> = analyses
247            .iter()
248            .filter(|a| a.is_categorical)
249            .map(|a| a.index)
250            .collect();
251
252        info!(
253            "Column analysis complete: {} of {} columns will use string interning",
254            categorical_columns.len(),
255            analyses.len()
256        );
257
258        // Now load the actual data
259        let file = File::open(path)?;
260        let mut reader = csv::Reader::from_reader(file);
261        let headers = reader.headers()?.clone();
262
263        let mut table = DataTable::new(table_name);
264        for header in &headers {
265            table.add_column(DataColumn::new(header.to_string()));
266        }
267
268        crate::utils::memory_tracker::track_memory("advanced_csv_headers");
269
270        // Pre-allocate with estimated capacity if possible
271        let file_size = std::fs::metadata(path)?.len();
272        let estimated_rows = (file_size / 100) as usize; // Rough estimate
273        table.reserve_rows(estimated_rows.min(1_000_000)); // Cap at 1M for safety
274
275        // Read rows with optimizations
276        let mut row_count = 0;
277        for result in reader.records() {
278            let record = result?;
279            let mut values = Vec::with_capacity(headers.len());
280
281            for (idx, field) in record.iter().enumerate() {
282                let value = if field.is_empty() {
283                    DataValue::Null
284                } else if let Ok(b) = field.parse::<bool>() {
285                    DataValue::Boolean(b)
286                } else if let Ok(i) = field.parse::<i64>() {
287                    DataValue::Integer(i)
288                } else if let Ok(f) = field.parse::<f64>() {
289                    DataValue::Float(f)
290                } else {
291                    // Check if this column should use interning
292                    if categorical_columns.contains(&idx) {
293                        if let Some(interner) = self.interners.get_mut(&idx) {
294                            // Use interned string
295                            DataValue::InternedString(interner.intern(field))
296                        } else {
297                            DataValue::String(field.to_string())
298                        }
299                    } else if field.contains('-') && field.len() >= 8 && field.len() <= 30 {
300                        DataValue::DateTime(field.to_string())
301                    } else {
302                        DataValue::String(field.to_string())
303                    }
304                };
305                values.push(value);
306            }
307
308            table
309                .add_row(DataRow::new(values))
310                .map_err(|e| anyhow::anyhow!(e))?;
311            row_count += 1;
312
313            // Track memory periodically
314            if row_count % 10000 == 0 {
315                crate::utils::memory_tracker::track_memory(&format!(
316                    "advanced_csv_{row_count}rows"
317                ));
318                debug!("Loaded {} rows...", row_count);
319            }
320        }
321
322        // Shrink vectors to fit actual data
323        table.shrink_to_fit();
324
325        // Infer column types
326        table.infer_column_types();
327
328        crate::utils::memory_tracker::track_memory("advanced_csv_complete");
329
330        // Report statistics
331        let mut total_saved = 0;
332        for (col_idx, interner) in &self.interners {
333            let stats = interner.stats();
334            if stats.memory_saved_bytes > 0 {
335                debug!(
336                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
337                    col_idx,
338                    headers.get(*col_idx).unwrap_or("?"),
339                    stats.unique_strings,
340                    stats.total_references,
341                    stats.memory_saved_bytes / 1024
342                );
343            }
344            total_saved += stats.memory_saved_bytes;
345        }
346
347        info!(
348            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
349            table.row_count(),
350            table.column_count(),
351            table.estimate_memory_size() / 1024 / 1024,
352            total_saved / 1024
353        );
354
355        Ok(table)
356    }
357
358    /// Get interner statistics for debugging
359    #[must_use]
360    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
361        self.interners
362            .iter()
363            .map(|(idx, interner)| (*idx, interner.stats()))
364            .collect()
365    }
366}
367
368impl Default for AdvancedCsvLoader {
369    fn default() -> Self {
370        Self::new()
371    }
372}