1use anyhow::{Context, Result};
2use std::fs;
3use std::io::{self, Write};
4use std::path::Path;
5use std::time::Instant;
6use tracing::{debug, info};
7
8use crate::config::config::Config;
9use crate::data::data_view::DataView;
10use crate::data::datatable::{DataTable, DataValue};
11use crate::data::datatable_loaders::{load_csv_to_datatable, load_json_to_datatable};
12use crate::services::query_execution_service::QueryExecutionService;
13use crate::sql::script_parser::{ScriptParser, ScriptResult};
14
/// Output encodings supported by non-interactive query execution.
///
/// The enum carries no payload, so it is cheap to copy and can be compared
/// directly with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputFormat {
    /// Comma-separated values.
    Csv,
    /// JSON output.
    Json,
    /// ASCII grid table.
    Table,
    /// Tab-separated values.
    Tsv,
}
23
24impl OutputFormat {
25 pub fn from_str(s: &str) -> Result<Self> {
26 match s.to_lowercase().as_str() {
27 "csv" => Ok(OutputFormat::Csv),
28 "json" => Ok(OutputFormat::Json),
29 "table" => Ok(OutputFormat::Table),
30 "tsv" => Ok(OutputFormat::Tsv),
31 _ => Err(anyhow::anyhow!(
32 "Invalid output format: {}. Use csv, json, table, or tsv",
33 s
34 )),
35 }
36 }
37}
38
39pub struct NonInteractiveConfig {
41 pub data_file: String,
42 pub query: String,
43 pub output_format: OutputFormat,
44 pub output_file: Option<String>,
45 pub case_insensitive: bool,
46 pub auto_hide_empty: bool,
47 pub limit: Option<usize>,
48 pub query_plan: bool,
49 pub show_work_units: bool,
50 pub execution_plan: bool,
51 pub lift_in_expressions: bool,
52 pub script_file: Option<String>, pub debug_trace: bool,
54}
55
56pub fn execute_non_interactive(config: NonInteractiveConfig) -> Result<()> {
58 let start_time = Instant::now();
59
60 let (data_table, _is_dual) = if config.data_file.is_empty() {
62 info!("No data file provided, using DUAL table");
63 (crate::data::datatable::DataTable::dual(), true)
64 } else {
65 info!("Loading data from: {}", config.data_file);
66 let table = load_data_file(&config.data_file)?;
67 info!(
68 "Loaded {} rows with {} columns",
69 table.row_count(),
70 table.column_count()
71 );
72 (table, false)
73 };
74 let _table_name = data_table.name.clone();
75
76 let dataview = DataView::new(std::sync::Arc::new(data_table));
78
79 info!("Executing query: {}", config.query);
81
82 if config.execution_plan {
84 println!("\n=== EXECUTION PLAN ===");
85 println!("Query: {}", config.query);
86 println!("\nExecution Steps:");
87 println!("1. PARSE - Parse SQL query");
88 println!("2. LOAD_DATA - Load data from {}", &config.data_file);
89 println!(
90 " • Loaded {} rows, {} columns",
91 dataview.row_count(),
92 dataview.column_count()
93 );
94 }
95
96 if config.show_work_units {
98 use crate::query_plan::{ExpressionLifter, QueryAnalyzer};
99 use crate::sql::recursive_parser::Parser;
100
101 let mut parser = Parser::new(&config.query);
102 match parser.parse() {
103 Ok(stmt) => {
104 let mut analyzer = QueryAnalyzer::new();
105 let mut lifter = ExpressionLifter::new();
106
107 let mut stmt_copy = stmt.clone();
109 let lifted = lifter.lift_expressions(&mut stmt_copy);
110
111 match analyzer.analyze(&stmt_copy, config.query.clone()) {
113 Ok(plan) => {
114 println!("\n{}", plan.explain());
115
116 if !lifted.is_empty() {
117 println!("\nLifted CTEs:");
118 for cte in &lifted {
119 println!(" - {}", cte.name);
120 }
121 }
122
123 return Ok(());
124 }
125 Err(e) => {
126 eprintln!("Error analyzing query: {}", e);
127 return Err(anyhow::anyhow!("Query analysis failed: {}", e));
128 }
129 }
130 }
131 Err(e) => {
132 eprintln!("Error parsing query: {}", e);
133 return Err(anyhow::anyhow!("Parse error: {}", e));
134 }
135 }
136 }
137
138 if config.query_plan {
140 use crate::sql::recursive_parser::Parser;
141 let mut parser = Parser::new(&config.query);
142 match parser.parse() {
143 Ok(statement) => {
144 println!("\n=== QUERY PLAN (AST) ===");
145 println!("{statement:#?}");
146 println!("=== END QUERY PLAN ===\n");
147 }
148 Err(e) => {
149 eprintln!("Failed to parse query for plan: {e}");
150 }
151 }
152 }
153
154 let query_start = Instant::now();
155
156 let app_config = Config::load().unwrap_or_else(|e| {
158 debug!("Could not load config file: {}. Using defaults.", e);
159 Config::default()
160 });
161
162 crate::config::global::init_config(app_config.clone());
164
165 let mut behavior_config = app_config.behavior.clone();
167 debug!(
168 "Using date notation: {}",
169 behavior_config.default_date_notation
170 );
171 if config.case_insensitive {
173 behavior_config.case_insensitive_default = true;
174 }
175 if config.auto_hide_empty {
176 behavior_config.hide_empty_columns = true;
177 }
178
179 let exec_start = Instant::now();
181 let result = if config.lift_in_expressions {
182 use crate::data::query_engine::QueryEngine;
183 use crate::query_plan::{CTEHoister, InOperatorLifter};
184 use crate::sql::recursive_parser::Parser;
185
186 let mut parser = Parser::new(&config.query);
187 match parser.parse() {
188 Ok(mut stmt) => {
189 use crate::query_plan::ExpressionLifter;
191 let mut expr_lifter = ExpressionLifter::new();
192 let lifted = expr_lifter.lift_expressions(&mut stmt);
193 if !lifted.is_empty() {
194 info!(
195 "Applied expression lifting - {} CTEs generated",
196 lifted.len()
197 );
198 }
199
200 stmt = CTEHoister::hoist_ctes(stmt);
202
203 let mut lifter = InOperatorLifter::new();
205 if lifter.rewrite_query(&mut stmt) {
206 info!("Applied IN expression lifting - query rewritten with CTEs");
207
208 let engine = QueryEngine::with_case_insensitive(config.case_insensitive);
210
211 match engine.execute_statement(dataview.source_arc(), stmt) {
212 Ok(result_view) => {
213 Ok(
215 crate::services::query_execution_service::QueryExecutionResult {
216 dataview: result_view,
217 stats: crate::services::query_execution_service::QueryStats {
218 row_count: 0, column_count: 0,
220 execution_time: exec_start.elapsed(),
221 query_engine_time: exec_start.elapsed(),
222 },
223 hidden_columns: Vec::new(),
224 query: config.query.clone(),
225 execution_plan: None,
226 debug_trace: None,
227 },
228 )
229 }
230 Err(e) => Err(e),
231 }
232 } else {
233 let query_service =
235 QueryExecutionService::with_behavior_config(behavior_config);
236 if config.debug_trace {
237 let debug_ctx = crate::debug_trace::DebugContext::new(
238 crate::debug_trace::DebugLevel::Debug,
239 );
240 query_service.execute_with_debug(
241 &config.query,
242 Some(&dataview),
243 Some(dataview.source()),
244 Some(debug_ctx),
245 )
246 } else {
247 query_service.execute(
248 &config.query,
249 Some(&dataview),
250 Some(dataview.source()),
251 )
252 }
253 }
254 }
255 Err(_) => {
256 let query_service = QueryExecutionService::with_behavior_config(behavior_config);
258 if config.debug_trace {
259 let debug_ctx = crate::debug_trace::DebugContext::new(
260 crate::debug_trace::DebugLevel::Debug,
261 );
262 query_service.execute_with_debug(
263 &config.query,
264 Some(&dataview),
265 Some(dataview.source()),
266 Some(debug_ctx),
267 )
268 } else {
269 query_service.execute(&config.query, Some(&dataview), Some(dataview.source()))
270 }
271 }
272 }
273 } else {
274 let query_service = QueryExecutionService::with_behavior_config(behavior_config);
276
277 if config.debug_trace {
279 let debug_ctx =
280 crate::debug_trace::DebugContext::new(crate::debug_trace::DebugLevel::Debug);
281 query_service.execute_with_debug(
282 &config.query,
283 Some(&dataview),
284 Some(dataview.source()),
285 Some(debug_ctx),
286 )
287 } else {
288 query_service.execute(&config.query, Some(&dataview), Some(dataview.source()))
289 }
290 }?;
291 let exec_time = exec_start.elapsed();
292
293 let query_time = query_start.elapsed();
294 info!("Query executed in {:?}", query_time);
295 info!(
296 "Result: {} rows, {} columns",
297 result.dataview.row_count(),
298 result.dataview.column_count()
299 );
300
301 if config.execution_plan {
303 use crate::data::query_engine::QueryEngine;
305
306 let query_engine = QueryEngine::new();
307
308 match query_engine.execute_with_plan(
309 std::sync::Arc::new(dataview.source().clone()),
310 &config.query,
311 ) {
312 Ok((_view, plan)) => {
313 print!("{}", plan.format_tree());
315 }
316 Err(e) => {
317 eprintln!("Could not generate detailed execution plan: {}", e);
319 println!(
320 "3. QUERY_EXECUTION [{:.3}ms]",
321 exec_time.as_secs_f64() * 1000.0
322 );
323
324 use crate::sql::recursive_parser::Parser;
326 let mut parser = Parser::new(&config.query);
327 if let Ok(stmt) = parser.parse() {
328 if stmt.where_clause.is_some() {
329 println!(" • WHERE clause filtering applied");
330 println!(" • Rows after filter: {}", result.dataview.row_count());
331 }
332
333 if let Some(ref order_by) = stmt.order_by {
334 println!(" • ORDER BY: {} column(s)", order_by.len());
335 }
336
337 if let Some(ref group_by) = stmt.group_by {
338 println!(" • GROUP BY: {} column(s)", group_by.len());
339 }
340
341 if let Some(limit) = stmt.limit {
342 println!(" • LIMIT: {} rows", limit);
343 }
344
345 if stmt.distinct {
346 println!(" • DISTINCT applied");
347 }
348 }
349 }
350 }
351
352 println!("\nExecution Statistics:");
353 println!(
354 " Preparation: {:.3}ms",
355 (exec_start - start_time).as_secs_f64() * 1000.0
356 );
357 println!(
358 " Query time: {:.3}ms",
359 exec_time.as_secs_f64() * 1000.0
360 );
361 println!(
362 " Total time: {:.3}ms",
363 query_time.as_secs_f64() * 1000.0
364 );
365 println!(" Rows returned: {}", result.dataview.row_count());
366 println!(" Columns: {}", result.dataview.column_count());
367 println!("\n=== END EXECUTION PLAN ===");
368 println!();
369 }
370
371 let final_view = if let Some(limit) = config.limit {
373 let limited_table = limit_results(&result.dataview, limit)?;
374 DataView::new(std::sync::Arc::new(limited_table))
375 } else {
376 result.dataview
377 };
378
379 if let Some(ref trace_output) = result.debug_trace {
381 eprintln!("{}", trace_output);
382 }
383
384 let output_result = if let Some(ref path) = config.output_file {
386 let mut file = fs::File::create(path)
387 .with_context(|| format!("Failed to create output file: {path}"))?;
388 output_results(&final_view, config.output_format, &mut file)?;
389 info!("Results written to: {}", path);
390 Ok(())
391 } else {
392 output_results(&final_view, config.output_format, &mut io::stdout())?;
393 Ok(())
394 };
395
396 let total_time = start_time.elapsed();
397 debug!("Total execution time: {:?}", total_time);
398
399 if config.output_file.is_none() {
401 eprintln!(
402 "\n# Query completed: {} rows in {:?}",
403 final_view.row_count(),
404 query_time
405 );
406 }
407
408 output_result
409}
410
411pub fn execute_script(config: NonInteractiveConfig) -> Result<()> {
413 let _start_time = Instant::now();
414
415 let parser = ScriptParser::new(&config.query);
417 let statements = parser.parse_and_validate()?;
418
419 info!("Found {} statements in script", statements.len());
420
421 let data_file = if !config.data_file.is_empty() {
423 config.data_file.clone()
425 } else if let Some(hint) = parser.data_file_hint() {
426 info!("Using data file from script hint: {}", hint);
428
429 if let Some(script_path) = config.script_file.as_ref() {
431 let script_dir = std::path::Path::new(script_path)
432 .parent()
433 .unwrap_or(std::path::Path::new("."));
434 let hint_path = std::path::Path::new(hint);
435
436 if hint_path.is_relative() {
437 script_dir.join(hint_path).to_string_lossy().to_string()
438 } else {
439 hint.to_string()
440 }
441 } else {
442 hint.to_string()
443 }
444 } else {
445 String::new()
446 };
447
448 let (data_table, _is_dual) = if data_file.is_empty() {
450 info!("No data file provided, using DUAL table");
452 (DataTable::dual(), true)
453 } else {
454 if !std::path::Path::new(&data_file).exists() {
456 anyhow::bail!(
457 "Data file not found: {}\n\
458 Please check the path is correct",
459 data_file
460 );
461 }
462
463 info!("Loading data from: {}", data_file);
464 let table = load_data_file(&data_file)?;
465 info!(
466 "Loaded {} rows with {} columns",
467 table.row_count(),
468 table.column_count()
469 );
470 (table, false)
471 };
472
473 let mut script_result = ScriptResult::new();
475 let mut output = Vec::new();
476
477 let arc_data_table = std::sync::Arc::new(data_table);
479
480 for (idx, statement) in statements.iter().enumerate() {
482 let statement_num = idx + 1;
483 let stmt_start = Instant::now();
484
485 if matches!(config.output_format, OutputFormat::Table) {
487 if idx > 0 {
488 output.push(String::new()); }
490 output.push(format!("-- Query {} --", statement_num));
491 }
492
493 let dataview = DataView::new(arc_data_table.clone());
495
496 let service = QueryExecutionService::new(config.case_insensitive, config.auto_hide_empty);
498 match service.execute(statement, Some(&dataview), None) {
499 Ok(result) => {
500 let exec_time = stmt_start.elapsed().as_secs_f64() * 1000.0;
501 let final_view = result.dataview;
502
503 let mut statement_output = Vec::new();
505 match config.output_format {
506 OutputFormat::Csv => {
507 output_csv(&final_view, &mut statement_output, ',')?;
508 }
509 OutputFormat::Json => {
510 output_json(&final_view, &mut statement_output)?;
511 }
512 OutputFormat::Table => {
513 output_table(&final_view, &mut statement_output)?;
514 writeln!(
515 &mut statement_output,
516 "Query completed: {} rows in {:.2}ms",
517 final_view.row_count(),
518 exec_time
519 )?;
520 }
521 OutputFormat::Tsv => {
522 output_csv(&final_view, &mut statement_output, '\t')?;
523 }
524 }
525
526 output.extend(
528 String::from_utf8_lossy(&statement_output)
529 .lines()
530 .map(String::from),
531 );
532
533 script_result.add_success(
534 statement_num,
535 statement.clone(),
536 final_view.row_count(),
537 exec_time,
538 );
539 }
540 Err(e) => {
541 let exec_time = stmt_start.elapsed().as_secs_f64() * 1000.0;
542 let error_msg = format!("Query {} failed: {}", statement_num, e);
543
544 if matches!(config.output_format, OutputFormat::Table) {
545 output.push(error_msg.clone());
546 }
547
548 script_result.add_failure(
549 statement_num,
550 statement.clone(),
551 e.to_string(),
552 exec_time,
553 );
554
555 }
557 }
558 }
559
560 if let Some(ref output_file) = config.output_file {
562 let mut file = fs::File::create(output_file)?;
563 for line in &output {
564 writeln!(file, "{}", line)?;
565 }
566 info!("Results written to: {}", output_file);
567 } else {
568 for line in &output {
569 println!("{}", line);
570 }
571 }
572
573 if matches!(config.output_format, OutputFormat::Table) {
575 println!("\n=== Script Summary ===");
576 println!("Total statements: {}", script_result.total_statements);
577 println!("Successful: {}", script_result.successful_statements);
578 println!("Failed: {}", script_result.failed_statements);
579 println!(
580 "Total execution time: {:.2}ms",
581 script_result.total_execution_time_ms
582 );
583 }
584
585 if !script_result.all_successful() {
586 return Err(anyhow::anyhow!(
587 "{} of {} statements failed",
588 script_result.failed_statements,
589 script_result.total_statements
590 ));
591 }
592
593 Ok(())
594}
595
596fn load_data_file(path: &str) -> Result<DataTable> {
598 let path = Path::new(path);
599
600 if !path.exists() {
601 return Err(anyhow::anyhow!("File not found: {}", path.display()));
602 }
603
604 let extension = path
606 .extension()
607 .and_then(|ext| ext.to_str())
608 .map(str::to_lowercase)
609 .unwrap_or_default();
610
611 let table_name = path
612 .file_stem()
613 .and_then(|stem| stem.to_str())
614 .unwrap_or("data")
615 .to_string();
616
617 match extension.as_str() {
618 "csv" => load_csv_to_datatable(path, &table_name)
619 .with_context(|| format!("Failed to load CSV file: {}", path.display())),
620 "json" => load_json_to_datatable(path, &table_name)
621 .with_context(|| format!("Failed to load JSON file: {}", path.display())),
622 _ => Err(anyhow::anyhow!(
623 "Unsupported file type: {}. Use .csv or .json",
624 extension
625 )),
626 }
627}
628
629fn limit_results(dataview: &DataView, limit: usize) -> Result<DataTable> {
631 let source = dataview.source();
632 let mut limited_table = DataTable::new(&source.name);
633
634 for col in &source.columns {
636 limited_table.add_column(col.clone());
637 }
638
639 let rows_to_copy = dataview.row_count().min(limit);
641 for i in 0..rows_to_copy {
642 if let Some(row) = dataview.get_row(i) {
643 limited_table.add_row(row.clone());
644 }
645 }
646
647 Ok(limited_table)
648}
649
650fn output_results<W: Write>(
652 dataview: &DataView,
653 format: OutputFormat,
654 writer: &mut W,
655) -> Result<()> {
656 match format {
657 OutputFormat::Csv => output_csv(dataview, writer, ','),
658 OutputFormat::Tsv => output_csv(dataview, writer, '\t'),
659 OutputFormat::Json => output_json(dataview, writer),
660 OutputFormat::Table => output_table(dataview, writer),
661 }
662}
663
664fn output_csv<W: Write>(dataview: &DataView, writer: &mut W, delimiter: char) -> Result<()> {
666 let columns = dataview.column_names();
668 for (i, col) in columns.iter().enumerate() {
669 if i > 0 {
670 write!(writer, "{delimiter}")?;
671 }
672 write!(writer, "{}", escape_csv_field(col, delimiter))?;
673 }
674 writeln!(writer)?;
675
676 for row_idx in 0..dataview.row_count() {
678 if let Some(row) = dataview.get_row(row_idx) {
679 for (i, value) in row.values.iter().enumerate() {
680 if i > 0 {
681 write!(writer, "{delimiter}")?;
682 }
683 write!(
684 writer,
685 "{}",
686 escape_csv_field(&format_value(value), delimiter)
687 )?;
688 }
689 writeln!(writer)?;
690 }
691 }
692
693 Ok(())
694}
695
696fn output_json<W: Write>(dataview: &DataView, writer: &mut W) -> Result<()> {
698 let columns = dataview.column_names();
699 let mut rows = Vec::new();
700
701 for row_idx in 0..dataview.row_count() {
702 if let Some(row) = dataview.get_row(row_idx) {
703 let mut json_row = serde_json::Map::new();
704 for (col_idx, value) in row.values.iter().enumerate() {
705 if col_idx < columns.len() {
706 json_row.insert(columns[col_idx].clone(), value_to_json(value));
707 }
708 }
709 rows.push(serde_json::Value::Object(json_row));
710 }
711 }
712
713 let json = serde_json::to_string_pretty(&rows)?;
714 writeln!(writer, "{json}")?;
715
716 Ok(())
717}
718
719fn output_table<W: Write>(dataview: &DataView, writer: &mut W) -> Result<()> {
721 let columns = dataview.column_names();
722
723 let mut widths = vec![0; columns.len()];
725 for (i, col) in columns.iter().enumerate() {
726 widths[i] = col.len();
727 }
728
729 let sample_size = dataview.row_count().min(100);
731 for row_idx in 0..sample_size {
732 if let Some(row) = dataview.get_row(row_idx) {
733 for (i, value) in row.values.iter().enumerate() {
734 if i < widths.len() {
735 let value_str = format_value(value);
736 widths[i] = widths[i].max(value_str.len());
737 }
738 }
739 }
740 }
741
742 for width in &mut widths {
744 *width = (*width).min(50);
745 }
746
747 write!(writer, "+")?;
749 for width in &widths {
750 write!(writer, "-{}-+", "-".repeat(*width))?;
751 }
752 writeln!(writer)?;
753
754 write!(writer, "|")?;
756 for (i, col) in columns.iter().enumerate() {
757 write!(writer, " {:^width$} |", col, width = widths[i])?;
758 }
759 writeln!(writer)?;
760
761 write!(writer, "+")?;
763 for width in &widths {
764 write!(writer, "-{}-+", "-".repeat(*width))?;
765 }
766 writeln!(writer)?;
767
768 for row_idx in 0..dataview.row_count() {
770 if let Some(row) = dataview.get_row(row_idx) {
771 write!(writer, "|")?;
772 for (i, value) in row.values.iter().enumerate() {
773 if i < widths.len() {
774 let value_str = format_value(value);
775 let truncated = if value_str.len() > widths[i] {
776 format!("{}...", &value_str[..widths[i] - 3])
777 } else {
778 value_str
779 };
780 write!(writer, " {:<width$} |", truncated, width = widths[i])?;
781 }
782 }
783 writeln!(writer)?;
784 }
785 }
786
787 write!(writer, "+")?;
789 for width in &widths {
790 write!(writer, "-{}-+", "-".repeat(*width))?;
791 }
792 writeln!(writer)?;
793
794 Ok(())
795}
796
797fn format_value(value: &DataValue) -> String {
799 match value {
800 DataValue::Null => String::new(),
801 DataValue::Integer(i) => i.to_string(),
802 DataValue::Float(f) => f.to_string(),
803 DataValue::String(s) => s.clone(),
804 DataValue::InternedString(s) => s.to_string(),
805 DataValue::Boolean(b) => b.to_string(),
806 DataValue::DateTime(dt) => dt.to_string(),
807 }
808}
809
810fn value_to_json(value: &DataValue) -> serde_json::Value {
812 match value {
813 DataValue::Null => serde_json::Value::Null,
814 DataValue::Integer(i) => serde_json::Value::Number((*i).into()),
815 DataValue::Float(f) => {
816 if let Some(n) = serde_json::Number::from_f64(*f) {
817 serde_json::Value::Number(n)
818 } else {
819 serde_json::Value::Null
820 }
821 }
822 DataValue::String(s) => serde_json::Value::String(s.clone()),
823 DataValue::InternedString(s) => serde_json::Value::String(s.to_string()),
824 DataValue::Boolean(b) => serde_json::Value::Bool(*b),
825 DataValue::DateTime(dt) => serde_json::Value::String(dt.to_string()),
826 }
827}
828
/// Quotes `field` for delimiter-separated output when necessary.
///
/// A field containing the delimiter, a double quote, a line feed or a carriage
/// return is wrapped in double quotes with embedded quotes doubled; any other
/// field is returned unchanged.
fn escape_csv_field(field: &str, delimiter: char) -> String {
    let needs_quoting = field
        .chars()
        .any(|ch| ch == delimiter || matches!(ch, '"' | '\n' | '\r'));

    if !needs_quoting {
        return field.to_string();
    }

    let mut quoted = String::with_capacity(field.len() + 2);
    quoted.push('"');
    for ch in field.chars() {
        // An embedded quote is escaped by doubling it.
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}