sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::{FxHashMap, FxHashSet};
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::data::virtual_table_generator::VirtualTableGenerator;
20use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
21use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
22use crate::sql::parser::ast::ColumnRef;
23use crate::sql::parser::ast::TableSource;
24use crate::sql::recursive_parser::{
25    CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
26    TableFunction,
27};
28
/// Query engine that executes SQL directly on `DataTable`
///
/// The engine is `Clone` because a copy of it is handed to each
/// `SubqueryExecutor` that is created while executing a statement.
#[derive(Clone)]
pub struct QueryEngine {
    /// Case-insensitivity flag; forwarded to `HashJoinExecutor::new` when JOINs run.
    case_insensitive: bool,
    /// Date notation forwarded to `ArithmeticEvaluator::with_date_notation`.
    /// Every constructor in this file fills it from `get_date_notation()`.
    date_notation: String,
    /// Behavior configuration; `Some` only when built via `with_behavior_config`.
    behavior_config: Option<BehaviorConfig>,
}
36
37impl Default for QueryEngine {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl QueryEngine {
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            case_insensitive: false,
48            date_notation: get_date_notation(),
49            behavior_config: None,
50        }
51    }
52
53    #[must_use]
54    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
55        let case_insensitive = config.case_insensitive_default;
56        // Use get_date_notation() to respect environment variable override
57        let date_notation = get_date_notation();
58        Self {
59            case_insensitive,
60            date_notation,
61            behavior_config: Some(config),
62        }
63    }
64
65    #[must_use]
66    pub fn with_date_notation(_date_notation: String) -> Self {
67        Self {
68            case_insensitive: false,
69            date_notation: get_date_notation(), // Always use the global function
70            behavior_config: None,
71        }
72    }
73
74    #[must_use]
75    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
76        Self {
77            case_insensitive,
78            date_notation: get_date_notation(),
79            behavior_config: None,
80        }
81    }
82
83    #[must_use]
84    pub fn with_case_insensitive_and_date_notation(
85        case_insensitive: bool,
86        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
87    ) -> Self {
88        Self {
89            case_insensitive,
90            date_notation: get_date_notation(), // Always use the global function
91            behavior_config: None,
92        }
93    }
94
95    /// Find a column name similar to the given name using edit distance
96    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
97        let columns = table.column_names();
98        let mut best_match: Option<(String, usize)> = None;
99
100        for col in columns {
101            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
102            // Only suggest if distance is small (likely a typo)
103            // Allow up to 3 edits for longer names
104            let max_distance = if name.len() > 10 { 3 } else { 2 };
105            if distance <= max_distance {
106                match &best_match {
107                    None => best_match = Some((col, distance)),
108                    Some((_, best_dist)) if distance < *best_dist => {
109                        best_match = Some((col, distance));
110                    }
111                    _ => {}
112                }
113            }
114        }
115
116        best_match.map(|(name, _)| name)
117    }
118
119    /// Calculate Levenshtein edit distance between two strings
120    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
121        let len1 = s1.len();
122        let len2 = s2.len();
123        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
124
125        for i in 0..=len1 {
126            matrix[i][0] = i;
127        }
128        for j in 0..=len2 {
129            matrix[0][j] = j;
130        }
131
132        for (i, c1) in s1.chars().enumerate() {
133            for (j, c2) in s2.chars().enumerate() {
134                let cost = usize::from(c1 != c2);
135                matrix[i + 1][j + 1] = std::cmp::min(
136                    matrix[i][j + 1] + 1, // deletion
137                    std::cmp::min(
138                        matrix[i + 1][j] + 1, // insertion
139                        matrix[i][j] + cost,  // substitution
140                    ),
141                );
142            }
143        }
144
145        matrix[len1][len2]
146    }
147
148    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
149    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
150        let (view, _plan) = self.execute_with_plan(table, sql)?;
151        Ok(view)
152    }
153
154    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
155    pub fn execute_statement(
156        &self,
157        table: Arc<DataTable>,
158        statement: SelectStatement,
159    ) -> Result<DataView> {
160        // First process CTEs to build context
161        let mut cte_context = HashMap::new();
162        for cte in &statement.ctes {
163            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
164            // Execute the CTE based on its type
165            let cte_result = match &cte.cte_type {
166                CTEType::Standard(query) => {
167                    // Execute the CTE query (it might reference earlier CTEs)
168                    let view = self.build_view_with_context(
169                        table.clone(),
170                        query.clone(),
171                        &mut cte_context,
172                    )?;
173
174                    // Materialize the view and enrich columns with qualified names
175                    let mut materialized = self.materialize_view(view)?;
176
177                    // Enrich columns with qualified names for proper scoping
178                    for column in materialized.columns_mut() {
179                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
180                        column.source_table = Some(cte.name.clone());
181                    }
182
183                    DataView::new(Arc::new(materialized))
184                }
185                CTEType::Web(web_spec) => {
186                    // Fetch data from URL
187                    use crate::web::http_fetcher::WebDataFetcher;
188
189                    let fetcher = WebDataFetcher::new()?;
190                    let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
191
192                    // Enrich columns with qualified names for proper scoping
193                    for column in data_table.columns_mut() {
194                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
195                        column.source_table = Some(cte.name.clone());
196                    }
197
198                    // Convert DataTable to DataView
199                    DataView::new(Arc::new(data_table))
200                }
201            };
202            // Store the result in the context for later use
203            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
204            debug!(
205                "QueryEngine: CTE '{}' pre-processed, stored in context",
206                cte.name
207            );
208        }
209
210        // Now process subqueries with CTE context available
211        let mut subquery_executor =
212            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
213        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
214
215        // Build the view with the same CTE context
216        self.build_view_with_context(table, processed_statement, &mut cte_context)
217    }
218
219    /// Execute a statement with provided CTE context (for subqueries)
220    pub fn execute_statement_with_cte_context(
221        &self,
222        table: Arc<DataTable>,
223        statement: SelectStatement,
224        cte_context: &HashMap<String, Arc<DataView>>,
225    ) -> Result<DataView> {
226        // Clone the context so we can add any CTEs from this statement
227        let mut local_context = cte_context.clone();
228
229        // Process any CTEs in this statement (they might be nested)
230        for cte in &statement.ctes {
231            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
232            let cte_result = match &cte.cte_type {
233                CTEType::Standard(query) => {
234                    let view = self.build_view_with_context(
235                        table.clone(),
236                        query.clone(),
237                        &mut local_context,
238                    )?;
239
240                    // Materialize the view and enrich columns with qualified names
241                    let mut materialized = self.materialize_view(view)?;
242
243                    // Enrich columns with qualified names for proper scoping
244                    for column in materialized.columns_mut() {
245                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
246                        column.source_table = Some(cte.name.clone());
247                    }
248
249                    DataView::new(Arc::new(materialized))
250                }
251                CTEType::Web(web_spec) => {
252                    // Fetch data from URL
253                    use crate::web::http_fetcher::WebDataFetcher;
254
255                    let fetcher = WebDataFetcher::new()?;
256                    let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
257
258                    // Enrich columns with qualified names for proper scoping
259                    for column in data_table.columns_mut() {
260                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
261                        column.source_table = Some(cte.name.clone());
262                    }
263
264                    // Convert DataTable to DataView
265                    DataView::new(Arc::new(data_table))
266                }
267            };
268            local_context.insert(cte.name.clone(), Arc::new(cte_result));
269        }
270
271        // Process subqueries with the complete context
272        let mut subquery_executor =
273            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
274        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
275
276        // Build the view
277        self.build_view_with_context(table, processed_statement, &mut local_context)
278    }
279
280    /// Execute a query and return both the result and the execution plan
281    pub fn execute_with_plan(
282        &self,
283        table: Arc<DataTable>,
284        sql: &str,
285    ) -> Result<(DataView, ExecutionPlan)> {
286        let mut plan_builder = ExecutionPlanBuilder::new();
287        let start_time = Instant::now();
288
289        // Parse the SQL query
290        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
291        plan_builder.add_detail(format!("Query: {}", sql));
292        let mut parser = Parser::new(sql);
293        let statement = parser
294            .parse()
295            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
296        plan_builder.add_detail(format!("Parsed successfully"));
297        if let Some(from) = &statement.from_table {
298            plan_builder.add_detail(format!("FROM: {}", from));
299        }
300        if statement.where_clause.is_some() {
301            plan_builder.add_detail("WHERE clause present".to_string());
302        }
303        plan_builder.end_step();
304
305        // First process CTEs to build context
306        let mut cte_context = HashMap::new();
307
308        if !statement.ctes.is_empty() {
309            plan_builder.begin_step(
310                StepType::CTE,
311                format!("Process {} CTEs", statement.ctes.len()),
312            );
313
314            for cte in &statement.ctes {
315                let cte_start = Instant::now();
316                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
317
318                let cte_result = match &cte.cte_type {
319                    CTEType::Standard(query) => {
320                        // Add CTE query details
321                        if let Some(from) = &query.from_table {
322                            plan_builder.add_detail(format!("Source: {}", from));
323                        }
324                        if query.where_clause.is_some() {
325                            plan_builder.add_detail("Has WHERE clause".to_string());
326                        }
327                        if query.group_by.is_some() {
328                            plan_builder.add_detail("Has GROUP BY".to_string());
329                        }
330
331                        debug!(
332                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
333                            cte.name,
334                            cte_context.keys().collect::<Vec<_>>()
335                        );
336
337                        // Process subqueries in the CTE's query FIRST
338                        // This allows the subqueries to see all previously defined CTEs
339                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
340                            self.clone(),
341                            table.clone(),
342                            cte_context.clone(),
343                        );
344                        let processed_query = subquery_executor.execute_subqueries(query)?;
345
346                        let view = self.build_view_with_context(
347                            table.clone(),
348                            processed_query,
349                            &mut cte_context,
350                        )?;
351
352                        // Materialize the view and enrich columns with qualified names
353                        let mut materialized = self.materialize_view(view)?;
354
355                        // Enrich columns with qualified names for proper scoping
356                        for column in materialized.columns_mut() {
357                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
358                            column.source_table = Some(cte.name.clone());
359                        }
360
361                        DataView::new(Arc::new(materialized))
362                    }
363                    CTEType::Web(web_spec) => {
364                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
365                        if let Some(format) = &web_spec.format {
366                            plan_builder.add_detail(format!("Format: {:?}", format));
367                        }
368                        if let Some(cache) = web_spec.cache_seconds {
369                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
370                        }
371
372                        // Fetch data from URL
373                        use crate::web::http_fetcher::WebDataFetcher;
374
375                        let fetcher = WebDataFetcher::new()?;
376                        let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
377
378                        // Enrich columns with qualified names for proper scoping
379                        for column in data_table.columns_mut() {
380                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
381                            column.source_table = Some(cte.name.clone());
382                        }
383
384                        // Convert DataTable to DataView
385                        DataView::new(Arc::new(data_table))
386                    }
387                };
388
389                // Record CTE statistics
390                plan_builder.set_rows_out(cte_result.row_count());
391                plan_builder.add_detail(format!(
392                    "Result: {} rows, {} columns",
393                    cte_result.row_count(),
394                    cte_result.column_count()
395                ));
396                plan_builder.add_detail(format!(
397                    "Execution time: {:.3}ms",
398                    cte_start.elapsed().as_secs_f64() * 1000.0
399                ));
400
401                debug!(
402                    "QueryEngine: Storing CTE '{}' in context with {} rows",
403                    cte.name,
404                    cte_result.row_count()
405                );
406                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
407                plan_builder.end_step();
408            }
409
410            plan_builder.add_detail(format!(
411                "All {} CTEs cached in context",
412                statement.ctes.len()
413            ));
414            plan_builder.end_step();
415        }
416
417        // Process subqueries in the statement with CTE context
418        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
419        let mut subquery_executor =
420            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
421
422        // Check if there are subqueries to process
423        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
424            // This is a simplified check - in reality we'd need to walk the AST
425            format!("{:?}", w).contains("Subquery")
426        });
427
428        if has_subqueries {
429            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
430        }
431
432        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
433
434        if has_subqueries {
435            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
436        } else {
437            plan_builder.add_detail("No subqueries to process".to_string());
438        }
439
440        plan_builder.end_step();
441        let result = self.build_view_with_context_and_plan(
442            table,
443            processed_statement,
444            &mut cte_context,
445            &mut plan_builder,
446        )?;
447
448        let total_duration = start_time.elapsed();
449        info!(
450            "Query execution complete: total={:?}, rows={}",
451            total_duration,
452            result.row_count()
453        );
454
455        let plan = plan_builder.build();
456        Ok((result, plan))
457    }
458
459    /// Build a `DataView` from a parsed SQL statement
460    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
461        let mut cte_context = HashMap::new();
462        self.build_view_with_context(table, statement, &mut cte_context)
463    }
464
465    /// Build a DataView from a SelectStatement with CTE context
466    fn build_view_with_context(
467        &self,
468        table: Arc<DataTable>,
469        statement: SelectStatement,
470        cte_context: &mut HashMap<String, Arc<DataView>>,
471    ) -> Result<DataView> {
472        let mut dummy_plan = ExecutionPlanBuilder::new();
473        self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
474    }
475
    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
    ///
    /// The work happens in three phases:
    /// 1. Any standard CTE of `statement` that is not yet in `cte_context` is evaluated
    ///    (recursively) and inserted into the context.
    /// 2. The source table is chosen, in priority order: a table function
    ///    (`from_function`), a FROM subquery (`from_subquery`), a table name that
    ///    matches a CTE in the context, or otherwise the `table` passed in.
    /// 3. JOINs, if any, are applied left to right with a `HashJoinExecutor`, and the
    ///    resulting table is handed to `build_view_internal_with_plan`.
    ///
    /// `cte_context` is mutated: CTEs evaluated here are added to it. JOIN steps are
    /// recorded on `plan`.
    ///
    /// # Errors
    /// Returns an error when an unprocessed Web CTE is encountered, when RANGE
    /// arguments are not numeric, when SPLIT text is NULL, when a generator function
    /// name is unknown, or when a JOIN names a table that is not a CTE in the context.
    /// Errors from nested evaluation, materialization and join execution are propagated.
    fn build_view_with_context_and_plan(
        &self,
        table: Arc<DataTable>,
        statement: SelectStatement,
        cte_context: &mut HashMap<String, Arc<DataView>>,
        plan: &mut ExecutionPlanBuilder,
    ) -> Result<DataView> {
        // First, process any CTEs that aren't already in the context
        for cte in &statement.ctes {
            // Skip if already processed (e.g., by execute_select for WEB CTEs)
            if cte_context.contains_key(&cte.name) {
                debug!(
                    "QueryEngine: CTE '{}' already in context, skipping",
                    cte.name
                );
                continue;
            }

            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
            debug!(
                "QueryEngine: Available CTEs for '{}': {:?}",
                cte.name,
                cte_context.keys().collect::<Vec<_>>()
            );

            // Execute the CTE query (it might reference earlier CTEs)
            let cte_result = match &cte.cte_type {
                CTEType::Standard(query) => {
                    // Recursive call: the CTE's own query is built with the same context.
                    let view =
                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;

                    // Materialize the view and enrich columns with qualified names
                    let mut materialized = self.materialize_view(view)?;

                    // Enrich columns with qualified names for proper scoping
                    for column in materialized.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    DataView::new(Arc::new(materialized))
                }
                CTEType::Web(_web_spec) => {
                    // Web CTEs should have been processed earlier in execute_select
                    // NOTE(review): no `execute_select` is visible in this part of the
                    // file; the public entry points above are what pre-populate the
                    // context with Web CTEs — confirm the name is still current.
                    return Err(anyhow!(
                        "Web CTEs should be processed in execute_select method"
                    ));
                }
            };

            // Store the result in the context for later use
            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
            debug!(
                "QueryEngine: CTE '{}' processed, stored in context",
                cte.name
            );
        }

        // Determine the source table for the main query
        let source_table = if let Some(ref table_func) = statement.from_function {
            // Handle table functions like RANGE()
            debug!("QueryEngine: Processing table function...");
            match table_func {
                TableFunction::Range { start, end, step } => {
                    // Evaluate expressions to get numeric values
                    let mut evaluator =
                        ArithmeticEvaluator::with_date_notation(&table, self.date_notation.clone());

                    // Create a dummy row for evaluating constant expressions
                    // (row index 0 of `table` is what the evaluator is given)
                    let dummy_row = 0;

                    let start_val = evaluator.evaluate(start, dummy_row)?;
                    let end_val = evaluator.evaluate(end, dummy_row)?;
                    let step_val = if let Some(step_expr) = step {
                        Some(evaluator.evaluate(step_expr, dummy_row)?)
                    } else {
                        None
                    };

                    // Convert DataValues to integers
                    // (floats go through an `as i64` cast, which drops the fractional part)
                    let start_int = match start_val {
                        DataValue::Integer(i) => i,
                        DataValue::Float(f) => f as i64,
                        _ => return Err(anyhow!("RANGE start must be numeric")),
                    };

                    let end_int = match end_val {
                        DataValue::Integer(i) => i,
                        DataValue::Float(f) => f as i64,
                        _ => return Err(anyhow!("RANGE end must be numeric")),
                    };

                    // The step is optional; `None` is passed through to the generator.
                    let step_int = if let Some(step) = step_val {
                        match step {
                            DataValue::Integer(i) => Some(i),
                            DataValue::Float(f) => Some(f as i64),
                            _ => return Err(anyhow!("RANGE step must be numeric")),
                        }
                    } else {
                        None
                    };

                    // Generate the virtual table
                    VirtualTableGenerator::generate_range(start_int, end_int, step_int, None)?
                }
                TableFunction::Split { text, delimiter } => {
                    // Evaluate expressions to get string values
                    let mut evaluator =
                        ArithmeticEvaluator::with_date_notation(&table, self.date_notation.clone());

                    // Create a dummy row for evaluating constant expressions
                    let dummy_row = 0;

                    let text_val = evaluator.evaluate(text, dummy_row)?;
                    let delimiter_val = if let Some(delim_expr) = delimiter {
                        Some(evaluator.evaluate(delim_expr, dummy_row)?)
                    } else {
                        None
                    };

                    // Convert DataValues to strings
                    // (NULL text is an error; other non-string values are stringified)
                    let text_str = match text_val {
                        DataValue::String(s) => s,
                        DataValue::Null => return Err(anyhow!("SPLIT text cannot be NULL")),
                        _ => text_val.to_string(),
                    };

                    // A NULL delimiter is treated the same as an omitted one.
                    let delimiter_str = if let Some(delim) = delimiter_val {
                        match delim {
                            DataValue::String(s) => Some(s),
                            DataValue::Null => None,
                            _ => Some(delim.to_string()),
                        }
                    } else {
                        None
                    };

                    // Generate the virtual table from split string
                    VirtualTableGenerator::generate_split(
                        &text_str,
                        delimiter_str.as_deref(),
                        None,
                    )?
                }
                TableFunction::Generator { name, args } => {
                    // Use the generator registry to create the table
                    use crate::sql::generators::GeneratorRegistry;

                    // Create generator registry (could be cached in QueryEngine)
                    let registry = GeneratorRegistry::new();

                    if let Some(generator) = registry.get(name) {
                        // Evaluate arguments
                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
                            &table,
                            self.date_notation.clone(),
                        );
                        let dummy_row = 0;

                        // Arguments are evaluated in order; the first failure aborts.
                        let mut evaluated_args = Vec::new();
                        for arg in args {
                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
                        }

                        // Generate the table
                        generator.generate(evaluated_args)?
                    } else {
                        return Err(anyhow!("Unknown generator function: {}", name));
                    }
                }
            }
        } else if let Some(ref subquery) = statement.from_subquery {
            // Execute the subquery and use its result as the source
            debug!("QueryEngine: Processing FROM subquery...");
            let subquery_result =
                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;

            // Convert the DataView to a DataTable for use as source
            // This materializes the subquery result
            let materialized = self.materialize_view(subquery_result)?;
            Arc::new(materialized)
        } else if let Some(ref table_name) = statement.from_table {
            // Check if this references a CTE
            if let Some(cte_view) = cte_context.get(table_name) {
                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
                // Materialize the CTE view as a table
                let materialized = self.materialize_view((**cte_view).clone())?;
                Arc::new(materialized)
            } else {
                // Regular table reference - use the provided table
                table.clone()
            }
        } else {
            // No FROM clause - use the provided table
            table.clone()
        };

        // Process JOINs if present
        let final_table = if !statement.joins.is_empty() {
            // Outer plan step that wraps all joins; each join gets its own nested step.
            plan.begin_step(
                StepType::Join,
                format!("Process {} JOINs", statement.joins.len()),
            );
            plan.set_rows_in(source_table.row_count());

            let join_executor = HashJoinExecutor::new(self.case_insensitive);
            // Accumulator: the output of each join becomes the left side of the next.
            let mut current_table = source_table;

            for (idx, join_clause) in statement.joins.iter().enumerate() {
                let join_start = Instant::now();
                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
                plan.add_detail(format!(
                    "Executing {:?} JOIN on {}",
                    join_clause.join_type, join_clause.condition.left_column
                ));

                // Resolve the right table for the join
                let right_table = match &join_clause.table {
                    TableSource::Table(name) => {
                        // Check if it's a CTE reference
                        if let Some(cte_view) = cte_context.get(name) {
                            let materialized = self.materialize_view((**cte_view).clone())?;
                            Arc::new(materialized)
                        } else {
                            // For now, we need the actual table data
                            // In a real implementation, this would load from file
                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
                        }
                    }
                    TableSource::DerivedTable { query, alias: _ } => {
                        // Execute the subquery
                        let subquery_result = self.build_view_with_context(
                            table.clone(),
                            *query.clone(),
                            cte_context,
                        )?;
                        let materialized = self.materialize_view(subquery_result)?;
                        Arc::new(materialized)
                    }
                };

                // Execute the join
                let joined = join_executor.execute_join(
                    current_table.clone(),
                    join_clause,
                    right_table.clone(),
                )?;

                // Right-side and result statistics are recorded after the join has run.
                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
                plan.set_rows_out(joined.row_count());
                plan.add_detail(format!("Result: {} rows", joined.row_count()));
                plan.add_detail(format!(
                    "Join time: {:.3}ms",
                    join_start.elapsed().as_secs_f64() * 1000.0
                ));
                plan.end_step();

                current_table = Arc::new(joined);
            }

            plan.set_rows_out(current_table.row_count());
            plan.add_detail(format!(
                "Final result after all joins: {} rows",
                current_table.row_count()
            ));
            plan.end_step();
            current_table
        } else {
            source_table
        };

        // Continue with the existing build_view logic but using final_table
        self.build_view_internal_with_plan(final_table, statement, plan)
    }
