Skip to main content

sql_cli/data/
stream_loader.rs

1// Stream-based data loader that works with any Read source
2// This allows the same code to handle files, HTTP responses, or any other data stream
3
4use anyhow::{Context, Result};
5use csv::ReaderBuilder;
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet};
8use std::io::{BufRead, BufReader, Read};
9use tracing::{debug, info};
10
11use crate::data::advanced_csv_loader::StringInterner;
12use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
13
/// Per-column sampling statistics produced by
/// `StreamCsvLoader::analyze_columns` and used to choose which columns
/// store their values as interned strings.
///
/// Only `index` and `is_categorical` are read in this file. The
/// underscore-prefixed fields are written but never read; presumably they
/// are kept for `Debug` diagnostics.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the CSV header.
    index: usize,
    /// Header text of the column.
    _name: String,
    /// Number of distinct non-empty values seen in the sample.
    _cardinality: usize,
    /// Number of rows that were sampled.
    _sample_size: usize,
    /// `_cardinality / _sample_size`, or 1.0 when nothing was sampled.
    _unique_ratio: f64,
    /// Whether the sample looked categorical, making the column an
    /// interning candidate.
    is_categorical: bool,
    /// Mean byte length of the sampled non-empty values (integer division).
    _avg_string_length: usize,
}
25
26/// Advanced stream-based CSV loader with string interning
27pub struct StreamCsvLoader {
28    sample_size: usize,
29    cardinality_threshold: f64,
30    interners: HashMap<usize, StringInterner>,
31}
32
33impl StreamCsvLoader {
34    pub fn new() -> Self {
35        Self {
36            sample_size: 1000,
37            cardinality_threshold: 0.3,
38            interners: HashMap::new(),
39        }
40    }
41
42    /// Analyze columns to determine which should use string interning
43    fn analyze_columns(
44        &self,
45        rows: &[Vec<String>],
46        headers: &csv::StringRecord,
47    ) -> Vec<ColumnAnalysis> {
48        let mut analyses = Vec::new();
49
50        for (col_idx, header) in headers.iter().enumerate() {
51            let mut unique_values = HashSet::new();
52            let mut total_length = 0;
53            let mut non_empty_count = 0;
54
55            // Sample rows to analyze cardinality
56            for row in rows.iter().take(self.sample_size) {
57                if let Some(value) = row.get(col_idx) {
58                    if !value.is_empty() {
59                        unique_values.insert(value.clone());
60                        total_length += value.len();
61                        non_empty_count += 1;
62                    }
63                }
64            }
65
66            let cardinality = unique_values.len();
67            let sample_size = rows.len().min(self.sample_size);
68            let unique_ratio = if sample_size > 0 {
69                cardinality as f64 / sample_size as f64
70            } else {
71                1.0
72            };
73
74            let avg_string_length = if non_empty_count > 0 {
75                total_length / non_empty_count
76            } else {
77                0
78            };
79
80            // Consider categorical if low cardinality ratio or short strings with repetition
81            let is_categorical = unique_ratio < self.cardinality_threshold
82                || (avg_string_length < 20 && cardinality < sample_size / 2);
83
84            analyses.push(ColumnAnalysis {
85                index: col_idx,
86                _name: header.to_string(),
87                _cardinality: cardinality,
88                _sample_size: sample_size,
89                _unique_ratio: unique_ratio,
90                is_categorical,
91                _avg_string_length: avg_string_length,
92            });
93        }
94
95        analyses
96    }
97
98    /// Load CSV data with string interning from any Read source
99    pub fn load_csv_from_reader<R: Read>(
100        &mut self,
101        mut reader: R,
102        table_name: &str,
103        source_type: &str,
104        source_path: &str,
105    ) -> Result<DataTable> {
106        info!(
107            "Stream CSV load: Loading {} with optimizations",
108            source_path
109        );
110
111        // Read all data into memory
112        let mut buffer = Vec::new();
113        reader.read_to_end(&mut buffer)?;
114
115        // First pass: Parse CSV with headers
116        let mut csv_reader = ReaderBuilder::new()
117            .has_headers(true)
118            .from_reader(&buffer[..]);
119
120        let headers = csv_reader.headers()?.clone();
121        let mut table = DataTable::new(table_name);
122
123        // Add metadata about the source
124        table
125            .metadata
126            .insert("source_type".to_string(), source_type.to_string());
127        table
128            .metadata
129            .insert("source_path".to_string(), source_path.to_string());
130
131        // Create columns from headers
132        for header in &headers {
133            table.add_column(DataColumn::new(header));
134        }
135
136        // Collect all rows as strings
137        let mut string_rows = Vec::new();
138        for result in csv_reader.records() {
139            let record = result?;
140            let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
141            string_rows.push(row);
142        }
143
144        // Analyze columns for string interning
145        let analyses = self.analyze_columns(&string_rows, &headers);
146        let categorical_columns: HashSet<usize> = analyses
147            .iter()
148            .filter(|a| a.is_categorical)
149            .map(|a| a.index)
150            .collect();
151
152        info!(
153            "Column analysis: {} of {} columns will use string interning",
154            categorical_columns.len(),
155            analyses.len()
156        );
157
158        // Initialize interners for categorical columns
159        for col_idx in &categorical_columns {
160            self.interners.insert(*col_idx, StringInterner::new());
161        }
162
163        // Second pass: Read raw lines for NULL detection
164        let mut line_reader = BufReader::new(&buffer[..]);
165        let mut raw_lines = Vec::new();
166        let mut raw_line = String::new();
167
168        // Skip header
169        line_reader.read_line(&mut raw_line)?;
170        raw_line.clear();
171
172        // Read all raw lines
173        for _ in 0..string_rows.len() {
174            line_reader.read_line(&mut raw_line)?;
175            raw_lines.push(raw_line.clone());
176            raw_line.clear();
177        }
178
179        // Infer column types by sampling
180        let mut column_types = vec![DataType::Null; headers.len()];
181        let sample_size = string_rows.len().min(100);
182
183        for row in string_rows.iter().take(sample_size) {
184            for (col_idx, value) in row.iter().enumerate() {
185                if !value.is_empty() {
186                    let inferred = DataType::infer_from_string(value);
187                    column_types[col_idx] = column_types[col_idx].merge(&inferred);
188                }
189            }
190        }
191
192        // Update column types
193        for (col_idx, column) in table.columns.iter_mut().enumerate() {
194            column.data_type = column_types[col_idx].clone();
195        }
196
197        // Convert strings to typed values and add rows
198        for (row_idx, string_row) in string_rows.iter().enumerate() {
199            let mut values = Vec::new();
200            let raw_line = &raw_lines[row_idx];
201
202            for (col_idx, value) in string_row.iter().enumerate() {
203                let data_value = if value.is_empty() {
204                    // Check if this is NULL (,,) vs empty string ("")
205                    if is_null_field(raw_line, col_idx) {
206                        DataValue::Null
207                    } else if categorical_columns.contains(&col_idx) {
208                        // Use interned string for empty categorical values
209                        if let Some(interner) = self.interners.get_mut(&col_idx) {
210                            DataValue::InternedString(interner.intern(""))
211                        } else {
212                            DataValue::String(String::new())
213                        }
214                    } else {
215                        DataValue::String(String::new())
216                    }
217                } else if categorical_columns.contains(&col_idx)
218                    && column_types[col_idx] == DataType::String
219                {
220                    // Use string interning for categorical columns
221                    if let Some(interner) = self.interners.get_mut(&col_idx) {
222                        DataValue::InternedString(interner.intern(value))
223                    } else {
224                        DataValue::from_string(value, &column_types[col_idx])
225                    }
226                } else {
227                    DataValue::from_string(value, &column_types[col_idx])
228                };
229                values.push(data_value);
230            }
231            table
232                .add_row(DataRow::new(values))
233                .map_err(|e| anyhow::anyhow!(e))?;
234        }
235
236        // Print interner statistics
237        for (col_idx, interner) in &self.interners {
238            let stats = interner.stats();
239            if stats.memory_saved_bytes > 0 {
240                debug!(
241                    "Column {} interning: {} unique strings, {} references, {} bytes saved",
242                    headers.get(*col_idx).unwrap_or(&String::new()),
243                    stats.unique_strings,
244                    stats.total_references,
245                    stats.memory_saved_bytes
246                );
247            }
248        }
249
250        // Update column statistics
251        table.infer_column_types();
252
253        Ok(table)
254    }
255}
256
257/// Simple wrapper for loading CSV without advanced features
258pub fn load_csv_from_reader<R: Read>(
259    reader: R,
260    table_name: &str,
261    source_type: &str,
262    source_path: &str,
263) -> Result<DataTable> {
264    let mut loader = StreamCsvLoader::new();
265    loader.load_csv_from_reader(reader, table_name, source_type, source_path)
266}
267
268/// Parse JSON content as either a JSON array of objects or JSONL
269/// (newline-delimited JSON, one object per line). The format is detected by
270/// peeking at the first non-whitespace byte: `[` starts an array, anything
271/// else is parsed line-by-line.
272///
273/// Empty and whitespace-only lines are skipped in JSONL mode. Parse errors
274/// in JSONL mode include the source line number.
275pub fn parse_json_records(content: &str) -> Result<Vec<JsonValue>> {
276    let trimmed = content.trim_start();
277    if trimmed.starts_with('[') {
278        return serde_json::from_str(content).with_context(|| "Failed to parse JSON array");
279    }
280
281    let mut out = Vec::new();
282    for (idx, raw_line) in content.lines().enumerate() {
283        let line = raw_line.trim();
284        if line.is_empty() {
285            continue;
286        }
287        let value: JsonValue = serde_json::from_str(line)
288            .with_context(|| format!("Failed to parse JSONL at line {}", idx + 1))?;
289        out.push(value);
290    }
291    Ok(out)
292}
293
294/// Compute the ordered union of object keys across the first `sample_size`
295/// records. Order of first occurrence is preserved so the column layout is
296/// stable. Non-object records are skipped.
297pub fn collect_column_names(records: &[JsonValue], sample_size: usize) -> Vec<String> {
298    let mut seen: HashSet<String> = HashSet::new();
299    let mut names: Vec<String> = Vec::new();
300    for record in records.iter().take(sample_size) {
301        if let Some(obj) = record.as_object() {
302            for key in obj.keys() {
303                if seen.insert(key.clone()) {
304                    names.push(key.clone());
305                }
306            }
307        }
308    }
309    names
310}
311
312/// Load JSON data from any Read source into a DataTable.
313///
314/// Accepts either a JSON array of objects (`[{...}, {...}]`) or JSONL
315/// (one JSON object per line). Format is auto-detected.
316pub fn load_json_from_reader<R: Read>(
317    mut reader: R,
318    table_name: &str,
319    source_type: &str,
320    source_path: &str,
321) -> Result<DataTable> {
322    let mut json_str = String::new();
323    reader.read_to_string(&mut json_str)?;
324
325    let json_data: Vec<JsonValue> = parse_json_records(&json_str)?;
326
327    if json_data.is_empty() {
328        return Ok(DataTable::new(table_name));
329    }
330
331    // Schema is the union of keys across the first 100 records so heterogeneous
332    // JSONL streams (where later records may carry fields the first one did
333    // not) don't silently drop columns.
334    let column_names = collect_column_names(&json_data, 100);
335    if column_names.is_empty() {
336        return Err(anyhow::anyhow!(
337            "JSON data must contain objects (got non-object records)"
338        ));
339    }
340
341    let mut table = DataTable::new(table_name);
342
343    // Add metadata
344    table
345        .metadata
346        .insert("source_type".to_string(), source_type.to_string());
347    table
348        .metadata
349        .insert("source_path".to_string(), source_path.to_string());
350
351    for name in &column_names {
352        table.add_column(DataColumn::new(name));
353    }
354
355    // Collect values for type inference
356    let mut string_rows = Vec::new();
357    for json_obj in &json_data {
358        if let Some(obj) = json_obj.as_object() {
359            let mut row = Vec::new();
360            for col_name in &column_names {
361                let value = obj
362                    .get(col_name)
363                    .map(|v| json_value_to_string(v))
364                    .unwrap_or_default();
365                row.push(value);
366            }
367            string_rows.push(row);
368        }
369    }
370
371    // Infer column types
372    let mut column_types = vec![DataType::Null; column_names.len()];
373    let sample_size = string_rows.len().min(100);
374
375    for row in string_rows.iter().take(sample_size) {
376        for (col_idx, value) in row.iter().enumerate() {
377            if !value.is_empty() && value != "null" {
378                let inferred = DataType::infer_from_string(value);
379                column_types[col_idx] = column_types[col_idx].merge(&inferred);
380            }
381        }
382    }
383
384    // Update column types
385    for (col_idx, column) in table.columns.iter_mut().enumerate() {
386        column.data_type = column_types[col_idx].clone();
387    }
388
389    // Convert to typed values and add rows
390    for string_row in &string_rows {
391        let mut values = Vec::new();
392        for (col_idx, value) in string_row.iter().enumerate() {
393            let data_value = if value.is_empty() || value == "null" {
394                DataValue::Null
395            } else {
396                DataValue::from_string(value, &column_types[col_idx])
397            };
398            values.push(data_value);
399        }
400        table
401            .add_row(DataRow::new(values))
402            .map_err(|e| anyhow::anyhow!(e))?;
403    }
404
405    // Update statistics
406    table.infer_column_types();
407
408    Ok(table)
409}
410
411/// Helper to convert JSON value to string for type inference
412fn json_value_to_string(value: &JsonValue) -> String {
413    match value {
414        JsonValue::Null => String::new(),
415        JsonValue::Bool(b) => b.to_string(),
416        JsonValue::Number(n) => n.to_string(),
417        JsonValue::String(s) => s.clone(),
418        JsonValue::Array(arr) => format!("{:?}", arr),
419        JsonValue::Object(obj) => format!("{:?}", obj),
420    }
421}
422
/// Report whether field `field_index` of one raw CSV record is NULL, i.e.
/// completely empty in the source text (`a,,b`), as opposed to a quoted empty
/// string (`a,"",b`).
///
/// Quotes follow standard CSV (RFC 4180) rules, matching the `csv` crate's
/// default reader. An embedded quote is written as a doubled quote (`""`),
/// which simply toggles the quote state twice, and backslash has no special
/// meaning. Commas and newlines inside quotes do not split fields, so
/// `raw_line` may be a record spanning several physical lines. Trailing
/// whitespace (such as the line terminator) after the last field is ignored.
///
/// Returns `false` when the record has fewer than `field_index + 1` fields.
fn is_null_field(raw_line: &str, field_index: usize) -> bool {
    let mut current_field = 0;
    let mut field_start = 0;
    let mut in_quotes = false;

    for (pos, ch) in raw_line.char_indices() {
        match ch {
            '"' => in_quotes = !in_quotes,
            ',' if !in_quotes => {
                if current_field == field_index {
                    // NULL only when the delimiters are adjacent.
                    return pos == field_start;
                }
                current_field += 1;
                // `,` is one byte, so this is always a char boundary.
                field_start = pos + 1;
            }
            _ => {}
        }
    }

    // The requested field is the last one in the record.
    current_field == field_index && raw_line[field_start..].trim_end().is_empty()
}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Feed CSV text through the module-level loader, panicking with `failure`.
    fn load_csv_text(text: &str, failure: &str) -> DataTable {
        load_csv_from_reader(Cursor::new(text), "test", "stream", "memory").expect(failure)
    }

    /// Feed JSON or JSONL text through the module-level loader, panicking with
    /// `failure`.
    fn load_json_text(text: &str, failure: &str) -> DataTable {
        load_json_from_reader(Cursor::new(text), "test", "stream", "memory").expect(failure)
    }

    #[test]
    fn test_csv_from_reader() {
        let table = load_csv_text(
            "id,name,value\n1,Alice,100\n2,Bob,200\n3,,300",
            "Failed to load CSV",
        );

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // Nothing sits between the first two commas of row 3, so `name` is NULL.
        let missing_name = table.get_value(2, 1).unwrap();
        assert!(matches!(missing_name, DataValue::Null));
    }

    #[test]
    fn test_json_from_reader() {
        let table = load_json_text(
            r#"[
            {"id": 1, "name": "Alice", "value": 100},
            {"id": 2, "name": "Bob", "value": 200},
            {"id": 3, "name": null, "value": 300}
        ]"#,
            "Failed to load JSON",
        );

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // A JSON null must come through as a NULL cell.
        let missing_name = table.get_value(2, 1).unwrap();
        assert!(matches!(missing_name, DataValue::Null));
    }

    #[test]
    fn test_jsonl_from_reader() {
        let table = load_json_text(
            "{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n",
            "Failed to load JSONL",
        );

        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_jsonl_heterogeneous_schema_unioned() {
        // The second record introduces "extra"; the key union must include it,
        // leaving row 0 without a value for that column.
        let table = load_json_text("{\"id\":1}\n{\"id\":2,\"extra\":\"hi\"}\n", "load");

        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_jsonl_skips_blank_lines() {
        let table = load_json_text("{\"id\":1}\n\n\n{\"id\":2}\n", "load");

        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_parse_json_records_array_form() {
        let records = parse_json_records(r#"[{"a":1},{"a":2}]"#).unwrap();

        assert_eq!(records.len(), 2);
    }

    #[test]
    fn test_parse_json_records_jsonl_form() {
        let records = parse_json_records("{\"a\":1}\n{\"a\":2}\n").unwrap();

        assert_eq!(records.len(), 2);
    }

    #[test]
    fn test_parse_json_records_jsonl_error_cites_line() {
        let message = parse_json_records("{\"a\":1}\nnot json\n")
            .unwrap_err()
            .to_string();

        assert!(message.contains("line 2"));
    }
}