1use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
2use crate::sql::generators::TableGenerator;
3use anyhow::{anyhow, Result};
4use regex::Regex;
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::sync::Arc;
8
/// Upper bound on the number of lines `read_filtered_lines` keeps from a single
/// file. Once this many lines have been kept, reading stops at the next line
/// that would be kept and a warning is written to stderr.
const MAX_LINES_PER_FILE: usize = 1_000_000;
13
14fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
16 match args.get(idx) {
17 Some(DataValue::String(s)) => Ok(s.clone()),
18 Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
19 Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
20 Some(v) => Err(anyhow!(
21 "{} argument {} must be a string, got {:?}",
22 name,
23 idx + 1,
24 v
25 )),
26 }
27}
28
29fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
31 match args.get(idx) {
32 Some(DataValue::String(s)) => Some(s.clone()),
33 Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
34 _ => None,
35 }
36}
37
38fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
44 let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
45 let reader = BufReader::new(file);
46
47 let mut out = Vec::new();
48 let mut truncated = false;
49
50 for (idx, line_result) in reader.lines().enumerate() {
51 let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
52 let line_num = (idx + 1) as i64;
53
54 if let Some(re) = match_regex {
55 if !re.is_match(&line) {
56 continue;
57 }
58 }
59
60 if out.len() >= MAX_LINES_PER_FILE {
61 truncated = true;
62 break;
63 }
64 out.push((line_num, line));
65 }
66
67 if truncated {
68 eprintln!(
69 "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
70 MAX_LINES_PER_FILE, path
71 );
72 }
73
74 Ok(out)
75}
76
77pub struct ReadText;
82
83impl TableGenerator for ReadText {
84 fn name(&self) -> &str {
85 "READ_TEXT"
86 }
87
88 fn columns(&self) -> Vec<DataColumn> {
89 vec![DataColumn::new("line_num"), DataColumn::new("line")]
90 }
91
92 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
93 if args.is_empty() || args.len() > 2 {
94 return Err(anyhow!(
95 "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
96 ));
97 }
98
99 let path = require_string(&args, 0, "READ_TEXT")?;
100 let match_regex = optional_string(&args, 1)
101 .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
102 .transpose()?;
103
104 let lines = read_filtered_lines(&path, match_regex.as_ref())?;
105
106 let mut table = DataTable::new("read_text");
107 table.add_column(DataColumn::new("line_num"));
108 table.add_column(DataColumn::new("line"));
109
110 for (line_num, line) in lines {
111 table
112 .add_row(DataRow::new(vec![
113 DataValue::Integer(line_num),
114 DataValue::String(line),
115 ]))
116 .map_err(|e| anyhow!(e))?;
117 }
118
119 Ok(Arc::new(table))
120 }
121
122 fn description(&self) -> &str {
123 "Read a text file line-by-line. Optional second arg is a regex that filters lines at read time."
124 }
125
126 fn arg_count(&self) -> usize {
127 2
128 }
129}
130
131pub struct Grep;
136
137impl TableGenerator for Grep {
138 fn name(&self) -> &str {
139 "GREP"
140 }
141
142 fn columns(&self) -> Vec<DataColumn> {
143 vec![DataColumn::new("line_num"), DataColumn::new("line")]
144 }
145
146 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
147 if args.len() < 2 || args.len() > 3 {
148 return Err(anyhow!(
149 "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
150 ));
151 }
152
153 let path = require_string(&args, 0, "GREP")?;
154 let pattern_str = require_string(&args, 1, "GREP")?;
155 let pattern =
156 Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
157
158 let invert = match args.get(2) {
159 Some(DataValue::Boolean(b)) => *b,
160 Some(DataValue::Integer(n)) => *n != 0,
161 Some(DataValue::Null) | None => false,
162 Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
163 };
164
165 let lines = if invert {
168 let all = read_filtered_lines(&path, None)?;
169 all.into_iter()
170 .filter(|(_, line)| !pattern.is_match(line))
171 .collect::<Vec<_>>()
172 } else {
173 read_filtered_lines(&path, Some(&pattern))?
174 };
175
176 let mut table = DataTable::new("grep");
177 table.add_column(DataColumn::new("line_num"));
178 table.add_column(DataColumn::new("line"));
179
180 for (line_num, line) in lines {
181 table
182 .add_row(DataRow::new(vec![
183 DataValue::Integer(line_num),
184 DataValue::String(line),
185 ]))
186 .map_err(|e| anyhow!(e))?;
187 }
188
189 Ok(Arc::new(table))
190 }
191
192 fn description(&self) -> &str {
193 "Read only lines matching a regex (third arg inverts the match, like grep -v)"
194 }
195
196 fn arg_count(&self) -> usize {
197 3
198 }
199}
200
201pub struct ReadWords;
212
213impl TableGenerator for ReadWords {
214 fn name(&self) -> &str {
215 "READ_WORDS"
216 }
217
218 fn columns(&self) -> Vec<DataColumn> {
219 vec![
220 DataColumn::new("word_num"),
221 DataColumn::new("word"),
222 DataColumn::new("line_num"),
223 DataColumn::new("word_pos"),
224 ]
225 }
226
227 fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
228 if args.is_empty() || args.len() > 3 {
229 return Err(anyhow!(
230 "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
231 ));
232 }
233
234 let path = require_string(&args, 0, "READ_WORDS")?;
235
236 let min_length: usize = match args.get(1) {
237 Some(DataValue::Integer(n)) => {
238 if *n < 1 {
239 return Err(anyhow!("READ_WORDS min_length must be >= 1"));
240 }
241 *n as usize
242 }
243 Some(DataValue::Float(f)) => *f as usize,
244 Some(DataValue::Null) | None => 1,
245 Some(v) => {
246 return Err(anyhow!(
247 "READ_WORDS min_length must be an integer, got {:?}",
248 v
249 ))
250 }
251 };
252
253 let case_option = optional_string(&args, 2);
254
255 let lines = read_filtered_lines(&path, None)?;
256
257 let mut table = DataTable::new("read_words");
258 table.add_column(DataColumn::new("word_num"));
259 table.add_column(DataColumn::new("word"));
260 table.add_column(DataColumn::new("line_num"));
261 table.add_column(DataColumn::new("word_pos"));
262
263 let mut word_num: i64 = 0;
264
265 for (line_num, line) in &lines {
266 let mut word_pos: i64 = 0;
267
268 for token in line.split(|c: char| !c.is_alphanumeric()) {
269 if token.is_empty() || token.len() < min_length {
270 continue;
271 }
272
273 word_pos += 1;
274 word_num += 1;
275
276 let word = match case_option.as_deref() {
277 Some("lower") | Some("lowercase") => token.to_lowercase(),
278 Some("upper") | Some("uppercase") => token.to_uppercase(),
279 _ => token.to_string(),
280 };
281
282 table
283 .add_row(DataRow::new(vec![
284 DataValue::Integer(word_num),
285 DataValue::String(word),
286 DataValue::Integer(*line_num),
287 DataValue::Integer(word_pos),
288 ]))
289 .map_err(|e| anyhow!(e))?;
290 }
291 }
292
293 Ok(Arc::new(table))
294 }
295
296 fn description(&self) -> &str {
297 "Read a text file and emit one row per word, with optional min length and case normalisation"
298 }
299
300 fn arg_count(&self) -> usize {
301 3
302 }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file holding `contents`.
    fn write_tmp(contents: &str) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(contents.as_bytes()).unwrap();
        tmp
    }

    /// Wraps the path of `file` as a string argument for a generator.
    fn path_arg(file: &NamedTempFile) -> DataValue {
        DataValue::String(file.path().to_string_lossy().to_string())
    }

    /// Shorthand for a `DataValue::String`.
    fn text(s: &str) -> DataValue {
        DataValue::String(s.to_string())
    }

    /// Shorthand for a `DataValue::Integer`.
    fn int(n: i64) -> DataValue {
        DataValue::Integer(n)
    }

    #[test]
    fn test_read_text_returns_all_lines() {
        let file = write_tmp("one\ntwo\nthree\n");
        let table = ReadText.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 3);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("one"));
        assert_eq!(table.get_value(2, 0).unwrap(), &int(3));
    }

    #[test]
    fn test_read_text_with_match_regex_filters_lines() {
        let file = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
        let args = vec![path_arg(&file), text("ERROR")];
        let table = ReadText.generate(args).unwrap();

        assert_eq!(table.row_count(), 2);
        // Line numbers refer to positions in the file, not in the result.
        assert_eq!(table.get_value(0, 0).unwrap(), &int(2));
        assert_eq!(table.get_value(1, 0).unwrap(), &int(4));
    }

    #[test]
    fn test_read_text_requires_path() {
        let outcome = ReadText.generate(Vec::new());
        assert!(outcome.is_err());
    }

    #[test]
    fn test_read_text_invalid_regex_errors_early() {
        let file = write_tmp("hello\n");
        let args = vec![path_arg(&file), text("(unclosed")];
        let err = ReadText.generate(args).unwrap_err();

        assert!(err.to_string().contains("match_regex"));
    }

    #[test]
    fn test_grep_matches_like_grep() {
        let file = write_tmp("apple\nbanana\ncherry\napricot\n");
        let args = vec![path_arg(&file), text("^ap")];
        let table = Grep.generate(args).unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("apple"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("apricot"));
    }

    #[test]
    fn test_grep_invert_like_grep_v() {
        let file = write_tmp("apple\nbanana\ncherry\napricot\n");
        let args = vec![path_arg(&file), text("^ap"), DataValue::Boolean(true)];
        let table = Grep.generate(args).unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("banana"));
    }

    #[test]
    fn test_read_words_basic() {
        let file = write_tmp("hello world\ngoodbye moon\n");
        let table = ReadWords.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 4);

        // First word of the first line.
        assert_eq!(table.get_value(0, 0).unwrap(), &int(1));
        assert_eq!(table.get_value(0, 1).unwrap(), &text("hello"));
        assert_eq!(table.get_value(0, 2).unwrap(), &int(1));
        assert_eq!(table.get_value(0, 3).unwrap(), &int(1));

        // First word of the second line: word_num keeps counting, word_pos restarts.
        assert_eq!(table.get_value(2, 0).unwrap(), &int(3));
        assert_eq!(table.get_value(2, 1).unwrap(), &text("goodbye"));
        assert_eq!(table.get_value(2, 2).unwrap(), &int(2));
        assert_eq!(table.get_value(2, 3).unwrap(), &int(1));
    }

    #[test]
    fn test_read_words_min_length() {
        let file = write_tmp("I am a big dog\n");
        let args = vec![path_arg(&file), int(3)];
        let table = ReadWords.generate(args).unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("big"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("dog"));
    }

    #[test]
    fn test_read_words_case_lower() {
        let file = write_tmp("Hello World\n");
        let args = vec![path_arg(&file), int(1), text("lower")];
        let table = ReadWords.generate(args).unwrap();

        assert_eq!(table.get_value(0, 1).unwrap(), &text("hello"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("world"));
    }

    #[test]
    fn test_read_words_strips_punctuation() {
        let file = write_tmp("hello, world! foo-bar.\n");
        let table = ReadWords.generate(vec![path_arg(&file)]).unwrap();

        let mut words: Vec<String> = Vec::new();
        for row in 0..table.row_count() {
            match table.get_value(row, 1).unwrap() {
                DataValue::String(s) => words.push(s.clone()),
                _ => panic!("expected string"),
            }
        }

        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
    }

    #[test]
    fn test_read_words_requires_path() {
        let outcome = ReadWords.generate(Vec::new());
        assert!(outcome.is_err());
    }

    #[test]
    fn test_read_words_empty_lines_skipped() {
        let file = write_tmp("hello\n\n\nworld\n");
        let table = ReadWords.generate(vec![path_arg(&file)]).unwrap();

        assert_eq!(table.row_count(), 2);

        // word_num is consecutive even though blank lines sit in between.
        assert_eq!(table.get_value(0, 0).unwrap(), &int(1));
        assert_eq!(table.get_value(1, 0).unwrap(), &int(2));

        // line_num still reflects the real position in the file.
        assert_eq!(table.get_value(0, 2).unwrap(), &int(1));
        assert_eq!(table.get_value(1, 2).unwrap(), &int(4));
    }
}