1use crate::data::advanced_csv_loader::AdvancedCsvLoader;
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
3use crate::data::stream_loader::collect_column_names;
4use crate::sql::generators::TableGenerator;
5use anyhow::{anyhow, Result};
6use regex::Regex;
7use serde_json::Value as JsonValue;
8use std::fs::File;
9use std::io::{BufRead, BufReader, Cursor, IsTerminal};
10use std::sync::{Arc, OnceLock};
11
/// Upper bound on the number of lines retained from a single file or from stdin.
///
/// Readers in this file stop collecting once this many lines have been kept and
/// print a truncation warning to stderr rather than failing.
const MAX_LINES_PER_FILE: usize = 1_000_000;
16
17fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
19 match args.get(idx) {
20 Some(DataValue::String(s)) => Ok(s.clone()),
21 Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
22 Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
23 Some(v) => Err(anyhow!(
24 "{} argument {} must be a string, got {:?}",
25 name,
26 idx + 1,
27 v
28 )),
29 }
30}
31
32fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
34 match args.get(idx) {
35 Some(DataValue::String(s)) => Some(s.clone()),
36 Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
37 _ => None,
38 }
39}
40
41fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
47 let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
48 let reader = BufReader::new(file);
49
50 let mut out = Vec::new();
51 let mut truncated = false;
52
53 for (idx, line_result) in reader.lines().enumerate() {
54 let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
55 let line_num = (idx + 1) as i64;
56
57 if let Some(re) = match_regex {
58 if !re.is_match(&line) {
59 continue;
60 }
61 }
62
63 if out.len() >= MAX_LINES_PER_FILE {
64 truncated = true;
65 break;
66 }
67 out.push((line_num, line));
68 }
69
70 if truncated {
71 eprintln!(
72 "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
73 MAX_LINES_PER_FILE, path
74 );
75 }
76
77 Ok(out)
78}
79
80fn read_lines_from_path_or_stdin(
86 path: &str,
87 match_regex: Option<&Regex>,
88) -> Result<Vec<(i64, String)>> {
89 if path == "-" {
90 let cached = cached_stdin_lines()?;
91 return Ok(cached
92 .iter()
93 .filter(|(_, line)| match_regex.map_or(true, |re| re.is_match(line)))
94 .cloned()
95 .collect());
96 }
97 read_filtered_lines(path, match_regex)
98}
99
100pub struct ReadText;
105
106impl TableGenerator for ReadText {
107 fn name(&self) -> &str {
108 "READ_TEXT"
109 }
110
111 fn columns(&self) -> Vec<DataColumn> {
112 vec![DataColumn::new("line_num"), DataColumn::new("line")]
113 }
114
115 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
116 if args.is_empty() || args.len() > 2 {
117 return Err(anyhow!(
118 "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
119 ));
120 }
121
122 let path = require_string(&args, 0, "READ_TEXT")?;
123 let match_regex = optional_string(&args, 1)
124 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
125 .transpose()?;
126
127 let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
128
129 let mut table = DataTable::new("read_text");
130 table.add_column(DataColumn::new("line_num"));
131 table.add_column(DataColumn::new("line"));
132
133 for (line_num, line) in lines {
134 table
135 .add_row(DataRow::new(vec![
136 DataValue::Integer(line_num),
137 DataValue::String(line),
138 ]))
139 .map_err(|e| anyhow!(e))?;
140 }
141
142 Ok(Arc::new(table))
143 }
144
145 fn description(&self) -> &str {
146 "Read a text file line-by-line. Pass '-' as path to read from stdin. Optional second arg is a regex that filters lines at read time."
147 }
148
149 fn arg_count(&self) -> usize {
150 2
151 }
152}
153
154pub struct Grep;
159
160impl TableGenerator for Grep {
161 fn name(&self) -> &str {
162 "GREP"
163 }
164
165 fn columns(&self) -> Vec<DataColumn> {
166 vec![DataColumn::new("line_num"), DataColumn::new("line")]
167 }
168
169 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
170 if args.len() < 2 || args.len() > 3 {
171 return Err(anyhow!(
172 "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
173 ));
174 }
175
176 let path = require_string(&args, 0, "GREP")?;
177 let pattern_str = require_string(&args, 1, "GREP")?;
178 let pattern =
179 Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
180
181 let invert = match args.get(2) {
182 Some(DataValue::Boolean(b)) => *b,
183 Some(DataValue::Integer(n)) => *n != 0,
184 Some(DataValue::Null) | None => false,
185 Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
186 };
187
188 let lines = if invert {
191 let all = read_lines_from_path_or_stdin(&path, None)?;
192 all.into_iter()
193 .filter(|(_, line)| !pattern.is_match(line))
194 .collect::<Vec<_>>()
195 } else {
196 read_lines_from_path_or_stdin(&path, Some(&pattern))?
197 };
198
199 let mut table = DataTable::new("grep");
200 table.add_column(DataColumn::new("line_num"));
201 table.add_column(DataColumn::new("line"));
202
203 for (line_num, line) in lines {
204 table
205 .add_row(DataRow::new(vec![
206 DataValue::Integer(line_num),
207 DataValue::String(line),
208 ]))
209 .map_err(|e| anyhow!(e))?;
210 }
211
212 Ok(Arc::new(table))
213 }
214
215 fn description(&self) -> &str {
216 "Read only lines matching a regex (third arg inverts the match, like grep -v). Pass '-' as path to read from stdin."
217 }
218
219 fn arg_count(&self) -> usize {
220 3
221 }
222}
223
224pub struct ReadWords;
235
236impl TableGenerator for ReadWords {
237 fn name(&self) -> &str {
238 "READ_WORDS"
239 }
240
241 fn columns(&self) -> Vec<DataColumn> {
242 vec![
243 DataColumn::new("word_num"),
244 DataColumn::new("word"),
245 DataColumn::new("line_num"),
246 DataColumn::new("word_pos"),
247 ]
248 }
249
250 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
251 if args.is_empty() || args.len() > 3 {
252 return Err(anyhow!(
253 "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
254 ));
255 }
256
257 let path = require_string(&args, 0, "READ_WORDS")?;
258
259 let min_length: usize = match args.get(1) {
260 Some(DataValue::Integer(n)) => {
261 if *n < 1 {
262 return Err(anyhow!("READ_WORDS min_length must be >= 1"));
263 }
264 *n as usize
265 }
266 Some(DataValue::Float(f)) => *f as usize,
267 Some(DataValue::Null) | None => 1,
268 Some(v) => {
269 return Err(anyhow!(
270 "READ_WORDS min_length must be an integer, got {:?}",
271 v
272 ))
273 }
274 };
275
276 let case_option = optional_string(&args, 2);
277
278 let lines = read_lines_from_path_or_stdin(&path, None)?;
279
280 let mut table = DataTable::new("read_words");
281 table.add_column(DataColumn::new("word_num"));
282 table.add_column(DataColumn::new("word"));
283 table.add_column(DataColumn::new("line_num"));
284 table.add_column(DataColumn::new("word_pos"));
285
286 let mut word_num: i64 = 0;
287
288 for (line_num, line) in &lines {
289 let mut word_pos: i64 = 0;
290
291 for token in line.split(|c: char| !c.is_alphanumeric()) {
292 if token.is_empty() || token.len() < min_length {
293 continue;
294 }
295
296 word_pos += 1;
297 word_num += 1;
298
299 let word = match case_option.as_deref() {
300 Some("lower") | Some("lowercase") => token.to_lowercase(),
301 Some("upper") | Some("uppercase") => token.to_uppercase(),
302 _ => token.to_string(),
303 };
304
305 table
306 .add_row(DataRow::new(vec![
307 DataValue::Integer(word_num),
308 DataValue::String(word),
309 DataValue::Integer(*line_num),
310 DataValue::Integer(word_pos),
311 ]))
312 .map_err(|e| anyhow!(e))?;
313 }
314 }
315
316 Ok(Arc::new(table))
317 }
318
319 fn description(&self) -> &str {
320 "Read a text file and emit one row per word, with optional min length and case normalisation"
321 }
322
323 fn arg_count(&self) -> usize {
324 3
325 }
326}
327
328pub struct ReadJsonl;
336
337impl TableGenerator for ReadJsonl {
338 fn name(&self) -> &str {
339 "READ_JSONL"
340 }
341
342 fn columns(&self) -> Vec<DataColumn> {
343 vec![DataColumn::new("(inferred from JSON keys)")]
346 }
347
348 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
349 if args.is_empty() || args.len() > 2 {
350 return Err(anyhow!(
351 "READ_JSONL expects 1 or 2 arguments: (path [, match_regex])"
352 ));
353 }
354
355 let path = require_string(&args, 0, "READ_JSONL")?;
356 let match_regex = optional_string(&args, 1)
357 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
358 .transpose()?;
359
360 let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
361
362 let mut records: Vec<JsonValue> = Vec::with_capacity(lines.len());
363 for (line_num, line) in &lines {
364 let trimmed = line.trim();
365 if trimmed.is_empty() {
366 continue;
367 }
368 let value: JsonValue = serde_json::from_str(trimmed)
369 .map_err(|e| anyhow!("READ_JSONL parse error at line {}: {}", line_num, e))?;
370 records.push(value);
371 }
372
373 if records.is_empty() {
374 return Ok(Arc::new(DataTable::new("read_jsonl")));
375 }
376
377 let column_names = collect_column_names(&records, 100);
378 if column_names.is_empty() {
379 return Err(anyhow!(
380 "READ_JSONL: no JSON objects found (records must be objects, not arrays/scalars)"
381 ));
382 }
383
384 let mut string_rows: Vec<Vec<String>> = Vec::with_capacity(records.len());
387 for record in &records {
388 let obj = match record.as_object() {
389 Some(o) => o,
390 None => continue,
391 };
392 let mut row = Vec::with_capacity(column_names.len());
393 for col_name in &column_names {
394 let value = obj
395 .get(col_name)
396 .map(json_value_to_string)
397 .unwrap_or_default();
398 row.push(value);
399 }
400 string_rows.push(row);
401 }
402
403 let mut column_types: Vec<DataType> = vec![DataType::Null; column_names.len()];
404 let sample_size = string_rows.len().min(100);
405 for row in string_rows.iter().take(sample_size) {
406 for (col_idx, value) in row.iter().enumerate() {
407 if !value.is_empty() && value != "null" {
408 let inferred = DataType::infer_from_string(value);
409 column_types[col_idx] = column_types[col_idx].merge(&inferred);
410 }
411 }
412 }
413
414 let mut table = DataTable::new("read_jsonl");
415 for (name, dtype) in column_names.iter().zip(column_types.iter()) {
416 let mut col = DataColumn::new(name);
417 col.data_type = dtype.clone();
418 table.add_column(col);
419 }
420
421 for string_row in &string_rows {
422 let mut values = Vec::with_capacity(string_row.len());
423 for (col_idx, value) in string_row.iter().enumerate() {
424 let dv = if value.is_empty() || value == "null" {
425 DataValue::Null
426 } else {
427 DataValue::from_string(value, &column_types[col_idx])
428 };
429 values.push(dv);
430 }
431 table
432 .add_row(DataRow::new(values))
433 .map_err(|e| anyhow!(e))?;
434 }
435
436 Ok(Arc::new(table))
437 }
438
439 fn description(&self) -> &str {
440 "Read newline-delimited JSON (one object per line). Pass '-' as path to read JSONL from stdin. Optional second arg is a regex that filters lines at read time."
441 }
442
443 fn arg_count(&self) -> usize {
444 2
445 }
446}
447
448pub struct ReadCsv;
455
456impl TableGenerator for ReadCsv {
457 fn name(&self) -> &str {
458 "READ_CSV"
459 }
460
461 fn columns(&self) -> Vec<DataColumn> {
462 vec![DataColumn::new("(inferred from CSV header)")]
464 }
465
466 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
467 if args.len() != 1 {
468 return Err(anyhow!("READ_CSV expects 1 argument: (path)"));
469 }
470
471 let path = require_string(&args, 0, "READ_CSV")?;
472 let mut loader = AdvancedCsvLoader::new();
473
474 let table = if path == "-" {
475 let lines = cached_stdin_lines()?;
478 let mut buffer = String::with_capacity(lines.iter().map(|(_, l)| l.len() + 1).sum());
479 for (i, (_, line)) in lines.iter().enumerate() {
480 if i > 0 {
481 buffer.push('\n');
482 }
483 buffer.push_str(line);
484 }
485 let cursor = Cursor::new(buffer.into_bytes());
486 loader
487 .load_csv_from_reader(cursor, "read_csv", "<stdin>")
488 .map_err(|e| anyhow!("READ_CSV parse error reading stdin: {}", e))?
489 } else {
490 let file = File::open(&path)
491 .map_err(|e| anyhow!("READ_CSV failed to open '{}': {}", path, e))?;
492 loader
493 .load_csv_from_reader(file, "read_csv", &path)
494 .map_err(|e| anyhow!("READ_CSV parse error reading '{}': {}", path, e))?
495 };
496
497 Ok(Arc::new(table))
498 }
499
500 fn description(&self) -> &str {
501 "Read a CSV file (header row required). Pass '-' as path to read CSV from stdin."
502 }
503
504 fn arg_count(&self) -> usize {
505 1
506 }
507}
508
509fn json_value_to_string(value: &JsonValue) -> String {
510 match value {
511 JsonValue::Null => String::new(),
512 JsonValue::Bool(b) => b.to_string(),
513 JsonValue::Number(n) => n.to_string(),
514 JsonValue::String(s) => s.clone(),
515 JsonValue::Array(arr) => format!("{:?}", arr),
516 JsonValue::Object(obj) => format!("{:?}", obj),
517 }
518}
519
/// Table generator `READ_STDIN([match_regex])`: exposes the lines piped on stdin
/// as `(line_num, line)` rows.
pub struct ReadStdin;
531
532static STDIN_CACHE: OnceLock<Result<Vec<(i64, String)>, String>> = OnceLock::new();
533
534fn cached_stdin_lines() -> Result<&'static Vec<(i64, String)>> {
535 let cached = STDIN_CACHE.get_or_init(|| {
536 let stdin = std::io::stdin();
537 if stdin.is_terminal() {
538 return Err("READ_STDIN() requires data piped on stdin; got an interactive terminal. Try: cat file | sql-cli -q '...'".to_string());
539 }
540 let handle = stdin.lock();
541 let reader = BufReader::new(handle);
542 let mut out = Vec::new();
543 let mut truncated = false;
544 for (idx, line_result) in reader.lines().enumerate() {
545 let line = match line_result {
546 Ok(l) => l,
547 Err(e) => return Err(format!("Error reading stdin: {}", e)),
548 };
549 if out.len() >= MAX_LINES_PER_FILE {
550 truncated = true;
551 break;
552 }
553 out.push(((idx + 1) as i64, line));
554 }
555 if truncated {
556 eprintln!(
557 "WARNING: truncated to {} rows (max_lines_per_file cap) when reading stdin",
558 MAX_LINES_PER_FILE
559 );
560 }
561 Ok(out)
562 });
563 cached.as_ref().map_err(|e| anyhow!(e.clone()))
564}
565
566impl TableGenerator for ReadStdin {
567 fn name(&self) -> &str {
568 "READ_STDIN"
569 }
570
571 fn columns(&self) -> Vec<DataColumn> {
572 vec![DataColumn::new("line_num"), DataColumn::new("line")]
573 }
574
575 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
576 if args.len() > 1 {
577 return Err(anyhow!(
578 "READ_STDIN expects 0 or 1 arguments: ([match_regex])"
579 ));
580 }
581
582 let match_regex = optional_string(&args, 0)
583 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
584 .transpose()?;
585
586 let lines = cached_stdin_lines()?;
587
588 let mut table = DataTable::new("read_stdin");
589 table.add_column(DataColumn::new("line_num"));
590 table.add_column(DataColumn::new("line"));
591
592 for (line_num, line) in lines {
593 if let Some(ref re) = match_regex {
594 if !re.is_match(line) {
595 continue;
596 }
597 }
598 table
599 .add_row(DataRow::new(vec![
600 DataValue::Integer(*line_num),
601 DataValue::String(line.clone()),
602 ]))
603 .map_err(|e| anyhow!(e))?;
604 }
605
606 Ok(Arc::new(table))
607 }
608
609 fn description(&self) -> &str {
610 "Read lines piped on stdin (cached once per process). Optional regex filters lines at read time. Yields (line_num, line)."
611 }
612
613 fn arg_count(&self) -> usize {
614 1
615 }
616}
617
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `contents` to a fresh temporary file and returns its handle.
    fn write_tmp(contents: &str) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(contents.as_bytes()).unwrap();
        file
    }

    /// Wraps the temp file's path as a string argument.
    fn path_arg(file: &NamedTempFile) -> DataValue {
        DataValue::String(file.path().to_string_lossy().to_string())
    }

    /// Shorthand for a string value.
    fn text(value: &str) -> DataValue {
        DataValue::String(value.to_string())
    }

    /// Shorthand for an integer value.
    fn int(value: i64) -> DataValue {
        DataValue::Integer(value)
    }

    /// Index of the column called `name`; panics if the table has none.
    fn col_index(table: &DataTable, name: &str) -> usize {
        match table.columns.iter().position(|c| c.name == name) {
            Some(index) => index,
            None => panic!("column '{}' not found", name),
        }
    }

    // ---- READ_TEXT ----

    #[test]
    fn test_read_text_returns_all_lines() {
        let file = write_tmp("one\ntwo\nthree\n");
        let table = ReadText.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 3);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("one"));
        assert_eq!(table.get_value(2, 0).unwrap(), &int(3));
    }

    #[test]
    fn test_read_text_with_match_regex_filters_lines() {
        let file = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
        let table = ReadText
            .generate(vec![path_arg(&file), text("ERROR")])
            .unwrap();

        assert_eq!(table.row_count(), 2);
        // Line numbers refer to the original file, not the filtered output.
        assert_eq!(table.get_value(0, 0).unwrap(), &int(2));
        assert_eq!(table.get_value(1, 0).unwrap(), &int(4));
    }

    #[test]
    fn test_read_text_requires_path() {
        assert!(ReadText.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_text_invalid_regex_errors_early() {
        let file = write_tmp("hello\n");
        let err = ReadText
            .generate(vec![path_arg(&file), text("(unclosed")])
            .unwrap_err();

        assert!(err.to_string().contains("match_regex"));
    }

    // ---- GREP ----

    #[test]
    fn test_grep_matches_like_grep() {
        let file = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep.generate(vec![path_arg(&file), text("^ap")]).unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("apple"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("apricot"));
    }

    #[test]
    fn test_grep_invert_like_grep_v() {
        let file = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep
            .generate(vec![path_arg(&file), text("^ap"), DataValue::Boolean(true)])
            .unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("banana"));
    }

    // ---- READ_WORDS ----

    #[test]
    fn test_read_words_basic() {
        let file = write_tmp("hello world\ngoodbye moon\n");
        let table = ReadWords.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 4);

        // First word of the first line.
        assert_eq!(table.get_value(0, 0).unwrap(), &int(1));
        assert_eq!(table.get_value(0, 1).unwrap(), &text("hello"));
        assert_eq!(table.get_value(0, 2).unwrap(), &int(1));
        assert_eq!(table.get_value(0, 3).unwrap(), &int(1));

        // First word of the second line.
        assert_eq!(table.get_value(2, 0).unwrap(), &int(3));
        assert_eq!(table.get_value(2, 1).unwrap(), &text("goodbye"));
        assert_eq!(table.get_value(2, 2).unwrap(), &int(2));
        assert_eq!(table.get_value(2, 3).unwrap(), &int(1));
    }

    #[test]
    fn test_read_words_min_length() {
        let file = write_tmp("I am a big dog\n");
        let table = ReadWords.generate(vec![path_arg(&file), int(3)]).unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("big"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("dog"));
    }

    #[test]
    fn test_read_words_case_lower() {
        let file = write_tmp("Hello World\n");
        let table = ReadWords
            .generate(vec![path_arg(&file), int(1), text("lower")])
            .unwrap();

        assert_eq!(table.get_value(0, 1).unwrap(), &text("hello"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("world"));
    }

    #[test]
    fn test_read_words_strips_punctuation() {
        let file = write_tmp("hello, world! foo-bar.\n");
        let table = ReadWords.generate(vec![path_arg(&file)]).unwrap();

        let mut words: Vec<String> = Vec::new();
        for row in 0..table.row_count() {
            match table.get_value(row, 1).unwrap() {
                DataValue::String(word) => words.push(word.clone()),
                _ => panic!("expected string"),
            }
        }

        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
    }

    #[test]
    fn test_read_words_requires_path() {
        assert!(ReadWords.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_words_empty_lines_skipped() {
        let file = write_tmp("hello\n\n\nworld\n");
        let table = ReadWords.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 2);
        // word_num runs continuously across blank lines...
        assert_eq!(table.get_value(0, 0).unwrap(), &int(1));
        assert_eq!(table.get_value(1, 0).unwrap(), &int(2));
        // ...while line_num still reflects the original line positions.
        assert_eq!(table.get_value(0, 2).unwrap(), &int(1));
        assert_eq!(table.get_value(1, 2).unwrap(), &int(4));
    }

    // ---- READ_JSONL ----

    #[test]
    fn test_read_jsonl_basic() {
        let file = write_tmp(
            r#"{"id":1,"name":"alice","score":10}
{"id":2,"name":"bob","score":20}
{"id":3,"name":"carol","score":30}
"#,
        );
        let table = ReadJsonl.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 3);
        assert_eq!(table.column_count(), 3);

        let id_col = col_index(&table, "id");
        let name_col = col_index(&table, "name");
        assert_eq!(table.get_value(0, id_col).unwrap(), &int(1));
        assert_eq!(table.get_value(2, name_col).unwrap(), &text("carol"));
    }

    #[test]
    fn test_read_jsonl_heterogeneous_schema_unioned() {
        let file = write_tmp(
            r#"{"id":1,"name":"alice"}
{"id":2,"name":"bob","extra":"hello"}
{"id":3,"name":"carol","other":42}
"#,
        );
        let table = ReadJsonl.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 3);
        assert_eq!(table.column_count(), 4);

        let extra = col_index(&table, "extra");
        let other = col_index(&table, "other");

        // Keys missing from a record surface as Null.
        assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
        assert_eq!(table.get_value(0, other).unwrap(), &DataValue::Null);

        // Keys that are present keep their values.
        assert_eq!(table.get_value(1, extra).unwrap(), &text("hello"));
        assert_eq!(table.get_value(2, other).unwrap(), &int(42));
    }

    #[test]
    fn test_read_jsonl_blank_lines_skipped() {
        let file = write_tmp("{\"id\":1}\n\n{\"id\":2}\n\n");
        let table = ReadJsonl.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_read_jsonl_match_regex_pre_filters() {
        let file = write_tmp(
            r#"{"level":"INFO","msg":"boot"}
{"level":"ERROR","msg":"disk"}
{"level":"INFO","msg":"shutdown"}
{"level":"ERROR","msg":"oom"}
"#,
        );
        let table = ReadJsonl
            .generate(vec![path_arg(&file), text("ERROR")])
            .unwrap();

        assert_eq!(table.row_count(), 2);
        let msg = col_index(&table, "msg");
        assert_eq!(table.get_value(0, msg).unwrap(), &text("disk"));
    }

    #[test]
    fn test_read_jsonl_invalid_line_errors_with_line_number() {
        let file = write_tmp(
            r#"{"id":1}
not json at all
{"id":3}
"#,
        );
        let err = ReadJsonl.generate(vec![path_arg(&file)]).unwrap_err();

        let msg = err.to_string();
        assert!(
            msg.contains("line 2"),
            "error should cite line number: {}",
            msg
        );
    }

    #[test]
    fn test_read_jsonl_requires_path() {
        assert!(ReadJsonl.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_jsonl_empty_file_returns_empty_table() {
        let file = write_tmp("");
        let table = ReadJsonl.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 0);
    }

    // ---- READ_STDIN ----
    // Only argument validation is exercised here; both cases fail before stdin
    // is read.

    #[test]
    fn test_read_stdin_rejects_too_many_args() {
        let err = ReadStdin
            .generate(vec![text("foo"), text("bar")])
            .unwrap_err();

        assert!(
            err.to_string().contains("0 or 1 arguments"),
            "should mention arg count: {}",
            err
        );
    }

    #[test]
    fn test_read_stdin_rejects_invalid_regex() {
        let err = ReadStdin
            .generate(vec![text("[invalid(regex")])
            .unwrap_err();

        assert!(
            err.to_string().contains("Invalid match_regex"),
            "should mention regex: {}",
            err
        );
    }
}