sql_cli/data/
advanced_csv_loader.rs

1/// Advanced CSV loader with string interning and memory optimization
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::collections::{HashMap, HashSet};
6use std::fs::File;
7use std::path::Path;
8use std::sync::Arc;
9use tracing::{debug, info};
10
/// String interner for efficient memory usage with repeated strings.
///
/// Each distinct string is allocated once and handed out as a shared [`Arc<String>`]. The
/// interner also counts how many times each string was requested, so it can report how much
/// duplication was avoided (see [`StringInterner::stats`]).
#[derive(Debug, Clone, Default)]
pub struct StringInterner {
    /// Distinct string -> (shared handle returned to callers, number of `intern` calls for it).
    ///
    /// The handle and its count live in one entry so that a repeated string costs a single hash
    /// lookup. This matters because `intern` runs once per cell of an interned column.
    entries: HashMap<String, (Arc<String>, usize)>,
}

impl StringInterner {
    /// Create an empty interner.
    pub fn new() -> Self {
        Self::default()
    }

    /// Intern a string and return a reference-counted pointer.
    ///
    /// The first call for a given `s` allocates it. Later calls with an equal string return a
    /// clone of the same [`Arc`] (so [`Arc::ptr_eq`] holds between the results) and increment
    /// that string's usage count.
    pub fn intern(&mut self, s: &str) -> Arc<String> {
        if let Some((shared, count)) = self.entries.get_mut(s) {
            *count += 1;
            return Arc::clone(shared);
        }

        let shared = Arc::new(s.to_owned());
        self.entries.insert(s.to_owned(), (Arc::clone(&shared), 1));
        shared
    }

    /// Get statistics about interned strings.
    pub fn stats(&self) -> InternerStats {
        InternerStats {
            unique_strings: self.entries.len(),
            total_references: self.entries.values().map(|(_, count)| count).sum(),
            memory_saved_bytes: self.calculate_memory_saved(),
        }
    }

    /// Estimate the bytes saved by sharing.
    ///
    /// Every request beyond the first for a string would otherwise have allocated another copy
    /// of its bytes. Allocator and `Arc` bookkeeping overhead is not included.
    fn calculate_memory_saved(&self) -> usize {
        self.entries
            .values()
            .map(|(shared, count)| shared.len() * count.saturating_sub(1))
            .sum()
    }
}

