Skip to main content

sql_cli/sql/generators/
file_readers.rs

1use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
2use crate::sql::generators::TableGenerator;
3use anyhow::{anyhow, Result};
4use regex::Regex;
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::sync::Arc;
8
/// Hard cap on the number of lines `read_filtered_lines` keeps from one file.
///
/// The cap is checked after the optional regex filter has been applied, so it
/// counts retained lines rather than lines scanned. Letting users raise it via
/// a session setting is future work; for now it protects against accidentally
/// pulling a multi-GB log into memory.
const MAX_LINES_PER_FILE: usize = 1_000_000;
13
14/// Extract a string argument, erroring if the arg is missing/NULL/non-string.
15fn require_string(args: &[DataValue], idx: usize, name: &str) -> Result<String> {
16    match args.get(idx) {
17        Some(DataValue::String(s)) => Ok(s.clone()),
18        Some(DataValue::InternedString(s)) => Ok(s.as_str().to_string()),
19        Some(DataValue::Null) | None => Err(anyhow!("{} requires argument {}", name, idx + 1)),
20        Some(v) => Err(anyhow!(
21            "{} argument {} must be a string, got {:?}",
22            name,
23            idx + 1,
24            v
25        )),
26    }
27}
28
29/// Extract an optional string argument. Returns None for missing or NULL.
30fn optional_string(args: &[DataValue], idx: usize) -> Option<String> {
31    match args.get(idx) {
32        Some(DataValue::String(s)) => Some(s.clone()),
33        Some(DataValue::InternedString(s)) => Some(s.as_str().to_string()),
34        _ => None,
35    }
36}
37
38/// Open a file and stream its lines, applying an optional include-regex filter
39/// and the global truncation cap. Emits a stderr warning when truncation kicks in.
40///
41/// Returns (line_num, line) pairs where `line_num` is the original 1-based line
42/// number in the source file — so numbers are preserved through filtering.
43fn read_filtered_lines(path: &str, match_regex: Option<&Regex>) -> Result<Vec<(i64, String)>> {
44    let file = File::open(path).map_err(|e| anyhow!("Failed to open '{}': {}", path, e))?;
45    let reader = BufReader::new(file);
46
47    let mut out = Vec::new();
48    let mut truncated = false;
49
50    for (idx, line_result) in reader.lines().enumerate() {
51        let line = line_result.map_err(|e| anyhow!("Error reading '{}': {}", path, e))?;
52        let line_num = (idx + 1) as i64;
53
54        if let Some(re) = match_regex {
55            if !re.is_match(&line) {
56                continue;
57            }
58        }
59
60        if out.len() >= MAX_LINES_PER_FILE {
61            truncated = true;
62            break;
63        }
64        out.push((line_num, line));
65    }
66
67    if truncated {
68        eprintln!(
69            "WARNING: truncated to {} rows (max_lines_per_file cap) when reading '{}'",
70            MAX_LINES_PER_FILE, path
71        );
72    }
73
74    Ok(out)
75}
76
77/// READ_TEXT(path [, match_regex]) - Read a text file line by line.
78///
79/// Emits `(line_num, line)` rows. Optional `match_regex` filters source lines
80/// *before* materializing them, which is the primary fast path for large logs.
81pub struct ReadText;
82
83impl TableGenerator for ReadText {
84    fn name(&self) -> &str {
85        "READ_TEXT"
86    }
87
88    fn columns(&self) -> Vec<DataColumn> {
89        vec![DataColumn::new("line_num"), DataColumn::new("line")]
90    }
91
92    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
93        if args.is_empty() || args.len() > 2 {
94            return Err(anyhow!(
95                "READ_TEXT expects 1 or 2 arguments: (path [, match_regex])"
96            ));
97        }
98
99        let path = require_string(&args, 0, "READ_TEXT")?;
100        let match_regex = optional_string(&args, 1)
101            .map(|s| Regex::new(&s).map_err(|e| anyhow!("Invalid match_regex: {}", e)))
102            .transpose()?;
103
104        let lines = read_filtered_lines(&path, match_regex.as_ref())?;
105
106        let mut table = DataTable::new("read_text");
107        table.add_column(DataColumn::new("line_num"));
108        table.add_column(DataColumn::new("line"));
109
110        for (line_num, line) in lines {
111            table
112                .add_row(DataRow::new(vec![
113                    DataValue::Integer(line_num),
114                    DataValue::String(line),
115                ]))
116                .map_err(|e| anyhow!(e))?;
117        }
118
119        Ok(Arc::new(table))
120    }
121
122    fn description(&self) -> &str {
123        "Read a text file line-by-line. Optional second arg is a regex that filters lines at read time."
124    }
125
126    fn arg_count(&self) -> usize {
127        2
128    }
129}
130
131/// GREP(path, pattern [, invert]) - Read only lines matching a regex.
132///
133/// Thin composable wrapper around READ_TEXT's filter path. Third argument
134/// (boolean or integer truthy value) inverts the match, matching `grep -v`.
135pub struct Grep;
136
137impl TableGenerator for Grep {
138    fn name(&self) -> &str {
139        "GREP"
140    }
141
142    fn columns(&self) -> Vec<DataColumn> {
143        vec![DataColumn::new("line_num"), DataColumn::new("line")]
144    }
145
146    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
147        if args.len() < 2 || args.len() > 3 {
148            return Err(anyhow!(
149                "GREP expects 2 or 3 arguments: (path, pattern [, invert])"
150            ));
151        }
152
153        let path = require_string(&args, 0, "GREP")?;
154        let pattern_str = require_string(&args, 1, "GREP")?;
155        let pattern =
156            Regex::new(&pattern_str).map_err(|e| anyhow!("Invalid GREP pattern: {}", e))?;
157
158        let invert = match args.get(2) {
159            Some(DataValue::Boolean(b)) => *b,
160            Some(DataValue::Integer(n)) => *n != 0,
161            Some(DataValue::Null) | None => false,
162            Some(v) => return Err(anyhow!("GREP invert flag must be boolean, got {:?}", v)),
163        };
164
165        // When not inverted we can push the filter down into the file reader for
166        // the fast path. When inverted we still iterate every line.
167        let lines = if invert {
168            let all = read_filtered_lines(&path, None)?;
169            all.into_iter()
170                .filter(|(_, line)| !pattern.is_match(line))
171                .collect::<Vec<_>>()
172        } else {
173            read_filtered_lines(&path, Some(&pattern))?
174        };
175
176        let mut table = DataTable::new("grep");
177        table.add_column(DataColumn::new("line_num"));
178        table.add_column(DataColumn::new("line"));
179
180        for (line_num, line) in lines {
181            table
182                .add_row(DataRow::new(vec![
183                    DataValue::Integer(line_num),
184                    DataValue::String(line),
185                ]))
186                .map_err(|e| anyhow!(e))?;
187        }
188
189        Ok(Arc::new(table))
190    }
191
192    fn description(&self) -> &str {
193        "Read only lines matching a regex (third arg inverts the match, like grep -v)"
194    }
195
196    fn arg_count(&self) -> usize {
197        3
198    }
199}
200
201/// READ_WORDS(path [, min_length [, case]]) - Read a text file and emit one row per word.
202///
203/// Emits `(word_num, word, line_num, word_pos)` rows where:
204///   - `word_num` is a global 1-based word counter across the whole file
205///   - `word` is the extracted token (punctuation stripped)
206///   - `line_num` is the original 1-based line number the word came from
207///   - `word_pos` is the 1-based position of the word within its line
208///
209/// Optional `min_length` (default 1) filters out short words.
210/// Optional `case` ('lower' or 'upper') normalises word casing.
211pub struct ReadWords;
212
213impl TableGenerator for ReadWords {
214    fn name(&self) -> &str {
215        "READ_WORDS"
216    }
217
218    fn columns(&self) -> Vec<DataColumn> {
219        vec![
220            DataColumn::new("word_num"),
221            DataColumn::new("word"),
222            DataColumn::new("line_num"),
223            DataColumn::new("word_pos"),
224        ]
225    }
226
227    fn generate(&self, args: Vec<DataValue>) -> Result<Arc<DataTable>> {
228        if args.is_empty() || args.len() > 3 {
229            return Err(anyhow!(
230                "READ_WORDS expects 1 to 3 arguments: (path [, min_length [, case]])"
231            ));
232        }
233
234        let path = require_string(&args, 0, "READ_WORDS")?;
235
236        let min_length: usize = match args.get(1) {
237            Some(DataValue::Integer(n)) => {
238                if *n < 1 {
239                    return Err(anyhow!("READ_WORDS min_length must be >= 1"));
240                }
241                *n as usize
242            }
243            Some(DataValue::Float(f)) => *f as usize,
244            Some(DataValue::Null) | None => 1,
245            Some(v) => {
246                return Err(anyhow!(
247                    "READ_WORDS min_length must be an integer, got {:?}",
248                    v
249                ))
250            }
251        };
252
253        let case_option = optional_string(&args, 2);
254
255        let lines = read_filtered_lines(&path, None)?;
256
257        let mut table = DataTable::new("read_words");
258        table.add_column(DataColumn::new("word_num"));
259        table.add_column(DataColumn::new("word"));
260        table.add_column(DataColumn::new("line_num"));
261        table.add_column(DataColumn::new("word_pos"));
262
263        let mut word_num: i64 = 0;
264
265        for (line_num, line) in &lines {
266            let mut word_pos: i64 = 0;
267
268            for token in line.split(|c: char| !c.is_alphanumeric()) {
269                if token.is_empty() || token.len() < min_length {
270                    continue;
271                }
272
273                word_pos += 1;
274                word_num += 1;
275
276                let word = match case_option.as_deref() {
277                    Some("lower") | Some("lowercase") => token.to_lowercase(),
278                    Some("upper") | Some("uppercase") => token.to_uppercase(),
279                    _ => token.to_string(),
280                };
281
282                table
283                    .add_row(DataRow::new(vec![
284                        DataValue::Integer(word_num),
285                        DataValue::String(word),
286                        DataValue::Integer(*line_num),
287                        DataValue::Integer(word_pos),
288                    ]))
289                    .map_err(|e| anyhow!(e))?;
290            }
291        }
292
293        Ok(Arc::new(table))
294    }
295
296    fn description(&self) -> &str {
297        "Read a text file and emit one row per word, with optional min length and case normalisation"
298    }
299
300    fn arg_count(&self) -> usize {
301        3
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Write `contents` to a fresh temporary file and hand back its handle.
    fn write_tmp(contents: &str) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(contents.as_bytes()).unwrap();
        tmp
    }

    /// The temp file's path wrapped as the string argument the generators expect.
    fn path_arg(file: &NamedTempFile) -> DataValue {
        DataValue::String(file.path().to_string_lossy().to_string())
    }

    /// Shorthand for an expected string cell.
    fn text(value: &str) -> DataValue {
        DataValue::String(value.to_string())
    }

    // ---- ReadText tests ----

    #[test]
    fn test_read_text_returns_all_lines() {
        let tmp = write_tmp("one\ntwo\nthree\n");
        let table = ReadText.generate(vec![path_arg(&tmp)]).unwrap();

        assert_eq!(table.row_count(), 3);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("one"));
        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
    }

    #[test]
    fn test_read_text_with_match_regex_filters_lines() {
        let tmp = write_tmp("INFO boot\nERROR disk full\nINFO shutdown\nERROR oom\n");
        let table = ReadText
            .generate(vec![path_arg(&tmp), text("ERROR")])
            .unwrap();

        assert_eq!(table.row_count(), 2);
        // Surviving rows keep their source positions (2 and 4), not 1 and 2.
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(4));
    }

    #[test]
    fn test_read_text_requires_path() {
        let outcome = ReadText.generate(vec![]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_read_text_invalid_regex_errors_early() {
        let tmp = write_tmp("hello\n");
        let err = ReadText
            .generate(vec![path_arg(&tmp), text("(unclosed")])
            .unwrap_err();

        assert!(err.to_string().contains("match_regex"));
    }

    // ---- Grep tests ----

    #[test]
    fn test_grep_matches_like_grep() {
        let tmp = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep.generate(vec![path_arg(&tmp), text("^ap")]).unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("apple"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("apricot"));
    }

    #[test]
    fn test_grep_invert_like_grep_v() {
        let tmp = write_tmp("apple\nbanana\ncherry\napricot\n");
        let table = Grep
            .generate(vec![path_arg(&tmp), text("^ap"), DataValue::Boolean(true)])
            .unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("banana"));
    }

    // ---- ReadWords tests ----

    #[test]
    fn test_read_words_basic() {
        let tmp = write_tmp("hello world\ngoodbye moon\n");
        let table = ReadWords.generate(vec![path_arg(&tmp)]).unwrap();

        // Two lines of two words each.
        assert_eq!(table.row_count(), 4);

        // Column order: word_num, word, line_num, word_pos.
        // Row 0 is the first word of the file.
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1)); // word_num
        assert_eq!(table.get_value(0, 1).unwrap(), &text("hello"));
        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1)); // line_num
        assert_eq!(table.get_value(0, 3).unwrap(), &DataValue::Integer(1)); // word_pos

        // Row 2 is the third word overall and the first word on line 2.
        assert_eq!(table.get_value(2, 0).unwrap(), &DataValue::Integer(3));
        assert_eq!(table.get_value(2, 1).unwrap(), &text("goodbye"));
        assert_eq!(table.get_value(2, 2).unwrap(), &DataValue::Integer(2));
        assert_eq!(table.get_value(2, 3).unwrap(), &DataValue::Integer(1));
    }

    #[test]
    fn test_read_words_min_length() {
        let tmp = write_tmp("I am a big dog\n");
        let table = ReadWords
            .generate(vec![path_arg(&tmp), DataValue::Integer(3)])
            .unwrap();

        // "big" and "dog" are the only words with at least 3 characters.
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 1).unwrap(), &text("big"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("dog"));
    }

    #[test]
    fn test_read_words_case_lower() {
        let tmp = write_tmp("Hello World\n");
        let table = ReadWords
            .generate(vec![path_arg(&tmp), DataValue::Integer(1), text("lower")])
            .unwrap();

        assert_eq!(table.get_value(0, 1).unwrap(), &text("hello"));
        assert_eq!(table.get_value(1, 1).unwrap(), &text("world"));
    }

    #[test]
    fn test_read_words_strips_punctuation() {
        let tmp = write_tmp("hello, world! foo-bar.\n");
        let table = ReadWords.generate(vec![path_arg(&tmp)]).unwrap();

        let mut words: Vec<String> = Vec::new();
        for row in 0..table.row_count() {
            match table.get_value(row, 1).unwrap() {
                DataValue::String(s) => words.push(s.clone()),
                _ => panic!("expected string"),
            }
        }

        assert_eq!(words, vec!["hello", "world", "foo", "bar"]);
    }

    #[test]
    fn test_read_words_requires_path() {
        let outcome = ReadWords.generate(vec![]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_read_words_empty_lines_skipped() {
        let tmp = write_tmp("hello\n\n\nworld\n");
        let table = ReadWords.generate(vec![path_arg(&tmp)]).unwrap();

        assert_eq!(table.row_count(), 2);
        // word_num stays contiguous across the blank lines...
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(1, 0).unwrap(), &DataValue::Integer(2));
        // ...while line_num still reflects the original file positions.
        assert_eq!(table.get_value(0, 2).unwrap(), &DataValue::Integer(1));
        assert_eq!(table.get_value(1, 2).unwrap(), &DataValue::Integer(4));
    }
}