sql_cli/
non_interactive.rs

1use anyhow::{Context, Result};
2use std::fs;
3use std::io::{self, Write};
4use std::path::Path;
5use std::time::Instant;
6use tracing::{debug, info};
7
8use crate::config::config::Config;
9use crate::data::data_view::DataView;
10use crate::data::datatable::{DataTable, DataValue};
11use crate::data::datatable_loaders::{load_csv_to_datatable, load_json_to_datatable};
12use crate::services::query_execution_service::QueryExecutionService;
13use crate::sql::parser::ast::{CTEType, TableSource, CTE};
14use crate::sql::script_parser::{ScriptParser, ScriptResult};
15
16/// Extract dependencies from a CTE by analyzing what tables it references
17fn extract_cte_dependencies(cte: &CTE) -> Vec<String> {
18    // For now, we'll just return the from_table if it exists
19    // This could be enhanced to do deeper analysis of the query AST
20    let mut deps = Vec::new();
21
22    if let CTEType::Standard(query) = &cte.cte_type {
23        if let Some(from_table) = &query.from_table {
24            deps.push(from_table.clone());
25        }
26
27        // Could also check joins, subqueries, etc.
28        for join in &query.joins {
29            match &join.table {
30                TableSource::Table(table_name) => {
31                    deps.push(table_name.clone());
32                }
33                TableSource::DerivedTable { alias, .. } => {
34                    deps.push(alias.clone());
35                }
36            }
37        }
38    }
39
40    deps
41}
42
/// Output format for query results.
///
/// This is a plain fieldless enum, so it is `Copy` and comparable; callers can
/// pass it by value and test it with `==` as well as with `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputFormat {
    /// Comma-separated values.
    Csv,
    /// Pretty-printed JSON array of row objects.
    Json,
    /// ASCII table.
    Table,
    /// Tab-separated values.
    Tsv,
}
51
52impl OutputFormat {
53    pub fn from_str(s: &str) -> Result<Self> {
54        match s.to_lowercase().as_str() {
55            "csv" => Ok(OutputFormat::Csv),
56            "json" => Ok(OutputFormat::Json),
57            "table" => Ok(OutputFormat::Table),
58            "tsv" => Ok(OutputFormat::Tsv),
59            _ => Err(anyhow::anyhow!(
60                "Invalid output format: {}. Use csv, json, table, or tsv",
61                s
62            )),
63        }
64    }
65}
66
/// Configuration for non-interactive query execution
///
/// Consumed by value by `execute_non_interactive` (single query) and
/// `execute_script` (multiple statements).
pub struct NonInteractiveConfig {
    /// Path of the CSV/JSON data file; an empty string means "use the DUAL table"
    pub data_file: String,
    /// SQL text to run (for `execute_script` this is the whole script text)
    pub query: String,
    /// Format used when writing results
    pub output_format: OutputFormat,
    /// Destination file for results; `None` writes to stdout
    pub output_file: Option<String>,
    /// Force case-insensitive behavior regardless of the config file default
    pub case_insensitive: bool,
    /// Force hiding of empty columns regardless of the config file default
    pub auto_hide_empty: bool,
    /// Maximum number of result rows to output (`None` = no extra limit)
    pub limit: Option<usize>,
    /// Print the parsed AST of the query before executing it
    pub query_plan: bool,
    /// Print the analyzed work-unit plan and exit without executing the query
    pub show_work_units: bool,
    /// Print execution steps and timing statistics around the query
    pub execution_plan: bool,
    /// Print CTE information as JSON and exit without executing the query
    pub cte_info: bool,
    /// Print rewrite suggestions as JSON and exit without executing the query
    pub rewrite_analysis: bool,
    /// Try expression lifting / CTE hoisting / IN-lifting before execution
    pub lift_in_expressions: bool,
    /// Path to the script file for relative path resolution
    pub script_file: Option<String>,
    /// Run the query with a debug context and print the trace to stderr
    pub debug_trace: bool,
    /// Maximum column width for table output (None = unlimited)
    pub max_col_width: Option<usize>,
    /// Number of rows to sample for column width (0 = all rows)
    pub col_sample_rows: usize,
}
87
88/// Execute a query in non-interactive mode
89pub fn execute_non_interactive(config: NonInteractiveConfig) -> Result<()> {
90    let start_time = Instant::now();
91
92    // 1. Load the data file or create DUAL table
93    let (data_table, _is_dual) = if config.data_file.is_empty() {
94        info!("No data file provided, using DUAL table");
95        (crate::data::datatable::DataTable::dual(), true)
96    } else {
97        info!("Loading data from: {}", config.data_file);
98        let table = load_data_file(&config.data_file)?;
99        info!(
100            "Loaded {} rows with {} columns",
101            table.row_count(),
102            table.column_count()
103        );
104        (table, false)
105    };
106    let _table_name = data_table.name.clone();
107
108    // 2. Create a DataView from the table
109    let dataview = DataView::new(std::sync::Arc::new(data_table));
110
111    // 3. Execute the query
112    info!("Executing query: {}", config.query);
113
114    // If execution_plan is requested, show detailed execution information
115    if config.execution_plan {
116        println!("\n=== EXECUTION PLAN ===");
117        println!("Query: {}", config.query);
118        println!("\nExecution Steps:");
119        println!("1. PARSE - Parse SQL query");
120        println!("2. LOAD_DATA - Load data from {}", &config.data_file);
121        println!(
122            "   • Loaded {} rows, {} columns",
123            dataview.row_count(),
124            dataview.column_count()
125        );
126    }
127
128    // If show_work_units is requested, analyze and display work units
129    if config.show_work_units {
130        use crate::query_plan::{ExpressionLifter, QueryAnalyzer};
131        use crate::sql::recursive_parser::Parser;
132
133        let mut parser = Parser::new(&config.query);
134        match parser.parse() {
135            Ok(stmt) => {
136                let mut analyzer = QueryAnalyzer::new();
137                let mut lifter = ExpressionLifter::new();
138
139                // Check if the query has liftable expressions
140                let mut stmt_copy = stmt.clone();
141                let lifted = lifter.lift_expressions(&mut stmt_copy);
142
143                // Build the query plan
144                match analyzer.analyze(&stmt_copy, config.query.clone()) {
145                    Ok(plan) => {
146                        println!("\n{}", plan.explain());
147
148                        if !lifted.is_empty() {
149                            println!("\nLifted CTEs:");
150                            for cte in &lifted {
151                                println!("  - {}", cte.name);
152                            }
153                        }
154
155                        return Ok(());
156                    }
157                    Err(e) => {
158                        eprintln!("Error analyzing query: {}", e);
159                        return Err(anyhow::anyhow!("Query analysis failed: {}", e));
160                    }
161                }
162            }
163            Err(e) => {
164                eprintln!("Error parsing query: {}", e);
165                return Err(anyhow::anyhow!("Parse error: {}", e));
166            }
167        }
168    }
169
170    // If query_plan is requested, parse and display the AST
171    if config.query_plan {
172        use crate::sql::recursive_parser::Parser;
173        let mut parser = Parser::new(&config.query);
174        match parser.parse() {
175            Ok(statement) => {
176                println!("\n=== QUERY PLAN (AST) ===");
177                println!("{statement:#?}");
178                println!("=== END QUERY PLAN ===\n");
179            }
180            Err(e) => {
181                eprintln!("Failed to parse query for plan: {e}");
182            }
183        }
184    }
185
186    // If rewrite analysis is requested, analyze query for optimization opportunities
187    if config.rewrite_analysis {
188        use crate::sql::query_rewriter::{QueryRewriter, RewriteAnalysis};
189        use crate::sql::recursive_parser::Parser;
190        use serde_json::json;
191
192        let mut parser = Parser::new(&config.query);
193        match parser.parse() {
194            Ok(statement) => {
195                let mut rewriter = QueryRewriter::new();
196                let suggestions = rewriter.analyze(&statement);
197
198                let analysis = RewriteAnalysis::from_suggestions(suggestions);
199                println!("{}", serde_json::to_string_pretty(&analysis).unwrap());
200                return Ok(());
201            }
202            Err(e) => {
203                let output = json!({
204                    "success": false,
205                    "error": format!("{}", e),
206                    "suggestions": [],
207                    "can_auto_rewrite": false,
208                });
209                println!("{}", serde_json::to_string_pretty(&output).unwrap());
210                return Ok(());
211            }
212        }
213    }
214
215    // If CTE info is requested, parse and output CTE information as JSON
216    if config.cte_info {
217        use crate::sql::recursive_parser::Parser;
218        use serde_json::json;
219
220        let mut parser = Parser::new(&config.query);
221        match parser.parse() {
222            Ok(statement) => {
223                let mut cte_info = Vec::new();
224
225                // Extract CTE information
226                for (index, cte) in statement.ctes.iter().enumerate() {
227                    let cte_json = json!({
228                        "index": index,
229                        "name": cte.name,
230                        "columns": cte.column_list,
231                        // We can't easily get line numbers from the AST,
232                        // but we can provide the structure
233                        "dependencies": extract_cte_dependencies(cte),
234                    });
235                    cte_info.push(cte_json);
236                }
237
238                let output = json!({
239                    "success": true,
240                    "ctes": cte_info,
241                    "total": statement.ctes.len(),
242                    "has_final_select": !statement.columns.is_empty() || !statement.select_items.is_empty(),
243                });
244
245                println!("{}", serde_json::to_string_pretty(&output).unwrap());
246                return Ok(());
247            }
248            Err(e) => {
249                let output = json!({
250                    "success": false,
251                    "error": format!("{}", e),
252                    "ctes": [],
253                    "total": 0,
254                });
255                println!("{}", serde_json::to_string_pretty(&output).unwrap());
256                return Ok(());
257            }
258        }
259    }
260
261    let query_start = Instant::now();
262
263    // Load configuration file to get date notation and other settings
264    let app_config = Config::load().unwrap_or_else(|e| {
265        debug!("Could not load config file: {}. Using defaults.", e);
266        Config::default()
267    });
268
269    // Initialize global config for function registry
270    crate::config::global::init_config(app_config.clone());
271
272    // Use QueryExecutionService with full BehaviorConfig
273    let mut behavior_config = app_config.behavior.clone();
274    debug!(
275        "Using date notation: {}",
276        behavior_config.default_date_notation
277    );
278    // Command line args override config file settings
279    if config.case_insensitive {
280        behavior_config.case_insensitive_default = true;
281    }
282    if config.auto_hide_empty {
283        behavior_config.hide_empty_columns = true;
284    }
285
286    // Parse and potentially rewrite the query
287    let exec_start = Instant::now();
288    let result = if config.lift_in_expressions {
289        use crate::data::query_engine::QueryEngine;
290        use crate::query_plan::{CTEHoister, InOperatorLifter};
291        use crate::sql::recursive_parser::Parser;
292
293        let mut parser = Parser::new(&config.query);
294        match parser.parse() {
295            Ok(mut stmt) => {
296                // Apply expression lifting for column alias dependencies
297                use crate::query_plan::ExpressionLifter;
298                let mut expr_lifter = ExpressionLifter::new();
299                let lifted = expr_lifter.lift_expressions(&mut stmt);
300                if !lifted.is_empty() {
301                    info!(
302                        "Applied expression lifting - {} CTEs generated",
303                        lifted.len()
304                    );
305                }
306
307                // Apply CTE hoisting to handle nested CTEs
308                stmt = CTEHoister::hoist_ctes(stmt);
309
310                // Try to rewrite the query with IN lifting
311                let mut lifter = InOperatorLifter::new();
312                if lifter.rewrite_query(&mut stmt) {
313                    info!("Applied IN expression lifting - query rewritten with CTEs");
314
315                    // Execute the rewritten AST directly
316                    let engine = QueryEngine::with_case_insensitive(config.case_insensitive);
317
318                    match engine.execute_statement(dataview.source_arc(), stmt) {
319                        Ok(result_view) => {
320                            // Create a QueryExecutionResult to match the expected type
321                            Ok(
322                                crate::services::query_execution_service::QueryExecutionResult {
323                                    dataview: result_view,
324                                    stats: crate::services::query_execution_service::QueryStats {
325                                        row_count: 0, // Will be filled by result_view
326                                        column_count: 0,
327                                        execution_time: exec_start.elapsed(),
328                                        query_engine_time: exec_start.elapsed(),
329                                    },
330                                    hidden_columns: Vec::new(),
331                                    query: config.query.clone(),
332                                    execution_plan: None,
333                                    debug_trace: None,
334                                },
335                            )
336                        }
337                        Err(e) => Err(e),
338                    }
339                } else {
340                    // No lifting needed, execute normally
341                    let query_service =
342                        QueryExecutionService::with_behavior_config(behavior_config);
343                    if config.debug_trace {
344                        let debug_ctx = crate::debug_trace::DebugContext::new(
345                            crate::debug_trace::DebugLevel::Debug,
346                        );
347                        query_service.execute_with_debug(
348                            &config.query,
349                            Some(&dataview),
350                            Some(dataview.source()),
351                            Some(debug_ctx),
352                        )
353                    } else {
354                        query_service.execute(
355                            &config.query,
356                            Some(&dataview),
357                            Some(dataview.source()),
358                        )
359                    }
360                }
361            }
362            Err(_) => {
363                // Parse failed, execute normally and let it fail with proper error
364                let query_service = QueryExecutionService::with_behavior_config(behavior_config);
365                if config.debug_trace {
366                    let debug_ctx = crate::debug_trace::DebugContext::new(
367                        crate::debug_trace::DebugLevel::Debug,
368                    );
369                    query_service.execute_with_debug(
370                        &config.query,
371                        Some(&dataview),
372                        Some(dataview.source()),
373                        Some(debug_ctx),
374                    )
375                } else {
376                    query_service.execute(&config.query, Some(&dataview), Some(dataview.source()))
377                }
378            }
379        }
380    } else {
381        // Normal execution without lifting
382        let query_service = QueryExecutionService::with_behavior_config(behavior_config);
383
384        // Create debug context if debug trace is enabled
385        if config.debug_trace {
386            let debug_ctx =
387                crate::debug_trace::DebugContext::new(crate::debug_trace::DebugLevel::Debug);
388            query_service.execute_with_debug(
389                &config.query,
390                Some(&dataview),
391                Some(dataview.source()),
392                Some(debug_ctx),
393            )
394        } else {
395            query_service.execute(&config.query, Some(&dataview), Some(dataview.source()))
396        }
397    }?;
398    let exec_time = exec_start.elapsed();
399
400    let query_time = query_start.elapsed();
401    info!("Query executed in {:?}", query_time);
402    info!(
403        "Result: {} rows, {} columns",
404        result.dataview.row_count(),
405        result.dataview.column_count()
406    );
407
408    // Show execution plan details if requested
409    if config.execution_plan {
410        // Try to get detailed execution plan
411        use crate::data::query_engine::QueryEngine;
412
413        let query_engine = QueryEngine::new();
414
415        match query_engine.execute_with_plan(
416            std::sync::Arc::new(dataview.source().clone()),
417            &config.query,
418        ) {
419            Ok((_view, plan)) => {
420                // Display the detailed execution plan tree
421                print!("{}", plan.format_tree());
422            }
423            Err(e) => {
424                // Fall back to simple execution plan display
425                eprintln!("Could not generate detailed execution plan: {}", e);
426                println!(
427                    "3. QUERY_EXECUTION [{:.3}ms]",
428                    exec_time.as_secs_f64() * 1000.0
429                );
430
431                // Parse query to understand what operations are being performed
432                use crate::sql::recursive_parser::Parser;
433                let mut parser = Parser::new(&config.query);
434                if let Ok(stmt) = parser.parse() {
435                    if stmt.where_clause.is_some() {
436                        println!("   • WHERE clause filtering applied");
437                        println!("   • Rows after filter: {}", result.dataview.row_count());
438                    }
439
440                    if let Some(ref order_by) = stmt.order_by {
441                        println!("   • ORDER BY: {} column(s)", order_by.len());
442                    }
443
444                    if let Some(ref group_by) = stmt.group_by {
445                        println!("   • GROUP BY: {} column(s)", group_by.len());
446                    }
447
448                    if let Some(limit) = stmt.limit {
449                        println!("   • LIMIT: {} rows", limit);
450                    }
451
452                    if stmt.distinct {
453                        println!("   • DISTINCT applied");
454                    }
455                }
456            }
457        }
458
459        println!("\nExecution Statistics:");
460        println!(
461            "  Preparation:    {:.3}ms",
462            (exec_start - start_time).as_secs_f64() * 1000.0
463        );
464        println!(
465            "  Query time:     {:.3}ms",
466            exec_time.as_secs_f64() * 1000.0
467        );
468        println!(
469            "  Total time:     {:.3}ms",
470            query_time.as_secs_f64() * 1000.0
471        );
472        println!("  Rows returned:  {}", result.dataview.row_count());
473        println!("  Columns:        {}", result.dataview.column_count());
474        println!("\n=== END EXECUTION PLAN ===");
475        println!();
476    }
477
478    // 4. Apply limit if specified
479    let final_view = if let Some(limit) = config.limit {
480        let limited_table = limit_results(&result.dataview, limit)?;
481        DataView::new(std::sync::Arc::new(limited_table))
482    } else {
483        result.dataview
484    };
485
486    // 5. Output debug trace if enabled
487    if let Some(ref trace_output) = result.debug_trace {
488        eprintln!("{}", trace_output);
489    }
490
491    // 6. Output the results
492    let output_result = if let Some(ref path) = config.output_file {
493        let mut file = fs::File::create(path)
494            .with_context(|| format!("Failed to create output file: {path}"))?;
495        output_results(
496            &final_view,
497            config.output_format,
498            &mut file,
499            config.max_col_width,
500            config.col_sample_rows,
501        )?;
502        info!("Results written to: {}", path);
503        Ok(())
504    } else {
505        output_results(
506            &final_view,
507            config.output_format,
508            &mut io::stdout(),
509            config.max_col_width,
510            config.col_sample_rows,
511        )?;
512        Ok(())
513    };
514
515    let total_time = start_time.elapsed();
516    debug!("Total execution time: {:?}", total_time);
517
518    // Print stats to stderr so they don't interfere with output
519    if config.output_file.is_none() {
520        eprintln!(
521            "\n# Query completed: {} rows in {:?}",
522            final_view.row_count(),
523            query_time
524        );
525    }
526
527    output_result
528}
529
530/// Execute a script file with multiple SQL statements separated by GO
531pub fn execute_script(config: NonInteractiveConfig) -> Result<()> {
532    let _start_time = Instant::now();
533
534    // Parse the script into individual statements
535    let parser = ScriptParser::new(&config.query);
536    let statements = parser.parse_and_validate()?;
537
538    info!("Found {} statements in script", statements.len());
539
540    // Determine data file to use (command-line overrides script hint)
541    let data_file = if !config.data_file.is_empty() {
542        // Command-line argument takes precedence
543        config.data_file.clone()
544    } else if let Some(hint) = parser.data_file_hint() {
545        // Use data file hint from script
546        info!("Using data file from script hint: {}", hint);
547
548        // Resolve relative paths relative to script file if provided
549        if let Some(script_path) = config.script_file.as_ref() {
550            let script_dir = std::path::Path::new(script_path)
551                .parent()
552                .unwrap_or(std::path::Path::new("."));
553            let hint_path = std::path::Path::new(hint);
554
555            if hint_path.is_relative() {
556                script_dir.join(hint_path).to_string_lossy().to_string()
557            } else {
558                hint.to_string()
559            }
560        } else {
561            hint.to_string()
562        }
563    } else {
564        String::new()
565    };
566
567    // Load the data file if provided, otherwise use DUAL
568    let (data_table, _is_dual) = if data_file.is_empty() {
569        // No data file provided, use DUAL table
570        info!("No data file provided, using DUAL table");
571        (DataTable::dual(), true)
572    } else {
573        // Check if file exists before trying to load
574        if !std::path::Path::new(&data_file).exists() {
575            anyhow::bail!(
576                "Data file not found: {}\n\
577                Please check the path is correct",
578                data_file
579            );
580        }
581
582        info!("Loading data from: {}", data_file);
583        let table = load_data_file(&data_file)?;
584        info!(
585            "Loaded {} rows with {} columns",
586            table.row_count(),
587            table.column_count()
588        );
589        (table, false)
590    };
591
592    // Track script results
593    let mut script_result = ScriptResult::new();
594    let mut output = Vec::new();
595
596    // Create Arc<DataTable> once for all statements - avoids expensive cloning
597    let arc_data_table = std::sync::Arc::new(data_table);
598
599    // Execute each statement
600    for (idx, statement) in statements.iter().enumerate() {
601        let statement_num = idx + 1;
602        let stmt_start = Instant::now();
603
604        // Print separator for table format
605        if matches!(config.output_format, OutputFormat::Table) {
606            if idx > 0 {
607                output.push(String::new()); // Empty line between queries
608            }
609            output.push(format!("-- Query {} --", statement_num));
610        }
611
612        // Create a fresh DataView for each statement (reuses the Arc)
613        let dataview = DataView::new(arc_data_table.clone());
614
615        // Execute the statement
616        let service = QueryExecutionService::new(config.case_insensitive, config.auto_hide_empty);
617        match service.execute(statement, Some(&dataview), None) {
618            Ok(result) => {
619                let exec_time = stmt_start.elapsed().as_secs_f64() * 1000.0;
620                let final_view = result.dataview;
621
622                // Format the output based on the output format
623                let mut statement_output = Vec::new();
624                match config.output_format {
625                    OutputFormat::Csv => {
626                        output_csv(&final_view, &mut statement_output, ',')?;
627                    }
628                    OutputFormat::Json => {
629                        output_json(&final_view, &mut statement_output)?;
630                    }
631                    OutputFormat::Table => {
632                        output_table(
633                            &final_view,
634                            &mut statement_output,
635                            config.max_col_width,
636                            config.col_sample_rows,
637                        )?;
638                        writeln!(
639                            &mut statement_output,
640                            "Query completed: {} rows in {:.2}ms",
641                            final_view.row_count(),
642                            exec_time
643                        )?;
644                    }
645                    OutputFormat::Tsv => {
646                        output_csv(&final_view, &mut statement_output, '\t')?;
647                    }
648                }
649
650                // Add to overall output
651                output.extend(
652                    String::from_utf8_lossy(&statement_output)
653                        .lines()
654                        .map(String::from),
655                );
656
657                script_result.add_success(
658                    statement_num,
659                    statement.clone(),
660                    final_view.row_count(),
661                    exec_time,
662                );
663            }
664            Err(e) => {
665                let exec_time = stmt_start.elapsed().as_secs_f64() * 1000.0;
666                let error_msg = format!("Query {} failed: {}", statement_num, e);
667
668                if matches!(config.output_format, OutputFormat::Table) {
669                    output.push(error_msg.clone());
670                }
671
672                script_result.add_failure(
673                    statement_num,
674                    statement.clone(),
675                    e.to_string(),
676                    exec_time,
677                );
678
679                // Continue to next statement (don't stop on error)
680            }
681        }
682    }
683
684    // Write output
685    if let Some(ref output_file) = config.output_file {
686        let mut file = fs::File::create(output_file)?;
687        for line in &output {
688            writeln!(file, "{}", line)?;
689        }
690        info!("Results written to: {}", output_file);
691    } else {
692        for line in &output {
693            println!("{}", line);
694        }
695    }
696
697    // Print summary if in table mode
698    if matches!(config.output_format, OutputFormat::Table) {
699        println!("\n=== Script Summary ===");
700        println!("Total statements: {}", script_result.total_statements);
701        println!("Successful: {}", script_result.successful_statements);
702        println!("Failed: {}", script_result.failed_statements);
703        println!(
704            "Total execution time: {:.2}ms",
705            script_result.total_execution_time_ms
706        );
707    }
708
709    if !script_result.all_successful() {
710        return Err(anyhow::anyhow!(
711            "{} of {} statements failed",
712            script_result.failed_statements,
713            script_result.total_statements
714        ));
715    }
716
717    Ok(())
718}
719
720/// Load a data file (CSV or JSON) into a `DataTable`
721fn load_data_file(path: &str) -> Result<DataTable> {
722    let path = Path::new(path);
723
724    if !path.exists() {
725        return Err(anyhow::anyhow!("File not found: {}", path.display()));
726    }
727
728    // Determine file type by extension
729    let extension = path
730        .extension()
731        .and_then(|ext| ext.to_str())
732        .map(str::to_lowercase)
733        .unwrap_or_default();
734
735    let table_name = path
736        .file_stem()
737        .and_then(|stem| stem.to_str())
738        .unwrap_or("data")
739        .to_string();
740
741    match extension.as_str() {
742        "csv" => load_csv_to_datatable(path, &table_name)
743            .with_context(|| format!("Failed to load CSV file: {}", path.display())),
744        "json" => load_json_to_datatable(path, &table_name)
745            .with_context(|| format!("Failed to load JSON file: {}", path.display())),
746        _ => Err(anyhow::anyhow!(
747            "Unsupported file type: {}. Use .csv or .json",
748            extension
749        )),
750    }
751}
752
753/// Limit the number of rows in results
754fn limit_results(dataview: &DataView, limit: usize) -> Result<DataTable> {
755    let source = dataview.source();
756    let mut limited_table = DataTable::new(&source.name);
757
758    // Copy columns
759    for col in &source.columns {
760        limited_table.add_column(col.clone());
761    }
762
763    // Copy limited rows
764    let rows_to_copy = dataview.row_count().min(limit);
765    for i in 0..rows_to_copy {
766        if let Some(row) = dataview.get_row(i) {
767            limited_table.add_row(row.clone());
768        }
769    }
770
771    Ok(limited_table)
772}
773
774/// Output query results in the specified format
775fn output_results<W: Write>(
776    dataview: &DataView,
777    format: OutputFormat,
778    writer: &mut W,
779    max_col_width: Option<usize>,
780    col_sample_rows: usize,
781) -> Result<()> {
782    match format {
783        OutputFormat::Csv => output_csv(dataview, writer, ','),
784        OutputFormat::Tsv => output_csv(dataview, writer, '\t'),
785        OutputFormat::Json => output_json(dataview, writer),
786        OutputFormat::Table => output_table(dataview, writer, max_col_width, col_sample_rows),
787    }
788}
789
790/// Output results as CSV/TSV
791fn output_csv<W: Write>(dataview: &DataView, writer: &mut W, delimiter: char) -> Result<()> {
792    // Write headers
793    let columns = dataview.column_names();
794    for (i, col) in columns.iter().enumerate() {
795        if i > 0 {
796            write!(writer, "{delimiter}")?;
797        }
798        write!(writer, "{}", escape_csv_field(col, delimiter))?;
799    }
800    writeln!(writer)?;
801
802    // Write rows
803    for row_idx in 0..dataview.row_count() {
804        if let Some(row) = dataview.get_row(row_idx) {
805            for (i, value) in row.values.iter().enumerate() {
806                if i > 0 {
807                    write!(writer, "{delimiter}")?;
808                }
809                write!(
810                    writer,
811                    "{}",
812                    escape_csv_field(&format_value(value), delimiter)
813                )?;
814            }
815            writeln!(writer)?;
816        }
817    }
818
819    Ok(())
820}
821
822/// Output results as JSON
823fn output_json<W: Write>(dataview: &DataView, writer: &mut W) -> Result<()> {
824    let columns = dataview.column_names();
825    let mut rows = Vec::new();
826
827    for row_idx in 0..dataview.row_count() {
828        if let Some(row) = dataview.get_row(row_idx) {
829            let mut json_row = serde_json::Map::new();
830            for (col_idx, value) in row.values.iter().enumerate() {
831                if col_idx < columns.len() {
832                    json_row.insert(columns[col_idx].clone(), value_to_json(value));
833                }
834            }
835            rows.push(serde_json::Value::Object(json_row));
836        }
837    }
838
839    let json = serde_json::to_string_pretty(&rows)?;
840    writeln!(writer, "{json}")?;
841
842    Ok(())
843}
844
845/// Output results as an ASCII table
846fn output_table<W: Write>(
847    dataview: &DataView,
848    writer: &mut W,
849    max_col_width: Option<usize>,
850    col_sample_rows: usize,
851) -> Result<()> {
852    let columns = dataview.column_names();
853
854    // Calculate column widths
855    let mut widths = vec![0; columns.len()];
856    for (i, col) in columns.iter().enumerate() {
857        widths[i] = col.len();
858    }
859
860    // Sample rows for width calculation
861    // If col_sample_rows is 0, scan all rows
862    // Otherwise, scan min(col_sample_rows, total_rows)
863    let sample_size = if col_sample_rows == 0 {
864        dataview.row_count()
865    } else {
866        dataview.row_count().min(col_sample_rows)
867    };
868
869    for row_idx in 0..sample_size {
870        if let Some(row) = dataview.get_row(row_idx) {
871            for (i, value) in row.values.iter().enumerate() {
872                if i < widths.len() {
873                    let value_str = format_value(value);
874                    widths[i] = widths[i].max(value_str.len());
875                }
876            }
877        }
878    }
879
880    // Apply maximum column width if specified
881    if let Some(max_width) = max_col_width {
882        for width in &mut widths {
883            *width = (*width).min(max_width);
884        }
885    }
886
887    // Print header separator
888    write!(writer, "+")?;
889    for width in &widths {
890        write!(writer, "-{}-+", "-".repeat(*width))?;
891    }
892    writeln!(writer)?;
893
894    // Print headers
895    write!(writer, "|")?;
896    for (i, col) in columns.iter().enumerate() {
897        write!(writer, " {:^width$} |", col, width = widths[i])?;
898    }
899    writeln!(writer)?;
900
901    // Print header separator
902    write!(writer, "+")?;
903    for width in &widths {
904        write!(writer, "-{}-+", "-".repeat(*width))?;
905    }
906    writeln!(writer)?;
907
908    // Print rows
909    for row_idx in 0..dataview.row_count() {
910        if let Some(row) = dataview.get_row(row_idx) {
911            write!(writer, "|")?;
912            for (i, value) in row.values.iter().enumerate() {
913                if i < widths.len() {
914                    let value_str = format_value(value);
915                    let truncated = if value_str.len() > widths[i] {
916                        format!("{}...", &value_str[..widths[i] - 3])
917                    } else {
918                        value_str
919                    };
920                    write!(writer, " {:<width$} |", truncated, width = widths[i])?;
921                }
922            }
923            writeln!(writer)?;
924        }
925    }
926
927    // Print bottom separator
928    write!(writer, "+")?;
929    for width in &widths {
930        write!(writer, "-{}-+", "-".repeat(*width))?;
931    }
932    writeln!(writer)?;
933
934    Ok(())
935}
936
937/// Format a `DataValue` for display
938fn format_value(value: &DataValue) -> String {
939    match value {
940        DataValue::Null => String::new(),
941        DataValue::Integer(i) => i.to_string(),
942        DataValue::Float(f) => f.to_string(),
943        DataValue::String(s) => s.clone(),
944        DataValue::InternedString(s) => s.to_string(),
945        DataValue::Boolean(b) => b.to_string(),
946        DataValue::DateTime(dt) => dt.to_string(),
947    }
948}
949
950/// Convert `DataValue` to JSON
951fn value_to_json(value: &DataValue) -> serde_json::Value {
952    match value {
953        DataValue::Null => serde_json::Value::Null,
954        DataValue::Integer(i) => serde_json::Value::Number((*i).into()),
955        DataValue::Float(f) => {
956            if let Some(n) = serde_json::Number::from_f64(*f) {
957                serde_json::Value::Number(n)
958            } else {
959                serde_json::Value::Null
960            }
961        }
962        DataValue::String(s) => serde_json::Value::String(s.clone()),
963        DataValue::InternedString(s) => serde_json::Value::String(s.to_string()),
964        DataValue::Boolean(b) => serde_json::Value::Bool(*b),
965        DataValue::DateTime(dt) => serde_json::Value::String(dt.to_string()),
966    }
967}
968
/// Quote a field for delimited output when it would otherwise be ambiguous.
///
/// A field is wrapped in double quotes, with embedded double quotes doubled,
/// when it contains the `delimiter`, a double quote, a line feed, or a
/// carriage return. Any other field is returned unchanged.
fn escape_csv_field(field: &str, delimiter: char) -> String {
    let needs_quoting = field
        .chars()
        .any(|ch| ch == delimiter || matches!(ch, '"' | '\n' | '\r'));

    if !needs_quoting {
        return field.to_string();
    }

    // Room for the field plus the two surrounding quotes
    let mut quoted = String::with_capacity(field.len() + 2);
    quoted.push('"');
    for ch in field.chars() {
        if ch == '"' {
            // An embedded quote is written twice
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}