Skip to main content

sql_cli/sql/generators/
file_readers.rs

1use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
2use crate::data::stream_loader::collect_column_names;
3use crate::sql::generators::TableGenerator;
4use anyhow::{anyhow, Result};
5use regex::Regex;
6use serde_json::Value as JsonValue;
7use std::fs::File;
8use std::io::{BufRead, BufReader};
9use std::sync::Arc;
10
/// Upper bound on the number of rows a single file-reader call will return.
///
/// Exists so that pointing a reader at a multi-GB log does not pull the whole
/// thing into memory by accident. Making the limit adjustable through a
/// session setting is future work.
const MAX_LINES_PER_FILE: usize = 1_000_000;
15
16/// Extract a string argument, erroring if the arg is missing/NULL/non-string.
17fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
18    match args.get(idx) {
19        Some(DataValue::String(s)) => Ok(s.clone()),
20        Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
21        Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
22        Some(v) => Err(anyhow!(
23            "{} argument {} must be a string, got {:?}",
24            name,
25            idx + 1,
26            v
27        )),
28    }
29}
30
31/// Extract an optional string argument. Returns None for missing or NULL.
32fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
33    match args.get(idx) {
34        Some(DataValue::String(s)) => Some(s.clone()),
35        Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
36        _ => None,
37    }
38}
39
40/// Open a file and stream its lines, applying an optional include-regex filter
41/// and the global truncation cap. Emits a stderr warning when truncation kicks in.
42///
43/// Returns (line_num, line) pairs where `line_num` is the original 1-based line
44/// number in the source file — so numbers are preserved through filtering.
45fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
46    let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
47    let reader = BufReader::new(file);
48
49    let mut out = Vec::new();
50    let mut truncated = false;
51
52    for (idx, line_result) in reader.lines().enumerate() {
53        let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
54        let line_num = (idx + 1) as i64;
55
56        if let Some(re) = match_regex {
57            if !re.is_match(&line) {
58                continue;
59            }
60        }
61
62        if out.len() >= MAX_LINES_PER_FILE {
63            truncated = true;
64            break;
65        }
66        out.push((line_num, line));
67    }
68
69    if truncated {
70        eprintln!(
71            "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
72            MAX_LINES_PER_FILE, path
73        );
74    }
75
76    Ok(out)
77}
78
79/// READ_TEXT(path [, match_regex]) - Read a text file line by line.
80///
81/// Emits `(line_num, line)` rows. Optional `match_regex` filters source lines
82/// *before* materializing them, which is the primary fast path for large logs.
83pub struct ReadText;
84
85impl TableGenerator for ReadText {
86    fn name(&self) -> &str {
87        "READ_TEXT"
88    }
89
90    fn columns(&self) -> Vec<DataColumn> {
91        vec![DataColumn::new("line_num"), DataColumn::new("line")]
92    }
93
94    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
95        if args.is_empty() || args.len() > 2 {
96            return Err(anyhow!(
97                "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
98            ));
99        }
100
101        let path = require_string(&args, 0, "READ_TEXT")?;
102        let match_regex = optional_string(&args, 1)
103            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
104            .transpose()?;
105
106        let lines = read_filtered_lines(&path, match_regex.as_ref())?;
107
108        let mut table = DataTable::new("read_text");
109        table.add_column(DataColumn::new("line_num"));
110        table.add_column(DataColumn::new("line"));
111
112        for (line_num, line) in lines {
113            table
114                .add_row(DataRow::new(vec![
115                    DataValue::Integer(line_num),
116                    DataValue::String(line),
117                ]))
118                .map_err(|e| anyhow!(e))?;
119        }
120
121        Ok(Arc::new(table))
122    }
123
124    fn description(&self) -> &str {
125        "Read a text file line-by-line. Optional second arg is a regex that filters lines at read time."
126    }
127
128    fn arg_count(&self) -> usize {
129        2
130    }
131}
132
133/// GREP(path, pattern [, invert]) - Read only lines matching a regex.
134///
135/// Thin composable wrapper around READ_TEXT's filter path. Third argument
136/// (boolean or integer truthy value) inverts the match, matching `grep -v`.
137pub struct Grep;
138
139impl TableGenerator for Grep {
140    fn name(&self) -> &str {
141        "GREP"
142    }
143
144    fn columns(&self) -> Vec<DataColumn> {
145        vec![DataColumn::new("line_num"), DataColumn::new("line")]
146    }
147
148    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
149        if args.len() < 2 || args.len() > 3 {
150            return Err(anyhow!(
151                "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
152            ));
153        }
154
155        let path = require_string(&args, 0, "GREP")?;
156        let pattern_str = require_string(&args, 1, "GREP")?;
157        let pattern =
158            Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
159
160        let invert = match args.get(2) {
161            Some(DataValue::Boolean(b)) => *b,
162            Some(DataValue::Integer(n)) => *n != 0,
163            Some(DataValue::Null) | None => false,
164            Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
165        };
166
167        // When not inverted we can push the filter down into the file reader for
168        // the fast path. When inverted we still iterate every line.
169        let lines = if invert {
170            let all = read_filtered_lines(&path, None)?;
171            all.into_iter()
172                .filter(|(_, line)| !pattern.is_match(line))
173                .collect::<Vec<_>>()
174        } else {
175            read_filtered_lines(&path, Some(&pattern))?
176        };
177
178        let mut table = DataTable::new("grep");
179        table.add_column(DataColumn::new("line_num"));
180        table.add_column(DataColumn::new("line"));
181
182        for (line_num, line) in lines {
183            table
184                .add_row(DataRow::new(vec![
185                    DataValue::Integer(line_num),
186                    DataValue::String(line),
187                ]))
188                .map_err(|e| anyhow!(e))?;
189        }
190
191        Ok(Arc::new(table))
192    }
193
194    fn description(&self) -> &str {
195        "Read only lines matching a regex (third arg inverts the match, like grep -v)"
196    }
197
198    fn arg_count(&self) -> usize {
199        3
200    }
201}
202
203/// READ_WORDS(path [, min_length [, case]]) - Read a text file and emit one row per word.
204///
205/// Emits `(word_num, word, line_num, word_pos)` rows where:
206///   - `word_num` is a global 1-based word counter across the whole file
207///   - `word` is the extracted token (punctuation stripped)
208///   - `line_num` is the original 1-based line number the word came from
209///   - `word_pos` is the 1-based position of the word within its line
210///
211/// Optional `min_length` (default 1) filters out short words.
212/// Optional `case` ('lower' or 'upper') normalises word casing.
213pub struct ReadWords;
214
215impl TableGenerator for ReadWords {
216    fn name(&self) -> &str {
217        "READ_WORDS"
218    }
219
220    fn columns(&self) -> Vec<DataColumn> {
221        vec![
222            DataColumn::new("word_num"),
223            DataColumn::new("word"),
224            DataColumn::new("line_num"),
225            DataColumn::new("word_pos"),
226        ]
227    }
228
229    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
230        if args.is_empty() || args.len() > 3 {
231            return Err(anyhow!(
232                "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
233            ));
234        }
235
236        let path = require_string(&args, 0, "READ_WORDS")?;
237
238        let min_length: usize = match args.get(1) {
239            Some(DataValue::Integer(n)) => {
240                if *n < 1 {
241                    return Err(anyhow!("READ_WORDS min_length must be >= 1"));
242                }
243                *n as usize
244            }
245            Some(DataValue::Float(f)) => *f as usize,
246            Some(DataValue::Null) | None => 1,
247            Some(v) => {
248                return Err(anyhow!(
249                    "READ_WORDS min_length must be an integer, got {:?}",
250                    v
251                ))
252            }
253        };
254
255        let case_option = optional_string(&args, 2);
256
257        let lines = read_filtered_lines(&path, None)?;
258
259        let mut table = DataTable::new("read_words");
260        table.add_column(DataColumn::new("word_num"));
261        table.add_column(DataColumn::new("word"));
262        table.add_column(DataColumn::new("line_num"));
263        table.add_column(DataColumn::new("word_pos"));
264
265        let mut word_num: i64 = 0;
266
267        for (line_num, line) in &lines {
268            let mut word_pos: i64 = 0;
269
270            for token in line.split(|c: char| !c.is_alphanumeric()) {
271                if token.is_empty() || token.len() < min_length {
272                    continue;
273                }
274
275                word_pos += 1;
276                word_num += 1;
277
278                let word = match case_option.as_deref() {
279                    Some("lower") | Some("lowercase") => token.to_lowercase(),
280                    Some("upper") | Some("uppercase") => token.to_uppercase(),
281                    _ => token.to_string(),
282                };
283
284                table
285                    .add_row(DataRow::new(vec![
286                        DataValue::Integer(word_num),
287                        DataValue::String(word),
288                        DataValue::Integer(*line_num),
289                        DataValue::Integer(word_pos),
290                    ]))
291                    .map_err(|e| anyhow!(e))?;
292            }
293        }
294
295        Ok(Arc::new(table))
296    }
297
298    fn description(&self) -> &str {
299        "Read a text file and emit one row per word, with optional min length and case normalisation"
300    }
301
302    fn arg_count(&self) -> usize {
303        3
304    }
305}
306
307/// READ_JSONL(path [, match_regex]) - Read newline-delimited JSON.
308///
309/// Each non-blank line is parsed as a self-contained JSON object. Schema is
310/// the union of object keys across the first 100 records, so heterogeneous
311/// log streams (later events introducing new fields) don't drop columns.
312/// Optional `match_regex` filters source lines *before* JSON parsing — the
313/// fast path for grepping large log files.
314pub struct ReadJsonl;
315
316impl TableGenerator for ReadJsonl {
317    fn name(&self) -> &str {
318        "READ_JSONL"
319    }
320
321    fn columns(&self) -> Vec<DataColumn> {
322        // Schema is dynamic; the actual columns are inferred at generate() time
323        // from the JSON keys in the file.
324        vec![DataColumn::new("(inferred from JSON keys)")]
325    }
326
327    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
328        if args.is_empty() || args.len() > 2 {
329            return Err(anyhow!(
330                "READ_JSONL expects 1 or 2 arguments: (path [, match_regex])"
331            ));
332        }
333
334        let path = require_string(&args, 0, "READ_JSONL")?;
335        let match_regex = optional_string(&args, 1)
336            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
337            .transpose()?;
338
339        let lines = read_filtered_lines(&path, match_regex.as_ref())?;
340
341        let mut records: Vec<JsonValue> = Vec::with_capacity(lines.len());
342        for (line_num, line) in &lines {
343            let trimmed = line.trim();
344            if trimmed.is_empty() {
345                continue;
346            }
347            let value: JsonValue = serde_json::from_str(trimmed)
348                .map_err(|e| anyhow!("READ_JSONL parse error at line {}: {}", line_num, e))?;
349            records.push(value);
350        }
351
352        if records.is_empty() {
353            return Ok(Arc::new(DataTable::new("read_jsonl")));
354        }
355
356        let column_names = collect_column_names(&records, 100);
357        if column_names.is_empty() {
358            return Err(anyhow!(
359                "READ_JSONL: no JSON objects found (records must be objects, not arrays/scalars)"
360            ));
361        }
362
363        // Stringify, infer types from the first 100 rows, then convert to typed
364        // DataValues — same pipeline the file-level JSON loader uses.
365        let mut string_rows: Vec<Vec<String>> = Vec::with_capacity(records.len());
366        for record in &records {
367            let obj = match record.as_object() {
368                Some(o) => o,
369                None => continue,
370            };
371            let mut row = Vec::with_capacity(column_names.len());
372            for col_name in &column_names {
373                let value = obj
374                    .get(col_name)
375                    .map(json_value_to_string)
376                    .unwrap_or_default();
377                row.push(value);
378            }
379            string_rows.push(row);
380        }
381
382        let mut column_types: Vec<DataType> = vec![DataType::Null; column_names.len()];
383        let sample_size = string_rows.len().min(100);
384        for row in string_rows.iter().take(sample_size) {
385            for (col_idx, value) in row.iter().enumerate() {
386                if !value.is_empty() && value != "null" {
387                    let inferred = DataType::infer_from_string(value);
388                    column_types[col_idx] = column_types[col_idx].merge(&inferred);
389                }
390            }
391        }
392
393        let mut table = DataTable::new("read_jsonl");
394        for (name, dtype) in column_names.iter().zip(column_types.iter()) {
395            let mut col = DataColumn::new(name);
396            col.data_type = dtype.clone();
397            table.add_column(col);
398        }
399
400        for string_row in &string_rows {
401            let mut values = Vec::with_capacity(string_row.len());
402            for (col_idx, value) in string_row.iter().enumerate() {
403                let dv = if value.is_empty() || value == "null" {
404                    DataValue::Null
405                } else {
406                    DataValue::from_string(value, &column_types[col_idx])
407                };
408                values.push(dv);
409            }
410            table
411                .add_row(DataRow::new(values))
412                .map_err(|e| anyhow!(e))?;
413        }
414
415        Ok(Arc::new(table))
416    }
417
418    fn description(&self) -> &str {
419        "Read newline-delimited JSON (one object per line). Optional second arg is a regex that filters lines at read time."
420    }
421
422    fn arg_count(&self) -> usize {
423        2
424    }
425}
426
427fn json_value_to_string(value: &JsonValue) -> String {
428    match value {
429        JsonValue::Null => String::new(),
430        JsonValue::Bool(b) => b.to_string(),
431        JsonValue::Number(n) => n.to_string(),
432        JsonValue::String(s) => s.clone(),
433        JsonValue::Array(arr) => format!("{:?}", arr),
434        JsonValue::Object(obj) => format!("{:?}", obj),
435    }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Create a temp file holding `contents`.
    fn write_tmp(contents: &str) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(contents.as_bytes()).unwrap();
        file
    }

    /// The temp file's path as a string argument for a generator.
    fn path_arg(file: &NamedTempFile) -> DataValue {
        DataValue::String(file.path().to_string_lossy().to_string())
    }

    /// Shorthand for a `DataValue::String`.
    fn text(value: &str) -> DataValue {
        DataValue::String(value.to_string())
    }

    // ---- ReadText tests ----

    #[test]
    fn test_read_text_returns_all_lines() {
        let f = write_tmp("one\ntwo\nthree\n");
        let table = ReadText.generate(vec![path_arg(&f)]).unwrap();

        assert_eq!(table.row_count(), 3);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("one"));
        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
    }

    #[test]
    fn test_read_text_with_match_regex_filters_lines() {
        let f = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
        let table = ReadText
            .generate(vec![path_arg(&f), text("ERROR")])
            .unwrap();

        assert_eq!(table.row_count(), 2);
        // Line numbers preserve original file positions (2 and 4), not 1 and 2.
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(4));
    }

    #[test]
    fn test_read_text_requires_path() {
        assert!(ReadText.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_text_invalid_regex_errors_early() {
        let f = write_tmp("hello\n");
        let err = ReadText
            .generate(vec![path_arg(&f), text("(unclosed")])
            .unwrap_err();

        assert!(err.to_string().contains("match_regex"));
    }

    // ---- Grep tests ----

    #[test]
    fn test_grep_matches_like_grep() {
        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep.generate(vec![path_arg(&f), text("^ap")]).unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("apple"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("apricot"));
    }

    #[test]
    fn test_grep_invert_like_grep_v() {
        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep
            .generate(vec![path_arg(&f), text("^ap"), DataValue::Boolean(true)])
            .unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("banana"));
    }

    // ---- ReadWords tests ----

    #[test]
    fn test_read_words_basic() {
        let f = write_tmp("hello world\ngoodbye moon\n");
        let table = ReadWords.generate(vec![path_arg(&f)]).unwrap();

        // 4 words total
        assert_eq!(table.row_count(), 4);

        // Columns: word_num, word, line_num, word_pos
        // First word
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1)); // word_num
        assert_eq!(table.get_value(0, 1).unwrap(), &text("hello"));
        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1)); // line_num
        assert_eq!(table.get_value(0, 3).unwrap(), &DataValue::Integer(1)); // word_pos

        // Third word (first on line 2)
        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
        assert_eq!(table.get_value(2, 1).unwrap(), &text("goodbye"));
        assert_eq!(table.get_value(2, 2).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(2, 3).unwrap(), &DataValue::Integer(1));
    }

    #[test]
    fn test_read_words_min_length() {
        let f = write_tmp("I am a big dog\n");
        let table = ReadWords
            .generate(vec![path_arg(&f), DataValue::Integer(3)])
            .unwrap();

        // Only "big" and "dog" have length >= 3
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("big"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("dog"));
    }

    #[test]
    fn test_read_words_case_lower() {
        let f = write_tmp("Hello World\n");
        let table = ReadWords
            .generate(vec![path_arg(&f), DataValue::Integer(1), text("lower")])
            .unwrap();

        assert_eq!(table.get_value(0, 1).unwrap(), &text("hello"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("world"));
    }

    #[test]
    fn test_read_words_strips_punctuation() {
        let f = write_tmp("hello, world! foo-bar.\n");
        let table = ReadWords.generate(vec![path_arg(&f)]).unwrap();

        let mut words: Vec<String> = Vec::new();
        for row in 0..table.row_count() {
            match table.get_value(row, 1).unwrap() {
                DataValue::String(s) => words.push(s.clone()),
                _ => panic!("expected string"),
            }
        }
        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
    }

    #[test]
    fn test_read_words_requires_path() {
        assert!(ReadWords.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_words_empty_lines_skipped() {
        let f = write_tmp("hello\n\n\nworld\n");
        let table = ReadWords.generate(vec![path_arg(&f)]).unwrap();

        assert_eq!(table.row_count(), 2);
        // word_num is contiguous
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(2));
        // line_num preserves original positions
        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(1, 2).unwrap(), &DataValue::Integer(4));
    }

    // ---- ReadJsonl tests ----

    /// Index of the column called `name`; panics if the table has none.
    fn col_index(table: &DataTable, name: &str) -> usize {
        for (idx, col) in table.columns.iter().enumerate() {
            if col.name == name {
                return idx;
            }
        }
        panic!("column '{}' not found", name)
    }

    #[test]
    fn test_read_jsonl_basic() {
        let f = write_tmp(
            r#"{"id":1,"name":"alice","score":10}
{"id":2,"name":"bob","score":20}
{"id":3,"name":"carol","score":30}
"#,
        );
        let table = ReadJsonl.generate(vec![path_arg(&f)]).unwrap();

        assert_eq!(table.row_count(), 3);
        assert_eq!(table.column_count(), 3);

        let id_col = col_index(&table, "id");
        let name_col = col_index(&table, "name");
        assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(2, name_col).unwrap(), &text("carol"));
    }

    #[test]
    fn test_read_jsonl_heterogeneous_schema_unioned() {
        // Later records introduce fields the first one didn't have. Schema
        // should union them; missing values become Null.
        let f = write_tmp(
            r#"{"id":1,"name":"alice"}
{"id":2,"name":"bob","extra":"hello"}
{"id":3,"name":"carol","other":42}
"#,
        );
        let table = ReadJsonl.generate(vec![path_arg(&f)]).unwrap();

        assert_eq!(table.row_count(), 3);
        assert_eq!(table.column_count(), 4);

        let extra = col_index(&table, "extra");
        let other = col_index(&table, "other");
        // Row 0 has neither extra nor other -> Null
        assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
        assert_eq!(table.get_value(0, other).unwrap(), &DataValue::Null);
        // Row 1 has extra="hello"
        assert_eq!(table.get_value(1, extra).unwrap(), &text("hello"));
        // Row 2 has other=42
        assert_eq!(table.get_value(2, other).unwrap(), &DataValue::Integer(42));
    }

    #[test]
    fn test_read_jsonl_blank_lines_skipped() {
        let f = write_tmp(
            r#"{"id":1}

{"id":2}

"#,
        );
        let table = ReadJsonl.generate(vec![path_arg(&f)]).unwrap();

        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_read_jsonl_match_regex_pre_filters() {
        let f = write_tmp(
            r#"{"level":"INFO","msg":"boot"}
{"level":"ERROR","msg":"disk"}
{"level":"INFO","msg":"shutdown"}
{"level":"ERROR","msg":"oom"}
"#,
        );
        let table = ReadJsonl
            .generate(vec![path_arg(&f), text("ERROR")])
            .unwrap();

        assert_eq!(table.row_count(), 2);
        let msg = col_index(&table, "msg");
        assert_eq!(table.get_value(0, msg).unwrap(), &text("disk"));
    }

    #[test]
    fn test_read_jsonl_invalid_line_errors_with_line_number() {
        let f = write_tmp(
            r#"{"id":1}
not json at all
{"id":3}
"#,
        );
        let err = ReadJsonl.generate(vec![path_arg(&f)]).unwrap_err();

        let msg = err.to_string();
        assert!(
            msg.contains("line 2"),
            "error should cite line number: {}",
            msg
        );
    }

    #[test]
    fn test_read_jsonl_requires_path() {
        assert!(ReadJsonl.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_jsonl_empty_file_returns_empty_table() {
        let f = write_tmp("");
        let table = ReadJsonl.generate(vec![path_arg(&f)]).unwrap();

        assert_eq!(table.row_count(), 0);
    }
}