1use crate::error::{Result, TqlError};
10use crate::evaluator::TqlEvaluator;
11use crate::parser::{AstNode, TqlParser};
12use crate::stats_evaluator::{AggregationSpec, StatsEvaluator, StatsQuery};
13use glob::glob;
14use rayon::prelude::*;
15use serde_json::Value as JsonValue;
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::path::Path;
19
/// Supported input file formats.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq`: the enum is fieldless,
/// so full equality is well-defined and lets the format be used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileFormat {
    /// A single JSON document: a top-level object or array of objects.
    Json,
    /// Newline-delimited JSON (JSONL / NDJSON): one value per line.
    JsonL,
    /// Delimiter-separated values; parsing is configured via `CsvConfig`.
    Csv,
    /// Detect the format from the file extension.
    Auto,
}
32
33impl std::str::FromStr for FileFormat {
34 type Err = std::convert::Infallible;
35
36 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
37 Ok(match s.to_lowercase().as_str() {
38 "json" => FileFormat::Json,
39 "jsonl" | "ndjson" => FileFormat::JsonL,
40 "csv" => FileFormat::Csv,
41 _ => FileFormat::Auto,
42 })
43 }
44}
45
46impl FileFormat {
47 pub fn from_path(path: &Path) -> Self {
49 match path.extension().and_then(|e| e.to_str()) {
50 Some("json") => FileFormat::Json,
51 Some("jsonl") | Some("ndjson") => FileFormat::JsonL,
52 Some("csv") => FileFormat::Csv,
53 _ => FileFormat::Json, }
55 }
56}
57
/// Configuration for reading CSV files.
#[derive(Debug, Clone)]
pub struct CsvConfig {
    /// Field delimiter byte (defaults to `b','`).
    pub delimiter: u8,
    /// Whether the first row of the file is a header row.
    pub has_headers: bool,
    /// Optional caller-supplied column names; when set, these take
    /// precedence over any header row read from the file.
    pub custom_headers: Option<Vec<String>>,
}
68
69impl Default for CsvConfig {
70 fn default() -> Self {
71 Self {
72 delimiter: b',',
73 has_headers: true,
74 custom_headers: None,
75 }
76 }
77}
78
79impl CsvConfig {
80 pub fn with_delimiter(mut self, delimiter: char) -> Self {
82 self.delimiter = delimiter as u8;
83 self
84 }
85
86 pub fn without_headers(mut self) -> Self {
88 self.has_headers = false;
89 self
90 }
91
92 pub fn with_headers(mut self, headers: Vec<String>) -> Self {
94 self.custom_headers = Some(headers);
95 self
96 }
97}
98
/// High-level facade for querying data files: reads JSON/JSONL/CSV records
/// and runs TQL filter and stats queries over them.
pub struct FileOps {
    /// Parses TQL query strings into an AST.
    parser: TqlParser,
    /// Evaluates filter expressions against records.
    evaluator: TqlEvaluator,
    /// Evaluates aggregation (stats) queries.
    stats_evaluator: StatsEvaluator,
}
105
106impl Default for FileOps {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl FileOps {
113 pub fn new() -> Self {
115 Self {
116 parser: TqlParser::new(),
117 evaluator: TqlEvaluator::new(),
118 stats_evaluator: StatsEvaluator::new(),
119 }
120 }
121
122 pub fn read_file(
124 &self,
125 path: &Path,
126 format: FileFormat,
127 csv_config: &CsvConfig,
128 ) -> Result<Vec<JsonValue>> {
129 let format = if format == FileFormat::Auto {
130 FileFormat::from_path(path)
131 } else {
132 format
133 };
134
135 match format {
136 FileFormat::Json => self.read_json(path),
137 FileFormat::JsonL => self.read_jsonl(path),
138 FileFormat::Csv => self.read_csv(path, csv_config),
139 FileFormat::Auto => self.read_json(path),
140 }
141 }
142
143 fn read_json(&self, path: &Path) -> Result<Vec<JsonValue>> {
145 let file = File::open(path).map_err(|e| {
146 TqlError::ExecutionError(format!("Failed to open file {}: {}", path.display(), e))
147 })?;
148
149 let reader = BufReader::new(file);
150 let value: JsonValue = serde_json::from_reader(reader).map_err(|e| {
151 TqlError::ExecutionError(format!(
152 "Failed to parse JSON from {}: {}",
153 path.display(),
154 e
155 ))
156 })?;
157
158 match value {
159 JsonValue::Array(arr) => Ok(arr),
160 obj @ JsonValue::Object(_) => Ok(vec![obj]),
161 _ => Err(TqlError::ExecutionError(format!(
162 "JSON file {} must contain an object or array",
163 path.display()
164 ))),
165 }
166 }
167
168 fn read_jsonl(&self, path: &Path) -> Result<Vec<JsonValue>> {
170 let file = File::open(path).map_err(|e| {
171 TqlError::ExecutionError(format!("Failed to open file {}: {}", path.display(), e))
172 })?;
173
174 let reader = BufReader::new(file);
175 let mut records = Vec::new();
176
177 for (line_num, line) in reader.lines().enumerate() {
178 let line = line.map_err(|e| {
179 TqlError::ExecutionError(format!(
180 "Failed to read line {} from {}: {}",
181 line_num + 1,
182 path.display(),
183 e
184 ))
185 })?;
186
187 let trimmed = line.trim();
188 if trimmed.is_empty() {
189 continue;
190 }
191
192 let value: JsonValue = serde_json::from_str(trimmed).map_err(|e| {
193 TqlError::ExecutionError(format!(
194 "Failed to parse JSON at line {} in {}: {}",
195 line_num + 1,
196 path.display(),
197 e
198 ))
199 })?;
200
201 records.push(value);
202 }
203
204 Ok(records)
205 }
206
207 fn read_csv(&self, path: &Path, config: &CsvConfig) -> Result<Vec<JsonValue>> {
209 let file = File::open(path).map_err(|e| {
210 TqlError::ExecutionError(format!("Failed to open file {}: {}", path.display(), e))
211 })?;
212
213 let mut rdr = csv::ReaderBuilder::new()
214 .delimiter(config.delimiter)
215 .has_headers(config.has_headers)
216 .from_reader(file);
217
218 let headers: Vec<String> = if let Some(ref custom) = config.custom_headers {
219 custom.clone()
220 } else if config.has_headers {
221 rdr.headers()
222 .map_err(|e| {
223 TqlError::ExecutionError(format!(
224 "Failed to read CSV headers from {}: {}",
225 path.display(),
226 e
227 ))
228 })?
229 .iter()
230 .map(|s| s.to_string())
231 .collect()
232 } else {
233 Vec::new()
234 };
235
236 let mut records = Vec::new();
237
238 for (row_num, result) in rdr.records().enumerate() {
239 let record = result.map_err(|e| {
240 TqlError::ExecutionError(format!(
241 "Failed to read CSV row {} from {}: {}",
242 row_num + 1,
243 path.display(),
244 e
245 ))
246 })?;
247
248 let mut obj = serde_json::Map::new();
249
250 for (i, field) in record.iter().enumerate() {
251 let key = if i < headers.len() {
252 headers[i].clone()
253 } else {
254 format!("col{}", i)
255 };
256
257 let value = if let Ok(n) = field.parse::<i64>() {
259 JsonValue::Number(n.into())
260 } else if let Ok(n) = field.parse::<f64>() {
261 JsonValue::Number(serde_json::Number::from_f64(n).unwrap_or_else(|| 0.into()))
262 } else if field.eq_ignore_ascii_case("true") {
263 JsonValue::Bool(true)
264 } else if field.eq_ignore_ascii_case("false") {
265 JsonValue::Bool(false)
266 } else if field.is_empty() || field.eq_ignore_ascii_case("null") {
267 JsonValue::Null
268 } else {
269 JsonValue::String(field.to_string())
270 };
271
272 obj.insert(key, value);
273 }
274
275 records.push(JsonValue::Object(obj));
276 }
277
278 Ok(records)
279 }
280
281 pub fn query_file(
283 &self,
284 path: &Path,
285 query: &str,
286 format: FileFormat,
287 csv_config: &CsvConfig,
288 ) -> Result<Vec<JsonValue>> {
289 let records = self.read_file(path, format, csv_config)?;
290 let ast = self.parser.parse(query)?;
291
292 let results = self.evaluator.filter(&ast, &records)?;
293 Ok(results.into_iter().cloned().collect())
294 }
295
296 pub fn query_file_enriched(
298 &self,
299 path: &Path,
300 query: &str,
301 format: FileFormat,
302 csv_config: &CsvConfig,
303 ) -> Result<Vec<JsonValue>> {
304 let records = self.read_file(path, format, csv_config)?;
305 let ast = self.parser.parse(query)?;
306 self.evaluator.filter_and_enrich(&ast, &records)
307 }
308
309 pub fn query_file_stats(
311 &self,
312 path: &Path,
313 query: &str,
314 format: FileFormat,
315 csv_config: &CsvConfig,
316 ) -> Result<JsonValue> {
317 let records = self.read_file(path, format, csv_config)?;
318 let ast = self.parser.parse(query)?;
319
320 self.evaluate_stats_query(&records, &ast, query)
321 }
322
323 pub fn query_folder(
325 &self,
326 folder_path: &Path,
327 query: &str,
328 pattern: &str,
329 format: FileFormat,
330 csv_config: &CsvConfig,
331 parallel: bool,
332 ) -> Result<Vec<JsonValue>> {
333 let glob_pattern = folder_path.join(pattern);
334 let glob_pattern = glob_pattern.to_string_lossy();
335
336 let paths: Vec<_> = glob(&glob_pattern)
337 .map_err(|e| {
338 TqlError::ExecutionError(format!("Invalid glob pattern '{}': {}", pattern, e))
339 })?
340 .filter_map(|entry| entry.ok())
341 .filter(|path| path.is_file())
342 .collect();
343
344 if paths.is_empty() {
345 return Ok(Vec::new());
346 }
347
348 let ast = self.parser.parse(query)?;
349
350 if parallel {
351 let results: Result<Vec<Vec<JsonValue>>> = paths
352 .par_iter()
353 .map(|path| {
354 let records = self.read_file(path, format, csv_config)?;
355 let filtered = self.evaluator.filter(&ast, &records)?;
356 Ok(filtered.into_iter().cloned().collect())
357 })
358 .collect();
359
360 Ok(results?.into_iter().flatten().collect())
361 } else {
362 let mut all_results = Vec::new();
363 for path in paths {
364 let records = self.read_file(&path, format, csv_config)?;
365 let filtered = self.evaluator.filter(&ast, &records)?;
366 all_results.extend(filtered.into_iter().cloned());
367 }
368 Ok(all_results)
369 }
370 }
371
372 pub fn query_folder_stats(
374 &self,
375 folder_path: &Path,
376 query: &str,
377 pattern: &str,
378 format: FileFormat,
379 csv_config: &CsvConfig,
380 ) -> Result<JsonValue> {
381 let glob_pattern = folder_path.join(pattern);
382 let glob_pattern = glob_pattern.to_string_lossy();
383
384 let paths: Vec<_> = glob(&glob_pattern)
385 .map_err(|e| {
386 TqlError::ExecutionError(format!("Invalid glob pattern '{}': {}", pattern, e))
387 })?
388 .filter_map(|entry| entry.ok())
389 .filter(|path| path.is_file())
390 .collect();
391
392 let mut all_records = Vec::new();
393 for path in paths {
394 let records = self.read_file(&path, format, csv_config)?;
395 all_records.extend(records);
396 }
397
398 let ast = self.parser.parse(query)?;
399 self.evaluate_stats_query(&all_records, &ast, query)
400 }
401
402 pub fn stream_file(
404 &self,
405 path: &Path,
406 format: FileFormat,
407 ) -> Result<Box<dyn Iterator<Item = Result<JsonValue>>>> {
408 let format = if format == FileFormat::Auto {
409 FileFormat::from_path(path)
410 } else {
411 format
412 };
413
414 match format {
415 FileFormat::JsonL => {
416 let file = File::open(path).map_err(|e| {
417 TqlError::ExecutionError(format!(
418 "Failed to open file {}: {}",
419 path.display(),
420 e
421 ))
422 })?;
423 let reader = BufReader::new(file);
424 let path_str = path.display().to_string();
425
426 Ok(Box::new(reader.lines().enumerate().filter_map(
427 move |(line_num, line)| match line {
428 Ok(line) => {
429 let trimmed = line.trim();
430 if trimmed.is_empty() {
431 return None;
432 }
433 Some(serde_json::from_str(trimmed).map_err(|e| {
434 TqlError::ExecutionError(format!(
435 "Failed to parse JSON at line {} in {}: {}",
436 line_num + 1,
437 path_str,
438 e
439 ))
440 }))
441 }
442 Err(e) => Some(Err(TqlError::ExecutionError(format!(
443 "Failed to read line {} from {}: {}",
444 line_num + 1,
445 path_str,
446 e
447 )))),
448 },
449 )))
450 }
451 _ => Err(TqlError::ExecutionError(
452 "Streaming is only supported for JSONL format".to_string(),
453 )),
454 }
455 }
456
457 fn evaluate_stats_query(
459 &self,
460 records: &[JsonValue],
461 ast: &AstNode,
462 query: &str,
463 ) -> Result<JsonValue> {
464 use crate::parser::QueryWithStatsNode;
465
466 match ast {
467 AstNode::StatsExpr(stats_node) => self.evaluate_stats_node(records, stats_node),
468 AstNode::QueryWithStats(QueryWithStatsNode { filter, stats }) => {
469 let filtered = self.evaluator.filter(filter, records)?;
470 let owned_records: Vec<JsonValue> = filtered.iter().map(|&r| r.clone()).collect();
471 self.evaluate_stats_node(&owned_records, stats)
472 }
473 _ => Err(TqlError::SyntaxError {
474 message: "Query does not contain stats expressions".to_string(),
475 position: None,
476 query: Some(query.to_string()),
477 suggestions: vec!["Use '| stats' to add aggregations".to_string()],
478 }),
479 }
480 }
481
482 fn evaluate_stats_node(
484 &self,
485 records: &[JsonValue],
486 stats_node: &crate::parser::StatsNode,
487 ) -> Result<JsonValue> {
488 use crate::parser::{Aggregation, GroupBy};
489
490 let aggregations: Vec<AggregationSpec> = stats_node
491 .aggregations
492 .iter()
493 .map(|agg: &Aggregation| AggregationSpec {
494 function: agg.function.clone(),
495 field: agg.field.clone().unwrap_or_else(|| "*".to_string()),
496 alias: agg.alias.clone(),
497 params: std::collections::HashMap::new(),
498 })
499 .collect();
500
501 let group_by: Vec<String> = stats_node
502 .group_by
503 .iter()
504 .map(|gb: &GroupBy| gb.field.clone())
505 .collect();
506
507 let stats_query = StatsQuery {
508 aggregations,
509 group_by,
510 };
511
512 self.stats_evaluator.evaluate_stats(records, &stats_query)
513 }
514}
515
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    // Writes `content` to a file named `name` inside the temp dir and
    // returns its full path.
    fn create_test_file(dir: &TempDir, name: &str, content: &str) -> std::path::PathBuf {
        let path = dir.path().join(name);
        let mut file = File::create(&path).unwrap();
        file.write_all(content.as_bytes()).unwrap();
        path
    }

    // A top-level JSON array yields one record per element.
    #[test]
    fn test_read_json_array() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.json",
            r#"[{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]"#,
        );

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::Json, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[1]["name"], "Bob");
    }

    // A top-level JSON object is wrapped into a single record.
    #[test]
    fn test_read_json_object() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.json", r#"{"name": "Alice", "age": 30}"#);

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::Json, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["name"], "Alice");
    }

    // JSONL: one record per line.
    #[test]
    fn test_read_jsonl() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.jsonl",
            r#"{"name": "Alice", "age": 30}
{"name": "Bob", "age": 25}
{"name": "Charlie", "age": 35}"#,
        );

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::JsonL, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[2]["name"], "Charlie");
    }

    // CSV type inference: integers and booleans come back typed, not as strings.
    #[test]
    fn test_read_csv() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.csv",
            "name,age,active\nAlice,30,true\nBob,25,false",
        );

        let ops = FileOps::new();
        let records = ops
            .read_file(&path, FileFormat::Csv, &CsvConfig::default())
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[0]["age"], 30);
        assert_eq!(records[0]["active"], true);
        assert_eq!(records[1]["name"], "Bob");
        assert_eq!(records[1]["active"], false);
    }

    // Equality filter over a JSON file (format auto-detected from extension).
    #[test]
    fn test_query_file() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.json",
            r#"[
                {"name": "Alice", "age": 30, "status": "active"},
                {"name": "Bob", "age": 25, "status": "inactive"},
                {"name": "Charlie", "age": 35, "status": "active"}
            ]"#,
        );

        let ops = FileOps::new();
        let results = ops
            .query_file(
                &path,
                "status eq 'active'",
                FileFormat::Auto,
                &CsvConfig::default(),
            )
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r["status"] == "active"));
    }

    // Numeric comparison filter (`>`).
    #[test]
    fn test_query_file_with_comparison() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.json",
            r#"[
                {"name": "Alice", "age": 30},
                {"name": "Bob", "age": 25},
                {"name": "Charlie", "age": 35}
            ]"#,
        );

        let ops = FileOps::new();
        let results = ops
            .query_file(&path, "age > 28", FileFormat::Auto, &CsvConfig::default())
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r["age"].as_i64().unwrap() > 28));
    }

    // Sequential folder query: matches are merged across files.
    #[test]
    fn test_query_folder() {
        let dir = TempDir::new().unwrap();

        create_test_file(
            &dir,
            "data1.json",
            r#"[{"name": "Alice", "status": "active"}]"#,
        );
        create_test_file(
            &dir,
            "data2.json",
            r#"[{"name": "Bob", "status": "inactive"}, {"name": "Charlie", "status": "active"}]"#,
        );

        let ops = FileOps::new();
        let results = ops
            .query_folder(
                dir.path(),
                "status eq 'active'",
                "*.json",
                FileFormat::Auto,
                &CsvConfig::default(),
                false,
            )
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r["status"] == "active"));
    }

    // Parallel folder query (rayon path) finds every matching record.
    #[test]
    fn test_query_folder_parallel() {
        let dir = TempDir::new().unwrap();

        for i in 0..5 {
            create_test_file(
                &dir,
                &format!("data{}.json", i),
                &format!(r#"[{{"id": {}, "status": "active"}}]"#, i),
            );
        }

        let ops = FileOps::new();
        let results = ops
            .query_folder(
                dir.path(),
                "status eq 'active'",
                "*.json",
                FileFormat::Auto,
                &CsvConfig::default(),
                true,
            )
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    // Extension-based detection; unknown extensions fall back to Json.
    #[test]
    fn test_format_auto_detection() {
        assert_eq!(
            FileFormat::from_path(Path::new("data.json")),
            FileFormat::Json
        );
        assert_eq!(
            FileFormat::from_path(Path::new("data.jsonl")),
            FileFormat::JsonL
        );
        assert_eq!(
            FileFormat::from_path(Path::new("data.ndjson")),
            FileFormat::JsonL
        );
        assert_eq!(
            FileFormat::from_path(Path::new("data.csv")),
            FileFormat::Csv
        );
        assert_eq!(
            FileFormat::from_path(Path::new("data.txt")),
            FileFormat::Json
        );
    }

    // Semicolon-delimited CSV via CsvConfig::with_delimiter.
    #[test]
    fn test_csv_custom_delimiter() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "data.csv", "name;age\nAlice;30\nBob;25");

        let ops = FileOps::new();
        let config = CsvConfig::default().with_delimiter(';');
        let records = ops.read_file(&path, FileFormat::Csv, &config).unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[0]["age"], 30);
    }

    // Streaming JSONL yields every record lazily.
    #[test]
    fn test_stream_jsonl() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(
            &dir,
            "data.jsonl",
            r#"{"id": 1}
{"id": 2}
{"id": 3}"#,
        );

        let ops = FileOps::new();
        let mut count = 0;
        for result in ops.stream_file(&path, FileFormat::JsonL).unwrap() {
            let record = result.unwrap();
            count += 1;
            assert!(record["id"].as_i64().unwrap() >= 1);
        }
        assert_eq!(count, 3);
    }
}