Skip to main content

tellaro_query_language/
file_ops.rs

1//! File operations for TQL.
2//!
3//! This module provides file-based query operations including:
4//! - Reading and querying JSON, JSONL, and CSV files
5//! - Folder batch processing with pattern matching
6//! - Format auto-detection
//! - Streaming support for large JSONL files
8
use crate::error::{Result, TqlError};
use crate::evaluator::TqlEvaluator;
use crate::parser::{AstNode, TqlParser};
use crate::stats_evaluator::{AggregationSpec, StatsEvaluator, StatsQuery};
use glob::{glob, Pattern};
use rayon::prelude::*;
use serde_json::Value as JsonValue;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
19
/// Supported file formats.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so a format can be used
/// as a `HashMap`/`HashSet` key (e.g. to group files by format).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileFormat {
    /// Single JSON object or array
    Json,
    /// JSON Lines (newline-delimited JSON)
    JsonL,
    /// Comma-separated values
    Csv,
    /// Auto-detect format from file extension
    Auto,
}
32
33impl std::str::FromStr for FileFormat {
34    type Err = std::convert::Infallible;
35
36    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
37        Ok(match s.to_lowercase().as_str() {
38            "json" => FileFormat::Json,
39            "jsonl" | "ndjson" => FileFormat::JsonL,
40            "csv" => FileFormat::Csv,
41            _ => FileFormat::Auto,
42        })
43    }
44}
45
46impl FileFormat {
47    /// Detect format from file extension
48    pub fn from_path(path: &Path) -> Self {
49        match path.extension().and_then(|e| e.to_str()) {
50            Some("json") => FileFormat::Json,
51            Some("jsonl") | Some("ndjson") => FileFormat::JsonL,
52            Some("csv") => FileFormat::Csv,
53            _ => FileFormat::Json, // Default to JSON
54        }
55    }
56}
57
/// CSV parsing configuration.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared (e.g. in tests or
/// when caching per-config results).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvConfig {
    /// Field delimiter, as a single byte (the CSV reader splits on one byte).
    pub delimiter: u8,
    /// Whether the first row contains headers (and is therefore not a data row).
    pub has_headers: bool,
    /// Custom header names. When set, they are used as the field names; if
    /// `has_headers` is also true, the file's own first row is skipped and ignored.
    /// Columns beyond the provided names are named `col<index>`.
    pub custom_headers: Option<Vec<String>>,
}
68
69impl Default for CsvConfig {
70    fn default() -> Self {
71        Self {
72            delimiter: b',',
73            has_headers: true,
74            custom_headers: None,
75        }
76    }
77}
78
79impl CsvConfig {
80    /// Create config with custom delimiter
81    pub fn with_delimiter(mut self, delimiter: char) -> Self {
82        self.delimiter = delimiter as u8;
83        self
84    }
85
86    /// Create config without headers
87    pub fn without_headers(mut self) -> Self {
88        self.has_headers = false;
89        self
90    }
91
92    /// Create config with custom headers
93    pub fn with_headers(mut self, headers: Vec<String>) -> Self {
94        self.custom_headers = Some(headers);
95        self
96    }
97}
98
/// File query operations.
///
/// Bundles the TQL parser, the filter evaluator and the stats evaluator so that a
/// file (or a folder of files) can be read and queried in a single call.
pub struct FileOps {
    /// Parses TQL query strings into an `AstNode`.
    parser: TqlParser,
    /// Applies the filter part of a parsed query to records.
    evaluator: TqlEvaluator,
    /// Computes `| stats` aggregations over records.
    stats_evaluator: StatsEvaluator,
}
105
106impl Default for FileOps {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl FileOps {
113    /// Create new FileOps instance
114    pub fn new() -> Self {
115        Self {
116            parser: TqlParser::new(),
117            evaluator: TqlEvaluator::new(),
118            stats_evaluator: StatsEvaluator::new(),
119        }
120    }
121
122    /// Read records from a file
123    pub fn read_file(
124        &self,
125        path: &Path,
126        format: FileFormat,
127        csv_config: &CsvConfig,
128    ) -> Result<Vec<JsonValue>> {
129        let format = if format == FileFormat::Auto {
130            FileFormat::from_path(path)
131        } else {
132            format
133        };
134
135        match format {
136            FileFormat::Json => self.read_json(path),
137            FileFormat::JsonL => self.read_jsonl(path),
138            FileFormat::Csv => self.read_csv(path, csv_config),
139            FileFormat::Auto => self.read_json(path),
140        }
141    }
142
143    /// Read JSON file (single object or array)
144    fn read_json(&self, path: &Path) -> Result<Vec<JsonValue>> {
145        let file = File::open(path).map_err(|e| {
146            TqlError::ExecutionError(format!("Failed to open file {}: {}", path.display(), e))
147        })?;
148
149        let reader = BufReader::new(file);
150        let value: JsonValue = serde_json::from_reader(reader).map_err(|e| {
151            TqlError::ExecutionError(format!(
152                "Failed to parse JSON from {}: {}",
153                path.display(),
154                e
155            ))
156        })?;
157
158        match value {
159            JsonValue::Array(arr) => Ok(arr),
160            obj @ JsonValue::Object(_) => Ok(vec![obj]),
161            _ => Err(TqlError::ExecutionError(format!(
162                "JSON file {} must contain an object or array",
163                path.display()
164            ))),
165        }
166    }
167
168    /// Read JSONL file (newline-delimited JSON)
169    fn read_jsonl(&self, path: &Path) -> Result<Vec<JsonValue>> {
170        let file = File::open(path).map_err(|e| {
171            TqlError::ExecutionError(format!("Failed to open file {}: {}", path.display(), e))
172        })?;
173
174        let reader = BufReader::new(file);
175        let mut records = Vec::new();
176
177        for (line_num, line) in reader.lines().enumerate() {
178            let line = line.map_err(|e| {
179                TqlError::ExecutionError(format!(
180                    "Failed to read line {} from {}: {}",
181                    line_num + 1,
182                    path.display(),
183                    e
184                ))
185            })?;
186
187            let trimmed = line.trim();
188            if trimmed.is_empty() {
189                continue;
190            }
191
192            let value: JsonValue = serde_json::from_str(trimmed).map_err(|e| {
193                TqlError::ExecutionError(format!(
194                    "Failed to parse JSON at line {} in {}: {}",
195                    line_num + 1,
196                    path.display(),
197                    e
198                ))
199            })?;
200
201            records.push(value);
202        }
203
204        Ok(records)
205    }
206
207    /// Read CSV file
208    fn read_csv(&self, path: &Path, config: &CsvConfig) -> Result<Vec<JsonValue>> {
209        let file = File::open(path).map_err(|e| {
210            TqlError::ExecutionError(format!("Failed to open file {}: {}", path.display(), e))
211        })?;
212
213        let mut rdr = csv::ReaderBuilder::new()
214            .delimiter(config.delimiter)
215            .has_headers(config.has_headers)
216            .from_reader(file);
217
218        let headers: Vec<String> = if let Some(ref custom) = config.custom_headers {
219            custom.clone()
220        } else if config.has_headers {
221            rdr.headers()
222                .map_err(|e| {
223                    TqlError::ExecutionError(format!(
224                        "Failed to read CSV headers from {}: {}",
225                        path.display(),
226                        e
227                    ))
228                })?
229                .iter()
230                .map(|s| s.to_string())
231                .collect()
232        } else {
233            Vec::new()
234        };
235
236        let mut records = Vec::new();
237
238        for (row_num, result) in rdr.records().enumerate() {
239            let record = result.map_err(|e| {
240                TqlError::ExecutionError(format!(
241                    "Failed to read CSV row {} from {}: {}",
242                    row_num + 1,
243                    path.display(),
244                    e
245                ))
246            })?;
247
248            let mut obj = serde_json::Map::new();
249
250            for (i, field) in record.iter().enumerate() {
251                let key = if i < headers.len() {
252                    headers[i].clone()
253                } else {
254                    format!("col{}", i)
255                };
256
257                // Try to parse as number or boolean, fall back to string
258                let value = if let Ok(n) = field.parse::<i64>() {
259                    JsonValue::Number(n.into())
260                } else if let Ok(n) = field.parse::<f64>() {
261                    JsonValue::Number(serde_json::Number::from_f64(n).unwrap_or_else(|| 0.into()))
262                } else if field.eq_ignore_ascii_case("true") {
263                    JsonValue::Bool(true)
264                } else if field.eq_ignore_ascii_case("false") {
265                    JsonValue::Bool(false)
266                } else if field.is_empty() || field.eq_ignore_ascii_case("null") {
267                    JsonValue::Null
268                } else {
269                    JsonValue::String(field.to_string())
270                };
271
272                obj.insert(key, value);
273            }
274
275            records.push(JsonValue::Object(obj));
276        }
277
278        Ok(records)
279    }
280
281    /// Query a file with a TQL query
282    pub fn query_file(
283        &self,
284        path: &Path,
285        query: &str,
286        format: FileFormat,
287        csv_config: &CsvConfig,
288    ) -> Result<Vec<JsonValue>> {
289        let records = self.read_file(path, format, csv_config)?;
290        let ast = self.parser.parse(query)?;
291
292        let results = self.evaluator.filter(&ast, &records)?;
293        Ok(results.into_iter().cloned().collect())
294    }
295
296    /// Query a file with enrichment (applies mutators to results)
297    pub fn query_file_enriched(
298        &self,
299        path: &Path,
300        query: &str,
301        format: FileFormat,
302        csv_config: &CsvConfig,
303    ) -> Result<Vec<JsonValue>> {
304        let records = self.read_file(path, format, csv_config)?;
305        let ast = self.parser.parse(query)?;
306        self.evaluator.filter_and_enrich(&ast, &records)
307    }
308
309    /// Execute stats query on a file
310    pub fn query_file_stats(
311        &self,
312        path: &Path,
313        query: &str,
314        format: FileFormat,
315        csv_config: &CsvConfig,
316    ) -> Result<JsonValue> {
317        let records = self.read_file(path, format, csv_config)?;
318        let ast = self.parser.parse(query)?;
319
320        self.evaluate_stats_query(&records, &ast, query)
321    }
322
323    /// Query multiple files in a folder
324    pub fn query_folder(
325        &self,
326        folder_path: &Path,
327        query: &str,
328        pattern: &str,
329        format: FileFormat,
330        csv_config: &CsvConfig,
331        parallel: bool,
332    ) -> Result<Vec<JsonValue>> {
333        let glob_pattern = folder_path.join(pattern);
334        let glob_pattern = glob_pattern.to_string_lossy();
335
336        let paths: Vec<_> = glob(&glob_pattern)
337            .map_err(|e| {
338                TqlError::ExecutionError(format!("Invalid glob pattern '{}': {}", pattern, e))
339            })?
340            .filter_map(|entry| entry.ok())
341            .filter(|path| path.is_file())
342            .collect();
343
344        if paths.is_empty() {
345            return Ok(Vec::new());
346        }
347
348        let ast = self.parser.parse(query)?;
349
350        if parallel {
351            let results: Result<Vec<Vec<JsonValue>>> = paths
352                .par_iter()
353                .map(|path| {
354                    let records = self.read_file(path, format, csv_config)?;
355                    let filtered = self.evaluator.filter(&ast, &records)?;
356                    Ok(filtered.into_iter().cloned().collect())
357                })
358                .collect();
359
360            Ok(results?.into_iter().flatten().collect())
361        } else {
362            let mut all_results = Vec::new();
363            for path in paths {
364                let records = self.read_file(&path, format, csv_config)?;
365                let filtered = self.evaluator.filter(&ast, &records)?;
366                all_results.extend(filtered.into_iter().cloned());
367            }
368            Ok(all_results)
369        }
370    }
371
372    /// Query folder with stats aggregation
373    pub fn query_folder_stats(
374        &self,
375        folder_path: &Path,
376        query: &str,
377        pattern: &str,
378        format: FileFormat,
379        csv_config: &CsvConfig,
380    ) -> Result<JsonValue> {
381        let glob_pattern = folder_path.join(pattern);
382        let glob_pattern = glob_pattern.to_string_lossy();
383
384        let paths: Vec<_> = glob(&glob_pattern)
385            .map_err(|e| {
386                TqlError::ExecutionError(format!("Invalid glob pattern '{}': {}", pattern, e))
387            })?
388            .filter_map(|entry| entry.ok())
389            .filter(|path| path.is_file())
390            .collect();
391
392        let mut all_records = Vec::new();
393        for path in paths {
394            let records = self.read_file(&path, format, csv_config)?;
395            all_records.extend(records);
396        }
397
398        let ast = self.parser.parse(query)?;
399        self.evaluate_stats_query(&all_records, &ast, query)
400    }
401
402    /// Streaming file reader - returns an iterator over records
403    pub fn stream_file(
404        &self,
405        path: &Path,
406        format: FileFormat,
407    ) -> Result<Box<dyn Iterator<Item = Result<JsonValue>>>> {
408        let format = if format == FileFormat::Auto {
409            FileFormat::from_path(path)
410        } else {
411            format
412        };
413
414        match format {
415            FileFormat::JsonL => {
416                let file = File::open(path).map_err(|e| {
417                    TqlError::ExecutionError(format!(
418                        "Failed to open file {}: {}",
419                        path.display(),
420                        e
421                    ))
422                })?;
423                let reader = BufReader::new(file);
424                let path_str = path.display().to_string();
425
426                Ok(Box::new(reader.lines().enumerate().filter_map(
427                    move |(line_num, line)| match line {
428                        Ok(line) => {
429                            let trimmed = line.trim();
430                            if trimmed.is_empty() {
431                                return None;
432                            }
433                            Some(serde_json::from_str(trimmed).map_err(|e| {
434                                TqlError::ExecutionError(format!(
435                                    "Failed to parse JSON at line {} in {}: {}",
436                                    line_num + 1,
437                                    path_str,
438                                    e
439                                ))
440                            }))
441                        }
442                        Err(e) => Some(Err(TqlError::ExecutionError(format!(
443                            "Failed to read line {} from {}: {}",
444                            line_num + 1,
445                            path_str,
446                            e
447                        )))),
448                    },
449                )))
450            }
451            _ => Err(TqlError::ExecutionError(
452                "Streaming is only supported for JSONL format".to_string(),
453            )),
454        }
455    }
456
457    /// Helper to evaluate stats query
458    fn evaluate_stats_query(
459        &self,
460        records: &[JsonValue],
461        ast: &AstNode,
462        query: &str,
463    ) -> Result<JsonValue> {
464        use crate::parser::QueryWithStatsNode;
465
466        match ast {
467            AstNode::StatsExpr(stats_node) => self.evaluate_stats_node(records, stats_node),
468            AstNode::QueryWithStats(QueryWithStatsNode { filter, stats }) => {
469                let filtered = self.evaluator.filter(filter, records)?;
470                let owned_records: Vec<JsonValue> = filtered.iter().map(|&r| r.clone()).collect();
471                self.evaluate_stats_node(&owned_records, stats)
472            }
473            _ => Err(TqlError::SyntaxError {
474                message: "Query does not contain stats expressions".to_string(),
475                position: None,
476                query: Some(query.to_string()),
477                suggestions: vec!["Use '| stats' to add aggregations".to_string()],
478            }),
479        }
480    }
481
482    /// Helper to evaluate stats node
483    fn evaluate_stats_node(
484        &self,
485        records: &[JsonValue],
486        stats_node: &crate::parser::StatsNode,
487    ) -> Result<JsonValue> {
488        use crate::parser::{Aggregation, GroupBy};
489
490        let aggregations: Vec<AggregationSpec> = stats_node
491            .aggregations
492            .iter()
493            .map(|agg: &Aggregation| AggregationSpec {
494                function: agg.function.clone(),
495                field: agg.field.clone().unwrap_or_else(|| "*".to_string()),
496                alias: agg.alias.clone(),
497                params: std::collections::HashMap::new(),
498            })
499            .collect();
500
501        let group_by: Vec<String> = stats_node
502            .group_by
503            .iter()
504            .map(|gb: &GroupBy| gb.field.clone())
505            .collect();
506
507        let stats_query = StatsQuery {
508            aggregations,
509            group_by,
510        };
511
512        self.stats_evaluator.evaluate_stats(records, &stats_query)
513    }
514}
515
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    /// Write `content` to `dir/name` and return the full path.
    fn create_test_file(dir: &TempDir, name: &str, content: &str) -> std::path::PathBuf {
        let path = dir.path().join(name);
        let mut file = File::create(&path).unwrap();
        file.write_all(content.as_bytes()).unwrap();
        path
    }

    #[test]
    fn test_read_json_array() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.json",
            r#"[{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]"#,
        );

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::Json, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[1]["name"], "Bob");
    }

    #[test]
    fn test_read_json_object() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.json", r#"{"name": "Alice", "age": 30}"#);

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::Json, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["name"], "Alice");
    }

    #[test]
    fn test_read_json_scalar_is_rejected() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.json", "42");

        let ops = FileOps::new();
        assert!(ops
            .read_file(&path, FileFormat::Json, &CsvConfig::default())
            .is_err());
    }

    #[test]
    fn test_read_jsonl() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.jsonl",
            r#"{"name": "Alice", "age": 30}
{"name": "Bob", "age": 25}
{"name": "Charlie", "age": 35}"#,
        );

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::JsonL, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[2]["name"], "Charlie");
    }

    #[test]
    fn test_read_jsonl_skips_blank_lines() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.jsonl", "{\"id\": 1}\n\n   \n{\"id\": 2}\n");

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::JsonL, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[1]["id"], 2);
    }

    #[test]
    fn test_read_jsonl_bad_line_is_error() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.jsonl", "{\"id\": 1}\nnot json\n");

        let ops = FileOps::new();
        assert!(ops
            .read_file(&path, FileFormat::JsonL, &CsvConfig::default())
            .is_err());
    }

    #[test]
    fn test_read_csv() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.csv",
            "name,age,active\nAlice,30,true\nBob,25,false",
        );

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::Csv, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[0]["age"], 30);
        assert_eq!(records[0]["active"], true);
        assert_eq!(records[1]["name"], "Bob");
        assert_eq!(records[1]["active"], false);
    }

    #[test]
    fn test_csv_null_and_float_fields() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.csv", "a,b,c,f\n,null,x,1.5");

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::Csv, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 1);
        assert!(records[0]["a"].is_null());
        assert!(records[0]["b"].is_null());
        assert_eq!(records[0]["c"], "x");
        assert_eq!(records[0]["f"], 1.5);
    }

    #[test]
    fn test_csv_without_headers_uses_column_names() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.csv", "Alice,30\nBob,25");

        let ops = FileOps::new();
        let config = CsvConfig::default().without_headers();
        let records = ops.read_file(&path, FileFormat::Csv, &config).unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["col0"], "Alice");
        assert_eq!(records[0]["col1"], 30);
        assert_eq!(records[1]["col0"], "Bob");
    }

    #[test]
    fn test_csv_custom_headers_replace_header_row() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.csv", "name,age\nAlice,30");

        let ops = FileOps::new();
        let config =
            CsvConfig::default().with_headers(vec!["who".to_string(), "years".to_string()]);
        let records = ops.read_file(&path, FileFormat::Csv, &config).unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["who"], "Alice");
        assert_eq!(records[0]["years"], 30);
        assert!(records[0].get("name").is_none());
    }

    #[test]
    fn test_query_file() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.json",
            r#"[
                {"name": "Alice", "age": 30, "status": "active"},
                {"name": "Bob", "age": 25, "status": "inactive"},
                {"name": "Charlie", "age": 35, "status": "active"}
            ]"#,
        );

        let ops = FileOps::new();
        let results = ops
            .query_file(
                &path,
                "status eq 'active'",
                FileFormat::Auto,
                &CsvConfig::default(),
            )
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r["status"] == "active"));
    }

    #[test]
    fn test_query_file_with_comparison() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.json",
            r#"[
                {"name": "Alice", "age": 30},
                {"name": "Bob", "age": 25},
                {"name": "Charlie", "age": 35}
            ]"#,
        );

        let ops = FileOps::new();
        let results = ops
            .query_file(&path, "age > 28", FileFormat::Auto, &CsvConfig::default())
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r["age"].as_i64().unwrap() > 28));
    }

    #[test]
    fn test_query_folder() {
        let dir = TempDir::new().unwrap();

        create_test_file(
            &dir,
            "data1.json",
            r#"[{"name": "Alice", "status": "active"}]"#,
        );
        create_test_file(
            &dir,
            "data2.json",
            r#"[{"name": "Bob", "status": "inactive"}, {"name": "Charlie", "status": "active"}]"#,
        );

        let ops = FileOps::new();
        let results = ops
            .query_folder(
                dir.path(),
                "status eq 'active'",
                "*.json",
                FileFormat::Auto,
                &CsvConfig::default(),
                false,
            )
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r["status"] == "active"));
    }

    #[test]
    fn test_query_folder_no_matches() {
        let dir = TempDir::new().unwrap();

        let ops = FileOps::new();
        let results = ops
            .query_folder(
                dir.path(),
                "status eq 'active'",
                "*.json",
                FileFormat::Auto,
                &CsvConfig::default(),
                false,
            )
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_folder_parallel() {
        let dir = TempDir::new().unwrap();

        for i in 0..5 {
            create_test_file(
                &dir,
                &format!("data{}.json", i),
                &format!(r#"[{{"id": {}, "status": "active"}}]"#, i),
            );
        }

        let ops = FileOps::new();
        let results = ops
            .query_folder(
                dir.path(),
                "status eq 'active'",
                "*.json",
                FileFormat::Auto,
                &CsvConfig::default(),
                true,
            )
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_format_auto_detection() {
        assert_eq!(
            FileFormat::from_path(Path::new("data.json")),
            FileFormat::Json
        );
        assert_eq!(
            FileFormat::from_path(Path::new("data.jsonl")),
            FileFormat::JsonL
        );
        assert_eq!(
            FileFormat::from_path(Path::new("data.ndjson")),
            FileFormat::JsonL
        );
        assert_eq!(
            FileFormat::from_path(Path::new("data.csv")),
            FileFormat::Csv
        );
        assert_eq!(
            FileFormat::from_path(Path::new("data.txt")),
            FileFormat::Json
        );
    }

    #[test]
    fn test_format_from_str() {
        assert_eq!("JSON".parse::<FileFormat>().unwrap(), FileFormat::Json);
        assert_eq!("ndjson".parse::<FileFormat>().unwrap(), FileFormat::JsonL);
        assert_eq!("jsonl".parse::<FileFormat>().unwrap(), FileFormat::JsonL);
        assert_eq!("Csv".parse::<FileFormat>().unwrap(), FileFormat::Csv);
        assert_eq!("xml".parse::<FileFormat>().unwrap(), FileFormat::Auto);
    }

    #[test]
    fn test_csv_custom_delimiter() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.csv", "name;age\nAlice;30\nBob;25");

        let ops = FileOps::new();
        let config = CsvConfig::default().with_delimiter(';');
        let records = ops.read_file(&path, FileFormat::Csv, &config).unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[0]["age"], 30);
    }

    #[test]
    fn test_stream_jsonl() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.jsonl",
            r#"{"id": 1}
{"id": 2}
{"id": 3}"#,
        );

        let ops = FileOps::new();
        let mut count = 0;
        for result in ops.stream_file(&path, FileFormat::JsonL).unwrap() {
            let record = result.unwrap();
            count += 1;
            assert!(record["id"].as_i64().unwrap() >= 1);
        }
        assert_eq!(count, 3);
    }

    #[test]
    fn test_stream_rejects_non_jsonl() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.json", "[]");

        let ops = FileOps::new();
        assert!(ops.stream_file(&path, FileFormat::Auto).is_err());
    }
}