1use crate::data::advanced_csv_loader::AdvancedCsvLoader;
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
3use crate::data::stream_loader::{
4 collect_column_names, detect_delimiter_from_path, parse_delimiter_arg as parse_delim_byte,
5 CsvReadOptions,
6};
7use crate::sql::generators::TableGenerator;
8use anyhow::{anyhow, Result};
9use regex::Regex;
10use serde_json::Value as JsonValue;
11use std::fs::File;
12use std::io::{BufRead, BufReader, Cursor, IsTerminal};
13use std::sync::{Arc, OnceLock};
14
/// Upper bound on the number of lines kept from a single input (a file or
/// stdin). Readers stop once this many lines are held and another qualifying
/// line appears, and print a truncation warning to stderr.
const MAX_LINES_PER_FILE: usize = 1000 * 1000;
19
20fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
22 match args.get(idx) {
23 Some(DataValue::String(s)) => Ok(s.clone()),
24 Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
25 Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
26 Some(v) => Err(anyhow!(
27 "{} argument {} must be a string, got {:?}",
28 name,
29 idx + 1,
30 v
31 )),
32 }
33}
34
35fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
37 match args.get(idx) {
38 Some(DataValue::String(s)) => Some(s.clone()),
39 Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
40 _ => None,
41 }
42}
43
44fn parse_delimiter_arg(s: &str, fn_name: &str) -> Result<u8> {
47 parse_delim_byte(s).map_err(|e| anyhow!("{}: {}", fn_name, e))
48}
49
50fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
56 let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
57 let reader = BufReader::new(file);
58
59 let mut out = Vec::new();
60 let mut truncated = false;
61
62 for (idx, line_result) in reader.lines().enumerate() {
63 let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
64 let line_num = (idx + 1) as i64;
65
66 if let Some(re) = match_regex {
67 if !re.is_match(&line) {
68 continue;
69 }
70 }
71
72 if out.len() >= MAX_LINES_PER_FILE {
73 truncated = true;
74 break;
75 }
76 out.push((line_num, line));
77 }
78
79 if truncated {
80 eprintln!(
81 "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
82 MAX_LINES_PER_FILE, path
83 );
84 }
85
86 Ok(out)
87}
88
89fn read_lines_from_path_or_stdin(
95 path: &str,
96 match_regex: Option<&Regex>,
97) -> Result<Vec<(i64, String)>> {
98 if path == "-" {
99 let cached = cached_stdin_lines()?;
100 return Ok(cached
101 .iter()
102 .filter(|(_, line)| match_regex.map_or(true, |re| re.is_match(line)))
103 .cloned()
104 .collect());
105 }
106 read_filtered_lines(path, match_regex)
107}
108
109pub struct ReadText;
114
115impl TableGenerator for ReadText {
116 fn name(&self) -> &str {
117 "READ_TEXT"
118 }
119
120 fn columns(&self) -> Vec<DataColumn> {
121 vec![DataColumn::new("line_num"), DataColumn::new("line")]
122 }
123
124 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
125 if args.is_empty() || args.len() > 2 {
126 return Err(anyhow!(
127 "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
128 ));
129 }
130
131 let path = require_string(&args, 0, "READ_TEXT")?;
132 let match_regex = optional_string(&args, 1)
133 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
134 .transpose()?;
135
136 let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
137
138 let mut table = DataTable::new("read_text");
139 table.add_column(DataColumn::new("line_num"));
140 table.add_column(DataColumn::new("line"));
141
142 for (line_num, line) in lines {
143 table
144 .add_row(DataRow::new(vec![
145 DataValue::Integer(line_num),
146 DataValue::String(line),
147 ]))
148 .map_err(|e| anyhow!(e))?;
149 }
150
151 Ok(Arc::new(table))
152 }
153
154 fn description(&self) -> &str {
155 "Read a text file line-by-line. Pass '-' as path to read from stdin. Optional second arg is a regex that filters lines at read time."
156 }
157
158 fn arg_count(&self) -> usize {
159 2
160 }
161}
162
163pub struct Grep;
168
169impl TableGenerator for Grep {
170 fn name(&self) -> &str {
171 "GREP"
172 }
173
174 fn columns(&self) -> Vec<DataColumn> {
175 vec![DataColumn::new("line_num"), DataColumn::new("line")]
176 }
177
178 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
179 if args.len() < 2 || args.len() > 3 {
180 return Err(anyhow!(
181 "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
182 ));
183 }
184
185 let path = require_string(&args, 0, "GREP")?;
186 let pattern_str = require_string(&args, 1, "GREP")?;
187 let pattern =
188 Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
189
190 let invert = match args.get(2) {
191 Some(DataValue::Boolean(b)) => *b,
192 Some(DataValue::Integer(n)) => *n != 0,
193 Some(DataValue::Null) | None => false,
194 Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
195 };
196
197 let lines = if invert {
200 let all = read_lines_from_path_or_stdin(&path, None)?;
201 all.into_iter()
202 .filter(|(_, line)| !pattern.is_match(line))
203 .collect::<Vec<_>>()
204 } else {
205 read_lines_from_path_or_stdin(&path, Some(&pattern))?
206 };
207
208 let mut table = DataTable::new("grep");
209 table.add_column(DataColumn::new("line_num"));
210 table.add_column(DataColumn::new("line"));
211
212 for (line_num, line) in lines {
213 table
214 .add_row(DataRow::new(vec![
215 DataValue::Integer(line_num),
216 DataValue::String(line),
217 ]))
218 .map_err(|e| anyhow!(e))?;
219 }
220
221 Ok(Arc::new(table))
222 }
223
224 fn description(&self) -> &str {
225 "Read only lines matching a regex (third arg inverts the match, like grep -v). Pass '-' as path to read from stdin."
226 }
227
228 fn arg_count(&self) -> usize {
229 3
230 }
231}
232
233pub struct ReadWords;
244
245impl TableGenerator for ReadWords {
246 fn name(&self) -> &str {
247 "READ_WORDS"
248 }
249
250 fn columns(&self) -> Vec<DataColumn> {
251 vec![
252 DataColumn::new("word_num"),
253 DataColumn::new("word"),
254 DataColumn::new("line_num"),
255 DataColumn::new("word_pos"),
256 ]
257 }
258
259 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
260 if args.is_empty() || args.len() > 3 {
261 return Err(anyhow!(
262 "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
263 ));
264 }
265
266 let path = require_string(&args, 0, "READ_WORDS")?;
267
268 let min_length: usize = match args.get(1) {
269 Some(DataValue::Integer(n)) => {
270 if *n < 1 {
271 return Err(anyhow!("READ_WORDS min_length must be >= 1"));
272 }
273 *n as usize
274 }
275 Some(DataValue::Float(f)) => *f as usize,
276 Some(DataValue::Null) | None => 1,
277 Some(v) => {
278 return Err(anyhow!(
279 "READ_WORDS min_length must be an integer, got {:?}",
280 v
281 ))
282 }
283 };
284
285 let case_option = optional_string(&args, 2);
286
287 let lines = read_lines_from_path_or_stdin(&path, None)?;
288
289 let mut table = DataTable::new("read_words");
290 table.add_column(DataColumn::new("word_num"));
291 table.add_column(DataColumn::new("word"));
292 table.add_column(DataColumn::new("line_num"));
293 table.add_column(DataColumn::new("word_pos"));
294
295 let mut word_num: i64 = 0;
296
297 for (line_num, line) in &lines {
298 let mut word_pos: i64 = 0;
299
300 for token in line.split(|c: char| !c.is_alphanumeric()) {
301 if token.is_empty() || token.len() < min_length {
302 continue;
303 }
304
305 word_pos += 1;
306 word_num += 1;
307
308 let word = match case_option.as_deref() {
309 Some("lower") | Some("lowercase") => token.to_lowercase(),
310 Some("upper") | Some("uppercase") => token.to_uppercase(),
311 _ => token.to_string(),
312 };
313
314 table
315 .add_row(DataRow::new(vec![
316 DataValue::Integer(word_num),
317 DataValue::String(word),
318 DataValue::Integer(*line_num),
319 DataValue::Integer(word_pos),
320 ]))
321 .map_err(|e| anyhow!(e))?;
322 }
323 }
324
325 Ok(Arc::new(table))
326 }
327
328 fn description(&self) -> &str {
329 "Read a text file and emit one row per word, with optional min length and case normalisation"
330 }
331
332 fn arg_count(&self) -> usize {
333 3
334 }
335}
336
337pub struct ReadJsonl;
345
346impl TableGenerator for ReadJsonl {
347 fn name(&self) -> &str {
348 "READ_JSONL"
349 }
350
351 fn columns(&self) -> Vec<DataColumn> {
352 vec![DataColumn::new("(inferred from JSON keys)")]
355 }
356
357 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
358 if args.is_empty() || args.len() > 2 {
359 return Err(anyhow!(
360 "READ_JSONL expects 1 or 2 arguments: (path [, match_regex])"
361 ));
362 }
363
364 let path = require_string(&args, 0, "READ_JSONL")?;
365 let match_regex = optional_string(&args, 1)
366 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
367 .transpose()?;
368
369 let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
370
371 let mut records: Vec<JsonValue> = Vec::with_capacity(lines.len());
372 for (line_num, line) in &lines {
373 let trimmed = line.trim();
374 if trimmed.is_empty() {
375 continue;
376 }
377 let value: JsonValue = serde_json::from_str(trimmed)
378 .map_err(|e| anyhow!("READ_JSONL parse error at line {}: {}", line_num, e))?;
379 records.push(value);
380 }
381
382 if records.is_empty() {
383 return Ok(Arc::new(DataTable::new("read_jsonl")));
384 }
385
386 let column_names = collect_column_names(&records, 100);
387 if column_names.is_empty() {
388 return Err(anyhow!(
389 "READ_JSONL: no JSON objects found (records must be objects, not arrays/scalars)"
390 ));
391 }
392
393 let mut string_rows: Vec<Vec<String>> = Vec::with_capacity(records.len());
396 for record in &records {
397 let obj = match record.as_object() {
398 Some(o) => o,
399 None => continue,
400 };
401 let mut row = Vec::with_capacity(column_names.len());
402 for col_name in &column_names {
403 let value = obj
404 .get(col_name)
405 .map(json_value_to_string)
406 .unwrap_or_default();
407 row.push(value);
408 }
409 string_rows.push(row);
410 }
411
412 let mut column_types: Vec<DataType> = vec![DataType::Null; column_names.len()];
413 let sample_size = string_rows.len().min(100);
414 for row in string_rows.iter().take(sample_size) {
415 for (col_idx, value) in row.iter().enumerate() {
416 if !value.is_empty() && value != "null" {
417 let inferred = DataType::infer_from_string(value);
418 column_types[col_idx] = column_types[col_idx].merge(&inferred);
419 }
420 }
421 }
422
423 let mut table = DataTable::new("read_jsonl");
424 for (name, dtype) in column_names.iter().zip(column_types.iter()) {
425 let mut col = DataColumn::new(name);
426 col.data_type = dtype.clone();
427 table.add_column(col);
428 }
429
430 for string_row in &string_rows {
431 let mut values = Vec::with_capacity(string_row.len());
432 for (col_idx, value) in string_row.iter().enumerate() {
433 let dv = if value.is_empty() || value == "null" {
434 DataValue::Null
435 } else {
436 DataValue::from_string(value, &column_types[col_idx])
437 };
438 values.push(dv);
439 }
440 table
441 .add_row(DataRow::new(values))
442 .map_err(|e| anyhow!(e))?;
443 }
444
445 Ok(Arc::new(table))
446 }
447
448 fn description(&self) -> &str {
449 "Read newline-delimited JSON (one object per line). Pass '-' as path to read JSONL from stdin. Optional second arg is a regex that filters lines at read time."
450 }
451
452 fn arg_count(&self) -> usize {
453 2
454 }
455}
456
457pub struct ReadCsv;
470
471impl TableGenerator for ReadCsv {
472 fn name(&self) -> &str {
473 "READ_CSV"
474 }
475
476 fn columns(&self) -> Vec<DataColumn> {
477 vec![DataColumn::new("(inferred from CSV header)")]
479 }
480
481 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
482 if args.is_empty() || args.len() > 2 {
483 return Err(anyhow!(
484 "READ_CSV expects 1 or 2 arguments: (path [, delimiter])"
485 ));
486 }
487
488 let path = require_string(&args, 0, "READ_CSV")?;
489
490 let delimiter = if let Some(s) = optional_string(&args, 1) {
493 parse_delimiter_arg(&s, "READ_CSV")?
494 } else if path == "-" {
495 b','
496 } else {
497 detect_delimiter_from_path(&path)
498 };
499
500 let opts = CsvReadOptions {
501 delimiter,
502 has_headers: true,
503 };
504
505 let mut loader = AdvancedCsvLoader::new();
506
507 let table = if path == "-" {
508 let lines = cached_stdin_lines()?;
511 let mut buffer = String::with_capacity(lines.iter().map(|(_, l)| l.len() + 1).sum());
512 for (i, (_, line)) in lines.iter().enumerate() {
513 if i > 0 {
514 buffer.push('\n');
515 }
516 buffer.push_str(line);
517 }
518 let cursor = Cursor::new(buffer.into_bytes());
519 loader
520 .load_csv_from_reader_with_opts(cursor, "read_csv", "<stdin>", &opts)
521 .map_err(|e| anyhow!("READ_CSV parse error reading stdin: {}", e))?
522 } else {
523 let file = File::open(&path)
524 .map_err(|e| anyhow!("READ_CSV failed to open '{}': {}", path, e))?;
525 loader
526 .load_csv_from_reader_with_opts(file, "read_csv", &path, &opts)
527 .map_err(|e| anyhow!("READ_CSV parse error reading '{}': {}", path, e))?
528 };
529
530 Ok(Arc::new(table))
531 }
532
533 fn description(&self) -> &str {
534 "Read a delimited text file (header row required). Pass '-' as path to read from stdin. \
535 Second arg overrides the delimiter; otherwise '.tsv' → tab, '.psv' → pipe, else comma."
536 }
537
538 fn arg_count(&self) -> usize {
539 2
540 }
541}
542
543fn json_value_to_string(value: &JsonValue) -> String {
544 match value {
545 JsonValue::Null => String::new(),
546 JsonValue::Bool(b) => b.to_string(),
547 JsonValue::Number(n) => n.to_string(),
548 JsonValue::String(s) => s.clone(),
549 JsonValue::Array(arr) => format!("{:?}", arr),
550 JsonValue::Object(obj) => format!("{:?}", obj),
551 }
552}
553
/// `READ_STDIN([match_regex])`: one row per line piped on stdin, with columns
/// `line_num` (1-based) and `line`. Stdin is read once per process and cached,
/// and that cache is shared with the `'-'` path of the other readers.
pub struct ReadStdin;
565
566static STDIN_CACHE: OnceLock<Result<Vec<(i64, String)>, String>> = OnceLock::new();
567
568fn cached_stdin_lines() -> Result<&'static Vec<(i64, String)>> {
569 let cached = STDIN_CACHE.get_or_init(|| {
570 let stdin = std::io::stdin();
571 if stdin.is_terminal() {
572 return Err("READ_STDIN() requires data piped on stdin; got an interactive terminal. Try: cat file | sql-cli -q '...'".to_string());
573 }
574 let handle = stdin.lock();
575 let reader = BufReader::new(handle);
576 let mut out = Vec::new();
577 let mut truncated = false;
578 for (idx, line_result) in reader.lines().enumerate() {
579 let line = match line_result {
580 Ok(l) => l,
581 Err(e) => return Err(format!("Error reading stdin: {}", e)),
582 };
583 if out.len() >= MAX_LINES_PER_FILE {
584 truncated = true;
585 break;
586 }
587 out.push(((idx + 1) as i64, line));
588 }
589 if truncated {
590 eprintln!(
591 "WARNING: truncated to {} rows (max_lines_per_file cap) when reading stdin",
592 MAX_LINES_PER_FILE
593 );
594 }
595 Ok(out)
596 });
597 cached.as_ref().map_err(|e| anyhow!(e.clone()))
598}
599
600impl TableGenerator for ReadStdin {
601 fn name(&self) -> &str {
602 "READ_STDIN"
603 }
604
605 fn columns(&self) -> Vec<DataColumn> {
606 vec![DataColumn::new("line_num"), DataColumn::new("line")]
607 }
608
609 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
610 if args.len() > 1 {
611 return Err(anyhow!(
612 "READ_STDIN expects 0 or 1 arguments: ([match_regex])"
613 ));
614 }
615
616 let match_regex = optional_string(&args, 0)
617 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
618 .transpose()?;
619
620 let lines = cached_stdin_lines()?;
621
622 let mut table = DataTable::new("read_stdin");
623 table.add_column(DataColumn::new("line_num"));
624 table.add_column(DataColumn::new("line"));
625
626 for (line_num, line) in lines {
627 if let Some(ref re) = match_regex {
628 if !re.is_match(line) {
629 continue;
630 }
631 }
632 table
633 .add_row(DataRow::new(vec![
634 DataValue::Integer(*line_num),
635 DataValue::String(line.clone()),
636 ]))
637 .map_err(|e| anyhow!(e))?;
638 }
639
640 Ok(Arc::new(table))
641 }
642
643 fn description(&self) -> &str {
644 "Read lines piped on stdin (cached once per process). Optional regex filters lines at read time. Yields (line_num, line)."
645 }
646
647 fn arg_count(&self) -> usize {
648 1
649 }
650}
651
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Write `contents` into a fresh `NamedTempFile` and return it so the
    /// test can pass its path to a generator.
    fn write_tmp(contents: &str) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        f.write_all(contents.as_bytes()).unwrap();
        f
    }

    // ---------------------------------------------------------------- READ_TEXT

    #[test]
    fn test_read_text_returns_all_lines() {
        let f = write_tmp("one\ntwo\nthree\n");
        let table = ReadText
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        // The trailing newline does not produce an extra empty row.
        assert_eq!(table.row_count(), 3);
        assert_eq!(
            table.get_value(0, 1).unwrap(),
            &DataValue::String("one".to_string())
        );
        // line_num (column 0) is 1-based.
        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
    }

    #[test]
    fn test_read_text_with_match_regex_filters_lines() {
        let f = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
        let table = ReadText
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String("ERROR".to_string()),
            ])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        // Line numbers refer to positions in the original file, not the filtered output.
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(4));
    }

    #[test]
    fn test_read_text_requires_path() {
        assert!(ReadText.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_text_invalid_regex_errors_early() {
        let f = write_tmp("hello\n");
        let err = ReadText
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String("(unclosed".to_string()),
            ])
            .unwrap_err();
        // The error must identify the offending argument.
        assert!(err.to_string().contains("match_regex"));
    }

    // --------------------------------------------------------------------- GREP

    #[test]
    fn test_grep_matches_like_grep() {
        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String("^ap".to_string()),
            ])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        assert_eq!(
            table.get_value(0, 1).unwrap(),
            &DataValue::String("apple".to_string())
        );
        assert_eq!(
            table.get_value(1, 1).unwrap(),
            &DataValue::String("apricot".to_string())
        );
    }

    #[test]
    fn test_grep_invert_like_grep_v() {
        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String("^ap".to_string()),
                DataValue::Boolean(true),
            ])
            .unwrap();
        // Only "banana" and "cherry" do not start with "ap".
        assert_eq!(table.row_count(), 2);
        assert_eq!(
            table.get_value(0, 1).unwrap(),
            &DataValue::String("banana".to_string())
        );
    }

    // --------------------------------------------------------------- READ_WORDS

    #[test]
    fn test_read_words_basic() {
        let f = write_tmp("hello world\ngoodbye moon\n");
        let table = ReadWords
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.row_count(), 4);
        // Row 0: word_num = 1, word = "hello", line_num = 1, word_pos = 1.
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(
            table.get_value(0, 1).unwrap(),
            &DataValue::String("hello".to_string())
        );
        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(0, 3).unwrap(), &DataValue::Integer(1));
        // Row 2: word_num keeps counting across lines while word_pos restarts at 1.
        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
        assert_eq!(
            table.get_value(2, 1).unwrap(),
            &DataValue::String("goodbye".to_string())
        );
        assert_eq!(table.get_value(2, 2).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(2, 3).unwrap(), &DataValue::Integer(1));
    }

    #[test]
    fn test_read_words_min_length() {
        let f = write_tmp("I am a big dog\n");
        let table = ReadWords
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::Integer(3),
            ])
            .unwrap();
        // "I", "am" and "a" are shorter than 3 and are dropped.
        assert_eq!(table.row_count(), 2);
        assert_eq!(
            table.get_value(0, 1).unwrap(),
            &DataValue::String("big".to_string())
        );
        assert_eq!(
            table.get_value(1, 1).unwrap(),
            &DataValue::String("dog".to_string())
        );
    }

    #[test]
    fn test_read_words_case_lower() {
        let f = write_tmp("Hello World\n");
        let table = ReadWords
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::Integer(1),
                DataValue::String("lower".to_string()),
            ])
            .unwrap();
        assert_eq!(
            table.get_value(0, 1).unwrap(),
            &DataValue::String("hello".to_string())
        );
        assert_eq!(
            table.get_value(1, 1).unwrap(),
            &DataValue::String("world".to_string())
        );
    }

    #[test]
    fn test_read_words_strips_punctuation() {
        let f = write_tmp("hello, world! foo-bar.\n");
        let table = ReadWords
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        let words: Vec<String> = (0..table.row_count())
            .map(|i| match table.get_value(i, 1).unwrap() {
                DataValue::String(s) => s.clone(),
                _ => panic!("expected string"),
            })
            .collect();
        // Punctuation and hyphens act as separators; they never appear in words.
        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
    }

    #[test]
    fn test_read_words_requires_path() {
        assert!(ReadWords.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_words_empty_lines_skipped() {
        let f = write_tmp("hello\n\n\nworld\n");
        let table = ReadWords
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        // word_num is contiguous across the blank lines...
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(2));
        // ...while line_num keeps the original line positions.
        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(1, 2).unwrap(), &DataValue::Integer(4));
    }

    /// Index of the column called `name`, panicking if it is absent. This lets
    /// the JSONL tests avoid depending on column order.
    fn col_index(table: &DataTable, name: &str) -> usize {
        table
            .columns
            .iter()
            .position(|c| c.name == name)
            .unwrap_or_else(|| panic!("column '{}' not found", name))
    }

    // --------------------------------------------------------------- READ_JSONL

    #[test]
    fn test_read_jsonl_basic() {
        let f = write_tmp(
            r#"{"id":1,"name":"alice","score":10}
{"id":2,"name":"bob","score":20}
{"id":3,"name":"carol","score":30}
"#,
        );
        let table = ReadJsonl
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.row_count(), 3);
        assert_eq!(table.column_count(), 3);

        let id_col = col_index(&table, "id");
        let name_col = col_index(&table, "name");
        // Numeric JSON values come back typed as integers.
        assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
        assert_eq!(
            table.get_value(2, name_col).unwrap(),
            &DataValue::String("carol".to_string())
        );
    }

    #[test]
    fn test_read_jsonl_heterogeneous_schema_unioned() {
        // Records carry different keys; the table's columns are their union.
        let f = write_tmp(
            r#"{"id":1,"name":"alice"}
{"id":2,"name":"bob","extra":"hello"}
{"id":3,"name":"carol","other":42}
"#,
        );
        let table = ReadJsonl
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.row_count(), 3);
        assert_eq!(table.column_count(), 4);
        let extra = col_index(&table, "extra");
        let other = col_index(&table, "other");
        // Keys missing from a record read back as NULL.
        assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
        assert_eq!(table.get_value(0, other).unwrap(), &DataValue::Null);
        assert_eq!(
            table.get_value(1, extra).unwrap(),
            &DataValue::String("hello".to_string())
        );
        assert_eq!(table.get_value(2, other).unwrap(), &DataValue::Integer(42));
    }

    #[test]
    fn test_read_jsonl_blank_lines_skipped() {
        let f = write_tmp(
            r#"{"id":1}

{"id":2}

"#,
        );
        let table = ReadJsonl
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_read_jsonl_match_regex_pre_filters() {
        // The regex runs against raw lines before JSON parsing.
        let f = write_tmp(
            r#"{"level":"INFO","msg":"boot"}
{"level":"ERROR","msg":"disk"}
{"level":"INFO","msg":"shutdown"}
{"level":"ERROR","msg":"oom"}
"#,
        );
        let table = ReadJsonl
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String("ERROR".to_string()),
            ])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        let msg = col_index(&table, "msg");
        assert_eq!(
            table.get_value(0, msg).unwrap(),
            &DataValue::String("disk".to_string())
        );
    }

    #[test]
    fn test_read_jsonl_invalid_line_errors_with_line_number() {
        let f = write_tmp(
            r#"{"id":1}
not json at all
{"id":3}
"#,
        );
        let err = ReadJsonl
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("line 2"),
            "error should cite line number: {}",
            msg
        );
    }

    #[test]
    fn test_read_jsonl_requires_path() {
        assert!(ReadJsonl.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_jsonl_empty_file_returns_empty_table() {
        let f = write_tmp("");
        let table = ReadJsonl
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.row_count(), 0);
    }

    // --------------------------------------------------------------- READ_STDIN
    // These tests only exercise argument validation. READ_STDIN validates the
    // argument count and compiles the regex before calling cached_stdin_lines,
    // so stdin is never read here.

    #[test]
    fn test_read_stdin_rejects_too_many_args() {
        let err = ReadStdin
            .generate(vec![
                DataValue::String("foo".to_string()),
                DataValue::String("bar".to_string()),
            ])
            .unwrap_err();
        assert!(
            err.to_string().contains("0 or 1 arguments"),
            "should mention arg count: {}",
            err
        );
    }

    #[test]
    fn test_read_stdin_rejects_invalid_regex() {
        let err = ReadStdin
            .generate(vec![DataValue::String("[invalid(regex".to_string())])
            .unwrap_err();
        assert!(
            err.to_string().contains("Invalid match_regex"),
            "should mention regex: {}",
            err
        );
    }

    /// Like `write_tmp`, but the file name ends in `.{ext}`, so extension-based
    /// delimiter detection in READ_CSV can be exercised.
    fn write_tmp_with_ext(ext: &str, contents: &str) -> tempfile::NamedTempFile {
        let f = tempfile::Builder::new()
            .suffix(&format!(".{}", ext))
            .tempfile()
            .unwrap();
        std::fs::write(f.path(), contents).unwrap();
        f
    }

    // ----------------------------------------------------------------- READ_CSV

    #[test]
    fn test_read_csv_default_comma() {
        let f = write_tmp_with_ext("csv", "id,name\n1,alice\n2,bob\n");
        let table = ReadCsv
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.columns[1].name, "name");
    }

    #[test]
    fn test_read_csv_psv_extension_auto_detects_pipe() {
        let f = write_tmp_with_ext("psv", "id|name|score\n1|alice|10\n2|bob|20\n");
        let table = ReadCsv
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.columns[2].name, "score");
        // The loader records the delimiter it actually used in table metadata.
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some("|")
        );
    }

    #[test]
    fn test_read_csv_tsv_extension_auto_detects_tab() {
        let f = write_tmp_with_ext("tsv", "id\tname\n1\talice\n2\tbob\n");
        let table = ReadCsv
            .generate(vec![DataValue::String(
                f.path().to_string_lossy().to_string(),
            )])
            .unwrap();
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
        // A tab delimiter is recorded as the two-character escape text `\t`.
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some("\\t")
        );
    }

    #[test]
    fn test_read_csv_explicit_delimiter_overrides_extension() {
        // A `.psv` file that actually contains commas: the explicit ','
        // argument must win over extension-based detection.
        let f = write_tmp_with_ext("psv", "id,name\n1,alice\n");
        let table = ReadCsv
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String(",".to_string()),
            ])
            .unwrap();
        assert_eq!(table.column_count(), 2);
    }

    #[test]
    fn test_read_csv_explicit_pipe_on_unrecognised_extension() {
        let f = write_tmp_with_ext("dat", "a|b\n1|2\n");
        let table = ReadCsv
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String("|".to_string()),
            ])
            .unwrap();
        assert_eq!(table.column_count(), 2);
    }

    #[test]
    fn test_read_csv_backslash_t_parses_as_tab() {
        // The two-character argument `\t` is accepted as a tab delimiter.
        let f = write_tmp_with_ext("dat", "a\tb\n1\t2\n");
        let table = ReadCsv
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String("\\t".to_string()),
            ])
            .unwrap();
        assert_eq!(table.column_count(), 2);
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some("\\t")
        );
    }

    #[test]
    fn test_read_csv_rejects_multi_char_delimiter() {
        let f = write_tmp_with_ext("dat", "a,b\n1,2\n");
        let err = ReadCsv
            .generate(vec![
                DataValue::String(f.path().to_string_lossy().to_string()),
                DataValue::String("||".to_string()),
            ])
            .unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("single ASCII character"),
            "should reject multi-char delimiter: {}",
            msg
        );
    }

    #[test]
    fn test_read_csv_rejects_too_many_args() {
        let err = ReadCsv
            .generate(vec![
                DataValue::String("a".to_string()),
                DataValue::String("b".to_string()),
                DataValue::String("c".to_string()),
            ])
            .unwrap_err();
        assert!(err.to_string().contains("1 or 2 arguments"));
    }

    #[test]
    fn test_read_csv_requires_path() {
        assert!(ReadCsv.generate(vec![]).is_err());
    }
}