Skip to main content

sql_cli/sql/generators/
file_readers.rs

1use crate::data::advanced_csv_loader::AdvancedCsvLoader;
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
3use crate::data::stream_loader::{
4    collect_column_names, detect_delimiter_from_path, parse_delimiter_arg as parse_delim_byte,
5    CsvReadOptions,
6};
7use crate::sql::generators::TableGenerator;
8use anyhow::{anyhow, Result};
9use regex::Regex;
10use serde_json::Value as JsonValue;
11use std::fs::File;
12use std::io::{BufRead, BufReader, Cursor, IsTerminal};
13use std::sync::{Arc, OnceLock};
14
/// Upper bound on the number of rows any file reader in this module returns.
///
/// This guards against accidentally loading a multi-GB log into memory. Raising
/// it per session is planned future work; until then it is a fixed constant.
const MAX_LINES_PER_FILE: usize = 1_000 * 1_000;
19
20/// Extract a string argument, erroring if the arg is missing/NULL/non-string.
21fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
22    match args.get(idx) {
23        Some(DataValue::String(s)) => Ok(s.clone()),
24        Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
25        Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
26        Some(v) => Err(anyhow!(
27            "{} argument {} must be a string, got {:?}",
28            name,
29            idx + 1,
30            v
31        )),
32    }
33}
34
35/// Extract an optional string argument. Returns None for missing or NULL.
36fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
37    match args.get(idx) {
38        Some(DataValue::String(s)) => Some(s.clone()),
39        Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
40        _ => None,
41    }
42}
43
44/// Parse a delimiter arg for READ_CSV, wrapping the shared parser's error
45/// with the function name for clearer messages.
46fn parse_delimiter_arg(s: &str, fn_name: &str) -> Result<u8> {
47    parse_delim_byte(s).map_err(|e| anyhow!("{}: {}", fn_name, e))
48}
49
50/// Open a file and stream its lines, applying an optional include-regex filter
51/// and the global truncation cap. Emits a stderr warning when truncation kicks in.
52///
53/// Returns (line_num, line) pairs where `line_num` is the original 1-based line
54/// number in the source file — so numbers are preserved through filtering.
55fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
56    let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
57    let reader = BufReader::new(file);
58
59    let mut out = Vec::new();
60    let mut truncated = false;
61
62    for (idx, line_result) in reader.lines().enumerate() {
63        let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
64        let line_num = (idx + 1) as i64;
65
66        if let Some(re) = match_regex {
67            if !re.is_match(&line) {
68                continue;
69            }
70        }
71
72        if out.len() >= MAX_LINES_PER_FILE {
73            truncated = true;
74            break;
75        }
76        out.push((line_num, line));
77    }
78
79    if truncated {
80        eprintln!(
81            "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
82            MAX_LINES_PER_FILE, path
83        );
84    }
85
86    Ok(out)
87}
88
89/// Read lines from a file path, or from stdin if `path == "-"` (Unix convention).
90///
91/// Stdin reads are cached for the process — same buffer is reused across multiple
92/// reader invocations in the same query. The optional regex filter is applied to
93/// both sources uniformly.
94fn read_lines_from_path_or_stdin(
95    path: &str,
96    match_regex: Option<&Regex>,
97) -> Result<Vec<(i64, String)>> {
98    if path == "-" {
99        let cached = cached_stdin_lines()?;
100        return Ok(cached
101            .iter()
102            .filter(|(_, line)| match_regex.map_or(true, |re| re.is_match(line)))
103            .cloned()
104            .collect());
105    }
106    read_filtered_lines(path, match_regex)
107}
108
109/// READ_TEXT(path [, match_regex]) - Read a text file line by line.
110///
111/// Emits `(line_num, line)` rows. Optional `match_regex` filters source lines
112/// *before* materializing them, which is the primary fast path for large logs.
113pub struct ReadText;
114
115impl TableGenerator for ReadText {
116    fn name(&self) -> &str {
117        "READ_TEXT"
118    }
119
120    fn columns(&self) -> Vec<DataColumn> {
121        vec![DataColumn::new("line_num"), DataColumn::new("line")]
122    }
123
124    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
125        if args.is_empty() || args.len() > 2 {
126            return Err(anyhow!(
127                "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
128            ));
129        }
130
131        let path = require_string(&args, 0, "READ_TEXT")?;
132        let match_regex = optional_string(&args, 1)
133            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
134            .transpose()?;
135
136        let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
137
138        let mut table = DataTable::new("read_text");
139        table.add_column(DataColumn::new("line_num"));
140        table.add_column(DataColumn::new("line"));
141
142        for (line_num, line) in lines {
143            table
144                .add_row(DataRow::new(vec![
145                    DataValue::Integer(line_num),
146                    DataValue::String(line),
147                ]))
148                .map_err(|e| anyhow!(e))?;
149        }
150
151        Ok(Arc::new(table))
152    }
153
154    fn description(&self) -> &str {
155        "Read a text file line-by-line. Pass '-' as path to read from stdin. Optional second arg is a regex that filters lines at read time."
156    }
157
158    fn arg_count(&self) -> usize {
159        2
160    }
161}
162
163/// GREP(path, pattern [, invert]) - Read only lines matching a regex.
164///
165/// Thin composable wrapper around READ_TEXT's filter path. Third argument
166/// (boolean or integer truthy value) inverts the match, matching `grep -v`.
167pub struct Grep;
168
169impl TableGenerator for Grep {
170    fn name(&self) -> &str {
171        "GREP"
172    }
173
174    fn columns(&self) -> Vec<DataColumn> {
175        vec![DataColumn::new("line_num"), DataColumn::new("line")]
176    }
177
178    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
179        if args.len() < 2 || args.len() > 3 {
180            return Err(anyhow!(
181                "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
182            ));
183        }
184
185        let path = require_string(&args, 0, "GREP")?;
186        let pattern_str = require_string(&args, 1, "GREP")?;
187        let pattern =
188            Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
189
190        let invert = match args.get(2) {
191            Some(DataValue::Boolean(b)) => *b,
192            Some(DataValue::Integer(n)) => *n != 0,
193            Some(DataValue::Null) | None => false,
194            Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
195        };
196
197        // When not inverted we can push the filter down into the line reader for
198        // the fast path. When inverted we still iterate every line.
199        let lines = if invert {
200            let all = read_lines_from_path_or_stdin(&path, None)?;
201            all.into_iter()
202                .filter(|(_, line)| !pattern.is_match(line))
203                .collect::<Vec<_>>()
204        } else {
205            read_lines_from_path_or_stdin(&path, Some(&pattern))?
206        };
207
208        let mut table = DataTable::new("grep");
209        table.add_column(DataColumn::new("line_num"));
210        table.add_column(DataColumn::new("line"));
211
212        for (line_num, line) in lines {
213            table
214                .add_row(DataRow::new(vec![
215                    DataValue::Integer(line_num),
216                    DataValue::String(line),
217                ]))
218                .map_err(|e| anyhow!(e))?;
219        }
220
221        Ok(Arc::new(table))
222    }
223
224    fn description(&self) -> &str {
225        "Read only lines matching a regex (third arg inverts the match, like grep -v). Pass '-' as path to read from stdin."
226    }
227
228    fn arg_count(&self) -> usize {
229        3
230    }
231}
232
233/// READ_WORDS(path [, min_length [, case]]) - Read a text file and emit one row per word.
234///
235/// Emits `(word_num, word, line_num, word_pos)` rows where:
236///   - `word_num` is a global 1-based word counter across the whole file
237///   - `word` is the extracted token (punctuation stripped)
238///   - `line_num` is the original 1-based line number the word came from
239///   - `word_pos` is the 1-based position of the word within its line
240///
241/// Optional `min_length` (default 1) filters out short words.
242/// Optional `case` ('lower' or 'upper') normalises word casing.
243pub struct ReadWords;
244
245impl TableGenerator for ReadWords {
246    fn name(&self) -> &str {
247        "READ_WORDS"
248    }
249
250    fn columns(&self) -> Vec<DataColumn> {
251        vec![
252            DataColumn::new("word_num"),
253            DataColumn::new("word"),
254            DataColumn::new("line_num"),
255            DataColumn::new("word_pos"),
256        ]
257    }
258
259    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
260        if args.is_empty() || args.len() > 3 {
261            return Err(anyhow!(
262                "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
263            ));
264        }
265
266        let path = require_string(&args, 0, "READ_WORDS")?;
267
268        let min_length: usize = match args.get(1) {
269            Some(DataValue::Integer(n)) => {
270                if *n < 1 {
271                    return Err(anyhow!("READ_WORDS min_length must be >= 1"));
272                }
273                *n as usize
274            }
275            Some(DataValue::Float(f)) => *f as usize,
276            Some(DataValue::Null) | None => 1,
277            Some(v) => {
278                return Err(anyhow!(
279                    "READ_WORDS min_length must be an integer, got {:?}",
280                    v
281                ))
282            }
283        };
284
285        let case_option = optional_string(&args, 2);
286
287        let lines = read_lines_from_path_or_stdin(&path, None)?;
288
289        let mut table = DataTable::new("read_words");
290        table.add_column(DataColumn::new("word_num"));
291        table.add_column(DataColumn::new("word"));
292        table.add_column(DataColumn::new("line_num"));
293        table.add_column(DataColumn::new("word_pos"));
294
295        let mut word_num: i64 = 0;
296
297        for (line_num, line) in &lines {
298            let mut word_pos: i64 = 0;
299
300            for token in line.split(|c: char| !c.is_alphanumeric()) {
301                if token.is_empty() || token.len() < min_length {
302                    continue;
303                }
304
305                word_pos += 1;
306                word_num += 1;
307
308                let word = match case_option.as_deref() {
309                    Some("lower") | Some("lowercase") => token.to_lowercase(),
310                    Some("upper") | Some("uppercase") => token.to_uppercase(),
311                    _ => token.to_string(),
312                };
313
314                table
315                    .add_row(DataRow::new(vec![
316                        DataValue::Integer(word_num),
317                        DataValue::String(word),
318                        DataValue::Integer(*line_num),
319                        DataValue::Integer(word_pos),
320                    ]))
321                    .map_err(|e| anyhow!(e))?;
322            }
323        }
324
325        Ok(Arc::new(table))
326    }
327
328    fn description(&self) -> &str {
329        "Read a text file and emit one row per word, with optional min length and case normalisation"
330    }
331
332    fn arg_count(&self) -> usize {
333        3
334    }
335}
336
337/// READ_JSONL(path [, match_regex]) - Read newline-delimited JSON.
338///
339/// Each non-blank line is parsed as a self-contained JSON object. Schema is
340/// the union of object keys across the first 100 records, so heterogeneous
341/// log streams (later events introducing new fields) don't drop columns.
342/// Optional `match_regex` filters source lines *before* JSON parsing — the
343/// fast path for grepping large log files.
344pub struct ReadJsonl;
345
346impl TableGenerator for ReadJsonl {
347    fn name(&self) -> &str {
348        "READ_JSONL"
349    }
350
351    fn columns(&self) -> Vec<DataColumn> {
352        // Schema is dynamic; the actual columns are inferred at generate() time
353        // from the JSON keys in the file.
354        vec![DataColumn::new("(inferred from JSON keys)")]
355    }
356
357    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
358        if args.is_empty() || args.len() > 2 {
359            return Err(anyhow!(
360                "READ_JSONL expects 1 or 2 arguments: (path [, match_regex])"
361            ));
362        }
363
364        let path = require_string(&args, 0, "READ_JSONL")?;
365        let match_regex = optional_string(&args, 1)
366            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
367            .transpose()?;
368
369        let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
370
371        let mut records: Vec<JsonValue> = Vec::with_capacity(lines.len());
372        for (line_num, line) in &lines {
373            let trimmed = line.trim();
374            if trimmed.is_empty() {
375                continue;
376            }
377            let value: JsonValue = serde_json::from_str(trimmed)
378                .map_err(|e| anyhow!("READ_JSONL parse error at line {}: {}", line_num, e))?;
379            records.push(value);
380        }
381
382        if records.is_empty() {
383            return Ok(Arc::new(DataTable::new("read_jsonl")));
384        }
385
386        let column_names = collect_column_names(&records, 100);
387        if column_names.is_empty() {
388            return Err(anyhow!(
389                "READ_JSONL: no JSON objects found (records must be objects, not arrays/scalars)"
390            ));
391        }
392
393        // Stringify, infer types from the first 100 rows, then convert to typed
394        // DataValues — same pipeline the file-level JSON loader uses.
395        let mut string_rows: Vec<Vec<String>> = Vec::with_capacity(records.len());
396        for record in &records {
397            let obj = match record.as_object() {
398                Some(o) => o,
399                None => continue,
400            };
401            let mut row = Vec::with_capacity(column_names.len());
402            for col_name in &column_names {
403                let value = obj
404                    .get(col_name)
405                    .map(json_value_to_string)
406                    .unwrap_or_default();
407                row.push(value);
408            }
409            string_rows.push(row);
410        }
411
412        let mut column_types: Vec<DataType> = vec![DataType::Null; column_names.len()];
413        let sample_size = string_rows.len().min(100);
414        for row in string_rows.iter().take(sample_size) {
415            for (col_idx, value) in row.iter().enumerate() {
416                if !value.is_empty() && value != "null" {
417                    let inferred = DataType::infer_from_string(value);
418                    column_types[col_idx] = column_types[col_idx].merge(&inferred);
419                }
420            }
421        }
422
423        let mut table = DataTable::new("read_jsonl");
424        for (name, dtype) in column_names.iter().zip(column_types.iter()) {
425            let mut col = DataColumn::new(name);
426            col.data_type = dtype.clone();
427            table.add_column(col);
428        }
429
430        for string_row in &string_rows {
431            let mut values = Vec::with_capacity(string_row.len());
432            for (col_idx, value) in string_row.iter().enumerate() {
433                let dv = if value.is_empty() || value == "null" {
434                    DataValue::Null
435                } else {
436                    DataValue::from_string(value, &column_types[col_idx])
437                };
438                values.push(dv);
439            }
440            table
441                .add_row(DataRow::new(values))
442                .map_err(|e| anyhow!(e))?;
443        }
444
445        Ok(Arc::new(table))
446    }
447
448    fn description(&self) -> &str {
449        "Read newline-delimited JSON (one object per line). Pass '-' as path to read JSONL from stdin. Optional second arg is a regex that filters lines at read time."
450    }
451
452    fn arg_count(&self) -> usize {
453        2
454    }
455}
456
457/// READ_CSV(path [, delimiter]) - Read a delimited text file and emit one row per record.
458///
459/// Columns are inferred from the header row. Pass `-` as the path to read CSV
460/// from stdin (shares the same cached-once buffer with READ_STDIN / READ_JSONL).
461///
462/// Delimiter resolution:
463///   1. Explicit 2nd arg wins (single ASCII char or `\t` escape).
464///   2. Otherwise, `.tsv` → tab, `.psv` → pipe, everything else → comma.
465///   3. Stdin (`-`) with no explicit delimiter defaults to comma.
466///
467/// Type inference, string interning, and other optimisations are inherited
468/// from the main CSV loader so behaviour matches `sql-cli file.csv -q ...`.
469pub struct ReadCsv;
470
471impl TableGenerator for ReadCsv {
472    fn name(&self) -> &str {
473        "READ_CSV"
474    }
475
476    fn columns(&self) -> Vec<DataColumn> {
477        // Schema is inferred from the CSV header at generate() time.
478        vec![DataColumn::new("(inferred from CSV header)")]
479    }
480
481    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
482        if args.is_empty() || args.len() > 2 {
483            return Err(anyhow!(
484                "READ_CSV expects 1 or 2 arguments: (path [, delimiter])"
485            ));
486        }
487
488        let path = require_string(&args, 0, "READ_CSV")?;
489
490        // Resolve delimiter: explicit 2nd arg > extension auto-detect > comma.
491        // Stdin (`-`) skips the extension layer since there's no extension to read.
492        let delimiter = if let Some(s) = optional_string(&args, 1) {
493            parse_delimiter_arg(&s, "READ_CSV")?
494        } else if path == "-" {
495            b','
496        } else {
497            detect_delimiter_from_path(&path)
498        };
499
500        let opts = CsvReadOptions {
501            delimiter,
502            has_headers: true,
503        };
504
505        let mut loader = AdvancedCsvLoader::new();
506
507        let table = if path == "-" {
508            // Reconstruct a CSV byte stream from the cached stdin lines so other
509            // stdin readers in the same query keep seeing the same buffer.
510            let lines = cached_stdin_lines()?;
511            let mut buffer = String::with_capacity(lines.iter().map(|(_, l)| l.len() + 1).sum());
512            for (i, (_, line)) in lines.iter().enumerate() {
513                if i > 0 {
514                    buffer.push('\n');
515                }
516                buffer.push_str(line);
517            }
518            let cursor = Cursor::new(buffer.into_bytes());
519            loader
520                .load_csv_from_reader_with_opts(cursor, "read_csv", "<stdin>", &opts)
521                .map_err(|e| anyhow!("READ_CSV parse error reading stdin: {}", e))?
522        } else {
523            let file = File::open(&path)
524                .map_err(|e| anyhow!("READ_CSV failed to open '{}': {}", path, e))?;
525            loader
526                .load_csv_from_reader_with_opts(file, "read_csv", &path, &opts)
527                .map_err(|e| anyhow!("READ_CSV parse error reading '{}': {}", path, e))?
528        };
529
530        Ok(Arc::new(table))
531    }
532
533    fn description(&self) -> &str {
534        "Read a delimited text file (header row required). Pass '-' as path to read from stdin. \
535         Second arg overrides the delimiter; otherwise '.tsv' → tab, '.psv' → pipe, else comma."
536    }
537
538    fn arg_count(&self) -> usize {
539        2
540    }
541}
542
543fn json_value_to_string(value: &JsonValue) -> String {
544    match value {
545        JsonValue::Null => String::new(),
546        JsonValue::Bool(b) => b.to_string(),
547        JsonValue::Number(n) => n.to_string(),
548        JsonValue::String(s) => s.clone(),
549        JsonValue::Array(arr) => format!("{:?}", arr),
550        JsonValue::Object(obj) => format!("{:?}", obj),
551    }
552}
553
554/// READ_STDIN([match_regex]) - Read lines piped on standard input.
555///
556/// Emits `(line_num, line)` rows. Optional regex pre-filters lines before
557/// materialisation. Erroring on TTY input prevents the engine from blocking
558/// forever waiting on a keyboard when the user forgets the pipe.
559///
560/// Stdin is consumable, so it is read **once per process** and cached. This
561/// keeps the function composable: multiple `READ_STDIN()` references in the
562/// same query (CTEs, self-joins) see the same rows. The regex filter is
563/// applied per call against the cached buffer.
564pub struct ReadStdin;
565
566static STDIN_CACHE: OnceLock<Result<Vec<(i64, String)>, String>> = OnceLock::new();
567
568fn cached_stdin_lines() -> Result<&'static Vec<(i64, String)>> {
569    let cached = STDIN_CACHE.get_or_init(|| {
570        let stdin = std::io::stdin();
571        if stdin.is_terminal() {
572            return Err("READ_STDIN() requires data piped on stdin; got an interactive terminal. Try: cat file | sql-cli -q '...'".to_string());
573        }
574        let handle = stdin.lock();
575        let reader = BufReader::new(handle);
576        let mut out = Vec::new();
577        let mut truncated = false;
578        for (idx, line_result) in reader.lines().enumerate() {
579            let line = match line_result {
580                Ok(l) => l,
581                Err(e) => return Err(format!("Error reading stdin: {}", e)),
582            };
583            if out.len() >= MAX_LINES_PER_FILE {
584                truncated = true;
585                break;
586            }
587            out.push(((idx + 1) as i64, line));
588        }
589        if truncated {
590            eprintln!(
591                "WARNING: truncated to {} rows (max_lines_per_file cap) when reading stdin",
592                MAX_LINES_PER_FILE
593            );
594        }
595        Ok(out)
596    });
597    cached.as_ref().map_err(|e| anyhow!(e.clone()))
598}
599
600impl TableGenerator for ReadStdin {
601    fn name(&self) -> &str {
602        "READ_STDIN"
603    }
604
605    fn columns(&self) -> Vec<DataColumn> {
606        vec![DataColumn::new("line_num"), DataColumn::new("line")]
607    }
608
609    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
610        if args.len() > 1 {
611            return Err(anyhow!(
612                "READ_STDIN expects 0 or 1 arguments: ([match_regex])"
613            ));
614        }
615
616        let match_regex = optional_string(&args, 0)
617            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
618            .transpose()?;
619
620        let lines = cached_stdin_lines()?;
621
622        let mut table = DataTable::new("read_stdin");
623        table.add_column(DataColumn::new("line_num"));
624        table.add_column(DataColumn::new("line"));
625
626        for (line_num, line) in lines {
627            if let Some(ref re) = match_regex {
628                if !re.is_match(line) {
629                    continue;
630                }
631            }
632            table
633                .add_row(DataRow::new(vec![
634                    DataValue::Integer(*line_num),
635                    DataValue::String(line.clone()),
636                ]))
637                .map_err(|e| anyhow!(e))?;
638        }
639
640        Ok(Arc::new(table))
641    }
642
643    fn description(&self) -> &str {
644        "Read lines piped on stdin (cached once per process). Optional regex filters lines at read time. Yields (line_num, line)."
645    }
646
647    fn arg_count(&self) -> usize {
648        1
649    }
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655    use std::io::Write;
656    use tempfile::NamedTempFile;
657
658    fn write_tmp(contents: &str) -> NamedTempFile {
659        let mut f = NamedTempFile::new().unwrap();
660        f.write_all(contents.as_bytes()).unwrap();
661        f
662    }
663
664    #[test]
665    fn test_read_text_returns_all_lines() {
666        let f = write_tmp("one\ntwo\nthree\n");
667        let table = ReadText
668            .generate(vec![DataValue::String(
669                f.path().to_string_lossy().to_string(),
670            )])
671            .unwrap();
672        assert_eq!(table.row_count(), 3);
673        assert_eq!(
674            table.get_value(0, 1).unwrap(),
675            &DataValue::String("one".to_string())
676        );
677        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
678    }
679
680    #[test]
681    fn test_read_text_with_match_regex_filters_lines() {
682        let f = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
683        let table = ReadText
684            .generate(vec![
685                DataValue::String(f.path().to_string_lossy().to_string()),
686                DataValue::String("ERROR".to_string()),
687            ])
688            .unwrap();
689        assert_eq!(table.row_count(), 2);
690        // Line numbers preserve original file positions (2 and 4), not 1 and 2.
691        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
692        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(4));
693    }
694
695    #[test]
696    fn test_read_text_requires_path() {
697        assert!(ReadText.generate(vec![]).is_err());
698    }
699
700    #[test]
701    fn test_read_text_invalid_regex_errors_early() {
702        let f = write_tmp("hello\n");
703        let err = ReadText
704            .generate(vec![
705                DataValue::String(f.path().to_string_lossy().to_string()),
706                DataValue::String("(unclosed".to_string()),
707            ])
708            .unwrap_err();
709        assert!(err.to_string().contains("match_regex"));
710    }
711
712    #[test]
713    fn test_grep_matches_like_grep() {
714        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
715        let table = Grep
716            .generate(vec![
717                DataValue::String(f.path().to_string_lossy().to_string()),
718                DataValue::String("^ap".to_string()),
719            ])
720            .unwrap();
721        assert_eq!(table.row_count(), 2);
722        assert_eq!(
723            table.get_value(0, 1).unwrap(),
724            &DataValue::String("apple".to_string())
725        );
726        assert_eq!(
727            table.get_value(1, 1).unwrap(),
728            &DataValue::String("apricot".to_string())
729        );
730    }
731
732    #[test]
733    fn test_grep_invert_like_grep_v() {
734        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
735        let table = Grep
736            .generate(vec![
737                DataValue::String(f.path().to_string_lossy().to_string()),
738                DataValue::String("^ap".to_string()),
739                DataValue::Boolean(true),
740            ])
741            .unwrap();
742        assert_eq!(table.row_count(), 2);
743        assert_eq!(
744            table.get_value(0, 1).unwrap(),
745            &DataValue::String("banana".to_string())
746        );
747    }
748
749    // ---- ReadWords tests ----
750
751    #[test]
752    fn test_read_words_basic() {
753        let f = write_tmp("hello world\ngoodbye moon\n");
754        let table = ReadWords
755            .generate(vec![DataValue::String(
756                f.path().to_string_lossy().to_string(),
757            )])
758            .unwrap();
759        // 4 words total
760        assert_eq!(table.row_count(), 4);
761        // Columns: word_num, word, line_num, word_pos
762        // First word
763        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1)); // word_num
764        assert_eq!(
765            table.get_value(0, 1).unwrap(),
766            &DataValue::String("hello".to_string())
767        );
768        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1)); // line_num
769        assert_eq!(table.get_value(0, 3).unwrap(), &DataValue::Integer(1)); // word_pos
770                                                                            // Third word (first on line 2)
771        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
772        assert_eq!(
773            table.get_value(2, 1).unwrap(),
774            &DataValue::String("goodbye".to_string())
775        );
776        assert_eq!(table.get_value(2, 2).unwrap(), &DataValue::Integer(2));
777        assert_eq!(table.get_value(2, 3).unwrap(), &DataValue::Integer(1));
778    }
779
780    #[test]
781    fn test_read_words_min_length() {
782        let f = write_tmp("I am a big dog\n");
783        let table = ReadWords
784            .generate(vec![
785                DataValue::String(f.path().to_string_lossy().to_string()),
786                DataValue::Integer(3),
787            ])
788            .unwrap();
789        // Only "big" and "dog" have length >= 3
790        assert_eq!(table.row_count(), 2);
791        assert_eq!(
792            table.get_value(0, 1).unwrap(),
793            &DataValue::String("big".to_string())
794        );
795        assert_eq!(
796            table.get_value(1, 1).unwrap(),
797            &DataValue::String("dog".to_string())
798        );
799    }
800
801    #[test]
802    fn test_read_words_case_lower() {
803        let f = write_tmp("Hello World\n");
804        let table = ReadWords
805            .generate(vec![
806                DataValue::String(f.path().to_string_lossy().to_string()),
807                DataValue::Integer(1),
808                DataValue::String("lower".to_string()),
809            ])
810            .unwrap();
811        assert_eq!(
812            table.get_value(0, 1).unwrap(),
813            &DataValue::String("hello".to_string())
814        );
815        assert_eq!(
816            table.get_value(1, 1).unwrap(),
817            &DataValue::String("world".to_string())
818        );
819    }
820
821    #[test]
822    fn test_read_words_strips_punctuation() {
823        let f = write_tmp("hello, world! foo-bar.\n");
824        let table = ReadWords
825            .generate(vec![DataValue::String(
826                f.path().to_string_lossy().to_string(),
827            )])
828            .unwrap();
829        let words: Vec<String> = (0..table.row_count())
830            .map(|i| match table.get_value(i, 1).unwrap() {
831                DataValue::String(s) => s.clone(),
832                _ => panic!("expected string"),
833            })
834            .collect();
835        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
836    }
837
838    #[test]
839    fn test_read_words_requires_path() {
840        assert!(ReadWords.generate(vec![]).is_err());
841    }
842
843    #[test]
844    fn test_read_words_empty_lines_skipped() {
845        let f = write_tmp("hello\n\n\nworld\n");
846        let table = ReadWords
847            .generate(vec![DataValue::String(
848                f.path().to_string_lossy().to_string(),
849            )])
850            .unwrap();
851        assert_eq!(table.row_count(), 2);
852        // word_num is contiguous
853        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
854        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(2));
855        // line_num preserves original positions
856        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
857        assert_eq!(table.get_value(1, 2).unwrap(), &DataValue::Integer(4));
858    }
859
860    // ---- ReadJsonl tests ----
861
862    fn col_index(table: &DataTable, name: &str) -> usize {
863        table
864            .columns
865            .iter()
866            .position(|c| c.name == name)
867            .unwrap_or_else(|| panic!("column '{}' not found", name))
868    }
869
870    #[test]
871    fn test_read_jsonl_basic() {
872        let f = write_tmp(
873            r#"{"id":1,"name":"alice","score":10}
874{"id":2,"name":"bob","score":20}
875{"id":3,"name":"carol","score":30}
876"#,
877        );
878        let table = ReadJsonl
879            .generate(vec![DataValue::String(
880                f.path().to_string_lossy().to_string(),
881            )])
882            .unwrap();
883        assert_eq!(table.row_count(), 3);
884        assert_eq!(table.column_count(), 3);
885
886        let id_col = col_index(&table, "id");
887        let name_col = col_index(&table, "name");
888        assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
889        assert_eq!(
890            table.get_value(2, name_col).unwrap(),
891            &DataValue::String("carol".to_string())
892        );
893    }
894
895    #[test]
896    fn test_read_jsonl_heterogeneous_schema_unioned() {
897        // Later records introduce fields the first one didn't have. Schema
898        // should union them; missing values become Null.
899        let f = write_tmp(
900            r#"{"id":1,"name":"alice"}
901{"id":2,"name":"bob","extra":"hello"}
902{"id":3,"name":"carol","other":42}
903"#,
904        );
905        let table = ReadJsonl
906            .generate(vec![DataValue::String(
907                f.path().to_string_lossy().to_string(),
908            )])
909            .unwrap();
910        assert_eq!(table.row_count(), 3);
911        assert_eq!(table.column_count(), 4);
912        let extra = col_index(&table, "extra");
913        let other = col_index(&table, "other");
914        // Row 0 has neither extra nor other -> Null
915        assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
916        assert_eq!(table.get_value(0, other).unwrap(), &DataValue::Null);
917        // Row 1 has extra="hello"
918        assert_eq!(
919            table.get_value(1, extra).unwrap(),
920            &DataValue::String("hello".to_string())
921        );
922        // Row 2 has other=42
923        assert_eq!(table.get_value(2, other).unwrap(), &DataValue::Integer(42));
924    }
925
926    #[test]
927    fn test_read_jsonl_blank_lines_skipped() {
928        let f = write_tmp(
929            r#"{"id":1}
930
931{"id":2}
932
933"#,
934        );
935        let table = ReadJsonl
936            .generate(vec![DataValue::String(
937                f.path().to_string_lossy().to_string(),
938            )])
939            .unwrap();
940        assert_eq!(table.row_count(), 2);
941    }
942
943    #[test]
944    fn test_read_jsonl_match_regex_pre_filters() {
945        let f = write_tmp(
946            r#"{"level":"INFO","msg":"boot"}
947{"level":"ERROR","msg":"disk"}
948{"level":"INFO","msg":"shutdown"}
949{"level":"ERROR","msg":"oom"}
950"#,
951        );
952        let table = ReadJsonl
953            .generate(vec![
954                DataValue::String(f.path().to_string_lossy().to_string()),
955                DataValue::String("ERROR".to_string()),
956            ])
957            .unwrap();
958        assert_eq!(table.row_count(), 2);
959        let msg = col_index(&table, "msg");
960        assert_eq!(
961            table.get_value(0, msg).unwrap(),
962            &DataValue::String("disk".to_string())
963        );
964    }
965
966    #[test]
967    fn test_read_jsonl_invalid_line_errors_with_line_number() {
968        let f = write_tmp(
969            r#"{"id":1}
970not json at all
971{"id":3}
972"#,
973        );
974        let err = ReadJsonl
975            .generate(vec![DataValue::String(
976                f.path().to_string_lossy().to_string(),
977            )])
978            .unwrap_err();
979        let msg = err.to_string();
980        assert!(
981            msg.contains("line 2"),
982            "error should cite line number: {}",
983            msg
984        );
985    }
986
987    #[test]
988    fn test_read_jsonl_requires_path() {
989        assert!(ReadJsonl.generate(vec![]).is_err());
990    }
991
992    #[test]
993    fn test_read_jsonl_empty_file_returns_empty_table() {
994        let f = write_tmp("");
995        let table = ReadJsonl
996            .generate(vec![DataValue::String(
997                f.path().to_string_lossy().to_string(),
998            )])
999            .unwrap();
1000        assert_eq!(table.row_count(), 0);
1001    }
1002
    // ReadStdin: argument-validation tests only. Stdin is process-global, so we
    // cannot safely inject test data without refactoring ReadStdin to accept an
    // injected Reader. The happy path is covered by manual smoke tests recorded
    // in commit messages and by the examples/ folder.
