Skip to main content

sql_cli/data/
stream_loader.rs

1// Stream-based data loader that works with any Read source
2// This allows the same code to handle files, HTTP responses, or any other data stream
3
4use anyhow::{Context, Result};
5use csv::ReaderBuilder;
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet};
8use std::io::{BufRead, BufReader, Read};
9use tracing::{debug, info};
10
11use crate::data::advanced_csv_loader::StringInterner;
12use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
13
/// Options controlling how a CSV stream is parsed.
///
/// The default is RFC-4180 style: comma delimiter with a header row. Surfaces
/// (CLI flag, `READ_CSV(path, '|')`, WEB CTE `DELIMITER`) populate this struct
/// at the edge; internal layers just pass it through.
///
/// `PartialEq`/`Eq` are derived so option sets can be compared directly, e.g.
/// `assert_eq!(opts, CsvReadOptions::default())`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvReadOptions {
    /// Field separator as a single byte (e.g. `b','`, `b'\t'`, `b'|'`).
    pub delimiter: u8,
    /// Forwarded to the CSV reader's `has_headers` setting: `true` means the
    /// first record is a header row.
    pub has_headers: bool,
}

impl Default for CsvReadOptions {
    /// Comma-delimited input with a header row.
    fn default() -> Self {
        Self {
            delimiter: b',',
            has_headers: true,
        }
    }
}
33
/// Guess the field delimiter from the suffix of `path`.
///
/// A path ending in `.tsv` maps to tab and one ending in `.psv` maps to pipe;
/// anything else (including stdin `-`) falls back to comma. The comparison is
/// ASCII case-insensitive. This is only the *auto-detect* layer — explicit
/// overrides from the CLI flag, `READ_CSV` 2nd arg, or WEB CTE `DELIMITER`
/// take precedence over it.
pub fn detect_delimiter_from_path(path: &str) -> u8 {
    // Known suffixes and the delimiter each one implies.
    const BY_SUFFIX: [(&str, u8); 2] = [(".tsv", b'\t'), (".psv", b'|')];

    // Compare the trailing bytes directly so no lowercase copy is allocated.
    let path_bytes = path.as_bytes();
    let has_suffix = |suffix: &str| {
        let wanted = suffix.as_bytes();
        path_bytes.len() >= wanted.len()
            && path_bytes[path_bytes.len() - wanted.len()..].eq_ignore_ascii_case(wanted)
    };

    BY_SUFFIX
        .iter()
        .find(|(suffix, _)| has_suffix(suffix))
        .map_or(b',', |&(_, delimiter)| delimiter)
}
49
50/// Parse a user-supplied delimiter string into a single byte.
51///
52/// Accepts:
53///   - a single ASCII character (e.g. `","`, `"|"`, `";"`)
54///   - the two-character escapes `"\t"`, `"\n"`, `"\r"` (typing literal tabs
55///     in SQL strings or shell args is awkward, so this is the canonical form)
56///
57/// Rejects multi-character strings, non-ASCII, and empty strings with a clear
58/// error. Caller is expected to wrap the error with context if needed.
59pub fn parse_delimiter_arg(s: &str) -> anyhow::Result<u8> {
60    match s {
61        "\\t" | "\t" => return Ok(b'\t'),
62        "\\n" => return Ok(b'\n'),
63        "\\r" => return Ok(b'\r'),
64        _ => {}
65    }
66    let bytes = s.as_bytes();
67    if bytes.len() == 1 && bytes[0].is_ascii() {
68        return Ok(bytes[0]);
69    }
70    Err(anyhow::anyhow!(
71        "delimiter must be a single ASCII character (or '\\t', '\\n', '\\r'); got {:?}",
72        s
73    ))
74}
75
76/// Resolve which delimiter to use for a given path.
77///
78/// Precedence (highest first):
79///   1. `explicit` override (typically from a CLI flag or 2nd arg)
80///   2. extension auto-detect (`.tsv` → tab, `.psv` → pipe)
81///   3. comma
82pub fn resolve_delimiter(path: &str, explicit: Option<u8>) -> u8 {
83    explicit.unwrap_or_else(|| detect_delimiter_from_path(path))
84}
85
/// Render a delimiter byte as printable text for diagnostic metadata.
///
/// Tab, line feed and carriage return become their two-character backslash
/// escapes; every other byte is converted to the `char` with the same code
/// point.
fn delimiter_label(d: u8) -> String {
    let escaped = match d {
        b'\t' => Some("\\t"),
        b'\n' => Some("\\n"),
        b'\r' => Some("\\r"),
        _ => None,
    };

    if let Some(text) = escaped {
        return text.to_owned();
    }
    String::from(char::from(d))
}
95
/// Column analysis results for determining interning strategy.
///
/// One value is produced per header column by `StreamCsvLoader::analyze_columns`.
/// Only `index` and `is_categorical` are read back in this file; the
/// underscore-prefixed fields are populated but not read here (presumably
/// kept for `Debug` output and named that way to avoid dead-code warnings —
/// TODO confirm).
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the header record.
    index: usize,
    /// Header text of the column.
    _name: String,
    /// Number of distinct non-empty values seen in the sampled rows.
    _cardinality: usize,
    /// Number of rows sampled (row count capped at the loader's `sample_size`).
    _sample_size: usize,
    /// `_cardinality / _sample_size`, or `1.0` when no rows were sampled.
    _unique_ratio: f64,
    /// Whether the column was selected for string interning.
    is_categorical: bool,
    /// Average byte length of the non-empty sampled values (0 if there were none).
    _avg_string_length: usize,
}
107
/// Advanced stream-based CSV loader with string interning
pub struct StreamCsvLoader {
    /// Maximum number of rows inspected when analyzing column cardinality.
    sample_size: usize,
    /// Columns whose distinct-value ratio is below this are treated as categorical.
    cardinality_threshold: f64,
    /// String interners keyed by column index, created for categorical columns
    /// during a load.
    interners: HashMap<usize, StringInterner>,
}
114
115impl StreamCsvLoader {
116    pub fn new() -> Self {
117        Self {
118            sample_size: 1000,
119            cardinality_threshold: 0.3,
120            interners: HashMap::new(),
121        }
122    }
123
124    /// Analyze columns to determine which should use string interning
125    fn analyze_columns(
126        &self,
127        rows: &[Vec<String>],
128        headers: &csv::StringRecord,
129    ) -> Vec<ColumnAnalysis> {
130        let mut analyses = Vec::new();
131
132        for (col_idx, header) in headers.iter().enumerate() {
133            let mut unique_values = HashSet::new();
134            let mut total_length = 0;
135            let mut non_empty_count = 0;
136
137            // Sample rows to analyze cardinality
138            for row in rows.iter().take(self.sample_size) {
139                if let Some(value) = row.get(col_idx) {
140                    if !value.is_empty() {
141                        unique_values.insert(value.clone());
142                        total_length += value.len();
143                        non_empty_count += 1;
144                    }
145                }
146            }
147
148            let cardinality = unique_values.len();
149            let sample_size = rows.len().min(self.sample_size);
150            let unique_ratio = if sample_size > 0 {
151                cardinality as f64 / sample_size as f64
152            } else {
153                1.0
154            };
155
156            let avg_string_length = if non_empty_count > 0 {
157                total_length / non_empty_count
158            } else {
159                0
160            };
161
162            // Consider categorical if low cardinality ratio or short strings with repetition
163            let is_categorical = unique_ratio < self.cardinality_threshold
164                || (avg_string_length < 20 && cardinality < sample_size / 2);
165
166            analyses.push(ColumnAnalysis {
167                index: col_idx,
168                _name: header.to_string(),
169                _cardinality: cardinality,
170                _sample_size: sample_size,
171                _unique_ratio: unique_ratio,
172                is_categorical,
173                _avg_string_length: avg_string_length,
174            });
175        }
176
177        analyses
178    }
179
180    /// Load CSV data with string interning from any Read source, using default
181    /// comma-delimited options. Thin wrapper over [`load_csv_from_reader_with_opts`]
182    /// kept so existing callers don't need to touch options.
183    pub fn load_csv_from_reader<R: Read>(
184        &mut self,
185        reader: R,
186        table_name: &str,
187        source_type: &str,
188        source_path: &str,
189    ) -> Result<DataTable> {
190        self.load_csv_from_reader_with_opts(
191            reader,
192            table_name,
193            source_type,
194            source_path,
195            &CsvReadOptions::default(),
196        )
197    }
198
199    /// Load CSV data with string interning, honouring caller-supplied options
200    /// (delimiter, headers).
201    pub fn load_csv_from_reader_with_opts<R: Read>(
202        &mut self,
203        mut reader: R,
204        table_name: &str,
205        source_type: &str,
206        source_path: &str,
207        opts: &CsvReadOptions,
208    ) -> Result<DataTable> {
209        info!(
210            "Stream CSV load: Loading {} with optimizations (delimiter={})",
211            source_path,
212            delimiter_label(opts.delimiter)
213        );
214
215        // Read all data into memory
216        let mut buffer = Vec::new();
217        reader.read_to_end(&mut buffer)?;
218
219        // First pass: Parse CSV with headers
220        let mut csv_reader = ReaderBuilder::new()
221            .has_headers(opts.has_headers)
222            .delimiter(opts.delimiter)
223            .from_reader(&buffer[..]);
224
225        let headers = csv_reader.headers()?.clone();
226        let mut table = DataTable::new(table_name);
227
228        // Add metadata about the source
229        table
230            .metadata
231            .insert("source_type".to_string(), source_type.to_string());
232        table
233            .metadata
234            .insert("source_path".to_string(), source_path.to_string());
235        table
236            .metadata
237            .insert("delimiter".to_string(), delimiter_label(opts.delimiter));
238
239        // Create columns from headers
240        for header in &headers {
241            table.add_column(DataColumn::new(header));
242        }
243
244        // Collect all rows as strings
245        let mut string_rows = Vec::new();
246        for result in csv_reader.records() {
247            let record = result?;
248            let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
249            string_rows.push(row);
250        }
251
252        // Analyze columns for string interning
253        let analyses = self.analyze_columns(&string_rows, &headers);
254        let categorical_columns: HashSet<usize> = analyses
255            .iter()
256            .filter(|a| a.is_categorical)
257            .map(|a| a.index)
258            .collect();
259
260        info!(
261            "Column analysis: {} of {} columns will use string interning",
262            categorical_columns.len(),
263            analyses.len()
264        );
265
266        // Initialize interners for categorical columns
267        for col_idx in &categorical_columns {
268            self.interners.insert(*col_idx, StringInterner::new());
269        }
270
271        // Second pass: Read raw lines for NULL detection
272        let mut line_reader = BufReader::new(&buffer[..]);
273        let mut raw_lines = Vec::new();
274        let mut raw_line = String::new();
275
276        // Skip header
277        line_reader.read_line(&mut raw_line)?;
278        raw_line.clear();
279
280        // Read all raw lines
281        for _ in 0..string_rows.len() {
282            line_reader.read_line(&mut raw_line)?;
283            raw_lines.push(raw_line.clone());
284            raw_line.clear();
285        }
286
287        // Infer column types by sampling
288        let mut column_types = vec![DataType::Null; headers.len()];
289        let sample_size = string_rows.len().min(100);
290
291        for row in string_rows.iter().take(sample_size) {
292            for (col_idx, value) in row.iter().enumerate() {
293                if !value.is_empty() {
294                    let inferred = DataType::infer_from_string(value);
295                    column_types[col_idx] = column_types[col_idx].merge(&inferred);
296                }
297            }
298        }
299
300        // Update column types
301        for (col_idx, column) in table.columns.iter_mut().enumerate() {
302            column.data_type = column_types[col_idx].clone();
303        }
304
305        // Convert strings to typed values and add rows
306        for (row_idx, string_row) in string_rows.iter().enumerate() {
307            let mut values = Vec::new();
308            let raw_line = &raw_lines[row_idx];
309
310            for (col_idx, value) in string_row.iter().enumerate() {
311                let data_value = if value.is_empty() {
312                    // Check if this is NULL (,,) vs empty string ("")
313                    if is_null_field(raw_line, col_idx, opts.delimiter as char) {
314                        DataValue::Null
315                    } else if categorical_columns.contains(&col_idx) {
316                        // Use interned string for empty categorical values
317                        if let Some(interner) = self.interners.get_mut(&col_idx) {
318                            DataValue::InternedString(interner.intern(""))
319                        } else {
320                            DataValue::String(String::new())
321                        }
322                    } else {
323                        DataValue::String(String::new())
324                    }
325                } else if categorical_columns.contains(&col_idx)
326                    && column_types[col_idx] == DataType::String
327                {
328                    // Use string interning for categorical columns
329                    if let Some(interner) = self.interners.get_mut(&col_idx) {
330                        DataValue::InternedString(interner.intern(value))
331                    } else {
332                        DataValue::from_string(value, &column_types[col_idx])
333                    }
334                } else {
335                    DataValue::from_string(value, &column_types[col_idx])
336                };
337                values.push(data_value);
338            }
339            table
340                .add_row(DataRow::new(values))
341                .map_err(|e| anyhow::anyhow!(e))?;
342        }
343
344        // Print interner statistics
345        for (col_idx, interner) in &self.interners {
346            let stats = interner.stats();
347            if stats.memory_saved_bytes > 0 {
348                debug!(
349                    "Column {} interning: {} unique strings, {} references, {} bytes saved",
350                    headers.get(*col_idx).unwrap_or(&String::new()),
351                    stats.unique_strings,
352                    stats.total_references,
353                    stats.memory_saved_bytes
354                );
355            }
356        }
357
358        // Update column statistics
359        table.infer_column_types();
360
361        Ok(table)
362    }
363}
364
365/// Simple wrapper for loading CSV without advanced features. Defaults to
366/// comma delimiter; for other delimiters use [`load_csv_from_reader_with_opts`].
367pub fn load_csv_from_reader<R: Read>(
368    reader: R,
369    table_name: &str,
370    source_type: &str,
371    source_path: &str,
372) -> Result<DataTable> {
373    let mut loader = StreamCsvLoader::new();
374    loader.load_csv_from_reader(reader, table_name, source_type, source_path)
375}
376
377/// As [`load_csv_from_reader`], but honouring caller-supplied [`CsvReadOptions`]
378/// (delimiter, headers).
379pub fn load_csv_from_reader_with_opts<R: Read>(
380    reader: R,
381    table_name: &str,
382    source_type: &str,
383    source_path: &str,
384    opts: &CsvReadOptions,
385) -> Result<DataTable> {
386    let mut loader = StreamCsvLoader::new();
387    loader.load_csv_from_reader_with_opts(reader, table_name, source_type, source_path, opts)
388}
389
390/// Parse JSON content as either a JSON array of objects or JSONL
391/// (newline-delimited JSON, one object per line). The format is detected by
392/// peeking at the first non-whitespace byte: `[` starts an array, anything
393/// else is parsed line-by-line.
394///
395/// Empty and whitespace-only lines are skipped in JSONL mode. Parse errors
396/// in JSONL mode include the source line number.
397pub fn parse_json_records(content: &str) -> Result<Vec<JsonValue>> {
398    let trimmed = content.trim_start();
399    if trimmed.starts_with('[') {
400        return serde_json::from_str(content).with_context(|| "Failed to parse JSON array");
401    }
402
403    let mut out = Vec::new();
404    for (idx, raw_line) in content.lines().enumerate() {
405        let line = raw_line.trim();
406        if line.is_empty() {
407            continue;
408        }
409        let value: JsonValue = serde_json::from_str(line)
410            .with_context(|| format!("Failed to parse JSONL at line {}", idx + 1))?;
411        out.push(value);
412    }
413    Ok(out)
414}
415
416/// Compute the ordered union of object keys across the first `sample_size`
417/// records. Order of first occurrence is preserved so the column layout is
418/// stable. Non-object records are skipped.
419pub fn collect_column_names(records: &[JsonValue], sample_size: usize) -> Vec<String> {
420    let mut seen: HashSet<String> = HashSet::new();
421    let mut names: Vec<String> = Vec::new();
422    for record in records.iter().take(sample_size) {
423        if let Some(obj) = record.as_object() {
424            for key in obj.keys() {
425                if seen.insert(key.clone()) {
426                    names.push(key.clone());
427                }
428            }
429        }
430    }
431    names
432}
433
434/// Load JSON data from any Read source into a DataTable.
435///
436/// Accepts either a JSON array of objects (`[{...}, {...}]`) or JSONL
437/// (one JSON object per line). Format is auto-detected.
438pub fn load_json_from_reader<R: Read>(
439    mut reader: R,
440    table_name: &str,
441    source_type: &str,
442    source_path: &str,
443) -> Result<DataTable> {
444    let mut json_str = String::new();
445    reader.read_to_string(&mut json_str)?;
446
447    let json_data: Vec<JsonValue> = parse_json_records(&json_str)?;
448
449    if json_data.is_empty() {
450        return Ok(DataTable::new(table_name));
451    }
452
453    // Schema is the union of keys across the first 100 records so heterogeneous
454    // JSONL streams (where later records may carry fields the first one did
455    // not) don't silently drop columns.
456    let column_names = collect_column_names(&json_data, 100);
457    if column_names.is_empty() {
458        return Err(anyhow::anyhow!(
459            "JSON data must contain objects (got non-object records)"
460        ));
461    }
462
463    let mut table = DataTable::new(table_name);
464
465    // Add metadata
466    table
467        .metadata
468        .insert("source_type".to_string(), source_type.to_string());
469    table
470        .metadata
471        .insert("source_path".to_string(), source_path.to_string());
472
473    for name in &column_names {
474        table.add_column(DataColumn::new(name));
475    }
476
477    // Collect values for type inference
478    let mut string_rows = Vec::new();
479    for json_obj in &json_data {
480        if let Some(obj) = json_obj.as_object() {
481            let mut row = Vec::new();
482            for col_name in &column_names {
483                let value = obj
484                    .get(col_name)
485                    .map(|v| json_value_to_string(v))
486                    .unwrap_or_default();
487                row.push(value);
488            }
489            string_rows.push(row);
490        }
491    }
492
493    // Infer column types
494    let mut column_types = vec![DataType::Null; column_names.len()];
495    let sample_size = string_rows.len().min(100);
496
497    for row in string_rows.iter().take(sample_size) {
498        for (col_idx, value) in row.iter().enumerate() {
499            if !value.is_empty() && value != "null" {
500                let inferred = DataType::infer_from_string(value);
501                column_types[col_idx] = column_types[col_idx].merge(&inferred);
502            }
503        }
504    }
505
506    // Update column types
507    for (col_idx, column) in table.columns.iter_mut().enumerate() {
508        column.data_type = column_types[col_idx].clone();
509    }
510
511    // Convert to typed values and add rows
512    for string_row in &string_rows {
513        let mut values = Vec::new();
514        for (col_idx, value) in string_row.iter().enumerate() {
515            let data_value = if value.is_empty() || value == "null" {
516                DataValue::Null
517            } else {
518                DataValue::from_string(value, &column_types[col_idx])
519            };
520            values.push(data_value);
521        }
522        table
523            .add_row(DataRow::new(values))
524            .map_err(|e| anyhow::anyhow!(e))?;
525    }
526
527    // Update statistics
528    table.infer_column_types();
529
530    Ok(table)
531}
532
533/// Helper to convert JSON value to string for type inference
534fn json_value_to_string(value: &JsonValue) -> String {
535    match value {
536        JsonValue::Null => String::new(),
537        JsonValue::Bool(b) => b.to_string(),
538        JsonValue::Number(n) => n.to_string(),
539        JsonValue::String(s) => s.clone(),
540        JsonValue::Array(arr) => format!("{:?}", arr),
541        JsonValue::Object(obj) => format!("{:?}", obj),
542    }
543}
544
/// Helper to detect NULL fields in raw CSV lines.
///
/// Returns `true` when the field at zero-based `field_index` of `raw_line` is
/// completely empty, i.e. nothing at all sits between its delimiters (`,,`).
/// A quoted empty string (`""`) is not NULL. `delimiter` is the field
/// separator character used in the source (`,` for plain CSV, `\t` for TSV,
/// etc.).
///
/// Delimiters inside double-quoted sections are ignored. Every `"` toggles the
/// quoted state, so doubled quotes (`""`) inside a quoted field cancel out.
/// Backslash is not an escape character, matching the CSV reader's defaults.
/// A trailing line terminator (`\n` or `\r\n`) is not counted as part of the
/// last field. Returns `false` when the line has fewer than `field_index + 1`
/// fields.
fn is_null_field(raw_line: &str, field_index: usize, delimiter: char) -> bool {
    let mut delim_count = 0usize;
    let mut in_quotes = false;
    // Byte offset at which the current field starts.
    let mut field_start = 0usize;

    for (i, ch) in raw_line.char_indices() {
        if ch == '"' {
            in_quotes = !in_quotes;
        } else if ch == delimiter && !in_quotes {
            if delim_count == field_index {
                // The field is NULL only if this delimiter immediately follows
                // the previous one (or the start of the line).
                return i == field_start;
            }
            delim_count += 1;
            // Advance by the delimiter's encoded width so the offset always
            // lands on a character boundary.
            field_start = i + ch.len_utf8();
        }
    }

    // Last field: NULL when nothing but the line terminator remains.
    delim_count == field_index
        && raw_line[field_start..]
            .trim_end_matches(['\r', '\n'])
            .is_empty()
}
577
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // ---- CSV / JSON loading ----

    /// An unquoted empty CSV field is loaded as NULL.
    #[test]
    fn test_csv_from_reader() {
        let csv_data = "id,name,value\n1,Alice,100\n2,Bob,200\n3,,300";
        let reader = Cursor::new(csv_data);

        let table =
            load_csv_from_reader(reader, "test", "stream", "memory").expect("Failed to load CSV");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // Check that empty field is NULL
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    /// A JSON array of objects loads, and JSON `null` becomes NULL.
    #[test]
    fn test_json_from_reader() {
        let json_data = r#"[
            {"id": 1, "name": "Alice", "value": 100},
            {"id": 2, "name": "Bob", "value": 200},
            {"id": 3, "name": null, "value": 300}
        ]"#;
        let reader = Cursor::new(json_data);

        let table =
            load_json_from_reader(reader, "test", "stream", "memory").expect("Failed to load JSON");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // Check that null is handled
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    /// JSONL input (one object per line) is auto-detected.
    #[test]
    fn test_jsonl_from_reader() {
        let jsonl_data = "{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n";
        let reader = Cursor::new(jsonl_data);

        let table = load_json_from_reader(reader, "test", "stream", "memory")
            .expect("Failed to load JSONL");

        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_jsonl_heterogeneous_schema_unioned() {
        // Second record adds an "extra" field; loader should pick it up via the
        // union, and row 0 should have Null for it.
        let jsonl_data = "{\"id\":1}\n{\"id\":2,\"extra\":\"hi\"}\n";
        let reader = Cursor::new(jsonl_data);
        let table = load_json_from_reader(reader, "test", "stream", "memory").expect("load");
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
        // "id" is seen first, so "extra" is column 1; row 0 never had it.
        assert!(matches!(table.get_value(0, 1).unwrap(), DataValue::Null));
    }

    /// Blank lines between JSONL records do not produce rows.
    #[test]
    fn test_jsonl_skips_blank_lines() {
        let jsonl_data = "{\"id\":1}\n\n\n{\"id\":2}\n";
        let reader = Cursor::new(jsonl_data);
        let table = load_json_from_reader(reader, "test", "stream", "memory").expect("load");
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_parse_json_records_array_form() {
        let recs = parse_json_records(r#"[{"a":1},{"a":2}]"#).unwrap();
        assert_eq!(recs.len(), 2);
    }

    #[test]
    fn test_parse_json_records_jsonl_form() {
        let recs = parse_json_records("{\"a\":1}\n{\"a\":2}\n").unwrap();
        assert_eq!(recs.len(), 2);
    }

    /// JSONL parse errors name the 1-based line that failed.
    #[test]
    fn test_parse_json_records_jsonl_error_cites_line() {
        let err = parse_json_records("{\"a\":1}\nnot json\n").unwrap_err();
        assert!(err.to_string().contains("line 2"));
    }

    // ---- CsvReadOptions / delimiter detection ----

    #[test]
    fn test_csv_options_default_is_comma() {
        let opts = CsvReadOptions::default();
        assert_eq!(opts.delimiter, b',');
        assert!(opts.has_headers);
    }

    #[test]
    fn test_detect_delimiter_from_path() {
        assert_eq!(detect_delimiter_from_path("data.tsv"), b'\t');
        assert_eq!(detect_delimiter_from_path("data.TSV"), b'\t');
        assert_eq!(detect_delimiter_from_path("/tmp/foo.psv"), b'|');
        assert_eq!(detect_delimiter_from_path("data.PSV"), b'|');
        assert_eq!(detect_delimiter_from_path("data.csv"), b',');
        assert_eq!(detect_delimiter_from_path("noext"), b',');
        assert_eq!(detect_delimiter_from_path("-"), b',');
    }

    #[test]
    fn test_load_csv_with_pipe_delimiter() {
        let data = "id|name|score\n1|alice|10\n2|bob|20\n";
        let reader = Cursor::new(data);
        let opts = CsvReadOptions {
            delimiter: b'|',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(reader, "psv", "test", "memory", &opts)
            .expect("load failed");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(
            table.get_value(1, 1).unwrap(),
            &DataValue::String("bob".to_string())
        );
    }

    #[test]
    fn test_load_csv_with_tab_delimiter() {
        let data = "id\tname\tscore\n1\talice\t10\n2\tbob\t20\n";
        let reader = Cursor::new(data);
        let opts = CsvReadOptions {
            delimiter: b'\t',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(reader, "tsv", "test", "memory", &opts)
            .expect("load failed");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
    }

    /// The delimiter is recorded in table metadata in its printable form.
    #[test]
    fn test_metadata_records_delimiter() {
        // Comma -> stored as ","
        let table = load_csv_from_reader(Cursor::new("a,b\n1,2\n"), "t", "test", "memory").unwrap();
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some(",")
        );

        // Tab -> stored as "\t"
        let opts = CsvReadOptions {
            delimiter: b'\t',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(
            Cursor::new("a\tb\n1\t2\n"),
            "t",
            "test",
            "memory",
            &opts,
        )
        .unwrap();
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some("\\t")
        );
    }

    // ---- parse_delimiter_arg / resolve_delimiter ----

    #[test]
    fn test_parse_delimiter_arg_accepts_single_char() {
        assert_eq!(parse_delimiter_arg(",").unwrap(), b',');
        assert_eq!(parse_delimiter_arg("|").unwrap(), b'|');
        assert_eq!(parse_delimiter_arg(";").unwrap(), b';');
    }

    #[test]
    fn test_parse_delimiter_arg_accepts_backslash_escapes() {
        assert_eq!(parse_delimiter_arg("\\t").unwrap(), b'\t');
        assert_eq!(parse_delimiter_arg("\t").unwrap(), b'\t');
        assert_eq!(parse_delimiter_arg("\\n").unwrap(), b'\n');
        assert_eq!(parse_delimiter_arg("\\r").unwrap(), b'\r');
    }

    #[test]
    fn test_parse_delimiter_arg_rejects_multi_char() {
        let err = parse_delimiter_arg("||").unwrap_err();
        assert!(err.to_string().contains("single ASCII character"));
    }

    #[test]
    fn test_parse_delimiter_arg_rejects_non_ascii() {
        let err = parse_delimiter_arg("ö").unwrap_err();
        assert!(err.to_string().contains("single ASCII character"));
    }

    #[test]
    fn test_resolve_delimiter_explicit_wins() {
        assert_eq!(resolve_delimiter("data.psv", Some(b',')), b',');
        assert_eq!(resolve_delimiter("data.tsv", Some(b';')), b';');
        assert_eq!(resolve_delimiter("data.csv", Some(b'|')), b'|');
    }

    #[test]
    fn test_resolve_delimiter_falls_back_to_extension() {
        assert_eq!(resolve_delimiter("data.psv", None), b'|');
        assert_eq!(resolve_delimiter("data.tsv", None), b'\t');
        assert_eq!(resolve_delimiter("data.csv", None), b',');
        assert_eq!(resolve_delimiter("data.dat", None), b',');
    }

    #[test]
    fn test_null_detection_works_with_pipe_delimiter() {
        // Middle column is unquoted-empty -> NULL, not empty string.
        let data = "id|name|score\n1||10\n";
        let opts = CsvReadOptions {
            delimiter: b'|',
            has_headers: true,
        };
        let table =
            load_csv_from_reader_with_opts(Cursor::new(data), "psv", "test", "memory", &opts)
                .expect("load failed");
        assert!(matches!(table.get_value(0, 1).unwrap(), DataValue::Null));
    }
}