Skip to main content

sql_cli/sql/generators/
file_readers.rs

1use crate::data::advanced_csv_loader::AdvancedCsvLoader;
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
3use crate::data::stream_loader::collect_column_names;
4use crate::sql::generators::TableGenerator;
5use anyhow::{anyhow, Result};
6use regex::Regex;
7use serde_json::Value as JsonValue;
8use std::fs::File;
9use std::io::{BufRead, BufReader, Cursor, IsTerminal};
10use std::sync::{Arc, OnceLock};
11
/// Upper bound on the number of rows a single reader invocation will keep.
///
/// This is a safety net against accidentally pulling a multi-GB log into
/// memory. The value is fixed at compile time for now; letting users raise it
/// through a session setting is future work.
const MAX_LINES_PER_FILE: usize = 1_000_000;
16
17/// Extract a string argument, erroring if the arg is missing/NULL/non-string.
18fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
19    match args.get(idx) {
20        Some(DataValue::String(s)) => Ok(s.clone()),
21        Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
22        Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
23        Some(v) => Err(anyhow!(
24            "{} argument {} must be a string, got {:?}",
25            name,
26            idx + 1,
27            v
28        )),
29    }
30}
31
32/// Extract an optional string argument. Returns None for missing or NULL.
33fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
34    match args.get(idx) {
35        Some(DataValue::String(s)) => Some(s.clone()),
36        Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
37        _ => None,
38    }
39}
40
41/// Open a file and stream its lines, applying an optional include-regex filter
42/// and the global truncation cap. Emits a stderr warning when truncation kicks in.
43///
44/// Returns (line_num, line) pairs where `line_num` is the original 1-based line
45/// number in the source file — so numbers are preserved through filtering.
46fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
47    let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
48    let reader = BufReader::new(file);
49
50    let mut out = Vec::new();
51    let mut truncated = false;
52
53    for (idx, line_result) in reader.lines().enumerate() {
54        let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
55        let line_num = (idx + 1) as i64;
56
57        if let Some(re) = match_regex {
58            if !re.is_match(&line) {
59                continue;
60            }
61        }
62
63        if out.len() >= MAX_LINES_PER_FILE {
64            truncated = true;
65            break;
66        }
67        out.push((line_num, line));
68    }
69
70    if truncated {
71        eprintln!(
72            "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
73            MAX_LINES_PER_FILE, path
74        );
75    }
76
77    Ok(out)
78}
79
80/// Read lines from a file path, or from stdin if `path == "-"` (Unix convention).
81///
82/// Stdin reads are cached for the process — same buffer is reused across multiple
83/// reader invocations in the same query. The optional regex filter is applied to
84/// both sources uniformly.
85fn read_lines_from_path_or_stdin(
86    path: &str,
87    match_regex: Option<&Regex>,
88) -> Result<Vec<(i64, String)>> {
89    if path == "-" {
90        let cached = cached_stdin_lines()?;
91        return Ok(cached
92            .iter()
93            .filter(|(_, line)| match_regex.map_or(true, |re| re.is_match(line)))
94            .cloned()
95            .collect());
96    }
97    read_filtered_lines(path, match_regex)
98}
99
100/// READ_TEXT(path [, match_regex]) - Read a text file line by line.
101///
102/// Emits `(line_num, line)` rows. Optional `match_regex` filters source lines
103/// *before* materializing them, which is the primary fast path for large logs.
104pub struct ReadText;
105
106impl TableGenerator for ReadText {
107    fn name(&self) -> &str {
108        "READ_TEXT"
109    }
110
111    fn columns(&self) -> Vec<DataColumn> {
112        vec![DataColumn::new("line_num"), DataColumn::new("line")]
113    }
114
115    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
116        if args.is_empty() || args.len() > 2 {
117            return Err(anyhow!(
118                "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
119            ));
120        }
121
122        let path = require_string(&args, 0, "READ_TEXT")?;
123        let match_regex = optional_string(&args, 1)
124            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
125            .transpose()?;
126
127        let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
128
129        let mut table = DataTable::new("read_text");
130        table.add_column(DataColumn::new("line_num"));
131        table.add_column(DataColumn::new("line"));
132
133        for (line_num, line) in lines {
134            table
135                .add_row(DataRow::new(vec![
136                    DataValue::Integer(line_num),
137                    DataValue::String(line),
138                ]))
139                .map_err(|e| anyhow!(e))?;
140        }
141
142        Ok(Arc::new(table))
143    }
144
145    fn description(&self) -> &str {
146        "Read a text file line-by-line. Pass '-' as path to read from stdin. Optional second arg is a regex that filters lines at read time."
147    }
148
149    fn arg_count(&self) -> usize {
150        2
151    }
152}
153
154/// GREP(path, pattern [, invert]) - Read only lines matching a regex.
155///
156/// Thin composable wrapper around READ_TEXT's filter path. Third argument
157/// (boolean or integer truthy value) inverts the match, matching `grep -v`.
158pub struct Grep;
159
160impl TableGenerator for Grep {
161    fn name(&self) -> &str {
162        "GREP"
163    }
164
165    fn columns(&self) -> Vec<DataColumn> {
166        vec![DataColumn::new("line_num"), DataColumn::new("line")]
167    }
168
169    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
170        if args.len() < 2 || args.len() > 3 {
171            return Err(anyhow!(
172                "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
173            ));
174        }
175
176        let path = require_string(&args, 0, "GREP")?;
177        let pattern_str = require_string(&args, 1, "GREP")?;
178        let pattern =
179            Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
180
181        let invert = match args.get(2) {
182            Some(DataValue::Boolean(b)) => *b,
183            Some(DataValue::Integer(n)) => *n != 0,
184            Some(DataValue::Null) | None => false,
185            Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
186        };
187
188        // When not inverted we can push the filter down into the line reader for
189        // the fast path. When inverted we still iterate every line.
190        let lines = if invert {
191            let all = read_lines_from_path_or_stdin(&path, None)?;
192            all.into_iter()
193                .filter(|(_, line)| !pattern.is_match(line))
194                .collect::<Vec<_>>()
195        } else {
196            read_lines_from_path_or_stdin(&path, Some(&pattern))?
197        };
198
199        let mut table = DataTable::new("grep");
200        table.add_column(DataColumn::new("line_num"));
201        table.add_column(DataColumn::new("line"));
202
203        for (line_num, line) in lines {
204            table
205                .add_row(DataRow::new(vec![
206                    DataValue::Integer(line_num),
207                    DataValue::String(line),
208                ]))
209                .map_err(|e| anyhow!(e))?;
210        }
211
212        Ok(Arc::new(table))
213    }
214
215    fn description(&self) -> &str {
216        "Read only lines matching a regex (third arg inverts the match, like grep -v). Pass '-' as path to read from stdin."
217    }
218
219    fn arg_count(&self) -> usize {
220        3
221    }
222}
223
224/// READ_WORDS(path [, min_length [, case]]) - Read a text file and emit one row per word.
225///
226/// Emits `(word_num, word, line_num, word_pos)` rows where:
227///   - `word_num` is a global 1-based word counter across the whole file
228///   - `word` is the extracted token (punctuation stripped)
229///   - `line_num` is the original 1-based line number the word came from
230///   - `word_pos` is the 1-based position of the word within its line
231///
232/// Optional `min_length` (default 1) filters out short words.
233/// Optional `case` ('lower' or 'upper') normalises word casing.
234pub struct ReadWords;
235
236impl TableGenerator for ReadWords {
237    fn name(&self) -> &str {
238        "READ_WORDS"
239    }
240
241    fn columns(&self) -> Vec<DataColumn> {
242        vec![
243            DataColumn::new("word_num"),
244            DataColumn::new("word"),
245            DataColumn::new("line_num"),
246            DataColumn::new("word_pos"),
247        ]
248    }
249
250    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
251        if args.is_empty() || args.len() > 3 {
252            return Err(anyhow!(
253                "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
254            ));
255        }
256
257        let path = require_string(&args, 0, "READ_WORDS")?;
258
259        let min_length: usize = match args.get(1) {
260            Some(DataValue::Integer(n)) => {
261                if *n < 1 {
262                    return Err(anyhow!("READ_WORDS min_length must be >= 1"));
263                }
264                *n as usize
265            }
266            Some(DataValue::Float(f)) => *f as usize,
267            Some(DataValue::Null) | None => 1,
268            Some(v) => {
269                return Err(anyhow!(
270                    "READ_WORDS min_length must be an integer, got {:?}",
271                    v
272                ))
273            }
274        };
275
276        let case_option = optional_string(&args, 2);
277
278        let lines = read_lines_from_path_or_stdin(&path, None)?;
279
280        let mut table = DataTable::new("read_words");
281        table.add_column(DataColumn::new("word_num"));
282        table.add_column(DataColumn::new("word"));
283        table.add_column(DataColumn::new("line_num"));
284        table.add_column(DataColumn::new("word_pos"));
285
286        let mut word_num: i64 = 0;
287
288        for (line_num, line) in &lines {
289            let mut word_pos: i64 = 0;
290
291            for token in line.split(|c: char| !c.is_alphanumeric()) {
292                if token.is_empty() || token.len() < min_length {
293                    continue;
294                }
295
296                word_pos += 1;
297                word_num += 1;
298
299                let word = match case_option.as_deref() {
300                    Some("lower") | Some("lowercase") => token.to_lowercase(),
301                    Some("upper") | Some("uppercase") => token.to_uppercase(),
302                    _ => token.to_string(),
303                };
304
305                table
306                    .add_row(DataRow::new(vec![
307                        DataValue::Integer(word_num),
308                        DataValue::String(word),
309                        DataValue::Integer(*line_num),
310                        DataValue::Integer(word_pos),
311                    ]))
312                    .map_err(|e| anyhow!(e))?;
313            }
314        }
315
316        Ok(Arc::new(table))
317    }
318
319    fn description(&self) -> &str {
320        "Read a text file and emit one row per word, with optional min length and case normalisation"
321    }
322
323    fn arg_count(&self) -> usize {
324        3
325    }
326}
327
328/// READ_JSONL(path [, match_regex]) - Read newline-delimited JSON.
329///
330/// Each non-blank line is parsed as a self-contained JSON object. Schema is
331/// the union of object keys across the first 100 records, so heterogeneous
332/// log streams (later events introducing new fields) don't drop columns.
333/// Optional `match_regex` filters source lines *before* JSON parsing — the
334/// fast path for grepping large log files.
335pub struct ReadJsonl;
336
337impl TableGenerator for ReadJsonl {
338    fn name(&self) -> &str {
339        "READ_JSONL"
340    }
341
342    fn columns(&self) -> Vec<DataColumn> {
343        // Schema is dynamic; the actual columns are inferred at generate() time
344        // from the JSON keys in the file.
345        vec![DataColumn::new("(inferred from JSON keys)")]
346    }
347
348    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
349        if args.is_empty() || args.len() > 2 {
350            return Err(anyhow!(
351                "READ_JSONL expects 1 or 2 arguments: (path [, match_regex])"
352            ));
353        }
354
355        let path = require_string(&args, 0, "READ_JSONL")?;
356        let match_regex = optional_string(&args, 1)
357            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
358            .transpose()?;
359
360        let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
361
362        let mut records: Vec<JsonValue> = Vec::with_capacity(lines.len());
363        for (line_num, line) in &lines {
364            let trimmed = line.trim();
365            if trimmed.is_empty() {
366                continue;
367            }
368            let value: JsonValue = serde_json::from_str(trimmed)
369                .map_err(|e| anyhow!("READ_JSONL parse error at line {}: {}", line_num, e))?;
370            records.push(value);
371        }
372
373        if records.is_empty() {
374            return Ok(Arc::new(DataTable::new("read_jsonl")));
375        }
376
377        let column_names = collect_column_names(&records, 100);
378        if column_names.is_empty() {
379            return Err(anyhow!(
380                "READ_JSONL: no JSON objects found (records must be objects, not arrays/scalars)"
381            ));
382        }
383
384        // Stringify, infer types from the first 100 rows, then convert to typed
385        // DataValues — same pipeline the file-level JSON loader uses.
386        let mut string_rows: Vec<Vec<String>> = Vec::with_capacity(records.len());
387        for record in &records {
388            let obj = match record.as_object() {
389                Some(o) => o,
390                None => continue,
391            };
392            let mut row = Vec::with_capacity(column_names.len());
393            for col_name in &column_names {
394                let value = obj
395                    .get(col_name)
396                    .map(json_value_to_string)
397                    .unwrap_or_default();
398                row.push(value);
399            }
400            string_rows.push(row);
401        }
402
403        let mut column_types: Vec<DataType> = vec![DataType::Null; column_names.len()];
404        let sample_size = string_rows.len().min(100);
405        for row in string_rows.iter().take(sample_size) {
406            for (col_idx, value) in row.iter().enumerate() {
407                if !value.is_empty() && value != "null" {
408                    let inferred = DataType::infer_from_string(value);
409                    column_types[col_idx] = column_types[col_idx].merge(&inferred);
410                }
411            }
412        }
413
414        let mut table = DataTable::new("read_jsonl");
415        for (name, dtype) in column_names.iter().zip(column_types.iter()) {
416            let mut col = DataColumn::new(name);
417            col.data_type = dtype.clone();
418            table.add_column(col);
419        }
420
421        for string_row in &string_rows {
422            let mut values = Vec::with_capacity(string_row.len());
423            for (col_idx, value) in string_row.iter().enumerate() {
424                let dv = if value.is_empty() || value == "null" {
425                    DataValue::Null
426                } else {
427                    DataValue::from_string(value, &column_types[col_idx])
428                };
429                values.push(dv);
430            }
431            table
432                .add_row(DataRow::new(values))
433                .map_err(|e| anyhow!(e))?;
434        }
435
436        Ok(Arc::new(table))
437    }
438
439    fn description(&self) -> &str {
440        "Read newline-delimited JSON (one object per line). Pass '-' as path to read JSONL from stdin. Optional second arg is a regex that filters lines at read time."
441    }
442
443    fn arg_count(&self) -> usize {
444        2
445    }
446}
447
448/// READ_CSV(path) - Read a CSV file and emit one row per record.
449///
450/// Columns are inferred from the header row. Pass `-` as the path to read CSV
451/// from stdin (shares the same cached-once buffer with READ_STDIN / READ_JSONL).
452/// Type inference, string interning, and other optimisations are inherited
453/// from the main CSV loader so behaviour matches `sql-cli file.csv -q ...`.
454pub struct ReadCsv;
455
456impl TableGenerator for ReadCsv {
457    fn name(&self) -> &str {
458        "READ_CSV"
459    }
460
461    fn columns(&self) -> Vec<DataColumn> {
462        // Schema is inferred from the CSV header at generate() time.
463        vec![DataColumn::new("(inferred from CSV header)")]
464    }
465
466    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
467        if args.len() != 1 {
468            return Err(anyhow!("READ_CSV expects 1 argument: (path)"));
469        }
470
471        let path = require_string(&args, 0, "READ_CSV")?;
472        let mut loader = AdvancedCsvLoader::new();
473
474        let table = if path == "-" {
475            // Reconstruct a CSV byte stream from the cached stdin lines so other
476            // stdin readers in the same query keep seeing the same buffer.
477            let lines = cached_stdin_lines()?;
478            let mut buffer = String::with_capacity(lines.iter().map(|(_, l)| l.len() + 1).sum());
479            for (i, (_, line)) in lines.iter().enumerate() {
480                if i > 0 {
481                    buffer.push('\n');
482                }
483                buffer.push_str(line);
484            }
485            let cursor = Cursor::new(buffer.into_bytes());
486            loader
487                .load_csv_from_reader(cursor, "read_csv", "<stdin>")
488                .map_err(|e| anyhow!("READ_CSV parse error reading stdin: {}", e))?
489        } else {
490            let file = File::open(&path)
491                .map_err(|e| anyhow!("READ_CSV failed to open '{}': {}", path, e))?;
492            loader
493                .load_csv_from_reader(file, "read_csv", &path)
494                .map_err(|e| anyhow!("READ_CSV parse error reading '{}': {}", path, e))?
495        };
496
497        Ok(Arc::new(table))
498    }
499
500    fn description(&self) -> &str {
501        "Read a CSV file (header row required). Pass '-' as path to read CSV from stdin."
502    }
503
504    fn arg_count(&self) -> usize {
505        1
506    }
507}
508
509fn json_value_to_string(value: &JsonValue) -> String {
510    match value {
511        JsonValue::Null => String::new(),
512        JsonValue::Bool(b) => b.to_string(),
513        JsonValue::Number(n) => n.to_string(),
514        JsonValue::String(s) => s.clone(),
515        JsonValue::Array(arr) => format!("{:?}", arr),
516        JsonValue::Object(obj) => format!("{:?}", obj),
517    }
518}
519
520/// READ_STDIN([match_regex]) - Read lines piped on standard input.
521///
522/// Emits `(line_num, line)` rows. Optional regex pre-filters lines before
523/// materialisation. Erroring on TTY input prevents the engine from blocking
524/// forever waiting on a keyboard when the user forgets the pipe.
525///
526/// Stdin is consumable, so it is read **once per process** and cached. This
527/// keeps the function composable: multiple `READ_STDIN()` references in the
528/// same query (CTEs, self-joins) see the same rows. The regex filter is
529/// applied per call against the cached buffer.
530pub struct ReadStdin;
531
532static STDIN_CACHE: OnceLock<Result<Vec<(i64, String)>, String>> = OnceLock::new();
533
534fn cached_stdin_lines() -> Result<&'static Vec<(i64, String)>> {
535    let cached = STDIN_CACHE.get_or_init(|| {
536        let stdin = std::io::stdin();
537        if stdin.is_terminal() {
538            return Err("READ_STDIN() requires data piped on stdin; got an interactive terminal. Try: cat file | sql-cli -q '...'".to_string());
539        }
540        let handle = stdin.lock();
541        let reader = BufReader::new(handle);
542        let mut out = Vec::new();
543        let mut truncated = false;
544        for (idx, line_result) in reader.lines().enumerate() {
545            let line = match line_result {
546                Ok(l) => l,
547                Err(e) => return Err(format!("Error reading stdin: {}", e)),
548            };
549            if out.len() >= MAX_LINES_PER_FILE {
550                truncated = true;
551                break;
552            }
553            out.push(((idx + 1) as i64, line));
554        }
555        if truncated {
556            eprintln!(
557                "WARNING: truncated to {} rows (max_lines_per_file cap) when reading stdin",
558                MAX_LINES_PER_FILE
559            );
560        }
561        Ok(out)
562    });
563    cached.as_ref().map_err(|e| anyhow!(e.clone()))
564}
565
566impl TableGenerator for ReadStdin {
567    fn name(&self) -> &str {
568        "READ_STDIN"
569    }
570
571    fn columns(&self) -> Vec<DataColumn> {
572        vec![DataColumn::new("line_num"), DataColumn::new("line")]
573    }
574
575    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
576        if args.len() > 1 {
577            return Err(anyhow!(
578                "READ_STDIN expects 0 or 1 arguments: ([match_regex])"
579            ));
580        }
581
582        let match_regex = optional_string(&args, 0)
583            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
584            .transpose()?;
585
586        let lines = cached_stdin_lines()?;
587
588        let mut table = DataTable::new("read_stdin");
589        table.add_column(DataColumn::new("line_num"));
590        table.add_column(DataColumn::new("line"));
591
592        for (line_num, line) in lines {
593            if let Some(ref re) = match_regex {
594                if !re.is_match(line) {
595                    continue;
596                }
597            }
598            table
599                .add_row(DataRow::new(vec![
600                    DataValue::Integer(*line_num),
601                    DataValue::String(line.clone()),
602                ]))
603                .map_err(|e| anyhow!(e))?;
604        }
605
606        Ok(Arc::new(table))
607    }
608
609    fn description(&self) -> &str {
610        "Read lines piped on stdin (cached once per process). Optional regex filters lines at read time. Yields (line_num, line)."
611    }
612
613    fn arg_count(&self) -> usize {
614        1
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621    use std::io::Write;
622    use tempfile::NamedTempFile;
623
624    fn write_tmp(contents: &str) -> NamedTempFile {
625        let mut f = NamedTempFile::new().unwrap();
626        f.write_all(contents.as_bytes()).unwrap();
627        f
628    }
629
630    #[test]
631    fn test_read_text_returns_all_lines() {
632        let f = write_tmp("one\ntwo\nthree\n");
633        let table = ReadText
634            .generate(vec![DataValue::String(
635                f.path().to_string_lossy().to_string(),
636            )])
637            .unwrap();
638        assert_eq!(table.row_count(), 3);
639        assert_eq!(
640            table.get_value(0, 1).unwrap(),
641            &DataValue::String("one".to_string())
642        );
643        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
644    }
645
646    #[test]
647    fn test_read_text_with_match_regex_filters_lines() {
648        let f = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
649        let table = ReadText
650            .generate(vec![
651                DataValue::String(f.path().to_string_lossy().to_string()),
652                DataValue::String("ERROR".to_string()),
653            ])
654            .unwrap();
655        assert_eq!(table.row_count(), 2);
656        // Line numbers preserve original file positions (2 and 4), not 1 and 2.
657        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
658        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(4));
659    }
660
661    #[test]
662    fn test_read_text_requires_path() {
663        assert!(ReadText.generate(vec![]).is_err());
664    }
665
666    #[test]
667    fn test_read_text_invalid_regex_errors_early() {
668        let f = write_tmp("hello\n");
669        let err = ReadText
670            .generate(vec![
671                DataValue::String(f.path().to_string_lossy().to_string()),
672                DataValue::String("(unclosed".to_string()),
673            ])
674            .unwrap_err();
675        assert!(err.to_string().contains("match_regex"));
676    }
677
678    #[test]
679    fn test_grep_matches_like_grep() {
680        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
681        let table = Grep
682            .generate(vec![
683                DataValue::String(f.path().to_string_lossy().to_string()),
684                DataValue::String("^ap".to_string()),
685            ])
686            .unwrap();
687        assert_eq!(table.row_count(), 2);
688        assert_eq!(
689            table.get_value(0, 1).unwrap(),
690            &DataValue::String("apple".to_string())
691        );
692        assert_eq!(
693            table.get_value(1, 1).unwrap(),
694            &DataValue::String("apricot".to_string())
695        );
696    }
697
698    #[test]
699    fn test_grep_invert_like_grep_v() {
700        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
701        let table = Grep
702            .generate(vec![
703                DataValue::String(f.path().to_string_lossy().to_string()),
704                DataValue::String("^ap".to_string()),
705                DataValue::Boolean(true),
706            ])
707            .unwrap();
708        assert_eq!(table.row_count(), 2);
709        assert_eq!(
710            table.get_value(0, 1).unwrap(),
711            &DataValue::String("banana".to_string())
712        );
713    }
714
715    // ---- ReadWords tests ----
716
717    #[test]
718    fn test_read_words_basic() {
719        let f = write_tmp("hello world\ngoodbye moon\n");
720        let table = ReadWords
721            .generate(vec![DataValue::String(
722                f.path().to_string_lossy().to_string(),
723            )])
724            .unwrap();
725        // 4 words total
726        assert_eq!(table.row_count(), 4);
727        // Columns: word_num, word, line_num, word_pos
728        // First word
729        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1)); // word_num
730        assert_eq!(
731            table.get_value(0, 1).unwrap(),
732            &DataValue::String("hello".to_string())
733        );
734        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1)); // line_num
735        assert_eq!(table.get_value(0, 3).unwrap(), &DataValue::Integer(1)); // word_pos
736                                                                            // Third word (first on line 2)
737        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
738        assert_eq!(
739            table.get_value(2, 1).unwrap(),
740            &DataValue::String("goodbye".to_string())
741        );
742        assert_eq!(table.get_value(2, 2).unwrap(), &DataValue::Integer(2));
743        assert_eq!(table.get_value(2, 3).unwrap(), &DataValue::Integer(1));
744    }
745
746    #[test]
747    fn test_read_words_min_length() {
748        let f = write_tmp("I am a big dog\n");
749        let table = ReadWords
750            .generate(vec![
751                DataValue::String(f.path().to_string_lossy().to_string()),
752                DataValue::Integer(3),
753            ])
754            .unwrap();
755        // Only "big" and "dog" have length >= 3
756        assert_eq!(table.row_count(), 2);
757        assert_eq!(
758            table.get_value(0, 1).unwrap(),
759            &DataValue::String("big".to_string())
760        );
761        assert_eq!(
762            table.get_value(1, 1).unwrap(),
763            &DataValue::String("dog".to_string())
764        );
765    }
766
767    #[test]
768    fn test_read_words_case_lower() {
769        let f = write_tmp("Hello World\n");
770        let table = ReadWords
771            .generate(vec![
772                DataValue::String(f.path().to_string_lossy().to_string()),
773                DataValue::Integer(1),
774                DataValue::String("lower".to_string()),
775            ])
776            .unwrap();
777        assert_eq!(
778            table.get_value(0, 1).unwrap(),
779            &DataValue::String("hello".to_string())
780        );
781        assert_eq!(
782            table.get_value(1, 1).unwrap(),
783            &DataValue::String("world".to_string())
784        );
785    }
786
787    #[test]
788    fn test_read_words_strips_punctuation() {
789        let f = write_tmp("hello, world! foo-bar.\n");
790        let table = ReadWords
791            .generate(vec![DataValue::String(
792                f.path().to_string_lossy().to_string(),
793            )])
794            .unwrap();
795        let words: Vec<String> = (0..table.row_count())
796            .map(|i| match table.get_value(i, 1).unwrap() {
797                DataValue::String(s) => s.clone(),
798                _ => panic!("expected string"),
799            })
800            .collect();
801        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
802    }
803
804    #[test]
805    fn test_read_words_requires_path() {
806        assert!(ReadWords.generate(vec![]).is_err());
807    }
808
809    #[test]
810    fn test_read_words_empty_lines_skipped() {
811        let f = write_tmp("hello\n\n\nworld\n");
812        let table = ReadWords
813            .generate(vec![DataValue::String(
814                f.path().to_string_lossy().to_string(),
815            )])
816            .unwrap();
817        assert_eq!(table.row_count(), 2);
818        // word_num is contiguous
819        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
820        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(2));
821        // line_num preserves original positions
822        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
823        assert_eq!(table.get_value(1, 2).unwrap(), &DataValue::Integer(4));
824    }
825
826    // ---- ReadJsonl tests ----
827
828    fn col_index(table: &DataTable, name: &str) -> usize {
829        table
830            .columns
831            .iter()
832            .position(|c| c.name == name)
833            .unwrap_or_else(|| panic!("column '{}' not found", name))
834    }
835
836    #[test]
837    fn test_read_jsonl_basic() {
838        let f = write_tmp(
839            r#"{"id":1,"name":"alice","score":10}
840{"id":2,"name":"bob","score":20}
841{"id":3,"name":"carol","score":30}
842"#,
843        );
844        let table = ReadJsonl
845            .generate(vec![DataValue::String(
846                f.path().to_string_lossy().to_string(),
847            )])
848            .unwrap();
849        assert_eq!(table.row_count(), 3);
850        assert_eq!(table.column_count(), 3);
851
852        let id_col = col_index(&table, "id");
853        let name_col = col_index(&table, "name");
854        assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
855        assert_eq!(
856            table.get_value(2, name_col).unwrap(),
857            &DataValue::String("carol".to_string())
858        );
859    }
860
861    #[test]
862    fn test_read_jsonl_heterogeneous_schema_unioned() {
863        // Later records introduce fields the first one didn't have. Schema
864        // should union them; missing values become Null.
865        let f = write_tmp(
866            r#"{"id":1,"name":"alice"}
867{"id":2,"name":"bob","extra":"hello"}
868{"id":3,"name":"carol","other":42}
869"#,
870        );
871        let table = ReadJsonl
872            .generate(vec![DataValue::String(
873                f.path().to_string_lossy().to_string(),
874            )])
875            .unwrap();
876        assert_eq!(table.row_count(), 3);
877        assert_eq!(table.column_count(), 4);
878        let extra = col_index(&table, "extra");
879        let other = col_index(&table, "other");
880        // Row 0 has neither extra nor other -> Null
881        assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
882        assert_eq!(table.get_value(0, other).unwrap(), &DataValue::Null);
883        // Row 1 has extra="hello"
884        assert_eq!(
885            table.get_value(1, extra).unwrap(),
886            &DataValue::String("hello".to_string())
887        );
888        // Row 2 has other=42
889        assert_eq!(table.get_value(2, other).unwrap(), &DataValue::Integer(42));
890    }
891
892    #[test]
893    fn test_read_jsonl_blank_lines_skipped() {
894        let f = write_tmp(
895            r#"{"id":1}
896
897{"id":2}
898
899"#,
900        );
901        let table = ReadJsonl
902            .generate(vec![DataValue::String(
903                f.path().to_string_lossy().to_string(),
904            )])
905            .unwrap();
906        assert_eq!(table.row_count(), 2);
907    }
908
909    #[test]
910    fn test_read_jsonl_match_regex_pre_filters() {
911        let f = write_tmp(
912            r#"{"level":"INFO","msg":"boot"}
913{"level":"ERROR","msg":"disk"}
914{"level":"INFO","msg":"shutdown"}
915{"level":"ERROR","msg":"oom"}
916"#,
917        );
918        let table = ReadJsonl
919            .generate(vec![
920                DataValue::String(f.path().to_string_lossy().to_string()),
921                DataValue::String("ERROR".to_string()),
922            ])
923            .unwrap();
924        assert_eq!(table.row_count(), 2);
925        let msg = col_index(&table, "msg");
926        assert_eq!(
927            table.get_value(0, msg).unwrap(),
928            &DataValue::String("disk".to_string())
929        );
930    }
931
932    #[test]
933    fn test_read_jsonl_invalid_line_errors_with_line_number() {
934        let f = write_tmp(
935            r#"{"id":1}
936not json at all
937{"id":3}
938"#,
939        );
940        let err = ReadJsonl
941            .generate(vec![DataValue::String(
942                f.path().to_string_lossy().to_string(),
943            )])
944            .unwrap_err();
945        let msg = err.to_string();
946        assert!(
947            msg.contains("line 2"),
948            "error should cite line number: {}",
949            msg
950        );
951    }
952
953    #[test]
954    fn test_read_jsonl_requires_path() {
955        assert!(ReadJsonl.generate(vec![]).is_err());
956    }
957
958    #[test]
959    fn test_read_jsonl_empty_file_returns_empty_table() {
960        let f = write_tmp("");
961        let table = ReadJsonl
962            .generate(vec![DataValue::String(
963                f.path().to_string_lossy().to_string(),
964            )])
965            .unwrap();
966        assert_eq!(table.row_count(), 0);
967    }
968
969    // ReadStdin: argument-validation only (stdin is process-global and we cannot
970    // safely inject test data without refactoring to inject a Reader). The
971    // happy-path is covered by manual smoke tests in commit messages and the
972    // examples/ folder.
973
974    #[test]
975    fn test_read_stdin_rejects_too_many_args() {
976        let err = ReadStdin
977            .generate(vec![
978                DataValue::String("foo".to_string()),
979                DataValue::String("bar".to_string()),
980            ])
981            .unwrap_err();
982        assert!(
983            err.to_string().contains("0 or 1 arguments"),
984            "should mention arg count: {}",
985            err
986        );
987    }
988
989    #[test]
990    fn test_read_stdin_rejects_invalid_regex() {
991        let err = ReadStdin
992            .generate(vec![DataValue::String("[invalid(regex".to_string())])
993            .unwrap_err();
994        assert!(
995            err.to_string().contains("Invalid match_regex"),
996            "should mention regex: {}",
997            err
998        );
999    }
1000}