1use crate::data::advanced_csv_loader::AdvancedCsvLoader;
2use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
3use crate::data::stream_loader::{
4 collect_column_names, detect_delimiter_from_path, navigate_json_path,
5 parse_delimiter_arg as parse_delim_byte, parse_json_records, CsvReadOptions,
6};
7use crate::sql::generators::TableGenerator;
8use anyhow::{anyhow, Result};
9use regex::Regex;
10use serde_json::Value as JsonValue;
11use std::fs::File;
12use std::io::{BufRead, BufReader, Cursor, IsTerminal};
13use std::sync::{Arc, OnceLock};
14
/// Upper bound on how many lines a single reader keeps from one input (a file
/// or stdin). When the cap is hit, reading stops early and a `WARNING` line is
/// printed to stderr naming the input that was truncated.
const MAX_LINES_PER_FILE: usize = 1000 * 1000;
19
20fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
22 match args.get(idx) {
23 Some(DataValue::String(s)) => Ok(s.clone()),
24 Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
25 Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
26 Some(v) => Err(anyhow!(
27 "{} argument {} must be a string, got {:?}",
28 name,
29 idx + 1,
30 v
31 )),
32 }
33}
34
35fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
37 match args.get(idx) {
38 Some(DataValue::String(s)) => Some(s.clone()),
39 Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
40 _ => None,
41 }
42}
43
44fn parse_delimiter_arg(s: &str, fn_name: &str) -> Result<u8> {
47 parse_delim_byte(s).map_err(|e| anyhow!("{}: {}", fn_name, e))
48}
49
50fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
56 let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
57 let reader = BufReader::new(file);
58
59 let mut out = Vec::new();
60 let mut truncated = false;
61
62 for (idx, line_result) in reader.lines().enumerate() {
63 let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
64 let line_num = (idx + 1) as i64;
65
66 if let Some(re) = match_regex {
67 if !re.is_match(&line) {
68 continue;
69 }
70 }
71
72 if out.len() >= MAX_LINES_PER_FILE {
73 truncated = true;
74 break;
75 }
76 out.push((line_num, line));
77 }
78
79 if truncated {
80 eprintln!(
81 "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
82 MAX_LINES_PER_FILE, path
83 );
84 }
85
86 Ok(out)
87}
88
89fn read_lines_from_path_or_stdin(
95 path: &str,
96 match_regex: Option<&Regex>,
97) -> Result<Vec<(i64, String)>> {
98 if path == "-" {
99 let cached = cached_stdin_lines()?;
100 return Ok(cached
101 .iter()
102 .filter(|(_, line)| match_regex.map_or(true, |re| re.is_match(line)))
103 .cloned()
104 .collect());
105 }
106 read_filtered_lines(path, match_regex)
107}
108
109pub struct ReadText;
114
115impl TableGenerator for ReadText {
116 fn name(&self) -> &str {
117 "READ_TEXT"
118 }
119
120 fn columns(&self) -> Vec<DataColumn> {
121 vec![DataColumn::new("line_num"), DataColumn::new("line")]
122 }
123
124 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
125 if args.is_empty() || args.len() > 2 {
126 return Err(anyhow!(
127 "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
128 ));
129 }
130
131 let path = require_string(&args, 0, "READ_TEXT")?;
132 let match_regex = optional_string(&args, 1)
133 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
134 .transpose()?;
135
136 let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
137
138 let mut table = DataTable::new("read_text");
139 table.add_column(DataColumn::new("line_num"));
140 table.add_column(DataColumn::new("line"));
141
142 for (line_num, line) in lines {
143 table
144 .add_row(DataRow::new(vec![
145 DataValue::Integer(line_num),
146 DataValue::String(line),
147 ]))
148 .map_err(|e| anyhow!(e))?;
149 }
150
151 Ok(Arc::new(table))
152 }
153
154 fn description(&self) -> &str {
155 "Read a text file line-by-line. Pass '-' as path to read from stdin. Optional second arg is a regex that filters lines at read time."
156 }
157
158 fn arg_count(&self) -> usize {
159 2
160 }
161}
162
163pub struct Grep;
168
169impl TableGenerator for Grep {
170 fn name(&self) -> &str {
171 "GREP"
172 }
173
174 fn columns(&self) -> Vec<DataColumn> {
175 vec![DataColumn::new("line_num"), DataColumn::new("line")]
176 }
177
178 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
179 if args.len() < 2 || args.len() > 3 {
180 return Err(anyhow!(
181 "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
182 ));
183 }
184
185 let path = require_string(&args, 0, "GREP")?;
186 let pattern_str = require_string(&args, 1, "GREP")?;
187 let pattern =
188 Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
189
190 let invert = match args.get(2) {
191 Some(DataValue::Boolean(b)) => *b,
192 Some(DataValue::Integer(n)) => *n != 0,
193 Some(DataValue::Null) | None => false,
194 Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
195 };
196
197 let lines = if invert {
200 let all = read_lines_from_path_or_stdin(&path, None)?;
201 all.into_iter()
202 .filter(|(_, line)| !pattern.is_match(line))
203 .collect::<Vec<_>>()
204 } else {
205 read_lines_from_path_or_stdin(&path, Some(&pattern))?
206 };
207
208 let mut table = DataTable::new("grep");
209 table.add_column(DataColumn::new("line_num"));
210 table.add_column(DataColumn::new("line"));
211
212 for (line_num, line) in lines {
213 table
214 .add_row(DataRow::new(vec![
215 DataValue::Integer(line_num),
216 DataValue::String(line),
217 ]))
218 .map_err(|e| anyhow!(e))?;
219 }
220
221 Ok(Arc::new(table))
222 }
223
224 fn description(&self) -> &str {
225 "Read only lines matching a regex (third arg inverts the match, like grep -v). Pass '-' as path to read from stdin."
226 }
227
228 fn arg_count(&self) -> usize {
229 3
230 }
231}
232
233pub struct ReadWords;
244
245impl TableGenerator for ReadWords {
246 fn name(&self) -> &str {
247 "READ_WORDS"
248 }
249
250 fn columns(&self) -> Vec<DataColumn> {
251 vec![
252 DataColumn::new("word_num"),
253 DataColumn::new("word"),
254 DataColumn::new("line_num"),
255 DataColumn::new("word_pos"),
256 ]
257 }
258
259 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
260 if args.is_empty() || args.len() > 3 {
261 return Err(anyhow!(
262 "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
263 ));
264 }
265
266 let path = require_string(&args, 0, "READ_WORDS")?;
267
268 let min_length: usize = match args.get(1) {
269 Some(DataValue::Integer(n)) => {
270 if *n < 1 {
271 return Err(anyhow!("READ_WORDS min_length must be >= 1"));
272 }
273 *n as usize
274 }
275 Some(DataValue::Float(f)) => *f as usize,
276 Some(DataValue::Null) | None => 1,
277 Some(v) => {
278 return Err(anyhow!(
279 "READ_WORDS min_length must be an integer, got {:?}",
280 v
281 ))
282 }
283 };
284
285 let case_option = optional_string(&args, 2);
286
287 let lines = read_lines_from_path_or_stdin(&path, None)?;
288
289 let mut table = DataTable::new("read_words");
290 table.add_column(DataColumn::new("word_num"));
291 table.add_column(DataColumn::new("word"));
292 table.add_column(DataColumn::new("line_num"));
293 table.add_column(DataColumn::new("word_pos"));
294
295 let mut word_num: i64 = 0;
296
297 for (line_num, line) in &lines {
298 let mut word_pos: i64 = 0;
299
300 for token in line.split(|c: char| !c.is_alphanumeric()) {
301 if token.is_empty() || token.len() < min_length {
302 continue;
303 }
304
305 word_pos += 1;
306 word_num += 1;
307
308 let word = match case_option.as_deref() {
309 Some("lower") | Some("lowercase") => token.to_lowercase(),
310 Some("upper") | Some("uppercase") => token.to_uppercase(),
311 _ => token.to_string(),
312 };
313
314 table
315 .add_row(DataRow::new(vec![
316 DataValue::Integer(word_num),
317 DataValue::String(word),
318 DataValue::Integer(*line_num),
319 DataValue::Integer(word_pos),
320 ]))
321 .map_err(|e| anyhow!(e))?;
322 }
323 }
324
325 Ok(Arc::new(table))
326 }
327
328 fn description(&self) -> &str {
329 "Read a text file and emit one row per word, with optional min length and case normalisation"
330 }
331
332 fn arg_count(&self) -> usize {
333 3
334 }
335}
336
337pub struct ReadJsonl;
345
346impl TableGenerator for ReadJsonl {
347 fn name(&self) -> &str {
348 "READ_JSONL"
349 }
350
351 fn columns(&self) -> Vec<DataColumn> {
352 vec![DataColumn::new("(inferred from JSON keys)")]
355 }
356
357 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
358 if args.is_empty() || args.len() > 2 {
359 return Err(anyhow!(
360 "READ_JSONL expects 1 or 2 arguments: (path [, match_regex])"
361 ));
362 }
363
364 let path = require_string(&args, 0, "READ_JSONL")?;
365 let match_regex = optional_string(&args, 1)
366 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
367 .transpose()?;
368
369 let lines = read_lines_from_path_or_stdin(&path, match_regex.as_ref())?;
370
371 let mut records: Vec<JsonValue> = Vec::with_capacity(lines.len());
372 for (line_num, line) in &lines {
373 let trimmed = line.trim();
374 if trimmed.is_empty() {
375 continue;
376 }
377 let value: JsonValue = serde_json::from_str(trimmed)
378 .map_err(|e| anyhow!("READ_JSONL parse error at line {}: {}", line_num, e))?;
379 records.push(value);
380 }
381
382 let table = json_records_to_table(records, "read_jsonl", "READ_JSONL")?;
383 Ok(Arc::new(table))
384 }
385
386 fn description(&self) -> &str {
387 "Read newline-delimited JSON (one object per line). Pass '-' as path to read JSONL from stdin. Optional second arg is a regex that filters lines at read time."
388 }
389
390 fn arg_count(&self) -> usize {
391 2
392 }
393}
394
395pub struct ReadCsv;
408
409impl TableGenerator for ReadCsv {
410 fn name(&self) -> &str {
411 "READ_CSV"
412 }
413
414 fn columns(&self) -> Vec<DataColumn> {
415 vec![DataColumn::new("(inferred from CSV header)")]
417 }
418
419 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
420 if args.is_empty() || args.len() > 2 {
421 return Err(anyhow!(
422 "READ_CSV expects 1 or 2 arguments: (path [, delimiter])"
423 ));
424 }
425
426 let path = require_string(&args, 0, "READ_CSV")?;
427
428 let delimiter = if let Some(s) = optional_string(&args, 1) {
431 parse_delimiter_arg(&s, "READ_CSV")?
432 } else if path == "-" {
433 b','
434 } else {
435 detect_delimiter_from_path(&path)
436 };
437
438 let opts = CsvReadOptions {
439 delimiter,
440 has_headers: true,
441 };
442
443 let mut loader = AdvancedCsvLoader::new();
444
445 let table = if path == "-" {
446 let lines = cached_stdin_lines()?;
449 let mut buffer = String::with_capacity(lines.iter().map(|(_, l)| l.len() + 1).sum());
450 for (i, (_, line)) in lines.iter().enumerate() {
451 if i > 0 {
452 buffer.push('\n');
453 }
454 buffer.push_str(line);
455 }
456 let cursor = Cursor::new(buffer.into_bytes());
457 loader
458 .load_csv_from_reader_with_opts(cursor, "read_csv", "<stdin>", &opts)
459 .map_err(|e| anyhow!("READ_CSV parse error reading stdin: {}", e))?
460 } else {
461 let file = File::open(&path)
462 .map_err(|e| anyhow!("READ_CSV failed to open '{}': {}", path, e))?;
463 loader
464 .load_csv_from_reader_with_opts(file, "read_csv", &path, &opts)
465 .map_err(|e| anyhow!("READ_CSV parse error reading '{}': {}", path, e))?
466 };
467
468 Ok(Arc::new(table))
469 }
470
471 fn description(&self) -> &str {
472 "Read a delimited text file (header row required). Pass '-' as path to read from stdin. \
473 Second arg overrides the delimiter; otherwise '.tsv' → tab, '.psv' → pipe, else comma."
474 }
475
476 fn arg_count(&self) -> usize {
477 2
478 }
479}
480
481fn json_value_to_string(value: &JsonValue) -> String {
482 match value {
483 JsonValue::Null => String::new(),
484 JsonValue::Bool(b) => b.to_string(),
485 JsonValue::Number(n) => n.to_string(),
486 JsonValue::String(s) => s.clone(),
487 JsonValue::Array(arr) => format!("{:?}", arr),
488 JsonValue::Object(obj) => format!("{:?}", obj),
489 }
490}
491
492fn json_records_to_table(
500 records: Vec<JsonValue>,
501 table_name: &str,
502 func: &str,
503) -> Result<DataTable> {
504 if records.is_empty() {
505 return Ok(DataTable::new(table_name));
506 }
507
508 let column_names = collect_column_names(&records, 100);
509 if column_names.is_empty() {
510 return Err(anyhow!(
511 "{}: no JSON objects found (records must be objects, not arrays/scalars)",
512 func
513 ));
514 }
515
516 let mut string_rows: Vec<Vec<String>> = Vec::with_capacity(records.len());
519 for record in &records {
520 let obj = match record.as_object() {
521 Some(o) => o,
522 None => continue,
523 };
524 let mut row = Vec::with_capacity(column_names.len());
525 for col_name in &column_names {
526 let value = obj
527 .get(col_name)
528 .map(json_value_to_string)
529 .unwrap_or_default();
530 row.push(value);
531 }
532 string_rows.push(row);
533 }
534
535 let mut column_types: Vec<DataType> = vec![DataType::Null; column_names.len()];
536 let sample_size = string_rows.len().min(100);
537 for row in string_rows.iter().take(sample_size) {
538 for (col_idx, value) in row.iter().enumerate() {
539 if !value.is_empty() && value != "null" {
540 let inferred = DataType::infer_from_string(value);
541 column_types[col_idx] = column_types[col_idx].merge(&inferred);
542 }
543 }
544 }
545
546 let mut table = DataTable::new(table_name);
547 for (name, dtype) in column_names.iter().zip(column_types.iter()) {
548 let mut col = DataColumn::new(name);
549 col.data_type = dtype.clone();
550 table.add_column(col);
551 }
552
553 for string_row in &string_rows {
554 let mut values = Vec::with_capacity(string_row.len());
555 for (col_idx, value) in string_row.iter().enumerate() {
556 let dv = if value.is_empty() || value == "null" {
557 DataValue::Null
558 } else {
559 DataValue::from_string(value, &column_types[col_idx])
560 };
561 values.push(dv);
562 }
563 table
564 .add_row(DataRow::new(values))
565 .map_err(|e| anyhow!(e))?;
566 }
567
568 Ok(table)
569}
570
571pub struct ReadJson;
588
589impl TableGenerator for ReadJson {
590 fn name(&self) -> &str {
591 "READ_JSON"
592 }
593
594 fn columns(&self) -> Vec<DataColumn> {
595 vec![DataColumn::new("(inferred from JSON keys)")]
598 }
599
600 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
601 if args.is_empty() || args.len() > 2 {
602 return Err(anyhow!(
603 "READ_JSON expects 1 or 2 arguments: (path [, json_path])"
604 ));
605 }
606
607 let path = require_string(&args, 0, "READ_JSON")?;
608 let json_path = optional_string(&args, 1);
609
610 let content = if path == "-" {
611 let lines = cached_stdin_lines()?;
614 let mut buffer = String::with_capacity(lines.iter().map(|(_, l)| l.len() + 1).sum());
615 for (i, (_, line)) in lines.iter().enumerate() {
616 if i > 0 {
617 buffer.push('\n');
618 }
619 buffer.push_str(line);
620 }
621 buffer
622 } else {
623 std::fs::read_to_string(&path)
624 .map_err(|e| anyhow!("READ_JSON failed to read '{}': {}", path, e))?
625 };
626
627 let records = match json_path {
628 None => {
630 parse_json_records(&content).map_err(|e| anyhow!("READ_JSON parse error: {}", e))?
631 }
632 Some(path) => {
636 let value: JsonValue = serde_json::from_str(&content)
637 .map_err(|e| anyhow!("READ_JSON parse error: {}", e))?;
638 let extracted = navigate_json_path(&value, &path)
639 .map_err(|e| anyhow!("READ_JSON json_path '{}': {}", path, e))?;
640 match extracted {
641 JsonValue::Array(arr) => arr,
642 other => vec![other],
643 }
644 }
645 };
646
647 let table = json_records_to_table(records, "read_json", "READ_JSON")?;
648 Ok(Arc::new(table))
649 }
650
651 fn description(&self) -> &str {
652 "Read a whole JSON document — a JSON array of objects (possibly pretty-printed) or newline-delimited JSON — and emit one row per object. Pass '-' as path to read from stdin. Optional second arg is a JSON path (dotted, with '[]' array projection — same as the WEB CTE JSON_PATH) that drills into a nested document to locate the rows, e.g. READ_JSON('-', 'projects.project'). Unlike READ_JSONL, the input may span multiple lines per record."
653 }
654
655 fn arg_count(&self) -> usize {
656 2
657 }
658}
659
/// `READ_STDIN([match_regex])` table function: the lines piped on stdin as
/// `(line_num, line)` rows, served from a cache that is filled once per process.
pub struct ReadStdin;
671
672static STDIN_CACHE: OnceLock<Result<Vec<(i64, String)>, String>> = OnceLock::new();
673
674fn cached_stdin_lines() -> Result<&'static Vec<(i64, String)>> {
675 let cached = STDIN_CACHE.get_or_init(|| {
676 let stdin = std::io::stdin();
677 if stdin.is_terminal() {
678 return Err("READ_STDIN() requires data piped on stdin; got an interactive terminal. Try: cat file | sql-cli -q '...'".to_string());
679 }
680 let handle = stdin.lock();
681 let reader = BufReader::new(handle);
682 let mut out = Vec::new();
683 let mut truncated = false;
684 for (idx, line_result) in reader.lines().enumerate() {
685 let line = match line_result {
686 Ok(l) => l,
687 Err(e) => return Err(format!("Error reading stdin: {}", e)),
688 };
689 if out.len() >= MAX_LINES_PER_FILE {
690 truncated = true;
691 break;
692 }
693 out.push(((idx + 1) as i64, line));
694 }
695 if truncated {
696 eprintln!(
697 "WARNING: truncated to {} rows (max_lines_per_file cap) when reading stdin",
698 MAX_LINES_PER_FILE
699 );
700 }
701 Ok(out)
702 });
703 cached.as_ref().map_err(|e| anyhow!(e.clone()))
704}
705
706impl TableGenerator for ReadStdin {
707 fn name(&self) -> &str {
708 "READ_STDIN"
709 }
710
711 fn columns(&self) -> Vec<DataColumn> {
712 vec![DataColumn::new("line_num"), DataColumn::new("line")]
713 }
714
715 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
716 if args.len() > 1 {
717 return Err(anyhow!(
718 "READ_STDIN expects 0 or 1 arguments: ([match_regex])"
719 ));
720 }
721
722 let match_regex = optional_string(&args, 0)
723 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
724 .transpose()?;
725
726 let lines = cached_stdin_lines()?;
727
728 let mut table = DataTable::new("read_stdin");
729 table.add_column(DataColumn::new("line_num"));
730 table.add_column(DataColumn::new("line"));
731
732 for (line_num, line) in lines {
733 if let Some(ref re) = match_regex {
734 if !re.is_match(line) {
735 continue;
736 }
737 }
738 table
739 .add_row(DataRow::new(vec![
740 DataValue::Integer(*line_num),
741 DataValue::String(line.clone()),
742 ]))
743 .map_err(|e| anyhow!(e))?;
744 }
745
746 Ok(Arc::new(table))
747 }
748
749 fn description(&self) -> &str {
750 "Read lines piped on stdin (cached once per process). Optional regex filters lines at read time. Yields (line_num, line)."
751 }
752
753 fn arg_count(&self) -> usize {
754 1
755 }
756}
757
758#[cfg(test)]
759mod tests {
760 use super::*;
761 use std::io::Write;
762 use tempfile::NamedTempFile;
763
764 fn write_tmp(contents: &str) -> NamedTempFile {
765 let mut f = NamedTempFile::new().unwrap();
766 f.write_all(contents.as_bytes()).unwrap();
767 f
768 }
769
770 #[test]
771 fn test_read_text_returns_all_lines() {
772 let f = write_tmp("one\ntwo\nthree\n");
773 let table = ReadText
774 .generate(vec![DataValue::String(
775 f.path().to_string_lossy().to_string(),
776 )])
777 .unwrap();
778 assert_eq!(table.row_count(), 3);
779 assert_eq!(
780 table.get_value(0, 1).unwrap(),
781 &DataValue::String("one".to_string())
782 );
783 assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
784 }
785
786 #[test]
787 fn test_read_text_with_match_regex_filters_lines() {
788 let f = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
789 let table = ReadText
790 .generate(vec![
791 DataValue::String(f.path().to_string_lossy().to_string()),
792 DataValue::String("ERROR".to_string()),
793 ])
794 .unwrap();
795 assert_eq!(table.row_count(), 2);
796 assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
798 assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(4));
799 }
800
801 #[test]
802 fn test_read_text_requires_path() {
803 assert!(ReadText.generate(vec![]).is_err());
804 }
805
806 #[test]
807 fn test_read_text_invalid_regex_errors_early() {
808 let f = write_tmp("hello\n");
809 let err = ReadText
810 .generate(vec![
811 DataValue::String(f.path().to_string_lossy().to_string()),
812 DataValue::String("(unclosed".to_string()),
813 ])
814 .unwrap_err();
815 assert!(err.to_string().contains("match_regex"));
816 }
817
818 #[test]
819 fn test_grep_matches_like_grep() {
820 let f = write_tmp("apple\nbanana\ncherry\napricot\n");
821 let table = Grep
822 .generate(vec![
823 DataValue::String(f.path().to_string_lossy().to_string()),
824 DataValue::String("^ap".to_string()),
825 ])
826 .unwrap();
827 assert_eq!(table.row_count(), 2);
828 assert_eq!(
829 table.get_value(0, 1).unwrap(),
830 &DataValue::String("apple".to_string())
831 );
832 assert_eq!(
833 table.get_value(1, 1).unwrap(),
834 &DataValue::String("apricot".to_string())
835 );
836 }
837
838 #[test]
839 fn test_grep_invert_like_grep_v() {
840 let f = write_tmp("apple\nbanana\ncherry\napricot\n");
841 let table = Grep
842 .generate(vec![
843 DataValue::String(f.path().to_string_lossy().to_string()),
844 DataValue::String("^ap".to_string()),
845 DataValue::Boolean(true),
846 ])
847 .unwrap();
848 assert_eq!(table.row_count(), 2);
849 assert_eq!(
850 table.get_value(0, 1).unwrap(),
851 &DataValue::String("banana".to_string())
852 );
853 }
854
855 #[test]
858 fn test_read_words_basic() {
859 let f = write_tmp("hello world\ngoodbye moon\n");
860 let table = ReadWords
861 .generate(vec![DataValue::String(
862 f.path().to_string_lossy().to_string(),
863 )])
864 .unwrap();
865 assert_eq!(table.row_count(), 4);
867 assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1)); assert_eq!(
871 table.get_value(0, 1).unwrap(),
872 &DataValue::String("hello".to_string())
873 );
874 assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1)); assert_eq!(table.get_value(0, 3).unwrap(), &DataValue::Integer(1)); assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
878 assert_eq!(
879 table.get_value(2, 1).unwrap(),
880 &DataValue::String("goodbye".to_string())
881 );
882 assert_eq!(table.get_value(2, 2).unwrap(), &DataValue::Integer(2));
883 assert_eq!(table.get_value(2, 3).unwrap(), &DataValue::Integer(1));
884 }
885
886 #[test]
887 fn test_read_words_min_length() {
888 let f = write_tmp("I am a big dog\n");
889 let table = ReadWords
890 .generate(vec![
891 DataValue::String(f.path().to_string_lossy().to_string()),
892 DataValue::Integer(3),
893 ])
894 .unwrap();
895 assert_eq!(table.row_count(), 2);
897 assert_eq!(
898 table.get_value(0, 1).unwrap(),
899 &DataValue::String("big".to_string())
900 );
901 assert_eq!(
902 table.get_value(1, 1).unwrap(),
903 &DataValue::String("dog".to_string())
904 );
905 }
906
907 #[test]
908 fn test_read_words_case_lower() {
909 let f = write_tmp("Hello World\n");
910 let table = ReadWords
911 .generate(vec![
912 DataValue::String(f.path().to_string_lossy().to_string()),
913 DataValue::Integer(1),
914 DataValue::String("lower".to_string()),
915 ])
916 .unwrap();
917 assert_eq!(
918 table.get_value(0, 1).unwrap(),
919 &DataValue::String("hello".to_string())
920 );
921 assert_eq!(
922 table.get_value(1, 1).unwrap(),
923 &DataValue::String("world".to_string())
924 );
925 }
926
927 #[test]
928 fn test_read_words_strips_punctuation() {
929 let f = write_tmp("hello, world! foo-bar.\n");
930 let table = ReadWords
931 .generate(vec![DataValue::String(
932 f.path().to_string_lossy().to_string(),
933 )])
934 .unwrap();
935 let words: Vec<String> = (0..table.row_count())
936 .map(|i| match table.get_value(i, 1).unwrap() {
937 DataValue::String(s) => s.clone(),
938 _ => panic!("expected string"),
939 })
940 .collect();
941 assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
942 }
943
944 #[test]
945 fn test_read_words_requires_path() {
946 assert!(ReadWords.generate(vec![]).is_err());
947 }
948
949 #[test]
950 fn test_read_words_empty_lines_skipped() {
951 let f = write_tmp("hello\n\n\nworld\n");
952 let table = ReadWords
953 .generate(vec![DataValue::String(
954 f.path().to_string_lossy().to_string(),
955 )])
956 .unwrap();
957 assert_eq!(table.row_count(), 2);
958 assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
960 assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(2));
961 assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
963 assert_eq!(table.get_value(1, 2).unwrap(), &DataValue::Integer(4));
964 }
965
966 fn col_index(table: &DataTable, name: &str) -> usize {
969 table
970 .columns
971 .iter()
972 .position(|c| c.name == name)
973 .unwrap_or_else(|| panic!("column '{}' not found", name))
974 }
975
976 #[test]
977 fn test_read_jsonl_basic() {
978 let f = write_tmp(
979 r#"{"id":1,"name":"alice","score":10}
980{"id":2,"name":"bob","score":20}
981{"id":3,"name":"carol","score":30}
982"#,
983 );
984 let table = ReadJsonl
985 .generate(vec![DataValue::String(
986 f.path().to_string_lossy().to_string(),
987 )])
988 .unwrap();
989 assert_eq!(table.row_count(), 3);
990 assert_eq!(table.column_count(), 3);
991
992 let id_col = col_index(&table, "id");
993 let name_col = col_index(&table, "name");
994 assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
995 assert_eq!(
996 table.get_value(2, name_col).unwrap(),
997 &DataValue::String("carol".to_string())
998 );
999 }
1000
1001 #[test]
1002 fn test_read_jsonl_heterogeneous_schema_unioned() {
1003 let f = write_tmp(
1006 r#"{"id":1,"name":"alice"}
1007{"id":2,"name":"bob","extra":"hello"}
1008{"id":3,"name":"carol","other":42}
1009"#,
1010 );
1011 let table = ReadJsonl
1012 .generate(vec![DataValue::String(
1013 f.path().to_string_lossy().to_string(),
1014 )])
1015 .unwrap();
1016 assert_eq!(table.row_count(), 3);
1017 assert_eq!(table.column_count(), 4);
1018 let extra = col_index(&table, "extra");
1019 let other = col_index(&table, "other");
1020 assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
1022 assert_eq!(table.get_value(0, other).unwrap(), &DataValue::Null);
1023 assert_eq!(
1025 table.get_value(1, extra).unwrap(),
1026 &DataValue::String("hello".to_string())
1027 );
1028 assert_eq!(table.get_value(2, other).unwrap(), &DataValue::Integer(42));
1030 }
1031
1032 #[test]
1033 fn test_read_jsonl_blank_lines_skipped() {
1034 let f = write_tmp(
1035 r#"{"id":1}
1036
1037{"id":2}
1038
1039"#,
1040 );
1041 let table = ReadJsonl
1042 .generate(vec![DataValue::String(
1043 f.path().to_string_lossy().to_string(),
1044 )])
1045 .unwrap();
1046 assert_eq!(table.row_count(), 2);
1047 }
1048
1049 #[test]
1050 fn test_read_jsonl_match_regex_pre_filters() {
1051 let f = write_tmp(
1052 r#"{"level":"INFO","msg":"boot"}
1053{"level":"ERROR","msg":"disk"}
1054{"level":"INFO","msg":"shutdown"}
1055{"level":"ERROR","msg":"oom"}
1056"#,
1057 );
1058 let table = ReadJsonl
1059 .generate(vec![
1060 DataValue::String(f.path().to_string_lossy().to_string()),
1061 DataValue::String("ERROR".to_string()),
1062 ])
1063 .unwrap();
1064 assert_eq!(table.row_count(), 2);
1065 let msg = col_index(&table, "msg");
1066 assert_eq!(
1067 table.get_value(0, msg).unwrap(),
1068 &DataValue::String("disk".to_string())
1069 );
1070 }
1071
1072 #[test]
1073 fn test_read_jsonl_invalid_line_errors_with_line_number() {
1074 let f = write_tmp(
1075 r#"{"id":1}
1076not json at all
1077{"id":3}
1078"#,
1079 );
1080 let err = ReadJsonl
1081 .generate(vec![DataValue::String(
1082 f.path().to_string_lossy().to_string(),
1083 )])
1084 .unwrap_err();
1085 let msg = err.to_string();
1086 assert!(
1087 msg.contains("line 2"),
1088 "error should cite line number: {}",
1089 msg
1090 );
1091 }
1092
1093 #[test]
1094 fn test_read_jsonl_requires_path() {
1095 assert!(ReadJsonl.generate(vec![]).is_err());
1096 }
1097
1098 #[test]
1099 fn test_read_jsonl_empty_file_returns_empty_table() {
1100 let f = write_tmp("");
1101 let table = ReadJsonl
1102 .generate(vec![DataValue::String(
1103 f.path().to_string_lossy().to_string(),
1104 )])
1105 .unwrap();
1106 assert_eq!(table.row_count(), 0);
1107 }
1108
1109 #[test]
1115 fn test_read_stdin_rejects_too_many_args() {
1116 let err = ReadStdin
1117 .generate(vec![
1118 DataValue::String("foo".to_string()),
1119 DataValue::String("bar".to_string()),
1120 ])
1121 .unwrap_err();
1122 assert!(
1123 err.to_string().contains("0 or 1 arguments"),
1124 "should mention arg count: {}",
1125 err
1126 );
1127 }
1128
1129 #[test]
1130 fn test_read_stdin_rejects_invalid_regex() {
1131 let err = ReadStdin
1132 .generate(vec![DataValue::String("[invalid(regex".to_string())])
1133 .unwrap_err();
1134 assert!(
1135 err.to_string().contains("Invalid match_regex"),
1136 "should mention regex: {}",
1137 err
1138 );
1139 }
1140
1141 fn write_tmp_with_ext(ext: &str, contents: &str) -> tempfile::NamedTempFile {
1145 let f = tempfile::Builder::new()
1146 .suffix(&format!(".{}", ext))
1147 .tempfile()
1148 .unwrap();
1149 std::fs::write(f.path(), contents).unwrap();
1150 f
1151 }
1152
1153 #[test]
1154 fn test_read_csv_default_comma() {
1155 let f = write_tmp_with_ext("csv", "id,name\n1,alice\n2,bob\n");
1156 let table = ReadCsv
1157 .generate(vec![DataValue::String(
1158 f.path().to_string_lossy().to_string(),
1159 )])
1160 .unwrap();
1161 assert_eq!(table.column_count(), 2);
1162 assert_eq!(table.row_count(), 2);
1163 assert_eq!(table.columns[0].name, "id");
1164 assert_eq!(table.columns[1].name, "name");
1165 }
1166
1167 #[test]
1168 fn test_read_csv_psv_extension_auto_detects_pipe() {
1169 let f = write_tmp_with_ext("psv", "id|name|score\n1|alice|10\n2|bob|20\n");
1170 let table = ReadCsv
1171 .generate(vec![DataValue::String(
1172 f.path().to_string_lossy().to_string(),
1173 )])
1174 .unwrap();
1175 assert_eq!(table.column_count(), 3);
1176 assert_eq!(table.row_count(), 2);
1177 assert_eq!(table.columns[0].name, "id");
1178 assert_eq!(table.columns[2].name, "score");
1179 assert_eq!(
1180 table.metadata.get("delimiter").map(String::as_str),
1181 Some("|")
1182 );
1183 }
1184
1185 #[test]
1186 fn test_read_csv_tsv_extension_auto_detects_tab() {
1187 let f = write_tmp_with_ext("tsv", "id\tname\n1\talice\n2\tbob\n");
1188 let table = ReadCsv
1189 .generate(vec![DataValue::String(
1190 f.path().to_string_lossy().to_string(),
1191 )])
1192 .unwrap();
1193 assert_eq!(table.column_count(), 2);
1194 assert_eq!(table.row_count(), 2);
1195 assert_eq!(
1196 table.metadata.get("delimiter").map(String::as_str),
1197 Some("\\t")
1198 );
1199 }
1200
1201 #[test]
1202 fn test_read_csv_explicit_delimiter_overrides_extension() {
1203 let f = write_tmp_with_ext("psv", "id,name\n1,alice\n");
1207 let table = ReadCsv
1208 .generate(vec![
1209 DataValue::String(f.path().to_string_lossy().to_string()),
1210 DataValue::String(",".to_string()),
1211 ])
1212 .unwrap();
1213 assert_eq!(table.column_count(), 2);
1214 }
1215
1216 #[test]
1217 fn test_read_csv_explicit_pipe_on_unrecognised_extension() {
1218 let f = write_tmp_with_ext("dat", "a|b\n1|2\n");
1219 let table = ReadCsv
1220 .generate(vec![
1221 DataValue::String(f.path().to_string_lossy().to_string()),
1222 DataValue::String("|".to_string()),
1223 ])
1224 .unwrap();
1225 assert_eq!(table.column_count(), 2);
1226 }
1227
1228 #[test]
1229 fn test_read_csv_backslash_t_parses_as_tab() {
1230 let f = write_tmp_with_ext("dat", "a\tb\n1\t2\n");
1231 let table = ReadCsv
1232 .generate(vec![
1233 DataValue::String(f.path().to_string_lossy().to_string()),
1234 DataValue::String("\\t".to_string()),
1235 ])
1236 .unwrap();
1237 assert_eq!(table.column_count(), 2);
1238 assert_eq!(
1239 table.metadata.get("delimiter").map(String::as_str),
1240 Some("\\t")
1241 );
1242 }
1243
1244 #[test]
1245 fn test_read_csv_rejects_multi_char_delimiter() {
1246 let f = write_tmp_with_ext("dat", "a,b\n1,2\n");
1247 let err = ReadCsv
1248 .generate(vec![
1249 DataValue::String(f.path().to_string_lossy().to_string()),
1250 DataValue::String("||".to_string()),
1251 ])
1252 .unwrap_err();
1253 let msg = err.to_string();
1254 assert!(
1255 msg.contains("single ASCII character"),
1256 "should reject multi-char delimiter: {}",
1257 msg
1258 );
1259 }
1260
1261 #[test]
1262 fn test_read_csv_rejects_too_many_args() {
1263 let err = ReadCsv
1264 .generate(vec![
1265 DataValue::String("a".to_string()),
1266 DataValue::String("b".to_string()),
1267 DataValue::String("c".to_string()),
1268 ])
1269 .unwrap_err();
1270 assert!(err.to_string().contains("1 or 2 arguments"));
1271 }
1272
1273 #[test]
1274 fn test_read_csv_requires_path() {
1275 assert!(ReadCsv.generate(vec![]).is_err());
1276 }
1277
1278 #[test]
1281 fn test_read_json_array_of_objects() {
1282 let f = write_tmp(
1284 r#"[
1285 {"id": 1, "name": "alice"},
1286 {"id": 2, "name": "bob"}
1287]
1288"#,
1289 );
1290 let table = ReadJson
1291 .generate(vec![DataValue::String(
1292 f.path().to_string_lossy().to_string(),
1293 )])
1294 .unwrap();
1295 assert_eq!(table.row_count(), 2);
1296 assert_eq!(table.column_count(), 2);
1297 let id_col = col_index(&table, "id");
1298 let name_col = col_index(&table, "name");
1299 assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
1300 assert_eq!(
1301 table.get_value(1, name_col).unwrap(),
1302 &DataValue::String("bob".to_string())
1303 );
1304 }
1305
1306 #[test]
1307 fn test_read_json_also_accepts_jsonl() {
1308 let f = write_tmp("{\"id\":1}\n{\"id\":2}\n");
1310 let table = ReadJson
1311 .generate(vec![DataValue::String(
1312 f.path().to_string_lossy().to_string(),
1313 )])
1314 .unwrap();
1315 assert_eq!(table.row_count(), 2);
1316 let id_col = col_index(&table, "id");
1317 assert_eq!(table.get_value(1, id_col).unwrap(), &DataValue::Integer(2));
1318 }
1319
1320 #[test]
1321 fn test_read_json_heterogeneous_schema_unioned() {
1322 let f = write_tmp(
1323 r#"[
1324 {"id": 1, "name": "alice"},
1325 {"id": 2, "name": "bob", "extra": "hello"}
1326]"#,
1327 );
1328 let table = ReadJson
1329 .generate(vec![DataValue::String(
1330 f.path().to_string_lossy().to_string(),
1331 )])
1332 .unwrap();
1333 assert_eq!(table.column_count(), 3);
1334 let extra = col_index(&table, "extra");
1335 assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
1336 assert_eq!(
1337 table.get_value(1, extra).unwrap(),
1338 &DataValue::String("hello".to_string())
1339 );
1340 }
1341
1342 #[test]
1343 fn test_read_json_empty_array_returns_empty_table() {
1344 let f = write_tmp("[]");
1345 let table = ReadJson
1346 .generate(vec![DataValue::String(
1347 f.path().to_string_lossy().to_string(),
1348 )])
1349 .unwrap();
1350 assert_eq!(table.row_count(), 0);
1351 }
1352
1353 #[test]
1354 fn test_read_json_requires_path() {
1355 assert!(ReadJson.generate(vec![]).is_err());
1356 }
1357
1358 #[test]
1359 fn test_read_json_bad_path_errors() {
1360 let err = ReadJson
1361 .generate(vec![DataValue::String(
1362 "/no/such/file/here.json".to_string(),
1363 )])
1364 .unwrap_err();
1365 assert!(err.to_string().contains("READ_JSON failed to read"));
1366 }
1367
1368 #[test]
1369 fn test_read_json_json_path_drills_into_nested_array() {
1370 let f = write_tmp(
1372 r#"{
1373 "count": 2,
1374 "project": [
1375 {"id": "a", "name": "Alpha"},
1376 {"id": "b", "name": "Beta"}
1377 ]
1378}"#,
1379 );
1380 let table = ReadJson
1381 .generate(vec![
1382 DataValue::String(f.path().to_string_lossy().to_string()),
1383 DataValue::String("project".to_string()),
1384 ])
1385 .unwrap();
1386 assert_eq!(table.row_count(), 2);
1387 assert_eq!(table.column_count(), 2);
1388 let id_col = col_index(&table, "id");
1389 assert_eq!(
1390 table.get_value(1, id_col).unwrap(),
1391 &DataValue::String("b".to_string())
1392 );
1393 }
1394
1395 #[test]
1396 fn test_read_json_json_path_with_array_projection() {
1397 let f = write_tmp(
1399 r#"{
1400 "hits": {
1401 "hits": [
1402 {"_source": {"id": 1, "user": "alice"}},
1403 {"_source": {"id": 2, "user": "bob"}}
1404 ]
1405 }
1406}"#,
1407 );
1408 let table = ReadJson
1409 .generate(vec![
1410 DataValue::String(f.path().to_string_lossy().to_string()),
1411 DataValue::String("hits.hits[]._source".to_string()),
1412 ])
1413 .unwrap();
1414 assert_eq!(table.row_count(), 2);
1415 let user_col = col_index(&table, "user");
1416 assert_eq!(
1417 table.get_value(0, user_col).unwrap(),
1418 &DataValue::String("alice".to_string())
1419 );
1420 }
1421
1422 #[test]
1423 fn test_read_json_json_path_single_object_becomes_one_row() {
1424 let f = write_tmp(r#"{"meta": {"id": 7, "name": "solo"}}"#);
1425 let table = ReadJson
1426 .generate(vec![
1427 DataValue::String(f.path().to_string_lossy().to_string()),
1428 DataValue::String("meta".to_string()),
1429 ])
1430 .unwrap();
1431 assert_eq!(table.row_count(), 1);
1432 let id_col = col_index(&table, "id");
1433 assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(7));
1434 }
1435
1436 #[test]
1437 fn test_read_json_json_path_missing_key_errors() {
1438 let f = write_tmp(r#"{"project": []}"#);
1439 let err = ReadJson
1440 .generate(vec![
1441 DataValue::String(f.path().to_string_lossy().to_string()),
1442 DataValue::String("nope".to_string()),
1443 ])
1444 .unwrap_err();
1445 let msg = err.to_string();
1446 assert!(msg.contains("json_path"), "{}", msg);
1447 assert!(msg.contains("not found"), "{}", msg);
1448 }
1449
1450 #[test]
1451 fn test_read_json_rejects_three_args() {
1452 let err = ReadJson
1453 .generate(vec![
1454 DataValue::String("a".to_string()),
1455 DataValue::String("b".to_string()),
1456 DataValue::String("c".to_string()),
1457 ])
1458 .unwrap_err();
1459 assert!(err.to_string().contains("1 or 2 arguments"));
1460 }
1461}