753
754    /// Materialize a DataView into a new DataTable
755    fn materialize_view(&self, view: DataView) -> Result<DataTable> {
756        let source = view.source();
757        let mut result_table = DataTable::new("derived");
758
759        // Get the visible columns from the view
760        let visible_cols = view.visible_column_indices().to_vec();
761
762        // Copy column definitions
763        for col_idx in &visible_cols {
764            let col = &source.columns[*col_idx];
765            let new_col = DataColumn {
766                name: col.name.clone(),
767                data_type: col.data_type.clone(),
768                nullable: col.nullable,
769                unique_values: col.unique_values,
770                null_count: col.null_count,
771                metadata: col.metadata.clone(),
772                qualified_name: col.qualified_name.clone(), // Preserve qualified name
773                source_table: col.source_table.clone(),     // Preserve source table
774            };
775            result_table.add_column(new_col);
776        }
777
778        // Copy visible rows
779        for row_idx in view.visible_row_indices() {
780            let source_row = &source.rows[*row_idx];
781            let mut new_row = DataRow { values: Vec::new() };
782
783            for col_idx in &visible_cols {
784                new_row.values.push(source_row.values[*col_idx].clone());
785            }
786
787            result_table.add_row(new_row);
788        }
789
790        Ok(result_table)
791    }
792
793    fn build_view_internal(
794        &self,
795        table: Arc<DataTable>,
796        statement: SelectStatement,
797    ) -> Result<DataView> {
798        let mut dummy_plan = ExecutionPlanBuilder::new();
799        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
800    }
801
802    fn build_view_internal_with_plan(
803        &self,
804        table: Arc<DataTable>,
805        statement: SelectStatement,
806        plan: &mut ExecutionPlanBuilder,
807    ) -> Result<DataView> {
808        debug!(
809            "QueryEngine::build_view - select_items: {:?}",
810            statement.select_items
811        );
812        debug!(
813            "QueryEngine::build_view - where_clause: {:?}",
814            statement.where_clause
815        );
816
817        // Start with all rows visible
818        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
819
820        // Apply WHERE clause filtering using recursive evaluator
821        if let Some(where_clause) = &statement.where_clause {
822            let total_rows = table.row_count();
823            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
824            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
825
826            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
827            plan.set_rows_in(total_rows);
828            plan.add_detail(format!("Input: {} rows", total_rows));
829
830            // Add details about WHERE conditions
831            for condition in &where_clause.conditions {
832                plan.add_detail(format!("Condition: {:?}", condition.expr));
833            }
834
835            let filter_start = Instant::now();
836            // Create an evaluation context for caching compiled regexes
837            let mut eval_context = EvaluationContext::new(self.case_insensitive);
838
839            // Filter visible rows based on WHERE clause
840            let mut filtered_rows = Vec::new();
841            for row_idx in visible_rows {
842                // Only log for first few rows to avoid performance impact
843                if row_idx < 3 {
844                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
845                }
846                let mut evaluator =
847                    RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
848                match evaluator.evaluate(where_clause, row_idx) {
849                    Ok(result) => {
850                        if row_idx < 3 {
851                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
852                        }
853                        if result {
854                            filtered_rows.push(row_idx);
855                        }
856                    }
857                    Err(e) => {
858                        if row_idx < 3 {
859                            debug!(
860                                "QueryEngine: WHERE evaluation error for row {}: {}",
861                                row_idx, e
862                            );
863                        }
864                        // Propagate WHERE clause errors instead of silently ignoring them
865                        return Err(e);
866                    }
867                }
868            }
869
870            // Log regex cache statistics
871            let (compilations, cache_hits) = eval_context.get_stats();
872            if compilations > 0 || cache_hits > 0 {
873                debug!(
874                    "LIKE pattern cache: {} compilations, {} cache hits",
875                    compilations, cache_hits
876                );
877            }
878            visible_rows = filtered_rows;
879            let filter_duration = filter_start.elapsed();
880            info!(
881                "WHERE clause filtering: {} rows -> {} rows in {:?}",
882                total_rows,
883                visible_rows.len(),
884                filter_duration
885            );
886
887            plan.set_rows_out(visible_rows.len());
888            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
889            plan.add_detail(format!(
890                "Filter time: {:.3}ms",
891                filter_duration.as_secs_f64() * 1000.0
892            ));
893            plan.end_step();
894        }
895
896        // Create initial DataView with filtered rows
897        let mut view = DataView::new(table.clone());
898        view = view.with_rows(visible_rows);
899
900        // Handle GROUP BY if present
901        if let Some(group_by_exprs) = &statement.group_by {
902            if !group_by_exprs.is_empty() {
903                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
904
905                plan.begin_step(
906                    StepType::GroupBy,
907                    format!("GROUP BY {} expressions", group_by_exprs.len()),
908                );
909                plan.set_rows_in(view.row_count());
910                plan.add_detail(format!("Input: {} rows", view.row_count()));
911                for expr in group_by_exprs {
912                    plan.add_detail(format!("Group by: {:?}", expr));
913                }
914
915                let group_start = Instant::now();
916                view = self.apply_group_by(
917                    view,
918                    group_by_exprs,
919                    &statement.select_items,
920                    statement.having.as_ref(),
921                )?;
922
923                plan.set_rows_out(view.row_count());
924                plan.add_detail(format!("Output: {} groups", view.row_count()));
925                plan.add_detail(format!(
926                    "Group time: {:.3}ms",
927                    group_start.elapsed().as_secs_f64() * 1000.0
928                ));
929                plan.end_step();
930            }
931        } else {
932            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
933            if !statement.select_items.is_empty() {
934                // Check if we have ANY non-star items (not just the first one)
935                let has_non_star_items = statement
936                    .select_items
937                    .iter()
938                    .any(|item| !matches!(item, SelectItem::Star));
939
940                // Apply select items if:
941                // 1. We have computed expressions or explicit columns
942                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
943                if has_non_star_items || statement.select_items.len() > 1 {
944                    view = self.apply_select_items(view, &statement.select_items)?;
945                } else {
946                }
947                // If it's just a single star, no projection needed
948            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
949                debug!("QueryEngine: Using legacy columns path");
950                // Fallback to legacy column projection for backward compatibility
951                // Use the current view's source table, not the original table
952                let source_table = view.source();
953                let column_indices =
954                    self.resolve_column_indices(source_table, &statement.columns)?;
955                view = view.with_columns(column_indices);
956            }
957        }
958
959        // Apply DISTINCT if specified
960        if statement.distinct {
961            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
962            plan.set_rows_in(view.row_count());
963            plan.add_detail(format!("Input: {} rows", view.row_count()));
964
965            let distinct_start = Instant::now();
966            view = self.apply_distinct(view)?;
967
968            plan.set_rows_out(view.row_count());
969            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
970            plan.add_detail(format!(
971                "Distinct time: {:.3}ms",
972                distinct_start.elapsed().as_secs_f64() * 1000.0
973            ));
974            plan.end_step();
975        }
976
977        // Apply ORDER BY sorting
978        if let Some(order_by_columns) = &statement.order_by {
979            if !order_by_columns.is_empty() {
980                plan.begin_step(
981                    StepType::Sort,
982                    format!("ORDER BY {} columns", order_by_columns.len()),
983                );
984                plan.set_rows_in(view.row_count());
985                for col in order_by_columns {
986                    plan.add_detail(format!("{} {:?}", col.column, col.direction));
987                }
988
989                let sort_start = Instant::now();
990                view = self.apply_multi_order_by(view, order_by_columns)?;
991
992                plan.add_detail(format!(
993                    "Sort time: {:.3}ms",
994                    sort_start.elapsed().as_secs_f64() * 1000.0
995                ));
996                plan.end_step();
997            }
998        }
999
1000        // Apply LIMIT/OFFSET
1001        if let Some(limit) = statement.limit {
1002            let offset = statement.offset.unwrap_or(0);
1003            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1004            plan.set_rows_in(view.row_count());
1005            if offset > 0 {
1006                plan.add_detail(format!("OFFSET: {}", offset));
1007            }
1008            view = view.with_limit(limit, offset);
1009            plan.set_rows_out(view.row_count());
1010            plan.add_detail(format!("Output: {} rows", view.row_count()));
1011            plan.end_step();
1012        }
1013
1014        Ok(view)
1015    }
1016
1017    /// Resolve column names to indices
1018    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1019        let mut indices = Vec::new();
1020        let table_columns = table.column_names();
1021
1022        for col_name in columns {
1023            let index = table_columns
1024                .iter()
1025                .position(|c| c.eq_ignore_ascii_case(col_name))
1026                .ok_or_else(|| {
1027                    let suggestion = self.find_similar_column(table, col_name);
1028                    match suggestion {
1029                        Some(similar) => anyhow::anyhow!(
1030                            "Column '{}' not found. Did you mean '{}'?",
1031                            col_name,
1032                            similar
1033                        ),
1034                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1035                    }
1036                })?;
1037            indices.push(index);
1038        }
1039
1040        Ok(indices)
1041    }
1042
1043    /// Apply SELECT items (columns and computed expressions) to create new view
1044    fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
1045        debug!(
1046            "QueryEngine::apply_select_items - items: {:?}",
1047            select_items
1048        );
1049        debug!(
1050            "QueryEngine::apply_select_items - input view has {} rows",
1051            view.row_count()
1052        );
1053
1054        // Check if this is an aggregate query:
1055        // 1. At least one aggregate function exists
1056        // 2. All other items are either aggregates or constants (aggregate-compatible)
1057        let has_aggregates = select_items.iter().any(|item| match item {
1058            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1059            SelectItem::Column(_) => false,
1060            SelectItem::Star => false,
1061        });
1062
1063        let all_aggregate_compatible = select_items.iter().all(|item| match item {
1064            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1065            SelectItem::Column(_) => false, // Columns are not aggregate-compatible
1066            SelectItem::Star => false,      // Star is not aggregate-compatible
1067        });
1068
1069        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1070            // Special handling for aggregate queries with constants (no GROUP BY)
1071            // These should produce exactly one row
1072            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1073            return self.apply_aggregate_select(view, select_items);
1074        }
1075
1076        // Check if we need to create computed columns
1077        let has_computed_expressions = select_items
1078            .iter()
1079            .any(|item| matches!(item, SelectItem::Expression { .. }));
1080
1081        debug!(
1082            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1083            has_computed_expressions
1084        );
1085
1086        if !has_computed_expressions {
1087            // Simple case: only columns, use existing projection logic
1088            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1089            return Ok(view.with_columns(column_indices));
1090        }
1091
1092        // Complex case: we have computed expressions
1093        // IMPORTANT: We create a PROJECTED view, not a new table
1094        // This preserves the original DataTable reference
1095
1096        let source_table = view.source();
1097        let visible_rows = view.visible_row_indices();
1098
1099        // Create a temporary table just for the computed result view
1100        // But this table is only used for the current query result
1101        let mut computed_table = DataTable::new("query_result");
1102
1103        // First, expand any Star selectors to actual columns
1104        let mut expanded_items = Vec::new();
1105        for item in select_items {
1106            match item {
1107                SelectItem::Star => {
1108                    // Expand * to all columns from source table
1109                    for col_name in source_table.column_names() {
1110                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1111                            col_name.to_string(),
1112                        )));
1113                    }
1114                }
1115                _ => expanded_items.push(item.clone()),
1116            }
1117        }
1118
1119        // Add columns based on expanded SelectItems, handling duplicates
1120        let mut column_name_counts: std::collections::HashMap<String, usize> =
1121            std::collections::HashMap::new();
1122
1123        for item in &expanded_items {
1124            let base_name = match item {
1125                SelectItem::Column(col_ref) => col_ref.name.clone(),
1126                SelectItem::Expression { alias, .. } => alias.clone(),
1127                SelectItem::Star => unreachable!("Star should have been expanded"),
1128            };
1129
1130            // Check if this column name has been used before
1131            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1132            let column_name = if *count == 0 {
1133                // First occurrence, use the name as-is
1134                base_name.clone()
1135            } else {
1136                // Duplicate, append a suffix
1137                format!("{base_name}_{count}")
1138            };
1139            *count += 1;
1140
1141            computed_table.add_column(DataColumn::new(&column_name));
1142        }
1143
1144        // Calculate values for each row
1145        let mut evaluator =
1146            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1147
1148        for &row_idx in visible_rows {
1149            let mut row_values = Vec::new();
1150
1151            for item in &expanded_items {
1152                let value = match item {
1153                    SelectItem::Column(col_ref) => {
1154                        // Column reference with optional table prefix
1155                        let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
1156                            // For qualified references, ONLY try qualified lookup - no fallback
1157                            let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1158                            let result = source_table.find_column_by_qualified_name(&qualified_name);
1159
1160                            // Debug: log what qualified names are available when lookup fails
1161                            if result.is_none() {
1162                                let available_qualified: Vec<String> = source_table.columns.iter()
1163                                    .filter_map(|c| c.qualified_name.clone())
1164                                    .collect();
1165                                let available_simple: Vec<String> = source_table.columns.iter()
1166                                    .map(|c| c.name.clone())
1167                                    .collect();
1168                                debug!(
1169                                    "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
1170                                    qualified_name, available_qualified, available_simple
1171                                );
1172                                // This MUST return None to trigger error
1173                            }
1174                            result
1175                        } else {
1176                            // Simple column name lookup
1177                            source_table.get_column_index(&col_ref.name)
1178                        }
1179                        .ok_or_else(|| {
1180                            let display_name = if let Some(prefix) = &col_ref.table_prefix {
1181                                format!("{}.{}", prefix, col_ref.name)
1182                            } else {
1183                                col_ref.name.clone()
1184                            };
1185                            let suggestion = self.find_similar_column(source_table, &col_ref.name);
1186                            match suggestion {
1187                                Some(similar) => anyhow::anyhow!(
1188                                    "Column '{}' not found. Did you mean '{}'?",
1189                                    display_name,
1190                                    similar
1191                                ),
1192                                None => anyhow::anyhow!("Column '{}' not found", display_name),
1193                            }
1194                        })?;
1195                        let row = source_table
1196                            .get_row(row_idx)
1197                            .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1198                        row.get(col_idx)
1199                            .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1200                            .clone()
1201                    }
1202                    SelectItem::Expression { expr, .. } => {
1203                        // Computed expression
1204                        evaluator.evaluate(expr, row_idx)?
1205                    }
1206                    SelectItem::Star => unreachable!("Star should have been expanded"),
1207                };
1208                row_values.push(value);
1209            }
1210
1211            computed_table
1212                .add_row(DataRow::new(row_values))
1213                .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1214        }
1215
1216        // Return a view of the computed result
1217        // This is a temporary view for this query only
1218        Ok(DataView::new(Arc::new(computed_table)))
1219    }
1220
1221    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
1222    fn apply_aggregate_select(
1223        &self,
1224        view: DataView,
1225        select_items: &[SelectItem],
1226    ) -> Result<DataView> {
1227        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1228
1229        let source_table = view.source();
1230        let mut result_table = DataTable::new("aggregate_result");
1231
1232        // Add columns for each select item
1233        for item in select_items {
1234            let column_name = match item {
1235                SelectItem::Expression { alias, .. } => alias.clone(),
1236                _ => unreachable!("Should only have expressions in aggregate-only query"),
1237            };
1238            result_table.add_column(DataColumn::new(&column_name));
1239        }
1240
1241        // Create evaluator with visible rows from the view (for filtered aggregates)
1242        let visible_rows = view.visible_row_indices().to_vec();
1243        let mut evaluator =
1244            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1245                .with_visible_rows(visible_rows);
1246
1247        // Evaluate each aggregate expression once (they handle all rows internally)
1248        let mut row_values = Vec::new();
1249        for item in select_items {
1250            match item {
1251                SelectItem::Expression { expr, .. } => {
1252                    // The evaluator will handle aggregates over all rows
1253                    // We pass row_index=0 but aggregates ignore it and process all rows
1254                    let value = evaluator.evaluate(expr, 0)?;
1255                    row_values.push(value);
1256                }
1257                _ => unreachable!("Should only have expressions in aggregate-only query"),
1258            }
1259        }
1260
1261        // Add the single result row
1262        result_table
1263            .add_row(DataRow::new(row_values))
1264            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1265
1266        Ok(DataView::new(Arc::new(result_table)))
1267    }
1268
1269    /// Resolve `SelectItem` columns to indices (for simple column projections only)
1270    fn resolve_select_columns(
1271        &self,
1272        table: &DataTable,
1273        select_items: &[SelectItem],
1274    ) -> Result<Vec<usize>> {
1275        let mut indices = Vec::new();
1276        let table_columns = table.column_names();
1277
1278        for item in select_items {
1279            match item {
1280                SelectItem::Column(col_ref) => {
1281                    // Check if this has a table prefix
1282                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
1283                        // For qualified references, ONLY try qualified lookup - no fallback
1284                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1285                        table.find_column_by_qualified_name(&qualified_name)
1286                            .ok_or_else(|| {
1287                                // Check if any columns have qualified names for better error message
1288                                let has_qualified = table.columns.iter()
1289                                    .any(|c| c.qualified_name.is_some());
1290                                if !has_qualified {
1291                                    anyhow::anyhow!(
1292                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1293                                        qualified_name, table_prefix
1294                                    )
1295                                } else {
1296                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
1297                                }
1298                            })?
1299                    } else {
1300                        // Simple column name lookup
1301                        table_columns
1302                            .iter()
1303                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1304                            .ok_or_else(|| {
1305                                let suggestion = self.find_similar_column(table, &col_ref.name);
1306                                match suggestion {
1307                                    Some(similar) => anyhow::anyhow!(
1308                                        "Column '{}' not found. Did you mean '{}'?",
1309                                        col_ref.name,
1310                                        similar
1311                                    ),
1312                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1313                                }
1314                            })?
1315                    };
1316                    indices.push(index);
1317                }
1318                SelectItem::Star => {
1319                    // Expand * to all column indices
1320                    for i in 0..table_columns.len() {
1321                        indices.push(i);
1322                    }
1323                }
1324                SelectItem::Expression { .. } => {
1325                    return Err(anyhow::anyhow!(
1326                        "Computed expressions require new table creation"
1327                    ));
1328                }
1329            }
1330        }
1331
1332        Ok(indices)
1333    }
1334
1335    /// Apply DISTINCT to remove duplicate rows
1336    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1337        use std::collections::HashSet;
1338
1339        let source = view.source();
1340        let visible_cols = view.visible_column_indices();
1341        let visible_rows = view.visible_row_indices();
1342
1343        // Build a set to track unique rows
1344        let mut seen_rows = HashSet::new();
1345        let mut unique_row_indices = Vec::new();
1346
1347        for &row_idx in visible_rows {
1348            // Build a key representing this row's visible column values
1349            let mut row_key = Vec::new();
1350            for &col_idx in visible_cols {
1351                let value = source
1352                    .get_value(row_idx, col_idx)
1353                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1354                // Convert value to a hashable representation
1355                row_key.push(format!("{:?}", value));
1356            }
1357
1358            // Check if we've seen this row before
1359            if seen_rows.insert(row_key) {
1360                // First time seeing this row combination
1361                unique_row_indices.push(row_idx);
1362            }
1363        }
1364
1365        // Create a new view with only unique rows
1366        Ok(view.with_rows(unique_row_indices))
1367    }
1368
1369    /// Apply multi-column ORDER BY sorting to the view
1370    fn apply_multi_order_by(
1371        &self,
1372        mut view: DataView,
1373        order_by_columns: &[OrderByColumn],
1374    ) -> Result<DataView> {
1375        // Build list of (source_column_index, ascending) tuples
1376        let mut sort_columns = Vec::new();
1377
1378        for order_col in order_by_columns {
1379            // Try to find the column index
1380            // Check in the current view's source table (this handles both regular columns and computed columns)
1381            // This is especially important after GROUP BY where we have a new result table with aggregate aliases
1382            let col_index = view
1383                .source()
1384                .get_column_index(&order_col.column)
1385                .ok_or_else(|| {
1386                    // If not found, provide helpful error with suggestions
1387                    let suggestion = self.find_similar_column(view.source(), &order_col.column);
1388                    match suggestion {
1389                        Some(similar) => anyhow::anyhow!(
1390                            "Column '{}' not found. Did you mean '{}'?",
1391                            order_col.column,
1392                            similar
1393                        ),
1394                        None => {
1395                            // Also list available columns for debugging
1396                            let available_cols = view.source().column_names().join(", ");
1397                            anyhow::anyhow!(
1398                                "Column '{}' not found. Available columns: {}",
1399                                order_col.column,
1400                                available_cols
1401                            )
1402                        }
1403                    }
1404                })?;
1405
1406            let ascending = matches!(order_col.direction, SortDirection::Asc);
1407            sort_columns.push((col_index, ascending));
1408        }
1409
1410        // Apply multi-column sorting
1411        view.apply_multi_sort(&sort_columns)?;
1412        Ok(view)
1413    }
1414
1415    /// Apply GROUP BY to the view with optional HAVING clause
1416    fn apply_group_by(
1417        &self,
1418        view: DataView,
1419        group_by_exprs: &[SqlExpression],
1420        select_items: &[SelectItem],
1421        having: Option<&SqlExpression>,
1422    ) -> Result<DataView> {
1423        // Use the new expression-based GROUP BY implementation
1424        self.apply_group_by_expressions(
1425            view,
1426            group_by_exprs,
1427            select_items,
1428            having,
1429            self.case_insensitive,
1430            self.date_notation.clone(),
1431        )
1432    }
1433
1434    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
1435    /// This helps pre-size hash tables for better performance
1436    pub fn estimate_group_cardinality(
1437        &self,
1438        view: &DataView,
1439        group_by_exprs: &[SqlExpression],
1440    ) -> usize {
1441        // If we have few rows, just return the row count as upper bound
1442        let row_count = view.get_visible_rows().len();
1443        if row_count <= 100 {
1444            return row_count;
1445        }
1446
1447        // Sample first 1000 rows or 10% of data, whichever is smaller
1448        let sample_size = min(1000, row_count / 10).max(100);
1449        let mut seen = FxHashSet::default();
1450
1451        let visible_rows = view.get_visible_rows();
1452        for (i, &row_idx) in visible_rows.iter().enumerate() {
1453            if i >= sample_size {
1454                break;
1455            }
1456
1457            // Evaluate GROUP BY expressions for this row
1458            let mut key_values = Vec::new();
1459            for expr in group_by_exprs {
1460                let mut evaluator = ArithmeticEvaluator::new(view.source());
1461                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1462                key_values.push(value);
1463            }
1464
1465            seen.insert(key_values);
1466        }
1467
1468        // Estimate total cardinality based on sample
1469        let sample_cardinality = seen.len();
1470        let estimated = (sample_cardinality * row_count) / sample_size;
1471
1472        // Cap at row count and ensure minimum of sample cardinality
1473        estimated.min(row_count).max(sample_cardinality)
1474    }
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479    use super::*;
1480    use crate::data::datatable::{DataColumn, DataRow, DataValue};
1481
1482    fn create_test_table() -> Arc<DataTable> {
1483        let mut table = DataTable::new("test");
1484
1485        // Add columns
1486        table.add_column(DataColumn::new("id"));
1487        table.add_column(DataColumn::new("name"));
1488        table.add_column(DataColumn::new("age"));
1489
1490        // Add rows
1491        table
1492            .add_row(DataRow::new(vec![
1493                DataValue::Integer(1),
1494                DataValue::String("Alice".to_string()),
1495                DataValue::Integer(30),
1496            ]))
1497            .unwrap();
1498
1499        table
1500            .add_row(DataRow::new(vec![
1501                DataValue::Integer(2),
1502                DataValue::String("Bob".to_string()),
1503                DataValue::Integer(25),
1504            ]))
1505            .unwrap();
1506
1507        table
1508            .add_row(DataRow::new(vec![
1509                DataValue::Integer(3),
1510                DataValue::String("Charlie".to_string()),
1511                DataValue::Integer(35),
1512            ]))
1513            .unwrap();
1514
1515        Arc::new(table)
1516    }
1517
1518    #[test]
1519    fn test_select_all() {
1520        let table = create_test_table();
1521        let engine = QueryEngine::new();
1522
1523        let view = engine
1524            .execute(table.clone(), "SELECT * FROM users")
1525            .unwrap();
1526        assert_eq!(view.row_count(), 3);
1527        assert_eq!(view.column_count(), 3);
1528    }
1529
1530    #[test]
1531    fn test_select_columns() {
1532        let table = create_test_table();
1533        let engine = QueryEngine::new();
1534
1535        let view = engine
1536            .execute(table.clone(), "SELECT name, age FROM users")
1537            .unwrap();
1538        assert_eq!(view.row_count(), 3);
1539        assert_eq!(view.column_count(), 2);
1540    }
1541
1542    #[test]
1543    fn test_select_with_limit() {
1544        let table = create_test_table();
1545        let engine = QueryEngine::new();
1546
1547        let view = engine
1548            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
1549            .unwrap();
1550        assert_eq!(view.row_count(), 2);
1551    }
1552
1553    #[test]
1554    fn test_type_coercion_contains() {
1555        // Initialize tracing for debug output
1556        let _ = tracing_subscriber::fmt()
1557            .with_max_level(tracing::Level::DEBUG)
1558            .try_init();
1559
1560        let mut table = DataTable::new("test");
1561        table.add_column(DataColumn::new("id"));
1562        table.add_column(DataColumn::new("status"));
1563        table.add_column(DataColumn::new("price"));
1564
1565        // Add test data with mixed types
1566        table
1567            .add_row(DataRow::new(vec![
1568                DataValue::Integer(1),
1569                DataValue::String("Pending".to_string()),
1570                DataValue::Float(99.99),
1571            ]))
1572            .unwrap();
1573
1574        table
1575            .add_row(DataRow::new(vec![
1576                DataValue::Integer(2),
1577                DataValue::String("Confirmed".to_string()),
1578                DataValue::Float(150.50),
1579            ]))
1580            .unwrap();
1581
1582        table
1583            .add_row(DataRow::new(vec![
1584                DataValue::Integer(3),
1585                DataValue::String("Pending".to_string()),
1586                DataValue::Float(75.00),
1587            ]))
1588            .unwrap();
1589
1590        let table = Arc::new(table);
1591        let engine = QueryEngine::new();
1592
1593        println!("\n=== Testing WHERE clause with Contains ===");
1594        println!("Table has {} rows", table.row_count());
1595        for i in 0..table.row_count() {
1596            let status = table.get_value(i, 1);
1597            println!("Row {i}: status = {status:?}");
1598        }
1599
1600        // Test 1: Basic string contains (should work)
1601        println!("\n--- Test 1: status.Contains('pend') ---");
1602        let result = engine.execute(
1603            table.clone(),
1604            "SELECT * FROM test WHERE status.Contains('pend')",
1605        );
1606        match result {
1607            Ok(view) => {
1608                println!("SUCCESS: Found {} matching rows", view.row_count());
1609                assert_eq!(view.row_count(), 2); // Should find both Pending rows
1610            }
1611            Err(e) => {
1612                panic!("Query failed: {e}");
1613            }
1614        }
1615
1616        // Test 2: Numeric contains (should work with type coercion)
1617        println!("\n--- Test 2: price.Contains('9') ---");
1618        let result = engine.execute(
1619            table.clone(),
1620            "SELECT * FROM test WHERE price.Contains('9')",
1621        );
1622        match result {
1623            Ok(view) => {
1624                println!(
1625                    "SUCCESS: Found {} matching rows with price containing '9'",
1626                    view.row_count()
1627                );
1628                // Should find 99.99 row
1629                assert!(view.row_count() >= 1);
1630            }
1631            Err(e) => {
1632                panic!("Numeric coercion query failed: {e}");
1633            }
1634        }
1635
1636        println!("\n=== All tests passed! ===");
1637    }
1638
1639    #[test]
1640    fn test_not_in_clause() {
1641        // Initialize tracing for debug output
1642        let _ = tracing_subscriber::fmt()
1643            .with_max_level(tracing::Level::DEBUG)
1644            .try_init();
1645
1646        let mut table = DataTable::new("test");
1647        table.add_column(DataColumn::new("id"));
1648        table.add_column(DataColumn::new("country"));
1649
1650        // Add test data
1651        table
1652            .add_row(DataRow::new(vec![
1653                DataValue::Integer(1),
1654                DataValue::String("CA".to_string()),
1655            ]))
1656            .unwrap();
1657
1658        table
1659            .add_row(DataRow::new(vec![
1660                DataValue::Integer(2),
1661                DataValue::String("US".to_string()),
1662            ]))
1663            .unwrap();
1664
1665        table
1666            .add_row(DataRow::new(vec![
1667                DataValue::Integer(3),
1668                DataValue::String("UK".to_string()),
1669            ]))
1670            .unwrap();
1671
1672        let table = Arc::new(table);
1673        let engine = QueryEngine::new();
1674
1675        println!("\n=== Testing NOT IN clause ===");
1676        println!("Table has {} rows", table.row_count());
1677        for i in 0..table.row_count() {
1678            let country = table.get_value(i, 1);
1679            println!("Row {i}: country = {country:?}");
1680        }
1681
1682        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
1683        println!("\n--- Test: country NOT IN ('CA') ---");
1684        let result = engine.execute(
1685            table.clone(),
1686            "SELECT * FROM test WHERE country NOT IN ('CA')",
1687        );
1688        match result {
1689            Ok(view) => {
1690                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
1691                assert_eq!(view.row_count(), 2); // Should find US and UK
1692            }
1693            Err(e) => {
1694                panic!("NOT IN query failed: {e}");
1695            }
1696        }
1697
1698        println!("\n=== NOT IN test complete! ===");
1699    }
1700
1701    #[test]
1702    fn test_case_insensitive_in_and_not_in() {
1703        // Initialize tracing for debug output
1704        let _ = tracing_subscriber::fmt()
1705            .with_max_level(tracing::Level::DEBUG)
1706            .try_init();
1707
1708        let mut table = DataTable::new("test");
1709        table.add_column(DataColumn::new("id"));
1710        table.add_column(DataColumn::new("country"));
1711
1712        // Add test data with mixed case
1713        table
1714            .add_row(DataRow::new(vec![
1715                DataValue::Integer(1),
1716                DataValue::String("CA".to_string()), // uppercase
1717            ]))
1718            .unwrap();
1719
1720        table
1721            .add_row(DataRow::new(vec![
1722                DataValue::Integer(2),
1723                DataValue::String("us".to_string()), // lowercase
1724            ]))
1725            .unwrap();
1726
1727        table
1728            .add_row(DataRow::new(vec![
1729                DataValue::Integer(3),
1730                DataValue::String("UK".to_string()), // uppercase
1731            ]))
1732            .unwrap();
1733
1734        let table = Arc::new(table);
1735
1736        println!("\n=== Testing Case-Insensitive IN clause ===");
1737        println!("Table has {} rows", table.row_count());
1738        for i in 0..table.row_count() {
1739            let country = table.get_value(i, 1);
1740            println!("Row {i}: country = {country:?}");
1741        }
1742
1743        // Test case-insensitive IN - should match 'CA' with 'ca'
1744        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
1745        let engine = QueryEngine::with_case_insensitive(true);
1746        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1747        match result {
1748            Ok(view) => {
1749                println!(
1750                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
1751                    view.row_count()
1752                );
1753                assert_eq!(view.row_count(), 1); // Should find CA row
1754            }
1755            Err(e) => {
1756                panic!("Case-insensitive IN query failed: {e}");
1757            }
1758        }
1759
1760        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
1761        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
1762        let result = engine.execute(
1763            table.clone(),
1764            "SELECT * FROM test WHERE country NOT IN ('ca')",
1765        );
1766        match result {
1767            Ok(view) => {
1768                println!(
1769                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
1770                    view.row_count()
1771                );
1772                assert_eq!(view.row_count(), 2); // Should find us and UK rows
1773            }
1774            Err(e) => {
1775                panic!("Case-insensitive NOT IN query failed: {e}");
1776            }
1777        }
1778
1779        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
1780        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
1781        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
1782        let result = engine_case_sensitive
1783            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1784        match result {
1785            Ok(view) => {
1786                println!(
1787                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
1788                    view.row_count()
1789                );
1790                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
1791            }
1792            Err(e) => {
1793                panic!("Case-sensitive IN query failed: {e}");
1794            }
1795        }
1796
1797        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
1798    }
1799
1800    #[test]
1801    #[ignore = "Parentheses in WHERE clause not yet implemented"]
1802    fn test_parentheses_in_where_clause() {
1803        // Initialize tracing for debug output
1804        let _ = tracing_subscriber::fmt()
1805            .with_max_level(tracing::Level::DEBUG)
1806            .try_init();
1807
1808        let mut table = DataTable::new("test");
1809        table.add_column(DataColumn::new("id"));
1810        table.add_column(DataColumn::new("status"));
1811        table.add_column(DataColumn::new("priority"));
1812
1813        // Add test data
1814        table
1815            .add_row(DataRow::new(vec![
1816                DataValue::Integer(1),
1817                DataValue::String("Pending".to_string()),
1818                DataValue::String("High".to_string()),
1819            ]))
1820            .unwrap();
1821
1822        table
1823            .add_row(DataRow::new(vec![
1824                DataValue::Integer(2),
1825                DataValue::String("Complete".to_string()),
1826                DataValue::String("High".to_string()),
1827            ]))
1828            .unwrap();
1829
1830        table
1831            .add_row(DataRow::new(vec![
1832                DataValue::Integer(3),
1833                DataValue::String("Pending".to_string()),
1834                DataValue::String("Low".to_string()),
1835            ]))
1836            .unwrap();
1837
1838        table
1839            .add_row(DataRow::new(vec![
1840                DataValue::Integer(4),
1841                DataValue::String("Complete".to_string()),
1842                DataValue::String("Low".to_string()),
1843            ]))
1844            .unwrap();
1845
1846        let table = Arc::new(table);
1847        let engine = QueryEngine::new();
1848
1849        println!("\n=== Testing Parentheses in WHERE clause ===");
1850        println!("Table has {} rows", table.row_count());
1851        for i in 0..table.row_count() {
1852            let status = table.get_value(i, 1);
1853            let priority = table.get_value(i, 2);
1854            println!("Row {i}: status = {status:?}, priority = {priority:?}");
1855        }
1856
1857        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
1858        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
1859        let result = engine.execute(
1860            table.clone(),
1861            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
1862        );
1863        match result {
1864            Ok(view) => {
1865                println!(
1866                    "SUCCESS: Found {} rows with parenthetical logic",
1867                    view.row_count()
1868                );
1869                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
1870            }
1871            Err(e) => {
1872                panic!("Parentheses query failed: {e}");
1873            }
1874        }
1875
1876        println!("\n=== Parentheses test complete! ===");
1877    }
1878
1879    #[test]
1880    #[ignore = "Numeric type coercion needs fixing"]
1881    fn test_numeric_type_coercion() {
1882        // Initialize tracing for debug output
1883        let _ = tracing_subscriber::fmt()
1884            .with_max_level(tracing::Level::DEBUG)
1885            .try_init();
1886
1887        let mut table = DataTable::new("test");
1888        table.add_column(DataColumn::new("id"));
1889        table.add_column(DataColumn::new("price"));
1890        table.add_column(DataColumn::new("quantity"));
1891
1892        // Add test data with different numeric types
1893        table
1894            .add_row(DataRow::new(vec![
1895                DataValue::Integer(1),
1896                DataValue::Float(99.50), // Contains '.'
1897                DataValue::Integer(100),
1898            ]))
1899            .unwrap();
1900
1901        table
1902            .add_row(DataRow::new(vec![
1903                DataValue::Integer(2),
1904                DataValue::Float(150.0), // Contains '.' and '0'
1905                DataValue::Integer(200),
1906            ]))
1907            .unwrap();
1908
1909        table
1910            .add_row(DataRow::new(vec![
1911                DataValue::Integer(3),
1912                DataValue::Integer(75), // No decimal point
1913                DataValue::Integer(50),
1914            ]))
1915            .unwrap();
1916
1917        let table = Arc::new(table);
1918        let engine = QueryEngine::new();
1919
1920        println!("\n=== Testing Numeric Type Coercion ===");
1921        println!("Table has {} rows", table.row_count());
1922        for i in 0..table.row_count() {
1923            let price = table.get_value(i, 1);
1924            let quantity = table.get_value(i, 2);
1925            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
1926        }
1927
1928        // Test Contains on float values - should find rows with decimal points
1929        println!("\n--- Test: price.Contains('.') ---");
1930        let result = engine.execute(
1931            table.clone(),
1932            "SELECT * FROM test WHERE price.Contains('.')",
1933        );
1934        match result {
1935            Ok(view) => {
1936                println!(
1937                    "SUCCESS: Found {} rows with decimal points in price",
1938                    view.row_count()
1939                );
1940                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
1941            }
1942            Err(e) => {
1943                panic!("Numeric Contains query failed: {e}");
1944            }
1945        }
1946
1947        // Test Contains on integer values converted to string
1948        println!("\n--- Test: quantity.Contains('0') ---");
1949        let result = engine.execute(
1950            table.clone(),
1951            "SELECT * FROM test WHERE quantity.Contains('0')",
1952        );
1953        match result {
1954            Ok(view) => {
1955                println!(
1956                    "SUCCESS: Found {} rows with '0' in quantity",
1957                    view.row_count()
1958                );
1959                assert_eq!(view.row_count(), 2); // Should find 100 and 200
1960            }
1961            Err(e) => {
1962                panic!("Integer Contains query failed: {e}");
1963            }
1964        }
1965
1966        println!("\n=== Numeric type coercion test complete! ===");
1967    }
1968
1969    #[test]
1970    fn test_datetime_comparisons() {
1971        // Initialize tracing for debug output
1972        let _ = tracing_subscriber::fmt()
1973            .with_max_level(tracing::Level::DEBUG)
1974            .try_init();
1975
1976        let mut table = DataTable::new("test");
1977        table.add_column(DataColumn::new("id"));
1978        table.add_column(DataColumn::new("created_date"));
1979
1980        // Add test data with date strings (as they would come from CSV)
1981        table
1982            .add_row(DataRow::new(vec![
1983                DataValue::Integer(1),
1984                DataValue::String("2024-12-15".to_string()),
1985            ]))
1986            .unwrap();
1987
1988        table
1989            .add_row(DataRow::new(vec![
1990                DataValue::Integer(2),
1991                DataValue::String("2025-01-15".to_string()),
1992            ]))
1993            .unwrap();
1994
1995        table
1996            .add_row(DataRow::new(vec![
1997                DataValue::Integer(3),
1998                DataValue::String("2025-02-15".to_string()),
1999            ]))
2000            .unwrap();
2001
2002        let table = Arc::new(table);
2003        let engine = QueryEngine::new();
2004
2005        println!("\n=== Testing DateTime Comparisons ===");
2006        println!("Table has {} rows", table.row_count());
2007        for i in 0..table.row_count() {
2008            let date = table.get_value(i, 1);
2009            println!("Row {i}: created_date = {date:?}");
2010        }
2011
2012        // Test DateTime constructor comparison - should find dates after 2025-01-01
2013        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2014        let result = engine.execute(
2015            table.clone(),
2016            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2017        );
2018        match result {
2019            Ok(view) => {
2020                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2021                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
2022            }
2023            Err(e) => {
2024                panic!("DateTime comparison query failed: {e}");
2025            }
2026        }
2027
2028        println!("\n=== DateTime comparison test complete! ===");
2029    }
2030
2031    #[test]
2032    fn test_not_with_method_calls() {
2033        // Initialize tracing for debug output
2034        let _ = tracing_subscriber::fmt()
2035            .with_max_level(tracing::Level::DEBUG)
2036            .try_init();
2037
2038        let mut table = DataTable::new("test");
2039        table.add_column(DataColumn::new("id"));
2040        table.add_column(DataColumn::new("status"));
2041
2042        // Add test data
2043        table
2044            .add_row(DataRow::new(vec![
2045                DataValue::Integer(1),
2046                DataValue::String("Pending Review".to_string()),
2047            ]))
2048            .unwrap();
2049
2050        table
2051            .add_row(DataRow::new(vec![
2052                DataValue::Integer(2),
2053                DataValue::String("Complete".to_string()),
2054            ]))
2055            .unwrap();
2056
2057        table
2058            .add_row(DataRow::new(vec![
2059                DataValue::Integer(3),
2060                DataValue::String("Pending Approval".to_string()),
2061            ]))
2062            .unwrap();
2063
2064        let table = Arc::new(table);
2065        let engine = QueryEngine::with_case_insensitive(true);
2066
2067        println!("\n=== Testing NOT with Method Calls ===");
2068        println!("Table has {} rows", table.row_count());
2069        for i in 0..table.row_count() {
2070            let status = table.get_value(i, 1);
2071            println!("Row {i}: status = {status:?}");
2072        }
2073
2074        // Test NOT with Contains - should exclude rows containing "pend"
2075        println!("\n--- Test: NOT status.Contains('pend') ---");
2076        let result = engine.execute(
2077            table.clone(),
2078            "SELECT * FROM test WHERE NOT status.Contains('pend')",
2079        );
2080        match result {
2081            Ok(view) => {
2082                println!(
2083                    "SUCCESS: Found {} rows NOT containing 'pend'",
2084                    view.row_count()
2085                );
2086                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2087            }
2088            Err(e) => {
2089                panic!("NOT Contains query failed: {e}");
2090            }
2091        }
2092
2093        // Test NOT with StartsWith
2094        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2095        let result = engine.execute(
2096            table.clone(),
2097            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2098        );
2099        match result {
2100            Ok(view) => {
2101                println!(
2102                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
2103                    view.row_count()
2104                );
2105                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2106            }
2107            Err(e) => {
2108                panic!("NOT StartsWith query failed: {e}");
2109            }
2110        }
2111
2112        println!("\n=== NOT with method calls test complete! ===");
2113    }
2114
2115    #[test]
2116    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2117    fn test_complex_logical_expressions() {
2118        // Initialize tracing for debug output
2119        let _ = tracing_subscriber::fmt()
2120            .with_max_level(tracing::Level::DEBUG)
2121            .try_init();
2122
2123        let mut table = DataTable::new("test");
2124        table.add_column(DataColumn::new("id"));
2125        table.add_column(DataColumn::new("status"));
2126        table.add_column(DataColumn::new("priority"));
2127        table.add_column(DataColumn::new("assigned"));
2128
2129        // Add comprehensive test data
2130        table
2131            .add_row(DataRow::new(vec![
2132                DataValue::Integer(1),
2133                DataValue::String("Pending".to_string()),
2134                DataValue::String("High".to_string()),
2135                DataValue::String("John".to_string()),
2136            ]))
2137            .unwrap();
2138
2139        table
2140            .add_row(DataRow::new(vec![
2141                DataValue::Integer(2),
2142                DataValue::String("Complete".to_string()),
2143                DataValue::String("High".to_string()),
2144                DataValue::String("Jane".to_string()),
2145            ]))
2146            .unwrap();
2147
2148        table
2149            .add_row(DataRow::new(vec![
2150                DataValue::Integer(3),
2151                DataValue::String("Pending".to_string()),
2152                DataValue::String("Low".to_string()),
2153                DataValue::String("John".to_string()),
2154            ]))
2155            .unwrap();
2156
2157        table
2158            .add_row(DataRow::new(vec![
2159                DataValue::Integer(4),
2160                DataValue::String("In Progress".to_string()),
2161                DataValue::String("Medium".to_string()),
2162                DataValue::String("Jane".to_string()),
2163            ]))
2164            .unwrap();
2165
2166        let table = Arc::new(table);
2167        let engine = QueryEngine::new();
2168
2169        println!("\n=== Testing Complex Logical Expressions ===");
2170        println!("Table has {} rows", table.row_count());
2171        for i in 0..table.row_count() {
2172            let status = table.get_value(i, 1);
2173            let priority = table.get_value(i, 2);
2174            let assigned = table.get_value(i, 3);
2175            println!(
2176                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2177            );
2178        }
2179
2180        // Test complex AND/OR logic
2181        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2182        let result = engine.execute(
2183            table.clone(),
2184            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2185        );
2186        match result {
2187            Ok(view) => {
2188                println!(
2189                    "SUCCESS: Found {} rows with complex logic",
2190                    view.row_count()
2191                );
2192                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
2193            }
2194            Err(e) => {
2195                panic!("Complex logic query failed: {e}");
2196            }
2197        }
2198
2199        // Test NOT with complex expressions
2200        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2201        let result = engine.execute(
2202            table.clone(),
2203            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2204        );
2205        match result {
2206            Ok(view) => {
2207                println!(
2208                    "SUCCESS: Found {} rows with NOT complex logic",
2209                    view.row_count()
2210                );
2211                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
2212            }
2213            Err(e) => {
2214                panic!("NOT complex logic query failed: {e}");
2215            }
2216        }
2217
2218        println!("\n=== Complex logical expressions test complete! ===");
2219    }
2220
2221    #[test]
2222    fn test_mixed_data_types_and_edge_cases() {
2223        // Initialize tracing for debug output
2224        let _ = tracing_subscriber::fmt()
2225            .with_max_level(tracing::Level::DEBUG)
2226            .try_init();
2227
2228        let mut table = DataTable::new("test");
2229        table.add_column(DataColumn::new("id"));
2230        table.add_column(DataColumn::new("value"));
2231        table.add_column(DataColumn::new("nullable_field"));
2232
2233        // Add test data with mixed types and edge cases
2234        table
2235            .add_row(DataRow::new(vec![
2236                DataValue::Integer(1),
2237                DataValue::String("123.45".to_string()),
2238                DataValue::String("present".to_string()),
2239            ]))
2240            .unwrap();
2241
2242        table
2243            .add_row(DataRow::new(vec![
2244                DataValue::Integer(2),
2245                DataValue::Float(678.90),
2246                DataValue::Null,
2247            ]))
2248            .unwrap();
2249
2250        table
2251            .add_row(DataRow::new(vec![
2252                DataValue::Integer(3),
2253                DataValue::Boolean(true),
2254                DataValue::String("also present".to_string()),
2255            ]))
2256            .unwrap();
2257
2258        table
2259            .add_row(DataRow::new(vec![
2260                DataValue::Integer(4),
2261                DataValue::String("false".to_string()),
2262                DataValue::Null,
2263            ]))
2264            .unwrap();
2265
2266        let table = Arc::new(table);
2267        let engine = QueryEngine::new();
2268
2269        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
2270        println!("Table has {} rows", table.row_count());
2271        for i in 0..table.row_count() {
2272            let value = table.get_value(i, 1);
2273            let nullable = table.get_value(i, 2);
2274            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
2275        }
2276
2277        // Test type coercion with boolean Contains
2278        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
2279        let result = engine.execute(
2280            table.clone(),
2281            "SELECT * FROM test WHERE value.Contains('true')",
2282        );
2283        match result {
2284            Ok(view) => {
2285                println!(
2286                    "SUCCESS: Found {} rows with boolean coercion",
2287                    view.row_count()
2288                );
2289                assert_eq!(view.row_count(), 1); // Should find the boolean true row
2290            }
2291            Err(e) => {
2292                panic!("Boolean coercion query failed: {e}");
2293            }
2294        }
2295
2296        // Test multiple IN values with mixed types
2297        println!("\n--- Test: id IN (1, 3) ---");
2298        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
2299        match result {
2300            Ok(view) => {
2301                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
2302                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
2303            }
2304            Err(e) => {
2305                panic!("Multiple IN values query failed: {e}");
2306            }
2307        }
2308
2309        println!("\n=== Mixed data types test complete! ===");
2310    }
2311
2312    /// Test that aggregate-only queries return exactly one row (regression test)
2313    #[test]
2314    fn test_aggregate_only_single_row() {
2315        let table = create_test_stock_data();
2316        let engine = QueryEngine::new();
2317
2318        // Test query with multiple aggregates - should return exactly 1 row
2319        let result = engine
2320            .execute(
2321                table.clone(),
2322                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
2323            )
2324            .expect("Query should succeed");
2325
2326        assert_eq!(
2327            result.row_count(),
2328            1,
2329            "Aggregate-only query should return exactly 1 row"
2330        );
2331        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
2332
2333        // Verify the actual values are correct
2334        let source = result.source();
2335        let row = source.get_row(0).expect("Should have first row");
2336
2337        // COUNT(*) should be 5 (total rows)
2338        assert_eq!(row.values[0], DataValue::Integer(5));
2339
2340        // MIN should be 99.5
2341        assert_eq!(row.values[1], DataValue::Float(99.5));
2342
2343        // MAX should be 105.0
2344        assert_eq!(row.values[2], DataValue::Float(105.0));
2345
2346        // AVG should be approximately 102.4
2347        if let DataValue::Float(avg) = &row.values[3] {
2348            assert!(
2349                (avg - 102.4).abs() < 0.01,
2350                "Average should be approximately 102.4, got {}",
2351                avg
2352            );
2353        } else {
2354            panic!("AVG should return a Float value");
2355        }
2356    }
2357
2358    /// Test single aggregate function returns single row
2359    #[test]
2360    fn test_single_aggregate_single_row() {
2361        let table = create_test_stock_data();
2362        let engine = QueryEngine::new();
2363
2364        let result = engine
2365            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
2366            .expect("Query should succeed");
2367
2368        assert_eq!(
2369            result.row_count(),
2370            1,
2371            "Single aggregate query should return exactly 1 row"
2372        );
2373        assert_eq!(result.column_count(), 1, "Should have 1 column");
2374
2375        let source = result.source();
2376        let row = source.get_row(0).expect("Should have first row");
2377        assert_eq!(row.values[0], DataValue::Integer(5));
2378    }
2379
2380    /// Test aggregate with WHERE clause filtering
2381    #[test]
2382    fn test_aggregate_with_where_single_row() {
2383        let table = create_test_stock_data();
2384        let engine = QueryEngine::new();
2385
2386        // Filter to only high-value stocks (>= 103.0) and aggregate
2387        let result = engine
2388            .execute(
2389                table.clone(),
2390                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
2391            )
2392            .expect("Query should succeed");
2393
2394        assert_eq!(
2395            result.row_count(),
2396            1,
2397            "Filtered aggregate query should return exactly 1 row"
2398        );
2399        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
2400
2401        let source = result.source();
2402        let row = source.get_row(0).expect("Should have first row");
2403
2404        // Should find 2 rows (103.5 and 105.0)
2405        assert_eq!(row.values[0], DataValue::Integer(2));
2406        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
2407        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
2408    }
2409
2410    #[test]
2411    fn test_not_in_parsing() {
2412        use crate::sql::recursive_parser::Parser;
2413
2414        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
2415        println!("\n=== Testing NOT IN parsing ===");
2416        println!("Parsing query: {query}");
2417
2418        let mut parser = Parser::new(query);
2419        match parser.parse() {
2420            Ok(statement) => {
2421                println!("Parsed statement: {statement:#?}");
2422                if let Some(where_clause) = statement.where_clause {
2423                    println!("WHERE conditions: {:#?}", where_clause.conditions);
2424                    if let Some(first_condition) = where_clause.conditions.first() {
2425                        println!("First condition expression: {:#?}", first_condition.expr);
2426                    }
2427                }
2428            }
2429            Err(e) => {
2430                panic!("Parse error: {e}");
2431            }
2432        }
2433    }
2434
2435    /// Create test stock data for aggregate testing
2436    fn create_test_stock_data() -> Arc<DataTable> {
2437        let mut table = DataTable::new("stock");
2438
2439        table.add_column(DataColumn::new("symbol"));
2440        table.add_column(DataColumn::new("close"));
2441        table.add_column(DataColumn::new("volume"));
2442
2443        // Add 5 rows of test data
2444        let test_data = vec![
2445            ("AAPL", 99.5, 1000),
2446            ("AAPL", 101.2, 1500),
2447            ("AAPL", 103.5, 2000),
2448            ("AAPL", 105.0, 1200),
2449            ("AAPL", 102.8, 1800),
2450        ];
2451
2452        for (symbol, close, volume) in test_data {
2453            table
2454                .add_row(DataRow::new(vec![
2455                    DataValue::String(symbol.to_string()),
2456                    DataValue::Float(close),
2457                    DataValue::Integer(volume),
2458                ]))
2459                .expect("Should add row successfully");
2460        }
2461
2462        Arc::new(table)
2463    }
2464}
2465
2466#[cfg(test)]
2467#[path = "query_engine_tests.rs"]
2468mod query_engine_tests;