1007
1008    #[test]
1009    fn test_read_stdin_rejects_too_many_args() {
1010        let err = ReadStdin
1011            .generate(vec![
1012                DataValue::String("foo".to_string()),
1013                DataValue::String("bar".to_string()),
1014            ])
1015            .unwrap_err();
1016        assert!(
1017            err.to_string().contains("0 or 1 arguments"),
1018            "should mention arg count: {}",
1019            err
1020        );
1021    }
1022
1023    #[test]
1024    fn test_read_stdin_rejects_invalid_regex() {
1025        let err = ReadStdin
1026            .generate(vec![DataValue::String("[invalid(regex".to_string())])
1027            .unwrap_err();
1028        assert!(
1029            err.to_string().contains("Invalid match_regex"),
1030            "should mention regex: {}",
1031            err
1032        );
1033    }
1034
1035    // ---- ReadCsv tests ----
1036
1037    /// Helper to write a temp CSV-like file with a given extension and content.
1038    fn write_tmp_with_ext(ext: &str, contents: &str) -> tempfile::NamedTempFile {
1039        let f = tempfile::Builder::new()
1040            .suffix(&format!(".{}", ext))
1041            .tempfile()
1042            .unwrap();
1043        std::fs::write(f.path(), contents).unwrap();
1044        f
1045    }
1046
1047    #[test]
1048    fn test_read_csv_default_comma() {
1049        let f = write_tmp_with_ext("csv", "id,name\n1,alice\n2,bob\n");
1050        let table = ReadCsv
1051            .generate(vec![DataValue::String(
1052                f.path().to_string_lossy().to_string(),
1053            )])
1054            .unwrap();
1055        assert_eq!(table.column_count(), 2);
1056        assert_eq!(table.row_count(), 2);
1057        assert_eq!(table.columns[0].name, "id");
1058        assert_eq!(table.columns[1].name, "name");
1059    }
1060
1061    #[test]
1062    fn test_read_csv_psv_extension_auto_detects_pipe() {
1063        let f = write_tmp_with_ext("psv", "id|name|score\n1|alice|10\n2|bob|20\n");
1064        let table = ReadCsv
1065            .generate(vec![DataValue::String(
1066                f.path().to_string_lossy().to_string(),
1067            )])
1068            .unwrap();
1069        assert_eq!(table.column_count(), 3);
1070        assert_eq!(table.row_count(), 2);
1071        assert_eq!(table.columns[0].name, "id");
1072        assert_eq!(table.columns[2].name, "score");
1073        assert_eq!(
1074            table.metadata.get("delimiter").map(String::as_str),
1075            Some("|")
1076        );
1077    }
1078
1079    #[test]
1080    fn test_read_csv_tsv_extension_auto_detects_tab() {
1081        let f = write_tmp_with_ext("tsv", "id\tname\n1\talice\n2\tbob\n");
1082        let table = ReadCsv
1083            .generate(vec![DataValue::String(
1084                f.path().to_string_lossy().to_string(),
1085            )])
1086            .unwrap();
1087        assert_eq!(table.column_count(), 2);
1088        assert_eq!(table.row_count(), 2);
1089        assert_eq!(
1090            table.metadata.get("delimiter").map(String::as_str),
1091            Some("\\t")
1092        );
1093    }
1094
1095    #[test]
1096    fn test_read_csv_explicit_delimiter_overrides_extension() {
1097        // .psv extension would auto-detect pipe, but explicit comma wins.
1098        // Content is comma-delimited, so if extension auto-detect ran, it
1099        // would parse as a single column.
1100        let f = write_tmp_with_ext("psv", "id,name\n1,alice\n");
1101        let table = ReadCsv
1102            .generate(vec![
1103                DataValue::String(f.path().to_string_lossy().to_string()),
1104                DataValue::String(",".to_string()),
1105            ])
1106            .unwrap();
1107        assert_eq!(table.column_count(), 2);
1108    }
1109
1110    #[test]
1111    fn test_read_csv_explicit_pipe_on_unrecognised_extension() {
1112        let f = write_tmp_with_ext("dat", "a|b\n1|2\n");
1113        let table = ReadCsv
1114            .generate(vec![
1115                DataValue::String(f.path().to_string_lossy().to_string()),
1116                DataValue::String("|".to_string()),
1117            ])
1118            .unwrap();
1119        assert_eq!(table.column_count(), 2);
1120    }
1121
1122    #[test]
1123    fn test_read_csv_backslash_t_parses_as_tab() {
1124        let f = write_tmp_with_ext("dat", "a\tb\n1\t2\n");
1125        let table = ReadCsv
1126            .generate(vec![
1127                DataValue::String(f.path().to_string_lossy().to_string()),
1128                DataValue::String("\\t".to_string()),
1129            ])
1130            .unwrap();
1131        assert_eq!(table.column_count(), 2);
1132        assert_eq!(
1133            table.metadata.get("delimiter").map(String::as_str),
1134            Some("\\t")
1135        );
1136    }
1137
1138    #[test]
1139    fn test_read_csv_rejects_multi_char_delimiter() {
1140        let f = write_tmp_with_ext("dat", "a,b\n1,2\n");
1141        let err = ReadCsv
1142            .generate(vec![
1143                DataValue::String(f.path().to_string_lossy().to_string()),
1144                DataValue::String("||".to_string()),
1145            ])
1146            .unwrap_err();
1147        let msg = err.to_string();
1148        assert!(
1149            msg.contains("single ASCII character"),
1150            "should reject multi-char delimiter: {}",
1151            msg
1152        );
1153    }
1154
1155    #[test]
1156    fn test_read_csv_rejects_too_many_args() {
1157        let err = ReadCsv
1158            .generate(vec![
1159                DataValue::String("a".to_string()),
1160                DataValue::String("b".to_string()),
1161                DataValue::String("c".to_string()),
1162            ])
1163            .unwrap_err();
1164        assert!(err.to_string().contains("1 or 2 arguments"));
1165    }
1166
1167    #[test]
1168    fn test_read_csv_requires_path() {
1169        assert!(ReadCsv.generate(vec![]).is_err());
1170    }
1171}