Skip to main content

sql_cli/data/
stream_loader.rs

1// Stream-based data loader that works with any Read source
2// This allows the same code to handle files, HTTP responses, or any other data stream
3
4use anyhow::{Context, Result};
5use csv::ReaderBuilder;
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet};
8use std::io::{BufRead, BufReader, Read};
9use tracing::{debug, info};
10
11use crate::data::advanced_csv_loader::StringInterner;
12use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
13
/// Options controlling how a CSV stream is parsed.
///
/// Default is RFC-4180 style: comma delimiter, header row required. Surfaces
/// (CLI flag, `READ_CSV(path, '|')`, WEB CTE `DELIMITER`) populate this struct
/// at the edge; internal layers just pass it through.
#[derive(Debug, Clone)]
pub struct CsvReadOptions {
    /// Field separator as a single byte; passed to `csv::ReaderBuilder::delimiter`
    /// by the loader and also used when scanning raw lines for NULL fields.
    pub delimiter: u8,
    /// Whether the input starts with a header row; passed to
    /// `csv::ReaderBuilder::has_headers` by the loader.
    pub has_headers: bool,
}
24
25impl Default for CsvReadOptions {
26    fn default() -> Self {
27        Self {
28            delimiter: b',',
29            has_headers: true,
30        }
31    }
32}
33
/// Choose a delimiter by looking at how `path` ends.
///
/// A `.tsv` suffix selects tab and a `.psv` suffix selects pipe; any other
/// path (stdin `-` included) falls back to comma. The comparison ignores ASCII
/// case. This is purely the *auto-detect* layer: an explicit delimiter from
/// the CLI flag, the `READ_CSV` 2nd argument, or WEB CTE `DELIMITER` takes
/// priority over whatever this returns.
pub fn detect_delimiter_from_path(path: &str) -> u8 {
    // Suffix -> delimiter table; the first matching entry wins.
    const BY_SUFFIX: [(&str, u8); 2] = [(".tsv", b'\t'), (".psv", b'|')];

    let lowered = path.to_ascii_lowercase();
    BY_SUFFIX
        .iter()
        .find(|(suffix, _)| lowered.ends_with(*suffix))
        .map_or(b',', |&(_, delimiter)| delimiter)
}
49
50/// Parse a user-supplied delimiter string into a single byte.
51///
52/// Accepts:
53///   - a single ASCII character (e.g. `","`, `"|"`, `";"`)
54///   - the two-character escapes `"\t"`, `"\n"`, `"\r"` (typing literal tabs
55///     in SQL strings or shell args is awkward, so this is the canonical form)
56///
57/// Rejects multi-character strings, non-ASCII, and empty strings with a clear
58/// error. Caller is expected to wrap the error with context if needed.
59pub fn parse_delimiter_arg(s: &str) -> anyhow::Result<u8> {
60    match s {
61        "\\t" | "\t" => return Ok(b'\t'),
62        "\\n" => return Ok(b'\n'),
63        "\\r" => return Ok(b'\r'),
64        _ => {}
65    }
66    let bytes = s.as_bytes();
67    if bytes.len() == 1 && bytes[0].is_ascii() {
68        return Ok(bytes[0]);
69    }
70    Err(anyhow::anyhow!(
71        "delimiter must be a single ASCII character (or '\\t', '\\n', '\\r'); got {:?}",
72        s
73    ))
74}
75
76/// Resolve which delimiter to use for a given path.
77///
78/// Precedence (highest first):
79///   1. `explicit` override (typically from a CLI flag or 2nd arg)
80///   2. extension auto-detect (`.tsv` → tab, `.psv` → pipe)
81///   3. comma
82pub fn resolve_delimiter(path: &str, explicit: Option<u8>) -> u8 {
83    explicit.unwrap_or_else(|| detect_delimiter_from_path(path))
84}
85
/// Render a delimiter byte as text suitable for logs and table metadata.
///
/// Tab, line feed and carriage return are shown as the two-character escapes
/// `\t`, `\n` and `\r`; every other byte is shown as the character with the
/// same code point.
fn delimiter_label(d: u8) -> String {
    let escaped = match d {
        b'\t' => "\\t",
        b'\n' => "\\n",
        b'\r' => "\\r",
        _ => return char::from(d).to_string(),
    };
    escaped.to_owned()
}
95
/// Column analysis results for determining interning strategy.
///
/// One value is produced per header column by `StreamCsvLoader::analyze_columns`.
/// Fields with a leading underscore are filled in there but are not read by
/// any code visible in this file.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the header record.
    index: usize,
    /// Header text of the column.
    _name: String,
    /// Number of distinct non-empty values seen in the sampled rows.
    _cardinality: usize,
    /// Number of rows that were sampled.
    _sample_size: usize,
    /// `_cardinality / _sample_size`, or `1.0` when no rows were sampled.
    _unique_ratio: f64,
    /// Whether the column's string values should be interned.
    is_categorical: bool,
    /// Integer mean of the byte lengths of the non-empty sampled values
    /// (`0` when there were none).
    _avg_string_length: usize,
}
107
/// Advanced stream-based CSV loader with string interning
pub struct StreamCsvLoader {
    /// Upper bound on the number of leading rows examined per column when
    /// deciding whether the column is categorical.
    sample_size: usize,
    /// A column whose unique-value ratio is below this threshold is treated
    /// as categorical.
    cardinality_threshold: f64,
    /// String interners keyed by column index; populated for categorical
    /// columns while a CSV is being loaded.
    interners: HashMap<usize, StringInterner>,
}
114
115impl StreamCsvLoader {
116    pub fn new() -> Self {
117        Self {
118            sample_size: 1000,
119            cardinality_threshold: 0.3,
120            interners: HashMap::new(),
121        }
122    }
123
124    /// Analyze columns to determine which should use string interning
125    fn analyze_columns(
126        &self,
127        rows: &[Vec<String>],
128        headers: &csv::StringRecord,
129    ) -> Vec<ColumnAnalysis> {
130        let mut analyses = Vec::new();
131
132        for (col_idx, header) in headers.iter().enumerate() {
133            let mut unique_values = HashSet::new();
134            let mut total_length = 0;
135            let mut non_empty_count = 0;
136
137            // Sample rows to analyze cardinality
138            for row in rows.iter().take(self.sample_size) {
139                if let Some(value) = row.get(col_idx) {
140                    if !value.is_empty() {
141                        unique_values.insert(value.clone());
142                        total_length += value.len();
143                        non_empty_count += 1;
144                    }
145                }
146            }
147
148            let cardinality = unique_values.len();
149            let sample_size = rows.len().min(self.sample_size);
150            let unique_ratio = if sample_size > 0 {
151                cardinality as f64 / sample_size as f64
152            } else {
153                1.0
154            };
155
156            let avg_string_length = if non_empty_count > 0 {
157                total_length / non_empty_count
158            } else {
159                0
160            };
161
162            // Consider categorical if low cardinality ratio or short strings with repetition
163            let is_categorical = unique_ratio < self.cardinality_threshold
164                || (avg_string_length < 20 && cardinality < sample_size / 2);
165
166            analyses.push(ColumnAnalysis {
167                index: col_idx,
168                _name: header.to_string(),
169                _cardinality: cardinality,
170                _sample_size: sample_size,
171                _unique_ratio: unique_ratio,
172                is_categorical,
173                _avg_string_length: avg_string_length,
174            });
175        }
176
177        analyses
178    }
179
180    /// Load CSV data with string interning from any Read source, using default
181    /// comma-delimited options. Thin wrapper over [`load_csv_from_reader_with_opts`]
182    /// kept so existing callers don't need to touch options.
183    pub fn load_csv_from_reader<R: Read>(
184        &mut self,
185        reader: R,
186        table_name: &str,
187        source_type: &str,
188        source_path: &str,
189    ) -> Result<DataTable> {
190        self.load_csv_from_reader_with_opts(
191            reader,
192            table_name,
193            source_type,
194            source_path,
195            &CsvReadOptions::default(),
196        )
197    }
198
199    /// Load CSV data with string interning, honouring caller-supplied options
200    /// (delimiter, headers).
201    pub fn load_csv_from_reader_with_opts<R: Read>(
202        &mut self,
203        mut reader: R,
204        table_name: &str,
205        source_type: &str,
206        source_path: &str,
207        opts: &CsvReadOptions,
208    ) -> Result<DataTable> {
209        info!(
210            "Stream CSV load: Loading {} with optimizations (delimiter={})",
211            source_path,
212            delimiter_label(opts.delimiter)
213        );
214
215        // Read all data into memory
216        let mut buffer = Vec::new();
217        reader.read_to_end(&mut buffer)?;
218
219        // First pass: Parse CSV with headers
220        let mut csv_reader = ReaderBuilder::new()
221            .has_headers(opts.has_headers)
222            .delimiter(opts.delimiter)
223            .from_reader(&buffer[..]);
224
225        let headers = csv_reader.headers()?.clone();
226        let mut table = DataTable::new(table_name);
227
228        // Add metadata about the source
229        table
230            .metadata
231            .insert("source_type".to_string(), source_type.to_string());
232        table
233            .metadata
234            .insert("source_path".to_string(), source_path.to_string());
235        table
236            .metadata
237            .insert("delimiter".to_string(), delimiter_label(opts.delimiter));
238
239        // Create columns from headers
240        for header in &headers {
241            table.add_column(DataColumn::new(header));
242        }
243
244        // Collect all rows as strings
245        let mut string_rows = Vec::new();
246        for result in csv_reader.records() {
247            let record = result?;
248            let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
249            string_rows.push(row);
250        }
251
252        // Analyze columns for string interning
253        let analyses = self.analyze_columns(&string_rows, &headers);
254        let categorical_columns: HashSet<usize> = analyses
255            .iter()
256            .filter(|a| a.is_categorical)
257            .map(|a| a.index)
258            .collect();
259
260        info!(
261            "Column analysis: {} of {} columns will use string interning",
262            categorical_columns.len(),
263            analyses.len()
264        );
265
266        // Initialize interners for categorical columns
267        for col_idx in &categorical_columns {
268            self.interners.insert(*col_idx, StringInterner::new());
269        }
270
271        // Second pass: Read raw lines for NULL detection
272        let mut line_reader = BufReader::new(&buffer[..]);
273        let mut raw_lines = Vec::new();
274        let mut raw_line = String::new();
275
276        // Skip header
277        line_reader.read_line(&mut raw_line)?;
278        raw_line.clear();
279
280        // Read all raw lines
281        for _ in 0..string_rows.len() {
282            line_reader.read_line(&mut raw_line)?;
283            raw_lines.push(raw_line.clone());
284            raw_line.clear();
285        }
286
287        // Infer column types by sampling
288        let mut column_types = vec![DataType::Null; headers.len()];
289        let sample_size = string_rows.len().min(100);
290
291        for row in string_rows.iter().take(sample_size) {
292            for (col_idx, value) in row.iter().enumerate() {
293                if !value.is_empty() {
294                    let inferred = DataType::infer_from_string(value);
295                    column_types[col_idx] = column_types[col_idx].merge(&inferred);
296                }
297            }
298        }
299
300        // Update column types
301        for (col_idx, column) in table.columns.iter_mut().enumerate() {
302            column.data_type = column_types[col_idx].clone();
303        }
304
305        // Convert strings to typed values and add rows
306        for (row_idx, string_row) in string_rows.iter().enumerate() {
307            let mut values = Vec::new();
308            let raw_line = &raw_lines[row_idx];
309
310            for (col_idx, value) in string_row.iter().enumerate() {
311                let data_value = if value.is_empty() {
312                    // Check if this is NULL (,,) vs empty string ("")
313                    if is_null_field(raw_line, col_idx, opts.delimiter as char) {
314                        DataValue::Null
315                    } else if categorical_columns.contains(&col_idx) {
316                        // Use interned string for empty categorical values
317                        if let Some(interner) = self.interners.get_mut(&col_idx) {
318                            DataValue::InternedString(interner.intern(""))
319                        } else {
320                            DataValue::String(String::new())
321                        }
322                    } else {
323                        DataValue::String(String::new())
324                    }
325                } else if categorical_columns.contains(&col_idx)
326                    && column_types[col_idx] == DataType::String
327                {
328                    // Use string interning for categorical columns
329                    if let Some(interner) = self.interners.get_mut(&col_idx) {
330                        DataValue::InternedString(interner.intern(value))
331                    } else {
332                        DataValue::from_string(value, &column_types[col_idx])
333                    }
334                } else {
335                    DataValue::from_string(value, &column_types[col_idx])
336                };
337                values.push(data_value);
338            }
339            table
340                .add_row(DataRow::new(values))
341                .map_err(|e| anyhow::anyhow!(e))?;
342        }
343
344        // Print interner statistics
345        for (col_idx, interner) in &self.interners {
346            let stats = interner.stats();
347            if stats.memory_saved_bytes > 0 {
348                debug!(
349                    "Column {} interning: {} unique strings, {} references, {} bytes saved",
350                    headers.get(*col_idx).unwrap_or(&String::new()),
351                    stats.unique_strings,
352                    stats.total_references,
353                    stats.memory_saved_bytes
354                );
355            }
356        }
357
358        // Update column statistics
359        table.infer_column_types();
360
361        Ok(table)
362    }
363}
364
365/// Simple wrapper for loading CSV without advanced features. Defaults to
366/// comma delimiter; for other delimiters use [`load_csv_from_reader_with_opts`].
367pub fn load_csv_from_reader<R: Read>(
368    reader: R,
369    table_name: &str,
370    source_type: &str,
371    source_path: &str,
372) -> Result<DataTable> {
373    let mut loader = StreamCsvLoader::new();
374    loader.load_csv_from_reader(reader, table_name, source_type, source_path)
375}
376
377/// As [`load_csv_from_reader`], but honouring caller-supplied [`CsvReadOptions`]
378/// (delimiter, headers).
379pub fn load_csv_from_reader_with_opts<R: Read>(
380    reader: R,
381    table_name: &str,
382    source_type: &str,
383    source_path: &str,
384    opts: &CsvReadOptions,
385) -> Result<DataTable> {
386    let mut loader = StreamCsvLoader::new();
387    loader.load_csv_from_reader_with_opts(reader, table_name, source_type, source_path, opts)
388}
389
390/// Parse JSON content as either a JSON array of objects or JSONL
391/// (newline-delimited JSON, one object per line). The format is detected by
392/// peeking at the first non-whitespace byte: `[` starts an array, anything
393/// else is parsed line-by-line.
394///
395/// Empty and whitespace-only lines are skipped in JSONL mode. Parse errors
396/// in JSONL mode include the source line number.
397pub fn parse_json_records(content: &str) -> Result<Vec<JsonValue>> {
398    let trimmed = content.trim_start();
399    if trimmed.starts_with('[') {
400        return serde_json::from_str(content).with_context(|| "Failed to parse JSON array");
401    }
402
403    let mut out = Vec::new();
404    for (idx, raw_line) in content.lines().enumerate() {
405        let line = raw_line.trim();
406        if line.is_empty() {
407            continue;
408        }
409        let value: JsonValue = serde_json::from_str(line)
410            .with_context(|| format!("Failed to parse JSONL at line {}", idx + 1))?;
411        out.push(value);
412    }
413    Ok(out)
414}
415
416/// Navigate to a sub-value of a JSON document using a dotted path.
417///
418/// This is the shared "find the rows" step used by both the WEB CTE
419/// `JSON_PATH` clause and `READ_JSON(path, json_path)`. It only *locates* a
420/// value; it deliberately does not filter, transform, or pluck scalars — that
421/// is SQL's job (use a WHERE clause / column list once the rows are loaded).
422/// Anything beyond locating the row set is out of scope; pipe through `jq`.
423///
424/// Supports two forms per dotted segment:
425///   - `name`     — descend into an object key
426///   - `name[]`   — descend into `name` (must be an array), then map the
427///                  remainder of the path across every element. A bare `[]`
428///                  projects over the current value when it is already an array.
429///
430/// Example for an Elasticsearch response:
431///   `hits.hits[]._source`
432/// returns an array of `_source` objects, one per hit — i.e. the `_source`
433/// fields become the top-level row shape consumed by the loader.
434///
435/// An empty path (or one that is only dots) returns the value unchanged.
436pub fn navigate_json_path(value: &JsonValue, path: &str) -> Result<JsonValue> {
437    let parts: Vec<&str> = path.split('.').filter(|p| !p.is_empty()).collect();
438    walk_json_path(value, &parts)
439}
440
441fn walk_json_path(value: &JsonValue, parts: &[&str]) -> Result<JsonValue> {
442    let Some((head, tail)) = parts.split_first() else {
443        return Ok(value.clone());
444    };
445
446    // Array projection: `name[]` (or bare `[]`) maps the rest of the path
447    // across each element of an array.
448    if let Some(name) = head.strip_suffix("[]") {
449        let array_val = if name.is_empty() {
450            value
451        } else {
452            value
453                .get(name)
454                .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", name))?
455        };
456        let arr = array_val.as_array().ok_or_else(|| {
457            anyhow::anyhow!(
458                "Expected array at '{}' for [] projection, got {}",
459                if name.is_empty() { "<root>" } else { name },
460                json_kind(array_val)
461            )
462        })?;
463        let mut projected = Vec::with_capacity(arr.len());
464        for el in arr {
465            projected.push(walk_json_path(el, tail)?);
466        }
467        return Ok(JsonValue::Array(projected));
468    }
469
470    let next = value
471        .get(head)
472        .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", head))?;
473    walk_json_path(next, tail)
474}
475
476/// Human-readable JSON type name, for error messages.
477fn json_kind(value: &JsonValue) -> &'static str {
478    match value {
479        JsonValue::Null => "null",
480        JsonValue::Bool(_) => "bool",
481        JsonValue::Number(_) => "number",
482        JsonValue::String(_) => "string",
483        JsonValue::Array(_) => "array",
484        JsonValue::Object(_) => "object",
485    }
486}
487
488/// Compute the ordered union of object keys across the first `sample_size`
489/// records. Order of first occurrence is preserved so the column layout is
490/// stable. Non-object records are skipped.
491pub fn collect_column_names(records: &[JsonValue], sample_size: usize) -> Vec<String> {
492    let mut seen: HashSet<String> = HashSet::new();
493    let mut names: Vec<String> = Vec::new();
494    for record in records.iter().take(sample_size) {
495        if let Some(obj) = record.as_object() {
496            for key in obj.keys() {
497                if seen.insert(key.clone()) {
498                    names.push(key.clone());
499                }
500            }
501        }
502    }
503    names
504}
505
506/// Load JSON data from any Read source into a DataTable.
507///
508/// Accepts either a JSON array of objects (`[{...}, {...}]`) or JSONL
509/// (one JSON object per line). Format is auto-detected.
510pub fn load_json_from_reader<R: Read>(
511    mut reader: R,
512    table_name: &str,
513    source_type: &str,
514    source_path: &str,
515) -> Result<DataTable> {
516    let mut json_str = String::new();
517    reader.read_to_string(&mut json_str)?;
518
519    let json_data: Vec<JsonValue> = parse_json_records(&json_str)?;
520
521    if json_data.is_empty() {
522        return Ok(DataTable::new(table_name));
523    }
524
525    // Schema is the union of keys across the first 100 records so heterogeneous
526    // JSONL streams (where later records may carry fields the first one did
527    // not) don't silently drop columns.
528    let column_names = collect_column_names(&json_data, 100);
529    if column_names.is_empty() {
530        return Err(anyhow::anyhow!(
531            "JSON data must contain objects (got non-object records)"
532        ));
533    }
534
535    let mut table = DataTable::new(table_name);
536
537    // Add metadata
538    table
539        .metadata
540        .insert("source_type".to_string(), source_type.to_string());
541    table
542        .metadata
543        .insert("source_path".to_string(), source_path.to_string());
544
545    for name in &column_names {
546        table.add_column(DataColumn::new(name));
547    }
548
549    // Collect values for type inference
550    let mut string_rows = Vec::new();
551    for json_obj in &json_data {
552        if let Some(obj) = json_obj.as_object() {
553            let mut row = Vec::new();
554            for col_name in &column_names {
555                let value = obj
556                    .get(col_name)
557                    .map(|v| json_value_to_string(v))
558                    .unwrap_or_default();
559                row.push(value);
560            }
561            string_rows.push(row);
562        }
563    }
564
565    // Infer column types
566    let mut column_types = vec![DataType::Null; column_names.len()];
567    let sample_size = string_rows.len().min(100);
568
569    for row in string_rows.iter().take(sample_size) {
570        for (col_idx, value) in row.iter().enumerate() {
571            if !value.is_empty() && value != "null" {
572                let inferred = DataType::infer_from_string(value);
573                column_types[col_idx] = column_types[col_idx].merge(&inferred);
574            }
575        }
576    }
577
578    // Update column types
579    for (col_idx, column) in table.columns.iter_mut().enumerate() {
580        column.data_type = column_types[col_idx].clone();
581    }
582
583    // Convert to typed values and add rows
584    for string_row in &string_rows {
585        let mut values = Vec::new();
586        for (col_idx, value) in string_row.iter().enumerate() {
587            let data_value = if value.is_empty() || value == "null" {
588                DataValue::Null
589            } else {
590                DataValue::from_string(value, &column_types[col_idx])
591            };
592            values.push(data_value);
593        }
594        table
595            .add_row(DataRow::new(values))
596            .map_err(|e| anyhow::anyhow!(e))?;
597    }
598
599    // Update statistics
600    table.infer_column_types();
601
602    Ok(table)
603}
604
605/// Helper to convert JSON value to string for type inference
606fn json_value_to_string(value: &JsonValue) -> String {
607    match value {
608        JsonValue::Null => String::new(),
609        JsonValue::Bool(b) => b.to_string(),
610        JsonValue::Number(n) => n.to_string(),
611        JsonValue::String(s) => s.clone(),
612        JsonValue::Array(arr) => format!("{:?}", arr),
613        JsonValue::Object(obj) => format!("{:?}", obj),
614    }
615}
616
/// Helper to detect NULL fields in raw CSV lines. `delimiter` is the field
/// separator character used in the source (`,` for plain CSV, `\t` for TSV, etc.).
///
/// Returns `true` when field number `field_index` (zero-based) of `raw_line`
/// has no text at all between its delimiters, i.e. it is written as `,,`
/// rather than as a quoted empty string `,"",`. For the last field, trailing
/// whitespace such as the line terminator is ignored. Returns `false` when
/// the line has fewer fields than `field_index + 1`.
///
/// Quoting follows the rules the CSV reader in this file is configured with:
/// a quote opens a quoted field only at the start of a field, a doubled quote
/// inside a quoted field is a literal quote, and backslash has no special
/// meaning.
fn is_null_field(raw_line: &str, field_index: usize, delimiter: char) -> bool {
    let mut current_field = 0usize;
    // Byte offset at which the field currently being scanned begins.
    let mut field_start = 0usize;
    let mut at_field_start = true;
    let mut in_quotes = false;

    let mut chars = raw_line.char_indices().peekable();
    while let Some((offset, ch)) = chars.next() {
        if in_quotes {
            if ch == '"' {
                if matches!(chars.peek(), Some(&(_, '"'))) {
                    // Doubled quote: a literal quote, still inside the field.
                    chars.next();
                } else {
                    in_quotes = false;
                }
            }
        } else if ch == '"' && at_field_start {
            in_quotes = true;
        } else if ch == delimiter {
            if current_field == field_index {
                // NULL only if the delimiter sits right where the field began.
                return offset == field_start;
            }
            current_field += 1;
            field_start = offset + ch.len_utf8();
            at_field_start = true;
            continue;
        }
        at_field_start = false;
    }

    // The requested field, if present, is the last one on the line.
    current_field == field_index && raw_line[field_start..].trim_end().is_empty()
}
649
// Unit tests for the JSON path navigation, the CSV/JSON/JSONL stream loaders,
// and the delimiter helpers. All inputs are in-memory `Cursor`s.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // ---- navigate_json_path tests ----

    #[test]
    fn test_navigate_json_path_descends_object_key() {
        // The TeamCity-style case: object with a nested array one level down.
        let doc = serde_json::json!({
            "count": 2,
            "project": [{"id": "a"}, {"id": "b"}]
        });
        let extracted = navigate_json_path(&doc, "project").unwrap();
        assert!(extracted.is_array());
        assert_eq!(extracted.as_array().unwrap().len(), 2);
    }

    #[test]
    fn test_navigate_json_path_nested_descent() {
        // TeamCity actually wraps as { projects: { project: [...] } }.
        let doc = serde_json::json!({
            "projects": {"project": [{"id": "a"}, {"id": "b"}, {"id": "c"}]}
        });
        let extracted = navigate_json_path(&doc, "projects.project").unwrap();
        assert_eq!(extracted.as_array().unwrap().len(), 3);
    }

    #[test]
    fn test_navigate_json_path_array_projection() {
        // Elasticsearch-style: project _source out of each hit.
        let doc = serde_json::json!({
            "hits": {"hits": [
                {"_source": {"id": 1}},
                {"_source": {"id": 2}}
            ]}
        });
        let extracted = navigate_json_path(&doc, "hits.hits[]._source").unwrap();
        let arr = extracted.as_array().unwrap();
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[1]["id"], serde_json::json!(2));
    }

    #[test]
    fn test_navigate_json_path_bare_projection_over_root_array() {
        // A leading bare `[]` projects over the document itself.
        let doc = serde_json::json!([{"v": {"x": 1}}, {"v": {"x": 2}}]);
        let extracted = navigate_json_path(&doc, "[].v").unwrap();
        let arr = extracted.as_array().unwrap();
        assert_eq!(arr[0]["x"], serde_json::json!(1));
    }

    #[test]
    fn test_navigate_json_path_empty_path_is_identity() {
        let doc = serde_json::json!({"a": 1});
        let extracted = navigate_json_path(&doc, "").unwrap();
        assert_eq!(extracted, doc);
    }

    #[test]
    fn test_navigate_json_path_missing_key_errors() {
        let doc = serde_json::json!({"a": 1});
        let err = navigate_json_path(&doc, "b").unwrap_err();
        assert!(err.to_string().contains("not found"), "{}", err);
    }

    #[test]
    fn test_navigate_json_path_projection_on_non_array_errors() {
        // The error names both the expectation and the JSON type actually found.
        let doc = serde_json::json!({"a": {"not": "an array"}});
        let err = navigate_json_path(&doc, "a[]").unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("Expected array"), "{}", msg);
        assert!(msg.contains("object"), "{}", msg);
    }

    // ---- CSV / JSON / JSONL loaders ----

    #[test]
    fn test_csv_from_reader() {
        let csv_data = "id,name,value\n1,Alice,100\n2,Bob,200\n3,,300";
        let reader = Cursor::new(csv_data);

        let table =
            load_csv_from_reader(reader, "test", "stream", "memory").expect("Failed to load CSV");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // Check that empty field is NULL
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    #[test]
    fn test_json_from_reader() {
        let json_data = r#"[
            {"id": 1, "name": "Alice", "value": 100},
            {"id": 2, "name": "Bob", "value": 200},
            {"id": 3, "name": null, "value": 300}
        ]"#;
        let reader = Cursor::new(json_data);

        let table =
            load_json_from_reader(reader, "test", "stream", "memory").expect("Failed to load JSON");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // Check that null is handled
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    #[test]
    fn test_jsonl_from_reader() {
        let jsonl_data = "{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n";
        let reader = Cursor::new(jsonl_data);

        let table = load_json_from_reader(reader, "test", "stream", "memory")
            .expect("Failed to load JSONL");

        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_jsonl_heterogeneous_schema_unioned() {
        // Second record adds an "extra" field; loader should pick it up via the
        // union, and row 0 should have Null for it.
        let jsonl_data = "{\"id\":1}\n{\"id\":2,\"extra\":\"hi\"}\n";
        let reader = Cursor::new(jsonl_data);
        let table = load_json_from_reader(reader, "test", "stream", "memory").expect("load");
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_jsonl_skips_blank_lines() {
        let jsonl_data = "{\"id\":1}\n\n\n{\"id\":2}\n";
        let reader = Cursor::new(jsonl_data);
        let table = load_json_from_reader(reader, "test", "stream", "memory").expect("load");
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_parse_json_records_array_form() {
        let recs = parse_json_records(r#"[{"a":1},{"a":2}]"#).unwrap();
        assert_eq!(recs.len(), 2);
    }

    #[test]
    fn test_parse_json_records_jsonl_form() {
        let recs = parse_json_records("{\"a\":1}\n{\"a\":2}\n").unwrap();
        assert_eq!(recs.len(), 2);
    }

    #[test]
    fn test_parse_json_records_jsonl_error_cites_line() {
        // Line numbers in the error are 1-based.
        let err = parse_json_records("{\"a\":1}\nnot json\n").unwrap_err();
        assert!(err.to_string().contains("line 2"));
    }

    // ---- CsvReadOptions / delimiter detection ----

    #[test]
    fn test_csv_options_default_is_comma() {
        let opts = CsvReadOptions::default();
        assert_eq!(opts.delimiter, b',');
        assert!(opts.has_headers);
    }

    #[test]
    fn test_detect_delimiter_from_path() {
        assert_eq!(detect_delimiter_from_path("data.tsv"), b'\t');
        assert_eq!(detect_delimiter_from_path("data.TSV"), b'\t');
        assert_eq!(detect_delimiter_from_path("/tmp/foo.psv"), b'|');
        assert_eq!(detect_delimiter_from_path("data.PSV"), b'|');
        assert_eq!(detect_delimiter_from_path("data.csv"), b',');
        assert_eq!(detect_delimiter_from_path("noext"), b',');
        assert_eq!(detect_delimiter_from_path("-"), b',');
    }

    #[test]
    fn test_load_csv_with_pipe_delimiter() {
        let data = "id|name|score\n1|alice|10\n2|bob|20\n";
        let reader = Cursor::new(data);
        let opts = CsvReadOptions {
            delimiter: b'|',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(reader, "psv", "test", "memory", &opts)
            .expect("load failed");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(
            table.get_value(1, 1).unwrap(),
            &DataValue::String("bob".to_string())
        );
    }

    #[test]
    fn test_load_csv_with_tab_delimiter() {
        let data = "id\tname\tscore\n1\talice\t10\n2\tbob\t20\n";
        let reader = Cursor::new(data);
        let opts = CsvReadOptions {
            delimiter: b'\t',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(reader, "tsv", "test", "memory", &opts)
            .expect("load failed");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
    }

    #[test]
    fn test_metadata_records_delimiter() {
        // Comma -> stored as ","
        let table = load_csv_from_reader(Cursor::new("a,b\n1,2\n"), "t", "test", "memory").unwrap();
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some(",")
        );

        // Tab -> stored as "\t"
        let opts = CsvReadOptions {
            delimiter: b'\t',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(
            Cursor::new("a\tb\n1\t2\n"),
            "t",
            "test",
            "memory",
            &opts,
        )
        .unwrap();
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some("\\t")
        );
    }

    #[test]
    fn test_parse_delimiter_arg_accepts_single_char() {
        assert_eq!(parse_delimiter_arg(",").unwrap(), b',');
        assert_eq!(parse_delimiter_arg("|").unwrap(), b'|');
        assert_eq!(parse_delimiter_arg(";").unwrap(), b';');
    }

    #[test]
    fn test_parse_delimiter_arg_accepts_backslash_escapes() {
        // Both the two-character escape and a literal tab map to the tab byte.
        assert_eq!(parse_delimiter_arg("\\t").unwrap(), b'\t');
        assert_eq!(parse_delimiter_arg("\t").unwrap(), b'\t');
        assert_eq!(parse_delimiter_arg("\\n").unwrap(), b'\n');
        assert_eq!(parse_delimiter_arg("\\r").unwrap(), b'\r');
    }

    #[test]
    fn test_parse_delimiter_arg_rejects_multi_char() {
        let err = parse_delimiter_arg("||").unwrap_err();
        assert!(err.to_string().contains("single ASCII character"));
    }

    #[test]
    fn test_parse_delimiter_arg_rejects_non_ascii() {
        let err = parse_delimiter_arg("ö").unwrap_err();
        assert!(err.to_string().contains("single ASCII character"));
    }

    #[test]
    fn test_resolve_delimiter_explicit_wins() {
        assert_eq!(resolve_delimiter("data.psv", Some(b',')), b',');
        assert_eq!(resolve_delimiter("data.tsv", Some(b';')), b';');
        assert_eq!(resolve_delimiter("data.csv", Some(b'|')), b'|');
    }

    #[test]
    fn test_resolve_delimiter_falls_back_to_extension() {
        assert_eq!(resolve_delimiter("data.psv", None), b'|');
        assert_eq!(resolve_delimiter("data.tsv", None), b'\t');
        assert_eq!(resolve_delimiter("data.csv", None), b',');
        assert_eq!(resolve_delimiter("data.dat", None), b',');
    }

    #[test]
    fn test_null_detection_works_with_pipe_delimiter() {
        // Middle column is unquoted-empty -> NULL, not empty string.
        let data = "id|name|score\n1||10\n";
        let opts = CsvReadOptions {
            delimiter: b'|',
            has_headers: true,
        };
        let table =
            load_csv_from_reader_with_opts(Cursor::new(data), "psv", "test", "memory", &opts)
                .expect("load failed");
        assert!(matches!(table.get_value(0, 1).unwrap(), DataValue::Null));
    }
}