/// Snapshot of a [`StringInterner`]'s usage, as returned by [`StringInterner::stats`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct InternerStats {
    /// Number of distinct strings held by the interner.
    pub unique_strings: usize,
    /// Total number of `intern` calls across all strings.
    pub total_references: usize,
    /// Estimated string bytes that were not duplicated thanks to sharing.
    pub memory_saved_bytes: usize,
}
71
/// Column analysis results for determining interning strategy.
///
/// One value is produced per CSV header by the sampling pass. Only `index` and `is_categorical`
/// drive loading; the remaining fields describe the sample and are kept for `Debug` diagnostics.
/// Derived `Debug` impls are ignored by dead-code analysis, so those fields are allowed to go
/// unread without tripping `dead_code` (and `-D warnings` builds).
#[derive(Debug)]
#[allow(dead_code)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the header row.
    index: usize,
    /// Header text of the column.
    name: String,
    /// Distinct non-empty fields in the sample that did not parse as `f64`.
    cardinality: usize,
    /// Number of data rows that were sampled.
    sample_size: usize,
    /// `cardinality / sample_size`, or `1.0` when no rows were sampled.
    unique_ratio: f64,
    /// Whether this column was selected for string interning.
    is_categorical: bool,
    /// Mean byte length of the sampled string fields (integer division; 0 when there were none).
    avg_string_length: usize,
}
83
84pub struct AdvancedCsvLoader {
85    sample_size: usize,
86    cardinality_threshold: f64, // Ratio threshold for considering a column categorical
87    interners: HashMap<usize, StringInterner>, // Column index -> interner
88}
89
90impl AdvancedCsvLoader {
91    pub fn new() -> Self {
92        Self {
93            sample_size: 1000,          // Sample first 1000 rows for analysis
94            cardinality_threshold: 0.5, // If < 50% unique values, consider categorical
95            interners: HashMap::new(),
96        }
97    }
98
99    /// Analyze columns to determine which should use string interning
100    fn analyze_columns(&mut self, path: &Path) -> Result<Vec<ColumnAnalysis>> {
101        info!("Analyzing CSV columns for optimization strategies");
102
103        let file = File::open(path)?;
104        let mut reader = csv::Reader::from_reader(file);
105        let headers = reader.headers()?.clone();
106
107        // Initialize tracking for each column
108        let num_columns = headers.len();
109        let mut unique_values: Vec<HashSet<String>> = vec![HashSet::new(); num_columns];
110        let mut total_lengths: Vec<usize> = vec![0; num_columns];
111        let mut string_counts: Vec<usize> = vec![0; num_columns];
112
113        // Sample rows to analyze cardinality
114        let mut row_count = 0;
115        for result in reader.records() {
116            if row_count >= self.sample_size {
117                break;
118            }
119
120            let record = result?;
121            for (col_idx, field) in record.iter().enumerate() {
122                if col_idx < num_columns {
123                    // Only track non-numeric strings
124                    if !field.is_empty() && field.parse::<f64>().is_err() {
125                        unique_values[col_idx].insert(field.to_string());
126                        total_lengths[col_idx] += field.len();
127                        string_counts[col_idx] += 1;
128                    }
129                }
130            }
131            row_count += 1;
132        }
133
134        // Build analysis for each column
135        let mut analyses = Vec::new();
136        for (idx, header) in headers.iter().enumerate() {
137            let cardinality = unique_values[idx].len();
138            let unique_ratio = if row_count > 0 {
139                cardinality as f64 / row_count as f64
140            } else {
141                1.0
142            };
143
144            let avg_length = if string_counts[idx] > 0 {
145                total_lengths[idx] / string_counts[idx]
146            } else {
147                0
148            };
149
150            // Consider categorical if low cardinality or common patterns
151            let is_categorical = unique_ratio < self.cardinality_threshold
152                || Self::is_likely_categorical(&header, cardinality, avg_length);
153
154            analyses.push(ColumnAnalysis {
155                index: idx,
156                name: header.to_string(),
157                cardinality,
158                sample_size: row_count,
159                unique_ratio,
160                is_categorical,
161                avg_string_length: avg_length,
162            });
163
164            if is_categorical {
165                debug!(
166                    "Column '{}' marked for interning: {} unique values in {} samples (ratio: {:.2})",
167                    header, cardinality, row_count, unique_ratio
168                );
169                self.interners.insert(idx, StringInterner::new());
170            }
171        }
172
173        Ok(analyses)
174    }
175
176    /// Heuristic to identify likely categorical columns by name and characteristics
177    fn is_likely_categorical(name: &str, cardinality: usize, avg_length: usize) -> bool {
178        let name_lower = name.to_lowercase();
179
180        // Common categorical column patterns
181        let categorical_patterns = [
182            "status",
183            "state",
184            "type",
185            "category",
186            "class",
187            "group",
188            "country",
189            "region",
190            "city",
191            "currency",
192            "side",
193            "book",
194            "desk",
195            "trader",
196            "portfolio",
197            "strategy",
198            "exchange",
199            "venue",
200            "counterparty",
201            "product",
202            "instrument",
203        ];
204
205        for pattern in &categorical_patterns {
206            if name_lower.contains(pattern) {
207                return true;
208            }
209        }
210
211        // Boolean-like columns
212        if name_lower.starts_with("is_") || name_lower.starts_with("has_") {
213            return true;
214        }
215
216        // Low cardinality with short strings often indicates categories
217        cardinality < 100 && avg_length < 50
218    }
219
220    /// Load CSV with advanced optimizations
221    pub fn load_csv_optimized<P: AsRef<Path>>(
222        &mut self,
223        path: P,
224        table_name: &str,
225    ) -> Result<DataTable> {
226        let path = path.as_ref();
227        info!(
228            "Advanced CSV load: Loading {} with optimizations",
229            path.display()
230        );
231
232        // Track memory before loading
233        crate::utils::memory_tracker::track_memory("advanced_csv_start");
234
235        // Analyze columns first
236        let analyses = self.analyze_columns(path)?;
237        let categorical_columns: HashSet<usize> = analyses
238            .iter()
239            .filter(|a| a.is_categorical)
240            .map(|a| a.index)
241            .collect();
242
243        info!(
244            "Column analysis complete: {} of {} columns will use string interning",
245            categorical_columns.len(),
246            analyses.len()
247        );
248
249        // Now load the actual data
250        let file = File::open(path)?;
251        let mut reader = csv::Reader::from_reader(file);
252        let headers = reader.headers()?.clone();
253
254        let mut table = DataTable::new(table_name);
255        for header in headers.iter() {
256            table.add_column(DataColumn::new(header.to_string()));
257        }
258
259        crate::utils::memory_tracker::track_memory("advanced_csv_headers");
260
261        // Pre-allocate with estimated capacity if possible
262        let file_size = std::fs::metadata(path)?.len();
263        let estimated_rows = (file_size / 100) as usize; // Rough estimate
264        table.reserve_rows(estimated_rows.min(1_000_000)); // Cap at 1M for safety
265
266        // Read rows with optimizations
267        let mut row_count = 0;
268        for result in reader.records() {
269            let record = result?;
270            let mut values = Vec::with_capacity(headers.len());
271
272            for (idx, field) in record.iter().enumerate() {
273                let value = if field.is_empty() {
274                    DataValue::Null
275                } else if let Ok(b) = field.parse::<bool>() {
276                    DataValue::Boolean(b)
277                } else if let Ok(i) = field.parse::<i64>() {
278                    DataValue::Integer(i)
279                } else if let Ok(f) = field.parse::<f64>() {
280                    DataValue::Float(f)
281                } else {
282                    // Check if this column should use interning
283                    if categorical_columns.contains(&idx) {
284                        if let Some(interner) = self.interners.get_mut(&idx) {
285                            // Use interned string
286                            DataValue::InternedString(interner.intern(field))
287                        } else {
288                            DataValue::String(field.to_string())
289                        }
290                    } else if field.contains('-') && field.len() >= 8 && field.len() <= 30 {
291                        DataValue::DateTime(field.to_string())
292                    } else {
293                        DataValue::String(field.to_string())
294                    }
295                };
296                values.push(value);
297            }
298
299            table
300                .add_row(DataRow::new(values))
301                .map_err(|e| anyhow::anyhow!(e))?;
302            row_count += 1;
303
304            // Track memory periodically
305            if row_count % 10000 == 0 {
306                crate::utils::memory_tracker::track_memory(&format!(
307                    "advanced_csv_{}rows",
308                    row_count
309                ));
310                debug!("Loaded {} rows...", row_count);
311            }
312        }
313
314        // Shrink vectors to fit actual data
315        table.shrink_to_fit();
316
317        // Infer column types
318        table.infer_column_types();
319
320        crate::utils::memory_tracker::track_memory("advanced_csv_complete");
321
322        // Report statistics
323        let mut total_saved = 0;
324        for (col_idx, interner) in &self.interners {
325            let stats = interner.stats();
326            if stats.memory_saved_bytes > 0 {
327                debug!(
328                    "Column {} ('{}'): {} unique strings, {} references, saved {} KB",
329                    col_idx,
330                    headers.get(*col_idx).unwrap_or(&"?"),
331                    stats.unique_strings,
332                    stats.total_references,
333                    stats.memory_saved_bytes / 1024
334                );
335            }
336            total_saved += stats.memory_saved_bytes;
337        }
338
339        info!(
340            "Advanced CSV load complete: {} rows, {} columns, ~{} MB (saved {} KB via interning)",
341            table.row_count(),
342            table.column_count(),
343            table.estimate_memory_size() / 1024 / 1024,
344            total_saved / 1024
345        );
346
347        Ok(table)
348    }
349
350    /// Get interner statistics for debugging
351    pub fn get_interner_stats(&self) -> HashMap<usize, InternerStats> {
352        self.interners
353            .iter()
354            .map(|(idx, interner)| (*idx, interner.stats()))
355            .collect()
356    }
357}
358
359impl Default for AdvancedCsvLoader {
360    fn default() -> Self {
361        Self::new()
362    }
363}