1use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
2use crate::data::stream_loader::collect_column_names;
3use crate::sql::generators::TableGenerator;
4use anyhow::{anyhow, Result};
5use regex::Regex;
6use serde_json::Value as JsonValue;
7use std::fs::File;
8use std::io::{BufRead, BufReader};
9use std::sync::Arc;
10
/// Upper bound on the number of (kept) lines any single file read may return.
/// Reads that would exceed it stop early and print a warning to stderr.
const MAX_LINES_PER_FILE: usize = 1000 * 1000;
15
16fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
18 match args.get(idx) {
19 Some(DataValue::String(s)) => Ok(s.clone()),
20 Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
21 Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
22 Some(v) => Err(anyhow!(
23 "{} argument {} must be a string, got {:?}",
24 name,
25 idx + 1,
26 v
27 )),
28 }
29}
30
31fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
33 match args.get(idx) {
34 Some(DataValue::String(s)) => Some(s.clone()),
35 Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
36 _ => None,
37 }
38}
39
40fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
46 let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
47 let reader = BufReader::new(file);
48
49 let mut out = Vec::new();
50 let mut truncated = false;
51
52 for (idx, line_result) in reader.lines().enumerate() {
53 let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
54 let line_num = (idx + 1) as i64;
55
56 if let Some(re) = match_regex {
57 if !re.is_match(&line) {
58 continue;
59 }
60 }
61
62 if out.len() >= MAX_LINES_PER_FILE {
63 truncated = true;
64 break;
65 }
66 out.push((line_num, line));
67 }
68
69 if truncated {
70 eprintln!(
71 "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
72 MAX_LINES_PER_FILE, path
73 );
74 }
75
76 Ok(out)
77}
78
79pub struct ReadText;
84
85impl TableGenerator for ReadText {
86 fn name(&self) -> &str {
87 "READ_TEXT"
88 }
89
90 fn columns(&self) -> Vec<DataColumn> {
91 vec![DataColumn::new("line_num"), DataColumn::new("line")]
92 }
93
94 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
95 if args.is_empty() || args.len() > 2 {
96 return Err(anyhow!(
97 "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
98 ));
99 }
100
101 let path = require_string(&args, 0, "READ_TEXT")?;
102 let match_regex = optional_string(&args, 1)
103 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
104 .transpose()?;
105
106 let lines = read_filtered_lines(&path, match_regex.as_ref())?;
107
108 let mut table = DataTable::new("read_text");
109 table.add_column(DataColumn::new("line_num"));
110 table.add_column(DataColumn::new("line"));
111
112 for (line_num, line) in lines {
113 table
114 .add_row(DataRow::new(vec![
115 DataValue::Integer(line_num),
116 DataValue::String(line),
117 ]))
118 .map_err(|e| anyhow!(e))?;
119 }
120
121 Ok(Arc::new(table))
122 }
123
124 fn description(&self) -> &str {
125 "Read a text file line-by-line. Optional second arg is a regex that filters lines at read time."
126 }
127
128 fn arg_count(&self) -> usize {
129 2
130 }
131}
132
133pub struct Grep;
138
139impl TableGenerator for Grep {
140 fn name(&self) -> &str {
141 "GREP"
142 }
143
144 fn columns(&self) -> Vec<DataColumn> {
145 vec![DataColumn::new("line_num"), DataColumn::new("line")]
146 }
147
148 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
149 if args.len() < 2 || args.len() > 3 {
150 return Err(anyhow!(
151 "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
152 ));
153 }
154
155 let path = require_string(&args, 0, "GREP")?;
156 let pattern_str = require_string(&args, 1, "GREP")?;
157 let pattern =
158 Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
159
160 let invert = match args.get(2) {
161 Some(DataValue::Boolean(b)) => *b,
162 Some(DataValue::Integer(n)) => *n != 0,
163 Some(DataValue::Null) | None => false,
164 Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
165 };
166
167 let lines = if invert {
170 let all = read_filtered_lines(&path, None)?;
171 all.into_iter()
172 .filter(|(_, line)| !pattern.is_match(line))
173 .collect::<Vec<_>>()
174 } else {
175 read_filtered_lines(&path, Some(&pattern))?
176 };
177
178 let mut table = DataTable::new("grep");
179 table.add_column(DataColumn::new("line_num"));
180 table.add_column(DataColumn::new("line"));
181
182 for (line_num, line) in lines {
183 table
184 .add_row(DataRow::new(vec![
185 DataValue::Integer(line_num),
186 DataValue::String(line),
187 ]))
188 .map_err(|e| anyhow!(e))?;
189 }
190
191 Ok(Arc::new(table))
192 }
193
194 fn description(&self) -> &str {
195 "Read only lines matching a regex (third arg inverts the match, like grep -v)"
196 }
197
198 fn arg_count(&self) -> usize {
199 3
200 }
201}
202
203pub struct ReadWords;
214
215impl TableGenerator for ReadWords {
216 fn name(&self) -> &str {
217 "READ_WORDS"
218 }
219
220 fn columns(&self) -> Vec<DataColumn> {
221 vec![
222 DataColumn::new("word_num"),
223 DataColumn::new("word"),
224 DataColumn::new("line_num"),
225 DataColumn::new("word_pos"),
226 ]
227 }
228
229 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
230 if args.is_empty() || args.len() > 3 {
231 return Err(anyhow!(
232 "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
233 ));
234 }
235
236 let path = require_string(&args, 0, "READ_WORDS")?;
237
238 let min_length: usize = match args.get(1) {
239 Some(DataValue::Integer(n)) => {
240 if *n < 1 {
241 return Err(anyhow!("READ_WORDS min_length must be >= 1"));
242 }
243 *n as usize
244 }
245 Some(DataValue::Float(f)) => *f as usize,
246 Some(DataValue::Null) | None => 1,
247 Some(v) => {
248 return Err(anyhow!(
249 "READ_WORDS min_length must be an integer, got {:?}",
250 v
251 ))
252 }
253 };
254
255 let case_option = optional_string(&args, 2);
256
257 let lines = read_filtered_lines(&path, None)?;
258
259 let mut table = DataTable::new("read_words");
260 table.add_column(DataColumn::new("word_num"));
261 table.add_column(DataColumn::new("word"));
262 table.add_column(DataColumn::new("line_num"));
263 table.add_column(DataColumn::new("word_pos"));
264
265 let mut word_num: i64 = 0;
266
267 for (line_num, line) in &lines {
268 let mut word_pos: i64 = 0;
269
270 for token in line.split(|c: char| !c.is_alphanumeric()) {
271 if token.is_empty() || token.len() < min_length {
272 continue;
273 }
274
275 word_pos += 1;
276 word_num += 1;
277
278 let word = match case_option.as_deref() {
279 Some("lower") | Some("lowercase") => token.to_lowercase(),
280 Some("upper") | Some("uppercase") => token.to_uppercase(),
281 _ => token.to_string(),
282 };
283
284 table
285 .add_row(DataRow::new(vec![
286 DataValue::Integer(word_num),
287 DataValue::String(word),
288 DataValue::Integer(*line_num),
289 DataValue::Integer(word_pos),
290 ]))
291 .map_err(|e| anyhow!(e))?;
292 }
293 }
294
295 Ok(Arc::new(table))
296 }
297
298 fn description(&self) -> &str {
299 "Read a text file and emit one row per word, with optional min length and case normalisation"
300 }
301
302 fn arg_count(&self) -> usize {
303 3
304 }
305}
306
307pub struct ReadJsonl;
315
316impl TableGenerator for ReadJsonl {
317 fn name(&self) -> &str {
318 "READ_JSONL"
319 }
320
321 fn columns(&self) -> Vec<DataColumn> {
322 vec![DataColumn::new("(inferred from JSON keys)")]
325 }
326
327 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
328 if args.is_empty() || args.len() > 2 {
329 return Err(anyhow!(
330 "READ_JSONL expects 1 or 2 arguments: (path [, match_regex])"
331 ));
332 }
333
334 let path = require_string(&args, 0, "READ_JSONL")?;
335 let match_regex = optional_string(&args, 1)
336 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
337 .transpose()?;
338
339 let lines = read_filtered_lines(&path, match_regex.as_ref())?;
340
341 let mut records: Vec<JsonValue> = Vec::with_capacity(lines.len());
342 for (line_num, line) in &lines {
343 let trimmed = line.trim();
344 if trimmed.is_empty() {
345 continue;
346 }
347 let value: JsonValue = serde_json::from_str(trimmed)
348 .map_err(|e| anyhow!("READ_JSONL parse error at line {}: {}", line_num, e))?;
349 records.push(value);
350 }
351
352 if records.is_empty() {
353 return Ok(Arc::new(DataTable::new("read_jsonl")));
354 }
355
356 let column_names = collect_column_names(&records, 100);
357 if column_names.is_empty() {
358 return Err(anyhow!(
359 "READ_JSONL: no JSON objects found (records must be objects, not arrays/scalars)"
360 ));
361 }
362
363 let mut string_rows: Vec<Vec<String>> = Vec::with_capacity(records.len());
366 for record in &records {
367 let obj = match record.as_object() {
368 Some(o) => o,
369 None => continue,
370 };
371 let mut row = Vec::with_capacity(column_names.len());
372 for col_name in &column_names {
373 let value = obj
374 .get(col_name)
375 .map(json_value_to_string)
376 .unwrap_or_default();
377 row.push(value);
378 }
379 string_rows.push(row);
380 }
381
382 let mut column_types: Vec<DataType> = vec![DataType::Null; column_names.len()];
383 let sample_size = string_rows.len().min(100);
384 for row in string_rows.iter().take(sample_size) {
385 for (col_idx, value) in row.iter().enumerate() {
386 if !value.is_empty() && value != "null" {
387 let inferred = DataType::infer_from_string(value);
388 column_types[col_idx] = column_types[col_idx].merge(&inferred);
389 }
390 }
391 }
392
393 let mut table = DataTable::new("read_jsonl");
394 for (name, dtype) in column_names.iter().zip(column_types.iter()) {
395 let mut col = DataColumn::new(name);
396 col.data_type = dtype.clone();
397 table.add_column(col);
398 }
399
400 for string_row in &string_rows {
401 let mut values = Vec::with_capacity(string_row.len());
402 for (col_idx, value) in string_row.iter().enumerate() {
403 let dv = if value.is_empty() || value == "null" {
404 DataValue::Null
405 } else {
406 DataValue::from_string(value, &column_types[col_idx])
407 };
408 values.push(dv);
409 }
410 table
411 .add_row(DataRow::new(values))
412 .map_err(|e| anyhow!(e))?;
413 }
414
415 Ok(Arc::new(table))
416 }
417
418 fn description(&self) -> &str {
419 "Read newline-delimited JSON (one object per line). Optional second arg is a regex that filters lines at read time."
420 }
421
422 fn arg_count(&self) -> usize {
423 2
424 }
425}
426
427fn json_value_to_string(value: &JsonValue) -> String {
428 match value {
429 JsonValue::Null => String::new(),
430 JsonValue::Bool(b) => b.to_string(),
431 JsonValue::Number(n) => n.to_string(),
432 JsonValue::String(s) => s.clone(),
433 JsonValue::Array(arr) => format!("{:?}", arr),
434 JsonValue::Object(obj) => format!("{:?}", obj),
435 }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `contents` to a fresh temporary file (deleted when dropped).
    fn write_tmp(contents: &str) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        f.write_all(contents.as_bytes()).unwrap();
        f
    }

    /// The temp file's path as the string argument the generators expect.
    fn path_arg(f: &NamedTempFile) -> DataValue {
        DataValue::String(f.path().to_string_lossy().to_string())
    }

    fn str_val(s: &str) -> DataValue {
        DataValue::String(s.to_string())
    }

    #[test]
    fn test_read_text_returns_all_lines() {
        let f = write_tmp("one\ntwo\nthree\n");
        let table = ReadText.generate(vec![path_arg(&f)]).unwrap();
        assert_eq!(table.row_count(), 3);
        assert_eq!(table.get_value(0, 1).unwrap(), &str_val("one"));
        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
    }

    #[test]
    fn test_read_text_with_match_regex_filters_lines() {
        let f = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
        let table = ReadText
            .generate(vec![path_arg(&f), str_val("ERROR")])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        // Original physical line numbers are preserved after filtering.
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(4));
    }

    #[test]
    fn test_read_text_requires_path() {
        assert!(ReadText.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_text_rejects_too_many_args() {
        let f = write_tmp("hello\n");
        assert!(ReadText
            .generate(vec![path_arg(&f), str_val("x"), str_val("y")])
            .is_err());
    }

    #[test]
    fn test_read_text_missing_file_errors() {
        let path = {
            let f = write_tmp("");
            path_arg(&f)
        }; // the temp file is deleted when `f` drops here
        let err = ReadText.generate(vec![path]).unwrap_err();
        assert!(err.to_string().contains("Failed to open"));
    }

    #[test]
    fn test_read_text_invalid_regex_errors_early() {
        let f = write_tmp("hello\n");
        let err = ReadText
            .generate(vec![path_arg(&f), str_val("(unclosed")])
            .unwrap_err();
        assert!(err.to_string().contains("match_regex"));
    }

    #[test]
    fn test_grep_matches_like_grep() {
        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep.generate(vec![path_arg(&f), str_val("^ap")]).unwrap();
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &str_val("apple"));
        assert_eq!(table.get_value(1, 1).unwrap(), &str_val("apricot"));
    }

    #[test]
    fn test_grep_invert_like_grep_v() {
        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep
            .generate(vec![path_arg(&f), str_val("^ap"), DataValue::Boolean(true)])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &str_val("banana"));
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(1, 1).unwrap(), &str_val("cherry"));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(3));
    }

    #[test]
    fn test_grep_invert_accepts_integer_flag() {
        let f = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep
            .generate(vec![path_arg(&f), str_val("^ap"), DataValue::Integer(1)])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &str_val("banana"));
    }

    #[test]
    fn test_grep_rejects_non_boolean_invert_flag() {
        let f = write_tmp("apple\n");
        let err = Grep
            .generate(vec![path_arg(&f), str_val("^ap"), str_val("yes")])
            .unwrap_err();
        assert!(err.to_string().contains("invert flag"));
    }

    #[test]
    fn test_grep_invalid_pattern_errors() {
        let f = write_tmp("apple\n");
        let err = Grep
            .generate(vec![path_arg(&f), str_val("(unclosed")])
            .unwrap_err();
        assert!(err.to_string().contains("Invalid GREP pattern"));
    }

    #[test]
    fn test_read_words_basic() {
        let f = write_tmp("hello world\ngoodbye moon\n");
        let table = ReadWords.generate(vec![path_arg(&f)]).unwrap();
        assert_eq!(table.row_count(), 4);

        // First word: word_num 1, line 1, position 1.
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(0, 1).unwrap(), &str_val("hello"));
        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(0, 3).unwrap(), &DataValue::Integer(1));

        // Third word: word_num keeps counting, word_pos restarts on line 2.
        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
        assert_eq!(table.get_value(2, 1).unwrap(), &str_val("goodbye"));
        assert_eq!(table.get_value(2, 2).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(2, 3).unwrap(), &DataValue::Integer(1));
    }

    #[test]
    fn test_read_words_min_length() {
        let f = write_tmp("I am a big dog\n");
        let table = ReadWords
            .generate(vec![path_arg(&f), DataValue::Integer(3)])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &str_val("big"));
        assert_eq!(table.get_value(1, 1).unwrap(), &str_val("dog"));
    }

    #[test]
    fn test_read_words_rejects_zero_min_length() {
        let f = write_tmp("hello\n");
        let err = ReadWords
            .generate(vec![path_arg(&f), DataValue::Integer(0)])
            .unwrap_err();
        assert!(err.to_string().contains("min_length"));
    }

    #[test]
    fn test_read_words_case_lower() {
        let f = write_tmp("Hello World\n");
        let table = ReadWords
            .generate(vec![path_arg(&f), DataValue::Integer(1), str_val("lower")])
            .unwrap();
        assert_eq!(table.get_value(0, 1).unwrap(), &str_val("hello"));
        assert_eq!(table.get_value(1, 1).unwrap(), &str_val("world"));
    }

    #[test]
    fn test_read_words_strips_punctuation() {
        let f = write_tmp("hello, world! foo-bar.\n");
        let table = ReadWords.generate(vec![path_arg(&f)]).unwrap();
        let words: Vec<String> = (0..table.row_count())
            .map(|i| match table.get_value(i, 1).unwrap() {
                DataValue::String(s) => s.clone(),
                _ => panic!("expected string"),
            })
            .collect();
        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
    }

    #[test]
    fn test_read_words_requires_path() {
        assert!(ReadWords.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_words_empty_lines_skipped() {
        let f = write_tmp("hello\n\n\nworld\n");
        let table = ReadWords.generate(vec![path_arg(&f)]).unwrap();
        assert_eq!(table.row_count(), 2);
        // word_num is contiguous even though blank lines sit between the words.
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(2));
        // line_num still reflects the physical line.
        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(1, 2).unwrap(), &DataValue::Integer(4));
    }

    /// Position of the column called `name`; panics if it is absent.
    fn col_index(table: &DataTable, name: &str) -> usize {
        table
            .columns
            .iter()
            .position(|c| c.name == name)
            .unwrap_or_else(|| panic!("column '{}' not found", name))
    }

    #[test]
    fn test_read_jsonl_basic() {
        let f = write_tmp(
            r#"{"id":1,"name":"alice","score":10}
{"id":2,"name":"bob","score":20}
{"id":3,"name":"carol","score":30}
"#,
        );
        let table = ReadJsonl.generate(vec![path_arg(&f)]).unwrap();
        assert_eq!(table.row_count(), 3);
        assert_eq!(table.column_count(), 3);

        let id_col = col_index(&table, "id");
        let name_col = col_index(&table, "name");
        assert_eq!(table.get_value(0, id_col).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(2, name_col).unwrap(), &str_val("carol"));
    }

    #[test]
    fn test_read_jsonl_heterogeneous_schema_unioned() {
        let f = write_tmp(
            r#"{"id":1,"name":"alice"}
{"id":2,"name":"bob","extra":"hello"}
{"id":3,"name":"carol","other":42}
"#,
        );
        let table = ReadJsonl.generate(vec![path_arg(&f)]).unwrap();
        assert_eq!(table.row_count(), 3);
        assert_eq!(table.column_count(), 4);
        let extra = col_index(&table, "extra");
        let other = col_index(&table, "other");
        // Keys missing from a record load as NULL.
        assert_eq!(table.get_value(0, extra).unwrap(), &DataValue::Null);
        assert_eq!(table.get_value(0, other).unwrap(), &DataValue::Null);
        assert_eq!(table.get_value(1, extra).unwrap(), &str_val("hello"));
        assert_eq!(table.get_value(2, other).unwrap(), &DataValue::Integer(42));
    }

    #[test]
    fn test_read_jsonl_blank_lines_skipped() {
        let f = write_tmp(
            r#"{"id":1}

{"id":2}

"#,
        );
        let table = ReadJsonl.generate(vec![path_arg(&f)]).unwrap();
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_read_jsonl_match_regex_pre_filters() {
        let f = write_tmp(
            r#"{"level":"INFO","msg":"boot"}
{"level":"ERROR","msg":"disk"}
{"level":"INFO","msg":"shutdown"}
{"level":"ERROR","msg":"oom"}
"#,
        );
        let table = ReadJsonl
            .generate(vec![path_arg(&f), str_val("ERROR")])
            .unwrap();
        assert_eq!(table.row_count(), 2);
        let msg = col_index(&table, "msg");
        assert_eq!(table.get_value(0, msg).unwrap(), &str_val("disk"));
    }

    #[test]
    fn test_read_jsonl_invalid_line_errors_with_line_number() {
        let f = write_tmp(
            r#"{"id":1}
not json at all
{"id":3}
"#,
        );
        let err = ReadJsonl.generate(vec![path_arg(&f)]).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("line 2"),
            "error should cite line number: {}",
            msg
        );
    }

    #[test]
    fn test_read_jsonl_requires_path() {
        assert!(ReadJsonl.generate(vec![]).is_err());
    }

    #[test]
    fn test_read_jsonl_empty_file_returns_empty_table() {
        let f = write_tmp("");
        let table = ReadJsonl.generate(vec![path_arg(&f)]).unwrap();
        assert_eq!(table.row_count(), 0);
    }
}
788}