// sql_cli/data/stream_loader.rs

1// Stream-based data loader that works with any Read source
2// This allows the same code to handle files, HTTP responses, or any other data stream
3
4use anyhow::{Context, Result};
5use csv::ReaderBuilder;
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet};
8use std::io::{BufRead, BufReader, Read};
9use tracing::{debug, info};
10
11use crate::data::advanced_csv_loader::StringInterner;
12use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
13
/// Column analysis results for determining interning strategy
///
/// Produced by `StreamCsvLoader::analyze_columns` from a sample of rows;
/// `is_categorical` decides whether a column's values get interned.
#[derive(Debug)]
struct ColumnAnalysis {
    // Zero-based position of the column in the CSV header.
    index: usize,
    // Column name taken from the header row.
    name: String,
    // Number of distinct non-empty values seen in the sample.
    cardinality: usize,
    // Number of rows actually sampled (capped by the loader's sample_size).
    sample_size: usize,
    // cardinality / sample_size; 1.0 when no rows were sampled.
    unique_ratio: f64,
    // True when the column looks repetitive enough to benefit from interning.
    is_categorical: bool,
    // Average byte length of the non-empty sampled values (0 if none).
    avg_string_length: usize,
}
25
/// Advanced stream-based CSV loader with string interning
///
/// Works with any `Read` source (file, HTTP response body, in-memory buffer).
pub struct StreamCsvLoader {
    // Max rows sampled per column during analysis (1000 by default).
    sample_size: usize,
    // Unique-ratio cutoff below which a column counts as categorical (0.3 by default).
    cardinality_threshold: f64,
    // Per-column interners keyed by column index; populated only for categorical columns.
    interners: HashMap<usize, StringInterner>,
}
32
33impl StreamCsvLoader {
34    pub fn new() -> Self {
35        Self {
36            sample_size: 1000,
37            cardinality_threshold: 0.3,
38            interners: HashMap::new(),
39        }
40    }
41
42    /// Analyze columns to determine which should use string interning
43    fn analyze_columns(
44        &self,
45        rows: &[Vec<String>],
46        headers: &csv::StringRecord,
47    ) -> Vec<ColumnAnalysis> {
48        let mut analyses = Vec::new();
49
50        for (col_idx, header) in headers.iter().enumerate() {
51            let mut unique_values = HashSet::new();
52            let mut total_length = 0;
53            let mut non_empty_count = 0;
54
55            // Sample rows to analyze cardinality
56            for row in rows.iter().take(self.sample_size) {
57                if let Some(value) = row.get(col_idx) {
58                    if !value.is_empty() {
59                        unique_values.insert(value.clone());
60                        total_length += value.len();
61                        non_empty_count += 1;
62                    }
63                }
64            }
65
66            let cardinality = unique_values.len();
67            let sample_size = rows.len().min(self.sample_size);
68            let unique_ratio = if sample_size > 0 {
69                cardinality as f64 / sample_size as f64
70            } else {
71                1.0
72            };
73
74            let avg_string_length = if non_empty_count > 0 {
75                total_length / non_empty_count
76            } else {
77                0
78            };
79
80            // Consider categorical if low cardinality ratio or short strings with repetition
81            let is_categorical = unique_ratio < self.cardinality_threshold
82                || (avg_string_length < 20 && cardinality < sample_size / 2);
83
84            analyses.push(ColumnAnalysis {
85                index: col_idx,
86                name: header.to_string(),
87                cardinality,
88                sample_size,
89                unique_ratio,
90                is_categorical,
91                avg_string_length,
92            });
93        }
94
95        analyses
96    }
97
98    /// Load CSV data with string interning from any Read source
99    pub fn load_csv_from_reader<R: Read>(
100        &mut self,
101        mut reader: R,
102        table_name: &str,
103        source_type: &str,
104        source_path: &str,
105    ) -> Result<DataTable> {
106        info!(
107            "Stream CSV load: Loading {} with optimizations",
108            source_path
109        );
110
111        // Read all data into memory
112        let mut buffer = Vec::new();
113        reader.read_to_end(&mut buffer)?;
114
115        // First pass: Parse CSV with headers
116        let mut csv_reader = ReaderBuilder::new()
117            .has_headers(true)
118            .from_reader(&buffer[..]);
119
120        let headers = csv_reader.headers()?.clone();
121        let mut table = DataTable::new(table_name);
122
123        // Add metadata about the source
124        table
125            .metadata
126            .insert("source_type".to_string(), source_type.to_string());
127        table
128            .metadata
129            .insert("source_path".to_string(), source_path.to_string());
130
131        // Create columns from headers
132        for header in &headers {
133            table.add_column(DataColumn::new(header));
134        }
135
136        // Collect all rows as strings
137        let mut string_rows = Vec::new();
138        for result in csv_reader.records() {
139            let record = result?;
140            let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
141            string_rows.push(row);
142        }
143
144        // Analyze columns for string interning
145        let analyses = self.analyze_columns(&string_rows, &headers);
146        let categorical_columns: HashSet<usize> = analyses
147            .iter()
148            .filter(|a| a.is_categorical)
149            .map(|a| a.index)
150            .collect();
151
152        info!(
153            "Column analysis: {} of {} columns will use string interning",
154            categorical_columns.len(),
155            analyses.len()
156        );
157
158        // Initialize interners for categorical columns
159        for col_idx in &categorical_columns {
160            self.interners.insert(*col_idx, StringInterner::new());
161        }
162
163        // Second pass: Read raw lines for NULL detection
164        let mut line_reader = BufReader::new(&buffer[..]);
165        let mut raw_lines = Vec::new();
166        let mut raw_line = String::new();
167
168        // Skip header
169        line_reader.read_line(&mut raw_line)?;
170        raw_line.clear();
171
172        // Read all raw lines
173        for _ in 0..string_rows.len() {
174            line_reader.read_line(&mut raw_line)?;
175            raw_lines.push(raw_line.clone());
176            raw_line.clear();
177        }
178
179        // Infer column types by sampling
180        let mut column_types = vec![DataType::Null; headers.len()];
181        let sample_size = string_rows.len().min(100);
182
183        for row in string_rows.iter().take(sample_size) {
184            for (col_idx, value) in row.iter().enumerate() {
185                if !value.is_empty() {
186                    let inferred = DataType::infer_from_string(value);
187                    column_types[col_idx] = column_types[col_idx].merge(&inferred);
188                }
189            }
190        }
191
192        // Update column types
193        for (col_idx, column) in table.columns.iter_mut().enumerate() {
194            column.data_type = column_types[col_idx].clone();
195        }
196
197        // Convert strings to typed values and add rows
198        for (row_idx, string_row) in string_rows.iter().enumerate() {
199            let mut values = Vec::new();
200            let raw_line = &raw_lines[row_idx];
201
202            for (col_idx, value) in string_row.iter().enumerate() {
203                let data_value = if value.is_empty() {
204                    // Check if this is NULL (,,) vs empty string ("")
205                    if is_null_field(raw_line, col_idx) {
206                        DataValue::Null
207                    } else if categorical_columns.contains(&col_idx) {
208                        // Use interned string for empty categorical values
209                        if let Some(interner) = self.interners.get_mut(&col_idx) {
210                            DataValue::InternedString(interner.intern(""))
211                        } else {
212                            DataValue::String(String::new())
213                        }
214                    } else {
215                        DataValue::String(String::new())
216                    }
217                } else if categorical_columns.contains(&col_idx)
218                    && column_types[col_idx] == DataType::String
219                {
220                    // Use string interning for categorical columns
221                    if let Some(interner) = self.interners.get_mut(&col_idx) {
222                        DataValue::InternedString(interner.intern(value))
223                    } else {
224                        DataValue::from_string(value, &column_types[col_idx])
225                    }
226                } else {
227                    DataValue::from_string(value, &column_types[col_idx])
228                };
229                values.push(data_value);
230            }
231            table
232                .add_row(DataRow::new(values))
233                .map_err(|e| anyhow::anyhow!(e))?;
234        }
235
236        // Print interner statistics
237        for (col_idx, interner) in &self.interners {
238            let stats = interner.stats();
239            if stats.memory_saved_bytes > 0 {
240                debug!(
241                    "Column {} interning: {} unique strings, {} references, {} bytes saved",
242                    headers.get(*col_idx).unwrap_or(&String::new()),
243                    stats.unique_strings,
244                    stats.total_references,
245                    stats.memory_saved_bytes
246                );
247            }
248        }
249
250        // Update column statistics
251        table.infer_column_types();
252
253        Ok(table)
254    }
255}
256
257/// Simple wrapper for loading CSV without advanced features
258pub fn load_csv_from_reader<R: Read>(
259    reader: R,
260    table_name: &str,
261    source_type: &str,
262    source_path: &str,
263) -> Result<DataTable> {
264    let mut loader = StreamCsvLoader::new();
265    loader.load_csv_from_reader(reader, table_name, source_type, source_path)
266}
267
268/// Load JSON data from any Read source into a DataTable
269pub fn load_json_from_reader<R: Read>(
270    mut reader: R,
271    table_name: &str,
272    source_type: &str,
273    source_path: &str,
274) -> Result<DataTable> {
275    let mut json_str = String::new();
276    reader.read_to_string(&mut json_str)?;
277
278    let json_data: Vec<JsonValue> =
279        serde_json::from_str(&json_str).with_context(|| "Failed to parse JSON data")?;
280
281    if json_data.is_empty() {
282        return Ok(DataTable::new(table_name));
283    }
284
285    // Extract column names from first object
286    let first_obj = json_data[0]
287        .as_object()
288        .context("JSON data must be an array of objects")?;
289
290    let mut table = DataTable::new(table_name);
291
292    // Add metadata
293    table
294        .metadata
295        .insert("source_type".to_string(), source_type.to_string());
296    table
297        .metadata
298        .insert("source_path".to_string(), source_path.to_string());
299
300    // Create columns
301    let column_names: Vec<String> = first_obj.keys().cloned().collect();
302    for name in &column_names {
303        table.add_column(DataColumn::new(name));
304    }
305
306    // Collect values for type inference
307    let mut string_rows = Vec::new();
308    for json_obj in &json_data {
309        if let Some(obj) = json_obj.as_object() {
310            let mut row = Vec::new();
311            for col_name in &column_names {
312                let value = obj
313                    .get(col_name)
314                    .map(|v| json_value_to_string(v))
315                    .unwrap_or_default();
316                row.push(value);
317            }
318            string_rows.push(row);
319        }
320    }
321
322    // Infer column types
323    let mut column_types = vec![DataType::Null; column_names.len()];
324    let sample_size = string_rows.len().min(100);
325
326    for row in string_rows.iter().take(sample_size) {
327        for (col_idx, value) in row.iter().enumerate() {
328            if !value.is_empty() && value != "null" {
329                let inferred = DataType::infer_from_string(value);
330                column_types[col_idx] = column_types[col_idx].merge(&inferred);
331            }
332        }
333    }
334
335    // Update column types
336    for (col_idx, column) in table.columns.iter_mut().enumerate() {
337        column.data_type = column_types[col_idx].clone();
338    }
339
340    // Convert to typed values and add rows
341    for string_row in &string_rows {
342        let mut values = Vec::new();
343        for (col_idx, value) in string_row.iter().enumerate() {
344            let data_value = if value.is_empty() || value == "null" {
345                DataValue::Null
346            } else {
347                DataValue::from_string(value, &column_types[col_idx])
348            };
349            values.push(data_value);
350        }
351        table
352            .add_row(DataRow::new(values))
353            .map_err(|e| anyhow::anyhow!(e))?;
354    }
355
356    // Update statistics
357    table.infer_column_types();
358
359    Ok(table)
360}
361
362/// Helper to convert JSON value to string for type inference
363fn json_value_to_string(value: &JsonValue) -> String {
364    match value {
365        JsonValue::Null => String::new(),
366        JsonValue::Bool(b) => b.to_string(),
367        JsonValue::Number(n) => n.to_string(),
368        JsonValue::String(s) => s.clone(),
369        JsonValue::Array(arr) => format!("{:?}", arr),
370        JsonValue::Object(obj) => format!("{:?}", obj),
371    }
372}
373
/// Helper to detect NULL fields in raw CSV lines.
///
/// Returns `true` when field `field_index` of `raw_line` spans zero
/// characters (`,,` or a bare trailing comma), which the loader treats as
/// NULL. A quoted empty field (`""`) is two characters wide and therefore
/// reports `false` — an explicit empty string, not NULL.
///
/// Commas inside double quotes are not treated as delimiters. The quote
/// tracker ignores a quote preceded by a backslash; standard CSV escapes
/// quotes by doubling instead, but doubled quotes toggle the state an even
/// number of times, so delimiter detection still works either way.
///
/// Two conditions from the previous version were removed as unreachable:
/// the byte at `field_start` can never be an unquoted comma (it would have
/// been consumed as the delimiter), so neither `nth(field_start) == ','`
/// nor a trailing remainder of exactly `","` can occur.
fn is_null_field(raw_line: &str, field_index: usize) -> bool {
    let mut current_field = 0;
    let mut field_start = 0;
    let mut in_quotes = false;
    let mut prev_char = ' ';

    for (i, ch) in raw_line.char_indices() {
        if ch == '"' && prev_char != '\\' {
            in_quotes = !in_quotes;
        } else if ch == ',' && !in_quotes {
            if current_field == field_index {
                // The field ends at this comma; NULL iff it is zero-width.
                return i == field_start;
            }
            current_field += 1;
            field_start = i + 1;
        }
        prev_char = ch;
    }

    // Last field on the line: NULL iff nothing but trailing whitespace/newline.
    current_field == field_index && raw_line[field_start..].trim_end().is_empty()
}
404
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_csv_from_reader() {
        // Last row has an empty `name` field (`3,,300`) to exercise NULL detection.
        let csv_data = "id,name,value\n1,Alice,100\n2,Bob,200\n3,,300";

        let table = load_csv_from_reader(Cursor::new(csv_data), "test", "stream", "memory")
            .expect("Failed to load CSV");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // An unquoted empty field must come back as NULL, not "".
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    #[test]
    fn test_json_from_reader() {
        // Third object carries an explicit JSON null for `name`.
        let json_data = r#"[
            {"id": 1, "name": "Alice", "value": 100},
            {"id": 2, "name": "Bob", "value": 200},
            {"id": 3, "name": null, "value": 300}
        ]"#;

        let table = load_json_from_reader(Cursor::new(json_data), "test", "stream", "memory")
            .expect("Failed to load JSON");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // JSON null must map to DataValue::Null.
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }
}
447}