Skip to main content

sql_cli/sql/generators/
file_readers.rs

1use crate::data::advanced_csv_loader::AdvancedCsvLoader;
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
3use crate::data::stream_loader::{
4    collect_column_names, detect_delimiter_from_path, navigate_json_path,
5    parse_delimiter_arg as parse_delim_byte, parse_json_records, CsvReadOptions,
6};
7use crate::sql::generators::TableGenerator;
8use anyhow::{anyhow, Result};
9use regex::Regex;
10use serde_json::Value as JsonValue;
11use std::fs::File;
12use std::io::{BufRead, BufReader, Cursor, IsTerminal};
13use std::sync::{Arc, OnceLock};
14
/// Upper bound on the number of rows a single file reader will materialise.
///
/// Protects against accidentally pulling a multi-gigabyte log into memory.
/// Raising it through a session setting is planned but not implemented yet.
const MAX_LINES_PER_FILE: usize = 1000 * 1000;
19
20/// Extract a string argument, erroring if the arg is missing/NULL/non-string.
21fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
22    match args.get(idx) {
23        Some(DataValue::String(s)) => Ok(s.clone()),
24        Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
25        Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
26        Some(v) => Err(anyhow!(
27            "{} argument {} must be a string, got {:?}",
28            name,
29            idx + 1,
30            v
31        )),
32    }
33}
34
35/// Extract an optional string argument. Returns None for missing or NULL.
36fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
37    match args.get(idx) {
38        Some(DataValue::String(s)) => Some(s.clone()),
39        Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
40        _ => None,
41    }
42}
43
44/// Parse a delimiter arg for READ_CSV, wrapping the shared parser's error
45/// with the function name for clearer messages.
46fn parse_delimiter_arg(s: &str, fn_name: &str) -> Result<u8> {
47    parse_delim_byte(s).map_err(|e| anyhow!("{}: {}", fn_name, e))
48}
49
50/// Open a file and stream its lines, applying an optional include-regex filter
51/// and the global truncation cap. Emits a stderr warning when truncation kicks in.
52///
53/// Returns (line_num, line) pairs where `line_num` is the original 1-based line
54/// number in the source file — so numbers are preserved through filtering.
55fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
56    let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
57    let reader = BufReader::new(file);
58
59    let mut out = Vec::new();
60    let mut truncated = false;
61
62    for (idx, line_result) in reader.lines().enumerate() {
63        let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
64        let line_num = (idx + 1) as i64;
65
66        if let Some(re) = match_regex {
67            if !re.is_match(&line) {
68                continue;
69            }
70        }
71
72        if out.len() >= MAX_LINES_PER_FILE {
73            truncated = true;
74            break;
75        }
76        out.push((line_num, line));
77    }
78
79    if truncated {
80        eprintln!(
81            "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
82            MAX_LINES_PER_FILE, path
83        );
84    }
85
86    Ok(out)
87}
88
89/// Read lines from a file path, or from stdin if `path == "-"` (Unix convention).
90///
91/// Stdin reads are cached for the process — same buffer is reused across multiple
92/// reader invocations in the same query. The optional regex filter is applied to
93/// both sources uniformly.
94fn read_lines_from_path_or_stdin(
95    path: &str,
96    match_regex: Option<&Regex>,
97) -> Result<Vec<(i64, String)>> {
98    if path == "-" {
99        let cached = cached_stdin_lines()?;
100        return Ok(cached
101            .iter()
102            .filter(|(_, line)| match_regex.map_or(true, |re| re.is_match(line)))
103            .cloned()
104            .collect());
105    }
106    read_filtered_lines(path, match_regex)
107}
108
109/// READ_TEXT(path [, match_regex]) - Read a text file line by line.
110///
111/// Emits `(line_num, line)` rows. Optional `match_regex` filters source lines
112/// *before* materializing them, which is the primary fast path for large logs.
113pub struct ReadText;
114
115impl TableGenerator for ReadText {
116    fn name(&self) -> &str {
117        "READ_TEXT"
118    }
119
120    fn columns(&self) -> Vec<DataColumn> {
121        vec![DataColumn::new("line_num"), DataColumn::new("line")]
122    }
123
124    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
125        if args.is_empty() || args.len() > 2 {
126            return Err(anyhow!(
127                "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
128            ));
129        }
130
131        let path = require_string(&args, 0, "READ_TEXT")?;
132        let match_regex = optional_string(&args, 1)
133            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
134            .transpose()?;
135
136        let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
137
138        let mut table = DataTable::new("read_text");
139        table.add_column(DataColumn::new("line_num"));
140        table.add_column(DataColumn::new("line"));
141
142        for (line_num, line) in lines {
143            table
144                .add_row(DataRow::new(vec![
145                    DataValue::Integer(line_num),
146                    DataValue::String(line),
147                ]))
148                .map_err(|e| anyhow!(e))?;
149        }
150
151        Ok(Arc::new(table))
152    }
153
154    fn description(&self) -> &str {
155        "Read a text file line-by-line. Pass '-' as path to read from stdin. Optional second arg is a regex that filters lines at read time."
156    }
157
158    fn arg_count(&self) -> usize {
159        2
160    }
161}
162
163/// GREP(path, pattern [, invert]) - Read only lines matching a regex.
164///
165/// Thin composable wrapper around READ_TEXT's filter path. Third argument
166/// (boolean or integer truthy value) inverts the match, matching `grep -v`.
167pub struct Grep;
168
169impl TableGenerator for Grep {
170    fn name(&self) -> &str {
171        "GREP"
172    }
173
174    fn columns(&self) -> Vec<DataColumn> {
175        vec![DataColumn::new("line_num"), DataColumn::new("line")]
176    }
177
178    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
179        if args.len() < 2 || args.len() > 3 {
180            return Err(anyhow!(
181                "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
182            ));
183        }
184
185        let path = require_string(&args, 0, "GREP")?;
186        let pattern_str = require_string(&args, 1, "GREP")?;
187        let pattern =
188            Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
189
190        let invert = match args.get(2) {
191            Some(DataValue::Boolean(b)) => *b,
192            Some(DataValue::Integer(n)) => *n != 0,
193            Some(DataValue::Null) | None => false,
194            Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
195        };
196
197        // When not inverted we can push the filter down into the line reader for
198        // the fast path. When inverted we still iterate every line.
199        let lines = if invert {
200            let all = read_lines_from_path_or_stdin(&path, None)?;
201            all.into_iter()
202                .filter(|(_, line)| !pattern.is_match(line))
203                .collect::<Vec<_>>()
204        } else {
205            read_lines_from_path_or_stdin(&path, Some(&pattern))?
206        };
207
208        let mut table = DataTable::new("grep");
209        table.add_column(DataColumn::new("line_num"));
210        table.add_column(DataColumn::new("line"));
211
212        for (line_num, line) in lines {
213            table
214                .add_row(DataRow::new(vec![
215                    DataValue::Integer(line_num),
216                    DataValue::String(line),
217                ]))
218                .map_err(|e| anyhow!(e))?;
219        }
220
221        Ok(Arc::new(table))
222    }
223
224    fn description(&self) -> &str {
225        "Read only lines matching a regex (third arg inverts the match, like grep -v). Pass '-' as path to read from stdin."
226    }
227
228    fn arg_count(&self) -> usize {
229        3
230    }
231}
232
233/// READ_WORDS(path [, min_length [, case]]) - Read a text file and emit one row per word.
234///
235/// Emits `(word_num, word, line_num, word_pos)` rows where:
236///   - `word_num` is a global 1-based word counter across the whole file
237///   - `word` is the extracted token (punctuation stripped)
238///   - `line_num` is the original 1-based line number the word came from
239///   - `word_pos` is the 1-based position of the word within its line
240///
241/// Optional `min_length` (default 1) filters out short words.
242/// Optional `case` ('lower' or 'upper') normalises word casing.
243pub struct ReadWords;
244
245impl TableGenerator for ReadWords {
246    fn name(&self) -> &str {
247        "READ_WORDS"
248    }
249
250    fn columns(&self) -> Vec<DataColumn> {
251        vec![
252            DataColumn::new("word_num"),
253            DataColumn::new("word"),
254            DataColumn::new("line_num"),
255            DataColumn::new("word_pos"),
256        ]
257    }
258
259    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
260        if args.is_empty() || args.len() > 3 {
261            return Err(anyhow!(
262                "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
263            ));
264        }
265
266        let path = require_string(&args, 0, "READ_WORDS")?;
267
268        let min_length: usize = match args.get(1) {
269            Some(DataValue::Integer(n)) => {
270                if *n < 1 {
271                    return Err(anyhow!("READ_WORDS min_length must be >= 1"));
272                }
273                *n as usize
274            }
275            Some(DataValue::Float(f)) => *f as usize,
276            Some(DataValue::Null) | None => 1,
277            Some(v) => {
278                return Err(anyhow!(
279                    "READ_WORDS min_length must be an integer, got {:?}",
280                    v
281                ))
282            }
283        };
284
285        let case_option = optional_string(&args, 2);
286
287        let lines = read_lines_from_path_or_stdin(&path, None)?;
288
289        let mut table = DataTable::new("read_words");
290        table.add_column(DataColumn::new("word_num"));
291        table.add_column(DataColumn::new("word"));
292        table.add_column(DataColumn::new("line_num"));
293        table.add_column(DataColumn::new("word_pos"));
294
295        let mut word_num: i64 = 0;
296
297        for (line_num, line) in &lines {
298            let mut word_pos: i64 = 0;
299
300            for token in line.split(|c: char| !c.is_alphanumeric()) {
301                if token.is_empty() || token.len() < min_length {
302                    continue;
303                }
304
305                word_pos += 1;
306                word_num += 1;
307
308                let word = match case_option.as_deref() {
309                    Some("lower") | Some("lowercase") => token.to_lowercase(),
310                    Some("upper") | Some("uppercase") => token.to_uppercase(),
311                    _ => token.to_string(),
312                };
313
314                table
315                    .add_row(DataRow::new(vec![
316                        DataValue::Integer(word_num),
317                        DataValue::String(word),
318                        DataValue::Integer(*line_num),
319                        DataValue::Integer(word_pos),
320                    ]))
321                    .map_err(|e| anyhow!(e))?;
322            }
323        }
324
325        Ok(Arc::new(table))
326    }
327
328    fn description(&self) -> &str {
329        "Read a text file and emit one row per word, with optional min length and case normalisation"
330    }
331
332    fn arg_count(&self) -> usize {
333        3
334    }
335}
336
337/// READ_JSONL(path [, match_regex]) - Read newline-delimited JSON.
338///
339/// Each non-blank line is parsed as a self-contained JSON object. Schema is
340/// the union of object keys across the first 100 records, so heterogeneous
341/// log streams (later events introducing new fields) don't drop columns.
342/// Optional `match_regex` filters source lines *before* JSON parsing — the
343/// fast path for grepping large log files.
344pub struct ReadJsonl;
345
346impl TableGenerator for ReadJsonl {
347    fn name(&self) -> &str {
348        "READ_JSONL"
349    }
350
351    fn columns(&self) -> Vec<DataColumn> {
352        // Schema is dynamic; the actual columns are inferred at generate() time
353        // from the JSON keys in the file.
354        vec![DataColumn::new("(inferred from JSON keys)")]
355    }
356
357    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
358        if args.is_empty() || args.len() > 2 {
359            return Err(anyhow!(
360                "READ_JSONL expects 1 or 2 arguments: (path [, match_regex])"
361            ));
362        }
363
364        let path = require_string(&args, 0, "READ_JSONL")?;
365        let match_regex = optional_string(&args, 1)
366            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
367            .transpose()?;
368
369        let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
370
371        let mut records: Vec<JsonValue> = Vec::with_capacity(lines.len());
372        for (line_num, line) in &lines {
373            let trimmed = line.trim();
374            if trimmed.is_empty() {
375                continue;
376            }
377            let value: JsonValue = serde_json::from_str(trimmed)
378                .map_err(|e| anyhow!("READ_JSONL parse error at line {}: {}", line_num, e))?;
379            records.push(value);
380        }
381
382        let table = json_records_to_table(records, "read_jsonl", "READ_JSONL")?;
383        Ok(Arc::new(table))
384    }
385
386    fn description(&self) -> &str {
387        "Read newline-delimited JSON (one object per line). Pass '-' as path to read JSONL from stdin. Optional second arg is a regex that filters lines at read time."
388    }
389
390    fn arg_count(&self) -> usize {
391        2
392    }
393}
394
395/// READ_CSV(path [, delimiter]) - Read a delimited text file and emit one row per record.
396///
397/// Columns are inferred from the header row. Pass `-` as the path to read CSV
398/// from stdin (shares the same cached-once buffer with READ_STDIN / READ_JSONL).
399///
400/// Delimiter resolution:
401///   1. Explicit 2nd arg wins (single ASCII char or `\t` escape).
402///   2. Otherwise, `.tsv` → tab, `.psv` → pipe, everything else → comma.
403///   3. Stdin (`-`) with no explicit delimiter defaults to comma.
404///
405/// Type inference, string interning, and other optimisations are inherited
406/// from the main CSV loader so behaviour matches `sql-cli file.csv -q ...`.
407pub struct ReadCsv;
408
409impl TableGenerator for ReadCsv {
410    fn name(&self) -> &str {
411        "READ_CSV"
412    }
413
414    fn columns(&self) -> Vec<DataColumn> {
415        // Schema is inferred from the CSV header at generate() time.
416        vec![DataColumn::new("(inferred from CSV header)")]
417    }
418
419    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
420        if args.is_empty() || args.len() > 2 {
421            return Err(anyhow!(
422                "READ_CSV expects 1 or 2 arguments: (path [, delimiter])"
423            ));
424        }
425
426        let path = require_string(&args, 0, "READ_CSV")?;
427
428        // Resolve delimiter: explicit 2nd arg > extension auto-detect > comma.
429        // Stdin (`-`) skips the extension layer since there's no extension to read.
430        let delimiter = if let Some(s) = optional_string(&args, 1) {
431            parse_delimiter_arg(&s, "READ_CSV")?
432        } else if path == "-" {
433            b','
434        } else {
435            detect_delimiter_from_path(&path)
436        };
437
438        let opts = CsvReadOptions {
439            delimiter,
440            has_headers: true,
441        };
442
443        let mut loader = AdvancedCsvLoader::new();
444
445        let table = if path == "-" {
446            // Reconstruct a CSV byte stream from the cached stdin lines so other
447            // stdin readers in the same query keep seeing the same buffer.
448            let lines = cached_stdin_lines()?;
449            let mut buffer = String::with_capacity(lines.iter().map(|(_, l)| l.len() + 1).sum());
450            for (i, (_, line)) in lines.iter().enumerate() {
451                if i > 0 {
452                    buffer.push('\n');
453                }
454                buffer.push_str(line);
455            }
456            let cursor = Cursor::new(buffer.into_bytes());
457            loader
458                .load_csv_from_reader_with_opts(cursor, "read_csv", "<stdin>", &opts)
459                .map_err(|e| anyhow!("READ_CSV parse error reading stdin: {}", e))?
460        } else {
461            let file = File::open(&path)
462                .map_err(|e| anyhow!("READ_CSV failed to open '{}': {}", path, e))?;
463            loader
464                .load_csv_from_reader_with_opts(file, "read_csv", &path, &opts)
465                .map_err(|e| anyhow!("READ_CSV parse error reading '{}': {}", path, e))?
466        };
467
468        Ok(Arc::new(table))
469    }
470
471    fn description(&self) -> &str {
472        "Read a delimited text file (header row required). Pass '-' as path to read from stdin. \
473         Second arg overrides the delimiter; otherwise '.tsv' → tab, '.psv' → pipe, else comma."
474    }
475
476    fn arg_count(&self) -> usize {
477        2
478    }
479}
480
481fn json_value_to_string(value: &JsonValue) -> String {
482    match value {
483        JsonValue::Null => String::new(),
484        JsonValue::Bool(b) => b.to_string(),
485        JsonValue::Number(n) => n.to_string(),
486        JsonValue::String(s) => s.clone(),
487        JsonValue::Array(arr) => format!("{:?}", arr),
488        JsonValue::Object(obj) => format!("{:?}", obj),
489    }
490}
491
492/// Build a typed `DataTable` from a list of parsed JSON records (objects).
493///
494/// Columns are the ordered union of object keys across the first 100 records,
495/// so heterogeneous streams don't drop late-appearing fields. Types are
496/// inferred from the first 100 rows. Shared by READ_JSONL (one object per line)
497/// and READ_JSON (whole document / array). `func` names the calling function in
498/// error messages.
499fn json_records_to_table(
500    records: Vec<JsonValue>,
501    table_name: &str,
502    func: &str,
503) -> Result<DataTable> {
504    if records.is_empty() {
505        return Ok(DataTable::new(table_name));
506    }
507
508    let column_names = collect_column_names(&records, 100);
509    if column_names.is_empty() {
510        return Err(anyhow!(
511            "{}: no JSON objects found (records must be objects, not arrays/scalars)",
512            func
513        ));
514    }
515
516    // Stringify, infer types from the first 100 rows, then convert to typed
517    // DataValues — same pipeline the file-level JSON loader uses.
518    let mut string_rows: Vec<Vec<String>> = Vec::with_capacity(records.len());
519    for record in &records {
520        let obj = match record.as_object() {
521            Some(o) => o,
522            None => continue,
523        };
524        let mut row = Vec::with_capacity(column_names.len());
525        for col_name in &column_names {
526            let value = obj
527                .get(col_name)
528                .map(json_value_to_string)
529                .unwrap_or_default();
530            row.push(value);
531        }
532        string_rows.push(row);
533    }
534
535    let mut column_types: Vec<DataType> = vec![DataType::Null; column_names.len()];
536    let sample_size = string_rows.len().min(100);
537    for row in string_rows.iter().take(sample_size) {
538        for (col_idx, value) in row.iter().enumerate() {
539            if !value.is_empty() && value != "null" {
540                let inferred = DataType::infer_from_string(value);
541                column_types[col_idx] = column_types[col_idx].merge(&inferred);
542            }
543        }
544    }
545
546    let mut table = DataTable::new(table_name);
547    for (name, dtype) in column_names.iter().zip(column_types.iter()) {
548        let mut col = DataColumn::new(name);
549        col.data_type = dtype.clone();
550        table.add_column(col);
551    }
552
553    for string_row in &string_rows {
554        let mut values = Vec::with_capacity(string_row.len());
555        for (col_idx, value) in string_row.iter().enumerate() {
556            let dv = if value.is_empty() || value == "null" {
557                DataValue::Null
558            } else {
559                DataValue::from_string(value, &column_types[col_idx])
560            };
561            values.push(dv);
562        }
563        table
564            .add_row(DataRow::new(values))
565            .map_err(|e| anyhow!(e))?;
566    }
567
568    Ok(table)
569}
570
571/// READ_JSON(path [, json_path]) - Read a whole JSON document and emit one row per object.
572///
573/// Accepts either a JSON array of objects (`[{...}, {...}]`, possibly
574/// pretty-printed across many lines) or newline-delimited JSON (JSONL); the
575/// format is auto-detected. This is the multi-line counterpart to READ_JSONL,
576/// which requires exactly one object per line. Pass `-` as the path to read
577/// from stdin (shares the same cached-once buffer as the other stdin readers).
578///
579/// The optional `json_path` drills into a nested document to *locate the rows*
580/// before tabularizing — the same dotted/`[]` syntax as the WEB CTE `JSON_PATH`
581/// clause (see [`navigate_json_path`]). This handles the common API shape of an
582/// object wrapping the array you want, e.g. TeamCity's
583/// `{ "projects": { "project": [...] } }` via `READ_JSON('-', 'projects.project')`,
584/// or Elasticsearch's `READ_JSON('-', 'hits.hits[]._source')`. The path only
585/// finds the row set; use a normal SELECT/WHERE to pick columns and filter —
586/// anything more (predicates, scalar plucking) is a job for a `jq` pre-process.
587pub struct ReadJson;
588
589impl TableGenerator for ReadJson {
590    fn name(&self) -> &str {
591        "READ_JSON"
592    }
593
594    fn columns(&self) -> Vec<DataColumn> {
595        // Schema is dynamic; the actual columns are inferred at generate() time
596        // from the JSON keys in the document.
597        vec![DataColumn::new("(inferred from JSON keys)")]
598    }
599
600    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
601        if args.is_empty() || args.len() > 2 {
602            return Err(anyhow!(
603                "READ_JSON expects 1 or 2 arguments: (path [, json_path])"
604            ));
605        }
606
607        let path = require_string(&args, 0, "READ_JSON")?;
608        let json_path = optional_string(&args, 1);
609
610        let content = if path == "-" {
611            // Reconstruct the document from the cached stdin lines so other
612            // stdin readers in the same query keep seeing the same buffer.
613            let lines = cached_stdin_lines()?;
614            let mut buffer = String::with_capacity(lines.iter().map(|(_, l)| l.len() + 1).sum());
615            for (i, (_, line)) in lines.iter().enumerate() {
616                if i > 0 {
617                    buffer.push('\n');
618                }
619                buffer.push_str(line);
620            }
621            buffer
622        } else {
623            std::fs::read_to_string(&path)
624                .map_err(|e| anyhow!("READ_JSON failed to read '{}': {}", path, e))?
625        };
626
627        let records = match json_path {
628            // No path: keep the auto-detecting array/JSONL fast path.
629            None => {
630                parse_json_records(&content).map_err(|e| anyhow!("READ_JSON parse error: {}", e))?
631            }
632            // Path given: parse the whole document, drill to the row set, then
633            // normalise to a list of records (array -> rows, single object -> 1
634            // row). Non-object records are rejected later by the table builder.
635            Some(path) => {
636                let value: JsonValue = serde_json::from_str(&content)
637                    .map_err(|e| anyhow!("READ_JSON parse error: {}", e))?;
638                let extracted = navigate_json_path(&value, &path)
639                    .map_err(|e| anyhow!("READ_JSON json_path '{}': {}", path, e))?;
640                match extracted {
641                    JsonValue::Array(arr) => arr,
642                    other => vec![other],
643                }
644            }
645        };
646
647        let table = json_records_to_table(records, "read_json", "READ_JSON")?;
648        Ok(Arc::new(table))
649    }
650
651    fn description(&self) -> &str {
652        "Read a whole JSON document — a JSON array of objects (possibly pretty-printed) or newline-delimited JSON — and emit one row per object. Pass '-' as path to read from stdin. Optional second arg is a JSON path (dotted, with '[]' array projection — same as the WEB CTE JSON_PATH) that drills into a nested document to locate the rows, e.g. READ_JSON('-', 'projects.project'). Unlike READ_JSONL, the input may span multiple lines per record."
653    }
654
655    fn arg_count(&self) -> usize {
656        2
657    }
658}
659
660/// READ_STDIN([match_regex]) - Read lines piped on standard input.
661///
662/// Emits `(line_num, line)` rows. Optional regex pre-filters lines before
663/// materialisation. Erroring on TTY input prevents the engine from blocking
664/// forever waiting on a keyboard when the user forgets the pipe.
665///
666/// Stdin is consumable, so it is read **once per process** and cached. This
667/// keeps the function composable: multiple `READ_STDIN()` references in the
668/// same query (CTEs, self-joins) see the same rows. The regex filter is
669/// applied per call against the cached buffer.
670pub struct ReadStdin;
671
672static STDIN_CACHE: OnceLock<Result<Vec<(i64, String)>, String>> = OnceLock::new();
673
674fn cached_stdin_lines() -> Result<&'static Vec<(i64, String)>> {
675    let cached = STDIN_CACHE.get_or_init(|| {
676        let stdin = std::io::stdin();
677        if stdin.is_terminal() {
678            return Err("READ_STDIN() requires data piped on stdin; got an interactive terminal. Try: cat file | sql-cli -q '...'".to_string());
679        }
680        let handle = stdin.lock();
681        let reader = BufReader::new(handle);
682        let mut out = Vec::new();
683        let mut truncated = false;
684        for (idx, line_result) in reader.lines().enumerate() {
685            let line = match line_result {
686                Ok(l) => l,
687                Err(e) => return Err(format!("Error reading stdin: {}", e)),
688            };
689            if out.len() >= MAX_LINES_PER_FILE {
690                truncated = true;
691                break;
692            }
693            out.push(((idx + 1) as i64, line));
694        }
695        if truncated {
696            eprintln!(
697                "WARNING: truncated to {} rows (max_lines_per_file cap) when reading stdin",
698                MAX_LINES_PER_FILE
699            );
700        }
701        Ok(out)
702    });
703    cached.as_ref().map_err(|e| anyhow!(e.clone()))
704}
705
706impl TableGenerator for ReadStdin {
707    fn name(&self) -> &str {
708        "READ_STDIN"
709    }
710
711    fn columns(&self) -> Vec<DataColumn> {
712        vec![DataColumn::new("line_num"), DataColumn::new("line")]
713    }
714
715    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
716        if args.len() > 1 {
717            return Err(anyhow!(
718                "READ_STDIN expects 0 or 1 arguments: ([match_regex])"
719            ));
720        }
721
722        let match_regex = optional_string(&args, 0)
723            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
724            .transpose()?;
725
726        let lines = cached_stdin_lines()?;
727
728        let mut table = DataTable::new("read_stdin");
729        table.add_column(DataColumn::new("line_num"));
730        table.add_column(DataColumn::new("line"));
731
732        for (line_num, line) in lines {
733            if let Some(ref re) = match_regex {
734                if !re.is_match(line) {
735                    continue;
736                }
737            }
738            table
739                .add_row(DataRow::new(vec![
740                    DataValue::Integer(*line_num),
741                    DataValue::String(line.clone()),
742                ]))
743                .map_err(|e| anyhow!(e))?;
744        }
745
746        Ok(Arc::new(table))
747    }
748
749    fn description(&self) -> &str {
750        "Read lines piped on stdin (cached once per process). Optional regex filters lines at read time. Yields (line_num, line)."
751    }
752
753    fn arg_count(&self) -> usize {
754        1
755    }
756}
757
758#[cfg(test)]
759mod tests {
760    use super::*;
761    use std::io::Write;
762    use tempfile::NamedTempFile;
763
764    fn write_tmp(contents: &str) -> NamedTempFile {
765        let mut f = NamedTempFile::new().unwrap();
766        f.write_all(contents.as_bytes()).unwrap();
767        f
768    }
769
770    #[test]
771    fn test_read_text_returns_all_lines() {
772        let f = write_tmp("one\ntwo\nthree\n");
773        let table = ReadText
774            .generate(vec![DataValue::String(
775                f.path().to_string_lossy().to_string(),
776            )])
777            .unwrap();
778        assert_eq!(table.row_count(), 3);
779        assert_eq!(
780            table.get_value(0, 1).unwrap(),
781            &DataValue::String("one".to_string())
782        );
783        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
784    }
785
786    #[test]
787    fn test_read_text_with_match_regex_filters_lines() {
788        let f = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
789        let table = ReadText
790            .generate(vec![
791                DataValue::String(f.path().to_string_lossy().to_string()),
792                DataValue::String("ERROR".to_string()),
793            ])
794            .unwrap();
795        assert_eq!(table.row_count(), 2);
796        // Line numbers preserve original file positions (2 and 4), not 1 and 2.
797        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
798        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(4));
799    }
800
801    #[test]
802    fn test_read_text_requires_path() {
803        assert!(ReadText.generate(vec![]).is_err());
804    }
805
806    #[test]
807    fn test_read_text_invalid_regex_errors_early() {
808        let f = write_tmp("hello\n");
809        let err = ReadText
810            .generate(vec![
811                DataValue::String(f.path().to_string_lossy().to_string()),
812                DataValue::String("(unclosed".to_string()),
813            ])
814            .unwrap_err();
815        assert!(err.to_string().contains("match_regex"));
816    }
817
818    #[test]
819    fn test_grep_matches_like_grep() {
820        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
821        let table = Grep
822            .generate(vec![
823                DataValue::String(f.path().to_string_lossy().to_string()),
824                DataValue::String("^ap".to_string()),
825            ])
826            .unwrap();
827        assert_eq!(table.row_count(), 2);
828        assert_eq!(
829            table.get_value(0, 1).unwrap(),
830            &DataValue::String("apple".to_string())
831        );
832        assert_eq!(
833            table.get_value(1, 1).unwrap(),
834            &DataValue::String("apricot".to_string())
835        );
836    }
837
838    #[test]
839    fn test_grep_invert_like_grep_v() {
840        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
841        let table = Grep
842            .generate(vec![
843                DataValue::String(f.path().to_string_lossy().to_string()),
844                DataValue::String("^ap".to_string()),
845                DataValue::Boolean(true),
846            ])
847            .unwrap();
848        assert_eq!(table.row_count(), 2);
849        assert_eq!(
850            table.get_value(0, 1).unwrap(),
851            &DataValue::String("banana".to_string())
852        );
853    }
854
855    // ---- ReadWords tests ----
856
857    #[test]
858    fn test_read_words_basic() {
859        let f = write_tmp("hello world\ngoodbye moon\n");
860        let table = ReadWords
861            .generate(vec![DataValue::String(
862                f.path().to_string_lossy().to_string(),
863            )])
864            .unwrap();
865        // 4 words total
866        assert_eq!(table.row_count(), 4);
867        // Columns: word_num, word, line_num, word_pos
868        // First word
869        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1)); // word_num
870        assert_eq!(
871            table.get_value(0, 1).unwrap(),
872            &DataValue::String("hello".to_string())
873        );
874        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1)); // line_num
875        assert_eq!(table.get_value(0, 3).unwrap(), &DataValue::Integer(1)); // word_pos
876                                                                            // Third word (first on line 2)
877        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
878        assert_eq!(
879            table.get_value(2, 1).unwrap(),
880            &DataValue::String("goodbye".to_string())
881        );
882        assert_eq!(table.get_value(2, 2).unwrap(), &DataValue::Integer(2));
883        assert_eq!(table.get_value(2, 3).unwrap(), &DataValue::Integer(1));
884    }
885
886    #[test]
887    fn test_read_words_min_length() {
888        let f = write_tmp("I am a big dog\n");
889        let table = ReadWords
890            .generate(vec![
891                DataValue::String(f.path().to_string_lossy().to_string()),
892                DataValue::Integer(3),
893            ])
894            .unwrap();
895        // Only "big" and "dog" have length >= 3
896        assert_eq!(table.row_count(), 2);
897        assert_eq!(
898            table.get_value(0, 1).unwrap(),
899            &DataValue::String("big".to_string())
900        );
901        assert_eq!(
902            table.get_value(1, 1).unwrap(),
903            &DataValue::String("dog".to_string())
904        );
905    }
906
907    #[test]
908    fn test_read_words_case_lower() {
909        let f = write_tmp("Hello World\n");
910        let table = ReadWords
911            .generate(vec![
912                DataValue::String(f.path().to_string_lossy().to_string()),
913                DataValue::Integer(1),
914                DataValue::String("lower".to_string()),
915            ])
916            .unwrap();
917        assert_eq!(
918            table.get_value(0, 1).unwrap(),
919            &DataValue::String("hello".to_string())
920        );
921        assert_eq!(
922            table.get_value(1, 1).unwrap(),
923            &DataValue::String("world".to_string())
924        );
925    }
926
927    #[test]
928    fn test_read_words_strips_punctuation() {
929        let f = write_tmp("hello, world! foo-bar.\n");
930        let table = ReadWords
931            .generate(vec![DataValue::String(
932                f.path().to_string_lossy().to_string(),
933            )])
934            .unwrap();
935        let words: Vec<String> = (0..table.row_count())
936            .map(|i| match table.get_value(i, 1).unwrap() {
937                DataValue::String(s) => s.clone(),
938                _ => panic!("expected string"),
939            })
940            .collect();
941        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
942    }
943
944    #[test]
945    fn test_read_words_requires_path() {
946        assert!(ReadWords.generate(vec![]).is_err());
947    }
948
949    #[test]
950    fn test_read_words_empty_lines_skipped() {
951        let f = write_tmp("hello\n\n\nworld\n");
952        let table = ReadWords
953            .generate(vec![DataValue::String(
954                f.path().to_string_lossy().to_string(),
955            )])
956            .unwrap();
957        assert_eq!(table.row_count(), 2);
958        // word_num is contiguous
959        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
960        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(2));
961        // line_num preserves original positions
962        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
963        assert_eq!(table.get_value(1, 2).unwrap(), &DataValue::Integer(4));
964    }
965
966    // ---- ReadJsonl tests ----
967
968    fn col_index(table: &DataTable, name: &str) -> usize {
969        table
970            .columns
971            .iter()
972            .position(|c| c.name == name)
973            .unwrap_or_else(|| panic!("column '{}' not found", name))
974    }
975
976    #[test]
977    fn test_read_jsonl_basic() {
978        let f = write_tmp(
979            r#"{"id":1,"name":"alice","score":10}
980{"id":2,"name":"bob","score":20}
981{"id":3,"name":"carol","score":30}
982"#,
983        );
984        let table = ReadJsonl
985            .generate(vec![DataValue::String(
986                f.path().to_string_lossy().to_string(),
987            )])
988            .unwrap();
989        assert_eq!(table.row_count(), 3);
990        assert_eq!(table.column_count(), 3);
991
992        let id_col = col_index(&table, "id");
993        let name_col = col_index(&table, "name");
994        assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
995        assert_eq!(
996            table.get_value(2, name_col).unwrap(),
997            &DataValue::String("carol".to_string())
998        );
999    }
1000
1001    #[test]
1002    fn test_read_jsonl_heterogeneous_schema_unioned() {
1003        // Later records introduce fields the first one didn't have. Schema
1004        // should union them; missing values become Null.
1005        let f = write_tmp(
1006            r#"{"id":1,"name":"alice"}
1007{"id":2,"name":"bob","extra":"hello"}
1008{"id":3,"name":"carol","other":42}
1009"#,
1010        );
1011        let table = ReadJsonl
1012            .generate(vec![DataValue::String(
1013                f.path().to_string_lossy().to_string(),
1014            )])
1015            .unwrap();
1016        assert_eq!(table.row_count(), 3);
1017        assert_eq!(table.column_count(), 4);
1018        let extra = col_index(&table, "extra");
1019        let other = col_index(&table, "other");
1020        // Row 0 has neither extra nor other -> Null
1021        assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
1022        assert_eq!(table.get_value(0, other).unwrap(), &DataValue::Null);
1023        // Row 1 has extra="hello"
1024        assert_eq!(
1025            table.get_value(1, extra).unwrap(),
1026            &DataValue::String("hello".to_string())
1027        );
1028        // Row 2 has other=42
1029        assert_eq!(table.get_value(2, other).unwrap(), &DataValue::Integer(42));
1030    }
1031
1032    #[test]
1033    fn test_read_jsonl_blank_lines_skipped() {
1034        let f = write_tmp(
1035            r#"{"id":1}
1036
1037{"id":2}
1038
1039"#,
1040        );
1041        let table = ReadJsonl
1042            .generate(vec![DataValue::String(
1043                f.path().to_string_lossy().to_string(),
1044            )])
1045            .unwrap();
1046        assert_eq!(table.row_count(), 2);
1047    }
1048
1049    #[test]
1050    fn test_read_jsonl_match_regex_pre_filters() {
1051        let f = write_tmp(
1052            r#"{"level":"INFO","msg":"boot"}
1053{"level":"ERROR","msg":"disk"}
1054{"level":"INFO","msg":"shutdown"}
1055{"level":"ERROR","msg":"oom"}
1056"#,
1057        );
1058        let table = ReadJsonl
1059            .generate(vec![
1060                DataValue::String(f.path().to_string_lossy().to_string()),
1061                DataValue::String("ERROR".to_string()),
1062            ])
1063            .unwrap();
1064        assert_eq!(table.row_count(), 2);
1065        let msg = col_index(&table, "msg");
1066        assert_eq!(
1067            table.get_value(0, msg).unwrap(),
1068            &DataValue::String("disk".to_string())
1069        );
1070    }
1071
1072    #[test]
1073    fn test_read_jsonl_invalid_line_errors_with_line_number() {
1074        let f = write_tmp(
1075            r#"{"id":1}
1076not json at all
1077{"id":3}
1078"#,
1079        );
1080        let err = ReadJsonl
1081            .generate(vec![DataValue::String(
1082                f.path().to_string_lossy().to_string(),
1083            )])
1084            .unwrap_err();
1085        let msg = err.to_string();
1086        assert!(
1087            msg.contains("line 2"),
1088            "error should cite line number: {}",
1089            msg
1090        );
1091    }
1092
1093    #[test]
1094    fn test_read_jsonl_requires_path() {
1095        assert!(ReadJsonl.generate(vec![]).is_err());
1096    }
1097
1098    #[test]
1099    fn test_read_jsonl_empty_file_returns_empty_table() {
1100        let f = write_tmp("");
1101        let table = ReadJsonl
1102            .generate(vec![DataValue::String(
1103                f.path().to_string_lossy().to_string(),
1104            )])
1105            .unwrap();
1106        assert_eq!(table.row_count(), 0);
1107    }
1108
1109    // ReadStdin: argument-validation only (stdin is process-global and we cannot
1110    // safely inject test data without refactoring to inject a Reader). The
1111    // happy-path is covered by manual smoke tests in commit messages and the
1112    // examples/ folder.
1113
1114    #[test]
1115    fn test_read_stdin_rejects_too_many_args() {
1116        let err = ReadStdin
1117            .generate(vec![
1118                DataValue::String("foo".to_string()),
1119                DataValue::String("bar".to_string()),
1120            ])
1121            .unwrap_err();
1122        assert!(
1123            err.to_string().contains("0 or 1 arguments"),
1124            "should mention arg count: {}",
1125            err
1126        );
1127    }
1128
1129    #[test]
1130    fn test_read_stdin_rejects_invalid_regex() {
1131        let err = ReadStdin
1132            .generate(vec![DataValue::String("[invalid(regex".to_string())])
1133            .unwrap_err();
1134        assert!(
1135            err.to_string().contains("Invalid match_regex"),
1136            "should mention regex: {}",
1137            err
1138        );
1139    }
1140
1141    // ---- ReadCsv tests ----
1142
1143    /// Helper to write a temp CSV-like file with a given extension and content.
1144    fn write_tmp_with_ext(ext: &str, contents: &str) -> tempfile::NamedTempFile {
1145        let f = tempfile::Builder::new()
1146            .suffix(&format!(".{}", ext))
1147            .tempfile()
1148            .unwrap();
1149        std::fs::write(f.path(), contents).unwrap();
1150        f
1151    }
1152
1153    #[test]
1154    fn test_read_csv_default_comma() {
1155        let f = write_tmp_with_ext("csv", "id,name\n1,alice\n2,bob\n");
1156        let table = ReadCsv
1157            .generate(vec![DataValue::String(
1158                f.path().to_string_lossy().to_string(),
1159            )])
1160            .unwrap();
1161        assert_eq!(table.column_count(), 2);
1162        assert_eq!(table.row_count(), 2);
1163        assert_eq!(table.columns[0].name, "id");
1164        assert_eq!(table.columns[1].name, "name");
1165    }
1166
1167    #[test]
1168    fn test_read_csv_psv_extension_auto_detects_pipe() {
1169        let f = write_tmp_with_ext("psv", "id|name|score\n1|alice|10\n2|bob|20\n");
1170        let table = ReadCsv
1171            .generate(vec![DataValue::String(
1172                f.path().to_string_lossy().to_string(),
1173            )])
1174            .unwrap();
1175        assert_eq!(table.column_count(), 3);
1176        assert_eq!(table.row_count(), 2);
1177        assert_eq!(table.columns[0].name, "id");
1178        assert_eq!(table.columns[2].name, "score");
1179        assert_eq!(
1180            table.metadata.get("delimiter").map(String::as_str),
1181            Some("|")
1182        );
1183    }
1184
1185    #[test]
1186    fn test_read_csv_tsv_extension_auto_detects_tab() {
1187        let f = write_tmp_with_ext("tsv", "id\tname\n1\talice\n2\tbob\n");
1188        let table = ReadCsv
1189            .generate(vec![DataValue::String(
1190                f.path().to_string_lossy().to_string(),
1191            )])
1192            .unwrap();
1193        assert_eq!(table.column_count(), 2);
1194        assert_eq!(table.row_count(), 2);
1195        assert_eq!(
1196            table.metadata.get("delimiter").map(String::as_str),
1197            Some("\\t")
1198        );
1199    }
1200
1201    #[test]
1202    fn test_read_csv_explicit_delimiter_overrides_extension() {
1203        // .psv extension would auto-detect pipe, but explicit comma wins.
1204        // Content is comma-delimited, so if extension auto-detect ran, it
1205        // would parse as a single column.
1206        let f = write_tmp_with_ext("psv", "id,name\n1,alice\n");
1207        let table = ReadCsv
1208            .generate(vec![
1209                DataValue::String(f.path().to_string_lossy().to_string()),
1210                DataValue::String(",".to_string()),
1211            ])
1212            .unwrap();
1213        assert_eq!(table.column_count(), 2);
1214    }
1215
1216    #[test]
1217    fn test_read_csv_explicit_pipe_on_unrecognised_extension() {
1218        let f = write_tmp_with_ext("dat", "a|b\n1|2\n");
1219        let table = ReadCsv
1220            .generate(vec![
1221                DataValue::String(f.path().to_string_lossy().to_string()),
1222                DataValue::String("|".to_string()),
1223            ])
1224            .unwrap();
1225        assert_eq!(table.column_count(), 2);
1226    }
1227
1228    #[test]
1229    fn test_read_csv_backslash_t_parses_as_tab() {
1230        let f = write_tmp_with_ext("dat", "a\tb\n1\t2\n");
1231        let table = ReadCsv
1232            .generate(vec![
1233                DataValue::String(f.path().to_string_lossy().to_string()),
1234                DataValue::String("\\t".to_string()),
1235            ])
1236            .unwrap();
1237        assert_eq!(table.column_count(), 2);
1238        assert_eq!(
1239            table.metadata.get("delimiter").map(String::as_str),
1240            Some("\\t")
1241        );
1242    }
1243
1244    #[test]
1245    fn test_read_csv_rejects_multi_char_delimiter() {
1246        let f = write_tmp_with_ext("dat", "a,b\n1,2\n");
1247        let err = ReadCsv
1248            .generate(vec![
1249                DataValue::String(f.path().to_string_lossy().to_string()),
1250                DataValue::String("||".to_string()),
1251            ])
1252            .unwrap_err();
1253        let msg = err.to_string();
1254        assert!(
1255            msg.contains("single ASCII character"),
1256            "should reject multi-char delimiter: {}",
1257            msg
1258        );
1259    }
1260
1261    #[test]
1262    fn test_read_csv_rejects_too_many_args() {
1263        let err = ReadCsv
1264            .generate(vec![
1265                DataValue::String("a".to_string()),
1266                DataValue::String("b".to_string()),
1267                DataValue::String("c".to_string()),
1268            ])
1269            .unwrap_err();
1270        assert!(err.to_string().contains("1 or 2 arguments"));
1271    }
1272
1273    #[test]
1274    fn test_read_csv_requires_path() {
1275        assert!(ReadCsv.generate(vec![]).is_err());
1276    }
1277
1278    // ---- ReadJson tests ----
1279
1280    #[test]
1281    fn test_read_json_array_of_objects() {
1282        // The case READ_JSONL can't handle: a pretty-printed multi-line array.
1283        let f = write_tmp(
1284            r#"[
1285  {"id": 1, "name": "alice"},
1286  {"id": 2, "name": "bob"}
1287]
1288"#,
1289        );
1290        let table = ReadJson
1291            .generate(vec![DataValue::String(
1292                f.path().to_string_lossy().to_string(),
1293            )])
1294            .unwrap();
1295        assert_eq!(table.row_count(), 2);
1296        assert_eq!(table.column_count(), 2);
1297        let id_col = col_index(&table, "id");
1298        let name_col = col_index(&table, "name");
1299        assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
1300        assert_eq!(
1301            table.get_value(1, name_col).unwrap(),
1302            &DataValue::String("bob".to_string())
1303        );
1304    }
1305
1306    #[test]
1307    fn test_read_json_also_accepts_jsonl() {
1308        // Auto-detect: same loader handles one-object-per-line input too.
1309        let f = write_tmp("{\"id\":1}\n{\"id\":2}\n");
1310        let table = ReadJson
1311            .generate(vec![DataValue::String(
1312                f.path().to_string_lossy().to_string(),
1313            )])
1314            .unwrap();
1315        assert_eq!(table.row_count(), 2);
1316        let id_col = col_index(&table, "id");
1317        assert_eq!(table.get_value(1, id_col).unwrap(), &DataValue::Integer(2));
1318    }
1319
1320    #[test]
1321    fn test_read_json_heterogeneous_schema_unioned() {
1322        let f = write_tmp(
1323            r#"[
1324  {"id": 1, "name": "alice"},
1325  {"id": 2, "name": "bob", "extra": "hello"}
1326]"#,
1327        );
1328        let table = ReadJson
1329            .generate(vec![DataValue::String(
1330                f.path().to_string_lossy().to_string(),
1331            )])
1332            .unwrap();
1333        assert_eq!(table.column_count(), 3);
1334        let extra = col_index(&table, "extra");
1335        assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
1336        assert_eq!(
1337            table.get_value(1, extra).unwrap(),
1338            &DataValue::String("hello".to_string())
1339        );
1340    }
1341
1342    #[test]
1343    fn test_read_json_empty_array_returns_empty_table() {
1344        let f = write_tmp("[]");
1345        let table = ReadJson
1346            .generate(vec![DataValue::String(
1347                f.path().to_string_lossy().to_string(),
1348            )])
1349            .unwrap();
1350        assert_eq!(table.row_count(), 0);
1351    }
1352
1353    #[test]
1354    fn test_read_json_requires_path() {
1355        assert!(ReadJson.generate(vec![]).is_err());
1356    }
1357
1358    #[test]
1359    fn test_read_json_bad_path_errors() {
1360        let err = ReadJson
1361            .generate(vec![DataValue::String(
1362                "/no/such/file/here.json".to_string(),
1363            )])
1364            .unwrap_err();
1365        assert!(err.to_string().contains("READ_JSON failed to read"));
1366    }
1367
1368    #[test]
1369    fn test_read_json_json_path_drills_into_nested_array() {
1370        // TeamCity-style: object wrapping the array we actually want.
1371        let f = write_tmp(
1372            r#"{
1373  "count": 2,
1374  "project": [
1375    {"id": "a", "name": "Alpha"},
1376    {"id": "b", "name": "Beta"}
1377  ]
1378}"#,
1379        );
1380        let table = ReadJson
1381            .generate(vec![
1382                DataValue::String(f.path().to_string_lossy().to_string()),
1383                DataValue::String("project".to_string()),
1384            ])
1385            .unwrap();
1386        assert_eq!(table.row_count(), 2);
1387        assert_eq!(table.column_count(), 2);
1388        let id_col = col_index(&table, "id");
1389        assert_eq!(
1390            table.get_value(1, id_col).unwrap(),
1391            &DataValue::String("b".to_string())
1392        );
1393    }
1394
1395    #[test]
1396    fn test_read_json_json_path_with_array_projection() {
1397        // Elasticsearch-style hits.hits[]._source projection.
1398        let f = write_tmp(
1399            r#"{
1400  "hits": {
1401    "hits": [
1402      {"_source": {"id": 1, "user": "alice"}},
1403      {"_source": {"id": 2, "user": "bob"}}
1404    ]
1405  }
1406}"#,
1407        );
1408        let table = ReadJson
1409            .generate(vec![
1410                DataValue::String(f.path().to_string_lossy().to_string()),
1411                DataValue::String("hits.hits[]._source".to_string()),
1412            ])
1413            .unwrap();
1414        assert_eq!(table.row_count(), 2);
1415        let user_col = col_index(&table, "user");
1416        assert_eq!(
1417            table.get_value(0, user_col).unwrap(),
1418            &DataValue::String("alice".to_string())
1419        );
1420    }
1421
1422    #[test]
1423    fn test_read_json_json_path_single_object_becomes_one_row() {
1424        let f = write_tmp(r#"{"meta": {"id": 7, "name": "solo"}}"#);
1425        let table = ReadJson
1426            .generate(vec![
1427                DataValue::String(f.path().to_string_lossy().to_string()),
1428                DataValue::String("meta".to_string()),
1429            ])
1430            .unwrap();
1431        assert_eq!(table.row_count(), 1);
1432        let id_col = col_index(&table, "id");
1433        assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(7));
1434    }
1435
1436    #[test]
1437    fn test_read_json_json_path_missing_key_errors() {
1438        let f = write_tmp(r#"{"project": []}"#);
1439        let err = ReadJson
1440            .generate(vec![
1441                DataValue::String(f.path().to_string_lossy().to_string()),
1442                DataValue::String("nope".to_string()),
1443            ])
1444            .unwrap_err();
1445        let msg = err.to_string();
1446        assert!(msg.contains("json_path"), "{}", msg);
1447        assert!(msg.contains("not found"), "{}", msg);
1448    }
1449
1450    #[test]
1451    fn test_read_json_rejects_three_args() {
1452        let err = ReadJson
1453            .generate(vec![
1454                DataValue::String("a".to_string()),
1455                DataValue::String("b".to_string()),
1456                DataValue::String("c".to_string()),
1457            ])
1458            .unwrap_err();
1459        assert!(err.to_string().contains("1 or 2 arguments"));
1460    }
1461}